mirror of
https://github.com/certimate-go/certimate.git
synced 2026-06-19 21:03:27 +08:00
chore: improve logging
This commit is contained in:
parent
8ac96dedff
commit
2c5dea5097
@ -325,9 +325,9 @@ func (wd *workflowDispatcher) tryExecuteAsync(task *taskInfo) {
|
||||
})
|
||||
|
||||
// 执行工作流
|
||||
wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) was started", task.RunId, task.WorkflowId))
|
||||
wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) started", task.RunId, task.WorkflowId))
|
||||
we.Invoke(task.ctx, workflowRun.WorkflowId, workflowRun.Id, workflowRun.Graph)
|
||||
wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) was stopped", task.RunId, task.WorkflowId))
|
||||
wd.syslog.Info(fmt.Sprintf("workflow run #%s (work#%s) stopped", task.RunId, task.WorkflowId))
|
||||
}
|
||||
|
||||
func (wd *workflowDispatcher) tryNextAsync() {
|
||||
@ -349,7 +349,7 @@ func (wd *workflowDispatcher) tryNextAsync() {
|
||||
}
|
||||
|
||||
if hasSameWorkflowTask {
|
||||
wd.syslog.Warn(fmt.Sprintf("workflow run #%s is pending, because tasks that belonging to the same workflow already exists", pendingRunId))
|
||||
wd.syslog.Warn(fmt.Sprintf("workflow run #%s is pending, because tasks that belonging to the same workflow #%s already exists", workflowRun.Id, workflowRun.WorkflowId))
|
||||
} else if len(wd.processingTasks) >= wd.concurrency && wd.concurrency > 0 {
|
||||
wd.syslog.Warn(fmt.Sprintf("workflow run #%s is pending, because the maximum concurrency (limit: %d) has been reached", pendingRunId, wd.concurrency))
|
||||
} else {
|
||||
|
||||
@ -6,6 +6,8 @@ import (
|
||||
"log/slog"
|
||||
|
||||
"github.com/pocketbase/pocketbase/core"
|
||||
"github.com/pocketbase/pocketbase/tools/cron"
|
||||
"github.com/samber/lo"
|
||||
|
||||
"github.com/certimate-go/certimate/internal/app"
|
||||
"github.com/certimate-go/certimate/internal/domain"
|
||||
@ -57,27 +59,35 @@ func onWorkflowRecordCreateOrUpdate(ctx context.Context, record *core.Record) er
|
||||
scheduler := app.GetScheduler()
|
||||
|
||||
// 向数据库插入/更新时,同时更新定时任务
|
||||
workflowId := record.Id
|
||||
jobId := fmt.Sprintf("workflow#%s", record.Id)
|
||||
enabled := record.GetBool("enabled")
|
||||
trigger := record.GetString("trigger")
|
||||
triggerCron := record.GetString("triggerCron")
|
||||
|
||||
// 如果非定时触发或未启用,移除定时任务
|
||||
if !enabled || trigger != string(domain.WorkflowTriggerTypeScheduled) {
|
||||
scheduler.Remove(fmt.Sprintf("workflow#%s", workflowId))
|
||||
scheduler.Remove(jobId)
|
||||
return nil
|
||||
}
|
||||
|
||||
// 反之,重新添加定时任务
|
||||
err := scheduler.Add(fmt.Sprintf("workflow#%s", workflowId), record.GetString("triggerCron"), func() {
|
||||
workflowSrv := NewWorkflowService(repository.NewWorkflowRepository(), repository.NewWorkflowRunRepository(), repository.NewSettingsRepository())
|
||||
workflowSrv.StartRun(ctx, &dtos.WorkflowStartRunReq{
|
||||
WorkflowId: workflowId,
|
||||
RunTrigger: domain.WorkflowTriggerTypeScheduled,
|
||||
job, _ := lo.Find(scheduler.Jobs(), func(j *cron.Job) bool { return j.Id() == jobId })
|
||||
if job == nil || job.Expression() != triggerCron {
|
||||
workflowId := record.Id
|
||||
err := scheduler.Add(jobId, triggerCron, func() {
|
||||
workflowSrv := NewWorkflowService(repository.NewWorkflowRepository(), repository.NewWorkflowRunRepository(), repository.NewSettingsRepository())
|
||||
_, err := workflowSrv.StartRun(context.Background(), &dtos.WorkflowStartRunReq{
|
||||
WorkflowId: workflowId,
|
||||
RunTrigger: domain.WorkflowTriggerTypeScheduled,
|
||||
})
|
||||
if err != nil {
|
||||
app.GetLogger().Warn(fmt.Sprintf("failed to start scheduled run for workflow #%s", workflowId), slog.Any("error", err))
|
||||
}
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
app.GetLogger().Error(fmt.Sprintf("failed to register cron job for workflow #%s", record.Id), slog.Any("error", err))
|
||||
return fmt.Errorf("failed to add cron job: %w", err)
|
||||
if err != nil {
|
||||
app.GetLogger().Error(fmt.Sprintf("failed to register cron job for workflow #%s", workflowId), slog.Any("error", err))
|
||||
return fmt.Errorf("failed to add cron job: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -87,8 +97,8 @@ func onWorkflowRecordDelete(_ context.Context, record *core.Record) error {
|
||||
scheduler := app.GetScheduler()
|
||||
|
||||
// 从数据库删除时,同时移除定时任务
|
||||
workflowId := record.Id
|
||||
scheduler.Remove(fmt.Sprintf("workflow#%s", workflowId))
|
||||
jobId := fmt.Sprintf("workflow#%s", record.Id)
|
||||
scheduler.Remove(jobId)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -56,14 +56,19 @@ func (s *WorkflowService) InitSchedule(ctx context.Context) error {
|
||||
var errs []error
|
||||
|
||||
err := app.GetScheduler().Add(fmt.Sprintf("workflow#%s", workflow.Id), workflow.TriggerCron, func() {
|
||||
s.StartRun(context.Background(), &dtos.WorkflowStartRunReq{
|
||||
_, err := s.StartRun(context.Background(), &dtos.WorkflowStartRunReq{
|
||||
WorkflowId: workflow.Id,
|
||||
RunTrigger: domain.WorkflowTriggerTypeScheduled,
|
||||
})
|
||||
if err != nil {
|
||||
app.GetLogger().Error(fmt.Sprintf("failed to start scheduled run for workflow #%s", workflow.Id), slog.Any("error", err))
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
app.GetLogger().Error(fmt.Sprintf("failed to register cron job for workflow #%s", workflow.Id), slog.Any("error", err))
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
app.GetLogger().Info(fmt.Sprintf("registered cron job for workflow #%s", workflow.Id), slog.String("cron", workflow.TriggerCron))
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
|
||||
@ -217,10 +217,6 @@ const SettingsDiagnosticsCrons = ({ className, style }: { className?: string; st
|
||||
},
|
||||
{
|
||||
refreshDeps: [page, pageSize],
|
||||
throttleWait: 1000,
|
||||
throttleLeading: true,
|
||||
pollingInterval: 3000,
|
||||
pollingWhenHidden: true,
|
||||
onSuccess: (res) => {
|
||||
setListData(res.items);
|
||||
setListTotal(res.totalItems);
|
||||
|
||||
Loading…
Reference in New Issue
Block a user