From 70cca5e2981dc1b6a16377a823b6f912e3884f9e Mon Sep 17 00:00:00 2001 From: Anker Jam Date: Mon, 5 Oct 2020 10:13:29 +0800 Subject: [PATCH] make code testable in task module --- pkg/task/govenor_command.go | 6 +- pkg/task/task.go | 118 ++++++++++++++++++++++++------------ pkg/task/task_test.go | 29 +++++---- 3 files changed, 99 insertions(+), 54 deletions(-) diff --git a/pkg/task/govenor_command.go b/pkg/task/govenor_command.go index fff08374..be351dc1 100644 --- a/pkg/task/govenor_command.go +++ b/pkg/task/govenor_command.go @@ -28,8 +28,8 @@ type listTaskCommand struct { } func (l *listTaskCommand) Execute(params ...interface{}) *governor.Result { - resultList := make([][]string, 0, len(AdminTaskList)) - for tname, tk := range AdminTaskList { + resultList := make([][]string, 0, len(globalTaskManager.adminTaskList)) + for tname, tk := range globalTaskManager.adminTaskList { result := []string{ template.HTMLEscapeString(tname), template.HTMLEscapeString(tk.GetSpec(nil)), @@ -65,7 +65,7 @@ func (r *runTaskCommand) Execute(params ...interface{}) *governor.Result { } } - if t, ok := AdminTaskList[tn]; ok { + if t, ok := globalTaskManager.adminTaskList[tn]; ok { err := t.Run(context.Background()) if err != nil { return &governor.Result{ diff --git a/pkg/task/task.go b/pkg/task/task.go index c8228fd2..5faa2fd8 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -31,13 +31,28 @@ type bounds struct { names map[string]uint } -// The bounds for each field. -var ( - AdminTaskList map[string]Tasker +type taskManager struct { + adminTaskList map[string]Tasker taskLock sync.RWMutex stop chan bool changed chan bool isstart bool +} + +func newTaskManager()*taskManager{ + return &taskManager{ + adminTaskList: make(map[string]Tasker), + taskLock: sync.RWMutex{}, + stop: make(chan bool), + changed: make(chan bool), + isstart: false, + } +} + +// The bounds for each field. +var ( + globalTaskManager *taskManager + seconds = bounds{0, 59, nil} minutes = bounds{0, 59, nil} hours = bounds{0, 23, nil} @@ -398,32 +413,53 @@ func dayMatches(s *Schedule, t time.Time) bool { // StartTask start all tasks func StartTask() { - taskLock.Lock() - defer taskLock.Unlock() - if isstart { + globalTaskManager.StartTask() +} + +// StopTask stop all tasks +func StopTask() { + globalTaskManager.StopTask() +} + +// AddTask add task with name +func AddTask(taskName string, t Tasker) { + globalTaskManager.AddTask(taskName, t) +} + +// DeleteTask delete task with name +func DeleteTask(taskName string) { + globalTaskManager.DeleteTask(taskName) +} + + +// StartTask start all tasks +func (m *taskManager) StartTask() { + m.taskLock.Lock() + defer m.taskLock.Unlock() + if m.isstart { // If already started, no need to start another goroutine. return } - isstart = true + m.isstart = true registerCommands() - go run() + go m.run() } -func run() { +func(m *taskManager) run() { now := time.Now().Local() - for _, t := range AdminTaskList { + for _, t := range m.adminTaskList { t.SetNext(nil, now) } for { // we only use RLock here because NewMapSorter copy the reference, do not change any thing - taskLock.RLock() - sortList := NewMapSorter(AdminTaskList) - taskLock.RUnlock() + m.taskLock.RLock() + sortList := NewMapSorter(m.adminTaskList) + m.taskLock.RUnlock() sortList.Sort() var effective time.Time - if len(AdminTaskList) == 0 || sortList.Vals[0].GetNext(context.Background()).IsZero() { + if len(m.adminTaskList) == 0 || sortList.Vals[0].GetNext(context.Background()).IsZero() { // If there are no entries yet, just sleep - it still handles new entries // and stop requests. effective = now.AddDate(10, 0, 0) @@ -442,60 +478,64 @@ func run() { e.SetNext(nil, effective) } continue - case <-changed: + case <-m.changed: now = time.Now().Local() - taskLock.Lock() - for _, t := range AdminTaskList { + m.taskLock.Lock() + for _, t := range m.adminTaskList { t.SetNext(nil, now) } - taskLock.Unlock() + m.taskLock.Unlock() continue - case <-stop: - taskLock.Lock() - if isstart { - isstart = false + case <-m.stop: + m.taskLock.Lock() + if m.isstart { + m.isstart = false } - taskLock.Unlock() + m.taskLock.Unlock() return } } } // StopTask stop all tasks -func StopTask() { - stop <- true +func(m *taskManager) StopTask() { + go func() { + m.stop <- true + }() } // AddTask add task with name -func AddTask(taskname string, t Tasker) { +func (m *taskManager)AddTask(taskname string, t Tasker) { isChanged := false - taskLock.Lock() + m.taskLock.Lock() t.SetNext(nil, time.Now().Local()) - AdminTaskList[taskname] = t - if isstart { + m.adminTaskList[taskname] = t + if m.isstart { isChanged = true } - taskLock.Unlock() + m.taskLock.Unlock() if isChanged { - changed <- true + go func() { + m.changed <- true + }() } } // DeleteTask delete task with name -func DeleteTask(taskname string) { +func(m *taskManager) DeleteTask(taskname string) { isChanged := false - taskLock.Lock() - delete(AdminTaskList, taskname) - if isstart { + m.taskLock.Lock() + delete(m.adminTaskList, taskname) + if m.isstart { isChanged = true } - taskLock.Unlock() + m.taskLock.Unlock() if isChanged { - changed <- true + m.changed <- true } } @@ -648,7 +688,5 @@ func all(r bounds) uint64 { } func init() { - AdminTaskList = make(map[string]Tasker) - stop = make(chan bool) - changed = make(chan bool) + globalTaskManager = newTaskManager() } diff --git a/pkg/task/task_test.go b/pkg/task/task_test.go index f58e374b..9a74ff24 100644 --- a/pkg/task/task_test.go +++ b/pkg/task/task_test.go @@ -41,7 +41,8 @@ func TestParse(t *testing.T) { } func TestModifyTaskListAfterRunning(t *testing.T) { - tk := NewTask("taska", "0/30 * * * * *", func(ctx context.Context) error { + m := newTaskManager() + tk := NewTask("taskb", "0/30 * * * * *", func(ctx context.Context) error { fmt.Println("hello world") return nil }) @@ -49,26 +50,32 @@ func TestModifyTaskListAfterRunning(t *testing.T) { if err != nil { t.Fatal(err) } - AddTask("taska", tk) - StartTask() - DeleteTask("taska") - AddTask("taska1", tk) + m.AddTask("taskb", tk) + m.StartTask() + go func() { + m.DeleteTask("taskb") + }() + go func() { + m.AddTask("taskb1", tk) + }() + time.Sleep(3 * time.Second) - StopTask() + m.StopTask() } func TestSpec(t *testing.T) { + m := newTaskManager() wg := &sync.WaitGroup{} wg.Add(2) tk1 := NewTask("tk1", "0 12 * * * *", func(ctx context.Context) error { fmt.Println("tk1"); return nil }) tk2 := NewTask("tk2", "0,10,20 * * * * *", func(ctx context.Context) error { fmt.Println("tk2"); wg.Done(); return nil }) tk3 := NewTask("tk3", "0 10 * * * *", func(ctx context.Context) error { fmt.Println("tk3"); wg.Done(); return nil }) - AddTask("tk1", tk1) - AddTask("tk2", tk2) - AddTask("tk3", tk3) - StartTask() - defer StopTask() + m.AddTask("tk1", tk1) + m.AddTask("tk2", tk2) + m.AddTask("tk3", tk3) + m.StartTask() + defer m.StopTask() select { case <-time.After(200 * time.Second):