♻️ Simplify walk the flow logic in bot engine (#2127)

This commit is contained in:
Baptiste Arnaud 2025-04-15 17:29:17 +02:00 committed by GitHub
parent 786accdf24
commit 81ef7dfdff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 735 additions and 689 deletions

View File

@ -75,7 +75,6 @@ export const PexelsPicker = ({ onVideoSelect }: Props) => {
...((result as Videos)?.videos ?? []),
]);
console.log(videos[0]);
setNextPage((page) => page + 1);
}
} catch (err) {

View File

@ -66,7 +66,6 @@ export const continueChat = async ({
} = await continueBotFlow(message, {
version: 2,
state: session.state,
startTime: Date.now(),
textBubbleContentFormat,
sessionStore,
});

View File

@ -48,8 +48,7 @@ import { parseTime } from "./blocks/inputs/time/parseTime";
import { saveDataInResponseVariableMapping } from "./blocks/integrations/httpRequest/saveDataInResponseVariableMapping";
import { resumeChatCompletion } from "./blocks/integrations/legacy/openai/resumeChatCompletion";
import { executeCommandEvent } from "./events/executeCommandEvent";
import { executeGroup, parseInput } from "./executeGroup";
import { getNextGroup } from "./getNextGroup";
import { formatInputForChatResponse } from "./formatInputForChatResponse";
import { isInputMessage } from "./helpers/isInputMessage";
import { saveAnswer } from "./queries/saveAnswer";
import { resetSessionState } from "./resetSessionState";
@ -59,25 +58,24 @@ import type {
Message,
} from "./schemas/api";
import { startBotFlow } from "./startBotFlow";
import type { ParsedReply, SkipReply, SuccessReply } from "./types";
import type {
ContinueBotFlowResponse,
ParsedReply,
SkipReply,
SuccessReply,
} from "./types";
import { updateVariablesInSession } from "./updateVariablesInSession";
export type ContinueBotFlowResponse = ContinueChatResponse & {
newSessionState: SessionState;
visitedEdges: Prisma.VisitedEdge[];
setVariableHistory: SetVariableHistoryItem[];
};
import { walkFlowForward } from "./walkFlowForward";
type Params = {
version: 1 | 2;
state: SessionState;
startTime?: number;
textBubbleContentFormat: "richText" | "markdown";
sessionStore: SessionStore;
};
export const continueBotFlow = async (
reply: Message | undefined,
{ state, version, startTime, textBubbleContentFormat, sessionStore }: Params,
{ state, version, textBubbleContentFormat, sessionStore }: Params,
): Promise<ContinueBotFlowResponse> => {
if (!state.currentBlockId)
return startBotFlow({
@ -164,16 +162,11 @@ export const continueBotFlow = async (
continueReply = parsedReplyResult;
}
const groupHasMoreBlocks = blockIndex < group.blocks.length - 1;
const { edgeId: nextEdgeId, isOffDefaultPath } = getOutgoingEdgeId(
continueReply,
{
block,
state: newSessionState,
sessionStore,
},
);
const nextEdge = getReplyOutgoingEdge(continueReply, {
block,
state: newSessionState,
sessionStore,
});
const content =
continueReply && "content" in continueReply
@ -182,33 +175,13 @@ export const continueBotFlow = async (
const lastMessageNewFormat =
reply?.type === "text" && content !== reply?.text ? content : undefined;
if (groupHasMoreBlocks && !nextEdgeId) {
const chatReply = await executeGroup(
{
...group,
blocks: group.blocks.slice(blockIndex + 1),
} as Group,
{
version,
state: newSessionState,
visitedEdges: [],
setVariableHistory,
firstBubbleWasStreamed,
startTime,
textBubbleContentFormat,
sessionStore,
},
);
return {
...chatReply,
lastMessageNewFormat,
};
}
const groupHasMoreBlocks = blockIndex < group.blocks.length - 1;
if (
!nextEdgeId &&
newSessionState.typebotsQueue.length === 1 &&
(newSessionState.typebotsQueue[0].queuedEdgeIds ?? []).length === 0
!nextEdge &&
!groupHasMoreBlocks &&
(newSessionState.typebotsQueue[0].queuedEdgeIds ?? []).length === 0 &&
newSessionState.typebotsQueue.length === 1
)
return {
messages: [],
@ -218,36 +191,36 @@ export const continueBotFlow = async (
setVariableHistory,
};
const nextGroup = await getNextGroup({
state: newSessionState,
edgeId: nextEdgeId,
isOffDefaultPath,
});
newSessionState = nextGroup.newSessionState;
if (!nextGroup.group)
return {
messages: [],
newSessionState,
lastMessageNewFormat,
visitedEdges: nextGroup.visitedEdge ? [nextGroup.visitedEdge] : [],
setVariableHistory,
};
const chatReply = await executeGroup(nextGroup.group, {
const walkStartingPoint =
groupHasMoreBlocks && !nextEdge
? {
type: "group" as const,
group: {
...group,
blocks: group.blocks.slice(blockIndex + 1),
} as Group,
}
: {
type: "nextEdge" as const,
nextEdge,
};
const executionResponse = await walkFlowForward(walkStartingPoint, {
version,
state: newSessionState,
firstBubbleWasStreamed,
visitedEdges: nextGroup.visitedEdge ? [nextGroup.visitedEdge] : [],
setVariableHistory,
startTime,
skipFirstMessageBubble: firstBubbleWasStreamed,
textBubbleContentFormat,
sessionStore,
});
return {
...chatReply,
messages: executionResponse.messages,
input: executionResponse.input,
clientSideActions: executionResponse.clientSideActions,
logs: executionResponse.logs,
newSessionState: executionResponse.newSessionState,
visitedEdges: executionResponse.visitedEdges,
setVariableHistory: executionResponse.setVariableHistory,
lastMessageNewFormat,
};
};
@ -558,7 +531,7 @@ const parseRetryMessage = async (
},
},
],
input: await parseInput(block, { state, sessionStore }),
input: await formatInputForChatResponse(block, { state, sessionStore }),
};
};
@ -670,7 +643,7 @@ const setNewAnswerInState =
} satisfies SessionState;
};
const getOutgoingEdgeId = (
const getReplyOutgoingEdge = (
reply: SuccessReply | SkipReply | undefined,
{
block,
@ -681,11 +654,15 @@ const getOutgoingEdgeId = (
state: SessionState;
sessionStore: SessionStore;
},
): { edgeId: string | undefined; isOffDefaultPath: boolean } => {
): { id: string; isOffDefaultPath: boolean } | undefined => {
if (!reply || reply.status === "skip")
return { edgeId: block.outgoingEdgeId, isOffDefaultPath: false };
return block.outgoingEdgeId
? { id: block.outgoingEdgeId, isOffDefaultPath: false }
: undefined;
if (reply.outgoingEdgeId)
return { edgeId: reply.outgoingEdgeId, isOffDefaultPath: true };
return reply.outgoingEdgeId
? { id: reply.outgoingEdgeId, isOffDefaultPath: true }
: undefined;
const variables = state.typebotsQueue[0].typebot.variables;
if (
block.type === InputBlockType.CHOICE &&
@ -703,7 +680,7 @@ const getOutgoingEdgeId = (
}).normalize() === reply.content.normalize(),
);
if (matchedItem?.outgoingEdgeId)
return { edgeId: matchedItem.outgoingEdgeId, isOffDefaultPath: true };
return { id: matchedItem.outgoingEdgeId, isOffDefaultPath: true };
}
if (
block.type === InputBlockType.PICTURE_CHOICE &&
@ -719,9 +696,11 @@ const getOutgoingEdgeId = (
reply.content.normalize(),
);
if (matchedItem?.outgoingEdgeId)
return { edgeId: matchedItem.outgoingEdgeId, isOffDefaultPath: true };
return { id: matchedItem.outgoingEdgeId, isOffDefaultPath: true };
}
return { edgeId: block.outgoingEdgeId, isOffDefaultPath: false };
return block.outgoingEdgeId
? { id: block.outgoingEdgeId, isOffDefaultPath: false }
: undefined;
};
const parseReply = async (

View File

@ -3,9 +3,9 @@ import type { SessionState } from "@typebot.io/chat-session/schemas";
import { EventType } from "@typebot.io/events/constants";
import type { CommandEvent } from "@typebot.io/events/schemas";
import { getBlockById } from "@typebot.io/groups/helpers/getBlockById";
import { byId } from "@typebot.io/lib/utils";
import { addBlockToTypebotIfMissing } from "../addBlockToTypebotIfMissing";
import { addPortalEdge } from "../addPortalEdge";
import { getNextGroup } from "../getNextGroup";
type Props = {
state: SessionState;
@ -53,23 +53,29 @@ export const executeCommandEvent = async ({
],
};
}
const response = await getNextGroup({
state: newSessionState,
edgeId: event.outgoingEdgeId,
isOffDefaultPath: false,
});
newSessionState = response.newSessionState;
if (!response.group)
const nextEdge = newSessionState.typebotsQueue[0].typebot.edges.find(
byId(event.outgoingEdgeId),
);
if (!nextEdge)
throw new TRPCError({
code: "BAD_REQUEST",
message: "Command event doesn't have a connected edge",
});
const nextGroup = newSessionState.typebotsQueue[0].typebot.groups.find(
byId(nextEdge.to.groupId),
);
if (!nextGroup)
throw new TRPCError({
code: "BAD_REQUEST",
message: "Command event doesn't have a connected group",
});
const nextBlockIndex = nextGroup.blocks.findIndex(byId(nextEdge.to.blockId));
newSessionState = addBlockToTypebotIfMissing(
`virtual-${event.id}-block`,
newSessionState,
{
groupId: response.group.id,
index: 0,
groupId: nextGroup.id,
index: nextBlockIndex !== -1 ? nextBlockIndex : 0,
},
);
return {

View File

@ -1,385 +0,0 @@
import { createId } from "@paralleldrive/cuid2";
import { TRPCError } from "@trpc/server";
import { BubbleBlockType } from "@typebot.io/blocks-bubbles/constants";
import {
isBubbleBlock,
isInputBlock,
isIntegrationBlock,
isLogicBlock,
} from "@typebot.io/blocks-core/helpers";
import { InputBlockType } from "@typebot.io/blocks-inputs/constants";
import type { InputBlock } from "@typebot.io/blocks-inputs/schema";
import type { SessionState } from "@typebot.io/chat-session/schemas";
import { env } from "@typebot.io/env";
import type { Group } from "@typebot.io/groups/schemas";
import type { Prisma } from "@typebot.io/prisma/types";
import type { SessionStore } from "@typebot.io/runtime-session-store";
import { deepParseVariables } from "@typebot.io/variables/deepParseVariables";
import type { SetVariableHistoryItem } from "@typebot.io/variables/schemas";
import { injectVariableValuesInCardsBlock } from "./blocks/cards/injectVariableValuesInCardsBlock";
import { injectVariableValuesInButtonsInputBlock } from "./blocks/inputs/buttons/injectVariableValuesInButtonsInputBlock";
import { parseDateInput } from "./blocks/inputs/date/parseDateInput";
import { computePaymentInputRuntimeOptions } from "./blocks/inputs/payment/computePaymentInputRuntimeOptions";
import { injectVariableValuesInPictureChoiceBlock } from "./blocks/inputs/pictureChoice/injectVariableValuesInPictureChoiceBlock";
import { executeIntegration } from "./executeIntegration";
import { executeLogic } from "./executeLogic";
import { getNextGroup } from "./getNextGroup";
import { getPrefilledInputValue } from "./getPrefilledValue";
import {
type BubbleBlockWithDefinedContent,
parseBubbleBlock,
} from "./parseBubbleBlock";
import type { ContinueChatResponse, RuntimeOptions } from "./schemas/api";
import type { ExecuteIntegrationResponse, ExecuteLogicResponse } from "./types";
type ContextProps = {
version: 1 | 2;
state: SessionState;
sessionStore: SessionStore;
currentReply?: ContinueChatResponse;
currentLastBubbleId?: string;
firstBubbleWasStreamed?: boolean;
visitedEdges: Prisma.VisitedEdge[];
setVariableHistory: SetVariableHistoryItem[];
startTime?: number;
textBubbleContentFormat: "richText" | "markdown";
};
export const executeGroup = async (
group: Group,
{
version,
state,
sessionStore,
visitedEdges,
setVariableHistory,
currentReply,
currentLastBubbleId,
firstBubbleWasStreamed,
startTime,
textBubbleContentFormat,
}: ContextProps,
): Promise<
ContinueChatResponse & {
newSessionState: SessionState;
setVariableHistory: SetVariableHistoryItem[];
visitedEdges: Prisma.VisitedEdge[];
}
> => {
let newStartTime = startTime;
const messages: ContinueChatResponse["messages"] =
currentReply?.messages ?? [];
let clientSideActions: ContinueChatResponse["clientSideActions"] =
currentReply?.clientSideActions;
let logs: ContinueChatResponse["logs"] = currentReply?.logs;
let nextEdgeId = null;
let lastBubbleBlockId: string | undefined = currentLastBubbleId;
let newSessionState = state;
let isNextEdgeOffDefaultPath = false;
let index = -1;
for (const block of group.blocks) {
if (
newStartTime &&
env.CHAT_API_TIMEOUT &&
Date.now() - newStartTime > env.CHAT_API_TIMEOUT
) {
throw new TRPCError({
code: "TIMEOUT",
message: `${env.CHAT_API_TIMEOUT / 1000} seconds timeout reached`,
});
}
index++;
nextEdgeId = block.outgoingEdgeId;
if (isBubbleBlock(block)) {
if (!block.content || (firstBubbleWasStreamed && index === 0)) continue;
const message = parseBubbleBlock(block as BubbleBlockWithDefinedContent, {
version,
variables: newSessionState.typebotsQueue[0].typebot.variables,
typebotVersion: newSessionState.typebotsQueue[0].typebot.version,
textBubbleContentFormat,
sessionStore,
});
messages.push(message);
if (
message.type === BubbleBlockType.EMBED &&
message.content.waitForEvent?.isEnabled
) {
return {
messages,
newSessionState: {
...newSessionState,
currentBlockId: block.id,
},
clientSideActions,
logs,
visitedEdges,
setVariableHistory,
};
}
lastBubbleBlockId = block.id;
continue;
}
if (isInputBlock(block))
return {
messages,
input: await parseInput(block, {
state: newSessionState,
sessionStore,
}),
newSessionState: {
...newSessionState,
currentBlockId: block.id,
},
clientSideActions,
logs,
visitedEdges,
setVariableHistory,
};
const executionResponse = (
isLogicBlock(block)
? await executeLogic({
block,
state: newSessionState,
setVariableHistory,
sessionStore,
})
: isIntegrationBlock(block)
? await executeIntegration({
block,
state: newSessionState,
sessionStore,
})
: null
) as ExecuteLogicResponse | ExecuteIntegrationResponse | null;
if (!executionResponse) continue;
if (
executionResponse.newSetVariableHistory &&
executionResponse.newSetVariableHistory?.length > 0
) {
if (!newSessionState.typebotsQueue[0].resultId)
newSessionState = {
...newSessionState,
previewMetadata: {
...newSessionState.previewMetadata,
setVariableHistory: (
newSessionState.previewMetadata?.setVariableHistory ?? []
).concat(
executionResponse.newSetVariableHistory.map((item) => ({
blockId: item.blockId,
variableId: item.variableId,
value: item.value,
})),
),
},
};
else setVariableHistory.push(...executionResponse.newSetVariableHistory);
}
if (
"startTimeShouldBeUpdated" in executionResponse &&
executionResponse.startTimeShouldBeUpdated
)
newStartTime = Date.now();
if (executionResponse.logs)
logs = [...(logs ?? []), ...executionResponse.logs];
if (executionResponse.newSessionState)
newSessionState = executionResponse.newSessionState;
if (
"clientSideActions" in executionResponse &&
executionResponse.clientSideActions
) {
clientSideActions = [
...(clientSideActions ?? []),
...executionResponse.clientSideActions.map((action) => ({
...action,
lastBubbleBlockId,
})),
];
if (
"customEmbedBubble" in executionResponse &&
executionResponse.customEmbedBubble
) {
messages.push({
id: createId(),
...executionResponse.customEmbedBubble,
});
}
if (
executionResponse.clientSideActions?.find(
(action) => action.expectsDedicatedReply,
) ||
("customEmbedBubble" in executionResponse &&
executionResponse.customEmbedBubble)
) {
return {
messages,
newSessionState: {
...newSessionState,
currentBlockId: block.id,
},
clientSideActions,
logs,
visitedEdges,
setVariableHistory,
};
}
}
if (executionResponse.outgoingEdgeId) {
isNextEdgeOffDefaultPath =
block.outgoingEdgeId !== executionResponse.outgoingEdgeId;
nextEdgeId = executionResponse.outgoingEdgeId;
break;
}
}
if (
!nextEdgeId &&
newSessionState.typebotsQueue.length === 1 &&
(newSessionState.typebotsQueue[0].queuedEdgeIds ?? []).length === 0
)
return {
messages,
newSessionState,
clientSideActions,
logs,
visitedEdges,
setVariableHistory,
};
const nextGroup = await getNextGroup({
state: newSessionState,
edgeId: nextEdgeId ?? undefined,
isOffDefaultPath: isNextEdgeOffDefaultPath,
});
newSessionState = nextGroup.newSessionState;
if (nextGroup.visitedEdge) visitedEdges.push(nextGroup.visitedEdge);
if (!nextGroup.group) {
return {
messages,
newSessionState,
clientSideActions,
logs,
visitedEdges,
setVariableHistory,
};
}
return executeGroup(nextGroup.group, {
version,
state: newSessionState,
sessionStore,
visitedEdges,
setVariableHistory,
currentReply: {
messages,
clientSideActions,
logs,
},
currentLastBubbleId: lastBubbleBlockId,
startTime: newStartTime,
textBubbleContentFormat,
});
};
const computeRuntimeOptions = (
block: InputBlock,
{ sessionStore, state }: { sessionStore: SessionStore; state: SessionState },
): Promise<RuntimeOptions> | undefined => {
switch (block.type) {
case InputBlockType.PAYMENT: {
return computePaymentInputRuntimeOptions(block.options, {
sessionStore,
state,
});
}
}
};
export const parseInput = async (
block: InputBlock,
{ state, sessionStore }: { state: SessionState; sessionStore: SessionStore },
): Promise<ContinueChatResponse["input"]> => {
switch (block.type) {
case InputBlockType.CHOICE: {
return injectVariableValuesInButtonsInputBlock(block, {
state,
sessionStore,
});
}
case InputBlockType.PICTURE_CHOICE: {
return injectVariableValuesInPictureChoiceBlock(block, {
variables: state.typebotsQueue[0].typebot.variables,
sessionStore,
});
}
case InputBlockType.NUMBER: {
return deepParseVariables(
{
...block,
prefilledValue: getPrefilledInputValue(
state.typebotsQueue[0].typebot.variables,
)(block),
},
{
variables: state.typebotsQueue[0].typebot.variables,
sessionStore,
},
);
}
case InputBlockType.DATE: {
return parseDateInput(block, {
variables: state.typebotsQueue[0].typebot.variables,
sessionStore,
});
}
case InputBlockType.RATING: {
return deepParseVariables(
{
...block,
prefilledValue: getPrefilledInputValue(
state.typebotsQueue[0].typebot.variables,
)(block),
},
{
variables: state.typebotsQueue[0].typebot.variables,
sessionStore,
},
);
}
case InputBlockType.CARDS: {
return injectVariableValuesInCardsBlock(block, {
variables: state.typebotsQueue[0].typebot.variables,
sessionStore,
});
}
default: {
return deepParseVariables(
{
...block,
runtimeOptions: await computeRuntimeOptions(block, {
sessionStore,
state,
}),
prefilledValue: getPrefilledInputValue(
state.typebotsQueue[0].typebot.variables,
)(block),
},
{
variables: state.typebotsQueue[0].typebot.variables,
sessionStore,
},
);
}
}
};

View File

@ -244,6 +244,3 @@ const getNextBlock =
)
: connectedGroup?.blocks.at(0);
};
const isCredentialsV2 = (credentials: { iv: string }) =>
credentials.iv.length === 24;

View File

@ -0,0 +1,104 @@
import { InputBlockType } from "@typebot.io/blocks-inputs/constants";
import type { InputBlock } from "@typebot.io/blocks-inputs/schema";
import type { SessionState } from "@typebot.io/chat-session/schemas";
import type { SessionStore } from "@typebot.io/runtime-session-store";
import { deepParseVariables } from "@typebot.io/variables/deepParseVariables";
import { injectVariableValuesInCardsBlock } from "./blocks/cards/injectVariableValuesInCardsBlock";
import { injectVariableValuesInButtonsInputBlock } from "./blocks/inputs/buttons/injectVariableValuesInButtonsInputBlock";
import { parseDateInput } from "./blocks/inputs/date/parseDateInput";
import { computePaymentInputRuntimeOptions } from "./blocks/inputs/payment/computePaymentInputRuntimeOptions";
import { injectVariableValuesInPictureChoiceBlock } from "./blocks/inputs/pictureChoice/injectVariableValuesInPictureChoiceBlock";
import { getPrefilledInputValue } from "./getPrefilledValue";
import type { ContinueChatResponse, RuntimeOptions } from "./schemas/api";
export const formatInputForChatResponse = async (
block: InputBlock,
{ state, sessionStore }: { state: SessionState; sessionStore: SessionStore },
): Promise<ContinueChatResponse["input"]> => {
switch (block.type) {
case InputBlockType.CHOICE: {
return injectVariableValuesInButtonsInputBlock(block, {
state,
sessionStore,
});
}
case InputBlockType.PICTURE_CHOICE: {
return injectVariableValuesInPictureChoiceBlock(block, {
variables: state.typebotsQueue[0].typebot.variables,
sessionStore,
});
}
case InputBlockType.NUMBER: {
return deepParseVariables(
{
...block,
prefilledValue: getPrefilledInputValue(
state.typebotsQueue[0].typebot.variables,
)(block),
},
{
variables: state.typebotsQueue[0].typebot.variables,
sessionStore,
},
);
}
case InputBlockType.DATE: {
return parseDateInput(block, {
variables: state.typebotsQueue[0].typebot.variables,
sessionStore,
});
}
case InputBlockType.RATING: {
return deepParseVariables(
{
...block,
prefilledValue: getPrefilledInputValue(
state.typebotsQueue[0].typebot.variables,
)(block),
},
{
variables: state.typebotsQueue[0].typebot.variables,
sessionStore,
},
);
}
case InputBlockType.CARDS: {
return injectVariableValuesInCardsBlock(block, {
variables: state.typebotsQueue[0].typebot.variables,
sessionStore,
});
}
default: {
return deepParseVariables(
{
...block,
runtimeOptions: await computeRuntimeOptions(block, {
sessionStore,
state,
}),
prefilledValue: getPrefilledInputValue(
state.typebotsQueue[0].typebot.variables,
)(block),
},
{
variables: state.typebotsQueue[0].typebot.variables,
sessionStore,
},
);
}
}
};
const computeRuntimeOptions = (
block: InputBlock,
{ sessionStore, state }: { sessionStore: SessionStore; state: SessionState },
): Promise<RuntimeOptions> | undefined => {
switch (block.type) {
case InputBlockType.PAYMENT: {
return computePaymentInputRuntimeOptions(block.options, {
sessionStore,
state,
});
}
}
};

View File

@ -1,162 +0,0 @@
import type { SessionState } from "@typebot.io/chat-session/schemas";
import type { Group } from "@typebot.io/groups/schemas";
import { byId, isDefined, isNotDefined } from "@typebot.io/lib/utils";
import type { Prisma } from "@typebot.io/prisma/types";
import type { VariableWithValue } from "@typebot.io/variables/schemas";
import { upsertResult } from "./queries/upsertResult";
export type NextGroup = {
group?: Group;
newSessionState: SessionState;
visitedEdge?: Prisma.VisitedEdge;
};
export const getNextGroup = async ({
state,
edgeId,
isOffDefaultPath,
}: {
state: SessionState;
edgeId?: string;
isOffDefaultPath: boolean;
}): Promise<NextGroup> => {
const nextEdge = state.typebotsQueue[0].typebot.edges.find(byId(edgeId));
if (!nextEdge) {
const nextEdgeResponse = popQueuedEdge(state);
let newSessionState = nextEdgeResponse.state;
if (newSessionState.typebotsQueue.length > 1) {
const isMergingWithParent =
newSessionState.typebotsQueue[0].isMergingWithParent;
const currentResultId = newSessionState.typebotsQueue[0].resultId;
if (!isMergingWithParent && currentResultId)
await upsertResult({
resultId: currentResultId,
typebot: newSessionState.typebotsQueue[0].typebot,
isCompleted: true,
hasStarted: newSessionState.typebotsQueue[0].answers.length > 0,
});
newSessionState = {
...newSessionState,
typebotsQueue: [
{
...newSessionState.typebotsQueue[1],
typebot: isMergingWithParent
? {
...newSessionState.typebotsQueue[1].typebot,
variables: newSessionState.typebotsQueue[1].typebot.variables
.map((variable) => ({
...variable,
value:
newSessionState.typebotsQueue[0].typebot.variables.find(
(v) => v.name === variable.name,
)?.value ?? variable.value,
}))
.concat(
newSessionState.typebotsQueue[0].typebot.variables.filter(
(variable) =>
isDefined(variable.value) &&
isNotDefined(
newSessionState.typebotsQueue[1].typebot.variables.find(
(v) => v.name === variable.name,
),
),
) as VariableWithValue[],
),
}
: newSessionState.typebotsQueue[1].typebot,
answers: isMergingWithParent
? [
...newSessionState.typebotsQueue[1].answers.filter(
(incomingAnswer) =>
!newSessionState.typebotsQueue[0].answers.find(
(currentAnswer) =>
currentAnswer.key === incomingAnswer.key,
),
),
...newSessionState.typebotsQueue[0].answers,
]
: newSessionState.typebotsQueue[1].answers,
},
...newSessionState.typebotsQueue.slice(2),
],
} satisfies SessionState;
if (newSessionState.progressMetadata)
newSessionState.progressMetadata = {
...newSessionState.progressMetadata,
totalAnswers:
newSessionState.progressMetadata.totalAnswers +
newSessionState.typebotsQueue[0].answers.length,
};
}
if (nextEdgeResponse.edgeId)
return getNextGroup({
state: newSessionState,
edgeId: nextEdgeResponse.edgeId,
isOffDefaultPath,
});
return {
newSessionState,
};
}
const nextGroup = state.typebotsQueue[0].typebot.groups.find(
byId(nextEdge.to.groupId),
);
if (!nextGroup)
return {
newSessionState: state,
};
const startBlockIndex = nextEdge.to.blockId
? nextGroup.blocks.findIndex(byId(nextEdge.to.blockId))
: 0;
const currentVisitedEdgeIndex = isOffDefaultPath
? (state.currentVisitedEdgeIndex ?? -1) + 1
: state.currentVisitedEdgeIndex;
const resultId = state.typebotsQueue[0].resultId;
return {
group: {
...nextGroup,
blocks: nextGroup.blocks.slice(startBlockIndex),
} as Group,
newSessionState: {
...state,
currentVisitedEdgeIndex,
previewMetadata:
resultId || !isOffDefaultPath
? state.previewMetadata
: {
...state.previewMetadata,
visitedEdges: (state.previewMetadata?.visitedEdges ?? []).concat(
nextEdge.id,
),
},
},
visitedEdge:
resultId && isOffDefaultPath && !nextEdge.id.startsWith("virtual-")
? {
index: currentVisitedEdgeIndex as number,
edgeId: nextEdge.id,
resultId,
}
: undefined,
};
};
const popQueuedEdge = (
state: SessionState,
): { edgeId?: string; state: SessionState } => {
const edgeId = state.typebotsQueue[0].queuedEdgeIds?.[0];
if (!edgeId) return { state };
return {
edgeId,
state: {
...state,
typebotsQueue: [
{
...state.typebotsQueue[0],
queuedEdgeIds: state.typebotsQueue[0].queuedEdgeIds?.slice(1),
},
...state.typebotsQueue.slice(1),
],
},
};
};

View File

@ -1,27 +1,21 @@
import { TRPCError } from "@trpc/server";
import type { SessionState } from "@typebot.io/chat-session/schemas";
import type { Group } from "@typebot.io/groups/schemas";
import type { Prisma } from "@typebot.io/prisma/types";
import type { SessionStore } from "@typebot.io/runtime-session-store";
import type { SetVariableHistoryItem } from "@typebot.io/variables/schemas";
import { continueBotFlow } from "./continueBotFlow";
import { executeGroup } from "./executeGroup";
import { getFirstEdgeId } from "./getFirstEdgeId";
import { getNextGroup } from "./getNextGroup";
import { upsertResult } from "./queries/upsertResult";
import type { ContinueChatResponse, Message, StartFrom } from "./schemas/api";
type ChatReply = ContinueChatResponse & {
newSessionState: SessionState;
visitedEdges: Prisma.VisitedEdge[];
setVariableHistory: SetVariableHistoryItem[];
};
import type { Message, StartFrom } from "./schemas/api";
import type { ContinueBotFlowResponse } from "./types";
import { type WalkFlowStartingPoint, walkFlowForward } from "./walkFlowForward";
type Props = {
version: 1 | 2;
message: Message | undefined;
state: SessionState;
startFrom?: StartFrom;
startTime?: number;
textBubbleContentFormat: "richText" | "markdown";
sessionStore: SessionStore;
};
@ -32,57 +26,27 @@ export const startBotFlow = async ({
state,
sessionStore,
startFrom,
startTime,
textBubbleContentFormat,
}: Props): Promise<ChatReply> => {
let newSessionState = state;
const visitedEdges: Prisma.VisitedEdge[] = [];
}: Props): Promise<ContinueBotFlowResponse> => {
const newSessionState = state;
const setVariableHistory: SetVariableHistoryItem[] = [];
if (startFrom?.type === "group") {
const group = state.typebotsQueue[0]?.typebot.groups.find(
(group) => group.id === startFrom.groupId,
);
if (!group)
throw new TRPCError({
code: "BAD_REQUEST",
message: "Start group doesn't exist",
});
return executeGroup(group, {
version,
state: newSessionState,
visitedEdges,
setVariableHistory,
startTime,
textBubbleContentFormat,
sessionStore,
});
}
const firstEdgeId = getFirstEdgeId({
typebot: newSessionState.typebotsQueue[0]?.typebot,
startEventId: startFrom?.type === "event" ? startFrom.eventId : undefined,
const startingPoint = getStartingPoint({
state: newSessionState,
startFrom,
});
if (!firstEdgeId)
if (!startingPoint)
return {
messages: [],
newSessionState,
setVariableHistory: [],
visitedEdges: [],
};
const nextGroup = await getNextGroup({
state: newSessionState,
edgeId: firstEdgeId,
isOffDefaultPath: false,
});
newSessionState = nextGroup.newSessionState;
if (!nextGroup.group)
return { messages: [], newSessionState, visitedEdges, setVariableHistory };
const chatReply = await executeGroup(nextGroup.group, {
const chatReply = await walkFlowForward(startingPoint, {
version,
state: newSessionState,
sessionStore,
visitedEdges,
setVariableHistory,
startTime,
textBubbleContentFormat,
});
@ -96,13 +60,46 @@ export const startBotFlow = async ({
});
};
const getStartingPoint = ({
state,
startFrom,
}: {
state: SessionState;
startFrom?: StartFrom;
}): WalkFlowStartingPoint | undefined => {
if (startFrom?.type === "group") {
const group = state.typebotsQueue[0]?.typebot.groups.find(
(group) => group.id === startFrom.groupId,
);
if (!group)
throw new TRPCError({
code: "BAD_REQUEST",
message: "Start group doesn't exist",
});
}
const firstEdgeId = getFirstEdgeId({
typebot: state.typebotsQueue[0]?.typebot,
startEventId: startFrom?.type === "event" ? startFrom.eventId : undefined,
});
if (!firstEdgeId) return;
return {
type: "nextEdge",
nextEdge: {
id: firstEdgeId,
isOffDefaultPath: false,
},
};
};
const autoContinueChatIfStartingWithInput = async ({
version,
message,
chatReply,
textBubbleContentFormat,
sessionStore,
}: Props & { chatReply: ChatReply }): Promise<ChatReply> => {
}: Props & {
chatReply: ContinueBotFlowResponse;
}): Promise<ContinueBotFlowResponse> => {
if (
!message ||
chatReply.messages.length > 0 ||
@ -123,7 +120,6 @@ const autoContinueChatIfStartingWithInput = async ({
version,
state: chatReply.newSessionState,
textBubbleContentFormat: textBubbleContentFormat,
startTime: Date.now(),
sessionStore,
});
};

View File

@ -207,7 +207,6 @@ export const startSession = async ({
state: initialState,
startFrom:
startParams.type === "preview" ? startParams.startFrom : undefined,
startTime: Date.now(),
textBubbleContentFormat: startParams.textBubbleContentFormat,
});

View File

@ -1,4 +1,5 @@
import type { SessionState } from "@typebot.io/chat-session/schemas";
import type { Prisma } from "@typebot.io/prisma/types";
import type { SetVariableHistoryItem } from "@typebot.io/variables/schemas";
import type { ContinueChatResponse, CustomEmbedBubble } from "./schemas/api";
@ -31,3 +32,9 @@ type FailReply = {
};
export type ParsedReply = SuccessReply | SkipReply | FailReply;
export type ContinueBotFlowResponse = ContinueChatResponse & {
newSessionState: SessionState;
visitedEdges: Prisma.VisitedEdge[];
setVariableHistory: SetVariableHistoryItem[];
};

View File

@ -0,0 +1,507 @@
import { createId } from "@paralleldrive/cuid2";
import { TRPCError } from "@trpc/server";
import { BubbleBlockType } from "@typebot.io/blocks-bubbles/constants";
import {
isBubbleBlock,
isInputBlock,
isIntegrationBlock,
isLogicBlock,
} from "@typebot.io/blocks-core/helpers";
import type { SessionState } from "@typebot.io/chat-session/schemas";
import { env } from "@typebot.io/env";
import type { Group } from "@typebot.io/groups/schemas";
import { byId, isDefined, isNotDefined } from "@typebot.io/lib/utils";
import type { Prisma } from "@typebot.io/prisma/types";
import type { SessionStore } from "@typebot.io/runtime-session-store";
import type {
SetVariableHistoryItem,
VariableWithValue,
} from "@typebot.io/variables/schemas";
import { executeIntegration } from "./executeIntegration";
import { executeLogic } from "./executeLogic";
import { formatInputForChatResponse } from "./formatInputForChatResponse";
import {
type BubbleBlockWithDefinedContent,
parseBubbleBlock,
} from "./parseBubbleBlock";
import { upsertResult } from "./queries/upsertResult";
import type { ContinueChatResponse } from "./schemas/api";
import type { ExecuteIntegrationResponse, ExecuteLogicResponse } from "./types";
export type WalkFlowStartingPoint =
| { type: "group"; group: Group }
| {
type: "nextEdge";
nextEdge?: { id: string; isOffDefaultPath: boolean };
};
export const walkFlowForward = async (
startingPoint: WalkFlowStartingPoint,
{
state,
sessionStore,
version,
setVariableHistory,
skipFirstMessageBubble,
textBubbleContentFormat,
}: {
version: 1 | 2;
state: SessionState;
sessionStore: SessionStore;
setVariableHistory: SetVariableHistoryItem[];
/**
* Useful to skip the last message that was streamed to the client so we don't need to send it again
*/
skipFirstMessageBubble?: boolean;
textBubbleContentFormat: "richText" | "markdown";
},
) => {
const timeoutStartTime = Date.now();
const visitedEdges: Prisma.VisitedEdge[] = [];
let newSessionState: SessionState = state;
let input: ContinueChatResponse["input"] | undefined;
const messages: ContinueChatResponse["messages"] = [];
const logs: ContinueChatResponse["logs"] = [];
const clientSideActions: ContinueChatResponse["clientSideActions"] = [];
let nextEdge: { id: string; isOffDefaultPath: boolean } | undefined =
startingPoint.type === "nextEdge" ? startingPoint.nextEdge : undefined;
do {
const nextGroupResponse =
startingPoint.type === "group" && !nextEdge
? {
group: startingPoint.group,
newSessionState,
}
: await navigateToNextGroupAndUpdateState({
state: newSessionState,
edgeId: nextEdge?.id,
isOffDefaultPath: nextEdge?.isOffDefaultPath ?? false,
});
newSessionState = nextGroupResponse.newSessionState;
if (!nextGroupResponse?.group) break;
const executionResponse = await executeGroup(nextGroupResponse.group, {
version,
state: newSessionState,
visitedEdges,
setVariableHistory,
skipFirstMessageBubble,
timeoutStartTime,
textBubbleContentFormat,
sessionStore,
});
if (executionResponse.logs) logs.push(...executionResponse.logs);
newSessionState = executionResponse.newSessionState;
if (executionResponse.visitedEdges)
visitedEdges.push(...executionResponse.visitedEdges);
if (executionResponse.setVariableHistory)
setVariableHistory.push(...executionResponse.setVariableHistory);
if (executionResponse.messages)
messages.push(...executionResponse.messages);
if (executionResponse.input) input = executionResponse.input;
if (executionResponse.clientSideActions)
clientSideActions.push(...executionResponse.clientSideActions);
nextEdge = executionResponse.nextEdge;
} while (nextEdge);
return {
newSessionState,
logs,
messages,
input,
clientSideActions,
visitedEdges,
setVariableHistory,
};
};
type ContextProps = {
version: 1 | 2;
state: SessionState;
sessionStore: SessionStore;
currentLastBubbleId?: string;
skipFirstMessageBubble?: boolean;
visitedEdges: Prisma.VisitedEdge[];
setVariableHistory: SetVariableHistoryItem[];
timeoutStartTime?: number;
textBubbleContentFormat: "richText" | "markdown";
};
export type ExecuteGroupResponse = ContinueChatResponse & {
newSessionState: SessionState;
setVariableHistory: SetVariableHistoryItem[];
visitedEdges: Prisma.VisitedEdge[];
nextEdge?: {
id: string;
isOffDefaultPath: boolean;
};
};
const executeGroup = async (
group: Group,
{
version,
state,
sessionStore,
visitedEdges,
setVariableHistory,
currentLastBubbleId,
skipFirstMessageBubble,
timeoutStartTime,
textBubbleContentFormat,
}: ContextProps,
): Promise<ExecuteGroupResponse> => {
const messages: ContinueChatResponse["messages"] = [];
let clientSideActions: ContinueChatResponse["clientSideActions"] = [];
let logs: ContinueChatResponse["logs"] = [];
let nextEdge;
let lastBubbleBlockId: string | undefined = currentLastBubbleId;
let newSessionState = state;
let index = -1;
for (const block of group.blocks) {
if (
timeoutStartTime &&
env.CHAT_API_TIMEOUT &&
Date.now() - timeoutStartTime > env.CHAT_API_TIMEOUT
) {
throw new TRPCError({
code: "TIMEOUT",
message: `${env.CHAT_API_TIMEOUT / 1000} seconds timeout reached`,
});
}
index++;
nextEdge = block.outgoingEdgeId
? {
id: block.outgoingEdgeId,
isOffDefaultPath: false,
}
: undefined;
if (isBubbleBlock(block)) {
if (!block.content || (skipFirstMessageBubble && index === 0)) continue;
const message = parseBubbleBlock(block as BubbleBlockWithDefinedContent, {
version,
variables: newSessionState.typebotsQueue[0].typebot.variables,
typebotVersion: newSessionState.typebotsQueue[0].typebot.version,
textBubbleContentFormat,
sessionStore,
});
messages.push(message);
if (
message.type === BubbleBlockType.EMBED &&
message.content.waitForEvent?.isEnabled
) {
return {
messages,
newSessionState: {
...newSessionState,
currentBlockId: block.id,
},
clientSideActions,
logs,
visitedEdges,
setVariableHistory,
};
}
lastBubbleBlockId = block.id;
continue;
}
if (isInputBlock(block))
return {
messages,
input: await formatInputForChatResponse(block, {
state: newSessionState,
sessionStore,
}),
newSessionState: {
...newSessionState,
currentBlockId: block.id,
},
clientSideActions,
logs,
visitedEdges,
setVariableHistory,
};
const logicOrIntegrationExecutionResponse = (
isLogicBlock(block)
? await executeLogic({
block,
state: newSessionState,
setVariableHistory,
sessionStore,
})
: isIntegrationBlock(block)
? await executeIntegration({
block,
state: newSessionState,
sessionStore,
})
: null
) as ExecuteLogicResponse | ExecuteIntegrationResponse | null;
if (!logicOrIntegrationExecutionResponse) continue;
if (
logicOrIntegrationExecutionResponse.newSetVariableHistory &&
logicOrIntegrationExecutionResponse.newSetVariableHistory?.length > 0
) {
if (!newSessionState.typebotsQueue[0].resultId)
newSessionState = {
...newSessionState,
previewMetadata: {
...newSessionState.previewMetadata,
setVariableHistory: (
newSessionState.previewMetadata?.setVariableHistory ?? []
).concat(
logicOrIntegrationExecutionResponse.newSetVariableHistory.map(
(item) => ({
blockId: item.blockId,
variableId: item.variableId,
value: item.value,
}),
),
),
},
};
else
setVariableHistory.push(
...logicOrIntegrationExecutionResponse.newSetVariableHistory,
);
}
if (
"startTimeShouldBeUpdated" in logicOrIntegrationExecutionResponse &&
logicOrIntegrationExecutionResponse.startTimeShouldBeUpdated
)
timeoutStartTime = Date.now();
if (logicOrIntegrationExecutionResponse.logs)
logs = [...(logs ?? []), ...logicOrIntegrationExecutionResponse.logs];
if (logicOrIntegrationExecutionResponse.newSessionState)
newSessionState = logicOrIntegrationExecutionResponse.newSessionState;
if (
"clientSideActions" in logicOrIntegrationExecutionResponse &&
logicOrIntegrationExecutionResponse.clientSideActions
) {
clientSideActions = [
...(clientSideActions ?? []),
...logicOrIntegrationExecutionResponse.clientSideActions.map(
(action) => ({
...action,
lastBubbleBlockId,
}),
),
];
if (
"customEmbedBubble" in logicOrIntegrationExecutionResponse &&
logicOrIntegrationExecutionResponse.customEmbedBubble
) {
messages.push({
id: createId(),
...logicOrIntegrationExecutionResponse.customEmbedBubble,
});
}
if (
logicOrIntegrationExecutionResponse.clientSideActions?.find(
(action) => action.expectsDedicatedReply,
) ||
("customEmbedBubble" in logicOrIntegrationExecutionResponse &&
logicOrIntegrationExecutionResponse.customEmbedBubble)
) {
return {
messages,
newSessionState: {
...newSessionState,
currentBlockId: block.id,
},
clientSideActions,
logs,
visitedEdges,
setVariableHistory,
};
}
}
if (logicOrIntegrationExecutionResponse.outgoingEdgeId) {
nextEdge = {
id: logicOrIntegrationExecutionResponse.outgoingEdgeId,
isOffDefaultPath:
block.outgoingEdgeId !==
logicOrIntegrationExecutionResponse.outgoingEdgeId,
};
break;
}
}
return {
nextEdge,
messages,
newSessionState,
clientSideActions,
logs,
visitedEdges,
setVariableHistory,
};
};
type NextGroup = {
group?: Group;
newSessionState: SessionState;
visitedEdge?: Prisma.VisitedEdge;
};
const navigateToNextGroupAndUpdateState = async ({
state,
edgeId,
isOffDefaultPath,
}: {
state: SessionState;
edgeId?: string;
isOffDefaultPath: boolean;
}): Promise<NextGroup> => {
const nextEdge = state.typebotsQueue[0].typebot.edges.find(byId(edgeId));
if (!nextEdge) {
const nextEdgeResponse = popQueuedEdge(state);
let newSessionState = nextEdgeResponse.state;
if (newSessionState.typebotsQueue.length > 1) {
const isMergingWithParent =
newSessionState.typebotsQueue[0].isMergingWithParent;
const currentResultId = newSessionState.typebotsQueue[0].resultId;
if (!isMergingWithParent && currentResultId)
await upsertResult({
resultId: currentResultId,
typebot: newSessionState.typebotsQueue[0].typebot,
isCompleted: true,
hasStarted: newSessionState.typebotsQueue[0].answers.length > 0,
});
newSessionState = {
...newSessionState,
typebotsQueue: [
{
...newSessionState.typebotsQueue[1],
typebot: isMergingWithParent
? {
...newSessionState.typebotsQueue[1].typebot,
variables: newSessionState.typebotsQueue[1].typebot.variables
.map((variable) => ({
...variable,
value:
newSessionState.typebotsQueue[0].typebot.variables.find(
(v) => v.name === variable.name,
)?.value ?? variable.value,
}))
.concat(
newSessionState.typebotsQueue[0].typebot.variables.filter(
(variable) =>
isDefined(variable.value) &&
isNotDefined(
newSessionState.typebotsQueue[1].typebot.variables.find(
(v) => v.name === variable.name,
),
),
) as VariableWithValue[],
),
}
: newSessionState.typebotsQueue[1].typebot,
answers: isMergingWithParent
? [
...newSessionState.typebotsQueue[1].answers.filter(
(incomingAnswer) =>
!newSessionState.typebotsQueue[0].answers.find(
(currentAnswer) =>
currentAnswer.key === incomingAnswer.key,
),
),
...newSessionState.typebotsQueue[0].answers,
]
: newSessionState.typebotsQueue[1].answers,
},
...newSessionState.typebotsQueue.slice(2),
],
} satisfies SessionState;
if (newSessionState.progressMetadata)
newSessionState.progressMetadata = {
...newSessionState.progressMetadata,
totalAnswers:
newSessionState.progressMetadata.totalAnswers +
newSessionState.typebotsQueue[0].answers.length,
};
}
if (nextEdgeResponse.edgeId)
return navigateToNextGroupAndUpdateState({
state: newSessionState,
edgeId: nextEdgeResponse.edgeId,
isOffDefaultPath,
});
return {
newSessionState,
};
}
const nextGroup = state.typebotsQueue[0].typebot.groups.find(
byId(nextEdge.to.groupId),
);
if (!nextGroup)
return {
newSessionState: state,
};
const startBlockIndex = nextEdge.to.blockId
? nextGroup.blocks.findIndex(byId(nextEdge.to.blockId))
: 0;
const currentVisitedEdgeIndex = isOffDefaultPath
? (state.currentVisitedEdgeIndex ?? -1) + 1
: state.currentVisitedEdgeIndex;
const resultId = state.typebotsQueue[0].resultId;
return {
group: {
...nextGroup,
blocks: nextGroup.blocks.slice(startBlockIndex),
} as Group,
newSessionState: {
...state,
currentVisitedEdgeIndex,
previewMetadata:
resultId || !isOffDefaultPath
? state.previewMetadata
: {
...state.previewMetadata,
visitedEdges: (state.previewMetadata?.visitedEdges ?? []).concat(
nextEdge.id,
),
},
},
visitedEdge:
resultId && isOffDefaultPath && !nextEdge.id.startsWith("virtual-")
? {
index: currentVisitedEdgeIndex as number,
edgeId: nextEdge.id,
resultId,
}
: undefined,
};
};
const popQueuedEdge = (
state: SessionState,
): { edgeId?: string; state: SessionState } => {
const edgeId = state.typebotsQueue[0].queuedEdgeIds?.[0];
if (!edgeId) return { state };
return {
edgeId,
state: {
...state,
typebotsQueue: [
{
...state.typebotsQueue[0],
queuedEdgeIds: state.typebotsQueue[0].queuedEdgeIds?.slice(1),
},
...state.typebotsQueue.slice(1),
],
},
};
};