package alils import ( "encoding/json" "fmt" "io/ioutil" "net/http" "net/http/httputil" "strconv" lz4 "github.com/cloudflare/golz4" "github.com/gogo/protobuf/proto" ) // LogStore stores the logs type LogStore struct { Name string `json:"logstoreName"` TTL int ShardCount int CreateTime uint32 LastModifyTime uint32 project *LogProject } // Shard defines the Log Shard type Shard struct { ShardID int `json:"shardID"` } // ListShards returns shard id list of this logstore. func (s *LogStore) ListShards() (shardIDs []int, err error) { h := map[string]string{ "x-sls-bodyrawsize": "0", } uri := fmt.Sprintf("/logstores/%v/shards", s.Name) r, err := request(s.project, "GET", uri, h, nil) if err != nil { return } buf, err := ioutil.ReadAll(r.Body) if err != nil { return } if r.StatusCode != http.StatusOK { errMsg := &errorMessage{} err = json.Unmarshal(buf, errMsg) if err != nil { err = fmt.Errorf("failed to list logstore") dump, _ := httputil.DumpResponse(r, true) fmt.Println(dump) return } err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message) return } var shards []*Shard err = json.Unmarshal(buf, &shards) if err != nil { return } for _, v := range shards { shardIDs = append(shardIDs, v.ShardID) } return } // PutLogs puts logs into logstore. // The callers should transform user logs into LogGroup. func (s *LogStore) PutLogs(lg *LogGroup) (err error) { body, err := proto.Marshal(lg) if err != nil { return } // Compresse body with lz4 out := make([]byte, lz4.CompressBound(body)) n, err := lz4.Compress(body, out) if err != nil { return } h := map[string]string{ "x-sls-compresstype": "lz4", "x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)), "Content-Type": "application/x-protobuf", } uri := fmt.Sprintf("/logstores/%v", s.Name) r, err := request(s.project, "POST", uri, h, out[:n]) if err != nil { return } buf, err := ioutil.ReadAll(r.Body) if err != nil { return } if r.StatusCode != http.StatusOK { errMsg := &errorMessage{} err = json.Unmarshal(buf, errMsg) if err != nil { err = fmt.Errorf("failed to put logs") dump, _ := httputil.DumpResponse(r, true) fmt.Println(dump) return } err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message) return } return } // GetCursor gets log cursor of one shard specified by shardID. // The from can be in three form: a) unix timestamp in seccond, b) "begin", c) "end". // For more detail please read: http://gitlab.alibaba-inc.com/sls/doc/blob/master/api/shard.md#logstore func (s *LogStore) GetCursor(shardID int, from string) (cursor string, err error) { h := map[string]string{ "x-sls-bodyrawsize": "0", } uri := fmt.Sprintf("/logstores/%v/shards/%v?type=cursor&from=%v", s.Name, shardID, from) r, err := request(s.project, "GET", uri, h, nil) if err != nil { return } buf, err := ioutil.ReadAll(r.Body) if err != nil { return } if r.StatusCode != http.StatusOK { errMsg := &errorMessage{} err = json.Unmarshal(buf, errMsg) if err != nil { err = fmt.Errorf("failed to get cursor") dump, _ := httputil.DumpResponse(r, true) fmt.Println(dump) return } err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message) return } type Body struct { Cursor string } body := &Body{} err = json.Unmarshal(buf, body) if err != nil { return } cursor = body.Cursor return } // GetLogsBytes gets logs binary data from shard specified by shardID according cursor. // The logGroupMaxCount is the max number of logGroup could be returned. // The nextCursor is the next curosr can be used to read logs at next time. func (s *LogStore) GetLogsBytes(shardID int, cursor string, logGroupMaxCount int) (out []byte, nextCursor string, err error) { h := map[string]string{ "x-sls-bodyrawsize": "0", "Accept": "application/x-protobuf", "Accept-Encoding": "lz4", } uri := fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&count=%v", s.Name, shardID, cursor, logGroupMaxCount) r, err := request(s.project, "GET", uri, h, nil) if err != nil { return } buf, err := ioutil.ReadAll(r.Body) if err != nil { return } if r.StatusCode != http.StatusOK { errMsg := &errorMessage{} err = json.Unmarshal(buf, errMsg) if err != nil { err = fmt.Errorf("failed to get cursor") dump, _ := httputil.DumpResponse(r, true) fmt.Println(dump) return } err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message) return } v, ok := r.Header["X-Sls-Compresstype"] if !ok || len(v) == 0 { err = fmt.Errorf("can't find 'x-sls-compresstype' header") return } if v[0] != "lz4" { err = fmt.Errorf("unexpected compress type:%v", v[0]) return } v, ok = r.Header["X-Sls-Cursor"] if !ok || len(v) == 0 { err = fmt.Errorf("can't find 'x-sls-cursor' header") return } nextCursor = v[0] v, ok = r.Header["X-Sls-Bodyrawsize"] if !ok || len(v) == 0 { err = fmt.Errorf("can't find 'x-sls-bodyrawsize' header") return } bodyRawSize, err := strconv.Atoi(v[0]) if err != nil { return } out = make([]byte, bodyRawSize) err = lz4.Uncompress(buf, out) if err != nil { return } return } // LogsBytesDecode decodes logs binary data retruned by GetLogsBytes API func LogsBytesDecode(data []byte) (gl *LogGroupList, err error) { gl = &LogGroupList{} err = proto.Unmarshal(data, gl) if err != nil { return } return } // GetLogs gets logs from shard specified by shardID according cursor. // The logGroupMaxCount is the max number of logGroup could be returned. // The nextCursor is the next curosr can be used to read logs at next time. func (s *LogStore) GetLogs(shardID int, cursor string, logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) { out, nextCursor, err := s.GetLogsBytes(shardID, cursor, logGroupMaxCount) if err != nil { return } gl, err = LogsBytesDecode(out) if err != nil { return } return }