1
0
mirror of https://github.com/astaxie/beego.git synced 2024-11-21 19:40:53 +00:00

make code testable in task module

This commit is contained in:
Anker Jam 2020-10-05 10:13:29 +08:00
parent f1cca45d8d
commit 70cca5e298
3 changed files with 99 additions and 54 deletions

View File

@ -28,8 +28,8 @@ type listTaskCommand struct {
}
func (l *listTaskCommand) Execute(params ...interface{}) *governor.Result {
resultList := make([][]string, 0, len(AdminTaskList))
for tname, tk := range AdminTaskList {
resultList := make([][]string, 0, len(globalTaskManager.adminTaskList))
for tname, tk := range globalTaskManager.adminTaskList {
result := []string{
template.HTMLEscapeString(tname),
template.HTMLEscapeString(tk.GetSpec(nil)),
@ -65,7 +65,7 @@ func (r *runTaskCommand) Execute(params ...interface{}) *governor.Result {
}
}
if t, ok := AdminTaskList[tn]; ok {
if t, ok := globalTaskManager.adminTaskList[tn]; ok {
err := t.Run(context.Background())
if err != nil {
return &governor.Result{

View File

@ -31,13 +31,28 @@ type bounds struct {
names map[string]uint
}
// The bounds for each field.
var (
AdminTaskList map[string]Tasker
type taskManager struct {
adminTaskList map[string]Tasker
taskLock sync.RWMutex
stop chan bool
changed chan bool
isstart bool
}
func newTaskManager()*taskManager{
return &taskManager{
adminTaskList: make(map[string]Tasker),
taskLock: sync.RWMutex{},
stop: make(chan bool),
changed: make(chan bool),
isstart: false,
}
}
// The bounds for each field.
var (
globalTaskManager *taskManager
seconds = bounds{0, 59, nil}
minutes = bounds{0, 59, nil}
hours = bounds{0, 23, nil}
@ -398,32 +413,53 @@ func dayMatches(s *Schedule, t time.Time) bool {
// StartTask start all tasks
func StartTask() {
taskLock.Lock()
defer taskLock.Unlock()
if isstart {
globalTaskManager.StartTask()
}
// StopTask stop all tasks
func StopTask() {
globalTaskManager.StopTask()
}
// AddTask add task with name
func AddTask(taskName string, t Tasker) {
globalTaskManager.AddTask(taskName, t)
}
// DeleteTask delete task with name
func DeleteTask(taskName string) {
globalTaskManager.DeleteTask(taskName)
}
// StartTask start all tasks
func (m *taskManager) StartTask() {
m.taskLock.Lock()
defer m.taskLock.Unlock()
if m.isstart {
// If already started no need to start another goroutine.
return
}
isstart = true
m.isstart = true
registerCommands()
go run()
go m.run()
}
func run() {
func(m *taskManager) run() {
now := time.Now().Local()
for _, t := range AdminTaskList {
for _, t := range m.adminTaskList {
t.SetNext(nil, now)
}
for {
// we only use RLock here because NewMapSorter copy the reference, do not change any thing
taskLock.RLock()
sortList := NewMapSorter(AdminTaskList)
taskLock.RUnlock()
m.taskLock.RLock()
sortList := NewMapSorter(m.adminTaskList)
m.taskLock.RUnlock()
sortList.Sort()
var effective time.Time
if len(AdminTaskList) == 0 || sortList.Vals[0].GetNext(context.Background()).IsZero() {
if len(m.adminTaskList) == 0 || sortList.Vals[0].GetNext(context.Background()).IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
effective = now.AddDate(10, 0, 0)
@ -442,60 +478,64 @@ func run() {
e.SetNext(nil, effective)
}
continue
case <-changed:
case <-m.changed:
now = time.Now().Local()
taskLock.Lock()
for _, t := range AdminTaskList {
m.taskLock.Lock()
for _, t := range m.adminTaskList {
t.SetNext(nil, now)
}
taskLock.Unlock()
m.taskLock.Unlock()
continue
case <-stop:
taskLock.Lock()
if isstart {
isstart = false
case <-m.stop:
m.taskLock.Lock()
if m.isstart {
m.isstart = false
}
taskLock.Unlock()
m.taskLock.Unlock()
return
}
}
}
// StopTask stop all tasks
func StopTask() {
stop <- true
func(m *taskManager) StopTask() {
go func() {
m.stop <- true
}()
}
// AddTask add task with name
func AddTask(taskname string, t Tasker) {
func (m *taskManager)AddTask(taskname string, t Tasker) {
isChanged := false
taskLock.Lock()
m.taskLock.Lock()
t.SetNext(nil, time.Now().Local())
AdminTaskList[taskname] = t
if isstart {
m.adminTaskList[taskname] = t
if m.isstart {
isChanged = true
}
taskLock.Unlock()
m.taskLock.Unlock()
if isChanged {
changed <- true
go func() {
m.changed <- true
}()
}
}
// DeleteTask delete task with name
func DeleteTask(taskname string) {
func(m *taskManager) DeleteTask(taskname string) {
isChanged := false
taskLock.Lock()
delete(AdminTaskList, taskname)
if isstart {
m.taskLock.Lock()
delete(m.adminTaskList, taskname)
if m.isstart {
isChanged = true
}
taskLock.Unlock()
m.taskLock.Unlock()
if isChanged {
changed <- true
m.changed <- true
}
}
@ -648,7 +688,5 @@ func all(r bounds) uint64 {
}
func init() {
AdminTaskList = make(map[string]Tasker)
stop = make(chan bool)
changed = make(chan bool)
globalTaskManager = newTaskManager()
}

View File

@ -41,7 +41,8 @@ func TestParse(t *testing.T) {
}
func TestModifyTaskListAfterRunning(t *testing.T) {
tk := NewTask("taska", "0/30 * * * * *", func(ctx context.Context) error {
m := newTaskManager()
tk := NewTask("taskb", "0/30 * * * * *", func(ctx context.Context) error {
fmt.Println("hello world")
return nil
})
@ -49,26 +50,32 @@ func TestModifyTaskListAfterRunning(t *testing.T) {
if err != nil {
t.Fatal(err)
}
AddTask("taska", tk)
StartTask()
DeleteTask("taska")
AddTask("taska1", tk)
m.AddTask("taskb", tk)
m.StartTask()
go func() {
m.DeleteTask("taskb")
}()
go func() {
m.AddTask("taskb1", tk)
}()
time.Sleep(3 * time.Second)
StopTask()
m.StopTask()
}
func TestSpec(t *testing.T) {
m := newTaskManager()
wg := &sync.WaitGroup{}
wg.Add(2)
tk1 := NewTask("tk1", "0 12 * * * *", func(ctx context.Context) error { fmt.Println("tk1"); return nil })
tk2 := NewTask("tk2", "0,10,20 * * * * *", func(ctx context.Context) error { fmt.Println("tk2"); wg.Done(); return nil })
tk3 := NewTask("tk3", "0 10 * * * *", func(ctx context.Context) error { fmt.Println("tk3"); wg.Done(); return nil })
AddTask("tk1", tk1)
AddTask("tk2", tk2)
AddTask("tk3", tk3)
StartTask()
defer StopTask()
m.AddTask("tk1", tk1)
m.AddTask("tk2", tk2)
m.AddTask("tk3", tk3)
m.StartTask()
defer m.StopTask()
select {
case <-time.After(200 * time.Second):