1
0
mirror of https://github.com/astaxie/beego.git synced 2024-11-24 14:50:53 +00:00

Merge pull request #4246 from jianzhiyao/frt/seperate_orm_alone

seperate orm alone & deadlock in task module
This commit is contained in:
Ming Deng 2020-10-05 14:24:51 +08:00 committed by GitHub
commit 43560dede4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 182 additions and 74 deletions

View File

@ -212,6 +212,11 @@ func DeleteTask(taskname string) {
task.DeleteTask(taskname) task.DeleteTask(taskname)
} }
// ClearTask clear all tasks
func ClearTask() {
task.ClearTask()
}
// MapSorter sort map for tasker // MapSorter sort map for tasker
type MapSorter task.MapSorter type MapSorter task.MapSorter

View File

@ -22,6 +22,8 @@ import (
) )
func TestParse(t *testing.T) { func TestParse(t *testing.T) {
defer ClearTask()
tk := NewTask("taska", "0/30 * * * * *", func() error { fmt.Println("hello world"); return nil }) tk := NewTask("taska", "0/30 * * * * *", func() error { fmt.Println("hello world"); return nil })
err := tk.Run() err := tk.Run()
if err != nil { if err != nil {
@ -34,6 +36,8 @@ func TestParse(t *testing.T) {
} }
func TestSpec(t *testing.T) { func TestSpec(t *testing.T) {
defer ClearTask()
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(2) wg.Add(2)
tk1 := NewTask("tk1", "0 12 * * * *", func() error { fmt.Println("tk1"); return nil }) tk1 := NewTask("tk1", "0 12 * * * *", func() error { fmt.Println("tk1"); return nil })

View File

@ -23,7 +23,6 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/astaxie/beego/pkg/client/orm" "github.com/astaxie/beego/pkg/client/orm"
"github.com/astaxie/beego/pkg/server/web"
) )
// FilterChainBuilder is an extension point, // FilterChainBuilder is an extension point,
@ -35,27 +34,24 @@ import (
// actually we only records metrics of invoking "QueryTable" and "QueryTableWithCtx" // actually we only records metrics of invoking "QueryTable" and "QueryTableWithCtx"
type FilterChainBuilder struct { type FilterChainBuilder struct {
summaryVec prometheus.ObserverVec summaryVec prometheus.ObserverVec
AppName string
ServerName string
RunMode string
} }
func NewFilterChainBuilder() *FilterChainBuilder { func (builder *FilterChainBuilder) FilterChain(next orm.Filter) orm.Filter {
summaryVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{
builder.summaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "beego", Name: "beego",
Subsystem: "orm_operation", Subsystem: "orm_operation",
ConstLabels: map[string]string{ ConstLabels: map[string]string{
"server": web.BConfig.ServerName, "server": builder.ServerName,
"env": web.BConfig.RunMode, "env": builder.RunMode,
"appname": web.BConfig.AppName, "appname": builder.AppName,
}, },
Help: "The statics info for orm operation", Help: "The statics info for orm operation",
}, []string{"method", "name", "duration", "insideTx", "txName"}) }, []string{"method", "name", "duration", "insideTx", "txName"})
prometheus.MustRegister(summaryVec)
return &FilterChainBuilder{
summaryVec: summaryVec,
}
}
func (builder *FilterChainBuilder) FilterChain(next orm.Filter) orm.Filter {
return func(ctx context.Context, inv *orm.Invocation) []interface{} { return func(ctx context.Context, inv *orm.Invocation) []interface{} {
startTime := time.Now() startTime := time.Now()
res := next(ctx, inv) res := next(ctx, inv)

View File

@ -24,14 +24,15 @@ import (
"github.com/astaxie/beego/pkg/client/orm" "github.com/astaxie/beego/pkg/client/orm"
) )
func TestFilterChainBuilder_FilterChain(t *testing.T) { func TestFilterChainBuilder_FilterChain1(t *testing.T) {
builder := NewFilterChainBuilder() next := func(ctx context.Context, inv *orm.Invocation) []interface{} {
assert.NotNil(t, builder.summaryVec)
filter := builder.FilterChain(func(ctx context.Context, inv *orm.Invocation) []interface{} {
inv.Method = "coming" inv.Method = "coming"
return []interface{}{} return []interface{}{}
}) }
builder := &FilterChainBuilder{}
filter := builder.FilterChain(next)
assert.NotNil(t, builder.summaryVec)
assert.NotNil(t, filter) assert.NotNil(t, filter)
inv := &orm.Invocation{} inv := &orm.Invocation{}

View File

@ -28,8 +28,8 @@ type listTaskCommand struct {
} }
func (l *listTaskCommand) Execute(params ...interface{}) *governor.Result { func (l *listTaskCommand) Execute(params ...interface{}) *governor.Result {
resultList := make([][]string, 0, len(AdminTaskList)) resultList := make([][]string, 0, len(globalTaskManager.adminTaskList))
for tname, tk := range AdminTaskList { for tname, tk := range globalTaskManager.adminTaskList {
result := []string{ result := []string{
template.HTMLEscapeString(tname), template.HTMLEscapeString(tname),
template.HTMLEscapeString(tk.GetSpec(nil)), template.HTMLEscapeString(tk.GetSpec(nil)),
@ -65,7 +65,7 @@ func (r *runTaskCommand) Execute(params ...interface{}) *governor.Result {
} }
} }
if t, ok := AdminTaskList[tn]; ok { if t, ok := globalTaskManager.adminTaskList[tn]; ok {
err := t.Run(context.Background()) err := t.Run(context.Background())
if err != nil { if err != nil {
return &governor.Result{ return &governor.Result{

View File

@ -31,13 +31,28 @@ type bounds struct {
names map[string]uint names map[string]uint
} }
// The bounds for each field. type taskManager struct {
var ( adminTaskList map[string]Tasker
AdminTaskList map[string]Tasker
taskLock sync.RWMutex taskLock sync.RWMutex
stop chan bool stop chan bool
changed chan bool changed chan bool
isstart bool started bool
}
func newTaskManager()*taskManager{
return &taskManager{
adminTaskList: make(map[string]Tasker),
taskLock: sync.RWMutex{},
stop: make(chan bool),
changed: make(chan bool),
started: false,
}
}
// The bounds for each field.
var (
globalTaskManager *taskManager
seconds = bounds{0, 59, nil} seconds = bounds{0, 59, nil}
minutes = bounds{0, 59, nil} minutes = bounds{0, 59, nil}
hours = bounds{0, 23, nil} hours = bounds{0, 23, nil}
@ -398,32 +413,58 @@ func dayMatches(s *Schedule, t time.Time) bool {
// StartTask start all tasks // StartTask start all tasks
func StartTask() { func StartTask() {
taskLock.Lock() globalTaskManager.StartTask()
defer taskLock.Unlock() }
if isstart {
// StopTask stop all tasks
func StopTask() {
globalTaskManager.StopTask()
}
// AddTask add task with name
func AddTask(taskName string, t Tasker) {
globalTaskManager.AddTask(taskName, t)
}
// DeleteTask delete task with name
func DeleteTask(taskName string) {
globalTaskManager.DeleteTask(taskName)
}
// ClearTask clear all tasks
func ClearTask() {
globalTaskManager.ClearTask()
}
// StartTask start all tasks
func (m *taskManager) StartTask() {
m.taskLock.Lock()
defer m.taskLock.Unlock()
if m.started {
// If already started no need to start another goroutine. // If already started no need to start another goroutine.
return return
} }
isstart = true m.started = true
registerCommands() registerCommands()
go run() go m.run()
} }
func run() { func(m *taskManager) run() {
now := time.Now().Local() now := time.Now().Local()
for _, t := range AdminTaskList { for _, t := range m.adminTaskList {
t.SetNext(nil, now) t.SetNext(nil, now)
} }
for { for {
// we only use RLock here because NewMapSorter copy the reference, do not change any thing // we only use RLock here because NewMapSorter copy the reference, do not change any thing
taskLock.RLock() m.taskLock.RLock()
sortList := NewMapSorter(AdminTaskList) sortList := NewMapSorter(m.adminTaskList)
taskLock.RUnlock() m.taskLock.RUnlock()
sortList.Sort() sortList.Sort()
var effective time.Time var effective time.Time
if len(AdminTaskList) == 0 || sortList.Vals[0].GetNext(context.Background()).IsZero() { if len(m.adminTaskList) == 0 || sortList.Vals[0].GetNext(context.Background()).IsZero() {
// If there are no entries yet, just sleep - it still handles new entries // If there are no entries yet, just sleep - it still handles new entries
// and stop requests. // and stop requests.
effective = now.AddDate(10, 0, 0) effective = now.AddDate(10, 0, 0)
@ -442,49 +483,84 @@ func run() {
e.SetNext(nil, effective) e.SetNext(nil, effective)
} }
continue continue
case <-changed: case <-m.changed:
now = time.Now().Local() now = time.Now().Local()
taskLock.Lock() m.taskLock.Lock()
for _, t := range AdminTaskList { for _, t := range m.adminTaskList {
t.SetNext(nil, now) t.SetNext(nil, now)
} }
taskLock.Unlock() m.taskLock.Unlock()
continue continue
case <-stop: case <-m.stop:
m.taskLock.Lock()
if m.started {
m.started = false
}
m.taskLock.Unlock()
return return
} }
} }
} }
// StopTask stop all tasks // StopTask stop all tasks
func StopTask() { func(m *taskManager) StopTask() {
taskLock.Lock() go func() {
defer taskLock.Unlock() m.stop <- true
if isstart { }()
isstart = false
stop <- true
}
} }
// AddTask add task with name // AddTask add task with name
func AddTask(taskname string, t Tasker) { func (m *taskManager)AddTask(taskname string, t Tasker) {
taskLock.Lock() isChanged := false
defer taskLock.Unlock() m.taskLock.Lock()
t.SetNext(nil, time.Now().Local()) t.SetNext(nil, time.Now().Local())
AdminTaskList[taskname] = t m.adminTaskList[taskname] = t
if isstart { if m.started {
changed <- true isChanged = true
} }
m.taskLock.Unlock()
if isChanged {
go func() {
m.changed <- true
}()
}
} }
// DeleteTask delete task with name // DeleteTask delete task with name
func DeleteTask(taskname string) { func(m *taskManager) DeleteTask(taskname string) {
taskLock.Lock() isChanged := false
defer taskLock.Unlock()
delete(AdminTaskList, taskname) m.taskLock.Lock()
if isstart { delete(m.adminTaskList, taskname)
changed <- true if m.started {
isChanged = true
}
m.taskLock.Unlock()
if isChanged {
go func() {
m.changed <- true
}()
}
}
// ClearTask clear all tasks
func(m *taskManager) ClearTask() {
isChanged := false
m.taskLock.Lock()
m.adminTaskList = make(map[string]Tasker)
if m.started {
isChanged = true
}
m.taskLock.Unlock()
if isChanged {
go func() {
m.changed <- true
}()
} }
} }
@ -637,7 +713,5 @@ func all(r bounds) uint64 {
} }
func init() { func init() {
AdminTaskList = make(map[string]Tasker) globalTaskManager = newTaskManager()
stop = make(chan bool)
changed = make(chan bool)
} }

View File

@ -26,6 +26,8 @@ import (
) )
func TestParse(t *testing.T) { func TestParse(t *testing.T) {
m := newTaskManager()
defer m.ClearTask()
tk := NewTask("taska", "0/30 * * * * *", func(ctx context.Context) error { tk := NewTask("taska", "0/30 * * * * *", func(ctx context.Context) error {
fmt.Println("hello world") fmt.Println("hello world")
return nil return nil
@ -34,24 +36,50 @@ func TestParse(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
AddTask("taska", tk) m.AddTask("taska", tk)
StartTask() m.StartTask()
time.Sleep(6 * time.Second) time.Sleep(3 * time.Second)
StopTask() m.StopTask()
}
func TestModifyTaskListAfterRunning(t *testing.T) {
m := newTaskManager()
defer m.ClearTask()
tk := NewTask("taskb", "0/30 * * * * *", func(ctx context.Context) error {
fmt.Println("hello world")
return nil
})
err := tk.Run(nil)
if err != nil {
t.Fatal(err)
}
m.AddTask("taskb", tk)
m.StartTask()
go func() {
m.DeleteTask("taskb")
}()
go func() {
m.AddTask("taskb1", tk)
}()
time.Sleep(3 * time.Second)
m.StopTask()
} }
func TestSpec(t *testing.T) { func TestSpec(t *testing.T) {
m := newTaskManager()
defer m.ClearTask()
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(2) wg.Add(2)
tk1 := NewTask("tk1", "0 12 * * * *", func(ctx context.Context) error { fmt.Println("tk1"); return nil }) tk1 := NewTask("tk1", "0 12 * * * *", func(ctx context.Context) error { fmt.Println("tk1"); return nil })
tk2 := NewTask("tk2", "0,10,20 * * * * *", func(ctx context.Context) error { fmt.Println("tk2"); wg.Done(); return nil }) tk2 := NewTask("tk2", "0,10,20 * * * * *", func(ctx context.Context) error { fmt.Println("tk2"); wg.Done(); return nil })
tk3 := NewTask("tk3", "0 10 * * * *", func(ctx context.Context) error { fmt.Println("tk3"); wg.Done(); return nil }) tk3 := NewTask("tk3", "0 10 * * * *", func(ctx context.Context) error { fmt.Println("tk3"); wg.Done(); return nil })
AddTask("tk1", tk1) m.AddTask("tk1", tk1)
AddTask("tk2", tk2) m.AddTask("tk2", tk2)
AddTask("tk3", tk3) m.AddTask("tk3", tk3)
StartTask() m.StartTask()
defer StopTask() defer m.StopTask()
select { select {
case <-time.After(200 * time.Second): case <-time.After(200 * time.Second):