certimate/internal/workflow/engine/engine.go

307 lines
9.3 KiB
Go

package engine
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"github.com/samber/lo"
"github.com/certimate-go/certimate/internal/app"
"github.com/certimate-go/certimate/internal/domain"
"github.com/certimate-go/certimate/internal/repository"
"github.com/certimate-go/certimate/pkg/logging"
)
// WorkflowEngine runs a workflow graph and lets callers observe the run
// through registered callbacks.
//
// Every On* method registers one more callback for the corresponding event;
// each callback may return an error.
type WorkflowEngine interface {
	// Invoke executes graph as the run runId of the workflow workflowId.
	Invoke(ctx context.Context, workflowId, runId string, graph *Graph) error

	// OnStart registers a callback for the start of an invocation.
	OnStart(callback func(ctx context.Context) error)
	// OnEnd registers a callback for the end of an invocation.
	OnEnd(callback func(ctx context.Context) error)
	// OnError registers a callback that receives the error of a failed invocation.
	OnError(callback func(ctx context.Context, err error) error)

	// OnNodeStart registers a callback for the start of a node execution.
	OnNodeStart(callback func(ctx context.Context, node *Node) error)
	// OnNodeEnd registers a callback that receives a node and its execution result.
	OnNodeEnd(callback func(ctx context.Context, node *Node, res *NodeExecutionResult) error)
	// OnNodeError registers a callback that receives a node and the error it failed with.
	OnNodeError(callback func(ctx context.Context, node *Node, err error) error)
	// OnNodeLogging registers a callback that receives log records emitted for a node.
	OnNodeLogging(callback func(ctx context.Context, node *Node, log logging.Record) error)
}
// workflowEngine is the WorkflowEngine implementation of this package.
type workflowEngine struct {
	// logger reports errors returned by hooks and failures to persist node outputs.
	logger *slog.Logger
	// executors maps a node type to the executor used for nodes of that type.
	executors map[NodeType]NodeExecutor

	// hooksMtx guards the hook slices below: registration takes the write
	// lock, firing takes the read lock.
	hooksMtx sync.RWMutex

	// Registered callbacks, kept in registration order.
	onStartHooks       []func(ctx context.Context) error
	onEndHooks         []func(ctx context.Context) error
	onErrorHooks       []func(ctx context.Context, err error) error
	onNodeStartHooks   []func(ctx context.Context, node *Node) error
	onNodeEndHooks     []func(ctx context.Context, node *Node, res *NodeExecutionResult) error
	onNodeErrorHooks   []func(ctx context.Context, node *Node, err error) error
	onNodeLoggingHooks []func(ctx context.Context, node *Node, log logging.Record) error

	// wfoutputRepo saves the outputs produced by executed nodes.
	wfoutputRepo workflowOutputRepository
}
// Compile-time check that *workflowEngine satisfies WorkflowEngine.
var _ WorkflowEngine = (*workflowEngine)(nil)
// Invoke executes runGraph as the run runId of the workflow workflowId.
//
// The start hooks fire first, then the nodes of runGraph are executed in
// order. When execution finishes without error, or stops with ErrTerminated,
// the end hooks fire and nil is returned. Any other error is passed to the
// error hooks and returned.
//
// A panic raised while hooks or nodes run is recovered, reported to the error
// hooks, and returned as an error, so a panicking run is never reported as a
// success.
func (we *workflowEngine) Invoke(ctx context.Context, workflowId string, runId string, runGraph *Graph) (err error) {
	// Installed before anything else so that a panic in a start hook is
	// handled the same way as a panic in a node. The named result lets the
	// handler surface the panic to the caller instead of returning nil.
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("workflow engine panic: %v", r)
			we.fireOnErrorHooks(ctx, err)
		}
	}()

	we.fireOnStartHooks(ctx)

	wfCtx := (&WorkflowContext{}).
		SetExecutingWorkflow(workflowId, runId, runGraph).
		SetEngine(we).
		SetVariablesManager(newVariableManager()).
		SetInputsManager(newInOutManager()).
		SetContext(ctx)

	// ErrTerminated marks a deliberate early stop, not a failure.
	if execErr := we.executeBlocks(wfCtx, runGraph.Nodes); execErr != nil && !errors.Is(execErr, ErrTerminated) {
		we.fireOnErrorHooks(ctx, execErr)
		return execErr
	}

	we.fireOnEndHooks(ctx)
	return nil
}
// OnStart appends callback to the hooks fired when Invoke begins.
func (we *workflowEngine) OnStart(callback func(ctx context.Context) error) {
	we.hooksMtx.Lock()
	we.onStartHooks = append(we.onStartHooks, callback)
	we.hooksMtx.Unlock()
}
// OnEnd appends callback to the hooks fired when Invoke finishes without a
// failure (including a stop via ErrTerminated).
func (we *workflowEngine) OnEnd(callback func(ctx context.Context) error) {
	we.hooksMtx.Lock()
	we.onEndHooks = append(we.onEndHooks, callback)
	we.hooksMtx.Unlock()
}
// OnError appends callback to the hooks fired when Invoke fails or recovers
// from a panic; the callback receives the corresponding error.
func (we *workflowEngine) OnError(callback func(ctx context.Context, err error) error) {
	we.hooksMtx.Lock()
	we.onErrorHooks = append(we.onErrorHooks, callback)
	we.hooksMtx.Unlock()
}
// OnNodeStart appends callback to the hooks fired right before a node's
// executor runs.
func (we *workflowEngine) OnNodeStart(callback func(ctx context.Context, node *Node) error) {
	we.hooksMtx.Lock()
	we.onNodeStartHooks = append(we.onNodeStartHooks, callback)
	we.hooksMtx.Unlock()
}
// OnNodeEnd appends callback to the hooks fired after a node's executor
// returns without a failure; the callback receives the execution result.
func (we *workflowEngine) OnNodeEnd(callback func(ctx context.Context, node *Node, res *NodeExecutionResult) error) {
	we.hooksMtx.Lock()
	we.onNodeEndHooks = append(we.onNodeEndHooks, callback)
	we.hooksMtx.Unlock()
}
// OnNodeError appends callback to the hooks fired when a node's executor
// returns an error other than ErrTerminated.
func (we *workflowEngine) OnNodeError(callback func(ctx context.Context, node *Node, err error) error) {
	we.hooksMtx.Lock()
	we.onNodeErrorHooks = append(we.onNodeErrorHooks, callback)
	we.hooksMtx.Unlock()
}
// OnNodeLogging appends callback to the hooks fired for every record written
// through the logger handed to a node's executor.
func (we *workflowEngine) OnNodeLogging(callback func(ctx context.Context, node *Node, log logging.Record) error) {
	we.hooksMtx.Lock()
	we.onNodeLoggingHooks = append(we.onNodeLoggingHooks, callback)
	we.hooksMtx.Unlock()
}
// executeNode runs a single node through the executor registered for its type.
//
// It fires the node start hooks, executes the node, and then fires either the
// node error hooks (for any error other than ErrTerminated) or the node end
// hooks. Variables and outputs of the result are added to the workflow
// context, and outputs flagged Persistent (or any result with outputForced
// set) are saved through wfoutputRepo; a failed save is only logged.
//
// It returns an error when no executor is registered for node.Type or when
// the executor fails, and ErrTerminated when the executor or its result
// requests termination.
func (we *workflowEngine) executeNode(wfCtx *WorkflowContext, node *Node) error {
	executor, ok := we.executors[node.Type]
	if !ok {
		return fmt.Errorf("workflow engine: no executor registered for node type: '%s'", node.Type)
	}

	// Route everything the executor logs to the node logging hooks.
	// NOTE(review): the executor comes from the shared we.executors map, so
	// this replaces the logger on a shared instance — confirm that nodes of
	// the same type are never executed concurrently.
	executor.SetLogger(slog.New(logging.NewHookHandler(&logging.HookHandlerOptions{
		Level: slog.LevelDebug,
		WriteFunc: func(ctx context.Context, record *logging.Record) error {
			we.fireOnNodeLoggingHooks(ctx, node, *record)
			return nil
		},
	})))

	we.fireOnNodeStartHooks(wfCtx.ctx, node)

	execCtx := newNodeExecutionContext(wfCtx, node)
	execRes, err := executor.Execute(execCtx)
	if err != nil && !errors.Is(err, ErrTerminated) {
		we.fireOnNodeErrorHooks(wfCtx.ctx, node, err)
		return err
	}

	// From here on err is either nil or ErrTerminated.
	we.fireOnNodeEndHooks(wfCtx.ctx, node, execRes)

	if execRes != nil {
		// Ranging over a nil collection is a no-op, so no nil checks are needed.
		for _, variable := range execRes.Variables {
			wfCtx.variables.Add(variable)
		}
		for _, output := range execRes.Outputs {
			wfCtx.inputs.Add(output)
		}

		persistent := lo.Filter(execRes.Outputs, func(state InOutState, _ int) bool { return state.Persistent })
		if execRes.outputForced || len(persistent) > 0 {
			output := &domain.WorkflowOutput{
				WorkflowId: execCtx.WorkflowId,
				RunId:      execCtx.RunId,
				NodeId:     execCtx.Node.Id,
				NodeConfig: execCtx.Node.Data.Config,
				Succeeded:  true, // currently always true
			}
			if len(persistent) > 0 {
				output.Outputs = lo.Map(persistent, func(state InOutState, _ int) *domain.WorkflowOutputEntry {
					return &domain.WorkflowOutputEntry{
						Name:      state.Name,
						Type:      state.Type,
						Value:     state.ValueString(),
						ValueType: state.ValueType,
					}
				})
			}

			// Best effort: a failed save does not fail the node, but the
			// cause is logged so the failure can be diagnosed.
			if _, saveErr := we.wfoutputRepo.Save(execCtx.ctx, output); saveErr != nil {
				we.logger.Warn("failed to save node output",
					slog.Any("nodeId", execCtx.Node.Id),
					slog.Any("error", saveErr))
			}
		}

		if execRes.Terminated {
			return ErrTerminated
		}
	}

	// Propagates ErrTerminated from the executor, or nil on success.
	return err
}
// executeBlocks executes the given nodes one after another.
//
// Before each node it checks the workflow context and returns the context's
// error once it is done. Disabled nodes are skipped. The first error returned
// by executeNode stops the loop and is returned as is.
func (we *workflowEngine) executeBlocks(wfCtx *WorkflowContext, blocks []*Node) error {
	for idx := range blocks {
		// Err is non-nil exactly when the context's Done channel is closed.
		if ctxErr := wfCtx.ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		current := blocks[idx]

		// Disabled nodes are skipped without being executed.
		if current.Data.Disabled {
			continue
		}

		if nodeErr := we.executeNode(wfCtx, current); nodeErr != nil {
			return nodeErr
		}
	}

	return nil
}
// fireOnStartHooks calls every registered start hook in registration order
// while holding the hooks read lock. An error returned by a hook is logged and
// does not stop the remaining hooks.
func (we *workflowEngine) fireOnStartHooks(ctx context.Context) {
	we.hooksMtx.RLock()
	defer we.hooksMtx.RUnlock()

	for _, hook := range we.onStartHooks {
		hookErr := hook(ctx)
		if hookErr == nil {
			continue
		}
		we.logger.Error("workflow engine: error in onStart hook", slog.Any("error", hookErr))
	}
}
// fireOnEndHooks calls every registered end hook in registration order while
// holding the hooks read lock. An error returned by a hook is logged and does
// not stop the remaining hooks.
func (we *workflowEngine) fireOnEndHooks(ctx context.Context) {
	we.hooksMtx.RLock()
	defer we.hooksMtx.RUnlock()

	for _, hook := range we.onEndHooks {
		hookErr := hook(ctx)
		if hookErr == nil {
			continue
		}
		we.logger.Error("workflow engine: error in onEnd hook", slog.Any("error", hookErr))
	}
}
// fireOnErrorHooks passes err to every registered error hook in registration
// order while holding the hooks read lock. An error returned by a hook is
// logged and does not stop the remaining hooks.
func (we *workflowEngine) fireOnErrorHooks(ctx context.Context, err error) {
	we.hooksMtx.RLock()
	defer we.hooksMtx.RUnlock()

	for _, hook := range we.onErrorHooks {
		hookErr := hook(ctx, err)
		if hookErr == nil {
			continue
		}
		we.logger.Error("workflow engine: error in onError hook", slog.Any("error", hookErr))
	}
}
// fireOnNodeStartHooks passes node to every registered node start hook in
// registration order while holding the hooks read lock. An error returned by a
// hook is logged and does not stop the remaining hooks.
func (we *workflowEngine) fireOnNodeStartHooks(ctx context.Context, node *Node) {
	we.hooksMtx.RLock()
	defer we.hooksMtx.RUnlock()

	for _, hook := range we.onNodeStartHooks {
		hookErr := hook(ctx, node)
		if hookErr == nil {
			continue
		}
		we.logger.Error("workflow engine: error in onNodeStart hook", slog.Any("error", hookErr))
	}
}
// fireOnNodeEndHooks passes node and result to every registered node end hook
// in registration order while holding the hooks read lock. An error returned
// by a hook is logged and does not stop the remaining hooks.
func (we *workflowEngine) fireOnNodeEndHooks(ctx context.Context, node *Node, result *NodeExecutionResult) {
	we.hooksMtx.RLock()
	defer we.hooksMtx.RUnlock()

	for _, hook := range we.onNodeEndHooks {
		hookErr := hook(ctx, node, result)
		if hookErr == nil {
			continue
		}
		we.logger.Error("workflow engine: error in onNodeEnd hook", slog.Any("error", hookErr))
	}
}
// fireOnNodeErrorHooks passes node and err to every registered node error hook
// in registration order while holding the hooks read lock. An error returned
// by a hook is logged and does not stop the remaining hooks.
func (we *workflowEngine) fireOnNodeErrorHooks(ctx context.Context, node *Node, err error) {
	we.hooksMtx.RLock()
	defer we.hooksMtx.RUnlock()

	for _, hook := range we.onNodeErrorHooks {
		hookErr := hook(ctx, node, err)
		if hookErr == nil {
			continue
		}
		we.logger.Error("workflow engine: error in onNodeError hook", slog.Any("error", hookErr))
	}
}
// fireOnNodeLoggingHooks passes node and the log record to every registered
// node logging hook in registration order while holding the hooks read lock.
// An error returned by a hook is logged and does not stop the remaining hooks.
func (we *workflowEngine) fireOnNodeLoggingHooks(ctx context.Context, node *Node, log logging.Record) {
	we.hooksMtx.RLock()
	defer we.hooksMtx.RUnlock()

	for _, hook := range we.onNodeLoggingHooks {
		hookErr := hook(ctx, node, log)
		if hookErr == nil {
			continue
		}
		we.logger.Error("workflow engine: error in onNodeLogging hook", slog.Any("error", hookErr))
	}
}
// NewWorkflowEngine builds a WorkflowEngine that uses the application logger
// and a workflow output repository, with one executor registered for each
// node type listed below.
func NewWorkflowEngine() WorkflowEngine {
	we := &workflowEngine{
		logger:       app.GetLogger(),
		wfoutputRepo: repository.NewWorkflowOutputRepository(),
	}

	// Executors are constructed after the logger and repository, in the
	// order they are listed.
	we.executors = map[NodeType]NodeExecutor{
		NodeTypeStart:       newStartNodeExecutor(),
		NodeTypeEnd:         newEndNodeExecutor(),
		NodeTypeDelay:       newDelayNodeExecutor(),
		NodeTypeCondition:   newConditionNodeExecutor(),
		NodeTypeBranchBlock: newBranchBlockNodeExecutor(),
		NodeTypeTryCatch:    newTryCatchNodeExecutor(),
		NodeTypeTryBlock:    newTryBlockNodeExecutor(),
		NodeTypeCatchBlock:  newCatchBlockNodeExecutor(),
		NodeTypeBizApply:    newBizApplyNodeExecutor(),
		NodeTypeBizUpload:   newBizUploadNodeExecutor(),
		NodeTypeBizMonitor:  newBizMonitorNodeExecutor(),
		NodeTypeBizDeploy:   newBizDeployNodeExecutor(),
		NodeTypeBizNotify:   newBizNotifyNodeExecutor(),
	}

	return we
}