1
0
mirror of https://github.com/astaxie/beego.git synced 2025-01-13 20:57:12 +00:00

209 lines
3.9 KiB
Go
Raw Normal View History

2020-07-22 22:50:08 +08:00
package alils
import (
"encoding/json"
2020-09-11 21:10:12 +08:00
"fmt"
2020-07-22 22:50:08 +08:00
"strings"
"sync"
"github.com/gogo/protobuf/proto"
2020-09-11 21:10:12 +08:00
"github.com/pkg/errors"
"github.com/astaxie/beego/pkg/infrastructure/logs"
2020-07-22 22:50:08 +08:00
)
const (
	// CacheSize is the largest flush threshold the writer accepts: Init
	// reduces any larger Config.FlushWhen to this value.
	CacheSize int = 64
	// Delimiter separates the topic prefix from the rest of a log message.
	Delimiter string = "##"
)
// Config holds the settings of the Ali Log Service writer. Init fills it by
// unmarshalling the JSON configuration string into the writer.
type Config struct {
	// Project is the name of the log project.
	Project string `json:"project"`
	// Endpoint is the endpoint of the log project.
	Endpoint string `json:"endpoint"`
	// KeyID is the access key ID used for the log project.
	KeyID string `json:"key_id"`
	// KeySecret is the access key secret used for the log project.
	KeySecret string `json:"key_secret"`
	// LogStore is the name of the log store the entries are sent to.
	LogStore string `json:"log_store"`
	// Topics lists the topics that get their own log group.
	Topics []string `json:"topics"`
	// Source is set as the source of every log group.
	Source string `json:"source"`
	// Level is the highest message level that is written; messages with a
	// greater level are dropped.
	Level int `json:"level"`
	// FlushWhen is the number of buffered entries at which a log group is
	// flushed. Values above CacheSize are reduced to CacheSize by Init.
	FlushWhen int `json:"flush_when"`
	// Formatter is the name of a registered formatter. When empty, the
	// writer keeps the formatter it already has.
	Formatter string `json:"formatter"`
}
// aliLSWriter implements LoggerInterface.
2020-08-06 16:07:18 +01:00
// Writes messages in keep-live tcp connection.
2020-07-22 22:50:08 +08:00
type aliLSWriter struct {
2020-09-11 21:10:12 +08:00
store *LogStore
group []*LogGroup
withMap bool
groupMap map[string]*LogGroup
lock *sync.Mutex
2020-07-22 22:50:08 +08:00
Config
2020-09-11 21:10:12 +08:00
formatter logs.LogFormatter
2020-07-22 22:50:08 +08:00
}
2020-08-06 16:07:18 +01:00
// NewAliLS creates a new Logger
2020-07-22 22:50:08 +08:00
func NewAliLS() logs.Logger {
alils := new(aliLSWriter)
alils.Level = logs.LevelTrace
2020-09-11 21:10:12 +08:00
alils.formatter = alils
2020-07-22 22:50:08 +08:00
return alils
}
2020-08-06 16:07:18 +01:00
// Init parses config and initializes struct
2020-09-11 21:10:12 +08:00
func (c *aliLSWriter) Init(config string) error {
err := json.Unmarshal([]byte(config), c)
if err != nil {
return err
2020-08-24 20:39:53 +01:00
}
2020-07-22 22:50:08 +08:00
if c.FlushWhen > CacheSize {
c.FlushWhen = CacheSize
}
prj := &LogProject{
Name: c.Project,
Endpoint: c.Endpoint,
AccessKeyID: c.KeyID,
AccessKeySecret: c.KeySecret,
}
2020-08-28 18:18:28 +01:00
store, err := prj.GetLogStore(c.LogStore)
2020-07-22 22:50:08 +08:00
if err != nil {
return err
}
2020-08-28 18:18:28 +01:00
c.store = store
2020-07-22 22:50:08 +08:00
// Create default Log Group
c.group = append(c.group, &LogGroup{
Topic: proto.String(""),
Source: proto.String(c.Source),
Logs: make([]*Log, 0, c.FlushWhen),
})
// Create other Log Group
c.groupMap = make(map[string]*LogGroup)
for _, topic := range c.Topics {
lg := &LogGroup{
Topic: proto.String(topic),
Source: proto.String(c.Source),
Logs: make([]*Log, 0, c.FlushWhen),
}
c.group = append(c.group, lg)
c.groupMap[topic] = lg
}
if len(c.group) == 1 {
c.withMap = false
} else {
c.withMap = true
}
c.lock = &sync.Mutex{}
2020-09-11 21:10:12 +08:00
if len(c.Formatter) > 0 {
fmtr, ok := logs.GetFormatter(c.Formatter)
if !ok {
return errors.New(fmt.Sprintf("the formatter with name: %s not found", c.Formatter))
}
c.formatter = fmtr
}
2020-07-22 22:50:08 +08:00
return nil
}
2020-08-20 19:15:27 +01:00
func (c *aliLSWriter) Format(lm *logs.LogMsg) string {
2020-09-11 21:10:12 +08:00
return lm.OldStyleFormat()
}
func (c *aliLSWriter) SetFormatter(f logs.LogFormatter) {
c.formatter = f
2020-08-20 19:15:27 +01:00
}
2020-08-06 16:07:18 +01:00
// WriteMsg writes a message in connection.
// If connection is down, try to re-connect.
func (c *aliLSWriter) WriteMsg(lm *logs.LogMsg) error {
if lm.Level > c.Level {
2020-07-22 22:50:08 +08:00
return nil
}
var topic string
var content string
var lg *LogGroup
if c.withMap {
// TopicLogGroup
2020-08-19 16:13:42 +01:00
strs := strings.SplitN(lm.Msg, Delimiter, 2)
2020-07-22 22:50:08 +08:00
if len(strs) == 2 {
pos := strings.LastIndex(strs[0], " ")
topic = strs[0][pos+1 : len(strs[0])]
lg = c.groupMap[topic]
}
// send to empty Topic
if lg == nil {
lg = c.group[0]
}
} else {
lg = c.group[0]
}
2020-09-11 21:10:12 +08:00
content = c.formatter.Format(lm)
2020-08-24 20:39:53 +01:00
2020-07-22 22:50:08 +08:00
c1 := &LogContent{
Key: proto.String("msg"),
Value: proto.String(content),
}
l := &Log{
2020-08-19 16:13:42 +01:00
Time: proto.Uint32(uint32(lm.When.Unix())),
2020-07-22 22:50:08 +08:00
Contents: []*LogContent{
c1,
},
}
c.lock.Lock()
lg.Logs = append(lg.Logs, l)
c.lock.Unlock()
if len(lg.Logs) >= c.FlushWhen {
c.flush(lg)
}
return nil
}
// Flush sends the buffered entries of every log group, the default group
// included, to the log store.
func (c *aliLSWriter) Flush() {
	groups := c.group
	for i := 0; i < len(groups); i++ {
		c.flush(groups[i])
	}
}
// Destroy is a no-op: it releases nothing and does not send buffered
// entries. Call Flush to send what is still buffered.
func (c *aliLSWriter) Destroy() {}
// flush sends the entries buffered in lg to the log store and, on success,
// replaces the buffer with an empty one sized for FlushWhen entries.
//
// A group without entries is skipped, so Flush does not issue a request for
// every idle topic. A failed PutLogs is not reported: the entries stay in lg,
// so the next flush of the group sends them again.
func (c *aliLSWriter) flush(lg *LogGroup) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if len(lg.Logs) == 0 {
		return
	}
	if err := c.store.PutLogs(lg); err != nil {
		return
	}
	lg.Logs = make([]*Log, 0, c.FlushWhen)
}
// init registers NewAliLS as the constructor for the logs.AdapterAliLS
// adapter name.
func init() {
	logs.Register(logs.AdapterAliLS, NewAliLS)
}