From 2dc2c2fe84fad14fd79dd93473854e5d88e5ee89 Mon Sep 17 00:00:00 2001 From: Konstantin Wohlwend Date: Wed, 3 Sep 2025 09:47:42 -0700 Subject: [PATCH 01/15] Data vault ARN --- apps/backend/.env | 1 + packages/stack-shared/package.json | 1 + .../stack-shared/src/helpers/vault/server-side.ts | 11 ++++++++--- packages/stack-shared/src/schema-fields.ts | 4 ++-- pnpm-lock.yaml | 3 +++ 5 files changed, 15 insertions(+), 5 deletions(-) diff --git a/apps/backend/.env b/apps/backend/.env index 6be4449b2..5dc658214 100644 --- a/apps/backend/.env +++ b/apps/backend/.env @@ -63,6 +63,7 @@ STACK_AWS_REGION= STACK_AWS_KMS_ENDPOINT= STACK_AWS_ACCESS_KEY_ID= STACK_AWS_SECRET_ACCESS_KEY= +STACK_AWS_VERCEL_OIDC_ROLE_ARN= diff --git a/packages/stack-shared/package.json b/packages/stack-shared/package.json index 15744a257..e265ae44e 100644 --- a/packages/stack-shared/package.json +++ b/packages/stack-shared/package.json @@ -56,6 +56,7 @@ "@aws-sdk/client-kms": "^3.876.0", "@opentelemetry/api": "^1.9.0", "@simplewebauthn/browser": "^11.0.0", + "@vercel/functions": "^2.0.0", "async-mutex": "^0.5.0", "bcryptjs": "^3.0.2", "crc": "^4.3.2", diff --git a/packages/stack-shared/src/helpers/vault/server-side.ts b/packages/stack-shared/src/helpers/vault/server-side.ts index 9185eecef..a1f0b3221 100644 --- a/packages/stack-shared/src/helpers/vault/server-side.ts +++ b/packages/stack-shared/src/helpers/vault/server-side.ts @@ -6,19 +6,24 @@ import { GenerateDataKeyCommand, KMSClient } from "@aws-sdk/client-kms"; +import { awsCredentialsProvider } from '@vercel/functions/oidc'; import { decodeBase64, encodeBase64 } from "../../utils/bytes"; import { decrypt, encrypt } from "../../utils/crypto"; import { getEnvVariable } from "../../utils/env"; import { Result } from "../../utils/results"; + function getKmsClient() { + const roleArn = getEnvVariable("STACK_AWS_VERCEL_OIDC_ROLE_ARN", ""); return new KMSClient({ region: getEnvVariable("STACK_AWS_REGION"), endpoint: getEnvVariable("STACK_AWS_KMS_ENDPOINT"), - credentials: { + credentials: roleArn ? awsCredentialsProvider({ + roleArn, + }) : { accessKeyId: getEnvVariable("STACK_AWS_ACCESS_KEY_ID"), - secretAccessKey: getEnvVariable("STACK_AWS_SECRET_ACCESS_KEY") - } + secretAccessKey: getEnvVariable("STACK_AWS_SECRET_ACCESS_KEY"), + }, }); } diff --git a/packages/stack-shared/src/schema-fields.ts b/packages/stack-shared/src/schema-fields.ts index dae1d7e95..63bb5b08e 100644 --- a/packages/stack-shared/src/schema-fields.ts +++ b/packages/stack-shared/src/schema-fields.ts @@ -1,7 +1,7 @@ import * as yup from "yup"; import { KnownErrors } from "."; import { isBase64 } from "./utils/bytes"; -import { type Currency, type MoneyAmount, SUPPORTED_CURRENCIES } from "./utils/currency-constants"; +import { SUPPORTED_CURRENCIES, type Currency, type MoneyAmount } from "./utils/currency-constants"; import { DayInterval, Interval } from "./utils/dates"; import { StackAssertionError, throwErr } from "./utils/errors"; import { decodeBasicAuthorizationHeader } from "./utils/http"; @@ -423,7 +423,7 @@ export const dayIntervalOrNeverSchema = yupUnion(dayIntervalSchema.defined(), yu * This schema is useful for fields where the user can specify the ID, such as price IDs. It is particularly common * for IDs in the config schema. */ -export const userSpecifiedIdSchema = (idName: `${string}Id`) => yupString().matches(/^[a-zA-Z_][a-zA-Z0-9_-]*$/, `${idName} must start with a letter or underscore and contain only letters, numbers, underscores, and hyphens`); +export const userSpecifiedIdSchema = (idName: `${string}Id`) => yupString().max(63).matches(/^[a-zA-Z_][a-zA-Z0-9_-]*$/, `${idName} must start with a letter or underscore and contain only letters, numbers, underscores, and hyphens`); export const moneyAmountSchema = (currency: Currency) => yupString().test('money-amount', 'Invalid money amount', (value, context) => { if (value == null) return true; const regex = /^([0-9]+)(\.([0-9]+))?$/; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 98e0efa5d..2ac74354d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1461,6 +1461,9 @@ importers: '@types/react-dom': specifier: ^18.2.0 version: 18.3.1 + '@vercel/functions': + specifier: ^2.0.0 + version: 2.0.0(@aws-sdk/credential-provider-web-identity@3.876.0) async-mutex: specifier: ^0.5.0 version: 0.5.0 From 68a44c05fab61ba5e5267356715c690afcb7bab5 Mon Sep 17 00:00:00 2001 From: Konstantin Wohlwend Date: Wed, 3 Sep 2025 10:10:17 -0700 Subject: [PATCH 02/15] Higher workflow test timeouts --- apps/e2e/tests/backend/workflows.test.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/apps/e2e/tests/backend/workflows.test.ts b/apps/e2e/tests/backend/workflows.test.ts index 186abe5bc..e243e114e 100644 --- a/apps/e2e/tests/backend/workflows.test.ts +++ b/apps/e2e/tests/backend/workflows.test.ts @@ -87,6 +87,8 @@ test("onSignUp workflow sends email for client sign-up", async ({ expect }) => { }, ] `); +}, { + timeout: 40_000, }); test("onSignUp workflow sends email for server-created user", async ({ expect }) => { @@ -128,6 +130,8 @@ test("onSignUp workflow sends email for server-created user", async ({ expect }) }, ] `); +}, { + timeout: 40_000, }); test("disabled workflows do not trigger", async ({ expect }) => { @@ -207,6 +211,8 @@ test("compile/runtime errors in one workflow don't block others", async ({ expec }, ] `); +}, { + timeout: 40_000, }); test("anonymous sign-up does not trigger; upgrade triggers workflow", async ({ expect }) => { @@ -229,7 +235,7 @@ test("anonymous sign-up does not trigger; upgrade triggers workflow", async ({ e const { userId: anonUserId } = await Auth.Anonymous.signUp(); // ensure marker not present yet - await wait(12_000); + await wait(16_000); const me1 = await niceBackendFetch("/api/v1/users/me", { accessType: "client" }); expect(me1.body.server_metadata?.[markerKey]).toBeUndefined(); @@ -237,12 +243,12 @@ test("anonymous sign-up does not trigger; upgrade triggers workflow", async ({ e const { userId } = await Auth.Password.signUpWithEmail({ password: "password" }); expect(userId).toEqual(anonUserId); - await wait(12_000); + await wait(16_000); const me2 = await niceBackendFetch("/api/v1/users/me", { accessType: "server" }); expect(me2.body.is_anonymous).toBe(false); expect(me2.body.server_metadata?.[markerKey]).toBe(me2.body.primary_email); }, { - timeout: 40_000, + timeout: 60_000, }); test("workflow source changes take effect for subsequent sign-ups", async ({ expect }) => { @@ -263,7 +269,7 @@ test("workflow source changes take effect for subsequent sign-ups", async ({ exp }); await bumpEmailAddress({ unindexed: true }); await Auth.Password.signUpWithEmail({ password: "password" }); - await wait(12_000); + await wait(16_000); const me1 = await niceBackendFetch("/api/v1/users/me", { accessType: "server" }); expect(me1.body.server_metadata?.[markerKey]).toBe("v1"); @@ -281,9 +287,9 @@ test("workflow source changes take effect for subsequent sign-ups", async ({ exp }); await bumpEmailAddress({ unindexed: true }); await Auth.Password.signUpWithEmail({ password: "password" }); - await wait(12_000); + await wait(16_000); const me2 = await niceBackendFetch("/api/v1/users/me", { accessType: "server" }); expect(me2.body.server_metadata?.[markerKey]).toBe("v2"); }, { - timeout: 40_000, + timeout: 60_000, }); From 0482a078808ce8023a188df1c762e5ac55381f8a Mon Sep 17 00:00:00 2001 From: Konstantin Wohlwend Date: Wed, 3 Sep 2025 10:33:10 -0700 Subject: [PATCH 03/15] Better workflow tests --- apps/backend/src/prisma-client.tsx | 18 ++++++++++++------ apps/e2e/tests/backend/workflows.test.ts | 15 ++++++++++++--- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/apps/backend/src/prisma-client.tsx b/apps/backend/src/prisma-client.tsx index 7b138ae38..bfe1f6fba 100644 --- a/apps/backend/src/prisma-client.tsx +++ b/apps/backend/src/prisma-client.tsx @@ -125,6 +125,16 @@ export async function retryTransaction(client: PrismaClient, fn: (tx: PrismaC // serializable transactions are currently off by default, later we may turn them on const enableSerializable = options.level === "serializable"; + const isRetryablePrismaError = (e: unknown) => { + if (e instanceof Prisma.PrismaClientKnownRequestError) { + return [ + "P2028", // Serializable/repeatable read conflict + "P2034", // Transaction already closed (eg. timeout) + ]; + } + return false; + }; + return await traceSpan('Prisma transaction', async (span) => { const res = await Result.retry(async (attemptIndex) => { return await traceSpan(`transaction attempt #${attemptIndex}`, async (attemptSpan) => { @@ -140,7 +150,7 @@ export async function retryTransaction(client: PrismaClient, fn: (tx: PrismaC // to other (nested) transactions failing // however, we make an exception for "Transaction already closed", as those are (annoyingly) thrown on // the actual query, not the $transaction function itself - if (e instanceof Prisma.PrismaClientKnownRequestError && e.code === "P2028") { // Transaction already closed + if (isRetryablePrismaError(e)) { throw new TransactionErrorThatShouldBeRetried(e); } throw new TransactionErrorThatShouldNotBeRetried(e); @@ -165,11 +175,7 @@ export async function retryTransaction(client: PrismaClient, fn: (tx: PrismaC if (e instanceof TransactionErrorThatShouldNotBeRetried) { throw e.cause; } - if ([ - "Transaction failed due to a write conflict or a deadlock. Please retry your transaction", - "Transaction already closed: A commit cannot be executed on an expired transaction. The timeout for this transaction", - ].some(s => e instanceof Prisma.PrismaClientKnownRequestError && e.message.includes(s))) { - // transaction timeout, retry + if (isRetryablePrismaError(e)) { return Result.error(e); } throw e; diff --git a/apps/e2e/tests/backend/workflows.test.ts b/apps/e2e/tests/backend/workflows.test.ts index e243e114e..2bcd2dc25 100644 --- a/apps/e2e/tests/backend/workflows.test.ts +++ b/apps/e2e/tests/backend/workflows.test.ts @@ -37,6 +37,15 @@ async function waitForMailboxSubject(mailbox: Mailbox, subject: string) { throw new Error(`Message with subject ${subject} not found after 10 tries`); } +async function waitForServerMetadataNotNull(userId: string, key: string) { + for (let i = 0; i < 10; i++) { + const user = await niceBackendFetch(`/api/v1/users/${userId}`, { accessType: "server" }); + if (user.body.server_metadata?.[key]) return; + await wait(1_000); + } + throw new Error(`Server metadata for user ${userId} with key ${key} not found after 10 tries`); +} + test("onSignUp workflow sends email for client sign-up", async ({ expect }) => { await Project.createAndSwitch(); const mailbox = await bumpEmailAddress({ unindexed: true }); @@ -243,7 +252,7 @@ test("anonymous sign-up does not trigger; upgrade triggers workflow", async ({ e const { userId } = await Auth.Password.signUpWithEmail({ password: "password" }); expect(userId).toEqual(anonUserId); - await wait(16_000); + await waitForServerMetadataNotNull(anonUserId, markerKey); const me2 = await niceBackendFetch("/api/v1/users/me", { accessType: "server" }); expect(me2.body.is_anonymous).toBe(false); expect(me2.body.server_metadata?.[markerKey]).toBe(me2.body.primary_email); @@ -269,7 +278,7 @@ test("workflow source changes take effect for subsequent sign-ups", async ({ exp }); await bumpEmailAddress({ unindexed: true }); await Auth.Password.signUpWithEmail({ password: "password" }); - await wait(16_000); + await waitForServerMetadataNotNull("me", markerKey); const me1 = await niceBackendFetch("/api/v1/users/me", { accessType: "server" }); expect(me1.body.server_metadata?.[markerKey]).toBe("v1"); @@ -287,7 +296,7 @@ test("workflow source changes take effect for subsequent sign-ups", async ({ exp }); await bumpEmailAddress({ unindexed: true }); await Auth.Password.signUpWithEmail({ password: "password" }); - await wait(16_000); + await waitForServerMetadataNotNull("me", markerKey); const me2 = await niceBackendFetch("/api/v1/users/me", { accessType: "server" }); expect(me2.body.server_metadata?.[markerKey]).toBe("v2"); }, { From faf79e5a9e5fa8ad608e7d5634339743a55d06f0 Mon Sep 17 00:00:00 2001 From: Konstantin Wohlwend Date: Wed, 3 Sep 2025 10:44:52 -0700 Subject: [PATCH 04/15] Speed up Vitest --- vitest.shared.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/vitest.shared.ts b/vitest.shared.ts index aa89c7ba6..866efcaa6 100644 --- a/vitest.shared.ts +++ b/vitest.shared.ts @@ -4,6 +4,8 @@ import { defineConfig } from 'vitest/config'; export default defineConfig({ plugins: [tsconfigPaths() as any], test: { + pool: 'threads', + maxWorkers: 8, include: ['**/*.test.{js,ts,jsx,tsx}'], includeSource: ['**/*.{js,ts,jsx,tsx}'], }, From 7387f029c0ef423b82a689a37f9e1c8f363f9df3 Mon Sep 17 00:00:00 2001 From: Konstantin Wohlwend Date: Wed, 3 Sep 2025 13:31:40 -0700 Subject: [PATCH 05/15] Workflow queue --- .vscode/settings.json | 1 + apps/backend/.env | 5 + apps/backend/.env.development | 6 + apps/backend/package.json | 1 + apps/backend/prisma/schema.prisma | 1 + .../internal/trigger/run-scheduled/route.tsx | 41 +++++ apps/backend/src/lib/upstash.tsx | 35 ++++ apps/backend/src/lib/workflows.tsx | 169 +++++++++++------- .../new-project/page-client.tsx | 5 +- .../workflows/[workflowId]/page.tsx | 19 ++ .../projects/[projectId]/workflows/page.tsx | 8 +- apps/dev-launchpad/public/index.html | 3 + apps/e2e/tests/backend/workflows.test.ts | 12 +- docker/dependencies/docker.compose.yaml | 14 +- pnpm-lock.yaml | 23 +++ 15 files changed, 267 insertions(+), 76 deletions(-) create mode 100644 apps/backend/src/app/api/latest/internal/trigger/run-scheduled/route.tsx create mode 100644 apps/backend/src/lib/upstash.tsx diff --git a/.vscode/settings.json b/.vscode/settings.json index eda6e4f46..9d0b243f8 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -76,6 +76,7 @@ "Proxied", "psql", "qrcode", + "QSTASH", "quetzallabs", "rehype", "reqs", diff --git a/apps/backend/.env b/apps/backend/.env index 5dc658214..c824e0988 100644 --- a/apps/backend/.env +++ b/apps/backend/.env @@ -65,6 +65,11 @@ STACK_AWS_ACCESS_KEY_ID= STACK_AWS_SECRET_ACCESS_KEY= STACK_AWS_VERCEL_OIDC_ROLE_ARN= +# Upstash configuration +STACK_QSTASH_URL= +STACK_QSTASH_TOKEN= +STACK_QSTASH_CURRENT_SIGNING_KEY= +STACK_QSTASH_NEXT_SIGNING_KEY= # Misc, optional diff --git a/apps/backend/.env.development b/apps/backend/.env.development index bebe26f78..42fd65b02 100644 --- a/apps/backend/.env.development +++ b/apps/backend/.env.development @@ -62,3 +62,9 @@ STACK_AWS_REGION=us-east-1 STACK_AWS_KMS_ENDPOINT=http://localhost:8124 STACK_AWS_ACCESS_KEY_ID=test STACK_AWS_SECRET_ACCESS_KEY=test + +# Upstash defaults to one of the pre-build test users of the local emulator +STACK_QSTASH_URL=http://localhost:8125 +STACK_QSTASH_TOKEN=eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0= +STACK_QSTASH_CURRENT_SIGNING_KEY=sig_7kYjw48mhY7kAjqNGcy6cr29RJ6r +STACK_QSTASH_NEXT_SIGNING_KEY=sig_5ZB6DVzB1wjE8S6rZ7eenA8Pdnhs diff --git a/apps/backend/package.json b/apps/backend/package.json index 882c49dd3..4da345b09 100644 --- a/apps/backend/package.json +++ b/apps/backend/package.json @@ -64,6 +64,7 @@ "@sentry/nextjs": "^8.40.0", "@simplewebauthn/server": "^11.0.0", "@stackframe/stack-shared": "workspace:*", + "@upstash/qstash": "^2.8.2", "@vercel/functions": "^2.0.0", "@vercel/otel": "^1.10.4", "ai": "^4.3.17", diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index 374da8a1c..67b5bdc86 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -810,6 +810,7 @@ model WorkflowTrigger { // the following fields determine the state of the trigger: // - scheduledAt && !compiledWorkflowId && !output && !error: the trigger is scheduled to be executed + // - !scheduledAt && !compiledWorkflowId: the trigger was scheduled, but its workflow subsequently deleted. The trigger never ran // - !scheduledAt && compiledWorkflowId && !output && !error: the trigger is currently executing // - !scheduledAt && compiledWorkflowId && output && !error: the trigger has successfully completed execution // - !scheduledAt && compiledWorkflowId && !output && error: the trigger has failed execution diff --git a/apps/backend/src/app/api/latest/internal/trigger/run-scheduled/route.tsx b/apps/backend/src/app/api/latest/internal/trigger/run-scheduled/route.tsx new file mode 100644 index 000000000..281f0a429 --- /dev/null +++ b/apps/backend/src/app/api/latest/internal/trigger/run-scheduled/route.tsx @@ -0,0 +1,41 @@ +import { getTenancy } from "@/lib/tenancies"; +import { ensureUpstashSignature } from "@/lib/upstash"; +import { triggerScheduledWorkflows } from "@/lib/workflows"; +import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; +import { yupNumber, yupObject, yupString, yupTuple } from "@stackframe/stack-shared/dist/schema-fields"; +import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors"; + +export const POST = createSmartRouteHandler({ + metadata: { + hidden: true, + }, + request: yupObject({ + headers: yupObject({ + "upstash-signature": yupTuple([yupString().defined()]).defined(), + }).defined(), + body: yupObject({ + tenancyId: yupString().defined(), + }).defined(), + method: yupString().oneOf(["POST"]).defined(), + }), + response: yupObject({ + statusCode: yupNumber().oneOf([200]).defined(), + bodyType: yupString().oneOf(["success"]).defined(), + }), + handler: async (req, fullReq) => { + await ensureUpstashSignature(fullReq); + + const tenancy = await getTenancy(req.body.tenancyId); + if (!tenancy) { + throw new StackAssertionError(`Tenancy not found for scheduled trigger`, { tenancyId: req.body.tenancyId }); + } + + await triggerScheduledWorkflows(tenancy); + + return { + statusCode: 200, + bodyType: "success", + } as const; + }, +}); + diff --git a/apps/backend/src/lib/upstash.tsx b/apps/backend/src/lib/upstash.tsx new file mode 100644 index 000000000..9744ce7fc --- /dev/null +++ b/apps/backend/src/lib/upstash.tsx @@ -0,0 +1,35 @@ +import { SmartRequest } from "@/route-handlers/smart-request"; +import { getEnvVariable, getNodeEnvironment } from "@stackframe/stack-shared/dist/utils/env"; +import { StatusError } from "@stackframe/stack-shared/dist/utils/errors"; +import { Client, Receiver } from "@upstash/qstash"; + +export const upstash = new Client({ + baseUrl: getEnvVariable("STACK_QSTASH_URL", ""), + token: getEnvVariable("STACK_QSTASH_TOKEN", ""), +}); + +export const upstashReceiver = new Receiver({ + currentSigningKey: getEnvVariable("STACK_QSTASH_CURRENT_SIGNING_KEY", ""), + nextSigningKey: getEnvVariable("STACK_QSTASH_NEXT_SIGNING_KEY", ""), +}); + +export async function ensureUpstashSignature(fullReq: SmartRequest): Promise { + const upstashSignature = fullReq.headers["upstash-signature"]?.[0]; + if (!upstashSignature) { + throw new StatusError(400, "upstash-signature header is required"); + } + + const url = new URL(fullReq.url); + if ((getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) && url.hostname === "localhost") { + url.hostname = "host.docker.internal"; + } + + const isValid = await upstashReceiver.verify({ + signature: upstashSignature, + url: url.toString(), + body: new TextDecoder().decode(fullReq.bodyBuffer), + }); + if (!isValid) { + throw new StatusError(400, "Invalid Upstash signature"); + } +} diff --git a/apps/backend/src/lib/workflows.tsx b/apps/backend/src/lib/workflows.tsx index de084ef89..655947fad 100644 --- a/apps/backend/src/lib/workflows.tsx +++ b/apps/backend/src/lib/workflows.tsx @@ -13,6 +13,7 @@ import { Result } from "@stackframe/stack-shared/dist/utils/results"; import { generateUuid } from "@stackframe/stack-shared/dist/utils/uuids"; import { Freestyle } from "./freestyle"; import { Tenancy } from "./tenancies"; +import { upstash } from "./upstash"; const externalPackages = { '@stackframe/stack': 'latest', @@ -431,74 +432,119 @@ async function createScheduledTrigger(tenancy: Tenancy, workflowId: string, trig }, }, }); + + await upstash.publishJSON({ + url: new URL(`/api/v1/internal/trigger/run-scheduled`, getEnvVariable("NEXT_PUBLIC_STACK_API_URL").replace("http://localhost", "http://host.docker.internal")).toString(), + body: { + tenancyId: tenancy.id, + }, + notBefore: Math.floor(scheduledAt.getTime() / 1000), + }); + return dbTrigger; } -async function triggerWorkflow(tenancy: Tenancy, compiledWorkflow: CompiledWorkflow, triggerId: string): Promise> { - if (compiledWorkflow.compiledCode === null) { - return Result.error(`Workflow ${compiledWorkflow.id} failed to compile: ${compiledWorkflow.compileError}`); - } - +export async function triggerScheduledWorkflows(tenancy: Tenancy) { const prisma = await getPrismaClientForTenancy(tenancy); - const trigger = await prisma.workflowTrigger.update({ - where: { - tenancyId_id: { - tenancyId: tenancy.id, - id: triggerId, - }, - }, - data: { - compiledWorkflowId: compiledWorkflow.id, - scheduledAt: null, - output: Prisma.DbNull, - error: Prisma.DbNull, - }, - }); + const compiledWorkflows = await compileAndGetEnabledWorkflows(tenancy); - const res = await triggerWorkflowRaw(tenancy, compiledWorkflow.compiledCode, trigger.triggerData as WorkflowTrigger); - if (res.status === "error") { - console.log(`Compiled workflow failed to process trigger: ${res.error}`, { trigger, compiledWorkflowId: compiledWorkflow.id, res }); - } else { - if (res.data && typeof res.data === "object" && "scheduledCallback" in res.data && res.data.scheduledCallback && typeof res.data.scheduledCallback === "object") { - const scheduledCallback: any = res.data.scheduledCallback; - const callbackId = `${scheduledCallback.callbackId}`; - const scheduleAt = new Date(scheduledCallback.scheduleAtMillis); - const callbackData = scheduledCallback.data; - await createScheduledTrigger( - tenancy, - compiledWorkflow.id, - { - type: "callback", - callbackId, - data: callbackData, - scheduledAtMillis: scheduleAt.getTime(), - callerTriggerId: triggerId, - executionId: trigger.executionId, + const toTrigger = await retryTransaction(prisma, async (tx) => { + const triggers = await tx.workflowTrigger.findMany({ + where: { + tenancyId: tenancy.id, + scheduledAt: { lt: new Date(Date.now() + 5_000) }, + }, + include: { + execution: true, + }, + orderBy: { + scheduledAt: "asc", + }, + // let's take multiple triggers so we can catch up on the backlog, in case some triggers never went through (eg. if the queue was down) + // however, to prevent deadlocks as we are doing multiple writes in this transaction, we randomize it (so there's + // a chance that we only take one trigger, which would never deadlock) + take: Math.floor(1 + Math.random() * 3), + }); + const toTrigger = []; + for (const trigger of triggers) { + const compiledWorkflow = compiledWorkflows.get(trigger.execution.workflowId); + const updatedTrigger = await tx.workflowTrigger.update({ + where: { + tenancyId_id: { + tenancyId: tenancy.id, + id: trigger.id, + }, }, - scheduleAt - ); + data: { + scheduledAt: null, + compiledWorkflowId: compiledWorkflow?.id ?? null, + output: Prisma.DbNull, + error: Prisma.DbNull, + }, + include: { + execution: true, + }, + }); + + if (compiledWorkflow) { + toTrigger.push(updatedTrigger); + } else { + // the workflow was deleted; we don't run the trigger, but we still mark it in the DB + } } - } - await prisma.workflowTrigger.update({ - where: { - tenancyId_id: { - tenancyId: tenancy.id, - id: triggerId, - }, - }, - data: { - ...res.status === "ok" ? { - output: res.data as any, - } : { - error: res.error, - }, - }, - }); - return Result.ok(undefined); -} + return toTrigger; + }, { level: "serializable" }); -export async function triggerScheduledCallbacks(tenancy: Tenancy) { + await allPromisesAndWaitUntilEach(toTrigger.map(async (trigger) => { + const compiledWorkflow = compiledWorkflows.get(trigger.execution.workflowId) ?? throwErr(`Compiled workflow ${trigger.execution.workflowId} not found in trigger execution; this should not happen because we should've already checked for this in the transaction!`); + if (compiledWorkflow.compiledCode === null) { + return Result.error(`Workflow ${compiledWorkflow.id} failed to compile: ${compiledWorkflow.compileError}`); + } + const res = await triggerWorkflowRaw(tenancy, compiledWorkflow.compiledCode, trigger.triggerData as WorkflowTrigger); + if (res.status === "error") { + // This is probably fine and just a user error, but let's log it regardless + console.log(`Compiled workflow failed to process trigger: ${res.error}`, { trigger, compiledWorkflowId: compiledWorkflow.id, res }); + } else { + if (res.data && typeof res.data === "object" && "scheduledCallback" in res.data && res.data.scheduledCallback && typeof res.data.scheduledCallback === "object") { + const scheduledCallback: any = res.data.scheduledCallback; + const callbackId = `${scheduledCallback.callbackId}`; + const scheduleAt = new Date(scheduledCallback.scheduleAtMillis); + const callbackData = scheduledCallback.data; + await createScheduledTrigger( + tenancy, + compiledWorkflow.id, + { + type: "callback", + callbackId, + data: callbackData, + scheduledAtMillis: scheduleAt.getTime(), + callerTriggerId: trigger.id, + executionId: trigger.executionId, + }, + scheduleAt + ); + } + } + + const prisma = await getPrismaClientForTenancy(tenancy); + await prisma.workflowTrigger.update({ + where: { + tenancyId_id: { + tenancyId: tenancy.id, + id: trigger.id, + }, + }, + data: { + ...res.status === "ok" ? { + output: res.data as any, + } : { + error: res.error, + }, + }, + }); + return Result.ok(undefined); + })); } export async function triggerWorkflows(tenancy: Tenancy, trigger: WorkflowTrigger & { type: WorkflowRegisteredTriggerType }) { @@ -507,9 +553,8 @@ export async function triggerWorkflows(tenancy: Tenancy, trigger: WorkflowTrigge const promises = [...compiledWorkflows] .filter(([_, compiledWorkflow]) => compiledWorkflow.registeredTriggers.includes(trigger.type)) .map(async ([workflowId, compiledWorkflow]) => { - const dbTrigger = await createScheduledTrigger(tenancy, workflowId, trigger, new Date()); - await triggerWorkflow(tenancy, compiledWorkflow, dbTrigger.id); + await createScheduledTrigger(tenancy, workflowId, trigger, new Date()); }); - await Promise.all(promises); + await allPromisesAndWaitUntilEach(promises); }); } diff --git a/apps/dashboard/src/app/(main)/(protected)/(outside-dashboard)/new-project/page-client.tsx b/apps/dashboard/src/app/(main)/(protected)/(outside-dashboard)/new-project/page-client.tsx index ca8309c6a..175c4d0ba 100644 --- a/apps/dashboard/src/app/(main)/(protected)/(outside-dashboard)/new-project/page-client.tsx +++ b/apps/dashboard/src/app/(main)/(protected)/(outside-dashboard)/new-project/page-client.tsx @@ -5,7 +5,7 @@ import { yupResolver } from "@hookform/resolvers/yup"; import { AuthPage, TeamSwitcher, useUser } from "@stackframe/stack"; import { allProviders } from "@stackframe/stack-shared/dist/utils/oauth"; import { runAsynchronouslyWithAlert, wait } from "@stackframe/stack-shared/dist/utils/promises"; -import { BrowserFrame, Button, Form, FormControl, FormField, FormItem, FormMessage, Label, Separator, Typography } from "@stackframe/stack-ui"; +import { BrowserFrame, Button, Form, FormControl, FormField, FormItem, FormMessage, Separator, Typography } from "@stackframe/stack-ui"; import { useSearchParams } from "next/navigation"; import { useState } from "react"; import { useForm } from "react-hook-form"; @@ -47,7 +47,7 @@ export default function PageClient() { credentialEnabled: form.watch("signInMethods").includes("credential"), magicLinkEnabled: form.watch("signInMethods").includes("magicLink"), passkeyEnabled: form.watch("signInMethods").includes("passkey"), - oauthProviders: form.watch('signInMethods').filter((method) => ["google", "github", "microsoft", "spotify"].includes(method)).map(provider => ({ id: provider, type: 'shared' })), + oauthProviders: form.watch('signInMethods').filter((method) => ["google", "github", "microsoft"].includes(method)).map(provider => ({ id: provider, type: 'shared' })), } }; @@ -133,7 +133,6 @@ export default function PageClient() { { value: "google", label: "Google" }, { value: "github", label: "GitHub" }, { value: "microsoft", label: "Microsoft" }, - { value: "spotify", label: "Spotify" }, ]} info="More sign-in methods are available on the dashboard later." /> diff --git a/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/workflows/[workflowId]/page.tsx b/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/workflows/[workflowId]/page.tsx index 885058b24..9ae82b2e8 100644 --- a/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/workflows/[workflowId]/page.tsx +++ b/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/workflows/[workflowId]/page.tsx @@ -22,6 +22,7 @@ export default function WorkflowDetailPage() { const workflow = workflowId in availableWorkflows ? availableWorkflows[workflowId] : undefined; const [workflowContent, setWorkflowContent] = useState(""); const [isLoading, setIsLoading] = useState(false); + const [isToggling, setIsToggling] = useState(false); useEffect(() => { if (workflow && workflow.tsSource) { @@ -47,6 +48,21 @@ export default function WorkflowDetailPage() { router.push(`/projects/${projectId}/workflows`); }; + const handleToggleEnabled = async () => { + if (!workflow) return; + setIsToggling(true); + try { + await project.updateConfig({ + [`workflows.availableWorkflows.${workflowId}.enabled`]: !workflow.enabled, + }); + toast({ title: workflow.enabled ? "Workflow disabled" : "Workflow enabled" }); + } catch (error) { + toast({ title: "Failed to toggle workflow", variant: "destructive" }); + } finally { + setIsToggling(false); + } + }; + if (workflow === undefined) { return ( @@ -70,6 +86,9 @@ export default function WorkflowDetailPage() { Back +