This commit is contained in:
Fu Diwei 2025-10-24 21:01:50 +08:00
parent c6e204e00c
commit f40005ffc4
2 changed files with 22 additions and 13 deletions

View File

@ -13,6 +13,8 @@ import (
"sync"
"time"
"github.com/samber/lo"
"github.com/certimate-go/certimate/internal/app"
"github.com/certimate-go/certimate/internal/domain"
"github.com/certimate-go/certimate/internal/repository"
@ -24,7 +26,7 @@ var maxWorkers = 1
func init() {
envMaxWorkers := os.Getenv("CERTIMATE_WORKFLOW_MAX_WORKERS")
if n, err := strconv.Atoi(envMaxWorkers); err != nil && n > 0 {
if n, _ := strconv.Atoi(envMaxWorkers); n > 0 {
maxWorkers = n
} else {
maxWorkers = runtime.GOMAXPROCS(0)
@ -125,20 +127,23 @@ func (wd *workflowDispatcher) Shutdown(ctx context.Context) error {
}
func (wd *workflowDispatcher) Start(ctx context.Context, runId string) error {
wd.taskMtx.Lock()
defer wd.taskMtx.Unlock()
wd.taskMtx.RLock()
if _, exists := wd.processingTasks[runId]; exists {
wd.taskMtx.RUnlock()
return fmt.Errorf("workflow run %s is already processing", runId)
}
for _, pendingRunId := range wd.pendingRunQueue {
if pendingRunId == runId {
wd.taskMtx.RUnlock()
return fmt.Errorf("workflow run %s is already in the queue", runId)
}
}
wd.taskMtx.RUnlock()
wd.taskMtx.Lock()
wd.pendingRunQueue = append(wd.pendingRunQueue, runId)
wd.taskMtx.Unlock()
go func() { wd.tryNextAsync() }()
return nil
@ -216,8 +221,11 @@ func (wd *workflowDispatcher) tryExecuteAsync(task *taskInfo) {
// 尝试继续执行等待队列中的任务
defer func() {
wd.taskMtx.Lock()
delete(wd.processingTasks, task.RunId)
wd.tryNextAsync()
wd.taskMtx.Unlock()
go func() { wd.tryNextAsync() }()
}()
// 查询运行实体,并级联更新状态
@ -327,7 +335,7 @@ func (wd *workflowDispatcher) tryExecuteAsync(task *taskInfo) {
func (wd *workflowDispatcher) tryNextAsync() {
wd.taskMtx.RLock()
for i, pendingRunId := range wd.pendingRunQueue {
for _, pendingRunId := range wd.pendingRunQueue {
workflowRun, err := wd.workflowRunRepo.GetById(context.Background(), pendingRunId)
if err != nil {
wd.syslog.Error(fmt.Sprintf("failed to get workflow run #%s record", pendingRunId), slog.Any("error", err))
@ -335,8 +343,8 @@ func (wd *workflowDispatcher) tryNextAsync() {
}
var hasSameWorkflowTask bool // 相同 Workflow 的任务同一时间只能有一个 Run 在执行
for _, task := range wd.processingTasks {
if task.WorkflowId == workflowRun.WorkflowId {
for _, processingTask := range wd.processingTasks {
if processingTask.WorkflowId == workflowRun.WorkflowId {
hasSameWorkflowTask = true
break
}
@ -348,14 +356,15 @@ func (wd *workflowDispatcher) tryNextAsync() {
wd.syslog.Warn(fmt.Sprintf("workflow run #%s is pending, because the maximum concurrency (limit: %d) has been reached", pendingRunId, wd.concurrency))
} else {
wd.taskMtx.RUnlock()
wd.taskMtx.Lock()
defer wd.taskMtx.Unlock()
wd.taskMtx.Lock()
ctxRun, ctxCancel := context.WithCancel(context.Background())
task := &taskInfo{WorkflowId: workflowRun.WorkflowId, RunId: workflowRun.Id, ctx: ctxRun, cancel: ctxCancel}
wd.pendingRunQueue = append(wd.pendingRunQueue[:i], wd.pendingRunQueue[i+1:]...)
wd.pendingRunQueue = lo.Filter(wd.pendingRunQueue, func(s string, _ int) bool { return s != pendingRunId })
wd.processingTasks[pendingRunId] = task
wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) is being dispatched ...", task.RunId, task.WorkflowId))
wd.taskMtx.Unlock()
go func() { wd.tryExecuteAsync(task) }()
return
}

View File

@ -88,7 +88,7 @@ func (d *SSLDeployerProvider) Deploy(ctx context.Context, certPEM string, privke
}
// 设置自定义域名
// REF: https://www.volcengine.com/docs/6559/1250189
// REF: https://www.volcengine.com/docs/6349/764779
putBucketCustomDomainReq := &tos.PutBucketCustomDomainInput{
Bucket: d.config.Bucket,
Rule: tos.CustomDomainRule{