Workflow queue

This commit is contained in:
Konstantin Wohlwend 2025-09-03 13:31:40 -07:00
parent faf79e5a9e
commit 7387f029c0
15 changed files with 267 additions and 76 deletions

View File

@ -76,6 +76,7 @@
"Proxied",
"psql",
"qrcode",
"QSTASH",
"quetzallabs",
"rehype",
"reqs",

View File

@ -65,6 +65,11 @@ STACK_AWS_ACCESS_KEY_ID=
STACK_AWS_SECRET_ACCESS_KEY=
STACK_AWS_VERCEL_OIDC_ROLE_ARN=
# Upstash configuration
STACK_QSTASH_URL=
STACK_QSTASH_TOKEN=
STACK_QSTASH_CURRENT_SIGNING_KEY=
STACK_QSTASH_NEXT_SIGNING_KEY=
# Misc, optional

View File

@ -62,3 +62,9 @@ STACK_AWS_REGION=us-east-1
STACK_AWS_KMS_ENDPOINT=http://localhost:8124
STACK_AWS_ACCESS_KEY_ID=test
STACK_AWS_SECRET_ACCESS_KEY=test
# Upstash defaults to one of the pre-build test users of the local emulator
STACK_QSTASH_URL=http://localhost:8125
STACK_QSTASH_TOKEN=eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=
STACK_QSTASH_CURRENT_SIGNING_KEY=sig_7kYjw48mhY7kAjqNGcy6cr29RJ6r
STACK_QSTASH_NEXT_SIGNING_KEY=sig_5ZB6DVzB1wjE8S6rZ7eenA8Pdnhs

View File

@ -64,6 +64,7 @@
"@sentry/nextjs": "^8.40.0",
"@simplewebauthn/server": "^11.0.0",
"@stackframe/stack-shared": "workspace:*",
"@upstash/qstash": "^2.8.2",
"@vercel/functions": "^2.0.0",
"@vercel/otel": "^1.10.4",
"ai": "^4.3.17",

View File

