package alils

import (
	"encoding/json"
	"strings"
	"sync"
	"time"

	"github.com/astaxie/beego/logs"
	"github.com/gogo/protobuf/proto"
)

const (
	// CacheSize is the upper bound applied to AliLSConfig.FlushWhen.
	CacheSize int = 64
	// Delimiter separates the topic from the message body inside a log line.
	Delimiter string = "##"
)

// AliLSConfig is the JSON configuration accepted by the adapter's Init.
type AliLSConfig struct {
	Project   string   `json:"project"`    // project name
	Endpoint  string   `json:"endpoint"`   // service endpoint of the project
	KeyID     string   `json:"key_id"`     // access key id
	KeySecret string   `json:"key_secret"` // access key secret
	LogStore  string   `json:"log_store"`  // name of the log store to write to
	Topics    []string `json:"topics"`     // extra topics; each one gets its own LogGroup
	Source    string   `json:"source"`     // source recorded on every LogGroup
	Level     int      `json:"level"`      // messages with a level above this are dropped
	FlushWhen int      `json:"flush_when"` // buffered entries per group that trigger a flush
}

// aliLSWriter is the logs.Logger adapter for this package. It buffers log
// entries in per-topic LogGroups and pushes them to a LogStore.
type aliLSWriter struct {
	store    *LogStore            // destination log store, resolved in Init
	group    []*LogGroup          // all groups; index 0 is the default (empty-topic) group
	withMap  bool                 // true when at least one extra topic is configured
	groupMap map[string]*LogGroup // topic -> group, for the extra topics only
	lock     *sync.Mutex          // guards the Logs buffers of the groups
	AliLSConfig
}

// NewAliLS creates the adapter behind the logs.Logger interface. The level
// defaults to logs.LevelTrace until Init loads a configuration.
func NewAliLS() logs.Logger {
	w := new(aliLSWriter)
	w.Level = logs.LevelTrace
	return w
}

// Init reads the JSON configuration and builds the data structures the writer
// needs: the log store handle, the default LogGroup and one LogGroup per
// configured topic.
//
// It returns an error when jsonConfig is not valid JSON for AliLSConfig or
// when the log store cannot be obtained.
func (c *aliLSWriter) Init(jsonConfig string) (err error) {
	if err = json.Unmarshal([]byte(jsonConfig), c); err != nil {
		return err
	}

	// Keep FlushWhen within [0, CacheSize]; a negative value would make the
	// buffer allocations below panic.
	switch {
	case c.FlushWhen > CacheSize:
		c.FlushWhen = CacheSize
	case c.FlushWhen < 0:
		c.FlushWhen = 0
	}

	// Set up the project.
	prj := &LogProject{
		Name:            c.Project,
		Endpoint:        c.Endpoint,
		AccessKeyId:     c.KeyID,
		AccessKeySecret: c.KeySecret,
	}

	// Fetch the log store.
	c.store, err = prj.GetLogStore(c.LogStore)
	if err != nil {
		return err
	}

	newGroup := func(topic string) *LogGroup {
		return &LogGroup{
			Topic:  proto.String(topic),
			Source: proto.String(c.Source),
			Logs:   make([]*Log, 0, c.FlushWhen),
		}
	}

	// Start from a fresh slice so that a repeated Init does not keep groups
	// built from an earlier configuration. The default group comes first.
	c.group = make([]*LogGroup, 0, len(c.Topics)+1)
	c.group = append(c.group, newGroup(""))

	// Create the remaining groups, one per configured topic.
	c.groupMap = make(map[string]*LogGroup, len(c.Topics))
	for _, topic := range c.Topics {
		lg := newGroup(topic)
		c.group = append(c.group, lg)
		c.groupMap[topic] = lg
	}

	// Topic parsing is only needed when there is more than the default group.
	c.withMap = len(c.group) > 1

	c.lock = &sync.Mutex{}

	return nil
}

// WriteMsg buffers one log entry in the LogGroup selected by the message's
// topic and flushes that group once FlushWhen entries have accumulated.
func (c *aliLSWriter) WriteMsg(when time.Time, msg string, level int) (err error) { if level > c.Level { return nil } var topic string var content string var lg *LogGroup if c.withMap { // 解析出Topic,并匹配LogGroup strs := strings.SplitN(msg, Delimiter, 2) if len(strs) == 2 { pos := strings.LastIndex(strs[0], " ") topic = strs[0][pos+1 : len(strs[0])] content = strs[0][0:pos] + strs[1] lg = c.groupMap[topic] } // 默认发到空Topic if lg == nil { content = msg lg = c.group[0] } } else { content = msg lg = c.group[0] } // 生成日志 c1 := &Log_Content{ Key: proto.String("msg"), Value: proto.String(content), } l := &Log{ Time: proto.Uint32(uint32(when.Unix())), // 填写日志时间 Contents: []*Log_Content{ c1, }, } c.lock.Lock() lg.Logs = append(lg.Logs, l) c.lock.Unlock() // 满足条件则Flush if len(lg.Logs) >= c.FlushWhen { c.flush(lg) } return nil } // Flush implementing method. empty. func (c *aliLSWriter) Flush() { // flush所有group for _, lg := range c.group { c.flush(lg) } } // Destroy destroy connection writer and close tcp listener. func (c *aliLSWriter) Destroy() { } func (c *aliLSWriter) flush(lg *LogGroup) { c.lock.Lock() defer c.lock.Unlock() // 把以上的LogGroup推送到SLS服务器, // SLS服务器会根据该logstore的shard个数自动进行负载均衡。 err := c.store.PutLogs(lg) if err != nil { return } lg.Logs = make([]*Log, 0, c.FlushWhen) } func init() { logs.Register(logs.AdapterAliLS, NewAliLS) }