1
0
mirror of https://github.com/astaxie/beego.git synced 2025-01-22 13:37:12 +00:00

Merge pull request #4199 from flycash/rft/task-api

Add ctx to Task module API
This commit is contained in:
Ming Deng 2020-08-31 19:43:53 +08:00 committed by GitHub
commit 0a58428220
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 33 deletions

View File

@ -16,6 +16,7 @@ package web
import (
"bytes"
context2 "context"
"encoding/json"
"fmt"
"net/http"
@ -378,10 +379,10 @@ func taskStatus(rw http.ResponseWriter, req *http.Request) {
taskname := req.Form.Get("taskname")
if taskname != "" {
if t, ok := task.AdminTaskList[taskname]; ok {
if err := t.Run(); err != nil {
if err := t.Run(nil); err != nil {
data["Message"] = []string{"error", template.HTMLEscapeString(fmt.Sprintf("%s", err))}
}
data["Message"] = []string{"success", template.HTMLEscapeString(fmt.Sprintf("%s run success,Now the Status is <br>%s", taskname, t.GetStatus()))}
data["Message"] = []string{"success", template.HTMLEscapeString(fmt.Sprintf("%s run success,Now the Status is <br>%s", taskname, t.GetStatus(nil)))}
} else {
data["Message"] = []string{"warning", template.HTMLEscapeString(fmt.Sprintf("there's no task which named: %s", taskname))}
}
@ -400,9 +401,9 @@ func taskStatus(rw http.ResponseWriter, req *http.Request) {
for tname, tk := range task.AdminTaskList {
result := []string{
template.HTMLEscapeString(tname),
template.HTMLEscapeString(tk.GetSpec()),
template.HTMLEscapeString(tk.GetStatus()),
template.HTMLEscapeString(tk.GetPrev().String()),
template.HTMLEscapeString(tk.GetSpec(nil)),
template.HTMLEscapeString(tk.GetStatus(nil)),
template.HTMLEscapeString(tk.GetPrev(context2.Background()).String()),
}
*resultList = append(*resultList, result)
}

View File

@ -15,6 +15,7 @@
package task
import (
"context"
"log"
"math"
"sort"
@ -86,13 +87,13 @@ type TaskFunc func() error
// Tasker task interface
type Tasker interface {
GetSpec() string
GetStatus() string
Run() error
SetNext(time.Time)
GetNext() time.Time
SetPrev(time.Time)
GetPrev() time.Time
GetSpec(ctx context.Context) string
GetStatus(ctx context.Context) string
Run(ctx context.Context) error
SetNext(context.Context, time.Time)
GetNext(ctx context.Context) time.Time
SetPrev(context.Context, time.Time)
GetPrev(ctx context.Context) time.Time
}
// task error
@ -133,12 +134,12 @@ func NewTask(tname string, spec string, f TaskFunc) *Task {
}
// GetSpec get spec string
func (t *Task) GetSpec() string {
func (t *Task) GetSpec(context.Context) string {
return t.SpecStr
}
// GetStatus get current task status
func (t *Task) GetStatus() string {
func (t *Task) GetStatus(context.Context) string {
var str string
for _, v := range t.Errlist {
str += v.t.String() + ":" + v.errinfo + "<br>"
@ -147,7 +148,7 @@ func (t *Task) GetStatus() string {
}
// Run run all tasks
func (t *Task) Run() error {
func (t *Task) Run(context.Context) error {
err := t.DoFunc()
if err != nil {
index := t.errCnt % t.ErrLimit
@ -158,22 +159,22 @@ func (t *Task) Run() error {
}
// SetNext set next time for this task
func (t *Task) SetNext(now time.Time) {
func (t *Task) SetNext(ctx context.Context, now time.Time) {
t.Next = t.Spec.Next(now)
}
// GetNext get the next call time of this task
func (t *Task) GetNext() time.Time {
func (t *Task) GetNext(context.Context) time.Time {
return t.Next
}
// SetPrev set prev time of this task
func (t *Task) SetPrev(now time.Time) {
func (t *Task) SetPrev(ctx context.Context, now time.Time) {
t.Prev = now
}
// GetPrev get prev time of this task
func (t *Task) GetPrev() time.Time {
func (t *Task) GetPrev(context.Context) time.Time {
return t.Prev
}
@ -410,7 +411,7 @@ func StartTask() {
func run() {
now := time.Now().Local()
for _, t := range AdminTaskList {
t.SetNext(now)
t.SetNext(nil, now)
}
for {
@ -420,30 +421,30 @@ func run() {
taskLock.RUnlock()
sortList.Sort()
var effective time.Time
if len(AdminTaskList) == 0 || sortList.Vals[0].GetNext().IsZero() {
if len(AdminTaskList) == 0 || sortList.Vals[0].GetNext(context.Background()).IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
effective = now.AddDate(10, 0, 0)
} else {
effective = sortList.Vals[0].GetNext()
effective = sortList.Vals[0].GetNext(context.Background())
}
select {
case now = <-time.After(effective.Sub(now)):
// Run every entry whose next time was this effective time.
for _, e := range sortList.Vals {
if e.GetNext() != effective {
if e.GetNext(context.Background()) != effective {
break
}
go e.Run()
e.SetPrev(e.GetNext())
e.SetNext(effective)
go e.Run(nil)
e.SetPrev(context.Background(), e.GetNext(context.Background()))
e.SetNext(nil, effective)
}
continue
case <-changed:
now = time.Now().Local()
taskLock.Lock()
for _, t := range AdminTaskList {
t.SetNext(now)
t.SetNext(nil, now)
}
taskLock.Unlock()
continue
@ -468,7 +469,7 @@ func StopTask() {
func AddTask(taskname string, t Tasker) {
taskLock.Lock()
defer taskLock.Unlock()
t.SetNext(time.Now().Local())
t.SetNext(nil, time.Now().Local())
AdminTaskList[taskname] = t
if isstart {
changed <- true
@ -511,13 +512,13 @@ func (ms *MapSorter) Sort() {
func (ms *MapSorter) Len() int { return len(ms.Keys) }
func (ms *MapSorter) Less(i, j int) bool {
if ms.Vals[i].GetNext().IsZero() {
if ms.Vals[i].GetNext(context.Background()).IsZero() {
return false
}
if ms.Vals[j].GetNext().IsZero() {
if ms.Vals[j].GetNext(context.Background()).IsZero() {
return true
}
return ms.Vals[i].GetNext().Before(ms.Vals[j].GetNext())
return ms.Vals[i].GetNext(context.Background()).Before(ms.Vals[j].GetNext(context.Background()))
}
func (ms *MapSorter) Swap(i, j int) {
ms.Vals[i], ms.Vals[j] = ms.Vals[j], ms.Vals[i]

View File

@ -26,7 +26,7 @@ import (
func TestParse(t *testing.T) {
tk := NewTask("taska", "0/30 * * * * *", func() error { fmt.Println("hello world"); return nil })
err := tk.Run()
err := tk.Run(nil)
if err != nil {
t.Fatal(err)
}
@ -65,7 +65,7 @@ func TestTask_Run(t *testing.T) {
}
tk := NewTask("taska", "0/30 * * * * *", task)
for i := 0; i < 200; i++ {
e := tk.Run()
e := tk.Run(nil)
assert.NotNil(t, e)
}