@ -810,6 +810,7 @@ model WorkflowTrigger {
// the following fields determine the state of the trigger:
// - scheduledAt && !compiledWorkflowId && !output && !error: the trigger is scheduled to be executed
// - !scheduledAt && !compiledWorkflowId: the trigger was scheduled, but its workflow subsequently deleted. The trigger never ran
// - !scheduledAt && compiledWorkflowId && !output && !error: the trigger is currently executing
// - !scheduledAt && compiledWorkflowId && output && !error: the trigger has successfully completed execution
// - !scheduledAt && compiledWorkflowId && !output && error: the trigger has failed execution

View File

@ -0,0 +1,41 @@
import { getTenancy } from "@/lib/tenancies";
import { ensureUpstashSignature } from "@/lib/upstash";
import { triggerScheduledWorkflows } from "@/lib/workflows";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { yupNumber, yupObject, yupString, yupTuple } from "@stackframe/stack-shared/dist/schema-fields";
import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors";
export const POST = createSmartRouteHandler({
metadata: {
hidden: true,
},
request: yupObject({
headers: yupObject({
"upstash-signature": yupTuple([yupString().defined()]).defined(),
}).defined(),
body: yupObject({
tenancyId: yupString().defined(),
}).defined(),
method: yupString().oneOf(["POST"]).defined(),
}),
response: yupObject({
statusCode: yupNumber().oneOf([200]).defined(),
bodyType: yupString().oneOf(["success"]).defined(),
}),
handler: async (req, fullReq) => {
await ensureUpstashSignature(fullReq);
const tenancy = await getTenancy(req.body.tenancyId);
if (!tenancy) {
throw new StackAssertionError(`Tenancy not found for scheduled trigger`, { tenancyId: req.body.tenancyId });
}
await triggerScheduledWorkflows(tenancy);
return {
statusCode: 200,
bodyType: "success",
} as const;
},
});

View File

@ -0,0 +1,35 @@
import { SmartRequest } from "@/route-handlers/smart-request";
import { getEnvVariable, getNodeEnvironment } from "@stackframe/stack-shared/dist/utils/env";
import { StatusError } from "@stackframe/stack-shared/dist/utils/errors";
import { Client, Receiver } from "@upstash/qstash";
export const upstash = new Client({
baseUrl: getEnvVariable("STACK_QSTASH_URL", ""),
token: getEnvVariable("STACK_QSTASH_TOKEN", ""),
});
export const upstashReceiver = new Receiver({
currentSigningKey: getEnvVariable("STACK_QSTASH_CURRENT_SIGNING_KEY", ""),
nextSigningKey: getEnvVariable("STACK_QSTASH_NEXT_SIGNING_KEY", ""),
});
export async function ensureUpstashSignature(fullReq: SmartRequest): Promise<void> {
const upstashSignature = fullReq.headers["upstash-signature"]?.[0];
if (!upstashSignature) {
throw new StatusError(400, "upstash-signature header is required");
}
const url = new URL(fullReq.url);
if ((getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) && url.hostname === "localhost") {
url.hostname = "host.docker.internal";
}
const isValid = await upstashReceiver.verify({
signature: upstashSignature,
url: url.toString(),
body: new TextDecoder().decode(fullReq.bodyBuffer),
});
if (!isValid) {
throw new StatusError(400, "Invalid Upstash signature");
}
}

View File

@ -13,6 +13,7 @@ import { Result } from "@stackframe/stack-shared/dist/utils/results";
import { generateUuid } from "@stackframe/stack-shared/dist/utils/uuids";
import { Freestyle } from "./freestyle";
import { Tenancy } from "./tenancies";
import { upstash } from "./upstash";
const externalPackages = {
'@stackframe/stack': 'latest',
@ -431,74 +432,119 @@ async function createScheduledTrigger(tenancy: Tenancy, workflowId: string, trig
},
},
});
await upstash.publishJSON({
url: new URL(`/api/v1/internal/trigger/run-scheduled`, getEnvVariable("NEXT_PUBLIC_STACK_API_URL").replace("http://localhost", "http://host.docker.internal")).toString(),
body: {
tenancyId: tenancy.id,
},
notBefore: Math.floor(scheduledAt.getTime() / 1000),
});
return dbTrigger;
}
async function triggerWorkflow(tenancy: Tenancy, compiledWorkflow: CompiledWorkflow, triggerId: string): Promise<Result<void, string>> {
if (compiledWorkflow.compiledCode === null) {
return Result.error(`Workflow ${compiledWorkflow.id} failed to compile: ${compiledWorkflow.compileError}`);
}
export async function triggerScheduledWorkflows(tenancy: Tenancy) {
const prisma = await getPrismaClientForTenancy(tenancy);
const trigger = await prisma.workflowTrigger.update({
where: {
tenancyId_id: {
tenancyId: tenancy.id,
id: triggerId,
},
},
data: {
compiledWorkflowId: compiledWorkflow.id,
scheduledAt: null,
output: Prisma.DbNull,
error: Prisma.DbNull,
},
});
const compiledWorkflows = await compileAndGetEnabledWorkflows(tenancy);
const res = await triggerWorkflowRaw(tenancy, compiledWorkflow.compiledCode, trigger.triggerData as WorkflowTrigger);
if (res.status === "error") {
console.log(`Compiled workflow failed to process trigger: ${res.error}`, { trigger, compiledWorkflowId: compiledWorkflow.id, res });
} else {
if (res.data && typeof res.data === "object" && "scheduledCallback" in res.data && res.data.scheduledCallback && typeof res.data.scheduledCallback === "object") {
const scheduledCallback: any = res.data.scheduledCallback;
const callbackId = `${scheduledCallback.callbackId}`;
const scheduleAt = new Date(scheduledCallback.scheduleAtMillis);
const callbackData = scheduledCallback.data;
await createScheduledTrigger(
tenancy,
compiledWorkflow.id,
{
type: "callback",
callbackId,
data: callbackData,
scheduledAtMillis: scheduleAt.getTime(),
callerTriggerId: triggerId,
executionId: trigger.executionId,
const toTrigger = await retryTransaction(prisma, async (tx) => {
const triggers = await tx.workflowTrigger.findMany({
where: {
tenancyId: tenancy.id,
scheduledAt: { lt: new Date(Date.now() + 5_000) },
},
include: {
execution: true,
},
orderBy: {
scheduledAt: "asc",
},
// let's take multiple triggers so we can catch up on the backlog, in case some triggers never went through (eg. if the queue was down)
// however, to prevent deadlocks as we are doing multiple writes in this transaction, we randomize it (so there's
// a chance that we only take one trigger, which would never deadlock)
take: Math.floor(1 + Math.random() * 3),
});
const toTrigger = [];
for (const trigger of triggers) {
const compiledWorkflow = compiledWorkflows.get(trigger.execution.workflowId);
const updatedTrigger = await tx.workflowTrigger.update({
where: {
tenancyId_id: {
tenancyId: tenancy.id,
id: trigger.id,
},
},
scheduleAt
);
data: {
scheduledAt: null,
compiledWorkflowId: compiledWorkflow?.id ?? null,
output: Prisma.DbNull,
error: Prisma.DbNull,
},
include: {
execution: true,
},
});
if (compiledWorkflow) {
toTrigger.push(updatedTrigger);
} else {
// the workflow was deleted; we don't run the trigger, but we still mark it in the DB
}
}
}
await prisma.workflowTrigger.update({
where: {
tenancyId_id: {
tenancyId: tenancy.id,
id: triggerId,
},
},
data: {
...res.status === "ok" ? {
output: res.data as any,
} : {
error: res.error,
},
},
});
return Result.ok(undefined);
}
return toTrigger;
}, { level: "serializable" });
export async function triggerScheduledCallbacks(tenancy: Tenancy) {
await allPromisesAndWaitUntilEach(toTrigger.map(async (trigger) => {
const compiledWorkflow = compiledWorkflows.get(trigger.execution.workflowId) ?? throwErr(`Compiled workflow ${trigger.execution.workflowId} not found in trigger execution; this should not happen because we should've already checked for this in the transaction!`);
if (compiledWorkflow.compiledCode === null) {
return Result.error(`Workflow ${compiledWorkflow.id} failed to compile: ${compiledWorkflow.compileError}`);
}
const res = await triggerWorkflowRaw(tenancy, compiledWorkflow.compiledCode, trigger.triggerData as WorkflowTrigger);
if (res.status === "error") {
// This is probably fine and just a user error, but let's log it regardless
console.log(`Compiled workflow failed to process trigger: ${res.error}`, { trigger, compiledWorkflowId: compiledWorkflow.id, res });
} else {
if (res.data && typeof res.data === "object" && "scheduledCallback" in res.data && res.data.scheduledCallback && typeof res.data.scheduledCallback === "object") {
const scheduledCallback: any = res.data.scheduledCallback;
const callbackId = `${scheduledCallback.callbackId}`;
const scheduleAt = new Date(scheduledCallback.scheduleAtMillis);
const callbackData = scheduledCallback.data;
await createScheduledTrigger(
tenancy,
compiledWorkflow.id,
{
type: "callback",
callbackId,
data: callbackData,
scheduledAtMillis: scheduleAt.getTime(),
callerTriggerId: trigger.id,
executionId: trigger.executionId,
},
scheduleAt
);
}
}
const prisma = await getPrismaClientForTenancy(tenancy);
await prisma.workflowTrigger.update({
where: {
tenancyId_id: {
tenancyId: tenancy.id,
id: trigger.id,
},
},
data: {
...res.status === "ok" ? {
output: res.data as any,
} : {
error: res.error,
},
},
});
return Result.ok(undefined);
}));
}
export async function triggerWorkflows(tenancy: Tenancy, trigger: WorkflowTrigger & { type: WorkflowRegisteredTriggerType }) {
@ -507,9 +553,8 @@ export async function triggerWorkflows(tenancy: Tenancy, trigger: WorkflowTrigge
const promises = [...compiledWorkflows]
.filter(([_, compiledWorkflow]) => compiledWorkflow.registeredTriggers.includes(trigger.type))
.map(async ([workflowId, compiledWorkflow]) => {
const dbTrigger = await createScheduledTrigger(tenancy, workflowId, trigger, new Date());
await triggerWorkflow(tenancy, compiledWorkflow, dbTrigger.id);
await createScheduledTrigger(tenancy, workflowId, trigger, new Date());
});
await Promise.all(promises);
await allPromisesAndWaitUntilEach(promises);
});
}

View File

@ -5,7 +5,7 @@ import { yupResolver } from "@hookform/resolvers/yup";
import { AuthPage, TeamSwitcher, useUser } from "@stackframe/stack";
import { allProviders } from "@stackframe/stack-shared/dist/utils/oauth";
import { runAsynchronouslyWithAlert, wait } from "@stackframe/stack-shared/dist/utils/promises";
import { BrowserFrame, Button, Form, FormControl, FormField, FormItem, FormMessage, Label, Separator, Typography } from "@stackframe/stack-ui";
import { BrowserFrame, Button, Form, FormControl, FormField, FormItem, FormMessage, Separator, Typography } from "@stackframe/stack-ui";
import { useSearchParams } from "next/navigation";
import { useState } from "react";
import { useForm } from "react-hook-form";
@ -47,7 +47,7 @@ export default function PageClient() {
credentialEnabled: form.watch("signInMethods").includes("credential"),
magicLinkEnabled: form.watch("signInMethods").includes("magicLink"),
passkeyEnabled: form.watch("signInMethods").includes("passkey"),
oauthProviders: form.watch('signInMethods').filter((method) => ["google", "github", "microsoft", "spotify"].includes(method)).map(provider => ({ id: provider, type: 'shared' })),
oauthProviders: form.watch('signInMethods').filter((method) => ["google", "github", "microsoft"].includes(method)).map(provider => ({ id: provider, type: 'shared' })),
}
};
@ -133,7 +133,6 @@ export default function PageClient() {
{ value: "google", label: "Google" },
{ value: "github", label: "GitHub" },
{ value: "microsoft", label: "Microsoft" },
{ value: "spotify", label: "Spotify" },
]}
info="More sign-in methods are available on the dashboard later."
/>

