design Command for governor module & decouple web module from task module

This commit is contained in:
Ming Deng 2020-09-20 14:18:13 +00:00
parent 089006525e
commit 44127edefc
6 changed files with 311 additions and 25 deletions

View File

@ -0,0 +1,87 @@
// Copyright 2020
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package governor
import (
"github.com/pkg/errors"
)
// Command is an experimental interface
// We try to use this to decouple modules
// All other modules depends on this, and they register the command they support
// We may change the API in the future, so be careful about this.
type Command interface {
Execute(params ...interface{}) *Result
}
var CommandNotFound = errors.New("Command not found")
type Result struct {
// Status is the same as http.Status
Status int
Error error
Content interface{}
}
func (r *Result) IsSuccess() bool {
return r.Status >= 200 && r.Status < 300
}
// CommandRegistry stores all commands
// name => command
type moduleCommands map[string]Command
// Get returns command with the name
func (m moduleCommands) Get(name string) Command {
c, ok := m[name]
if ok {
return c
}
return &doNothingCommand{}
}
// module name => moduleCommand
type commandRegistry map[string]moduleCommands
// Get returns module's commands
func (c commandRegistry) Get(moduleName string) moduleCommands {
if mcs, ok := c[moduleName]; ok {
return mcs
}
res := make(moduleCommands)
c[moduleName] = res
return res
}
var cmdRegistry = make(commandRegistry)
// RegisterCommand is not thread-safe
// do not use it in concurrent case
func RegisterCommand(module string, commandName string, command Command) {
cmdRegistry.Get(module)[commandName] = command
}
func GetCommand(module string, cmdName string) Command {
return cmdRegistry.Get(module).Get(cmdName)
}
type doNothingCommand struct{}
func (d *doNothingCommand) Execute(params ...interface{}) *Result {
return &Result{
Status: 404,
Error: CommandNotFound,
}
}

View File

