feat: workflow dispatcher compensation mechanism

This commit is contained in:
Fu Diwei 2025-09-09 10:03:58 +08:00
parent 586a11f91c
commit 1bb6da46f1

View File

@ -101,6 +101,26 @@ func (wd *workflowDispatcher) Bootup(ctx context.Context) error {
}
wd.booted = true
ticker := time.NewTicker(1 * time.Minute)
go func() {
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 无需准确获取,不用加锁
if len(wd.processingTasks) < wd.concurrency && len(wd.pendingRunQueue) > 0 {
wd.tryNextAsync()
}
default:
if !wd.booted {
return
}
}
}
}()
return nil
}
@ -305,9 +325,9 @@ func (wd *workflowDispatcher) tryExecuteAsync(task *taskInfo) {
})
// 执行工作流
wd.syslog.Info(fmt.Sprintf("workflow run #%s was started", task.RunId))
wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) was started", task.RunId, task.WorkflowId))
we.Invoke(task.ctx, workflowRun.WorkflowId, workflowRun.Id, workflowRun.Graph)
wd.syslog.Info(fmt.Sprintf("workflow run #%s was stopped", task.RunId))
wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) was stopped", task.RunId, task.WorkflowId))
}
func (wd *workflowDispatcher) tryNextAsync() {
@ -341,6 +361,7 @@ func (wd *workflowDispatcher) tryNextAsync() {
task := &taskInfo{WorkflowId: workflowRun.WorkflowId, RunId: workflowRun.Id, ctx: ctxRun, cancel: ctxCancel}
wd.pendingRunQueue = append(wd.pendingRunQueue[:i], wd.pendingRunQueue[i+1:]...)
wd.processingTasks[pendingRunId] = task
wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) is being dispatched ...", task.RunId, task.WorkflowId))
go func() { wd.tryExecuteAsync(task) }()
return
}