diff --git a/apps/backend/prisma/migrations/20260622000000_add_stripe_webhook_event/migration.sql b/apps/backend/prisma/migrations/20260622000000_add_stripe_webhook_event/migration.sql new file mode 100644 index 000000000..151ff73c0 --- /dev/null +++ b/apps/backend/prisma/migrations/20260622000000_add_stripe_webhook_event/migration.sql @@ -0,0 +1,21 @@ +-- CreateEnum +CREATE TYPE "StripeWebhookEventStatus" AS ENUM ('PENDING', 'PROCESSED', 'FAILED'); + +-- CreateTable +CREATE TABLE "StripeWebhookEvent" ( + "id" UUID NOT NULL, + "stripeEventId" TEXT NOT NULL, + "eventType" TEXT NOT NULL, + "stripeAccountId" TEXT, + "payload" JSONB NOT NULL, + "status" "StripeWebhookEventStatus" NOT NULL DEFAULT 'PENDING', + "lastError" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "processedAt" TIMESTAMP(3), + + CONSTRAINT "StripeWebhookEvent_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "StripeWebhookEvent_stripeEventId_key" ON "StripeWebhookEvent"("stripeEventId"); diff --git a/apps/backend/prisma/migrations/20260622000000_add_stripe_webhook_event/tests/unique-stripe-event-id.ts b/apps/backend/prisma/migrations/20260622000000_add_stripe_webhook_event/tests/unique-stripe-event-id.ts new file mode 100644 index 000000000..8bca6123e --- /dev/null +++ b/apps/backend/prisma/migrations/20260622000000_add_stripe_webhook_event/tests/unique-stripe-event-id.ts @@ -0,0 +1,60 @@ +import { randomUUID } from "crypto"; +import type { Sql } from "postgres"; +import { expect } from "vitest"; + +export const preMigration = async (sql: Sql) => { + // Table does not exist before the migration, so nothing to seed. + return {}; +}; + +export const postMigration = async (sql: Sql) => { + const tables = await sql<{ table_name: string }[]>` + SELECT table_name + FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = 'StripeWebhookEvent' + `; + expect(Array.from(tables)).toMatchInlineSnapshot(` + [ + { + "table_name": "StripeWebhookEvent", + }, + ] + `); + + const eventId = `evt_${randomUUID()}`; + + await sql` + INSERT INTO "StripeWebhookEvent" ("id", "stripeEventId", "eventType", "payload", "updatedAt") + VALUES (${randomUUID()}::uuid, ${eventId}, 'invoice.payment_succeeded', '{"id":"evt"}'::jsonb, NOW()) + `; + + // Status defaults to PENDING. + const inserted = await sql` + SELECT "status" FROM "StripeWebhookEvent" WHERE "stripeEventId" = ${eventId} + `; + expect(Array.from(inserted)).toMatchInlineSnapshot(` + [ + { + "status": "PENDING", + }, + ] + `); + + // The same Stripe event id cannot be inserted twice (idempotency guarantee). + await expect(sql` + INSERT INTO "StripeWebhookEvent" ("id", "stripeEventId", "eventType", "payload", "updatedAt") + VALUES (${randomUUID()}::uuid, ${eventId}, 'invoice.payment_succeeded', '{"id":"evt2"}'::jsonb, NOW()) + `).rejects.toThrow(/StripeWebhookEvent_stripeEventId_key/); + + // A different event id is fine, and the status enum rejects invalid values. + await sql` + INSERT INTO "StripeWebhookEvent" ("id", "stripeEventId", "eventType", "payload", "status", "updatedAt") + VALUES (${randomUUID()}::uuid, ${`evt_${randomUUID()}`}, 'invoice.paid', '{}'::jsonb, 'PROCESSED', NOW()) + `; + + await expect(sql` + INSERT INTO "StripeWebhookEvent" ("id", "stripeEventId", "eventType", "payload", "status", "updatedAt") + VALUES (${randomUUID()}::uuid, ${`evt_${randomUUID()}`}, 'invoice.paid', '{}'::jsonb, 'NOT_A_STATUS', NOW()) + `).rejects.toThrow(); +}; diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index 24afecea6..7a9421c90 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -1427,6 +1427,32 @@ model OutgoingRequest { @@index([startedFulfillingAt, deduplicationKey]) } +enum StripeWebhookEventStatus { + PENDING + PROCESSED + FAILED +} + +// Idempotency + recovery log for incoming Stripe webhook events. Each event is +// persisted synchronously (keyed on the Stripe `event.id`) before we ack 200 to +// Stripe, so redeliveries are deduped and the full `payload` of any +// PENDING/FAILED row can be replayed manually. Not tenancy-scoped: the Stripe +// account -> tenancy resolution happens during processing, not here. +model StripeWebhookEvent { + id String @id @default(uuid()) @db.Uuid + + stripeEventId String @unique + eventType String + stripeAccountId String? + payload Json + status StripeWebhookEventStatus @default(PENDING) + lastError String? + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + processedAt DateTime? +} + // BulldozerStorageEngine is managed externally (see prisma.config.ts // `tables.external`). It's created by migrations and interacted with // via raw SQL — not through the Prisma client. Keeping it out of the diff --git a/apps/backend/src/app/api/latest/integrations/stripe/webhooks/route.tsx b/apps/backend/src/app/api/latest/integrations/stripe/webhooks/route.tsx index 0388a0e80..7656826eb 100644 --- a/apps/backend/src/app/api/latest/integrations/stripe/webhooks/route.tsx +++ b/apps/backend/src/app/api/latest/integrations/stripe/webhooks/route.tsx @@ -1,5 +1,7 @@ import { sendEmailToMany, type EmailOutboxRecipient } from "@/lib/emails"; import { bulldozerWriteOneTimePurchase } from "@/lib/payments/bulldozer-dual-write"; +import { claimStripeEvent, markStripeEventFailed, markStripeEventProcessed } from "@/lib/stripe-webhook-events"; +import { runAsynchronouslyAndWaitUntil } from "@/utils/background-tasks"; import { listPermissions } from "@/lib/permissions"; import { getHexclaveStripe, getStripeForAccount, resolveProductFromStripeMetadata, syncStripeSubscriptions, upsertStripeInvoice } from "@/lib/stripe"; import type { StripeOverridesMap } from "@/lib/stripe-proxy"; @@ -466,13 +468,29 @@ export const POST = createSmartRouteHandler({ throw new StatusError(400, "Invalid stripe-signature header"); } - try { - await processStripeWebhookEvent(event); - } catch (error) { - captureError("stripe-webhook-receiver", error); - throw error; + // Persist the event for idempotency + recovery BEFORE acking. Stripe + // delivers at-least-once + const { shouldProcess } = await claimStripeEvent(event); + if (!shouldProcess) { + return { + statusCode: 200, + bodyType: "json", + body: { received: true, deduplicated: true }, + }; } + // Ack Stripe immediately and process in the background. + // Stripe recommends ACKing ASAP to avoid timeouts and redeliveries + runAsynchronouslyAndWaitUntil(async () => { + try { + await processStripeWebhookEvent(event); + await markStripeEventProcessed(event.id); + } catch (error) { + captureError("stripe-webhook-receiver", error); + await markStripeEventFailed(event.id, error); + } + }); + return { statusCode: 200, bodyType: "json", diff --git a/apps/backend/src/lib/stripe-webhook-events.test.ts b/apps/backend/src/lib/stripe-webhook-events.test.ts new file mode 100644 index 000000000..5918d01c6 --- /dev/null +++ b/apps/backend/src/lib/stripe-webhook-events.test.ts @@ -0,0 +1,122 @@ +import { randomUUID } from "node:crypto"; +import type Stripe from "stripe"; +import { describe, expect, it } from "vitest"; +import { StripeWebhookEventStatus } from "@/generated/prisma/client"; +import { globalPrismaClient } from "@/prisma-client"; +import { claimStripeEvent, markStripeEventFailed, markStripeEventProcessed } from "./stripe-webhook-events"; + +// Test fixtures only need the fields the helper reads (id/type/account) plus a +// JSON-serializable body. Building a full Stripe.Event is impractical, so we +// cast a minimal object — any drift in the fields we actually use is still +// caught because the helper reads them directly. +function makeEvent(): Stripe.Event { + return { + id: `evt_${randomUUID()}`, + type: "invoice.payment_succeeded", + account: "acct_test_123", + data: { object: { id: "in_test", note: "fixture" } }, + } as unknown as Stripe.Event; +} + +describe("stripe webhook event idempotency (real DB)", () => { + it("claims a brand new event and persists it as PENDING", async ({ expect }) => { + const event = makeEvent(); + + const { shouldProcess } = await claimStripeEvent(event); + expect(shouldProcess).toBe(true); + + const row = await globalPrismaClient.stripeWebhookEvent.findUnique({ + where: { stripeEventId: event.id }, + }); + expect(row).not.toBeNull(); + expect(row?.status).toBe(StripeWebhookEventStatus.PENDING); + expect(row?.eventType).toBe(event.type); + expect(row?.stripeAccountId).toBe(event.account); + expect(row?.processedAt).toBeNull(); + expect(row?.lastError).toBeNull(); + // The full event payload is stored so dropped/failed events can be replayed. + expect(row?.payload).toMatchObject({ id: event.id, type: event.type }); + }); + + it("skips a redelivery while the prior delivery is still in-flight (PENDING)", async ({ expect }) => { + const event = makeEvent(); + + const first = await claimStripeEvent(event); + expect(first.shouldProcess).toBe(true); + + // Single-flight: a redelivery that arrives while the first attempt is still + // PENDING must not spin up a second processor (that would double the fan-out). + const second = await claimStripeEvent(event); + expect(second.shouldProcess).toBe(false); + }); + + it("deduplicates once the event has been fully PROCESSED", async ({ expect }) => { + const event = makeEvent(); + + await claimStripeEvent(event); + await markStripeEventProcessed(event.id); + + const processedRow = await globalPrismaClient.stripeWebhookEvent.findUnique({ + where: { stripeEventId: event.id }, + }); + expect(processedRow?.status).toBe(StripeWebhookEventStatus.PROCESSED); + expect(processedRow?.processedAt).not.toBeNull(); + expect(processedRow?.lastError).toBeNull(); + + // A Stripe redelivery of an already-processed event must be a no-op. + const redelivery = await claimStripeEvent(event); + expect(redelivery.shouldProcess).toBe(false); + }); + + it("records the error on failure and allows recovery via redelivery", async ({ expect }) => { + const event = makeEvent(); + + await claimStripeEvent(event); + await markStripeEventFailed(event.id, new Error("boom while processing")); + + const failedRow = await globalPrismaClient.stripeWebhookEvent.findUnique({ + where: { stripeEventId: event.id }, + }); + expect(failedRow?.status).toBe(StripeWebhookEventStatus.FAILED); + expect(failedRow?.lastError).toContain("boom while processing"); + + // FAILED rows must reprocess so a manual Stripe "Resend" can recover them. + const recovery = await claimStripeEvent(event); + expect(recovery.shouldProcess).toBe(true); + + // ...but reclaiming a FAILED row flips it back to in-flight (PENDING), so a + // further redelivery during that retry is once again skipped (single-flight). + const concurrentRetry = await claimStripeEvent(event); + expect(concurrentRetry.shouldProcess).toBe(false); + }); + + it("scrubs a stale processedAt when a row leaves the PROCESSED state", async ({ expect }) => { + const event = makeEvent(); + + await claimStripeEvent(event); + await markStripeEventProcessed(event.id); + + // markStripeEventFailed must clear processedAt so a recovered/re-failed row is + // never readable as "completed at