diff --git a/internal/workflow/dispatcher/dispatcher.go b/internal/workflow/dispatcher/dispatcher.go index 566c0482..65d61ce2 100644 --- a/internal/workflow/dispatcher/dispatcher.go +++ b/internal/workflow/dispatcher/dispatcher.go @@ -13,6 +13,8 @@ import ( "sync" "time" + "github.com/samber/lo" + "github.com/certimate-go/certimate/internal/app" "github.com/certimate-go/certimate/internal/domain" "github.com/certimate-go/certimate/internal/repository" @@ -24,7 +26,7 @@ var maxWorkers = 1 func init() { envMaxWorkers := os.Getenv("CERTIMATE_WORKFLOW_MAX_WORKERS") - if n, err := strconv.Atoi(envMaxWorkers); err != nil && n > 0 { + if n, _ := strconv.Atoi(envMaxWorkers); n > 0 { maxWorkers = n } else { maxWorkers = runtime.GOMAXPROCS(0) @@ -125,20 +127,23 @@ func (wd *workflowDispatcher) Shutdown(ctx context.Context) error { } func (wd *workflowDispatcher) Start(ctx context.Context, runId string) error { - wd.taskMtx.Lock() - defer wd.taskMtx.Unlock() - + wd.taskMtx.RLock() if _, exists := wd.processingTasks[runId]; exists { + wd.taskMtx.RUnlock() return fmt.Errorf("workflow run %s is already processing", runId) } - for _, pendingRunId := range wd.pendingRunQueue { if pendingRunId == runId { + wd.taskMtx.RUnlock() return fmt.Errorf("workflow run %s is already in the queue", runId) } } + wd.taskMtx.RUnlock() + wd.taskMtx.Lock() wd.pendingRunQueue = append(wd.pendingRunQueue, runId) + wd.taskMtx.Unlock() + go func() { wd.tryNextAsync() }() return nil @@ -216,8 +221,11 @@ func (wd *workflowDispatcher) tryExecuteAsync(task *taskInfo) { // 尝试继续执行等待队列中的任务 defer func() { + wd.taskMtx.Lock() delete(wd.processingTasks, task.RunId) - wd.tryNextAsync() + wd.taskMtx.Unlock() + + go func() { wd.tryNextAsync() }() }() // 查询运行实体,并级联更新状态 @@ -327,7 +335,7 @@ func (wd *workflowDispatcher) tryExecuteAsync(task *taskInfo) { func (wd *workflowDispatcher) tryNextAsync() { wd.taskMtx.RLock() - for i, pendingRunId := range wd.pendingRunQueue { + for _, pendingRunId := range wd.pendingRunQueue { workflowRun, err := wd.workflowRunRepo.GetById(context.Background(), pendingRunId) if err != nil { wd.syslog.Error(fmt.Sprintf("failed to get workflow run #%s record", pendingRunId), slog.Any("error", err)) @@ -335,8 +343,8 @@ func (wd *workflowDispatcher) tryNextAsync() { } var hasSameWorkflowTask bool // 相同 Workflow 的任务同一时间只能有一个 Run 在执行 - for _, task := range wd.processingTasks { - if task.WorkflowId == workflowRun.WorkflowId { + for _, processingTask := range wd.processingTasks { + if processingTask.WorkflowId == workflowRun.WorkflowId { hasSameWorkflowTask = true break } @@ -348,14 +356,15 @@ func (wd *workflowDispatcher) tryNextAsync() { wd.syslog.Warn(fmt.Sprintf("workflow run #%s is pending, because the maximum concurrency (limit: %d) has been reached", pendingRunId, wd.concurrency)) } else { wd.taskMtx.RUnlock() - wd.taskMtx.Lock() - defer wd.taskMtx.Unlock() + wd.taskMtx.Lock() ctxRun, ctxCancel := context.WithCancel(context.Background()) task := &taskInfo{WorkflowId: workflowRun.WorkflowId, RunId: workflowRun.Id, ctx: ctxRun, cancel: ctxCancel} - wd.pendingRunQueue = append(wd.pendingRunQueue[:i], wd.pendingRunQueue[i+1:]...) + wd.pendingRunQueue = lo.Filter(wd.pendingRunQueue, func(s string, _ int) bool { return s != pendingRunId }) wd.processingTasks[pendingRunId] = task wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) is being dispatched ...", task.RunId, task.WorkflowId)) + wd.taskMtx.Unlock() + go func() { wd.tryExecuteAsync(task) }() return } diff --git a/pkg/core/ssl-deployer/providers/volcengine-tos/volcengine_tos.go b/pkg/core/ssl-deployer/providers/volcengine-tos/volcengine_tos.go index c274b453..7c9fa6a7 100644 --- a/pkg/core/ssl-deployer/providers/volcengine-tos/volcengine_tos.go +++ b/pkg/core/ssl-deployer/providers/volcengine-tos/volcengine_tos.go @@ -88,7 +88,7 @@ func (d *SSLDeployerProvider) Deploy(ctx context.Context, certPEM string, privke } // 设置自定义域名 - // REF: https://www.volcengine.com/docs/6559/1250189 + // REF: https://www.volcengine.com/docs/6349/764779 putBucketCustomDomainReq := &tos.PutBucketCustomDomainInput{ Bucket: d.config.Bucket, Rule: tos.CustomDomainRule{