diff --git a/pkg/infrastructure/governor/command.go b/pkg/infrastructure/governor/command.go
new file mode 100644
index 00000000..75df5815
--- /dev/null
+++ b/pkg/infrastructure/governor/command.go
@@ -0,0 +1,87 @@
+// Copyright 2020
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package governor
+
+import (
+ "github.com/pkg/errors"
+)
+
+// Command is an experimental interface
+// We try to use this to decouple modules
+// All other modules depends on this, and they register the command they support
+// We may change the API in the future, so be careful about this.
+type Command interface {
+ Execute(params ...interface{}) *Result
+}
+
+var CommandNotFound = errors.New("Command not found")
+
+type Result struct {
+ // Status is the same as http.Status
+ Status int
+ Error error
+ Content interface{}
+}
+
+func (r *Result) IsSuccess() bool {
+ return r.Status >= 200 && r.Status < 300
+}
+
+// CommandRegistry stores all commands
+// name => command
+type moduleCommands map[string]Command
+
+// Get returns command with the name
+func (m moduleCommands) Get(name string) Command {
+ c, ok := m[name]
+ if ok {
+ return c
+ }
+ return &doNothingCommand{}
+}
+
+// module name => moduleCommand
+type commandRegistry map[string]moduleCommands
+
+// Get returns module's commands
+func (c commandRegistry) Get(moduleName string) moduleCommands {
+ if mcs, ok := c[moduleName]; ok {
+ return mcs
+ }
+ res := make(moduleCommands)
+ c[moduleName] = res
+ return res
+}
+
+var cmdRegistry = make(commandRegistry)
+
+// RegisterCommand is not thread-safe
+// do not use it in concurrent case
+func RegisterCommand(module string, commandName string, command Command) {
+ cmdRegistry.Get(module)[commandName] = command
+}
+
+func GetCommand(module string, cmdName string) Command {
+ return cmdRegistry.Get(module).Get(cmdName)
+}
+
+type doNothingCommand struct{}
+
+func (d *doNothingCommand) Execute(params ...interface{}) *Result {
+ return &Result{
+ Status: 404,
+ Error: CommandNotFound,
+ }
+}
diff --git a/pkg/server/web/admin.go b/pkg/server/web/admin.go
index 46c0f738..084190a9 100644
--- a/pkg/server/web/admin.go
+++ b/pkg/server/web/admin.go
@@ -21,7 +21,6 @@ import (
"time"
"github.com/astaxie/beego/pkg/infrastructure/logs"
- "github.com/astaxie/beego/pkg/task"
)
// BeeAdminApp is the default adminApp used by admin module.
@@ -86,9 +85,12 @@ type adminApp struct {
// Route adds http.HandlerFunc to adminApp with url pattern.
func (admin *adminApp) Run() {
- if len(task.AdminTaskList) > 0 {
- task.StartTask()
- }
+ // if len(task.AdminTaskList) > 0 {
+ // task.StartTask()
+ // }
+ logs.Warning("now we don't start tasks here, if you use task module," +
+ " please invoke task.StartTask, or task will not be executed")
+
addr := BConfig.Listen.AdminAddr
if BConfig.Listen.AdminPort != 0 {
diff --git a/pkg/server/web/admin_controller.go b/pkg/server/web/admin_controller.go
index c53c54cf..dc3a40b5 100644
--- a/pkg/server/web/admin_controller.go
+++ b/pkg/server/web/admin_controller.go
@@ -16,7 +16,6 @@ package web
import (
"bytes"
- context2 "context"
"encoding/json"
"fmt"
"net/http"
@@ -26,7 +25,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/astaxie/beego/pkg/infrastructure/governor"
- "github.com/astaxie/beego/pkg/task"
)
type adminController struct {
@@ -90,19 +88,22 @@ func (a *adminController) TaskStatus() {
req.ParseForm()
taskname := req.Form.Get("taskname")
if taskname != "" {
- if t, ok := task.AdminTaskList[taskname]; ok {
- if err := t.Run(nil); err != nil {
- data["Message"] = []string{"error", template.HTMLEscapeString(fmt.Sprintf("%s", err))}
- }
- data["Message"] = []string{"success", template.HTMLEscapeString(fmt.Sprintf("%s run success,Now the Status is
%s", taskname, t.GetStatus(nil)))}
+ cmd := governor.GetCommand("task", "run")
+ res := cmd.Execute(taskname)
+ if res.IsSuccess() {
+
+ data["Message"] = []string{"success",
+ template.HTMLEscapeString(fmt.Sprintf("%s run success,Now the Status is
%s",
+ taskname, res.Content.(string)))}
+
} else {
- data["Message"] = []string{"warning", template.HTMLEscapeString(fmt.Sprintf("there's no task which named: %s", taskname))}
+ data["Message"] = []string{"error", template.HTMLEscapeString(fmt.Sprintf("%s", res.Error))}
}
}
// List Tasks
content := make(M)
- resultList := new([][]string)
+ resultList := governor.GetCommand("task", "list").Execute().Content.([][]string)
var fields = []string{
"Task Name",
"Task Spec",
@@ -110,15 +111,6 @@ func (a *adminController) TaskStatus() {
"Last Time",
"",
}
- for tname, tk := range task.AdminTaskList {
- result := []string{
- template.HTMLEscapeString(tname),
- template.HTMLEscapeString(tk.GetSpec(nil)),
- template.HTMLEscapeString(tk.GetStatus(nil)),
- template.HTMLEscapeString(tk.GetPrev(context2.Background()).String()),
- }
- *resultList = append(*resultList, result)
- }
content["Fields"] = fields
content["Data"] = resultList
diff --git a/pkg/task/govenor_command.go b/pkg/task/govenor_command.go
new file mode 100644
index 00000000..fff08374
--- /dev/null
+++ b/pkg/task/govenor_command.go
@@ -0,0 +1,92 @@
+// Copyright 2020
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package task
+
+import (
+ "context"
+ "fmt"
+ "html/template"
+
+ "github.com/pkg/errors"
+
+ "github.com/astaxie/beego/pkg/infrastructure/governor"
+)
+
+type listTaskCommand struct {
+}
+
+func (l *listTaskCommand) Execute(params ...interface{}) *governor.Result {
+ resultList := make([][]string, 0, len(AdminTaskList))
+ for tname, tk := range AdminTaskList {
+ result := []string{
+ template.HTMLEscapeString(tname),
+ template.HTMLEscapeString(tk.GetSpec(nil)),
+ template.HTMLEscapeString(tk.GetStatus(nil)),
+ template.HTMLEscapeString(tk.GetPrev(context.Background()).String()),
+ }
+ resultList = append(resultList, result)
+ }
+
+ return &governor.Result{
+ Status: 200,
+ Content: resultList,
+ }
+}
+
+type runTaskCommand struct {
+}
+
+func (r *runTaskCommand) Execute(params ...interface{}) *governor.Result {
+ if len(params) == 0 {
+ return &governor.Result{
+ Status: 400,
+ Error: errors.New("task name not passed"),
+ }
+ }
+
+ tn, ok := params[0].(string)
+
+ if !ok {
+ return &governor.Result{
+ Status: 400,
+ Error: errors.New("parameter is invalid"),
+ }
+ }
+
+ if t, ok := AdminTaskList[tn]; ok {
+ err := t.Run(context.Background())
+ if err != nil {
+ return &governor.Result{
+ Status: 500,
+ Error: err,
+ }
+ }
+ return &governor.Result{
+ Status: 200,
+ Content: t.GetStatus(context.Background()),
+ }
+ } else {
+ return &governor.Result{
+ Status: 400,
+ Error: errors.New(fmt.Sprintf("task with name %s not found", tn)),
+ }
+ }
+
+}
+
+func registerCommands() {
+ governor.RegisterCommand("task", "list", &listTaskCommand{})
+ governor.RegisterCommand("task", "run", &runTaskCommand{})
+}
diff --git a/pkg/task/governor_command_test.go b/pkg/task/governor_command_test.go
new file mode 100644
index 00000000..00ed37f2
--- /dev/null
+++ b/pkg/task/governor_command_test.go
@@ -0,0 +1,111 @@
+// Copyright 2020
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package task
+
+import (
+ "context"
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+type countTask struct {
+ cnt int
+ mockErr error
+}
+
+func (c *countTask) GetSpec(ctx context.Context) string {
+ return "AAA"
+}
+
+func (c *countTask) GetStatus(ctx context.Context) string {
+ return "SUCCESS"
+}
+
+func (c *countTask) Run(ctx context.Context) error {
+ c.cnt++
+ return c.mockErr
+}
+
+func (c *countTask) SetNext(ctx context.Context, time time.Time) {
+}
+
+func (c *countTask) GetNext(ctx context.Context) time.Time {
+ return time.Now()
+}
+
+func (c *countTask) SetPrev(ctx context.Context, time time.Time) {
+}
+
+func (c *countTask) GetPrev(ctx context.Context) time.Time {
+ return time.Now()
+}
+
+func TestRunTaskCommand_Execute(t *testing.T) {
+ task := &countTask{}
+ AddTask("count", task)
+
+ cmd := &runTaskCommand{}
+
+ res := cmd.Execute()
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Error)
+ assert.Equal(t, "task name not passed", res.Error.Error())
+
+ res = cmd.Execute(10)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Error)
+ assert.Equal(t, "parameter is invalid", res.Error.Error())
+
+ res = cmd.Execute("CCCC")
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Error)
+ assert.Equal(t, "task with name CCCC not found", res.Error.Error())
+
+ res = cmd.Execute("count")
+ assert.NotNil(t, res)
+ assert.True(t, res.IsSuccess())
+
+ task.mockErr = errors.New("mock error")
+ res = cmd.Execute("count")
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Error)
+ assert.Equal(t, "mock error", res.Error.Error())
+}
+
+func TestListTaskCommand_Execute(t *testing.T) {
+ task := &countTask{}
+
+ cmd := &listTaskCommand{}
+
+ res := cmd.Execute()
+
+ assert.True(t, res.IsSuccess())
+
+ _, ok := res.Content.([][]string)
+ assert.True(t, ok)
+
+ AddTask("count", task)
+
+ res = cmd.Execute()
+
+ assert.True(t, res.IsSuccess())
+
+ rl, ok := res.Content.([][]string)
+ assert.True(t, ok)
+ assert.Equal(t, 1, len(rl))
+}
diff --git a/pkg/task/task.go b/pkg/task/task.go
index bcadb956..e3a8bba4 100644
--- a/pkg/task/task.go
+++ b/pkg/task/task.go
@@ -189,9 +189,9 @@ func (t *Task) GetPrev(context.Context) time.Time {
// SetCron some signals:
// *: any time
// ,: separate signal
-// -:duration
+// -:duration
// /n : do as n times of time duration
-/////////////////////////////////////////////////////////
+// ///////////////////////////////////////////////////////
// 0/30 * * * * * every 30s
// 0 43 21 * * * 21:43
// 0 15 05 * * * 05:15
@@ -401,10 +401,12 @@ func StartTask() {
taskLock.Lock()
defer taskLock.Unlock()
if isstart {
- //If already started, no need to start another goroutine.
+ // If already started, no need to start another goroutine.
return
}
isstart = true
+
+ registerCommands()
go run()
}