mirror of
https://github.com/astaxie/beego.git
synced 2024-11-01 06:30:55 +00:00
193 lines
3.6 KiB
Go
193 lines
3.6 KiB
Go
package alils
|
||
|
||
import (
|
||
"encoding/json"
|
||
"github.com/astaxie/beego/logs"
|
||
"github.com/gogo/protobuf/proto"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
const (
|
||
CacheSize int = 64
|
||
Delimiter string = "##"
|
||
)
|
||
|
||
type AliLSConfig struct {
|
||
Project string `json:"project"`
|
||
Endpoint string `json:"endpoint"`
|
||
KeyID string `json:"key_id"`
|
||
KeySecret string `json:"key_secret"`
|
||
LogStore string `json:"log_store"`
|
||
Topics []string `json:"topics"`
|
||
Source string `json:"source"`
|
||
Level int `json:"level"`
|
||
FlushWhen int `json:"flush_when"`
|
||
}
|
||
|
||
// aliLSWriter implements LoggerInterface.
|
||
// it writes messages in keep-live tcp connection.
|
||
type aliLSWriter struct {
|
||
store *LogStore
|
||
group []*LogGroup
|
||
withMap bool
|
||
groupMap map[string]*LogGroup
|
||
lock *sync.Mutex
|
||
AliLSConfig
|
||
}
|
||
|
||
// 创建提供Logger接口的日志服务
|
||
func NewAliLS() logs.Logger {
|
||
alils := new(aliLSWriter)
|
||
alils.Level = logs.LevelTrace
|
||
return alils
|
||
}
|
||
|
||
// 读取配置
|
||
// 初始化必要的数据结构
|
||
func (c *aliLSWriter) Init(jsonConfig string) (err error) {
|
||
|
||
json.Unmarshal([]byte(jsonConfig), c)
|
||
|
||
if c.FlushWhen > CacheSize {
|
||
c.FlushWhen = CacheSize
|
||
}
|
||
|
||
// 初始化Project
|
||
prj := &LogProject{
|
||
Name: c.Project,
|
||
Endpoint: c.Endpoint,
|
||
AccessKeyId: c.KeyID,
|
||
AccessKeySecret: c.KeySecret,
|
||
}
|
||
|
||
// 获取logstore
|
||
c.store, err = prj.GetLogStore(c.LogStore)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 创建默认Log Group
|
||
c.group = append(c.group, &LogGroup{
|
||
Topic: proto.String(""),
|
||
Source: proto.String(c.Source),
|
||
Logs: make([]*Log, 0, c.FlushWhen),
|
||
})
|
||
|
||
// 创建其它Log Group
|
||
c.groupMap = make(map[string]*LogGroup)
|
||
for _, topic := range c.Topics {
|
||
|
||
lg := &LogGroup{
|
||
Topic: proto.String(topic),
|
||
Source: proto.String(c.Source),
|
||
Logs: make([]*Log, 0, c.FlushWhen),
|
||
}
|
||
|
||
c.group = append(c.group, lg)
|
||
c.groupMap[topic] = lg
|
||
}
|
||
|
||
if len(c.group) == 1 {
|
||
c.withMap = false
|
||
} else {
|
||
c.withMap = true
|
||
}
|
||
|
||
c.lock = &sync.Mutex{}
|
||
|
||
return nil
|
||
}
|
||
|
||
// WriteMsg write message in connection.
|
||
// if connection is down, try to re-connect.
|
||
func (c *aliLSWriter) WriteMsg(when time.Time, msg string, level int) (err error) {
|
||
|
||
if level > c.Level {
|
||
return nil
|
||
}
|
||
|
||
var topic string
|
||
var content string
|
||
var lg *LogGroup
|
||
if c.withMap {
|
||
|
||
// 解析出Topic,并匹配LogGroup
|
||
strs := strings.SplitN(msg, Delimiter, 2)
|
||
if len(strs) == 2 {
|
||
pos := strings.LastIndex(strs[0], " ")
|
||
topic = strs[0][pos+1 : len(strs[0])]
|
||
content = strs[0][0:pos] + strs[1]
|
||
lg = c.groupMap[topic]
|
||
}
|
||
|
||
// 默认发到空Topic
|
||
if lg == nil {
|
||
topic = ""
|
||
content = msg
|
||
lg = c.group[0]
|
||
}
|
||
} else {
|
||
topic = ""
|
||
content = msg
|
||
lg = c.group[0]
|
||
}
|
||
|
||
// 生成日志
|
||
c1 := &Log_Content{
|
||
Key: proto.String("msg"),
|
||
Value: proto.String(content),
|
||
}
|
||
|
||
l := &Log{
|
||
Time: proto.Uint32(uint32(when.Unix())), // 填写日志时间
|
||
Contents: []*Log_Content{
|
||
c1,
|
||
},
|
||
}
|
||
|
||
c.lock.Lock()
|
||
lg.Logs = append(lg.Logs, l)
|
||
c.lock.Unlock()
|
||
|
||
// 满足条件则Flush
|
||
if len(lg.Logs) >= c.FlushWhen {
|
||
c.flush(lg)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// Flush implementing method. empty.
|
||
func (c *aliLSWriter) Flush() {
|
||
|
||
// flush所有group
|
||
for _, lg := range c.group {
|
||
c.flush(lg)
|
||
}
|
||
}
|
||
|
||
// Destroy destroy connection writer and close tcp listener.
|
||
func (c *aliLSWriter) Destroy() {
|
||
}
|
||
|
||
func (c *aliLSWriter) flush(lg *LogGroup) {
|
||
|
||
c.lock.Lock()
|
||
defer c.lock.Unlock()
|
||
|
||
// 把以上的LogGroup推送到SLS服务器,
|
||
// SLS服务器会根据该logstore的shard个数自动进行负载均衡。
|
||
err := c.store.PutLogs(lg)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
lg.Logs = make([]*Log, 0, c.FlushWhen)
|
||
}
|
||
|
||
func init() {
|
||
logs.Register(logs.AdapterAliLS, NewAliLS)
|
||
}
|