mirror of
https://github.com/certimate-go/certimate.git
synced 2026-06-22 21:05:48 +08:00
feat: workflow dispatcher compensation mechanism
This commit is contained in:
parent
586a11f91c
commit
1bb6da46f1
@ -101,6 +101,26 @@ func (wd *workflowDispatcher) Bootup(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
wd.booted = true
|
wd.booted = true
|
||||||
|
|
||||||
|
ticker := time.NewTicker(1 * time.Minute)
|
||||||
|
go func() {
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
// 无需准确获取,不用加锁
|
||||||
|
if len(wd.processingTasks) < wd.concurrency && len(wd.pendingRunQueue) > 0 {
|
||||||
|
wd.tryNextAsync()
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
if !wd.booted {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -305,9 +325,9 @@ func (wd *workflowDispatcher) tryExecuteAsync(task *taskInfo) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// 执行工作流
|
// 执行工作流
|
||||||
wd.syslog.Info(fmt.Sprintf("workflow run #%s was started", task.RunId))
|
wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) was started", task.RunId, task.WorkflowId))
|
||||||
we.Invoke(task.ctx, workflowRun.WorkflowId, workflowRun.Id, workflowRun.Graph)
|
we.Invoke(task.ctx, workflowRun.WorkflowId, workflowRun.Id, workflowRun.Graph)
|
||||||
wd.syslog.Info(fmt.Sprintf("workflow run #%s was stopped", task.RunId))
|
wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) was stopped", task.RunId, task.WorkflowId))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wd *workflowDispatcher) tryNextAsync() {
|
func (wd *workflowDispatcher) tryNextAsync() {
|
||||||
@ -341,6 +361,7 @@ func (wd *workflowDispatcher) tryNextAsync() {
|
|||||||
task := &taskInfo{WorkflowId: workflowRun.WorkflowId, RunId: workflowRun.Id, ctx: ctxRun, cancel: ctxCancel}
|
task := &taskInfo{WorkflowId: workflowRun.WorkflowId, RunId: workflowRun.Id, ctx: ctxRun, cancel: ctxCancel}
|
||||||
wd.pendingRunQueue = append(wd.pendingRunQueue[:i], wd.pendingRunQueue[i+1:]...)
|
wd.pendingRunQueue = append(wd.pendingRunQueue[:i], wd.pendingRunQueue[i+1:]...)
|
||||||
wd.processingTasks[pendingRunId] = task
|
wd.processingTasks[pendingRunId] = task
|
||||||
|
wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) is being dispatched ...", task.RunId, task.WorkflowId))
|
||||||
go func() { wd.tryExecuteAsync(task) }()
|
go func() { wd.tryExecuteAsync(task) }()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user