diff --git a/pkg/adapter/toolbox/task.go b/pkg/adapter/toolbox/task.go index 2a6d9aa6..5b2fa14c 100644 --- a/pkg/adapter/toolbox/task.go +++ b/pkg/adapter/toolbox/task.go @@ -212,6 +212,11 @@ func DeleteTask(taskname string) { task.DeleteTask(taskname) } +// ClearTask clear all tasks +func ClearTask() { + task.ClearTask() +} + // MapSorter sort map for tasker type MapSorter task.MapSorter diff --git a/pkg/adapter/toolbox/task_test.go b/pkg/adapter/toolbox/task_test.go index 596bc9c5..994c4976 100644 --- a/pkg/adapter/toolbox/task_test.go +++ b/pkg/adapter/toolbox/task_test.go @@ -22,6 +22,8 @@ import ( ) func TestParse(t *testing.T) { + defer ClearTask() + tk := NewTask("taska", "0/30 * * * * *", func() error { fmt.Println("hello world"); return nil }) err := tk.Run() if err != nil { @@ -34,6 +36,8 @@ func TestParse(t *testing.T) { } func TestSpec(t *testing.T) { + defer ClearTask() + wg := &sync.WaitGroup{} wg.Add(2) tk1 := NewTask("tk1", "0 12 * * * *", func() error { fmt.Println("tk1"); return nil }) diff --git a/pkg/client/orm/filter/prometheus/filter.go b/pkg/client/orm/filter/prometheus/filter.go index 175b26be..2d819ef7 100644 --- a/pkg/client/orm/filter/prometheus/filter.go +++ b/pkg/client/orm/filter/prometheus/filter.go @@ -23,7 +23,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/astaxie/beego/pkg/client/orm" - "github.com/astaxie/beego/pkg/server/web" ) // FilterChainBuilder is an extension point, @@ -35,27 +34,24 @@ import ( // actually we only records metrics of invoking "QueryTable" and "QueryTableWithCtx" type FilterChainBuilder struct { summaryVec prometheus.ObserverVec + AppName string + ServerName string + RunMode string } -func NewFilterChainBuilder() *FilterChainBuilder { - summaryVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{ +func (builder *FilterChainBuilder) FilterChain(next orm.Filter) orm.Filter { + + builder.summaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Name: "beego", Subsystem: "orm_operation", ConstLabels: map[string]string{ - "server": web.BConfig.ServerName, - "env": web.BConfig.RunMode, - "appname": web.BConfig.AppName, + "server": builder.ServerName, + "env": builder.RunMode, + "appname": builder.AppName, }, Help: "The statics info for orm operation", }, []string{"method", "name", "duration", "insideTx", "txName"}) - prometheus.MustRegister(summaryVec) - return &FilterChainBuilder{ - summaryVec: summaryVec, - } -} - -func (builder *FilterChainBuilder) FilterChain(next orm.Filter) orm.Filter { return func(ctx context.Context, inv *orm.Invocation) []interface{} { startTime := time.Now() res := next(ctx, inv) diff --git a/pkg/client/orm/filter/prometheus/filter_test.go b/pkg/client/orm/filter/prometheus/filter_test.go index 1b55b989..0368d321 100644 --- a/pkg/client/orm/filter/prometheus/filter_test.go +++ b/pkg/client/orm/filter/prometheus/filter_test.go @@ -24,14 +24,15 @@ import ( "github.com/astaxie/beego/pkg/client/orm" ) -func TestFilterChainBuilder_FilterChain(t *testing.T) { - builder := NewFilterChainBuilder() - assert.NotNil(t, builder.summaryVec) - - filter := builder.FilterChain(func(ctx context.Context, inv *orm.Invocation) []interface{} { +func TestFilterChainBuilder_FilterChain1(t *testing.T) { + next := func(ctx context.Context, inv *orm.Invocation) []interface{} { inv.Method = "coming" return []interface{}{} - }) + } + builder := &FilterChainBuilder{} + filter := builder.FilterChain(next) + + assert.NotNil(t, builder.summaryVec) assert.NotNil(t, filter) inv := &orm.Invocation{} diff --git a/pkg/task/govenor_command.go b/pkg/task/govenor_command.go index fff08374..be351dc1 100644 --- a/pkg/task/govenor_command.go +++ b/pkg/task/govenor_command.go @@ -28,8 +28,8 @@ type listTaskCommand struct { } func (l *listTaskCommand) Execute(params ...interface{}) *governor.Result { - resultList := make([][]string, 0, len(AdminTaskList)) - for tname, tk := range AdminTaskList { + resultList := make([][]string, 0, len(globalTaskManager.adminTaskList)) + for tname, tk := range globalTaskManager.adminTaskList { result := []string{ template.HTMLEscapeString(tname), template.HTMLEscapeString(tk.GetSpec(nil)), @@ -65,7 +65,7 @@ func (r *runTaskCommand) Execute(params ...interface{}) *governor.Result { } } - if t, ok := AdminTaskList[tn]; ok { + if t, ok := globalTaskManager.adminTaskList[tn]; ok { err := t.Run(context.Background()) if err != nil { return &governor.Result{ diff --git a/pkg/task/task.go b/pkg/task/task.go index e3a8bba4..e76706e3 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -31,13 +31,28 @@ type bounds struct { names map[string]uint } -// The bounds for each field. -var ( - AdminTaskList map[string]Tasker +type taskManager struct { + adminTaskList map[string]Tasker taskLock sync.RWMutex stop chan bool changed chan bool - isstart bool + started bool +} + +func newTaskManager()*taskManager{ + return &taskManager{ + adminTaskList: make(map[string]Tasker), + taskLock: sync.RWMutex{}, + stop: make(chan bool), + changed: make(chan bool), + started: false, + } +} + +// The bounds for each field. +var ( + globalTaskManager *taskManager + seconds = bounds{0, 59, nil} minutes = bounds{0, 59, nil} hours = bounds{0, 23, nil} @@ -398,32 +413,58 @@ func dayMatches(s *Schedule, t time.Time) bool { // StartTask start all tasks func StartTask() { - taskLock.Lock() - defer taskLock.Unlock() - if isstart { + globalTaskManager.StartTask() +} + +// StopTask stop all tasks +func StopTask() { + globalTaskManager.StopTask() +} + +// AddTask add task with name +func AddTask(taskName string, t Tasker) { + globalTaskManager.AddTask(taskName, t) +} + +// DeleteTask delete task with name +func DeleteTask(taskName string) { + globalTaskManager.DeleteTask(taskName) +} + +// ClearTask clear all tasks +func ClearTask() { + globalTaskManager.ClearTask() +} + + +// StartTask start all tasks +func (m *taskManager) StartTask() { + m.taskLock.Lock() + defer m.taskLock.Unlock() + if m.started { // If already started, no need to start another goroutine. return } - isstart = true + m.started = true registerCommands() - go run() + go m.run() } -func run() { +func(m *taskManager) run() { now := time.Now().Local() - for _, t := range AdminTaskList { + for _, t := range m.adminTaskList { t.SetNext(nil, now) } for { // we only use RLock here because NewMapSorter copy the reference, do not change any thing - taskLock.RLock() - sortList := NewMapSorter(AdminTaskList) - taskLock.RUnlock() + m.taskLock.RLock() + sortList := NewMapSorter(m.adminTaskList) + m.taskLock.RUnlock() sortList.Sort() var effective time.Time - if len(AdminTaskList) == 0 || sortList.Vals[0].GetNext(context.Background()).IsZero() { + if len(m.adminTaskList) == 0 || sortList.Vals[0].GetNext(context.Background()).IsZero() { // If there are no entries yet, just sleep - it still handles new entries // and stop requests. effective = now.AddDate(10, 0, 0) @@ -442,49 +483,84 @@ func run() { e.SetNext(nil, effective) } continue - case <-changed: + case <-m.changed: now = time.Now().Local() - taskLock.Lock() - for _, t := range AdminTaskList { + m.taskLock.Lock() + for _, t := range m.adminTaskList { t.SetNext(nil, now) } - taskLock.Unlock() + m.taskLock.Unlock() continue - case <-stop: + case <-m.stop: + m.taskLock.Lock() + if m.started { + m.started = false + } + m.taskLock.Unlock() return } } } // StopTask stop all tasks -func StopTask() { - taskLock.Lock() - defer taskLock.Unlock() - if isstart { - isstart = false - stop <- true - } - +func(m *taskManager) StopTask() { + go func() { + m.stop <- true + }() } // AddTask add task with name -func AddTask(taskname string, t Tasker) { - taskLock.Lock() - defer taskLock.Unlock() +func (m *taskManager)AddTask(taskname string, t Tasker) { + isChanged := false + m.taskLock.Lock() t.SetNext(nil, time.Now().Local()) - AdminTaskList[taskname] = t - if isstart { - changed <- true + m.adminTaskList[taskname] = t + if m.started { + isChanged = true } + m.taskLock.Unlock() + + if isChanged { + go func() { + m.changed <- true + }() + } + } // DeleteTask delete task with name -func DeleteTask(taskname string) { - taskLock.Lock() - defer taskLock.Unlock() - delete(AdminTaskList, taskname) - if isstart { - changed <- true +func(m *taskManager) DeleteTask(taskname string) { + isChanged := false + + m.taskLock.Lock() + delete(m.adminTaskList, taskname) + if m.started { + isChanged = true + } + m.taskLock.Unlock() + + if isChanged { + go func() { + m.changed <- true + }() + } +} + +// ClearTask clear all tasks +func(m *taskManager) ClearTask() { + isChanged := false + + m.taskLock.Lock() + m.adminTaskList = make(map[string]Tasker) + if m.started { + isChanged = true + } + m.taskLock.Unlock() + + if isChanged { + go func() { + m.changed <- true + }() } } @@ -637,7 +713,5 @@ func all(r bounds) uint64 { } func init() { - AdminTaskList = make(map[string]Tasker) - stop = make(chan bool) - changed = make(chan bool) + globalTaskManager = newTaskManager() } diff --git a/pkg/task/task_test.go b/pkg/task/task_test.go index 488729dc..2cb807ce 100644 --- a/pkg/task/task_test.go +++ b/pkg/task/task_test.go @@ -26,6 +26,8 @@ import ( ) func TestParse(t *testing.T) { + m := newTaskManager() + defer m.ClearTask() tk := NewTask("taska", "0/30 * * * * *", func(ctx context.Context) error { fmt.Println("hello world") return nil @@ -34,24 +36,50 @@ func TestParse(t *testing.T) { if err != nil { t.Fatal(err) } - AddTask("taska", tk) - StartTask() - time.Sleep(6 * time.Second) - StopTask() + m.AddTask("taska", tk) + m.StartTask() + time.Sleep(3 * time.Second) + m.StopTask() +} + +func TestModifyTaskListAfterRunning(t *testing.T) { + m := newTaskManager() + defer m.ClearTask() + tk := NewTask("taskb", "0/30 * * * * *", func(ctx context.Context) error { + fmt.Println("hello world") + return nil + }) + err := tk.Run(nil) + if err != nil { + t.Fatal(err) + } + m.AddTask("taskb", tk) + m.StartTask() + go func() { + m.DeleteTask("taskb") + }() + go func() { + m.AddTask("taskb1", tk) + }() + + time.Sleep(3 * time.Second) + m.StopTask() } func TestSpec(t *testing.T) { + m := newTaskManager() + defer m.ClearTask() wg := &sync.WaitGroup{} wg.Add(2) tk1 := NewTask("tk1", "0 12 * * * *", func(ctx context.Context) error { fmt.Println("tk1"); return nil }) tk2 := NewTask("tk2", "0,10,20 * * * * *", func(ctx context.Context) error { fmt.Println("tk2"); wg.Done(); return nil }) tk3 := NewTask("tk3", "0 10 * * * *", func(ctx context.Context) error { fmt.Println("tk3"); wg.Done(); return nil }) - AddTask("tk1", tk1) - AddTask("tk2", tk2) - AddTask("tk3", tk3) - StartTask() - defer StopTask() + m.AddTask("tk1", tk1) + m.AddTask("tk2", tk2) + m.AddTask("tk3", tk3) + m.StartTask() + defer m.StopTask() select { case <-time.After(200 * time.Second):