
only add golang.org vendor

This commit is contained in:
astaxie
2018-07-31 19:25:43 +08:00
parent 6d69047fff
commit d7b8aa8b52
474 changed files with 0 additions and 323280 deletions

View File

@@ -1,363 +0,0 @@
package rpl
import (
"fmt"
"io"
"os"
"github.com/edsrzf/mmap-go"
"github.com/siddontang/go/log"
)
//a file interface like the ones in leveldb or rocksdb
type writeFile interface {
Sync() error
Write(b []byte) (n int, err error)
Close() error
ReadAt(buf []byte, offset int64) (int, error)
Truncate(size int64) error
SetOffset(o int64)
Name() string
Size() int
Offset() int64
}
type readFile interface {
ReadAt(buf []byte, offset int64) (int, error)
Close() error
Size() int
Name() string
}
type rawWriteFile struct {
writeFile
f *os.File
offset int64
name string
}
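// Note (added comment, not in the original source): size is accepted to
// match the mmap factory's signature; the raw implementation ignores it
// and lets the file grow as it is written.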
func newRawWriteFile(name string, size int64) (writeFile, error) {
m := new(rawWriteFile)
var err error
m.name = name
m.f, err = os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, err
}
return m, nil
}
func (m *rawWriteFile) Close() error {
if err := m.f.Truncate(m.offset); err != nil {
return fmt.Errorf("close truncate %s error %s", m.name, err.Error())
}
if err := m.f.Close(); err != nil {
return fmt.Errorf("close %s error %s", m.name, err.Error())
}
return nil
}
func (m *rawWriteFile) Sync() error {
return m.f.Sync()
}
func (m *rawWriteFile) Write(b []byte) (n int, err error) {
n, err = m.f.WriteAt(b, m.offset)
if err != nil {
return
} else if n != len(b) {
err = io.ErrShortWrite
return
}
m.offset += int64(n)
return
}
func (m *rawWriteFile) ReadAt(buf []byte, offset int64) (int, error) {
return m.f.ReadAt(buf, offset)
}
func (m *rawWriteFile) Truncate(size int64) error {
var err error
if err = m.f.Truncate(size); err != nil {
return err
}
if m.offset > size {
m.offset = size
}
return nil
}
func (m *rawWriteFile) SetOffset(o int64) {
m.offset = o
}
func (m *rawWriteFile) Offset() int64 {
return m.offset
}
func (m *rawWriteFile) Name() string {
return m.name
}
func (m *rawWriteFile) Size() int {
st, _ := m.f.Stat()
return int(st.Size())
}
type rawReadFile struct {
readFile
f *os.File
name string
}
func newRawReadFile(name string) (readFile, error) {
m := new(rawReadFile)
var err error
m.f, err = os.Open(name)
m.name = name
if err != nil {
return nil, err
}
return m, err
}
func (m *rawReadFile) Close() error {
return m.f.Close()
}
func (m *rawReadFile) Size() int {
st, _ := m.f.Stat()
return int(st.Size())
}
func (m *rawReadFile) ReadAt(b []byte, offset int64) (int, error) {
return m.f.ReadAt(b, offset)
}
func (m *rawReadFile) Name() string {
return m.name
}
/////////////////////////////////////////////////
type mmapWriteFile struct {
writeFile
f *os.File
m mmap.MMap
name string
size int64
offset int64
}
func newMmapWriteFile(name string, size int64) (writeFile, error) {
m := new(mmapWriteFile)
m.name = name
var err error
m.f, err = os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, err
}
if size == 0 {
st, _ := m.f.Stat()
size = st.Size()
}
if err = m.f.Truncate(size); err != nil {
return nil, err
}
if m.m, err = mmap.Map(m.f, mmap.RDWR, 0); err != nil {
return nil, err
}
m.size = size
m.offset = 0
return m, nil
}
func (m *mmapWriteFile) Size() int {
return int(m.size)
}
func (m *mmapWriteFile) Sync() error {
return m.m.Flush()
}
func (m *mmapWriteFile) Close() error {
if err := m.m.Unmap(); err != nil {
return fmt.Errorf("unmap %s error %s", m.name, err.Error())
}
if err := m.f.Truncate(m.offset); err != nil {
return fmt.Errorf("close truncate %s error %s", m.name, err.Error())
}
if err := m.f.Close(); err != nil {
return fmt.Errorf("close %s error %s", m.name, err.Error())
}
return nil
}
func (m *mmapWriteFile) Write(b []byte) (n int, err error) {
extra := int64(len(b)) - (m.size - m.offset)
if extra > 0 {
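// grow by the shortfall plus ~10% of the current size to reduce
// how often the file must be remapped (added comment, not in the original source)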
newSize := m.size + extra + m.size/10
if err = m.Truncate(newSize); err != nil {
return
}
m.size = newSize
}
n = copy(m.m[m.offset:], b)
if n != len(b) {
return 0, io.ErrShortWrite
}
m.offset += int64(len(b))
return len(b), nil
}
func (m *mmapWriteFile) ReadAt(buf []byte, offset int64) (int, error) {
if offset > m.offset {
return 0, fmt.Errorf("invalid offset %d", offset)
}
n := copy(buf, m.m[offset:m.offset])
if n != len(buf) {
return n, io.ErrUnexpectedEOF
}
return n, nil
}
func (m *mmapWriteFile) Truncate(size int64) error {
var err error
if err = m.m.Unmap(); err != nil {
return err
}
if err = m.f.Truncate(size); err != nil {
return err
}
if m.m, err = mmap.Map(m.f, mmap.RDWR, 0); err != nil {
return err
}
m.size = size
if m.offset > m.size {
m.offset = m.size
}
return nil
}
func (m *mmapWriteFile) SetOffset(o int64) {
m.offset = o
}
func (m *mmapWriteFile) Offset() int64 {
return m.offset
}
func (m *mmapWriteFile) Name() string {
return m.name
}
type mmapReadFile struct {
readFile
f *os.File
m mmap.MMap
name string
}
func newMmapReadFile(name string) (readFile, error) {
m := new(mmapReadFile)
m.name = name
var err error
m.f, err = os.Open(name)
if err != nil {
return nil, err
}
m.m, err = mmap.Map(m.f, mmap.RDONLY, 0)
return m, err
}
func (m *mmapReadFile) ReadAt(buf []byte, offset int64) (int, error) {
if int64(offset) > int64(len(m.m)) {
return 0, fmt.Errorf("invalid offset %d", offset)
}
n := copy(buf, m.m[offset:])
if n != len(buf) {
return n, io.ErrUnexpectedEOF
}
return n, nil
}
func (m *mmapReadFile) Close() error {
if m.m != nil {
if err := m.m.Unmap(); err != nil {
log.Errorf("unmap %s error %s", m.name, err.Error())
}
m.m = nil
}
if m.f != nil {
if err := m.f.Close(); err != nil {
log.Errorf("close %s error %s", m.name, err.Error())
}
m.f = nil
}
return nil
}
func (m *mmapReadFile) Size() int {
return len(m.m)
}
func (m *mmapReadFile) Name() string {
return m.name
}
/////////////////////////////////////
func newWriteFile(useMmap bool, name string, size int64) (writeFile, error) {
if useMmap {
return newMmapWriteFile(name, size)
} else {
return newRawWriteFile(name, size)
}
}
func newReadFile(useMmap bool, name string) (readFile, error) {
if useMmap {
return newMmapReadFile(name)
} else {
return newRawReadFile(name)
}
}
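A hedged usage sketch, not part of the original file, showing how these factory functions pair up. It assumes package-internal access to newWriteFile/newReadFile and a writable working directory; error handling is abbreviated.

func exampleFileIO() error {
	// mmap-backed writer; pass false to fall back to a plain file
	w, err := newWriteFile(true, "00000001.data", 1<<20)
	if err != nil {
		return err
	}
	if _, err = w.Write([]byte("hello")); err != nil {
		return err
	}
	if err = w.Sync(); err != nil { // flush to disk
		return err
	}
	if err = w.Close(); err != nil { // Close truncates to the final offset
		return err
	}
	r, err := newReadFile(true, "00000001.data")
	if err != nil {
		return err
	}
	defer r.Close()
	buf := make([]byte, 5)
	_, err = r.ReadAt(buf, 0) // reads back "hello"
	return err
}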

View File

@@ -1,416 +0,0 @@
package rpl
import (
"fmt"
"io/ioutil"
"os"
"sort"
"sync"
"time"
"github.com/siddontang/go/log"
"github.com/siddontang/go/num"
"github.com/siddontang/ledisdb/config"
)
const (
defaultMaxLogFileSize = int64(256 * 1024 * 1024)
maxLogFileSize = int64(1024 * 1024 * 1024)
defaultLogNumInFile = int64(1024 * 1024)
)
/*
File Store:
00000001.data
00000001.meta
00000002.data
00000002.meta
data: log1 data | log2 data | magic data
if the data file does not end with the magic data, replication was not closed gracefully,
so we must repair the log data
log data: id (bigendian uint64), create time (bigendian uint32), compression (byte), data len(bigendian uint32), data
split data = log0 data + [padding 0] -> file % pagesize() == 0
meta: log1 offset | log2 offset
log offset: bigendian uint32 | bigendian uint32
//sha1 of github.com/siddontang/ledisdb 20 bytes
magic data = "\x1c\x1d\xb8\x88\xff\x9e\x45\x55\x40\xf0\x4c\xda\xe0\xce\x47\xde\x65\x48\x71\x17"
we must guarantee that log IDs are strictly monotonically increasing:
if log1's id is 1, log2's must be 2
*/
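// The function below is a sketch added for illustration, not part of the
// original file: it shows how one record in the layout described above maps
// to bytes (a 17-byte big-endian head, then the payload). It assumes
// "encoding/binary" is imported.
func encodeRecordSketch(id uint64, createTime uint32, compression byte, data []byte) []byte {
	buf := make([]byte, 8+4+1+4+len(data))
	binary.BigEndian.PutUint64(buf[0:8], id)          // log id
	binary.BigEndian.PutUint32(buf[8:12], createTime) // create time
	buf[12] = compression                             // compression flag
	binary.BigEndian.PutUint32(buf[13:17], uint32(len(data)))
	copy(buf[17:], data)
	return buf
}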
type FileStore struct {
LogStore
cfg *config.Config
base string
rm sync.RWMutex
wm sync.Mutex
rs tableReaders
w *tableWriter
quit chan struct{}
}
func NewFileStore(base string, cfg *config.Config) (*FileStore, error) {
s := new(FileStore)
s.quit = make(chan struct{})
var err error
if err = os.MkdirAll(base, 0755); err != nil {
return nil, err
}
s.base = base
if cfg.Replication.MaxLogFileSize == 0 {
cfg.Replication.MaxLogFileSize = defaultMaxLogFileSize
}
cfg.Replication.MaxLogFileSize = num.MinInt64(cfg.Replication.MaxLogFileSize, maxLogFileSize)
s.cfg = cfg
if err = s.load(); err != nil {
return nil, err
}
index := int64(1)
if len(s.rs) != 0 {
index = s.rs[len(s.rs)-1].index + 1
}
s.w = newTableWriter(s.base, index, cfg.Replication.MaxLogFileSize, cfg.Replication.UseMmap)
s.w.SetSyncType(cfg.Replication.SyncLog)
go s.checkTableReaders()
return s, nil
}
func (s *FileStore) GetLog(id uint64, l *Log) error {
//first search in table writer
if err := s.w.GetLog(id, l); err == nil {
return nil
} else if err != ErrLogNotFound {
return err
}
s.rm.RLock()
t := s.rs.Search(id)
if t == nil {
s.rm.RUnlock()
return ErrLogNotFound
}
err := t.GetLog(id, l)
s.rm.RUnlock()
return err
}
func (s *FileStore) FirstID() (uint64, error) {
id := uint64(0)
s.rm.RLock()
if len(s.rs) > 0 {
id = s.rs[0].first
} else {
id = 0
}
s.rm.RUnlock()
if id > 0 {
return id, nil
}
//if id is 0, no table reader holds logs, so fall back to the table writer
return s.w.First(), nil
}
func (s *FileStore) LastID() (uint64, error) {
id := s.w.Last()
if id > 0 {
return id, nil
}
//if the table writer has no last id, we may find it in the last table reader
s.rm.RLock()
if len(s.rs) > 0 {
id = s.rs[len(s.rs)-1].last
}
s.rm.RUnlock()
return id, nil
}
func (s *FileStore) StoreLog(l *Log) error {
s.wm.Lock()
err := s.storeLog(l)
s.wm.Unlock()
return err
}
func (s *FileStore) storeLog(l *Log) error {
err := s.w.StoreLog(l)
if err == nil {
return nil
} else if err != errTableNeedFlush {
return err
}
var r *tableReader
r, err = s.w.Flush()
if err != nil {
log.Fatalf("write table flush error %s, can not store!!!", err.Error())
s.w.Close()
return err
}
s.rm.Lock()
s.rs = append(s.rs, r)
s.rm.Unlock()
err = s.w.StoreLog(l)
return err
}
func (s *FileStore) PurgeExpired(n int64) error {
s.rm.Lock()
var purges []*tableReader
t := uint32(time.Now().Unix() - int64(n))
for i, r := range s.rs {
if r.lastTime > t {
purges = append([]*tableReader{}, s.rs[0:i]...)
n := copy(s.rs, s.rs[i:])
s.rs = s.rs[0:n]
break
}
}
s.rm.Unlock()
s.purgeTableReaders(purges)
return nil
}
func (s *FileStore) Sync() error {
return s.w.Sync()
}
func (s *FileStore) Clear() error {
s.wm.Lock()
s.rm.Lock()
defer func() {
s.rm.Unlock()
s.wm.Unlock()
}()
s.w.Close()
for i := range s.rs {
s.rs[i].Close()
}
s.rs = tableReaders{}
if err := os.RemoveAll(s.base); err != nil {
return err
}
if err := os.MkdirAll(s.base, 0755); err != nil {
return err
}
s.w = newTableWriter(s.base, 1, s.cfg.Replication.MaxLogFileSize, s.cfg.Replication.UseMmap)
return nil
}
func (s *FileStore) Close() error {
close(s.quit)
s.wm.Lock()
s.rm.Lock()
if r, err := s.w.Flush(); err != nil {
if err != errNilHandler {
log.Errorf("close err: %s", err.Error())
}
} else {
r.Close()
s.w.Close()
}
for i := range s.rs {
s.rs[i].Close()
}
s.rs = tableReaders{}
s.rm.Unlock()
s.wm.Unlock()
return nil
}
func (s *FileStore) checkTableReaders() {
t := time.NewTicker(60 * time.Second)
defer t.Stop()
for {
select {
case <-t.C:
s.rm.Lock()
for _, r := range s.rs {
if !r.Keepalived() {
r.Close()
}
}
purges := []*tableReader{}
maxNum := s.cfg.Replication.MaxLogFileNum
num := len(s.rs)
if num > maxNum {
purges = s.rs[:num-maxNum]
s.rs = s.rs[num-maxNum:]
}
s.rm.Unlock()
s.purgeTableReaders(purges)
case <-s.quit:
return
}
}
}
func (s *FileStore) purgeTableReaders(purges []*tableReader) {
for _, r := range purges {
dataName := fmtTableDataName(r.base, r.index)
metaName := fmtTableMetaName(r.base, r.index)
r.Close()
if err := os.Remove(dataName); err != nil {
log.Errorf("purge table data %s err: %s", dataName, err.Error())
}
if err := os.Remove(metaName); err != nil {
log.Errorf("purge table meta %s err: %s", metaName, err.Error())
}
}
}
func (s *FileStore) load() error {
fs, err := ioutil.ReadDir(s.base)
if err != nil {
return err
}
s.rs = make(tableReaders, 0, len(fs))
var r *tableReader
var index int64
for _, f := range fs {
if _, err := fmt.Sscanf(f.Name(), "%08d.data", &index); err == nil {
if r, err = newTableReader(s.base, index, s.cfg.Replication.UseMmap); err != nil {
log.Errorf("load table %s err: %s", f.Name(), err.Error())
} else {
s.rs = append(s.rs, r)
}
}
}
if err := s.rs.check(); err != nil {
return err
}
return nil
}
type tableReaders []*tableReader
func (ts tableReaders) Len() int {
return len(ts)
}
func (ts tableReaders) Swap(i, j int) {
ts[i], ts[j] = ts[j], ts[i]
}
func (ts tableReaders) Less(i, j int) bool {
return ts[i].first < ts[j].first
}
func (ts tableReaders) Search(id uint64) *tableReader {
i, j := 0, len(ts)-1
for i <= j {
h := i + (j-i)/2
if ts[h].first <= id && id <= ts[h].last {
return ts[h]
} else if ts[h].last < id {
i = h + 1
} else {
j = h - 1
}
}
return nil
}
func (ts tableReaders) check() error {
if len(ts) == 0 {
return nil
}
sort.Sort(ts)
first := ts[0].first
last := ts[0].last
index := ts[0].index
if first == 0 || first > last {
return fmt.Errorf("invalid log in table %s", ts[0])
}
for i := 1; i < len(ts); i++ {
if ts[i].first <= last {
return fmt.Errorf("invalid first log id %d in table %s", ts[i].first, ts[i])
}
if ts[i].index <= index {
return fmt.Errorf("invalid index %d in table %s", ts[i].index, ts[i])
}
first = ts[i].first
last = ts[i].last
index = ts[i].index
}
return nil
}
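A hedged usage sketch of FileStore, not part of the original file: config.NewConfigDefault comes from the ledisdb config package already imported above, the path is illustrative, and error handling is abbreviated.

func exampleFileStore() error {
	cfg := config.NewConfigDefault()
	s, err := NewFileStore("/tmp/rpl-example", cfg)
	if err != nil {
		return err
	}
	defer s.Close()
	// IDs must start above 0 and then increase by exactly 1
	l := &Log{ID: 1, CreateTime: uint32(time.Now().Unix()), Data: []byte("hello")}
	if err = s.StoreLog(l); err != nil {
		return err
	}
	var got Log
	return s.GetLog(1, &got)
}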

View File

@@ -1,571 +0,0 @@
package rpl
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"path"
"sync"
"time"
"github.com/siddontang/go/log"
"github.com/siddontang/go/sync2"
)
var (
magic = []byte("\x1c\x1d\xb8\x88\xff\x9e\x45\x55\x40\xf0\x4c\xda\xe0\xce\x47\xde\x65\x48\x71\x17")
errTableNeedFlush = errors.New("write table need flush")
errNilHandler = errors.New("nil write handler")
)
const tableReaderKeepaliveInterval int64 = 30
func fmtTableDataName(base string, index int64) string {
return path.Join(base, fmt.Sprintf("%08d.data", index))
}
func fmtTableMetaName(base string, index int64) string {
return path.Join(base, fmt.Sprintf("%08d.meta", index))
}
type tableReader struct {
sync.Mutex
base string
index int64
data readFile
meta readFile
first uint64
last uint64
lastTime uint32
lastReadTime sync2.AtomicInt64
useMmap bool
}
func newTableReader(base string, index int64, useMmap bool) (*tableReader, error) {
if index <= 0 {
return nil, fmt.Errorf("invalid index %d", index)
}
t := new(tableReader)
t.base = base
t.index = index
t.useMmap = useMmap
var err error
if err = t.check(); err != nil {
log.Errorf("check %d error: %s, try to repair", t.index, err.Error())
if err = t.repair(); err != nil {
log.Errorf("repair %d error: %s", t.index, err.Error())
return nil, err
}
}
t.close()
return t, nil
}
func (t *tableReader) String() string {
return fmt.Sprintf("%d", t.index)
}
func (t *tableReader) Close() {
t.Lock()
t.close()
t.Unlock()
}
func (t *tableReader) close() {
if t.data != nil {
t.data.Close()
t.data = nil
}
if t.meta != nil {
t.meta.Close()
t.meta = nil
}
}
func (t *tableReader) Keepalived() bool {
l := t.lastReadTime.Get()
if l > 0 && time.Now().Unix()-l > tableReaderKeepaliveInterval {
return false
}
return true
}
func (t *tableReader) getLogPos(index int) (uint32, error) {
var buf [4]byte
if _, err := t.meta.ReadAt(buf[0:4], int64(index)*4); err != nil {
return 0, err
}
return binary.BigEndian.Uint32(buf[0:4]), nil
}
func (t *tableReader) checkData() error {
var err error
//check will use raw file mode
if t.data, err = newReadFile(false, fmtTableDataName(t.base, t.index)); err != nil {
return err
}
if t.data.Size() < len(magic) {
return fmt.Errorf("data file %s size %d too short", t.data.Name(), t.data.Size())
}
buf := make([]byte, len(magic))
if _, err := t.data.ReadAt(buf, int64(t.data.Size()-len(magic))); err != nil {
return err
}
if !bytes.Equal(magic, buf) {
return fmt.Errorf("data file %s invalid magic data %q", t.data.Name(), buf)
}
return nil
}
func (t *tableReader) checkMeta() error {
var err error
//check will use raw file mode
if t.meta, err = newReadFile(false, fmtTableMetaName(t.base, t.index)); err != nil {
return err
}
if t.meta.Size()%4 != 0 || t.meta.Size() == 0 {
return fmt.Errorf("meta file %s invalid offset len %d, must 4 multiple and not 0", t.meta.Name(), t.meta.Size())
}
return nil
}
func (t *tableReader) check() error {
var err error
if err := t.checkMeta(); err != nil {
return err
}
if err := t.checkData(); err != nil {
return err
}
firstLogPos, _ := t.getLogPos(0)
lastLogPos, _ := t.getLogPos(t.meta.Size()/4 - 1)
if firstLogPos != 0 {
return fmt.Errorf("invalid first log pos %d, must 0", firstLogPos)
}
var l Log
if _, err = t.decodeLogHead(&l, t.data, int64(firstLogPos)); err != nil {
return fmt.Errorf("decode first log err %s", err.Error())
}
t.first = l.ID
var n int64
if n, err = t.decodeLogHead(&l, t.data, int64(lastLogPos)); err != nil {
return fmt.Errorf("decode last log err %s", err.Error())
} else if n+int64(len(magic)) != int64(t.data.Size()) {
return fmt.Errorf("extra log data at offset %d", n)
}
t.last = l.ID
t.lastTime = l.CreateTime
if t.first > t.last {
return fmt.Errorf("invalid log table first %d > last %d", t.first, t.last)
} else if (t.last - t.first + 1) != uint64(t.meta.Size()/4) {
return fmt.Errorf("invalid log table, first %d, last %d, and log num %d", t.first, t.last, t.meta.Size()/4)
}
return nil
}
func (t *tableReader) repair() error {
t.close()
var err error
var data writeFile
var meta writeFile
//repair will use raw file mode
data, err = newWriteFile(false, fmtTableDataName(t.base, t.index), 0)
if err != nil {
return err
}
data.SetOffset(int64(data.Size()))
meta, err = newWriteFile(false, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4))
if err != nil {
data.Close()
return err
}
var l Log
var pos int64 = 0
var nextPos int64 = 0
b := make([]byte, 4)
t.first = 0
t.last = 0
for {
nextPos, err = t.decodeLogHead(&l, data, pos)
if err != nil {
//on error, we may have lost all logs from pos
log.Errorf("%s may have lost logs from %d", data.Name(), pos)
break
}
if l.ID == 0 {
log.Errorf("%s may lost logs from %d, invalid log 0", data.Name(), pos)
break
}
if t.first == 0 {
t.first = l.ID
}
if t.last == 0 {
t.last = l.ID
} else if l.ID <= t.last {
log.Errorf("%s may lost logs from %d, invalid logid %d", t.data.Name(), pos, l.ID)
break
}
t.last = l.ID
t.lastTime = l.CreateTime
binary.BigEndian.PutUint32(b, uint32(pos))
meta.Write(b)
pos = nextPos
}
var e error
if err := meta.Close(); err != nil {
e = err
}
data.SetOffset(pos)
if _, err = data.Write(magic); err != nil {
log.Errorf("write magic error %s", err.Error())
}
if err = data.Close(); err != nil {
return err
}
return e
}
func (t *tableReader) decodeLogHead(l *Log, r io.ReaderAt, pos int64) (int64, error) {
dataLen, err := l.DecodeHeadAt(r, pos)
if err != nil {
return 0, err
}
return pos + int64(l.HeadSize()) + int64(dataLen), nil
}
func (t *tableReader) GetLog(id uint64, l *Log) error {
if id < t.first || id > t.last {
return ErrLogNotFound
}
t.lastReadTime.Set(time.Now().Unix())
t.Lock()
if err := t.openTable(); err != nil {
t.close()
t.Unlock()
return err
}
t.Unlock()
pos, err := t.getLogPos(int(id - t.first))
if err != nil {
return err
}
if err := l.DecodeAt(t.data, int64(pos)); err != nil {
return err
} else if l.ID != id {
return fmt.Errorf("invalid log id %d != %d", l.ID, id)
}
return nil
}
func (t *tableReader) openTable() error {
var err error
if t.data == nil {
if t.data, err = newReadFile(t.useMmap, fmtTableDataName(t.base, t.index)); err != nil {
return err
}
}
if t.meta == nil {
if t.meta, err = newReadFile(t.useMmap, fmtTableMetaName(t.base, t.index)); err != nil {
return err
}
}
return nil
}
type tableWriter struct {
sync.RWMutex
data writeFile
meta writeFile
base string
index int64
first uint64
last uint64
lastTime uint32
maxLogSize int64
closed bool
syncType int
posBuf []byte
useMmap bool
}
func newTableWriter(base string, index int64, maxLogSize int64, useMmap bool) *tableWriter {
if index <= 0 {
panic(fmt.Errorf("invalid index %d", index))
}
t := new(tableWriter)
t.base = base
t.index = index
t.maxLogSize = maxLogSize
t.closed = false
t.posBuf = make([]byte, 4)
t.useMmap = useMmap
return t
}
func (t *tableWriter) String() string {
return fmt.Sprintf("%d", t.index)
}
func (t *tableWriter) SetMaxLogSize(s int64) {
t.maxLogSize = s
}
func (t *tableWriter) SetSyncType(tp int) {
t.syncType = tp
}
func (t *tableWriter) close() {
if t.meta != nil {
if err := t.meta.Close(); err != nil {
log.Fatalf("close log meta error %s", err.Error())
}
t.meta = nil
}
if t.data != nil {
if _, err := t.data.Write(magic); err != nil {
log.Fatalf("write magic error %s", err.Error())
}
if err := t.data.Close(); err != nil {
log.Fatalf("close log data error %s", err.Error())
}
t.data = nil
}
}
func (t *tableWriter) Close() {
t.Lock()
t.closed = true
t.close()
t.Unlock()
}
func (t *tableWriter) First() uint64 {
t.Lock()
id := t.first
t.Unlock()
return id
}
func (t *tableWriter) Last() uint64 {
t.Lock()
id := t.last
t.Unlock()
return id
}
func (t *tableWriter) Flush() (*tableReader, error) {
t.Lock()
if t.data == nil || t.meta == nil {
t.Unlock()
return nil, errNilHandler
}
tr := new(tableReader)
tr.base = t.base
tr.index = t.index
tr.first = t.first
tr.last = t.last
tr.lastTime = t.lastTime
tr.useMmap = t.useMmap
t.close()
t.first = 0
t.last = 0
t.index = t.index + 1
t.Unlock()
return tr, nil
}
func (t *tableWriter) StoreLog(l *Log) error {
t.Lock()
err := t.storeLog(l)
t.Unlock()
return err
}
func (t *tableWriter) openFile() error {
var err error
if t.data == nil {
if t.data, err = newWriteFile(t.useMmap, fmtTableDataName(t.base, t.index), t.maxLogSize+t.maxLogSize/10+int64(len(magic))); err != nil {
return err
}
}
if t.meta == nil {
if t.meta, err = newWriteFile(t.useMmap, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4)); err != nil {
return err
}
}
return err
}
func (t *tableWriter) storeLog(l *Log) error {
if l.ID == 0 {
return ErrStoreLogID
}
if t.closed {
return fmt.Errorf("table writer is closed")
}
if t.last > 0 && l.ID != t.last+1 {
return ErrStoreLogID
}
if t.data != nil && t.data.Offset() > t.maxLogSize {
return errTableNeedFlush
}
var err error
if err = t.openFile(); err != nil {
return err
}
offsetPos := t.data.Offset()
if err = l.Encode(t.data); err != nil {
return err
}
binary.BigEndian.PutUint32(t.posBuf, uint32(offsetPos))
if _, err = t.meta.Write(t.posBuf); err != nil {
return err
}
if t.first == 0 {
t.first = l.ID
}
t.last = l.ID
t.lastTime = l.CreateTime
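// syncType 2: fsync the data file after every stored log (added comment, not in the original source)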
if t.syncType == 2 {
if err := t.data.Sync(); err != nil {
log.Errorf("sync table error %s", err.Error())
}
}
return nil
}
func (t *tableWriter) GetLog(id uint64, l *Log) error {
t.RLock()
defer t.RUnlock()
if id < t.first || id > t.last {
return ErrLogNotFound
}
var buf [4]byte
if _, err := t.meta.ReadAt(buf[0:4], int64((id-t.first)*4)); err != nil {
return err
}
offset := binary.BigEndian.Uint32(buf[0:4])
if err := l.DecodeAt(t.data, int64(offset)); err != nil {
return err
} else if l.ID != id {
return fmt.Errorf("invalid log id %d != %d", id, l.ID)
}
return nil
}
func (t *tableWriter) Sync() error {
t.Lock()
var err error
if t.data != nil {
err = t.data.Sync()
t.Unlock()
return err
}
if t.meta != nil {
err = t.meta.Sync()
}
t.Unlock()
return err
}
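A hedged sketch of the writer-to-reader handoff, not part of the original file: Flush seals the current table and hands back a tableReader, while the writer moves on to the next index. It assumes the base directory exists.

func exampleTableFlush() error {
	w := newTableWriter("/tmp/rpl-tables", 1, defaultMaxLogFileSize, false)
	defer w.Close()
	l := &Log{ID: 1, CreateTime: uint32(time.Now().Unix()), Data: []byte("payload")}
	if err := w.StoreLog(l); err != nil {
		return err
	}
	r, err := w.Flush() // seals table 1; the writer now targets index 2
	if err != nil {
		return err
	}
	defer r.Close()
	var got Log
	return r.GetLog(1, &got) // served from the sealed table
}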

View File

@@ -1,225 +0,0 @@
package rpl
import (
"bytes"
"fmt"
"os"
"sync"
"time"
"github.com/siddontang/go/num"
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store"
)
type GoLevelDBStore struct {
LogStore
m sync.Mutex
db *store.DB
cfg *config.Config
first uint64
last uint64
buf bytes.Buffer
}
func (s *GoLevelDBStore) FirstID() (uint64, error) {
s.m.Lock()
id, err := s.firstID()
s.m.Unlock()
return id, err
}
func (s *GoLevelDBStore) LastID() (uint64, error) {
s.m.Lock()
id, err := s.lastID()
s.m.Unlock()
return id, err
}
func (s *GoLevelDBStore) firstID() (uint64, error) {
if s.first != InvalidLogID {
return s.first, nil
}
it := s.db.NewIterator()
defer it.Close()
it.SeekToFirst()
if it.Valid() {
s.first = num.BytesToUint64(it.RawKey())
}
return s.first, nil
}
func (s *GoLevelDBStore) lastID() (uint64, error) {
if s.last != InvalidLogID {
return s.last, nil
}
it := s.db.NewIterator()
defer it.Close()
it.SeekToLast()
if it.Valid() {
s.last = num.BytesToUint64(it.RawKey())
}
return s.last, nil
}
func (s *GoLevelDBStore) GetLog(id uint64, log *Log) error {
v, err := s.db.Get(num.Uint64ToBytes(id))
if err != nil {
return err
} else if v == nil {
return ErrLogNotFound
} else {
return log.Decode(bytes.NewBuffer(v))
}
}
func (s *GoLevelDBStore) StoreLog(log *Log) error {
s.m.Lock()
defer s.m.Unlock()
last, err := s.lastID()
if err != nil {
return err
}
s.last = InvalidLogID
s.buf.Reset()
if log.ID != last+1 {
return ErrStoreLogID
}
last = log.ID
key := num.Uint64ToBytes(log.ID)
if err := log.Encode(&s.buf); err != nil {
return err
}
if err = s.db.Put(key, s.buf.Bytes()); err != nil {
return err
}
s.last = last
return nil
}
func (s *GoLevelDBStore) PurgeExpired(n int64) error {
if n <= 0 {
return fmt.Errorf("invalid expired time %d", n)
}
t := uint32(time.Now().Unix() - int64(n))
s.m.Lock()
defer s.m.Unlock()
s.reset()
it := s.db.NewIterator()
it.SeekToFirst()
w := s.db.NewWriteBatch()
defer w.Rollback()
l := new(Log)
for ; it.Valid(); it.Next() {
v := it.RawValue()
if err := l.Unmarshal(v); err != nil {
return err
} else if l.CreateTime > t {
break
} else {
w.Delete(it.RawKey())
}
}
if err := w.Commit(); err != nil {
return err
}
return nil
}
func (s *GoLevelDBStore) Sync() error {
//no other way for sync, so ignore here
return nil
}
func (s *GoLevelDBStore) reset() {
s.first = InvalidLogID
s.last = InvalidLogID
}
func (s *GoLevelDBStore) Clear() error {
s.m.Lock()
defer s.m.Unlock()
if s.db != nil {
s.db.Close()
}
s.reset()
os.RemoveAll(s.cfg.DBPath)
return s.open()
}
func (s *GoLevelDBStore) Close() error {
s.m.Lock()
defer s.m.Unlock()
if s.db == nil {
return nil
}
err := s.db.Close()
s.db = nil
return err
}
func (s *GoLevelDBStore) open() error {
var err error
s.first = InvalidLogID
s.last = InvalidLogID
s.db, err = store.Open(s.cfg)
return err
}
func NewGoLevelDBStore(base string, syncLog int) (*GoLevelDBStore, error) {
cfg := config.NewConfigDefault()
cfg.DBName = "goleveldb"
cfg.DBPath = base
cfg.LevelDB.BlockSize = 16 * 1024 * 1024
cfg.LevelDB.CacheSize = 64 * 1024 * 1024
cfg.LevelDB.WriteBufferSize = 64 * 1024 * 1024
cfg.LevelDB.Compression = false
cfg.DBSyncCommit = syncLog
s := new(GoLevelDBStore)
s.cfg = cfg
if err := s.open(); err != nil {
return nil, err
}
return s, nil
}
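The same store contract, sketched against the goleveldb backend; this example is not part of the original file and abbreviates error handling.

func exampleGoLevelDBStore() error {
	s, err := NewGoLevelDBStore("/tmp/rpl-wal", 0)
	if err != nil {
		return err
	}
	defer s.Close()
	l := &Log{ID: 1, CreateTime: uint32(time.Now().Unix()), Data: []byte("x")}
	if err = s.StoreLog(l); err != nil { // ID must be lastID+1
		return err
	}
	var got Log
	return s.GetLog(1, &got)
}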

View File

@@ -1,167 +0,0 @@
package rpl
import (
"bytes"
"encoding/binary"
"io"
"sync"
)
const LogHeadSize = 17
type Log struct {
ID uint64
CreateTime uint32
Compression uint8
Data []byte
}
func (l *Log) HeadSize() int {
return LogHeadSize
}
func (l *Log) Size() int {
return l.HeadSize() + len(l.Data)
}
func (l *Log) Marshal() ([]byte, error) {
buf := bytes.NewBuffer(make([]byte, l.Size()))
buf.Reset()
if err := l.Encode(buf); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (l *Log) Unmarshal(b []byte) error {
buf := bytes.NewBuffer(b)
return l.Decode(buf)
}
var headPool = sync.Pool{
New: func() interface{} { return make([]byte, LogHeadSize) },
}
func (l *Log) Encode(w io.Writer) error {
b := headPool.Get().([]byte)
pos := 0
binary.BigEndian.PutUint64(b[pos:], l.ID)
pos += 8
binary.BigEndian.PutUint32(b[pos:], uint32(l.CreateTime))
pos += 4
b[pos] = l.Compression
pos++
binary.BigEndian.PutUint32(b[pos:], uint32(len(l.Data)))
n, err := w.Write(b)
headPool.Put(b)
if err != nil {
return err
} else if n != LogHeadSize {
return io.ErrShortWrite
}
if n, err = w.Write(l.Data); err != nil {
return err
} else if n != len(l.Data) {
return io.ErrShortWrite
}
return nil
}
func (l *Log) Decode(r io.Reader) error {
length, err := l.DecodeHead(r)
if err != nil {
return err
}
l.growData(int(length))
if _, err := io.ReadFull(r, l.Data); err != nil {
return err
}
return nil
}
func (l *Log) DecodeHead(r io.Reader) (uint32, error) {
buf := headPool.Get().([]byte)
if _, err := io.ReadFull(r, buf); err != nil {
headPool.Put(buf)
return 0, err
}
length := l.decodeHeadBuf(buf)
headPool.Put(buf)
return length, nil
}
func (l *Log) DecodeAt(r io.ReaderAt, pos int64) error {
length, err := l.DecodeHeadAt(r, pos)
if err != nil {
return err
}
l.growData(int(length))
var n int
n, err = r.ReadAt(l.Data, pos+int64(LogHeadSize))
if err == io.EOF && n == len(l.Data) {
err = nil
}
return err
}
func (l *Log) growData(length int) {
l.Data = l.Data[0:0]
if cap(l.Data) >= length {
l.Data = l.Data[0:length]
} else {
l.Data = make([]byte, length)
}
}
func (l *Log) DecodeHeadAt(r io.ReaderAt, pos int64) (uint32, error) {
buf := headPool.Get().([]byte)
n, err := r.ReadAt(buf, pos)
if err != nil && err != io.EOF {
headPool.Put(buf)
return 0, err
}
length := l.decodeHeadBuf(buf)
headPool.Put(buf)
if err == io.EOF && (length != 0 || n != len(buf)) {
return 0, err
}
return length, nil
}
func (l *Log) decodeHeadBuf(buf []byte) uint32 {
pos := 0
l.ID = binary.BigEndian.Uint64(buf[pos:])
pos += 8
l.CreateTime = binary.BigEndian.Uint32(buf[pos:])
pos += 4
l.Compression = uint8(buf[pos])
pos++
length := binary.BigEndian.Uint32(buf[pos:])
return length
}
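A round-trip sketch, not part of the original file: Marshal emits the 17-byte head plus the payload, and Unmarshal reverses it. It assumes "fmt" and "time" are imported alongside the file's own imports.

func exampleLogRoundTrip() error {
	l := Log{ID: 42, CreateTime: uint32(time.Now().Unix()), Data: []byte("hello")}
	b, err := l.Marshal() // 17-byte head + 5-byte payload = 22 bytes
	if err != nil {
		return err
	}
	var got Log
	if err = got.Unmarshal(b); err != nil {
		return err
	}
	if got.ID != l.ID || !bytes.Equal(got.Data, l.Data) {
		return fmt.Errorf("round trip mismatch")
	}
	return nil
}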

View File

@@ -1,336 +0,0 @@
package rpl
import (
"encoding/binary"
"os"
"path"
"sync"
"time"
"github.com/siddontang/go/log"
"github.com/siddontang/go/snappy"
"github.com/siddontang/ledisdb/config"
)
type Stat struct {
FirstID uint64
LastID uint64
CommitID uint64
}
type Replication struct {
m sync.Mutex
cfg *config.Config
s LogStore
commitID uint64
commitLog *os.File
quit chan struct{}
wg sync.WaitGroup
nc chan struct{}
ncm sync.Mutex
}
func NewReplication(cfg *config.Config) (*Replication, error) {
if len(cfg.Replication.Path) == 0 {
cfg.Replication.Path = path.Join(cfg.DataDir, "rpl")
}
base := cfg.Replication.Path
r := new(Replication)
r.quit = make(chan struct{})
r.nc = make(chan struct{})
r.cfg = cfg
var err error
switch cfg.Replication.StoreName {
case "goleveldb":
if r.s, err = NewGoLevelDBStore(path.Join(base, "wal"), cfg.Replication.SyncLog); err != nil {
return nil, err
}
default:
if r.s, err = NewFileStore(path.Join(base, "ldb"), cfg); err != nil {
return nil, err
}
}
if r.commitLog, err = os.OpenFile(path.Join(base, "commit.log"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
return nil, err
}
if s, _ := r.commitLog.Stat(); s.Size() == 0 {
r.commitID = 0
} else if err = binary.Read(r.commitLog, binary.BigEndian, &r.commitID); err != nil {
return nil, err
}
log.Infof("staring replication with commit ID %d", r.commitID)
r.wg.Add(1)
go r.run()
return r, nil
}
func (r *Replication) Close() error {
close(r.quit)
r.wg.Wait()
r.m.Lock()
defer r.m.Unlock()
log.Infof("closing replication with commit ID %d", r.commitID)
if r.s != nil {
r.s.Close()
r.s = nil
}
if err := r.updateCommitID(r.commitID, true); err != nil {
log.Errorf("update commit id err %s", err.Error())
}
if r.commitLog != nil {
r.commitLog.Close()
r.commitLog = nil
}
return nil
}
func (r *Replication) Log(data []byte) (*Log, error) {
if r.cfg.Replication.Compression {
//todo optimize
var err error
if data, err = snappy.Encode(nil, data); err != nil {
return nil, err
}
}
r.m.Lock()
lastID, err := r.s.LastID()
if err != nil {
r.m.Unlock()
return nil, err
}
commitId := r.commitID
if lastID < commitId {
lastID = commitId
} else if lastID > commitId {
r.m.Unlock()
return nil, ErrCommitIDBehind
}
l := new(Log)
l.ID = lastID + 1
l.CreateTime = uint32(time.Now().Unix())
if r.cfg.Replication.Compression {
l.Compression = 1
} else {
l.Compression = 0
}
l.Data = data
if err = r.s.StoreLog(l); err != nil {
r.m.Unlock()
return nil, err
}
r.m.Unlock()
r.ncm.Lock()
close(r.nc)
r.nc = make(chan struct{})
r.ncm.Unlock()
return l, nil
}
func (r *Replication) WaitLog() <-chan struct{} {
r.ncm.Lock()
ch := r.nc
r.ncm.Unlock()
return ch
}
func (r *Replication) StoreLog(log *Log) error {
r.m.Lock()
err := r.s.StoreLog(log)
r.m.Unlock()
return err
}
func (r *Replication) FirstLogID() (uint64, error) {
r.m.Lock()
id, err := r.s.FirstID()
r.m.Unlock()
return id, err
}
func (r *Replication) LastLogID() (uint64, error) {
r.m.Lock()
id, err := r.s.LastID()
r.m.Unlock()
return id, err
}
func (r *Replication) LastCommitID() (uint64, error) {
r.m.Lock()
id := r.commitID
r.m.Unlock()
return id, nil
}
func (r *Replication) UpdateCommitID(id uint64) error {
r.m.Lock()
err := r.updateCommitID(id, r.cfg.Replication.SyncLog == 2)
r.m.Unlock()
return err
}
func (r *Replication) Stat() (*Stat, error) {
r.m.Lock()
defer r.m.Unlock()
s := &Stat{}
var err error
if s.FirstID, err = r.s.FirstID(); err != nil {
return nil, err
}
if s.LastID, err = r.s.LastID(); err != nil {
return nil, err
}
s.CommitID = r.commitID
return s, nil
}
func (r *Replication) updateCommitID(id uint64, force bool) error {
if force {
if _, err := r.commitLog.Seek(0, os.SEEK_SET); err != nil {
return err
}
if err := binary.Write(r.commitLog, binary.BigEndian, id); err != nil {
return err
}
}
r.commitID = id
return nil
}
func (r *Replication) CommitIDBehind() (bool, error) {
r.m.Lock()
id, err := r.s.LastID()
if err != nil {
r.m.Unlock()
return false, err
}
behind := id > r.commitID
r.m.Unlock()
return behind, nil
}
func (r *Replication) GetLog(id uint64, log *Log) error {
return r.s.GetLog(id, log)
}
func (r *Replication) NextNeedCommitLog(log *Log) error {
r.m.Lock()
defer r.m.Unlock()
id, err := r.s.LastID()
if err != nil {
return err
}
if id <= r.commitID {
return ErrNoBehindLog
}
return r.s.GetLog(r.commitID+1, log)
}
func (r *Replication) Clear() error {
return r.ClearWithCommitID(0)
}
func (r *Replication) ClearWithCommitID(id uint64) error {
r.m.Lock()
defer r.m.Unlock()
if err := r.s.Clear(); err != nil {
return err
}
return r.updateCommitID(id, true)
}
func (r *Replication) run() {
defer r.wg.Done()
syncTc := time.NewTicker(1 * time.Second)
purgeTc := time.NewTicker(1 * time.Hour)
for {
select {
case <-purgeTc.C:
n := (r.cfg.Replication.ExpiredLogDays * 24 * 3600)
r.m.Lock()
err := r.s.PurgeExpired(int64(n))
r.m.Unlock()
if err != nil {
log.Errorf("purge expired log error %s", err.Error())
}
case <-syncTc.C:
if r.cfg.Replication.SyncLog == 1 {
r.m.Lock()
err := r.s.Sync()
r.m.Unlock()
if err != nil {
log.Errorf("sync store error %s", err.Error())
}
}
if r.cfg.Replication.SyncLog != 2 {
//we will sync commit id every 1 second
r.m.Lock()
err := r.updateCommitID(r.commitID, true)
r.m.Unlock()
if err != nil {
log.Errorf("sync commitid error %s", err.Error())
}
}
case <-r.quit:
syncTc.Stop()
purgeTc.Stop()
return
}
}
}
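A hedged end-to-end sketch, not part of the original file: append a log through Replication, then acknowledge it by advancing the commit ID.

func exampleReplication(cfg *config.Config) error {
	r, err := NewReplication(cfg)
	if err != nil {
		return err
	}
	defer r.Close()
	l, err := r.Log([]byte("write-op")) // assigns ID = last+1 and wakes WaitLog waiters
	if err != nil {
		return err
	}
	// after shipping l to slaves, mark it committed
	return r.UpdateCommitID(l.ID)
}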

View File

@@ -1,36 +0,0 @@
package rpl
import (
"errors"
)
const (
InvalidLogID uint64 = 0
)
var (
ErrLogNotFound = errors.New("log not found")
ErrStoreLogID = errors.New("log id is less")
ErrNoBehindLog = errors.New("no behind commit log")
ErrCommitIDBehind = errors.New("commit id is behind last log id")
)
type LogStore interface {
GetLog(id uint64, log *Log) error
FirstID() (uint64, error)
LastID() (uint64, error)
// if log id is less than current last id, return error
StoreLog(log *Log) error
// Delete logs before n seconds
PurgeExpired(n int64) error
Sync() error
// Clear all logs
Clear() error
Close() error
}
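Finally, a replay sketch, not part of the original file, that walks every stored log through the LogStore interface defined above:

func replayAll(s LogStore, apply func(*Log) error) error {
	first, err := s.FirstID()
	if err != nil {
		return err
	}
	last, err := s.LastID()
	if err != nil {
		return err
	}
	if first == InvalidLogID { // empty store
		return nil
	}
	var l Log
	for id := first; id <= last; id++ {
		if err = s.GetLog(id, &l); err != nil {
			return err
		}
		if err = apply(&l); err != nil {
			return err
		}
	}
	return nil
}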