// Copyright 2020 // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package etcd import ( "context" "encoding/json" "fmt" "time" "github.com/coreos/etcd/clientv3" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "google.golang.org/grpc" "github.com/astaxie/beego/core/config" "github.com/astaxie/beego/core/logs" ) type EtcdConfiger struct { prefix string client *clientv3.Client config.BaseConfiger } func newEtcdConfiger(client *clientv3.Client, prefix string) *EtcdConfiger { res := &EtcdConfiger{ client: client, prefix: prefix, } res.BaseConfiger = config.NewBaseConfiger(res.reader) return res } // reader is an general implementation that read config from etcd. func (e *EtcdConfiger) reader(ctx context.Context, key string) (string, error) { resp, err := get(e.client, e.prefix+key) if err != nil { return "", err } if resp.Count > 0 { return string(resp.Kvs[0].Value), nil } return "", nil } // Set do nothing and return an error // I think write data to remote config center is not a good practice func (e *EtcdConfiger) Set(key, val string) error { return errors.New("Unsupported operation") } // DIY return the original response from etcd // be careful when you decide to use this func (e *EtcdConfiger) DIY(key string) (interface{}, error) { return get(e.client, key) } // GetSection in this implementation, we use section as prefix func (e *EtcdConfiger) GetSection(section string) (map[string]string, error) { var ( resp *clientv3.GetResponse err error ) resp, err = e.client.Get(context.TODO(), e.prefix+section, clientv3.WithPrefix()) if err != nil { return nil, errors.WithMessage(err, "GetSection failed") } res := make(map[string]string, len(resp.Kvs)) for _, kv := range resp.Kvs { res[string(kv.Key)] = string(kv.Value) } return res, nil } func (e *EtcdConfiger) SaveConfigFile(filename string) error { return errors.New("Unsupported operation") } // Unmarshaler is not very powerful because we lost the type information when we get configuration from etcd // for example, when we got "5", we are not sure whether it's int 5, or it's string "5" // TODO(support more complicated decoder) func (e *EtcdConfiger) Unmarshaler(prefix string, obj interface{}, opt ...config.DecodeOption) error { res, err := e.GetSection(prefix) if err != nil { return errors.WithMessage(err, fmt.Sprintf("could not read config with prefix: %s", prefix)) } prefixLen := len(e.prefix + prefix) m := make(map[string]string, len(res)) for k, v := range res { m[k[prefixLen:]] = v } return mapstructure.Decode(m, obj) } // Sub return an sub configer. func (e *EtcdConfiger) Sub(key string) (config.Configer, error) { return newEtcdConfiger(e.client, e.prefix+key), nil } // TODO remove this before release v2.0.0 func (e *EtcdConfiger) OnChange(key string, fn func(value string)) { buildOptsFunc := func() []clientv3.OpOption { return []clientv3.OpOption{} } rch := e.client.Watch(context.Background(), e.prefix+key, buildOptsFunc()...) go func() { for { for resp := range rch { if err := resp.Err(); err != nil { logs.Error("listen to key but got error callback", err) break } for _, e := range resp.Events { if e.Kv == nil { continue } fn(string(e.Kv.Value)) } } time.Sleep(time.Second) rch = e.client.Watch(context.Background(), e.prefix+key, buildOptsFunc()...) } }() } type EtcdConfigerProvider struct { } // Parse = ParseData([]byte(key)) // key must be json func (provider *EtcdConfigerProvider) Parse(key string) (config.Configer, error) { return provider.ParseData([]byte(key)) } // ParseData try to parse key as clientv3.Config, using this to build etcdClient func (provider *EtcdConfigerProvider) ParseData(data []byte) (config.Configer, error) { cfg := &clientv3.Config{} err := json.Unmarshal(data, cfg) if err != nil { return nil, errors.WithMessage(err, "parse data to etcd config failed, please check your input") } cfg.DialOptions = []grpc.DialOption{ grpc.WithBlock(), grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor), grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor), } client, err := clientv3.New(*cfg) if err != nil { return nil, errors.WithMessage(err, "create etcd client failed") } return newEtcdConfiger(client, ""), nil } func get(client *clientv3.Client, key string) (*clientv3.GetResponse, error) { var ( resp *clientv3.GetResponse err error ) resp, err = client.Get(context.Background(), key) if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("read config from etcd with key %s failed", key)) } return resp, err } func init() { config.Register("json", &EtcdConfigerProvider{}) }