2020-07-22 14:50:08 +00:00
|
|
|
|
package alils
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
|
2020-07-29 13:56:19 +00:00
|
|
|
|
"github.com/astaxie/beego/pkg/logs"
|
2020-07-22 14:50:08 +00:00
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
	// CacheSize is the largest flush threshold a writer may use: Init lowers
	// Config.FlushWhen to this value when the configured one is bigger.
	CacheSize int = 64

	// Delimiter separates the topic name from the message body inside a log
	// message, as in "topic##body".
	Delimiter string = "##"
)
|
|
|
|
|
|
|
|
|
|
// Config holds the settings of the Ali Log adapter. It is populated from the
// JSON document passed to Init, using the json tags below as keys.
type Config struct {
	// Project is used as the name of the log project.
	Project string `json:"project"`
	// Endpoint is used as the endpoint of the log project.
	Endpoint string `json:"endpoint"`
	// KeyID is used as the access key ID of the log project.
	KeyID string `json:"key_id"`
	// KeySecret is used as the access key secret of the log project.
	KeySecret string `json:"key_secret"`
	// LogStore is the name of the log store that is looked up in the project.
	LogStore string `json:"log_store"`
	// Topics lists the topics that get a dedicated log group in addition to
	// the default, empty-topic group.
	Topics []string `json:"topics"`
	// Source is set as the source of every log group.
	Source string `json:"source"`
	// Level is the threshold of the writer: messages whose level is greater
	// than this value are dropped.
	Level int `json:"level"`
	// FlushWhen is the number of buffered entries at which a log group is
	// flushed. Values above CacheSize are lowered to CacheSize.
	FlushWhen int `json:"flush_when"`
}
|
|
|
|
|
|
|
|
|
|
// aliLSWriter implements LoggerInterface.
|
2020-08-06 15:07:18 +00:00
|
|
|
|
// Writes messages in keep-live tcp connection.
|
2020-07-22 14:50:08 +00:00
|
|
|
|
type aliLSWriter struct {
|
|
|
|
|
store *LogStore
|
|
|
|
|
group []*LogGroup
|
|
|
|
|
withMap bool
|
|
|
|
|
groupMap map[string]*LogGroup
|
|
|
|
|
lock *sync.Mutex
|
|
|
|
|
Config
|
|
|
|
|
}
|
|
|
|
|
|
2020-08-06 15:07:18 +00:00
|
|
|
|
// NewAliLS creates a new Logger
|
2020-07-22 14:50:08 +00:00
|
|
|
|
func NewAliLS() logs.Logger {
|
|
|
|
|
alils := new(aliLSWriter)
|
|
|
|
|
alils.Level = logs.LevelTrace
|
|
|
|
|
return alils
|
|
|
|
|
}
|
|
|
|
|
|
2020-08-06 15:07:18 +00:00
|
|
|
|
// Init parses config and initializes struct
|
2020-07-22 14:50:08 +00:00
|
|
|
|
func (c *aliLSWriter) Init(jsonConfig string) (err error) {
|
|
|
|
|
|
|
|
|
|
json.Unmarshal([]byte(jsonConfig), c)
|
|
|
|
|
|
|
|
|
|
if c.FlushWhen > CacheSize {
|
|
|
|
|
c.FlushWhen = CacheSize
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
prj := &LogProject{
|
|
|
|
|
Name: c.Project,
|
|
|
|
|
Endpoint: c.Endpoint,
|
|
|
|
|
AccessKeyID: c.KeyID,
|
|
|
|
|
AccessKeySecret: c.KeySecret,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.store, err = prj.GetLogStore(c.LogStore)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create default Log Group
|
|
|
|
|
c.group = append(c.group, &LogGroup{
|
|
|
|
|
Topic: proto.String(""),
|
|
|
|
|
Source: proto.String(c.Source),
|
|
|
|
|
Logs: make([]*Log, 0, c.FlushWhen),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
// Create other Log Group
|
|
|
|
|
c.groupMap = make(map[string]*LogGroup)
|
|
|
|
|
for _, topic := range c.Topics {
|
|
|
|
|
|
|
|
|
|
lg := &LogGroup{
|
|
|
|
|
Topic: proto.String(topic),
|
|
|
|
|
Source: proto.String(c.Source),
|
|
|
|
|
Logs: make([]*Log, 0, c.FlushWhen),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.group = append(c.group, lg)
|
|
|
|
|
c.groupMap[topic] = lg
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(c.group) == 1 {
|
|
|
|
|
c.withMap = false
|
|
|
|
|
} else {
|
|
|
|
|
c.withMap = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.lock = &sync.Mutex{}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2020-08-06 15:07:18 +00:00
|
|
|
|
// WriteMsg writes a message in connection.
|
|
|
|
|
// If connection is down, try to re-connect.
|
2020-08-18 20:30:11 +00:00
|
|
|
|
func (c *aliLSWriter) WriteMsg(lm *logs.LogMsg) error {
|
|
|
|
|
if lm.Level > c.Level {
|
2020-07-22 14:50:08 +00:00
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var topic string
|
|
|
|
|
var content string
|
|
|
|
|
var lg *LogGroup
|
|
|
|
|
if c.withMap {
|
|
|
|
|
|
|
|
|
|
// Topic,LogGroup
|
|
|
|
|
strs := strings.SplitN(msg, Delimiter, 2)
|
|
|
|
|
if len(strs) == 2 {
|
|
|
|
|
pos := strings.LastIndex(strs[0], " ")
|
|
|
|
|
topic = strs[0][pos+1 : len(strs[0])]
|
|
|
|
|
content = strs[0][0:pos] + strs[1]
|
|
|
|
|
lg = c.groupMap[topic]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// send to empty Topic
|
|
|
|
|
if lg == nil {
|
|
|
|
|
content = msg
|
|
|
|
|
lg = c.group[0]
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
content = msg
|
|
|
|
|
lg = c.group[0]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c1 := &LogContent{
|
|
|
|
|
Key: proto.String("msg"),
|
|
|
|
|
Value: proto.String(content),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
l := &Log{
|
|
|
|
|
Time: proto.Uint32(uint32(when.Unix())),
|
|
|
|
|
Contents: []*LogContent{
|
|
|
|
|
c1,
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.lock.Lock()
|
|
|
|
|
lg.Logs = append(lg.Logs, l)
|
|
|
|
|
c.lock.Unlock()
|
|
|
|
|
|
|
|
|
|
if len(lg.Logs) >= c.FlushWhen {
|
|
|
|
|
c.flush(lg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Flush implementing method. empty.
|
|
|
|
|
func (c *aliLSWriter) Flush() {
|
|
|
|
|
|
|
|
|
|
// flush all group
|
|
|
|
|
for _, lg := range c.group {
|
|
|
|
|
c.flush(lg)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Destroy destroy connection writer and close tcp listener.
|
|
|
|
|
func (c *aliLSWriter) Destroy() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *aliLSWriter) flush(lg *LogGroup) {
|
|
|
|
|
|
|
|
|
|
c.lock.Lock()
|
|
|
|
|
defer c.lock.Unlock()
|
|
|
|
|
err := c.store.PutLogs(lg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lg.Logs = make([]*Log, 0, c.FlushWhen)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
|
logs.Register(logs.AdapterAliLS, NewAliLS)
|
|
|
|
|
}
|