From 3530457ff9a51e721be139bec94de2299a027197 Mon Sep 17 00:00:00 2001 From: Ming Deng Date: Thu, 3 Sep 2020 21:34:46 +0800 Subject: [PATCH] Adapter: toolbox module --- pkg/adapter/toolbox/healthcheck.go | 52 +++++ pkg/adapter/toolbox/profile.go | 50 +++++ pkg/adapter/toolbox/profile_test.go | 28 +++ pkg/adapter/toolbox/statistics.go | 50 +++++ pkg/adapter/toolbox/statistics_test.go | 40 ++++ pkg/adapter/toolbox/task.go | 286 +++++++++++++++++++++++++ pkg/adapter/toolbox/task_test.go | 63 ++++++ pkg/task/task.go | 6 +- 8 files changed, 572 insertions(+), 3 deletions(-) create mode 100644 pkg/adapter/toolbox/healthcheck.go create mode 100644 pkg/adapter/toolbox/profile.go create mode 100644 pkg/adapter/toolbox/profile_test.go create mode 100644 pkg/adapter/toolbox/statistics.go create mode 100644 pkg/adapter/toolbox/statistics_test.go create mode 100644 pkg/adapter/toolbox/task.go create mode 100644 pkg/adapter/toolbox/task_test.go diff --git a/pkg/adapter/toolbox/healthcheck.go b/pkg/adapter/toolbox/healthcheck.go new file mode 100644 index 00000000..56be8089 --- /dev/null +++ b/pkg/adapter/toolbox/healthcheck.go @@ -0,0 +1,52 @@ +// Copyright 2014 beego Author. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package toolbox healthcheck +// +// type DatabaseCheck struct { +// } +// +// func (dc *DatabaseCheck) Check() error { +// if dc.isConnected() { +// return nil +// } else { +// return errors.New("can't connect database") +// } +// } +// +// AddHealthCheck("database",&DatabaseCheck{}) +// +// more docs: http://beego.me/docs/module/toolbox.md +package toolbox + +import ( + "github.com/astaxie/beego/pkg/infrastructure/governor" +) + +// AdminCheckList holds health checker map +// Deprecated using governor.AdminCheckList +var AdminCheckList map[string]HealthChecker + +// HealthChecker health checker interface +type HealthChecker governor.HealthChecker + +// AddHealthCheck add health checker with name string +func AddHealthCheck(name string, hc HealthChecker) { + governor.AddHealthCheck(name, hc) + AdminCheckList[name] = hc +} + +func init() { + AdminCheckList = make(map[string]HealthChecker) +} diff --git a/pkg/adapter/toolbox/profile.go b/pkg/adapter/toolbox/profile.go new file mode 100644 index 00000000..16cf80b1 --- /dev/null +++ b/pkg/adapter/toolbox/profile.go @@ -0,0 +1,50 @@ +// Copyright 2014 beego Author. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package toolbox + +import ( + "io" + "os" + "time" + + "github.com/astaxie/beego/pkg/infrastructure/governor" +) + +var startTime = time.Now() +var pid int + +func init() { + pid = os.Getpid() +} + +// ProcessInput parse input command string +func ProcessInput(input string, w io.Writer) { + governor.ProcessInput(input, w) +} + +// MemProf record memory profile in pprof +func MemProf(w io.Writer) { + governor.MemProf(w) +} + +// GetCPUProfile start cpu profile monitor +func GetCPUProfile(w io.Writer) { + governor.GetCPUProfile(w) +} + +// PrintGCSummary print gc information to io.Writer +func PrintGCSummary(w io.Writer) { + governor.PrintGCSummary(w) +} diff --git a/pkg/adapter/toolbox/profile_test.go b/pkg/adapter/toolbox/profile_test.go new file mode 100644 index 00000000..07a20c4e --- /dev/null +++ b/pkg/adapter/toolbox/profile_test.go @@ -0,0 +1,28 @@ +// Copyright 2014 beego Author. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package toolbox + +import ( + "os" + "testing" +) + +func TestProcessInput(t *testing.T) { + ProcessInput("lookup goroutine", os.Stdout) + ProcessInput("lookup heap", os.Stdout) + ProcessInput("lookup threadcreate", os.Stdout) + ProcessInput("lookup block", os.Stdout) + ProcessInput("gc summary", os.Stdout) +} diff --git a/pkg/adapter/toolbox/statistics.go b/pkg/adapter/toolbox/statistics.go new file mode 100644 index 00000000..b7d3bda9 --- /dev/null +++ b/pkg/adapter/toolbox/statistics.go @@ -0,0 +1,50 @@ +// Copyright 2014 beego Author. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package toolbox + +import ( + "time" + + "github.com/astaxie/beego/pkg/server/web" +) + +// Statistics struct +type Statistics web.Statistics + +// URLMap contains several statistics struct to log different data +type URLMap web.URLMap + +// AddStatistics add statistics task. +// it needs request method, request url, request controller and statistics time duration +func (m *URLMap) AddStatistics(requestMethod, requestURL, requestController string, requesttime time.Duration) { + (*web.URLMap)(m).AddStatistics(requestMethod, requestURL, requestController, requesttime) +} + +// GetMap put url statistics result in io.Writer +func (m *URLMap) GetMap() map[string]interface{} { + return (*web.URLMap)(m).GetMap() +} + +// GetMapData return all mapdata +func (m *URLMap) GetMapData() []map[string]interface{} { + return (*web.URLMap)(m).GetMapData() +} + +// StatisticsMap hosld global statistics data map +var StatisticsMap *URLMap + +func init() { + StatisticsMap = (*URLMap)(web.StatisticsMap) +} diff --git a/pkg/adapter/toolbox/statistics_test.go b/pkg/adapter/toolbox/statistics_test.go new file mode 100644 index 00000000..ac29476c --- /dev/null +++ b/pkg/adapter/toolbox/statistics_test.go @@ -0,0 +1,40 @@ +// Copyright 2014 beego Author. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package toolbox + +import ( + "encoding/json" + "testing" + "time" +) + +func TestStatics(t *testing.T) { + StatisticsMap.AddStatistics("POST", "/api/user", "&admin.user", time.Duration(2000)) + StatisticsMap.AddStatistics("POST", "/api/user", "&admin.user", time.Duration(120000)) + StatisticsMap.AddStatistics("GET", "/api/user", "&admin.user", time.Duration(13000)) + StatisticsMap.AddStatistics("POST", "/api/admin", "&admin.user", time.Duration(14000)) + StatisticsMap.AddStatistics("POST", "/api/user/astaxie", "&admin.user", time.Duration(12000)) + StatisticsMap.AddStatistics("POST", "/api/user/xiemengjun", "&admin.user", time.Duration(13000)) + StatisticsMap.AddStatistics("DELETE", "/api/user", "&admin.user", time.Duration(1400)) + t.Log(StatisticsMap.GetMap()) + + data := StatisticsMap.GetMapData() + b, err := json.Marshal(data) + if err != nil { + t.Errorf(err.Error()) + } + + t.Log(string(b)) +} diff --git a/pkg/adapter/toolbox/task.go b/pkg/adapter/toolbox/task.go new file mode 100644 index 00000000..2a6d9aa6 --- /dev/null +++ b/pkg/adapter/toolbox/task.go @@ -0,0 +1,286 @@ +// Copyright 2014 beego Author. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package toolbox + +import ( + "context" + "sort" + "time" + + "github.com/astaxie/beego/pkg/task" +) + +// The bounds for each field. +var ( + AdminTaskList map[string]Tasker +) + +const ( + // Set the top bit if a star was included in the expression. + starBit = 1 << 63 +) + +// Schedule time taks schedule +type Schedule task.Schedule + +// TaskFunc task func type +type TaskFunc func() error + +// Tasker task interface +type Tasker interface { + GetSpec() string + GetStatus() string + Run() error + SetNext(time.Time) + GetNext() time.Time + SetPrev(time.Time) + GetPrev() time.Time +} + +// task error +type taskerr struct { + t time.Time + errinfo string +} + +// Task task struct +// Deprecated +type Task struct { + // Deprecated + Taskname string + // Deprecated + Spec *Schedule + // Deprecated + SpecStr string + // Deprecated + DoFunc TaskFunc + // Deprecated + Prev time.Time + // Deprecated + Next time.Time + // Deprecated + Errlist []*taskerr // like errtime:errinfo + // Deprecated + ErrLimit int // max length for the errlist, 0 stand for no limit + + delegate *task.Task +} + +// NewTask add new task with name, time and func +func NewTask(tname string, spec string, f TaskFunc) *Task { + + task := task.NewTask(tname, spec, func(ctx context.Context) error { + return f() + }) + return &Task{ + delegate: task, + } +} + +// GetSpec get spec string +func (t *Task) GetSpec() string { + t.initDelegate() + + return t.delegate.GetSpec(context.Background()) +} + +// GetStatus get current task status +func (t *Task) GetStatus() string { + + t.initDelegate() + + return t.delegate.GetStatus(context.Background()) +} + +// Run run all tasks +func (t *Task) Run() error { + t.initDelegate() + return t.delegate.Run(context.Background()) +} + +// SetNext set next time for this task +func (t *Task) SetNext(now time.Time) { + t.initDelegate() + t.delegate.SetNext(context.Background(), now) +} + +// GetNext get the next call time of this task +func (t *Task) GetNext() time.Time { + t.initDelegate() + return t.delegate.GetNext(context.Background()) +} + +// SetPrev set prev time of this task +func (t *Task) SetPrev(now time.Time) { + t.initDelegate() + t.delegate.SetPrev(context.Background(), now) +} + +// GetPrev get prev time of this task +func (t *Task) GetPrev() time.Time { + t.initDelegate() + return t.delegate.GetPrev(context.Background()) +} + +// six columns mean: +// second:0-59 +// minute:0-59 +// hour:1-23 +// day:1-31 +// month:1-12 +// week:0-6(0 means Sunday) + +// SetCron some signals: +// *: any time +// ,:  separate signal +//    -:duration +// /n : do as n times of time duration +// /////////////////////////////////////////////////////// +// 0/30 * * * * * every 30s +// 0 43 21 * * * 21:43 +// 0 15 05 * * *    05:15 +// 0 0 17 * * * 17:00 +// 0 0 17 * * 1 17:00 in every Monday +// 0 0,10 17 * * 0,2,3 17:00 and 17:10 in every Sunday, Tuesday and Wednesday +// 0 0-10 17 1 * * 17:00 to 17:10 in 1 min duration each time on the first day of month +// 0 0 0 1,15 * 1 0:00 on the 1st day and 15th day of month +// 0 42 4 1 * *     4:42 on the 1st day of month +// 0 0 21 * * 1-6   21:00 from Monday to Saturday +// 0 0,10,20,30,40,50 * * * *  every 10 min duration +// 0 */10 * * * *        every 10 min duration +// 0 * 1 * * *         1:00 to 1:59 in 1 min duration each time +// 0 0 1 * * *         1:00 +// 0 0 */1 * * *        0 min of hour in 1 hour duration +// 0 0 * * * *         0 min of hour in 1 hour duration +// 0 2 8-20/3 * * *       8:02, 11:02, 14:02, 17:02, 20:02 +// 0 30 5 1,15 * *       5:30 on the 1st day and 15th day of month +func (t *Task) SetCron(spec string) { + t.initDelegate() + t.delegate.SetCron(spec) +} + +func (t *Task) initDelegate() { + if t.delegate == nil { + t.delegate = &task.Task{ + Taskname: t.Taskname, + Spec: (*task.Schedule)(t.Spec), + SpecStr: t.SpecStr, + DoFunc: func(ctx context.Context) error { + return t.DoFunc() + }, + Prev: t.Prev, + Next: t.Next, + ErrLimit: t.ErrLimit, + } + } +} + +// Next set schedule to next time +func (s *Schedule) Next(t time.Time) time.Time { + return (*task.Schedule)(s).Next(t) +} + +// StartTask start all tasks +func StartTask() { + task.StartTask() +} + +// StopTask stop all tasks +func StopTask() { + task.StopTask() +} + +// AddTask add task with name +func AddTask(taskname string, t Tasker) { + task.AddTask(taskname, &oldToNewAdapter{delegate: t}) +} + +// DeleteTask delete task with name +func DeleteTask(taskname string) { + task.DeleteTask(taskname) +} + +// MapSorter sort map for tasker +type MapSorter task.MapSorter + +// NewMapSorter create new tasker map +func NewMapSorter(m map[string]Tasker) *MapSorter { + + newTaskerMap := make(map[string]task.Tasker, len(m)) + + for key, value := range m { + newTaskerMap[key] = &oldToNewAdapter{ + delegate: value, + } + } + + return (*MapSorter)(task.NewMapSorter(newTaskerMap)) +} + +// Sort sort tasker map +func (ms *MapSorter) Sort() { + sort.Sort(ms) +} + +func (ms *MapSorter) Len() int { return len(ms.Keys) } +func (ms *MapSorter) Less(i, j int) bool { + if ms.Vals[i].GetNext(context.Background()).IsZero() { + return false + } + if ms.Vals[j].GetNext(context.Background()).IsZero() { + return true + } + return ms.Vals[i].GetNext(context.Background()).Before(ms.Vals[j].GetNext(context.Background())) +} +func (ms *MapSorter) Swap(i, j int) { + ms.Vals[i], ms.Vals[j] = ms.Vals[j], ms.Vals[i] + ms.Keys[i], ms.Keys[j] = ms.Keys[j], ms.Keys[i] +} + +func init() { + AdminTaskList = make(map[string]Tasker) +} + +type oldToNewAdapter struct { + delegate Tasker +} + +func (o *oldToNewAdapter) GetSpec(ctx context.Context) string { + return o.delegate.GetSpec() +} + +func (o *oldToNewAdapter) GetStatus(ctx context.Context) string { + return o.delegate.GetStatus() +} + +func (o *oldToNewAdapter) Run(ctx context.Context) error { + return o.delegate.Run() +} + +func (o *oldToNewAdapter) SetNext(ctx context.Context, t time.Time) { + o.delegate.SetNext(t) +} + +func (o *oldToNewAdapter) GetNext(ctx context.Context) time.Time { + return o.delegate.GetNext() +} + +func (o *oldToNewAdapter) SetPrev(ctx context.Context, t time.Time) { + o.delegate.SetPrev(t) +} + +func (o *oldToNewAdapter) GetPrev(ctx context.Context) time.Time { + return o.delegate.GetPrev() +} diff --git a/pkg/adapter/toolbox/task_test.go b/pkg/adapter/toolbox/task_test.go new file mode 100644 index 00000000..596bc9c5 --- /dev/null +++ b/pkg/adapter/toolbox/task_test.go @@ -0,0 +1,63 @@ +// Copyright 2014 beego Author. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package toolbox + +import ( + "fmt" + "sync" + "testing" + "time" +) + +func TestParse(t *testing.T) { + tk := NewTask("taska", "0/30 * * * * *", func() error { fmt.Println("hello world"); return nil }) + err := tk.Run() + if err != nil { + t.Fatal(err) + } + AddTask("taska", tk) + StartTask() + time.Sleep(6 * time.Second) + StopTask() +} + +func TestSpec(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(2) + tk1 := NewTask("tk1", "0 12 * * * *", func() error { fmt.Println("tk1"); return nil }) + tk2 := NewTask("tk2", "0,10,20 * * * * *", func() error { fmt.Println("tk2"); wg.Done(); return nil }) + tk3 := NewTask("tk3", "0 10 * * * *", func() error { fmt.Println("tk3"); wg.Done(); return nil }) + + AddTask("tk1", tk1) + AddTask("tk2", tk2) + AddTask("tk3", tk3) + StartTask() + defer StopTask() + + select { + case <-time.After(200 * time.Second): + t.FailNow() + case <-wait(wg): + } +} + +func wait(wg *sync.WaitGroup) chan bool { + ch := make(chan bool) + go func() { + wg.Wait() + ch <- true + }() + return ch +} diff --git a/pkg/task/task.go b/pkg/task/task.go index e2962000..bcadb956 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -83,7 +83,7 @@ type Schedule struct { } // TaskFunc task func type -type TaskFunc func() error +type TaskFunc func(ctx context.Context) error // Tasker task interface type Tasker interface { @@ -148,8 +148,8 @@ func (t *Task) GetStatus(context.Context) string { } // Run run all tasks -func (t *Task) Run(context.Context) error { - err := t.DoFunc() +func (t *Task) Run(ctx context.Context) error { + err := t.DoFunc(ctx) if err != nil { index := t.errCnt % t.ErrLimit t.Errlist[index] = &taskerr{t: t.Next, errinfo: err.Error()}