From 44127edefc069882605031afcd2e310b24548bc7 Mon Sep 17 00:00:00 2001 From: Ming Deng Date: Sun, 20 Sep 2020 14:18:13 +0000 Subject: [PATCH] design Command for governor module & decouple web module from task module --- pkg/infrastructure/governor/command.go | 87 +++++++++++++++++++ pkg/server/web/admin.go | 10 ++- pkg/server/web/admin_controller.go | 28 +++---- pkg/task/govenor_command.go | 92 ++++++++++++++++++++ pkg/task/governor_command_test.go | 111 +++++++++++++++++++++++++ pkg/task/task.go | 8 +- 6 files changed, 311 insertions(+), 25 deletions(-) create mode 100644 pkg/infrastructure/governor/command.go create mode 100644 pkg/task/govenor_command.go create mode 100644 pkg/task/governor_command_test.go diff --git a/pkg/infrastructure/governor/command.go b/pkg/infrastructure/governor/command.go new file mode 100644 index 00000000..75df5815 --- /dev/null +++ b/pkg/infrastructure/governor/command.go @@ -0,0 +1,87 @@ +// Copyright 2020 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package governor + +import ( + "github.com/pkg/errors" +) + +// Command is an experimental interface +// We try to use this to decouple modules +// All other modules depends on this, and they register the command they support +// We may change the API in the future, so be careful about this. +type Command interface { + Execute(params ...interface{}) *Result +} + +var CommandNotFound = errors.New("Command not found") + +type Result struct { + // Status is the same as http.Status + Status int + Error error + Content interface{} +} + +func (r *Result) IsSuccess() bool { + return r.Status >= 200 && r.Status < 300 +} + +// CommandRegistry stores all commands +// name => command +type moduleCommands map[string]Command + +// Get returns command with the name +func (m moduleCommands) Get(name string) Command { + c, ok := m[name] + if ok { + return c + } + return &doNothingCommand{} +} + +// module name => moduleCommand +type commandRegistry map[string]moduleCommands + +// Get returns module's commands +func (c commandRegistry) Get(moduleName string) moduleCommands { + if mcs, ok := c[moduleName]; ok { + return mcs + } + res := make(moduleCommands) + c[moduleName] = res + return res +} + +var cmdRegistry = make(commandRegistry) + +// RegisterCommand is not thread-safe +// do not use it in concurrent case +func RegisterCommand(module string, commandName string, command Command) { + cmdRegistry.Get(module)[commandName] = command +} + +func GetCommand(module string, cmdName string) Command { + return cmdRegistry.Get(module).Get(cmdName) +} + +type doNothingCommand struct{} + +func (d *doNothingCommand) Execute(params ...interface{}) *Result { + return &Result{ + Status: 404, + Error: CommandNotFound, + } +} diff --git a/pkg/server/web/admin.go b/pkg/server/web/admin.go index 46c0f738..084190a9 100644 --- a/pkg/server/web/admin.go +++ b/pkg/server/web/admin.go @@ -21,7 +21,6 @@ import ( "time" "github.com/astaxie/beego/pkg/infrastructure/logs" - "github.com/astaxie/beego/pkg/task" ) // BeeAdminApp is the default adminApp used by admin module. @@ -86,9 +85,12 @@ type adminApp struct { // Route adds http.HandlerFunc to adminApp with url pattern. func (admin *adminApp) Run() { - if len(task.AdminTaskList) > 0 { - task.StartTask() - } + // if len(task.AdminTaskList) > 0 { + // task.StartTask() + // } + logs.Warning("now we don't start tasks here, if you use task module," + + " please invoke task.StartTask, or task will not be executed") + addr := BConfig.Listen.AdminAddr if BConfig.Listen.AdminPort != 0 { diff --git a/pkg/server/web/admin_controller.go b/pkg/server/web/admin_controller.go index c53c54cf..dc3a40b5 100644 --- a/pkg/server/web/admin_controller.go +++ b/pkg/server/web/admin_controller.go @@ -16,7 +16,6 @@ package web import ( "bytes" - context2 "context" "encoding/json" "fmt" "net/http" @@ -26,7 +25,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/astaxie/beego/pkg/infrastructure/governor" - "github.com/astaxie/beego/pkg/task" ) type adminController struct { @@ -90,19 +88,22 @@ func (a *adminController) TaskStatus() { req.ParseForm() taskname := req.Form.Get("taskname") if taskname != "" { - if t, ok := task.AdminTaskList[taskname]; ok { - if err := t.Run(nil); err != nil { - data["Message"] = []string{"error", template.HTMLEscapeString(fmt.Sprintf("%s", err))} - } - data["Message"] = []string{"success", template.HTMLEscapeString(fmt.Sprintf("%s run success,Now the Status is
%s", taskname, t.GetStatus(nil)))} + cmd := governor.GetCommand("task", "run") + res := cmd.Execute(taskname) + if res.IsSuccess() { + + data["Message"] = []string{"success", + template.HTMLEscapeString(fmt.Sprintf("%s run success,Now the Status is
%s", + taskname, res.Content.(string)))} + } else { - data["Message"] = []string{"warning", template.HTMLEscapeString(fmt.Sprintf("there's no task which named: %s", taskname))} + data["Message"] = []string{"error", template.HTMLEscapeString(fmt.Sprintf("%s", res.Error))} } } // List Tasks content := make(M) - resultList := new([][]string) + resultList := governor.GetCommand("task", "list").Execute().Content.([][]string) var fields = []string{ "Task Name", "Task Spec", @@ -110,15 +111,6 @@ func (a *adminController) TaskStatus() { "Last Time", "", } - for tname, tk := range task.AdminTaskList { - result := []string{ - template.HTMLEscapeString(tname), - template.HTMLEscapeString(tk.GetSpec(nil)), - template.HTMLEscapeString(tk.GetStatus(nil)), - template.HTMLEscapeString(tk.GetPrev(context2.Background()).String()), - } - *resultList = append(*resultList, result) - } content["Fields"] = fields content["Data"] = resultList diff --git a/pkg/task/govenor_command.go b/pkg/task/govenor_command.go new file mode 100644 index 00000000..fff08374 --- /dev/null +++ b/pkg/task/govenor_command.go @@ -0,0 +1,92 @@ +// Copyright 2020 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + "fmt" + "html/template" + + "github.com/pkg/errors" + + "github.com/astaxie/beego/pkg/infrastructure/governor" +) + +type listTaskCommand struct { +} + +func (l *listTaskCommand) Execute(params ...interface{}) *governor.Result { + resultList := make([][]string, 0, len(AdminTaskList)) + for tname, tk := range AdminTaskList { + result := []string{ + template.HTMLEscapeString(tname), + template.HTMLEscapeString(tk.GetSpec(nil)), + template.HTMLEscapeString(tk.GetStatus(nil)), + template.HTMLEscapeString(tk.GetPrev(context.Background()).String()), + } + resultList = append(resultList, result) + } + + return &governor.Result{ + Status: 200, + Content: resultList, + } +} + +type runTaskCommand struct { +} + +func (r *runTaskCommand) Execute(params ...interface{}) *governor.Result { + if len(params) == 0 { + return &governor.Result{ + Status: 400, + Error: errors.New("task name not passed"), + } + } + + tn, ok := params[0].(string) + + if !ok { + return &governor.Result{ + Status: 400, + Error: errors.New("parameter is invalid"), + } + } + + if t, ok := AdminTaskList[tn]; ok { + err := t.Run(context.Background()) + if err != nil { + return &governor.Result{ + Status: 500, + Error: err, + } + } + return &governor.Result{ + Status: 200, + Content: t.GetStatus(context.Background()), + } + } else { + return &governor.Result{ + Status: 400, + Error: errors.New(fmt.Sprintf("task with name %s not found", tn)), + } + } + +} + +func registerCommands() { + governor.RegisterCommand("task", "list", &listTaskCommand{}) + governor.RegisterCommand("task", "run", &runTaskCommand{}) +} diff --git a/pkg/task/governor_command_test.go b/pkg/task/governor_command_test.go new file mode 100644 index 00000000..00ed37f2 --- /dev/null +++ b/pkg/task/governor_command_test.go @@ -0,0 +1,111 @@ +// Copyright 2020 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type countTask struct { + cnt int + mockErr error +} + +func (c *countTask) GetSpec(ctx context.Context) string { + return "AAA" +} + +func (c *countTask) GetStatus(ctx context.Context) string { + return "SUCCESS" +} + +func (c *countTask) Run(ctx context.Context) error { + c.cnt++ + return c.mockErr +} + +func (c *countTask) SetNext(ctx context.Context, time time.Time) { +} + +func (c *countTask) GetNext(ctx context.Context) time.Time { + return time.Now() +} + +func (c *countTask) SetPrev(ctx context.Context, time time.Time) { +} + +func (c *countTask) GetPrev(ctx context.Context) time.Time { + return time.Now() +} + +func TestRunTaskCommand_Execute(t *testing.T) { + task := &countTask{} + AddTask("count", task) + + cmd := &runTaskCommand{} + + res := cmd.Execute() + assert.NotNil(t, res) + assert.NotNil(t, res.Error) + assert.Equal(t, "task name not passed", res.Error.Error()) + + res = cmd.Execute(10) + assert.NotNil(t, res) + assert.NotNil(t, res.Error) + assert.Equal(t, "parameter is invalid", res.Error.Error()) + + res = cmd.Execute("CCCC") + assert.NotNil(t, res) + assert.NotNil(t, res.Error) + assert.Equal(t, "task with name CCCC not found", res.Error.Error()) + + res = cmd.Execute("count") + assert.NotNil(t, res) + assert.True(t, res.IsSuccess()) + + task.mockErr = errors.New("mock error") + res = cmd.Execute("count") + assert.NotNil(t, res) + assert.NotNil(t, res.Error) + assert.Equal(t, "mock error", res.Error.Error()) +} + +func TestListTaskCommand_Execute(t *testing.T) { + task := &countTask{} + + cmd := &listTaskCommand{} + + res := cmd.Execute() + + assert.True(t, res.IsSuccess()) + + _, ok := res.Content.([][]string) + assert.True(t, ok) + + AddTask("count", task) + + res = cmd.Execute() + + assert.True(t, res.IsSuccess()) + + rl, ok := res.Content.([][]string) + assert.True(t, ok) + assert.Equal(t, 1, len(rl)) +} diff --git a/pkg/task/task.go b/pkg/task/task.go index bcadb956..e3a8bba4 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -189,9 +189,9 @@ func (t *Task) GetPrev(context.Context) time.Time { // SetCron some signals: // *: any time // ,:  separate signal -//   -:duration +//    -:duration // /n : do as n times of time duration -///////////////////////////////////////////////////////// +// /////////////////////////////////////////////////////// // 0/30 * * * * * every 30s // 0 43 21 * * * 21:43 // 0 15 05 * * *    05:15 @@ -401,10 +401,12 @@ func StartTask() { taskLock.Lock() defer taskLock.Unlock() if isstart { - //If already started, no need to start another goroutine. + // If already started, no need to start another goroutine. return } isstart = true + + registerCommands() go run() }