1
0
mirror of https://github.com/astaxie/beego.git synced 2025-06-12 08:30:40 +00:00

Support etcd

This commit is contained in:
Ming Deng
2020-08-26 03:46:22 +00:00
parent 5b35bf6065
commit c2361170b3
20 changed files with 581 additions and 257 deletions

View File

@ -0,0 +1,219 @@
// Copyright 2020
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcd
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/coreos/etcd/clientv3"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"google.golang.org/grpc"
"github.com/astaxie/beego/pkg/infrastructure/config"
"github.com/astaxie/beego/pkg/infrastructure/logs"
)
const etcdOpts = "etcdOpts"
type EtcdConfiger struct {
prefix string
client *clientv3.Client
config.BaseConfiger
}
func newEtcdConfiger(client *clientv3.Client, prefix string) *EtcdConfiger {
res := &EtcdConfiger{
client: client,
prefix: prefix,
}
res.BaseConfiger = config.NewBaseConfiger(res.reader)
return res
}
// reader is an general implementation that read config from etcd.
func (e *EtcdConfiger) reader(ctx context.Context, key string) (string, error) {
resp, err := get(e.client, ctx, e.prefix+key)
if err != nil {
return "", err
}
if resp.Count > 0 {
return string(resp.Kvs[0].Value), nil
}
return "", nil
}
// Set do nothing and return an error
// I think write data to remote config center is not a good practice
func (e *EtcdConfiger) Set(key, val string) error {
return errors.New("Unsupported operation")
}
// DIY return the original response from etcd
// be careful when you decide to use this
func (e *EtcdConfiger) DIY(key string) (interface{}, error) {
return get(e.client, context.TODO(), key)
}
// GetSection in this implementation, we use section as prefix
func (e *EtcdConfiger) GetSection(section string) (map[string]string, error) {
return e.GetSectionWithCtx(context.Background(), section)
}
func (e *EtcdConfiger) GetSectionWithCtx(ctx context.Context, section string) (map[string]string, error) {
var (
resp *clientv3.GetResponse
err error
)
if opts, ok := ctx.Value(etcdOpts).([]clientv3.OpOption); ok {
opts = append(opts, clientv3.WithPrefix())
resp, err = e.client.Get(context.TODO(), e.prefix+section, opts...)
} else {
resp, err = e.client.Get(context.TODO(), e.prefix+section, clientv3.WithPrefix())
}
if err != nil {
return nil, errors.WithMessage(err, "GetSection failed")
}
res := make(map[string]string, len(resp.Kvs))
for _, kv := range resp.Kvs {
res[string(kv.Key)] = string(kv.Value)
}
return res, nil
}
func (e *EtcdConfiger) SaveConfigFile(filename string) error {
return errors.New("Unsupported operation")
}
// Unmarshaler is not very powerful because we lost the type information when we get configuration from etcd
// for example, when we got "5", we are not sure whether it's int 5, or it's string "5"
// TODO(support more complicated decoder)
func (e *EtcdConfiger) Unmarshaler(ctx context.Context, prefix string, obj interface{}, opt ...config.DecodeOption) error {
res, err := e.GetSectionWithCtx(ctx, prefix)
if err != nil {
return errors.WithMessage(err, fmt.Sprintf("could not read config with prefix: %s", prefix))
}
prefixLen := len(e.prefix + prefix)
m := make(map[string]string, len(res))
for k, v := range res {
m[k[prefixLen:]] = v
}
return mapstructure.Decode(m, obj)
}
// Sub return an sub configer.
func (e *EtcdConfiger) Sub(key string) (config.Configer, error) {
return newEtcdConfiger(e.client, e.prefix+key), nil
}
// TODO remove this before release v2.0.0
func (e *EtcdConfiger) OnChange(ctx context.Context, key string, fn func(value string)) {
buildOptsFunc := func() []clientv3.OpOption {
if opts, ok := ctx.Value(etcdOpts).([]clientv3.OpOption); ok {
opts = append(opts, clientv3.WithCreatedNotify())
return opts
}
return []clientv3.OpOption{}
}
rch := e.client.Watch(ctx, e.prefix+key, buildOptsFunc()...)
go func() {
for {
for resp := range rch {
if err := resp.Err(); err != nil {
logs.Error("listen to key but got error callback", err)
break
}
for _, e := range resp.Events {
if e.Kv == nil {
continue
}
fn(string(e.Kv.Value))
}
}
time.Sleep(time.Second)
rch = e.client.Watch(ctx, e.prefix+key, buildOptsFunc()...)
}
}()
}
type EtcdConfigerProvider struct {
}
// Parse = ParseData([]byte(key))
// key must be json
func (provider *EtcdConfigerProvider) Parse(key string) (config.Configer, error) {
return provider.ParseData([]byte(key))
}
// ParseData try to parse key as clientv3.Config, using this to build etcdClient
func (provider *EtcdConfigerProvider) ParseData(data []byte) (config.Configer, error) {
cfg := &clientv3.Config{}
err := json.Unmarshal(data, cfg)
if err != nil {
return nil, errors.WithMessage(err, "parse data to etcd config failed, please check your input")
}
cfg.DialOptions = []grpc.DialOption{
grpc.WithBlock(),
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
}
client, err := clientv3.New(*cfg)
if err != nil {
return nil, errors.WithMessage(err, "create etcd client failed")
}
return newEtcdConfiger(client, ""), nil
}
func get(client *clientv3.Client, ctx context.Context, key string) (*clientv3.GetResponse, error) {
var (
resp *clientv3.GetResponse
err error
)
if opts, ok := ctx.Value(etcdOpts).([]clientv3.OpOption); ok {
resp, err = client.Get(ctx, key, opts...)
} else {
resp, err = client.Get(ctx, key)
}
if err != nil {
return nil, errors.WithMessage(err, fmt.Sprintf("read config from etcd with key %s failed", key))
}
return resp, err
}
func WithEtcdOption(ctx context.Context, opts ...clientv3.OpOption) context.Context {
return context.WithValue(ctx, etcdOpts, opts)
}
func init() {
config.Register("json", &EtcdConfigerProvider{})
}