@ -21,7 +21,6 @@ import (
"time"
"github.com/astaxie/beego/pkg/infrastructure/logs"
"github.com/astaxie/beego/pkg/task"
)
// BeeAdminApp is the default adminApp used by admin module.
@ -86,9 +85,12 @@ type adminApp struct {
// Route adds http.HandlerFunc to adminApp with url pattern.
func (admin *adminApp) Run() {
if len(task.AdminTaskList) > 0 {
task.StartTask()
}
// if len(task.AdminTaskList) > 0 {
// task.StartTask()
// }
logs.Warning("now we don't start tasks here, if you use task module," +
" please invoke task.StartTask, or task will not be executed")
addr := BConfig.Listen.AdminAddr
if BConfig.Listen.AdminPort != 0 {

View File

@ -16,7 +16,6 @@ package web
import (
"bytes"
context2 "context"
"encoding/json"
"fmt"
"net/http"
@ -26,7 +25,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/astaxie/beego/pkg/infrastructure/governor"
"github.com/astaxie/beego/pkg/task"
)
type adminController struct {
@ -90,19 +88,22 @@ func (a *adminController) TaskStatus() {
req.ParseForm()
taskname := req.Form.Get("taskname")
if taskname != "" {
if t, ok := task.AdminTaskList[taskname]; ok {
if err := t.Run(nil); err != nil {
data["Message"] = []string{"error", template.HTMLEscapeString(fmt.Sprintf("%s", err))}
}
data["Message"] = []string{"success", template.HTMLEscapeString(fmt.Sprintf("%s run success,Now the Status is <br>%s", taskname, t.GetStatus(nil)))}
cmd := governor.GetCommand("task", "run")
res := cmd.Execute(taskname)
if res.IsSuccess() {
data["Message"] = []string{"success",
template.HTMLEscapeString(fmt.Sprintf("%s run success,Now the Status is <br>%s",
taskname, res.Content.(string)))}
} else {
data["Message"] = []string{"warning", template.HTMLEscapeString(fmt.Sprintf("there's no task which named: %s", taskname))}
data["Message"] = []string{"error", template.HTMLEscapeString(fmt.Sprintf("%s", res.Error))}
}
}
// List Tasks
content := make(M)
resultList := new([][]string)
resultList := governor.GetCommand("task", "list").Execute().Content.([][]string)
var fields = []string{
"Task Name",
"Task Spec",
@ -110,15 +111,6 @@ func (a *adminController) TaskStatus() {
"Last Time",
"",
}
for tname, tk := range task.AdminTaskList {
result := []string{
template.HTMLEscapeString(tname),
template.HTMLEscapeString(tk.GetSpec(nil)),
template.HTMLEscapeString(tk.GetStatus(nil)),
template.HTMLEscapeString(tk.GetPrev(context2.Background()).String()),
}
*resultList = append(*resultList, result)
}
content["Fields"] = fields
content["Data"] = resultList

View File

@ -0,0 +1,92 @@
// Copyright 2020
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package task
import (
"context"
"fmt"
"html/template"
"github.com/pkg/errors"
"github.com/astaxie/beego/pkg/infrastructure/governor"
)
type listTaskCommand struct {
}
func (l *listTaskCommand) Execute(params ...interface{}) *governor.Result {
resultList := make([][]string, 0, len(AdminTaskList))
for tname, tk := range AdminTaskList {
result := []string{
template.HTMLEscapeString(tname),
template.HTMLEscapeString(tk.GetSpec(nil)),
template.HTMLEscapeString(tk.GetStatus(nil)),
template.HTMLEscapeString(tk.GetPrev(context.Background()).String()),
}
resultList = append(resultList, result)
}
return &governor.Result{
Status: 200,
Content: resultList,
}
}
type runTaskCommand struct {
}
func (r *runTaskCommand) Execute(params ...interface{}) *governor.Result {
if len(params) == 0 {
return &governor.Result{
Status: 400,
Error: errors.New("task name not passed"),
}
}
tn, ok := params[0].(string)
if !ok {
return &governor.Result{
Status: 400,
Error: errors.New("parameter is invalid"),
}
}
if t, ok := AdminTaskList[tn]; ok {
err := t.Run(context.Background())
if err != nil {
return &governor.Result{
Status: 500,
Error: err,
}
}
return &governor.Result{
Status: 200,
Content: t.GetStatus(context.Background()),
}
} else {
return &governor.Result{
Status: 400,
Error: errors.New(fmt.Sprintf("task with name %s not found", tn)),
}
}
}
func registerCommands() {
governor.RegisterCommand("task", "list", &listTaskCommand{})
governor.RegisterCommand("task", "run", &runTaskCommand{})
}

View File

@ -0,0 +1,111 @@
// Copyright 2020
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package task
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
type countTask struct {
cnt int
mockErr error
}
func (c *countTask) GetSpec(ctx context.Context) string {
return "AAA"
}
func (c *countTask) GetStatus(ctx context.Context) string {
return "SUCCESS"
}
func (c *countTask) Run(ctx context.Context) error {
c.cnt++
return c.mockErr
}
func (c *countTask) SetNext(ctx context.Context, time time.Time) {
}
func (c *countTask) GetNext(ctx context.Context) time.Time {
return time.Now()
}
func (c *countTask) SetPrev(ctx context.Context, time time.Time) {
}
func (c *countTask) GetPrev(ctx context.Context) time.Time {
return time.Now()
}
func TestRunTaskCommand_Execute(t *testing.T) {
task := &countTask{}
AddTask("count", task)
cmd := &runTaskCommand{}
res := cmd.Execute()
assert.NotNil(t, res)
assert.NotNil(t, res.Error)
assert.Equal(t, "task name not passed", res.Error.Error())
res = cmd.Execute(10)
assert.NotNil(t, res)
assert.NotNil(t, res.Error)
assert.Equal(t, "parameter is invalid", res.Error.Error())
res = cmd.Execute("CCCC")
assert.NotNil(t, res)
assert.NotNil(t, res.Error)
assert.Equal(t, "task with name CCCC not found", res.Error.Error())
res = cmd.Execute("count")
assert.NotNil(t, res)
assert.True(t, res.IsSuccess())
task.mockErr = errors.New("mock error")
res = cmd.Execute("count")
assert.NotNil(t, res)
assert.NotNil(t, res.Error)
assert.Equal(t, "mock error", res.Error.Error())
}
func TestListTaskCommand_Execute(t *testing.T) {
task := &countTask{}
cmd := &listTaskCommand{}
res := cmd.Execute()
assert.True(t, res.IsSuccess())
_, ok := res.Content.([][]string)
assert.True(t, ok)
AddTask("count", task)
res = cmd.Execute()
assert.True(t, res.IsSuccess())
rl, ok := res.Content.([][]string)
assert.True(t, ok)
assert.Equal(t, 1, len(rl))
}

View File

@ -189,9 +189,9 @@ func (t *Task) GetPrev(context.Context) time.Time {
// SetCron some signals
// * any time
// ,  separate signal
//   duration
//    duration
// /n : do as n times of time duration
/////////////////////////////////////////////////////////
// ///////////////////////////////////////////////////////
// 0/30 * * * * * every 30s
// 0 43 21 * * * 21:43
// 0 15 05 * * *    05:15
@ -401,10 +401,12 @@ func StartTask() {
taskLock.Lock()
defer taskLock.Unlock()
if isstart {
//If already started no need to start another goroutine.
// If already started no need to start another goroutine.
return
}
isstart = true
registerCommands()
go run()
}