From c5d43e87fe89c7beebf202e646be436996b6696d Mon Sep 17 00:00:00 2001 From: Anker Jam Date: Sun, 4 Oct 2020 22:16:19 +0800 Subject: [PATCH 1/6] seperate orm alone --- pkg/client/orm/filter/prometheus/filter.go | 22 ++++++++----------- .../orm/filter/prometheus/filter_test.go | 13 ++++++----- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/pkg/client/orm/filter/prometheus/filter.go b/pkg/client/orm/filter/prometheus/filter.go index 175b26be..2d819ef7 100644 --- a/pkg/client/orm/filter/prometheus/filter.go +++ b/pkg/client/orm/filter/prometheus/filter.go @@ -23,7 +23,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/astaxie/beego/pkg/client/orm" - "github.com/astaxie/beego/pkg/server/web" ) // FilterChainBuilder is an extension point, @@ -35,27 +34,24 @@ import ( // actually we only records metrics of invoking "QueryTable" and "QueryTableWithCtx" type FilterChainBuilder struct { summaryVec prometheus.ObserverVec + AppName string + ServerName string + RunMode string } -func NewFilterChainBuilder() *FilterChainBuilder { - summaryVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{ +func (builder *FilterChainBuilder) FilterChain(next orm.Filter) orm.Filter { + + builder.summaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Name: "beego", Subsystem: "orm_operation", ConstLabels: map[string]string{ - "server": web.BConfig.ServerName, - "env": web.BConfig.RunMode, - "appname": web.BConfig.AppName, + "server": builder.ServerName, + "env": builder.RunMode, + "appname": builder.AppName, }, Help: "The statics info for orm operation", }, []string{"method", "name", "duration", "insideTx", "txName"}) - prometheus.MustRegister(summaryVec) - return &FilterChainBuilder{ - summaryVec: summaryVec, - } -} - -func (builder *FilterChainBuilder) FilterChain(next orm.Filter) orm.Filter { return func(ctx context.Context, inv *orm.Invocation) []interface{} { startTime := time.Now() res := next(ctx, inv) diff --git a/pkg/client/orm/filter/prometheus/filter_test.go b/pkg/client/orm/filter/prometheus/filter_test.go index 1b55b989..0368d321 100644 --- a/pkg/client/orm/filter/prometheus/filter_test.go +++ b/pkg/client/orm/filter/prometheus/filter_test.go @@ -24,14 +24,15 @@ import ( "github.com/astaxie/beego/pkg/client/orm" ) -func TestFilterChainBuilder_FilterChain(t *testing.T) { - builder := NewFilterChainBuilder() - assert.NotNil(t, builder.summaryVec) - - filter := builder.FilterChain(func(ctx context.Context, inv *orm.Invocation) []interface{} { +func TestFilterChainBuilder_FilterChain1(t *testing.T) { + next := func(ctx context.Context, inv *orm.Invocation) []interface{} { inv.Method = "coming" return []interface{}{} - }) + } + builder := &FilterChainBuilder{} + filter := builder.FilterChain(next) + + assert.NotNil(t, builder.summaryVec) assert.NotNil(t, filter) inv := &orm.Invocation{} From 4dc694411f3a4537c03aa95f69105e546355b516 Mon Sep 17 00:00:00 2001 From: Anker Jam Date: Mon, 5 Oct 2020 00:16:58 +0800 Subject: [PATCH 2/6] fix deadlock in task module --- pkg/task/task.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/task/task.go b/pkg/task/task.go index e3a8bba4..4835ad24 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -451,6 +451,11 @@ func run() { taskLock.Unlock() continue case <-stop: + taskLock.Lock() + if isstart { + isstart = false + } + taskLock.Unlock() return } } @@ -458,13 +463,7 @@ func run() { // StopTask stop all tasks func StopTask() { - taskLock.Lock() - defer taskLock.Unlock() - if isstart { - isstart = false - stop <- true - } - + stop <- true } // AddTask add task with name From f1cca45d8d2235daad5da3f25e74274887ccb290 Mon Sep 17 00:00:00 2001 From: Anker Jam Date: Mon, 5 Oct 2020 01:31:27 +0800 Subject: [PATCH 3/6] fix deadlock about changed sign --- pkg/task/task.go | 16 ++++++++++++++-- pkg/task/task_test.go | 19 ++++++++++++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pkg/task/task.go b/pkg/task/task.go index 4835ad24..c8228fd2 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -468,21 +468,33 @@ func StopTask() { // AddTask add task with name func AddTask(taskname string, t Tasker) { + isChanged := false taskLock.Lock() - defer taskLock.Unlock() t.SetNext(nil, time.Now().Local()) AdminTaskList[taskname] = t if isstart { + isChanged = true + } + taskLock.Unlock() + + if isChanged { changed <- true } + } // DeleteTask delete task with name func DeleteTask(taskname string) { + isChanged := false + taskLock.Lock() - defer taskLock.Unlock() delete(AdminTaskList, taskname) if isstart { + isChanged = true + } + taskLock.Unlock() + + if isChanged { changed <- true } } diff --git a/pkg/task/task_test.go b/pkg/task/task_test.go index 488729dc..f58e374b 100644 --- a/pkg/task/task_test.go +++ b/pkg/task/task_test.go @@ -36,7 +36,24 @@ func TestParse(t *testing.T) { } AddTask("taska", tk) StartTask() - time.Sleep(6 * time.Second) + time.Sleep(3 * time.Second) + StopTask() +} + +func TestModifyTaskListAfterRunning(t *testing.T) { + tk := NewTask("taska", "0/30 * * * * *", func(ctx context.Context) error { + fmt.Println("hello world") + return nil + }) + err := tk.Run(nil) + if err != nil { + t.Fatal(err) + } + AddTask("taska", tk) + StartTask() + DeleteTask("taska") + AddTask("taska1", tk) + time.Sleep(3 * time.Second) StopTask() } From 70cca5e2981dc1b6a16377a823b6f912e3884f9e Mon Sep 17 00:00:00 2001 From: Anker Jam Date: Mon, 5 Oct 2020 10:13:29 +0800 Subject: [PATCH 4/6] make code testable in task module --- pkg/task/govenor_command.go | 6 +- pkg/task/task.go | 118 ++++++++++++++++++++++++------------ pkg/task/task_test.go | 29 +++++---- 3 files changed, 99 insertions(+), 54 deletions(-) diff --git a/pkg/task/govenor_command.go b/pkg/task/govenor_command.go index fff08374..be351dc1 100644 --- a/pkg/task/govenor_command.go +++ b/pkg/task/govenor_command.go @@ -28,8 +28,8 @@ type listTaskCommand struct { } func (l *listTaskCommand) Execute(params ...interface{}) *governor.Result { - resultList := make([][]string, 0, len(AdminTaskList)) - for tname, tk := range AdminTaskList { + resultList := make([][]string, 0, len(globalTaskManager.adminTaskList)) + for tname, tk := range globalTaskManager.adminTaskList { result := []string{ template.HTMLEscapeString(tname), template.HTMLEscapeString(tk.GetSpec(nil)), @@ -65,7 +65,7 @@ func (r *runTaskCommand) Execute(params ...interface{}) *governor.Result { } } - if t, ok := AdminTaskList[tn]; ok { + if t, ok := globalTaskManager.adminTaskList[tn]; ok { err := t.Run(context.Background()) if err != nil { return &governor.Result{ diff --git a/pkg/task/task.go b/pkg/task/task.go index c8228fd2..5faa2fd8 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -31,13 +31,28 @@ type bounds struct { names map[string]uint } -// The bounds for each field. -var ( - AdminTaskList map[string]Tasker +type taskManager struct { + adminTaskList map[string]Tasker taskLock sync.RWMutex stop chan bool changed chan bool isstart bool +} + +func newTaskManager()*taskManager{ + return &taskManager{ + adminTaskList: make(map[string]Tasker), + taskLock: sync.RWMutex{}, + stop: make(chan bool), + changed: make(chan bool), + isstart: false, + } +} + +// The bounds for each field. +var ( + globalTaskManager *taskManager + seconds = bounds{0, 59, nil} minutes = bounds{0, 59, nil} hours = bounds{0, 23, nil} @@ -398,32 +413,53 @@ func dayMatches(s *Schedule, t time.Time) bool { // StartTask start all tasks func StartTask() { - taskLock.Lock() - defer taskLock.Unlock() - if isstart { + globalTaskManager.StartTask() +} + +// StopTask stop all tasks +func StopTask() { + globalTaskManager.StopTask() +} + +// AddTask add task with name +func AddTask(taskName string, t Tasker) { + globalTaskManager.AddTask(taskName, t) +} + +// DeleteTask delete task with name +func DeleteTask(taskName string) { + globalTaskManager.DeleteTask(taskName) +} + + +// StartTask start all tasks +func (m *taskManager) StartTask() { + m.taskLock.Lock() + defer m.taskLock.Unlock() + if m.isstart { // If already started, no need to start another goroutine. return } - isstart = true + m.isstart = true registerCommands() - go run() + go m.run() } -func run() { +func(m *taskManager) run() { now := time.Now().Local() - for _, t := range AdminTaskList { + for _, t := range m.adminTaskList { t.SetNext(nil, now) } for { // we only use RLock here because NewMapSorter copy the reference, do not change any thing - taskLock.RLock() - sortList := NewMapSorter(AdminTaskList) - taskLock.RUnlock() + m.taskLock.RLock() + sortList := NewMapSorter(m.adminTaskList) + m.taskLock.RUnlock() sortList.Sort() var effective time.Time - if len(AdminTaskList) == 0 || sortList.Vals[0].GetNext(context.Background()).IsZero() { + if len(m.adminTaskList) == 0 || sortList.Vals[0].GetNext(context.Background()).IsZero() { // If there are no entries yet, just sleep - it still handles new entries // and stop requests. effective = now.AddDate(10, 0, 0) @@ -442,60 +478,64 @@ func run() { e.SetNext(nil, effective) } continue - case <-changed: + case <-m.changed: now = time.Now().Local() - taskLock.Lock() - for _, t := range AdminTaskList { + m.taskLock.Lock() + for _, t := range m.adminTaskList { t.SetNext(nil, now) } - taskLock.Unlock() + m.taskLock.Unlock() continue - case <-stop: - taskLock.Lock() - if isstart { - isstart = false + case <-m.stop: + m.taskLock.Lock() + if m.isstart { + m.isstart = false } - taskLock.Unlock() + m.taskLock.Unlock() return } } } // StopTask stop all tasks -func StopTask() { - stop <- true +func(m *taskManager) StopTask() { + go func() { + m.stop <- true + }() } // AddTask add task with name -func AddTask(taskname string, t Tasker) { +func (m *taskManager)AddTask(taskname string, t Tasker) { isChanged := false - taskLock.Lock() + m.taskLock.Lock() t.SetNext(nil, time.Now().Local()) - AdminTaskList[taskname] = t - if isstart { + m.adminTaskList[taskname] = t + if m.isstart { isChanged = true } - taskLock.Unlock() + m.taskLock.Unlock() if isChanged { - changed <- true + go func() { + m.changed <- true + }() } } // DeleteTask delete task with name -func DeleteTask(taskname string) { +func(m *taskManager) DeleteTask(taskname string) { isChanged := false - taskLock.Lock() - delete(AdminTaskList, taskname) - if isstart { + m.taskLock.Lock() + delete(m.adminTaskList, taskname) + if m.isstart { isChanged = true } - taskLock.Unlock() + m.taskLock.Unlock() if isChanged { - changed <- true + m.changed <- true } } @@ -648,7 +688,5 @@ func all(r bounds) uint64 { } func init() { - AdminTaskList = make(map[string]Tasker) - stop = make(chan bool) - changed = make(chan bool) + globalTaskManager = newTaskManager() } diff --git a/pkg/task/task_test.go b/pkg/task/task_test.go index f58e374b..9a74ff24 100644 --- a/pkg/task/task_test.go +++ b/pkg/task/task_test.go @@ -41,7 +41,8 @@ func TestParse(t *testing.T) { } func TestModifyTaskListAfterRunning(t *testing.T) { - tk := NewTask("taska", "0/30 * * * * *", func(ctx context.Context) error { + m := newTaskManager() + tk := NewTask("taskb", "0/30 * * * * *", func(ctx context.Context) error { fmt.Println("hello world") return nil }) @@ -49,26 +50,32 @@ func TestModifyTaskListAfterRunning(t *testing.T) { if err != nil { t.Fatal(err) } - AddTask("taska", tk) - StartTask() - DeleteTask("taska") - AddTask("taska1", tk) + m.AddTask("taskb", tk) + m.StartTask() + go func() { + m.DeleteTask("taskb") + }() + go func() { + m.AddTask("taskb1", tk) + }() + time.Sleep(3 * time.Second) - StopTask() + m.StopTask() } func TestSpec(t *testing.T) { + m := newTaskManager() wg := &sync.WaitGroup{} wg.Add(2) tk1 := NewTask("tk1", "0 12 * * * *", func(ctx context.Context) error { fmt.Println("tk1"); return nil }) tk2 := NewTask("tk2", "0,10,20 * * * * *", func(ctx context.Context) error { fmt.Println("tk2"); wg.Done(); return nil }) tk3 := NewTask("tk3", "0 10 * * * *", func(ctx context.Context) error { fmt.Println("tk3"); wg.Done(); return nil }) - AddTask("tk1", tk1) - AddTask("tk2", tk2) - AddTask("tk3", tk3) - StartTask() - defer StopTask() + m.AddTask("tk1", tk1) + m.AddTask("tk2", tk2) + m.AddTask("tk3", tk3) + m.StartTask() + defer m.StopTask() select { case <-time.After(200 * time.Second): From b838683731bbe683cff94b7c99dc01e6c4b059be Mon Sep 17 00:00:00 2001 From: Anker Jam Date: Mon, 5 Oct 2020 10:33:23 +0800 Subject: [PATCH 5/6] add api for testing --- pkg/adapter/toolbox/task.go | 5 +++++ pkg/adapter/toolbox/task_test.go | 4 ++++ pkg/task/task.go | 36 ++++++++++++++++++++++++-------- pkg/task/task_test.go | 10 ++++++--- 4 files changed, 43 insertions(+), 12 deletions(-) diff --git a/pkg/adapter/toolbox/task.go b/pkg/adapter/toolbox/task.go index 2a6d9aa6..5b2fa14c 100644 --- a/pkg/adapter/toolbox/task.go +++ b/pkg/adapter/toolbox/task.go @@ -212,6 +212,11 @@ func DeleteTask(taskname string) { task.DeleteTask(taskname) } +// ClearTask clear all tasks +func ClearTask() { + task.ClearTask() +} + // MapSorter sort map for tasker type MapSorter task.MapSorter diff --git a/pkg/adapter/toolbox/task_test.go b/pkg/adapter/toolbox/task_test.go index 596bc9c5..994c4976 100644 --- a/pkg/adapter/toolbox/task_test.go +++ b/pkg/adapter/toolbox/task_test.go @@ -22,6 +22,8 @@ import ( ) func TestParse(t *testing.T) { + defer ClearTask() + tk := NewTask("taska", "0/30 * * * * *", func() error { fmt.Println("hello world"); return nil }) err := tk.Run() if err != nil { @@ -34,6 +36,8 @@ func TestParse(t *testing.T) { } func TestSpec(t *testing.T) { + defer ClearTask() + wg := &sync.WaitGroup{} wg.Add(2) tk1 := NewTask("tk1", "0 12 * * * *", func() error { fmt.Println("tk1"); return nil }) diff --git a/pkg/task/task.go b/pkg/task/task.go index 5faa2fd8..a781e47a 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -36,7 +36,7 @@ type taskManager struct { taskLock sync.RWMutex stop chan bool changed chan bool - isstart bool + started bool } func newTaskManager()*taskManager{ @@ -45,7 +45,7 @@ func newTaskManager()*taskManager{ taskLock: sync.RWMutex{}, stop: make(chan bool), changed: make(chan bool), - isstart: false, + started: false, } } @@ -431,16 +431,21 @@ func DeleteTask(taskName string) { globalTaskManager.DeleteTask(taskName) } +// ClearTask clear all tasks +func ClearTask() { + globalTaskManager.ClearTask() +} + // StartTask start all tasks func (m *taskManager) StartTask() { m.taskLock.Lock() defer m.taskLock.Unlock() - if m.isstart { + if m.started { // If already started, no need to start another goroutine. return } - m.isstart = true + m.started = true registerCommands() go m.run() @@ -488,8 +493,8 @@ func(m *taskManager) run() { continue case <-m.stop: m.taskLock.Lock() - if m.isstart { - m.isstart = false + if m.started { + m.started = false } m.taskLock.Unlock() return @@ -510,7 +515,7 @@ func (m *taskManager)AddTask(taskname string, t Tasker) { m.taskLock.Lock() t.SetNext(nil, time.Now().Local()) m.adminTaskList[taskname] = t - if m.isstart { + if m.started { isChanged = true } m.taskLock.Unlock() @@ -529,16 +534,29 @@ func(m *taskManager) DeleteTask(taskname string) { m.taskLock.Lock() delete(m.adminTaskList, taskname) - if m.isstart { + if m.started { isChanged = true } m.taskLock.Unlock() if isChanged { - m.changed <- true + go func() { + m.changed <- true + }() } } +// ClearTask clear all tasks +func(m *taskManager) ClearTask() { + m.taskLock.Lock() + m.adminTaskList = make(map[string]Tasker) + m.taskLock.Unlock() + + go func() { + m.changed <- true + }() +} + // MapSorter sort map for tasker type MapSorter struct { Keys []string diff --git a/pkg/task/task_test.go b/pkg/task/task_test.go index 9a74ff24..2cb807ce 100644 --- a/pkg/task/task_test.go +++ b/pkg/task/task_test.go @@ -26,6 +26,8 @@ import ( ) func TestParse(t *testing.T) { + m := newTaskManager() + defer m.ClearTask() tk := NewTask("taska", "0/30 * * * * *", func(ctx context.Context) error { fmt.Println("hello world") return nil @@ -34,14 +36,15 @@ func TestParse(t *testing.T) { if err != nil { t.Fatal(err) } - AddTask("taska", tk) - StartTask() + m.AddTask("taska", tk) + m.StartTask() time.Sleep(3 * time.Second) - StopTask() + m.StopTask() } func TestModifyTaskListAfterRunning(t *testing.T) { m := newTaskManager() + defer m.ClearTask() tk := NewTask("taskb", "0/30 * * * * *", func(ctx context.Context) error { fmt.Println("hello world") return nil @@ -65,6 +68,7 @@ func TestModifyTaskListAfterRunning(t *testing.T) { func TestSpec(t *testing.T) { m := newTaskManager() + defer m.ClearTask() wg := &sync.WaitGroup{} wg.Add(2) tk1 := NewTask("tk1", "0 12 * * * *", func(ctx context.Context) error { fmt.Println("tk1"); return nil }) From c435d231ab687cf80afb5af3f03f6e71eea49ab2 Mon Sep 17 00:00:00 2001 From: Anker Jam Date: Mon, 5 Oct 2020 10:38:14 +0800 Subject: [PATCH 6/6] complete check --- pkg/task/task.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/task/task.go b/pkg/task/task.go index a781e47a..e76706e3 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -548,13 +548,20 @@ func(m *taskManager) DeleteTask(taskname string) { // ClearTask clear all tasks func(m *taskManager) ClearTask() { + isChanged := false + m.taskLock.Lock() m.adminTaskList = make(map[string]Tasker) + if m.started { + isChanged = true + } m.taskLock.Unlock() - go func() { - m.changed <- true - }() + if isChanged { + go func() { + m.changed <- true + }() + } } // MapSorter sort map for tasker