View File

@ -0,0 +1,123 @@
// Copyright 2020
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcd
import (
"context"
"encoding/json"
"os"
"testing"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/stretchr/testify/assert"
)
func TestWithEtcdOption(t *testing.T) {
ctx := WithEtcdOption(context.Background(), clientv3.WithPrefix())
assert.NotNil(t, ctx.Value(etcdOpts))
}
func TestEtcdConfigerProvider_Parse(t *testing.T) {
provider := &EtcdConfigerProvider{}
cfger, err := provider.Parse(readEtcdConfig())
assert.Nil(t, err)
assert.NotNil(t, cfger)
}
func TestEtcdConfiger(t *testing.T) {
provider := &EtcdConfigerProvider{}
cfger, _ := provider.Parse(readEtcdConfig())
subCfger, err := cfger.Sub("sub.")
assert.Nil(t, err)
assert.NotNil(t, subCfger)
subSubCfger, err := subCfger.Sub("sub.")
assert.NotNil(t, subSubCfger)
assert.Nil(t, err)
str, err := subSubCfger.String("key1")
assert.Nil(t, err)
assert.Equal(t, "sub.sub.key", str)
// we cannot test it
subSubCfger.OnChange(context.Background(), "watch", func(value string) {
// do nothing
})
defStr := cfger.DefaultString("not_exit", "default value")
assert.Equal(t, "default value", defStr)
defInt64 := cfger.DefaultInt64("not_exit", -1)
assert.Equal(t, int64(-1), defInt64)
defInt := cfger.DefaultInt("not_exit", -2)
assert.Equal(t, -2, defInt)
defFlt := cfger.DefaultFloat("not_exit", 12.3)
assert.Equal(t, 12.3, defFlt)
defBl := cfger.DefaultBool("not_exit", true)
assert.True(t, defBl)
defStrs := cfger.DefaultStrings("not_exit", []string{"hello"})
assert.Equal(t, []string{"hello"}, defStrs)
fl, err := cfger.Float("current.float")
assert.Nil(t, err)
assert.Equal(t, 1.23, fl)
bl, err := cfger.Bool("current.bool")
assert.Nil(t, err)
assert.True(t, bl)
it, err := cfger.Int("current.int")
assert.Nil(t, err)
assert.Equal(t, 11, it)
str, err = cfger.String("current.string")
assert.Nil(t, err)
assert.Equal(t, "hello", str)
tn := &TestEntity{}
err = cfger.Unmarshaler(context.Background(), "current.serialize.", tn)
assert.Nil(t, err)
assert.Equal(t, "test", tn.Name)
}
type TestEntity struct {
Name string `yaml:"name"`
Sub SubEntity `yaml:"sub"`
}
type SubEntity struct {
SubName string `yaml:"subName"`
}
func readEtcdConfig() string {
addr := os.Getenv("ETCD_ADDR")
if addr == "" {
addr = "localhost:2379"
}
obj := clientv3.Config{
Endpoints: []string{addr},
DialTimeout: 3 * time.Second,
}
cfg, _ := json.Marshal(obj)
return string(cfg)
}