From 2c5dea509776ffebd5af07f723280f915c1ff63f Mon Sep 17 00:00:00 2001 From: Fu Diwei Date: Wed, 10 Sep 2025 12:51:50 +0800 Subject: [PATCH] chore: improve logging --- internal/workflow/dispatcher/dispatcher.go | 6 ++-- internal/workflow/event.go | 36 ++++++++++++------- internal/workflow/service.go | 7 +++- ui/src/pages/settings/SettingsDiagnostics.tsx | 4 --- 4 files changed, 32 insertions(+), 21 deletions(-) diff --git a/internal/workflow/dispatcher/dispatcher.go b/internal/workflow/dispatcher/dispatcher.go index 7345f94f..7054f040 100644 --- a/internal/workflow/dispatcher/dispatcher.go +++ b/internal/workflow/dispatcher/dispatcher.go @@ -325,9 +325,9 @@ func (wd *workflowDispatcher) tryExecuteAsync(task *taskInfo) { }) // 执行工作流 - wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) was started", task.RunId, task.WorkflowId)) + wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) started", task.RunId, task.WorkflowId)) we.Invoke(task.ctx, workflowRun.WorkflowId, workflowRun.Id, workflowRun.Graph) - wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) was stopped", task.RunId, task.WorkflowId)) + wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) stopped", task.RunId, task.WorkflowId)) } func (wd *workflowDispatcher) tryNextAsync() { @@ -349,7 +349,7 @@ func (wd *workflowDispatcher) tryNextAsync() { } if hasSameWorkflowTask { - wd.syslog.Warn(fmt.Sprintf("workflow run #%s is pending, because tasks that belonging to the same workflow already exists", pendingRunId)) + wd.syslog.Warn(fmt.Sprintf("workflow run #%s is pending, because tasks that belonging to the same workflow #%s already exists", workflowRun.Id, workflowRun.WorkflowId)) } else if len(wd.processingTasks) >= wd.concurrency && wd.concurrency > 0 { wd.syslog.Warn(fmt.Sprintf("workflow run #%s is pending, because the maximum concurrency (limit: %d) has been reached", pendingRunId, wd.concurrency)) } else { diff --git a/internal/workflow/event.go b/internal/workflow/event.go index 58b1660f..81e1db22 100644 --- a/internal/workflow/event.go +++ b/internal/workflow/event.go @@ -6,6 +6,8 @@ import ( "log/slog" "github.com/pocketbase/pocketbase/core" + "github.com/pocketbase/pocketbase/tools/cron" + "github.com/samber/lo" "github.com/certimate-go/certimate/internal/app" "github.com/certimate-go/certimate/internal/domain" @@ -57,27 +59,35 @@ func onWorkflowRecordCreateOrUpdate(ctx context.Context, record *core.Record) er scheduler := app.GetScheduler() // 向数据库插入/更新时,同时更新定时任务 - workflowId := record.Id + jobId := fmt.Sprintf("workflow#%s", record.Id) enabled := record.GetBool("enabled") trigger := record.GetString("trigger") + triggerCron := record.GetString("triggerCron") // 如果非定时触发或未启用,移除定时任务 if !enabled || trigger != string(domain.WorkflowTriggerTypeScheduled) { - scheduler.Remove(fmt.Sprintf("workflow#%s", workflowId)) + scheduler.Remove(jobId) return nil } // 反之,重新添加定时任务 - err := scheduler.Add(fmt.Sprintf("workflow#%s", workflowId), record.GetString("triggerCron"), func() { - workflowSrv := NewWorkflowService(repository.NewWorkflowRepository(), repository.NewWorkflowRunRepository(), repository.NewSettingsRepository()) - workflowSrv.StartRun(ctx, &dtos.WorkflowStartRunReq{ - WorkflowId: workflowId, - RunTrigger: domain.WorkflowTriggerTypeScheduled, + job, _ := lo.Find(scheduler.Jobs(), func(j *cron.Job) bool { return j.Id() == jobId }) + if job == nil || job.Expression() != triggerCron { + workflowId := record.Id + err := scheduler.Add(jobId, triggerCron, func() { + workflowSrv := NewWorkflowService(repository.NewWorkflowRepository(), repository.NewWorkflowRunRepository(), repository.NewSettingsRepository()) + _, err := workflowSrv.StartRun(context.Background(), &dtos.WorkflowStartRunReq{ + WorkflowId: workflowId, + RunTrigger: domain.WorkflowTriggerTypeScheduled, + }) + if err != nil { + app.GetLogger().Warn(fmt.Sprintf("failed to start scheduled run for workflow #%s", workflowId), slog.Any("error", err)) + } }) - }) - if err != nil { - app.GetLogger().Error(fmt.Sprintf("failed to register cron job for workflow #%s", record.Id), slog.Any("error", err)) - return fmt.Errorf("failed to add cron job: %w", err) + if err != nil { + app.GetLogger().Error(fmt.Sprintf("failed to register cron job for workflow #%s", workflowId), slog.Any("error", err)) + return fmt.Errorf("failed to add cron job: %w", err) + } } return nil @@ -87,8 +97,8 @@ func onWorkflowRecordDelete(_ context.Context, record *core.Record) error { scheduler := app.GetScheduler() // 从数据库删除时,同时移除定时任务 - workflowId := record.Id - scheduler.Remove(fmt.Sprintf("workflow#%s", workflowId)) + jobId := fmt.Sprintf("workflow#%s", record.Id) + scheduler.Remove(jobId) return nil } diff --git a/internal/workflow/service.go b/internal/workflow/service.go index d869a1c0..f450a28e 100644 --- a/internal/workflow/service.go +++ b/internal/workflow/service.go @@ -56,14 +56,19 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error { var errs []error err := app.GetScheduler().Add(fmt.Sprintf("workflow#%s", workflow.Id), workflow.TriggerCron, func() { - s.StartRun(context.Background(), &dtos.WorkflowStartRunReq{ + _, err := s.StartRun(context.Background(), &dtos.WorkflowStartRunReq{ WorkflowId: workflow.Id, RunTrigger: domain.WorkflowTriggerTypeScheduled, }) + if err != nil { + app.GetLogger().Error(fmt.Sprintf("failed to start scheduled run for workflow #%s", workflow.Id), slog.Any("error", err)) + } }) if err != nil { app.GetLogger().Error(fmt.Sprintf("failed to register cron job for workflow #%s", workflow.Id), slog.Any("error", err)) errs = append(errs, err) + } else { + app.GetLogger().Info(fmt.Sprintf("registered cron job for workflow #%s", workflow.Id), slog.String("cron", workflow.TriggerCron)) } if len(errs) > 0 { diff --git a/ui/src/pages/settings/SettingsDiagnostics.tsx b/ui/src/pages/settings/SettingsDiagnostics.tsx index e3d146e8..b693d131 100644 --- a/ui/src/pages/settings/SettingsDiagnostics.tsx +++ b/ui/src/pages/settings/SettingsDiagnostics.tsx @@ -217,10 +217,6 @@ const SettingsDiagnosticsCrons = ({ className, style }: { className?: string; st }, { refreshDeps: [page, pageSize], - throttleWait: 1000, - throttleLeading: true, - pollingInterval: 3000, - pollingWhenHidden: true, onSuccess: (res) => { setListData(res.items); setListTotal(res.totalItems);