Add onboarding email workflow and unsubscribe flow (#2370)

Adds onboarding email workflow with delayed send and suppression checks.
Introduces unsubscribe/resubscribe UI and API endpoints. Wires workflows
RPC config changes for user onboarding.
This commit is contained in:
Baptiste Arnaud 2026-01-26 14:09:08 +01:00 committed by GitHub
parent 1e2e3f79c5
commit 406ef51b07
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 745 additions and 67 deletions

View File

@ -31,6 +31,7 @@ Follow this workflow unless explicitly instructed otherwise.
- Explain what you found from opensrc or web searches
- Ask questions about unclear requirements, edge cases, or technical decisions.
- Report what you are planning on doing
- If I mention "IMPLEMENT" or "GO", it means you can move to step 4.
4. **Implement**
@ -87,6 +88,8 @@ Source code for dependencies is then available in `opensrc/`.
- Whenever possible, never use `as`. Instead, use `satisfies` as a last resort to make sure we keep strong type-safety.
- Only add a comment if a piece of logic is hard to grasp.
- Prefer inferring types instead of declaring it.
- Function name should always start with a verb.
- Functions used only locally should stay in the same file at the bottom of it. Only export helpers if used elsewhere then the helper file should have the same name as the function.
- No brackets on `if` blocks if it's just 1 line.
- Outside of Effect code, prefer using `env` from `@typebot.io/env` instead of `process.env` directly. This package provides type-safe, validated environment variables.

View File

@ -15,6 +15,7 @@ import { collaboratorsRouter } from "@/features/collaboration/api/router";
import { credentialsRouter } from "@/features/credentials/api/router";
import { customDomainsRouter } from "@/features/customDomains/api/router";
import { generateGroupTitle } from "@/features/editor/api/generateGroupTitle";
import { emailsRouter } from "@/features/emails/api/router";
import { getFeatureFlags } from "@/features/featureFlags/api/getFeatureFlags";
import { folderRouter } from "@/features/folders/api/router";
import { forgeRouter } from "@/features/forge/api/router";
@ -38,6 +39,7 @@ export const appRouter = {
customDomains: customDomainsRouter,
whatsApp: builderWhatsAppRouter,
folders: folderRouter,
emails: emailsRouter,
user: userRouter,
healthz: publicProcedure.handler(async () => ({
status: "ok",

View File

@ -0,0 +1,145 @@
"use client";
import { Button } from "@typebot.io/ui/components/Button";
import { useState, useTransition } from "react";
import { orpcClient } from "@/lib/queryClient";
type Props = {
email?: string;
token?: string;
isValid: boolean;
};
type Status =
| "confirm"
| "unsubscribed"
| "unsubscribe-failed"
| "resubscribed"
| "already-subscribed"
| "blocked"
| "unknown"
| "invalid";
export const UnsubscribePageClient = ({ email, token, isValid }: Props) => {
const [status, setStatus] = useState<Status>(isValid ? "confirm" : "invalid");
const [isPending, startTransition] = useTransition();
const handleUnsubscribe = () => {
if (!email || !token) return setStatus("invalid");
startTransition(() => {
void triggerUnsubscribe(email, token, setStatus);
});
};
const handleResubscribe = () => {
if (!email || !token) return setStatus("invalid");
startTransition(() => {
void triggerResubscribe(email, token, setStatus);
});
};
const { message, helperText } = getCopy(status);
const showUnsubscribe = status === "confirm";
const showResubscribe = status === "unsubscribed";
return (
<main className="flex flex-col gap-4 h-dvh justify-center items-center text-gray-12 px-8 py-8">
<div className="w-full max-w-lg">
<div className="flex flex-col p-8 rounded-lg gap-6 bg-gray-1">
<div className="flex flex-col gap-3">
<h1 className="text-base font-semibold text-balance">
Email preferences
</h1>
<p className="text-sm leading-relaxed text-pretty">{message}</p>
{helperText ? (
<p className="text-sm leading-relaxed text-gray-11 text-pretty">
{helperText}
</p>
) : null}
</div>
{showUnsubscribe ? (
<Button
onClick={handleUnsubscribe}
disabled={isPending}
className="self-start"
>
Unsubscribe
</Button>
) : null}
{showResubscribe ? (
<Button
onClick={handleResubscribe}
disabled={isPending}
className="self-start"
>
Resubscribe
</Button>
) : null}
</div>
</div>
</main>
);
};
const triggerUnsubscribe = async (
email: string,
token: string,
setStatus: (status: Status) => void,
) => {
try {
await orpcClient.emails.unsubscribe({
query: { email, token },
});
setStatus("unsubscribed");
} catch {
setStatus("unsubscribe-failed");
}
};
const triggerResubscribe = async (
email: string,
token: string,
setStatus: (status: Status) => void,
) => {
try {
const response = await orpcClient.emails.resubscribe({
query: { email, token },
});
setStatus(response.status);
} catch {
setStatus("unknown");
}
};
const getCopy = (status: Status) => {
if (status === "invalid")
return { message: "This unsubscribe link is invalid." };
if (status === "confirm")
return {
message: "Confirm unsubscribe.",
helperText: "Click the button below to stop receiving these emails.",
};
if (status === "unsubscribed")
return {
message: "Successfully unsubscribed.",
helperText: "You will stop receiving these emails within 48 hours.",
};
if (status === "resubscribed")
return {
message: "You are resubscribed.",
helperText: "It can take up to 48 hours for emails to resume.",
};
if (status === "blocked")
return {
message: "We could not resubscribe this email.",
helperText:
"This address had multiple bounces, so resubscribe is disabled.",
};
if (status === "already-subscribed")
return {
message: "You are already subscribed.",
};
return {
message: "We could not update your email preferences.",
};
};

View File

@ -0,0 +1,41 @@
import { verifyUnsubscribeToken } from "@typebot.io/user/verifyUnsubscribeToken";
import type { Metadata } from "next";
import { UnsubscribePageClient } from "./UnsubscribePageClient";
type SearchParams = Record<string, string | string[] | undefined>;
type PageProps = {
searchParams?: Promise<SearchParams>;
};
export const metadata: Metadata = {
title: "Email preferences",
icons: {
icon: "/favicon.svg",
},
};
const showUnsubscribePage = async ({ searchParams }: PageProps) => {
const resolvedSearchParams = await searchParams;
const email = getSearchParam(resolvedSearchParams, "email");
const token = getSearchParam(resolvedSearchParams, "token");
const isValid = Boolean(
email && token && verifyUnsubscribeToken(email, token),
);
return (
<UnsubscribePageClient email={email} token={token} isValid={isValid} />
);
};
export default showUnsubscribePage;
const getSearchParam = (
searchParams: SearchParams | undefined,
key: string,
) => {
const value = searchParams?.[key];
if (typeof value === "string") return value;
if (Array.isArray(value)) return value[0];
return undefined;
};

View File

@ -0,0 +1,19 @@
import "@/assets/styles/routerProgressBar.css";
import "@/assets/styles/plate.css";
import "@/assets/styles/resultsTable.css";
import "@/assets/styles/custom.css";
import "@/assets/styles/globals.css";
export default function renderRootLayout({
children,
}: {
children: React.ReactNode;
}) {
return (
<html lang="en">
<body className="font-body text-gray-12 bg-gray-2 dark:bg-gray-1 antialiased">
{children}
</body>
</html>
);
}

View File

@ -14,7 +14,7 @@ export const SignInPage = ({ type }: Props) => {
const { query } = useRouter();
return (
<div className="flex flex-col gap-4 h-screen justify-center items-center">
<div className="flex flex-col gap-4 h-dvh justify-center items-center">
<Seo
title={
type === "signin"

View File

@ -0,0 +1,59 @@
import prisma from "@typebot.io/prisma";
import { normalizeEmail } from "@typebot.io/user/normalizeEmail";
import { verifyUnsubscribeToken } from "@typebot.io/user/verifyUnsubscribeToken";
import { z } from "zod";
const resubscribeStatusSchema = z.enum([
"resubscribed",
"already-subscribed",
"blocked",
"unknown",
"invalid",
]);
export const resubscribeEmailOutputSchema = z.object({
status: resubscribeStatusSchema,
});
export type ResubscribeStatus = z.infer<typeof resubscribeStatusSchema>;
type ResubscribeResponse = z.infer<typeof resubscribeEmailOutputSchema>;
export const resubscribeEmailInputSchema = z.object({
query: z
.object({
email: z.string().optional(),
token: z.string().optional(),
})
.optional(),
});
export const handleResubscribeEmail = async ({
input,
}: {
input: z.infer<typeof resubscribeEmailInputSchema>;
}): Promise<ResubscribeResponse> => {
const email = input.query?.email ?? "";
const token = input.query?.token ?? "";
if (!email || !token) return { status: "invalid" };
if (!verifyUnsubscribeToken(email, token)) return { status: "invalid" };
const status = await clearSuppression(email);
return { status };
};
const clearSuppression = async (email: string): Promise<ResubscribeStatus> => {
const normalized = normalizeEmail(email);
if (!normalized) return "unknown";
const suppressed = await prisma.suppressedEmail.findUnique({
where: { email: normalized },
select: { suppressedAt: true, transientGeneralBounceCount: true },
});
if (!suppressed) return "already-subscribed";
if (suppressed.transientGeneralBounceCount >= 2) return "blocked";
if (!suppressed.suppressedAt) return "already-subscribed";
await prisma.suppressedEmail.update({
where: { email: normalized },
data: { suppressedAt: null },
});
return "resubscribed";
};

View File

@ -0,0 +1,41 @@
import prisma from "@typebot.io/prisma";
import { normalizeEmail } from "@typebot.io/user/normalizeEmail";
import { verifyUnsubscribeToken } from "@typebot.io/user/verifyUnsubscribeToken";
import { z } from "zod";
export const unsubscribeEmailInputSchema = z.object({
query: z
.object({
email: z.string().optional(),
token: z.string().optional(),
})
.optional(),
});
export const handleUnsubscribeEmail = async ({
input,
}: {
input: z.infer<typeof unsubscribeEmailInputSchema>;
}) => {
const email = input.query?.email ?? "";
const token = input.query?.token ?? "";
if (!email || !token) return { message: "Ignored request" };
if (!verifyUnsubscribeToken(email, token))
return { message: "Invalid unsubscribe token" };
await suppressEmail(email);
return { message: "Unsubscribed" };
};
const suppressEmail = async (email: string) => {
const normalized = normalizeEmail(email);
if (!normalized) return false;
const now = new Date();
await prisma.suppressedEmail.upsert({
where: { email: normalized },
update: { suppressedAt: now },
create: { email: normalized, suppressedAt: now },
});
return true;
};

View File

@ -0,0 +1,38 @@
import { publicProcedure } from "@typebot.io/config/orpc/builder/middlewares";
import { z } from "zod";
import {
handleResubscribeEmail,
resubscribeEmailInputSchema,
resubscribeEmailOutputSchema,
} from "./handleResubscribeEmail";
import {
handleUnsubscribeEmail,
unsubscribeEmailInputSchema,
} from "./handleUnsubscribeEmail";
export const emailsRouter = {
unsubscribe: publicProcedure
.route({
method: "POST",
path: "/emails/unsubscribe",
summary: "Unsubscribe email",
tags: ["Email"],
inputStructure: "detailed",
successStatus: 202,
})
.input(unsubscribeEmailInputSchema)
.output(z.object({ message: z.string() }))
.handler(handleUnsubscribeEmail),
resubscribe: publicProcedure
.route({
method: "POST",
path: "/emails/unsubscribe/resubscribe",
summary: "Resubscribe email",
tags: ["Email"],
inputStructure: "detailed",
})
.input(resubscribeEmailInputSchema)
.output(resubscribeEmailOutputSchema)
.handler(handleResubscribeEmail),
};

View File

@ -29,6 +29,7 @@
"@typebot.io/lib": "workspace:*",
"@typebot.io/prisma": "workspace:*",
"@typebot.io/results": "workspace:*",
"@typebot.io/user": "workspace:*",
"@typebot.io/typebot": "workspace:*",
"effect": "^3.19.14"
},

View File

@ -22,6 +22,7 @@ import {
WorkflowsDatabaseConfig,
WorkflowsServerConfig,
} from "@typebot.io/config";
import { RPC_SECRET_HEADER_KEY } from "@typebot.io/config/constants";
import { NodemailerClientLayer } from "@typebot.io/lib/nodemailer/NodemailerClient";
import { RedisClientLayer } from "@typebot.io/lib/redis/RedisClient";
import prisma from "@typebot.io/prisma";
@ -34,9 +35,13 @@ import {
import {
ResultsWorkflowsRpc,
ResultsWorkflowsRpcLayer,
RPC_SECRET_HEADER_KEY,
} from "@typebot.io/results/workflows/rpc";
import { TypebotServiceLayer } from "@typebot.io/typebot/services/TypebotService";
import {
UsersWorkflowsRpc,
UsersWorkflowsRpcLayer,
} from "@typebot.io/user/workflows/rpc";
import { StartUserOnboardingWorkflowLayer } from "@typebot.io/user/workflows/startUserOnboardingWorkflow";
import { Effect, Equivalence, Layer, Redacted } from "effect";
const WorkflowEngineLayer = ClusterWorkflowEngine.layer.pipe(
@ -56,6 +61,7 @@ const WorkflowEngineLayer = ClusterWorkflowEngine.layer.pipe(
const WorkflowLayer = Layer.mergeAll(
ExportResultsWorkflowLayer,
SendExportToEmailWorkflowLayer,
StartUserOnboardingWorkflowLayer,
).pipe(Layer.provideMerge(WorkflowEngineLayer));
const PrismaLayer = Layer.provide(
@ -93,12 +99,15 @@ const AuthMiddleware = HttpLayerRouter.middleware(
}),
).layer;
const ResultsRpcRouterLayer = RpcServer.layerHttpRouter({
group: ResultsWorkflowsRpc,
const WorkflowsRpcGroup = ResultsWorkflowsRpc.merge(UsersWorkflowsRpc);
const WorkflowsRpcRouterLayer = RpcServer.layerHttpRouter({
group: WorkflowsRpcGroup,
path: "/rpc",
protocol: "http",
}).pipe(
Layer.provide(ResultsWorkflowsRpcLayer),
Layer.provide(UsersWorkflowsRpcLayer),
Layer.provide(AuthMiddleware),
Layer.provide(RpcSerialization.layerNdjson),
);
@ -118,7 +127,7 @@ const OtelNodeSdkLive = NodeSdk.layer(() => ({
logRecordProcessor: new BatchLogRecordProcessor(new OTLPLogExporter()),
}));
const Routes = Layer.mergeAll(HealthRoute, ResultsRpcRouterLayer);
const Routes = Layer.mergeAll(HealthRoute, WorkflowsRpcRouterLayer);
const Main = HttpLayerRouter.serve(Routes).pipe(
Layer.provide(WorkflowLayer),

View File

@ -14,7 +14,10 @@
"!**/emojiList.json",
"!**/iconNames.ts",
"!**/.last-run.json",
"!**/test/assets/**/*.json"
"!**/test/assets/**/*.json",
"!.opencode",
"!.codex",
"!.cursor"
]
},
"css": {

View File

@ -292,6 +292,7 @@
"@typebot.io/prisma": "workspace:*",
"@typebot.io/results": "workspace:*",
"@typebot.io/typebot": "workspace:*",
"@typebot.io/user": "workspace:*",
"effect": "^3.19.14",
},
"devDependencies": {
@ -591,6 +592,8 @@
"version": "0.0.1",
"dependencies": {
"@effect-aws/s3": "^0.2.5",
"@effect/platform": "^0.94.1",
"@effect/rpc": "^0.73.0",
"effect": "^3.19.14",
},
"devDependencies": {
@ -1367,7 +1370,15 @@
"name": "@typebot.io/user",
"version": "0.0.1",
"dependencies": {
"@effect/platform": "^0.94.1",
"@effect/rpc": "^0.73.0",
"@effect/workflow": "^0.16.0",
"@typebot.io/config": "workspace:*",
"@typebot.io/emails": "workspace:*",
"@typebot.io/env": "workspace:*",
"@typebot.io/lib": "workspace:*",
"@typebot.io/prisma": "workspace:*",
"effect": "^3.19.14",
"zod": "^4.3.5",
},
"devDependencies": {

View File

@ -5,6 +5,7 @@ import type {
AdapterUser,
} from "@auth/core/adapters";
import { createId } from "@paralleldrive/cuid2";
import { WorkflowsRpcClientConfig } from "@typebot.io/config";
import { env } from "@typebot.io/env";
import { ky } from "@typebot.io/lib/ky";
import { omit } from "@typebot.io/lib/utils";
@ -16,7 +17,9 @@ import type { Prisma } from "@typebot.io/prisma/types";
import type { TelemetryEvent } from "@typebot.io/telemetry/schemas";
import { trackEvents } from "@typebot.io/telemetry/trackEvents";
import { userSchema } from "@typebot.io/user/schemas";
import { UsersWorkflowsRpcClient } from "@typebot.io/user/workflows/rpc";
import { parseWorkspaceDefaultPlan } from "@typebot.io/workspaces/parseWorkspaceDefaultPlan";
import { Effect } from "effect";
import { convertInvitationsToCollaborations } from "./convertInvitationsToCollaborations";
import { getNewUserInvitations } from "./getNewUserInvitations";
import { joinWorkspaces } from "./joinWorkspaces";
@ -88,6 +91,8 @@ export const createAuthPrismaAdapter = (p: Prisma.PrismaClient): Adapter => ({
}
}
await trackEvents(events);
if (createdUser.email)
triggerStartUserOnboardingWorkflow(createdUser.id, createdUser.email);
if (invitations.length > 0)
await convertInvitationsToCollaborations(p, user, invitations);
if (workspaceInvitations.length > 0)
@ -196,6 +201,27 @@ export const createAuthPrismaAdapter = (p: Prisma.PrismaClient): Adapter => ({
},
});
const triggerStartUserOnboardingWorkflow = (userId: string, email: string) => {
const program = Effect.gen(function* () {
const client = yield* UsersWorkflowsRpcClient;
yield* client.SendUserOnboardingEmail({ userId, email });
}).pipe(
Effect.withSpan("triggerStartUserOnboardingWorkflow", {
attributes: { userId, email },
root: true,
}),
Effect.provide(UsersWorkflowsRpcClient.Default),
Effect.provide(WorkflowsRpcClientConfig.layer),
Effect.catchAll((error) =>
Effect.sync(() => {
console.error("Failed to trigger onboarding email workflow", error);
}),
),
);
Effect.runFork(program);
};
/** @see https://www.prisma.io/docs/orm/prisma-client/special-fields-and-types/null-and-undefined */
function stripUndefined<T>(obj: T) {
const data = {} as T;

View File

@ -9,6 +9,8 @@
},
"dependencies": {
"@effect-aws/s3": "^0.2.5",
"@effect/platform": "^0.94.1",
"@effect/rpc": "^0.73.0",
"effect": "^3.19.14"
},
"devDependencies": {

View File

@ -0,0 +1 @@
export const RPC_SECRET_HEADER_KEY = "x-rpc-secret";

View File

@ -0,0 +1,32 @@
import {
FetchHttpClient,
HttpClient,
HttpClientRequest,
} from "@effect/platform";
import { RpcClient, RpcSerialization } from "@effect/rpc";
import { Effect, Layer, Option, Redacted } from "effect";
import { RPC_SECRET_HEADER_KEY } from "./constants";
import { WorkflowsRpcClientConfig } from "./index";
export const WorkflowsRpcClientProtocolLayer = Effect.gen(function* () {
const { rpcSecret, rpcUrl } = yield* WorkflowsRpcClientConfig;
return RpcClient.layerProtocolHttp({
url: Option.getOrElse(
rpcUrl,
() => new URL("http://localhost:3007/rpc"),
).toString(),
transformClient: (client) =>
HttpClient.mapRequest(client, (request) =>
request.pipe(
HttpClientRequest.setHeader(
RPC_SECRET_HEADER_KEY,
Redacted.value(rpcSecret),
),
),
),
});
}).pipe(
Layer.unwrapEffect,
Layer.provide(FetchHttpClient.layer),
Layer.provide(RpcSerialization.layerNdjson),
);

View File

@ -0,0 +1,88 @@
import {
Body,
Container,
Head,
Html,
Link,
Preview,
Text,
} from "@react-email/components";
import { render } from "@react-email/render";
import type { ComponentProps } from "react";
// biome-ignore lint/correctness/noUnusedImports: Need it for tsx execution
import React from "react";
import { bodyText, container, footerText, main } from "./styles";
interface Props {
unsubscribeUrl?: string;
}
export const UserOnboardingEmail = ({ unsubscribeUrl }: Props) => (
<Html>
<Head />
<Preview>Welcome to Typebot!</Preview>
<Body style={main}>
<Container
align="left"
style={{
...container,
margin: "0",
maxWidth: "100%",
textAlign: "left",
}}
>
<Text style={bodyText}>
Hi,
<br />
<br />
Thanks for trying out Typebot! I&apos;m Baptiste, the founder. 🙌
<br />
<br />
I&apos;ve created Typebot because I think it should be easy to create
beautiful and engaging chat experiences.
<br />
<br />
Typebot has been designed to give you all the freedom you need to
create the perfect bots for your business while still being super easy
to use.
<br />
<br />
Watch this quick 5-minute overview video to get started:
<br />
<Link href="https://www.youtube.com/watch?v=jp3ggg_42-M">
https://www.youtube.com/watch?v=jp3ggg_42-M
</Link>
<br />
<br />
Join our community on Discord to connect with others and get instant
help:
<br />
<Link href="https://typebot.io/discord">
https://typebot.io/discord
</Link>
<br />
<br />
See you soon!
<br />
<br />
Baptiste.
</Text>
{unsubscribeUrl ? (
<Text style={{ ...footerText, marginTop: "24px" }}>
<Link href={unsubscribeUrl}>Click here to unsubscribe</Link>
</Text>
) : null}
</Container>
</Body>
</Html>
);
UserOnboardingEmail.PreviewProps = {
unsubscribeUrl: "https://typebot.io/emails/unsubscribe",
} satisfies Props;
export default UserOnboardingEmail;
export const renderUserOnboardingEmail = async (
props: ComponentProps<typeof UserOnboardingEmail>,
) => render(<UserOnboardingEmail {...props} />);

View File

@ -67,6 +67,7 @@ const baseEnv = {
.url()
.refine((url) => url.startsWith("postgres") || url.startsWith("mysql")),
ENCRYPTION_SECRET: z.string().length(32),
EMAIL_UNSUBSCRIBE_SECRET: z.string().min(1).optional(),
NEXTAUTH_URL: z.preprocess(
guessNextAuthUrlForVercelPreview,
z.string().url(),
@ -473,6 +474,22 @@ const otelEnv = {
},
};
const formatEnvIssues = (issues: readonly StandardSchemaV1.Issue[]) =>
issues.reduce<Record<string, string[]>>((acc, issue) => {
const path = issue.path?.map((segment) =>
isPathSegment(segment) ? String(segment.key) : String(segment),
);
const key = path?.length ? path.join(".") : "root";
if (!acc[key]) acc[key] = [];
acc[key].push(issue.message);
return acc;
}, {});
const isPathSegment = (
value: StandardSchemaV1.PathSegment | PropertyKey,
): value is StandardSchemaV1.PathSegment =>
typeof value === "object" && value !== null && "key" in value;
export const env = createEnv({
server: {
...baseEnv.server,
@ -542,19 +559,3 @@ export const env = createEnv({
);
},
});
const formatEnvIssues = (issues: readonly StandardSchemaV1.Issue[]) =>
issues.reduce<Record<string, string[]>>((acc, issue) => {
const path = issue.path?.map((segment) =>
isPathSegment(segment) ? String(segment.key) : String(segment),
);
const key = path?.length ? path.join(".") : "root";
if (!acc[key]) acc[key] = [];
acc[key].push(issue.message);
return acc;
}, {});
const isPathSegment = (
value: StandardSchemaV1.PathSegment | PropertyKey,
): value is StandardSchemaV1.PathSegment =>
typeof value === "object" && value !== null && "key" in value;

View File

@ -1,32 +1,16 @@
import {
FetchHttpClient,
HttpClient,
HttpClientRequest,
} from "@effect/platform";
import { Rpc, RpcClient, RpcGroup, RpcSerialization } from "@effect/rpc";
import { WorkflowsRpcClientConfig } from "@typebot.io/config";
import { Rpc, RpcClient, RpcGroup } from "@effect/rpc";
import { WorkflowsRpcClientProtocolLayer } from "@typebot.io/config/workflowsRpcProtocol";
import {
RedisClient,
RedisSubscribeError,
} from "@typebot.io/lib/redis/RedisClient";
import {
Cause,
Effect,
Fiber,
Layer,
Option,
Redacted,
Schema,
Stream,
} from "effect";
import { Cause, Effect, Fiber, Schema, Stream } from "effect";
import {
EXPORT_PROGRESS_CHANNEL_PREFIX,
ExportResultsWorkflow,
SendExportToEmailWorkflow,
} from "./exportResultsWorkflow";
export const RPC_SECRET_HEADER_KEY = "x-rpc-secret";
const ExportResultsWorkflowStatusChunk = Schema.Union(
Schema.Struct({
status: Schema.Literal("starting"),
@ -126,35 +110,10 @@ export const ResultsWorkflowsRpcLayer = ResultsWorkflowsRpc.toLayer(
}),
);
// Client
const ProtocolLive = Effect.gen(function* () {
const { rpcSecret, rpcUrl } = yield* WorkflowsRpcClientConfig;
return RpcClient.layerProtocolHttp({
url: Option.getOrElse(
rpcUrl,
() => new URL("http://localhost:3007/rpc"),
).toString(),
transformClient: (client) =>
HttpClient.mapRequest(client, (request) =>
request.pipe(
HttpClientRequest.setHeader(
RPC_SECRET_HEADER_KEY,
Redacted.value(rpcSecret),
),
),
),
});
}).pipe(
Layer.unwrapEffect,
Layer.provide(FetchHttpClient.layer),
Layer.provide(RpcSerialization.layerNdjson),
);
export class ResultsWorkflowsRpcClient extends Effect.Service<ResultsWorkflowsRpcClient>()(
"@typebot/ResultsWorkflowsRpcClient",
{
scoped: RpcClient.make(ResultsWorkflowsRpc),
dependencies: [ProtocolLive],
dependencies: [WorkflowsRpcClientProtocolLayer],
},
) {}

View File

@ -7,6 +7,14 @@
"./*": "./src/*.ts"
},
"dependencies": {
"@effect/platform": "^0.94.1",
"@effect/rpc": "^0.73.0",
"@effect/workflow": "^0.16.0",
"effect": "^3.19.14",
"@typebot.io/config": "workspace:*",
"@typebot.io/emails": "workspace:*",
"@typebot.io/env": "workspace:*",
"@typebot.io/lib": "workspace:*",
"@typebot.io/prisma": "workspace:*",
"zod": "^4.3.5"
},

View File

@ -0,0 +1,12 @@
import { createHmac } from "node:crypto";
import { env } from "@typebot.io/env";
import { normalizeEmail } from "./normalizeEmail";
export const createUnsubscribeToken = (email: string) => {
const normalized = normalizeEmail(email);
if (!normalized) return null;
if (!env.EMAIL_UNSUBSCRIBE_SECRET) return null;
return createHmac("sha256", env.EMAIL_UNSUBSCRIBE_SECRET)
.update(normalized)
.digest("base64url");
};

View File

@ -0,0 +1,5 @@
export const normalizeEmail = (value: string) => {
const normalized = value.trim().toLowerCase();
if (!normalized) return null;
return normalized;
};

View File

@ -0,0 +1,9 @@
import { timingSafeEqual } from "node:crypto";
import { createUnsubscribeToken } from "./createUnsubscribeToken";
export const verifyUnsubscribeToken = (email: string, token: string) => {
const expected = createUnsubscribeToken(email);
if (!expected) return false;
if (expected.length !== token.length) return false;
return timingSafeEqual(Buffer.from(expected), Buffer.from(token));
};

View File

@ -0,0 +1,28 @@
import { Rpc, RpcClient, RpcGroup } from "@effect/rpc";
import { WorkflowsRpcClientProtocolLayer } from "@typebot.io/config/workflowsRpcProtocol";
import { Effect } from "effect";
import { StartUserOnboardingWorkflow } from "./startUserOnboardingWorkflow";
export class UsersWorkflowsRpc extends RpcGroup.make(
Rpc.make("SendUserOnboardingEmail", {
error: StartUserOnboardingWorkflow.errorSchema,
payload: StartUserOnboardingWorkflow.payloadSchema,
}),
) {}
export const UsersWorkflowsRpcLayer = UsersWorkflowsRpc.toLayer(
Effect.succeed({
SendUserOnboardingEmail: (payload) =>
StartUserOnboardingWorkflow.execute(payload, {
discard: true,
}),
}),
);
export class UsersWorkflowsRpcClient extends Effect.Service<UsersWorkflowsRpcClient>()(
"@typebot/UsersWorkflowsRpcClient",
{
scoped: RpcClient.make(UsersWorkflowsRpc),
dependencies: [WorkflowsRpcClientProtocolLayer],
},
) {}

View File

@ -0,0 +1,135 @@
import { Activity, DurableClock, Workflow } from "@effect/workflow";
import { listSuppressedEmails } from "@typebot.io/emails/helpers/suppressedEmails";
import { renderUserOnboardingEmail } from "@typebot.io/emails/transactional/UserOnboardingEmail";
import { env } from "@typebot.io/env";
import {
NodemailerClient,
NodemailerError,
} from "@typebot.io/lib/nodemailer/NodemailerClient";
import { Effect, Option, Schema } from "effect";
import { createUnsubscribeToken } from "../createUnsubscribeToken";
import { normalizeEmail } from "../normalizeEmail";
export const StartUserOnboardingWorkflow = Workflow.make({
name: "StartUserOnboardingWorkflow",
payload: {
userId: Schema.String,
email: Schema.String,
},
error: Schema.Union(NodemailerError),
idempotencyKey: ({ userId }) => userId,
});
export const StartUserOnboardingWorkflowLayer =
StartUserOnboardingWorkflow.toLayer(
Effect.fn(function* (payload) {
yield* Effect.annotateCurrentSpan({
userId: payload.userId,
email: payload.email,
});
yield* Effect.annotateLogsScoped({
userId: payload.userId,
email: payload.email,
});
const normalizedEmail = normalizeEmail(payload.email);
if (!normalizedEmail) {
yield* Effect.logWarning("Invalid email, skipping onboarding email");
return;
}
yield* DurableClock.sleep({
name: `UserOnboardingDelay-${payload.userId}`,
duration: "2 hours",
});
const suppressedEmailsResult = yield* listSuppressedEmails([
normalizedEmail,
]).pipe(
Effect.map(Option.some),
Effect.catchAll((error) =>
Effect.logError("Suppressed email check failed").pipe(
Effect.annotateLogs({
error: String(error),
email: normalizedEmail,
}),
Effect.as(Option.none()),
),
),
);
if (Option.isNone(suppressedEmailsResult)) {
yield* Effect.logWarning(
"Suppressed email check failed, skipping onboarding email",
).pipe(
Effect.annotateLogs({
email: normalizedEmail,
}),
);
return;
}
if (suppressedEmailsResult.value.length > 0) {
yield* Effect.logWarning(
"Email suppressed, skipping onboarding email",
).pipe(
Effect.annotateLogs({
email: normalizedEmail,
suppressedEmails: suppressedEmailsResult.value,
}),
);
return;
}
const unsubscribeUrls = buildUnsubscribeUrls(normalizedEmail);
yield* Activity.make({
name: "SendUserOnboardingEmail",
error: Schema.Union(NodemailerError),
execute: Effect.gen(function* () {
const emailClient = yield* NodemailerClient;
const html = yield* Effect.tryPromise({
try: () =>
renderUserOnboardingEmail({
unsubscribeUrl: unsubscribeUrls?.pageUrl ?? undefined,
}),
catch: (error) => new NodemailerError({ cause: error }),
});
const headers = unsubscribeUrls
? {
"List-Unsubscribe": `<${unsubscribeUrls.apiUrl}>`,
"List-Unsubscribe-Post": "List-Unsubscribe=One-Click",
}
: undefined;
yield* emailClient.sendMail({
to: normalizedEmail,
subject: "Welcome to Typebot!",
html,
headers,
});
}),
}).pipe(
Effect.tapError((error) =>
Effect.logError("SendUserOnboardingEmail failed").pipe(
Effect.annotateLogs({
error: String(error),
email: normalizedEmail,
}),
),
),
);
}),
);
const buildUnsubscribeUrls = (email: string) => {
if (!env.EMAIL_UNSUBSCRIBE_SECRET) return null;
const token = createUnsubscribeToken(email);
if (!token) return null;
const apiUrl = new URL("/api/emails/unsubscribe", env.NEXTAUTH_URL);
apiUrl.searchParams.set("email", email);
apiUrl.searchParams.set("token", token);
const pageUrl = new URL("/emails/unsubscribe", env.NEXTAUTH_URL);
pageUrl.searchParams.set("email", email);
pageUrl.searchParams.set("token", token);
return { apiUrl: apiUrl.toString(), pageUrl: pageUrl.toString() };
};