View File

@ -22,6 +22,7 @@ export default function WorkflowDetailPage() {
const workflow = workflowId in availableWorkflows ? availableWorkflows[workflowId] : undefined;
const [workflowContent, setWorkflowContent] = useState("");
const [isLoading, setIsLoading] = useState(false);
const [isToggling, setIsToggling] = useState(false);
useEffect(() => {
if (workflow && workflow.tsSource) {
@ -47,6 +48,21 @@ export default function WorkflowDetailPage() {
router.push(`/projects/${projectId}/workflows`);
};
const handleToggleEnabled = async () => {
if (!workflow) return;
setIsToggling(true);
try {
await project.updateConfig({
[`workflows.availableWorkflows.${workflowId}.enabled`]: !workflow.enabled,
});
toast({ title: workflow.enabled ? "Workflow disabled" : "Workflow enabled" });
} catch (error) {
toast({ title: "Failed to toggle workflow", variant: "destructive" });
} finally {
setIsToggling(false);
}
};
if (workflow === undefined) {
return (
<PageLayout title="Workflow Not Found">
@ -70,6 +86,9 @@ export default function WorkflowDetailPage() {
<ArrowLeft className="h-4 w-4 mr-2" />
Back
</Button>
<Button onClick={handleToggleEnabled} size="sm" variant={workflow.enabled ? "outline" : "default"} disabled={isToggling}>
{workflow.enabled ? "Disable" : "Enable"}
</Button>
<Button onClick={handleSave} size="sm" disabled={isLoading}>
<Save className="h-4 w-4 mr-2" />
{isLoading ? "Saving..." : "Save"}

View File

@ -69,7 +69,7 @@ function CreateWorkflowDialog({
}: {
open: boolean,
onOpenChange: (open: boolean) => void,
onSave: (id: string, displayName: string, tsSource: string) => Promise<void>,
onSave: (id: string, displayName: string, tsSource: string, enabled: boolean) => Promise<void>,
}) {
const [displayName, setDisplayName] = useState("");
const [isSubmitting, setIsSubmitting] = useState(false);
@ -95,7 +95,7 @@ function CreateWorkflowDialog({
registerCallback("in-7-days", async (data) => {
await stackApp.sendEmail({ userIds: [data.userId], subject: "Welcome to the app!", html: "<p>Example email</p>" });
});
`);
`, false);
onOpenChange(false);
setDisplayName("");
} finally {
@ -151,12 +151,12 @@ export default function WorkflowsPage() {
setShowCreateDialog(true);
};
const handleSaveWorkflow = async (id: string, displayName: string, tsSource: string) => {
const handleSaveWorkflow = async (id: string, displayName: string, tsSource: string, enabled: boolean) => {
await project.updateConfig({
[`workflows.availableWorkflows.${id}`]: {
displayName,
tsSource,
enabled: true
enabled,
}
});
toast({ title: "Workflow created successfully" });

View File

@ -119,6 +119,9 @@
<li>
8124: LocalStack Gateway (AWS mock)
</li>
<li>
8125: QStash mock
</li>
<li>
8150-8199: Reserved for LocalStack (external services)
</li>

View File

@ -28,22 +28,22 @@ async function configureEmailAndWorkflow(workflowId: string, tsSource: string, e
}
async function waitForMailboxSubject(mailbox: Mailbox, subject: string) {
for (let i = 0; i < 10; i++) {
for (let i = 0; i < 15; i++) {
const messages = await mailbox.fetchMessages();
const message = messages.find((m) => m.subject === subject);
if (message) return;
await wait(1_000);
}
throw new Error(`Message with subject ${subject} not found after 10 tries`);
throw new Error(`Message with subject ${subject} not found after 15 tries`);
}
async function waitForServerMetadataNotNull(userId: string, key: string) {
for (let i = 0; i < 10; i++) {
for (let i = 0; i < 15; i++) {
const user = await niceBackendFetch(`/api/v1/users/${userId}`, { accessType: "server" });
if (user.body.server_metadata?.[key]) return;
await wait(1_000);
}
throw new Error(`Server metadata for user ${userId} with key ${key} not found after 10 tries`);
throw new Error(`Server metadata for user ${userId} with key ${key} not found after 15 tries`);
}
test("onSignUp workflow sends email for client sign-up", async ({ expect }) => {
@ -156,7 +156,7 @@ test("disabled workflows do not trigger", async ({ expect }) => {
await Auth.Password.signUpWithEmail({ password: "password" });
await wait(12_000);
await wait(18_000);
expect(await mailbox.fetchMessages()).toMatchInlineSnapshot(`
[
@ -244,7 +244,7 @@ test("anonymous sign-up does not trigger; upgrade triggers workflow", async ({ e
const { userId: anonUserId } = await Auth.Anonymous.signUp();
// ensure marker not present yet
await wait(16_000);
await wait(18_000);
const me1 = await niceBackendFetch("/api/v1/users/me", { accessType: "client" });
expect(me1.body.server_metadata?.[markerKey]).toBeUndefined();

View File

@ -177,7 +177,7 @@ services:
ports:
- "8122:8080" # POST http://localhost:8119/execute/v1/script
extra_hosts:
- "host.docker.internal:host-gateway" # noop on Docker Desktop/Orbstack enables host.docker.internal on Linux
- "host.docker.internal:host-gateway" # noop on Docker Desktop/Orbstack, enables host.docker.internal on Linux
environment:
DENO_DIR: /deno-cache
HOST_ON_HOST: host.docker.internal
@ -193,6 +193,18 @@ services:
environment:
- STRIPE_API_KEY=sk_test_1234567890
# ================= QStash =================
qstash:
image: public.ecr.aws/upstash/qstash:latest
ports:
- 8125:8080
command: qstash dev
extra_hosts:
- "host.docker.internal:host-gateway" # noop on Docker Desktop/Orbstack, enables host.docker.internal on Linux
environment:
HOST_ON_HOST: host.docker.internal
# ================= volumes =================

View File

@ -177,6 +177,9 @@ importers:
'@stackframe/stack-shared':
specifier: workspace:*
version: link:../../packages/stack-shared
'@upstash/qstash':
specifier: ^2.8.2
version: 2.8.2
'@vercel/functions':
specifier: ^2.0.0
version: 2.0.0(@aws-sdk/credential-provider-web-identity@3.876.0)
@ -7860,6 +7863,9 @@ packages:
'@ungap/structured-clone@1.3.0':
resolution: {integrity: sha512-WmoN8qaIAo7WTYWbAZuG8PYEhn5fkz7dZrqTBZ7dtt//lL2Gwms1IcnQ5yHqjDfX8Ft5j4YzDM23f87zBfDe9g==}
'@upstash/qstash@2.8.2':
resolution: {integrity: sha512-gQMCs2YXmRJWGh28t3fsWuPTzGgVFVRBd4o5QeWM9l3HW8TMNwt1qzv5wtzzSlG7hv1ylEy/PQCznkxGcAwTfw==}
'@urql/core@5.2.0':
resolution: {integrity: sha512-/n0ieD0mvvDnVAXEQgX/7qJiVcvYvNkOHeBvkwtylfjydar123caCXcl58PXFY11oU1oquJocVXHxLAbtv4x1A==}
@ -9112,6 +9118,9 @@ packages:
resolution: {integrity: sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==}
engines: {node: '>= 8'}
crypto-js@4.2.0:
resolution: {integrity: sha512-KALDyEYgpY+Rlob/iriUtjV6d5Eq+Y191A5g4UqLAi8CyGP9N1+FdVbkc1SxKc2r4YAYqG8JzO2KGL+AizD70Q==}
crypto-random-string@2.0.0:
resolution: {integrity: sha512-v1plID3y9r/lPhviJ1wrXpLeyUIGAZ2SHNYTEapm7/8A9nLPoyvVp3RK/EPFqn5kEznyWgYZNsRtYYIWbuG8KA==}
engines: {node: '>=8'}
@ -12362,6 +12371,10 @@ packages:
nested-error-stacks@2.0.1:
resolution: {integrity: sha512-SrQrok4CATudVzBS7coSz26QRSmlK9TzzoFbeKfcPBUFPjcQM9Rqvr/DlJkOrwI/0KcgvMub1n1g5Jt9EgRn4A==}
neverthrow@7.2.0:
resolution: {integrity: sha512-iGBUfFB7yPczHHtA8dksKTJ9E8TESNTAx1UQWW6TzMF280vo9jdPYpLUXrMN1BCkPdHFdNG3fxOt2CUad8KhAw==}
engines: {node: '>=18'}
next-intl@3.19.1:
resolution: {integrity: sha512-KlJSomzbB5dNkWBIiSIRaoy5zqwLgHNV5Zw0ULhkHjNnPN7aLFFv2G+VOQKle630sNH2JiKc9nsmi6PM1GdkhA==}
peerDependencies:
@ -23542,6 +23555,12 @@ snapshots:
'@ungap/structured-clone@1.3.0': {}
'@upstash/qstash@2.8.2':
dependencies:
crypto-js: 4.2.0
jose: 5.6.3
neverthrow: 7.2.0
'@urql/core@5.2.0':
dependencies:
'@0no-co/graphql.web': 1.1.2
@ -25036,6 +25055,8 @@ snapshots:
shebang-command: 2.0.0
which: 2.0.2
crypto-js@4.2.0: {}
crypto-random-string@2.0.0: {}
css-select@5.1.0:
@ -29471,6 +29492,8 @@ snapshots:
nested-error-stacks@2.0.1: {}
neverthrow@7.2.0: {}
next-intl@3.19.1(next@14.2.5(@babel/core@7.26.0)(@opentelemetry/api@1.9.0)(react-dom@18.3.1(react@18.2.0))(react@18.2.0))(react@18.3.1):
dependencies:
'@formatjs/intl-localematcher': 0.5.4