diff --git a/internal/workflow/dispatcher/dispatcher.go b/internal/workflow/dispatcher/dispatcher.go index 806c2e9b..8cf598dc 100644 --- a/internal/workflow/dispatcher/dispatcher.go +++ b/internal/workflow/dispatcher/dispatcher.go @@ -232,6 +232,10 @@ func (wd *workflowDispatcher) tryExecuteAsync(task *taskInfo) { return nil }) we.OnNodeError(func(ctx context.Context, node *engine.Node, err error) error { + if errors.Is(err, engine.ErrTerminated) || errors.Is(err, engine.ErrBlocksException) { + return nil + } + log := domain.WorkflowLog{} log.WorkflowId = task.WorkflowId log.RunId = task.RunId diff --git a/internal/workflow/engine/engine.go b/internal/workflow/engine/engine.go index 9c32af7d..c7a875ab 100644 --- a/internal/workflow/engine/engine.go +++ b/internal/workflow/engine/engine.go @@ -62,7 +62,7 @@ func (we *workflowEngine) Invoke(ctx context.Context, workflowId string, runId s SetInputsManager(newInOutManager()). SetContext(ctx) if err := we.executeBlocks(wfCtx, runGraph.Nodes); err != nil { - if !errors.Is(err, errInterrupted) { + if !errors.Is(err, ErrTerminated) { we.fireOnErrorHooks(ctx, err) return err } @@ -135,7 +135,7 @@ func (we *workflowEngine) executeNode(wfCtx *WorkflowContext, node *Node) error execCtx := newNodeExecutionContext(wfCtx, node) execRes, err := executor.Execute(execCtx) - if err != nil { + if err != nil && !errors.Is(err, ErrTerminated) { we.fireOnNodeErrorHooks(wfCtx.ctx, node, err) return err } @@ -179,11 +179,15 @@ func (we *workflowEngine) executeNode(wfCtx *WorkflowContext, node *Node) error } } - if execRes.Interrupted { - return errInterrupted + if execRes.Terminated { + return ErrTerminated } } + if err != nil && errors.Is(err, ErrTerminated) { + return err + } + return nil } diff --git a/internal/workflow/engine/errors.go b/internal/workflow/engine/errors.go index 8b83d52d..e05a3996 100644 --- a/internal/workflow/engine/errors.go +++ b/internal/workflow/engine/errors.go @@ -4,4 +4,9 @@ import ( "errors" ) -var errInterrupted = errors.New("workflow engine: interrupted, may be ended") +var ( + // 表示工作流引擎执行被中断,可能已结束 + ErrTerminated = errors.New("workflow engine: execution was terminated") + // 表示工作流引擎在执行子节点时发生异常 + ErrBlocksException = errors.New("workflow engine: error occurred when executing blocks") +) diff --git a/internal/workflow/engine/executor.go b/internal/workflow/engine/executor.go index a55182bb..bbd9ae4a 100644 --- a/internal/workflow/engine/executor.go +++ b/internal/workflow/engine/executor.go @@ -69,7 +69,7 @@ func newNodeExecutionContext(wfCtx *WorkflowContext, node *Node) *NodeExecutionC type NodeExecutionResult struct { node *Node - Interrupted bool // 是否中断执行(通常由 End 节点主动触发) + Terminated bool // 是否终止执行(通常由 End 节点主动触发) variablesMtx sync.Mutex Variables []VariableState diff --git a/internal/workflow/engine/executor_condition.go b/internal/workflow/engine/executor_condition.go index 32bff15b..73bcaf3c 100644 --- a/internal/workflow/engine/executor_condition.go +++ b/internal/workflow/engine/executor_condition.go @@ -33,7 +33,7 @@ func (ne *conditionNodeExecutor) Execute(execCtx *NodeExecutionContext) (*NodeEx err := engine.executeNode(execCtx.Clone(), node) if err != nil { - if errors.Is(err, errInterrupted) { + if errors.Is(err, ErrTerminated) { return execRes, err } errs = append(errs, err) @@ -41,7 +41,7 @@ func (ne *conditionNodeExecutor) Execute(execCtx *NodeExecutionContext) (*NodeEx } if len(errs) > 0 { - return execRes, fmt.Errorf("error occurred when executing child nodes: %w", errors.Join(errs...)) + return execRes, fmt.Errorf("%w: %w", ErrBlocksException, errors.Join(errs...)) } return execRes, nil @@ -92,7 +92,7 @@ func (ne *branchBlockNodeExecutor) Execute(execCtx *NodeExecutionContext) (*Node panic("impossible!") } else { if err := engine.executeBlocks(execCtx.Clone(), execCtx.Node.Blocks); err != nil { - return execRes, err + return execRes, fmt.Errorf("%w: %w", ErrBlocksException, err) } } diff --git a/internal/workflow/engine/executor_end.go b/internal/workflow/engine/executor_end.go index 1a9a02fb..6d674d3a 100644 --- a/internal/workflow/engine/executor_end.go +++ b/internal/workflow/engine/executor_end.go @@ -8,9 +8,11 @@ type endNodeExecutor struct { nodeExecutor } -func (e *endNodeExecutor) Execute(execCtx *NodeExecutionContext) (*NodeExecutionResult, error) { +func (ne *endNodeExecutor) Execute(execCtx *NodeExecutionContext) (*NodeExecutionResult, error) { execRes := newNodeExecutionResult(execCtx.Node) - execRes.Interrupted = true + execRes.Terminated = true + + ne.logger.Info("the is ending") return execRes, nil } diff --git a/internal/workflow/engine/executor_start.go b/internal/workflow/engine/executor_start.go index ab0ada84..c4f93c1d 100644 --- a/internal/workflow/engine/executor_start.go +++ b/internal/workflow/engine/executor_start.go @@ -8,9 +8,11 @@ type startNodeExecutor struct { nodeExecutor } -func (e *startNodeExecutor) Execute(execCtx *NodeExecutionContext) (*NodeExecutionResult, error) { +func (ne *startNodeExecutor) Execute(execCtx *NodeExecutionContext) (*NodeExecutionResult, error) { execRes := newNodeExecutionResult(execCtx.Node) + ne.logger.Info("") + return execRes, nil } diff --git a/internal/workflow/engine/executor_trycatch.go b/internal/workflow/engine/executor_trycatch.go index 28d8844e..52eb12ce 100644 --- a/internal/workflow/engine/executor_trycatch.go +++ b/internal/workflow/engine/executor_trycatch.go @@ -33,7 +33,7 @@ func (ne *tryCatchNodeExecutor) Execute(execCtx *NodeExecutionContext) (*NodeExe err := engine.executeNode(execCtx.Clone(), node) if err != nil { - if errors.Is(err, errInterrupted) { + if errors.Is(err, ErrTerminated) { return execRes, err } tryErrs = append(tryErrs, err) @@ -52,18 +52,17 @@ func (ne *tryCatchNodeExecutor) Execute(execCtx *NodeExecutionContext) (*NodeExe err := engine.executeNode(execCtx.Clone(), node) if err != nil { - if errors.Is(err, errInterrupted) { + if errors.Is(err, ErrTerminated) { return execRes, err } catchErrs = append(catchErrs, err) } } - if len(catchErrs) > 0 { - return execRes, fmt.Errorf("error occurred when executing child nodes: %w", errors.Join(append(tryErrs, catchErrs...)...)) - } - - return execRes, fmt.Errorf("error occurred when executing child nodes: %w", errors.Join(tryErrs...)) + errs := make([]error, 0) + errs = append(errs, tryErrs...) + errs = append(errs, catchErrs...) + return execRes, fmt.Errorf("%w: %w", ErrBlocksException, errors.Join(errs...)) } return execRes, nil @@ -90,7 +89,7 @@ func (ne *tryBlockNodeExecutor) Execute(execCtx *NodeExecutionContext) (*NodeExe execRes := newNodeExecutionResult(execCtx.Node) if err := engine.executeBlocks(execCtx.Clone(), execCtx.Node.Blocks); err != nil { - return execRes, err + return execRes, fmt.Errorf("%w: %w", ErrBlocksException, err) } return execRes, nil @@ -117,7 +116,7 @@ func (ne *catchBlockNodeExecutor) Execute(execCtx *NodeExecutionContext) (*NodeE } if err := engine.executeBlocks(execCtx.Clone(), execCtx.Node.Blocks); err != nil { - return execRes, err + return execRes, fmt.Errorf("%w: %w", ErrBlocksException, err) } return execRes, nil diff --git a/ui/src/components/workflow/designer/nodes/_shared.tsx b/ui/src/components/workflow/designer/nodes/_shared.tsx index 58a61f23..6a389320 100644 --- a/ui/src/components/workflow/designer/nodes/_shared.tsx +++ b/ui/src/components/workflow/designer/nodes/_shared.tsx @@ -73,11 +73,11 @@ const InternalNodeCard = ({ const isActivated = useMemo(() => nodeRenderData.activated || nodeRenderData.lineActivated, [nodeRenderData.activated, nodeRenderData.lineActivated]); const [isHovering, setIsHovering] = useState(false); - const [isInvalid, setIsInvalid] = useState(false); + const [isNodeInvalid, setIsNodeInvalid] = useState(false); const isNodeDisabled = useWatchFormValueIn(nodeRender.node, "disabled"); const formState = useWatchFormState(nodeRender.node); - useEffect(() => setIsInvalid(!!formState?.invalid), [formState?.invalid]); + useEffect(() => setIsNodeInvalid(!!formState?.invalid), [formState?.invalid]); return (