1
0
mirror of https://github.com/astaxie/beego.git synced 2025-01-23 05:47:13 +00:00

185 lines
3.4 KiB
Go
Raw Normal View History

2020-07-22 22:50:08 +08:00
package alils
import (
"encoding/json"
"strings"
"sync"
"github.com/astaxie/beego/pkg/infrastructure/logs"
2020-07-22 22:50:08 +08:00
"github.com/gogo/protobuf/proto"
)
const (
	// CacheSize is the largest flush threshold the writer accepts: Init caps
	// Config.FlushWhen at this value.
	CacheSize int = 64
	// Delimiter separates the topic from the content inside a log message
	// ("<topic>##<content>") when topics are configured.
	Delimiter string = "##"
)
// Config is the JSON-decodable configuration of the Ali Log writer.
// Init unmarshals its jsonConfig argument into these fields.
type Config struct {
	// Project is used as the LogProject name.
	Project string `json:"project"`
	// Endpoint is used as the LogProject endpoint.
	Endpoint string `json:"endpoint"`
	// KeyID is passed to the LogProject as AccessKeyID.
	KeyID string `json:"key_id"`
	// KeySecret is passed to the LogProject as AccessKeySecret.
	KeySecret string `json:"key_secret"`
	// LogStore is the name handed to LogProject.GetLogStore.
	LogStore string `json:"log_store"`
	// Topics lists the topics that get their own LogGroup; messages whose
	// topic is not listed go to the default group with the empty topic.
	Topics []string `json:"topics"`
	// Source is set as the Source of every LogGroup created by Init.
	Source string `json:"source"`
	// Level is the threshold used by WriteMsg: messages whose level is
	// greater than Level are dropped.
	Level int `json:"level"`
	// FlushWhen is the number of buffered entries at which a group is
	// flushed. Init caps it at CacheSize.
	FlushWhen int `json:"flush_when"`
}
// aliLSWriter implements LoggerInterface.
2020-08-06 16:07:18 +01:00
// Writes messages in keep-live tcp connection.
2020-07-22 22:50:08 +08:00
type aliLSWriter struct {
store *LogStore
group []*LogGroup
withMap bool
groupMap map[string]*LogGroup
lock *sync.Mutex
Config
}
2020-08-06 16:07:18 +01:00
// NewAliLS creates a new Logger
2020-07-22 22:50:08 +08:00
func NewAliLS() logs.Logger {
alils := new(aliLSWriter)
alils.Level = logs.LevelTrace
return alils
}
2020-08-06 16:07:18 +01:00
// Init parses config and initializes struct
2020-07-22 22:50:08 +08:00
func (c *aliLSWriter) Init(jsonConfig string) (err error) {
json.Unmarshal([]byte(jsonConfig), c)
if c.FlushWhen > CacheSize {
c.FlushWhen = CacheSize
}
prj := &LogProject{
Name: c.Project,
Endpoint: c.Endpoint,
AccessKeyID: c.KeyID,
AccessKeySecret: c.KeySecret,
}
c.store, err = prj.GetLogStore(c.LogStore)
if err != nil {
return err
}
// Create default Log Group
c.group = append(c.group, &LogGroup{
Topic: proto.String(""),
Source: proto.String(c.Source),
Logs: make([]*Log, 0, c.FlushWhen),
})
// Create other Log Group
c.groupMap = make(map[string]*LogGroup)
for _, topic := range c.Topics {
lg := &LogGroup{
Topic: proto.String(topic),
Source: proto.String(c.Source),
Logs: make([]*Log, 0, c.FlushWhen),
}
c.group = append(c.group, lg)
c.groupMap[topic] = lg
}
if len(c.group) == 1 {
c.withMap = false
} else {
c.withMap = true
}
c.lock = &sync.Mutex{}
return nil
}
2020-08-06 16:07:18 +01:00
// WriteMsg writes a message in connection.
// If connection is down, try to re-connect.
func (c *aliLSWriter) WriteMsg(lm *logs.LogMsg) error {
if lm.Level > c.Level {
2020-07-22 22:50:08 +08:00
return nil
}
var topic string
var content string
var lg *LogGroup
if c.withMap {
// TopicLogGroup
2020-08-19 16:13:42 +01:00
strs := strings.SplitN(lm.Msg, Delimiter, 2)
2020-07-22 22:50:08 +08:00
if len(strs) == 2 {
pos := strings.LastIndex(strs[0], " ")
topic = strs[0][pos+1 : len(strs[0])]
content = strs[0][0:pos] + strs[1]
lg = c.groupMap[topic]
}
// send to empty Topic
if lg == nil {
2020-08-19 16:13:42 +01:00
content = lm.Msg
2020-07-22 22:50:08 +08:00
lg = c.group[0]
}
} else {
2020-08-19 16:13:42 +01:00
content = lm.Msg
2020-07-22 22:50:08 +08:00
lg = c.group[0]
}
c1 := &LogContent{
Key: proto.String("msg"),
Value: proto.String(content),
}
l := &Log{
2020-08-19 16:13:42 +01:00
Time: proto.Uint32(uint32(lm.When.Unix())),
2020-07-22 22:50:08 +08:00
Contents: []*LogContent{
c1,
},
}
c.lock.Lock()
lg.Logs = append(lg.Logs, l)
c.lock.Unlock()
if len(lg.Logs) >= c.FlushWhen {
c.flush(lg)
}
return nil
}
// Flush sends the buffered entries of every log group, the default group as
// well as each topic group, to the log store.
func (c *aliLSWriter) Flush() {
	for i := range c.group {
		c.flush(c.group[i])
	}
}
// Destroy does nothing for this writer; it releases no resources.
// NOTE(review): entries still buffered in the log groups are not sent here —
// presumably callers invoke Flush before Destroy; confirm against the caller.
func (c *aliLSWriter) Destroy() {
}
// flush sends lg to the log store while holding the writer lock. After a
// successful PutLogs the group's buffer is replaced by a fresh empty one with
// capacity FlushWhen. When PutLogs fails the error is dropped and the
// buffered entries are kept in the group.
func (c *aliLSWriter) flush(lg *LogGroup) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if putErr := c.store.PutLogs(lg); putErr == nil {
		lg.Logs = make([]*Log, 0, c.FlushWhen)
	}
}
// init registers NewAliLS with the logs package under logs.AdapterAliLS.
func init() {
	logs.Register(logs.AdapterAliLS, NewAliLS)
}