diff --git a/internal/workflow/dispatcher/dispatcher.go b/internal/workflow/dispatcher/dispatcher.go index b53189e6..7345f94f 100644 --- a/internal/workflow/dispatcher/dispatcher.go +++ b/internal/workflow/dispatcher/dispatcher.go @@ -101,6 +101,26 @@ func (wd *workflowDispatcher) Bootup(ctx context.Context) error { } wd.booted = true + + ticker := time.NewTicker(1 * time.Minute) + go func() { + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // 无需准确获取,不用加锁 + if len(wd.processingTasks) < wd.concurrency && len(wd.pendingRunQueue) > 0 { + wd.tryNextAsync() + } + default: + if !wd.booted { + return + } + } + } + }() + return nil } @@ -305,9 +325,9 @@ func (wd *workflowDispatcher) tryExecuteAsync(task *taskInfo) { }) // 执行工作流 - wd.syslog.Info(fmt.Sprintf("workflow run #%s was started", task.RunId)) + wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) was started", task.RunId, task.WorkflowId)) we.Invoke(task.ctx, workflowRun.WorkflowId, workflowRun.Id, workflowRun.Graph) - wd.syslog.Info(fmt.Sprintf("workflow run #%s was stopped", task.RunId)) + wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) was stopped", task.RunId, task.WorkflowId)) } func (wd *workflowDispatcher) tryNextAsync() { @@ -341,6 +361,7 @@ func (wd *workflowDispatcher) tryNextAsync() { task := &taskInfo{WorkflowId: workflowRun.WorkflowId, RunId: workflowRun.Id, ctx: ctxRun, cancel: ctxCancel} wd.pendingRunQueue = append(wd.pendingRunQueue[:i], wd.pendingRunQueue[i+1:]...) wd.processingTasks[pendingRunId] = task + wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) is being dispatched ...", task.RunId, task.WorkflowId)) go func() { wd.tryExecuteAsync(task) }() return }