diff --git a/apps/backend/package.json b/apps/backend/package.json index 69466c868..27aaf2735 100644 --- a/apps/backend/package.json +++ b/apps/backend/package.json @@ -1,6 +1,6 @@ { "name": "@hexclave/backend", - "version": "1.0.28", + "version": "1.0.30", "repository": "https://github.com/hexclave/hexclave", "private": true, "type": "module", diff --git a/apps/backend/prisma/migrations/20260622000000_add_stripe_webhook_event/migration.sql b/apps/backend/prisma/migrations/20260622000000_add_stripe_webhook_event/migration.sql new file mode 100644 index 000000000..151ff73c0 --- /dev/null +++ b/apps/backend/prisma/migrations/20260622000000_add_stripe_webhook_event/migration.sql @@ -0,0 +1,21 @@ +-- CreateEnum +CREATE TYPE "StripeWebhookEventStatus" AS ENUM ('PENDING', 'PROCESSED', 'FAILED'); + +-- CreateTable +CREATE TABLE "StripeWebhookEvent" ( + "id" UUID NOT NULL, + "stripeEventId" TEXT NOT NULL, + "eventType" TEXT NOT NULL, + "stripeAccountId" TEXT, + "payload" JSONB NOT NULL, + "status" "StripeWebhookEventStatus" NOT NULL DEFAULT 'PENDING', + "lastError" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "processedAt" TIMESTAMP(3), + + CONSTRAINT "StripeWebhookEvent_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "StripeWebhookEvent_stripeEventId_key" ON "StripeWebhookEvent"("stripeEventId"); diff --git a/apps/backend/prisma/migrations/20260622000000_add_stripe_webhook_event/tests/unique-stripe-event-id.ts b/apps/backend/prisma/migrations/20260622000000_add_stripe_webhook_event/tests/unique-stripe-event-id.ts new file mode 100644 index 000000000..8bca6123e --- /dev/null +++ b/apps/backend/prisma/migrations/20260622000000_add_stripe_webhook_event/tests/unique-stripe-event-id.ts @@ -0,0 +1,60 @@ +import { randomUUID } from "crypto"; +import type { Sql } from "postgres"; +import { expect } from "vitest"; + +export const preMigration = async (sql: Sql) => { + // Table does not exist before the migration, so nothing to seed. + return {}; +}; + +export const postMigration = async (sql: Sql) => { + const tables = await sql<{ table_name: string }[]>` + SELECT table_name + FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = 'StripeWebhookEvent' + `; + expect(Array.from(tables)).toMatchInlineSnapshot(` + [ + { + "table_name": "StripeWebhookEvent", + }, + ] + `); + + const eventId = `evt_${randomUUID()}`; + + await sql` + INSERT INTO "StripeWebhookEvent" ("id", "stripeEventId", "eventType", "payload", "updatedAt") + VALUES (${randomUUID()}::uuid, ${eventId}, 'invoice.payment_succeeded', '{"id":"evt"}'::jsonb, NOW()) + `; + + // Status defaults to PENDING. + const inserted = await sql` + SELECT "status" FROM "StripeWebhookEvent" WHERE "stripeEventId" = ${eventId} + `; + expect(Array.from(inserted)).toMatchInlineSnapshot(` + [ + { + "status": "PENDING", + }, + ] + `); + + // The same Stripe event id cannot be inserted twice (idempotency guarantee). + await expect(sql` + INSERT INTO "StripeWebhookEvent" ("id", "stripeEventId", "eventType", "payload", "updatedAt") + VALUES (${randomUUID()}::uuid, ${eventId}, 'invoice.payment_succeeded', '{"id":"evt2"}'::jsonb, NOW()) + `).rejects.toThrow(/StripeWebhookEvent_stripeEventId_key/); + + // A different event id is fine, and the status enum rejects invalid values. + await sql` + INSERT INTO "StripeWebhookEvent" ("id", "stripeEventId", "eventType", "payload", "status", "updatedAt") + VALUES (${randomUUID()}::uuid, ${`evt_${randomUUID()}`}, 'invoice.paid', '{}'::jsonb, 'PROCESSED', NOW()) + `; + + await expect(sql` + INSERT INTO "StripeWebhookEvent" ("id", "stripeEventId", "eventType", "payload", "status", "updatedAt") + VALUES (${randomUUID()}::uuid, ${`evt_${randomUUID()}`}, 'invoice.paid', '{}'::jsonb, 'NOT_A_STATUS', NOW()) + `).rejects.toThrow(); +}; diff --git a/apps/backend/prisma/migrations/20260623010000_add_plan_usage_range_indexes/migration.sql b/apps/backend/prisma/migrations/20260623010000_add_plan_usage_range_indexes/migration.sql new file mode 100644 index 000000000..cebc53f24 --- /dev/null +++ b/apps/backend/prisma/migrations/20260623010000_add_plan_usage_range_indexes/migration.sql @@ -0,0 +1,11 @@ +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "EmailOutbox_tenancyId_startedSendingAt_idx" + ON /* SCHEMA_NAME_SENTINEL */."EmailOutbox"("tenancyId", "startedSendingAt"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "SessionReplay_tenancyId_startedAt_idx" + ON /* SCHEMA_NAME_SENTINEL */."SessionReplay"("tenancyId", "startedAt"); diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index 24afecea6..f830b2989 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -407,6 +407,7 @@ model SessionReplay { @@id([tenancyId, id]) @@index([tenancyId, projectUserId, startedAt]) + @@index([tenancyId, startedAt], name: "SessionReplay_tenancyId_startedAt_idx") @@index([tenancyId, lastEventAt]) // index by updatedAt instead of lastEventAt because event timing can be spoofed @@index([tenancyId, refreshTokenId, updatedAt]) @@ -1069,6 +1070,7 @@ model EmailOutbox { @@id([tenancyId, id]) @@index([tenancyId, finishedSendingAt(sort: Desc), scheduledAtIfNotYetQueued(sort: Desc), priority, id], map: "EmailOutbox_ordering_idx") + @@index([tenancyId, startedSendingAt], name: "EmailOutbox_tenancyId_startedSendingAt_idx") @@index([tenancyId, simpleStatus], map: "EmailOutbox_simple_status_tenancy_idx") @@index([tenancyId, status], map: "EmailOutbox_status_tenancy_idx") @@index([isQueued], map: "EmailOutbox_isQueued_idx") @@ -1427,6 +1429,32 @@ model OutgoingRequest { @@index([startedFulfillingAt, deduplicationKey]) } +enum StripeWebhookEventStatus { + PENDING + PROCESSED + FAILED +} + +// Idempotency + recovery log for incoming Stripe webhook events. Each event is +// persisted synchronously (keyed on the Stripe `event.id`) before we ack 200 to +// Stripe, so redeliveries are deduped and the full `payload` of any +// PENDING/FAILED row can be replayed manually. Not tenancy-scoped: the Stripe +// account -> tenancy resolution happens during processing, not here. +model StripeWebhookEvent { + id String @id @default(uuid()) @db.Uuid + + stripeEventId String @unique + eventType String + stripeAccountId String? + payload Json + status StripeWebhookEventStatus @default(PENDING) + lastError String? + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + processedAt DateTime? +} + // BulldozerStorageEngine is managed externally (see prisma.config.ts // `tables.external`). It's created by migrations and interacted with // via raw SQL — not through the Prisma client. Keeping it out of the diff --git a/apps/backend/src/app/api/latest/integrations/stripe/webhooks/route.tsx b/apps/backend/src/app/api/latest/integrations/stripe/webhooks/route.tsx index 0388a0e80..7656826eb 100644 --- a/apps/backend/src/app/api/latest/integrations/stripe/webhooks/route.tsx +++ b/apps/backend/src/app/api/latest/integrations/stripe/webhooks/route.tsx @@ -1,5 +1,7 @@ import { sendEmailToMany, type EmailOutboxRecipient } from "@/lib/emails"; import { bulldozerWriteOneTimePurchase } from "@/lib/payments/bulldozer-dual-write"; +import { claimStripeEvent, markStripeEventFailed, markStripeEventProcessed } from "@/lib/stripe-webhook-events"; +import { runAsynchronouslyAndWaitUntil } from "@/utils/background-tasks"; import { listPermissions } from "@/lib/permissions"; import { getHexclaveStripe, getStripeForAccount, resolveProductFromStripeMetadata, syncStripeSubscriptions, upsertStripeInvoice } from "@/lib/stripe"; import type { StripeOverridesMap } from "@/lib/stripe-proxy"; @@ -466,13 +468,29 @@ export const POST = createSmartRouteHandler({ throw new StatusError(400, "Invalid stripe-signature header"); } - try { - await processStripeWebhookEvent(event); - } catch (error) { - captureError("stripe-webhook-receiver", error); - throw error; + // Persist the event for idempotency + recovery BEFORE acking. Stripe + // delivers at-least-once + const { shouldProcess } = await claimStripeEvent(event); + if (!shouldProcess) { + return { + statusCode: 200, + bodyType: "json", + body: { received: true, deduplicated: true }, + }; } + // Ack Stripe immediately and process in the background. + // Stripe recommends ACKing ASAP to avoid timeouts and redeliveries + runAsynchronouslyAndWaitUntil(async () => { + try { + await processStripeWebhookEvent(event); + await markStripeEventProcessed(event.id); + } catch (error) { + captureError("stripe-webhook-receiver", error); + await markStripeEventFailed(event.id, error); + } + }); + return { statusCode: 200, bodyType: "json", diff --git a/apps/backend/src/lib/plan-entitlements.test.ts b/apps/backend/src/lib/plan-entitlements.test.ts index aec03993e..7d9f0d50c 100644 --- a/apps/backend/src/lib/plan-entitlements.test.ts +++ b/apps/backend/src/lib/plan-entitlements.test.ts @@ -4,6 +4,8 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { arePlanLimitsEnforced, getBillingTeamId, + getNonAnonymousUserCountForTenancies, + getOwnedProjectAndTenancyIdsForBillingTeam, getOwnedProjectIdsForBillingTeam, getOwnedTenancyIdsForBillingTeam, getTeamWideItemCapacityForTests, @@ -98,6 +100,28 @@ describe("team-wide ownership aggregation", () => { expect(tenancyIds).toEqual(["tenancy-a-main", "tenancy-a-dev", "tenancy-b-main"]); }); + it("lists owned project and tenancy ids from one ownership scope", async () => { + const scope = await getOwnedProjectAndTenancyIdsForBillingTeam("team-1", globalPrisma); + expect(scope).toMatchInlineSnapshot(` + { + "projectIds": [ + "project-a", + "project-b", + ], + "tenancyIds": [ + "tenancy-a-main", + "tenancy-a-dev", + "tenancy-b-main", + ], + } + `); + }); + + it("counts non-anonymous users from already-resolved tenancies", async () => { + const usage = await getNonAnonymousUserCountForTenancies(["tenancy-a-main", "tenancy-b-main"], globalPrisma); + expect(usage).toBe(2); + }); + it("counts only non-anonymous users across all owned tenancies", async () => { const usage = await getTeamWideNonAnonymousUserCount("team-1", globalPrisma); expect(usage).toBe(3); diff --git a/apps/backend/src/lib/plan-entitlements.ts b/apps/backend/src/lib/plan-entitlements.ts index 932ab977c..1b8527061 100644 --- a/apps/backend/src/lib/plan-entitlements.ts +++ b/apps/backend/src/lib/plan-entitlements.ts @@ -39,6 +39,11 @@ type GlobalPrismaLike = { }, }; +type OwnedBillingScope = { + projectIds: string[], + tenancyIds: string[], +}; + type ItemCapacityReaders = { getPrismaForTenancy: (tenancy: Tenancy) => Promise, getItemQuantityForCustomer: (options: { @@ -85,13 +90,16 @@ export async function getOwnedProjectIdsForBillingTeam( return projects.map((project) => project.id); } -export async function getOwnedTenancyIdsForBillingTeam( +export async function getOwnedProjectAndTenancyIdsForBillingTeam( billingTeamId: string, globalPrisma: GlobalPrismaLike = globalPrismaClient, -): Promise { +): Promise { const projectIds = await getOwnedProjectIdsForBillingTeam(billingTeamId, globalPrisma); if (projectIds.length === 0) { - return []; + return { + projectIds, + tenancyIds: [], + }; } const tenancies = await globalPrisma.tenancy.findMany({ where: { @@ -103,16 +111,23 @@ export async function getOwnedTenancyIdsForBillingTeam( id: true, }, }); - return tenancies.map((tenancy) => tenancy.id); + return { + projectIds, + tenancyIds: tenancies.map((tenancy) => tenancy.id), + }; } -export async function getTeamWideNonAnonymousUserCount( +export async function getOwnedTenancyIdsForBillingTeam( billingTeamId: string, globalPrisma: GlobalPrismaLike = globalPrismaClient, +): Promise { + return (await getOwnedProjectAndTenancyIdsForBillingTeam(billingTeamId, globalPrisma)).tenancyIds; +} + +export async function getNonAnonymousUserCountForTenancies( + tenancyIds: string[], + globalPrisma: GlobalPrismaLike = globalPrismaClient, ): Promise { - // Usage metric: how many non-anonymous users are currently consumed by this billing team. - // This is compared against auth user capacity to determine over-limit conditions. - const tenancyIds = await getOwnedTenancyIdsForBillingTeam(billingTeamId, globalPrisma); if (tenancyIds.length === 0) { return 0; } @@ -126,6 +141,16 @@ export async function getTeamWideNonAnonymousUserCount( }); } +export async function getTeamWideNonAnonymousUserCount( + billingTeamId: string, + globalPrisma: GlobalPrismaLike = globalPrismaClient, +): Promise { + // Usage metric: how many non-anonymous users are currently consumed by this billing team. + // This is compared against auth user capacity to determine over-limit conditions. + const tenancyIds = await getOwnedTenancyIdsForBillingTeam(billingTeamId, globalPrisma); + return await getNonAnonymousUserCountForTenancies(tenancyIds, globalPrisma); +} + async function getTeamWideItemCapacity( billingTeamId: string, itemId: string, diff --git a/apps/backend/src/lib/plan-usage.ts b/apps/backend/src/lib/plan-usage.ts index b53c01391..6c28c398e 100644 --- a/apps/backend/src/lib/plan-usage.ts +++ b/apps/backend/src/lib/plan-usage.ts @@ -4,20 +4,24 @@ import { getSubscriptionMapForCustomer } from "@/lib/payments/customer-data"; import { isActiveSubscription } from "@/lib/payments"; import { getBillingTeamId, - getOwnedProjectIdsForBillingTeam, - getOwnedTenancyIdsForBillingTeam, - getTeamWideNonAnonymousUserCount, + getNonAnonymousUserCountForTenancies, + getOwnedProjectAndTenancyIdsForBillingTeam, } from "@/lib/plan-entitlements"; import { DEFAULT_BRANCH_ID, getSoleTenancyFromProjectBranch, getTenancy, type Tenancy } from "@/lib/tenancies"; import { getPrismaClientForTenancy, getPrismaSchemaForTenancy, globalPrismaClient, sqlQuoteIdent } from "@/prisma-client"; import { BASE_PLAN_IDS_BY_TIER, ITEM_IDS, PLAN_LIMITS, UNLIMITED, type ItemId, type PlanId } from "@hexclave/shared/dist/plans"; import type { PlanUsageResponse } from "@hexclave/shared/dist/interface/admin-interface"; import { HexclaveAssertionError, throwErr } from "@hexclave/shared/dist/utils/errors"; +import { mapWithConcurrency } from "@hexclave/shared/dist/utils/promises"; import type { SubscriptionRow } from "./payments/schema/types"; type PlanUsageKind = PlanUsageResponse["rows"][number]["kind"]; type PlanUsageRow = PlanUsageResponse["rows"][number]; type UsageLimit = number | null; +type TenancyMeteredUsage = { + emails: number, + sessionReplays: number, +}; type UsagePeriod = { start: Date, @@ -46,6 +50,8 @@ const PLAN_LABELS = new Map([ ["growth", "Growth"], ]); +const PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY = 4; + export function getNextPlanId(planId: PlanId): "team" | "growth" | null { if (planId === "free") { return "team"; @@ -202,38 +208,99 @@ async function getOwnerTeamDisplayName(internalTenancy: Tenancy, ownerTeamId: st return team?.displayName ?? throwErr(`Owner team ${ownerTeamId} not found in the internal tenancy`); } -async function countEmailsForTenancy(tenancyId: string, period: UsagePeriod): Promise { - const tenancy = await getTenancy(tenancyId) ?? throwErr(`Tenancy ${tenancyId} not found while counting email usage`); - const schema = await getPrismaSchemaForTenancy(tenancy); - const prisma = await getPrismaClientForTenancy(tenancy); - const rows = await prisma.$replica().$queryRaw<[{ count: number }]>` - SELECT COUNT(*)::int AS count - FROM ${sqlQuoteIdent(schema)}."EmailOutbox" - WHERE "tenancyId" = ${tenancy.id}::uuid - AND "startedSendingAt" IS NOT NULL - AND "startedSendingAt" >= ${period.start} - AND "startedSendingAt" < ${period.end} - `; - return Number(rows[0].count); +type TenancyPrismaClient = Awaited>; + +type TenancyMeteredUsageGroup = { + prisma: TenancyPrismaClient, + schema: string, + tenancyIds: string[], +}; + +// Tenancies can route to different source-of-truth databases/schemas, so we can't assume a single +// query covers every tenancy. We group tenancies that share a (client, schema) and run one aggregate +// COUNT per group: the common case (all projects on one database) collapses to a single round trip, +// while multi-database teams fan out to one query per distinct database instead of one per tenancy. +async function groupTenanciesByMeteredUsageSource(tenancyIds: string[]): Promise { + const resolved = await mapWithConcurrency(tenancyIds, PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY, async (tenancyId) => { + const tenancy = await getTenancy(tenancyId) ?? throwErr(`Tenancy ${tenancyId} not found while counting plan usage`); + const [schema, prisma] = await Promise.all([ + getPrismaSchemaForTenancy(tenancy), + getPrismaClientForTenancy(tenancy), + ]); + return { tenancyId: tenancy.id, schema, prisma }; + }); + + const byClient = new Map>(); + for (const { tenancyId, schema, prisma } of resolved) { + let bySchema = byClient.get(prisma); + if (bySchema == null) { + bySchema = new Map(); + byClient.set(prisma, bySchema); + } + const existing = bySchema.get(schema); + if (existing == null) { + bySchema.set(schema, [tenancyId]); + } else { + existing.push(tenancyId); + } + } + + const groups: TenancyMeteredUsageGroup[] = []; + for (const [prisma, bySchema] of byClient) { + for (const [schema, groupTenancyIds] of bySchema) { + groups.push({ prisma, schema, tenancyIds: groupTenancyIds }); + } + } + return groups; } -async function countSessionReplaysForTenancy(tenancyId: string, period: UsagePeriod): Promise { - const tenancy = await getTenancy(tenancyId) ?? throwErr(`Tenancy ${tenancyId} not found while counting session replay usage`); - const schema = await getPrismaSchemaForTenancy(tenancy); - const prisma = await getPrismaClientForTenancy(tenancy); - const rows = await prisma.$replica().$queryRaw<[{ count: number }]>` - SELECT COUNT(*)::int AS count - FROM ${sqlQuoteIdent(schema)}."SessionReplay" - WHERE "tenancyId" = ${tenancy.id}::uuid - AND "startedAt" >= ${period.start} - AND "startedAt" < ${period.end} +async function countMeteredUsageForGroup(group: TenancyMeteredUsageGroup, period: UsagePeriod): Promise { + const rows = await group.prisma.$replica().$queryRaw>` + SELECT + ( + SELECT COUNT(*)::int + FROM ${sqlQuoteIdent(group.schema)}."EmailOutbox" + WHERE "tenancyId" = ANY(${group.tenancyIds}::uuid[]) + AND "startedSendingAt" IS NOT NULL + AND "startedSendingAt" >= ${period.start} + AND "startedSendingAt" < ${period.end} + ) AS "emails", + ( + SELECT COUNT(*)::int + FROM ${sqlQuoteIdent(group.schema)}."SessionReplay" + WHERE "tenancyId" = ANY(${group.tenancyIds}::uuid[]) + AND "startedAt" >= ${period.start} + AND "startedAt" < ${period.end} + ) AS "sessionReplays" `; - return Number(rows[0].count); + const row = rows[0] ?? throwErr(`Missing plan usage count row for metered usage group on schema ${group.schema}`); + return { + emails: Number(row.emails), + sessionReplays: Number(row.sessionReplays), + }; } -async function sumTenancyUsage(tenancyIds: string[], counter: (tenancyId: string) => Promise): Promise { - const counts = await Promise.all(tenancyIds.map(counter)); - return counts.reduce((sum, count) => sum + count, 0); +async function sumTenancyMeteredUsage(tenancyIds: string[], period: UsagePeriod): Promise { + if (tenancyIds.length === 0) { + return { emails: 0, sessionReplays: 0 }; + } + + const groups = await groupTenanciesByMeteredUsageSource(tenancyIds); + // The group count equals the number of distinct databases (usually 1), so concurrency mostly guards + // the pathological multi-database team rather than the per-tenancy fan-out it used to. + const subtotals = await mapWithConcurrency( + groups, + PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY, + (group) => countMeteredUsageForGroup(group, period), + ); + + return subtotals.reduce( + (totals, subtotal) => ({ + emails: totals.emails + subtotal.emails, + sessionReplays: totals.sessionReplays + subtotal.sessionReplays, + }), + { emails: 0, sessionReplays: 0 }, + ); } async function countAnalyticsEventsForProjects(projectIds: string[], period: UsagePeriod): Promise { @@ -336,18 +403,16 @@ export async function getPlanUsageForProject(project: UsageSourceProject, now: D const planId = resolveActivePlanId(activePlanSubscription); const period = getPlanUsagePeriod(activePlanSubscription, now); - const [ownerTeamDisplayName, ownedProjectIds, ownedTenancyIds, dashboardAdmins, authUsers] = await Promise.all([ + const [ownerTeamDisplayName, ownedScope, dashboardAdmins] = await Promise.all([ getOwnerTeamDisplayName(internalTenancy, ownerTeamId), - getOwnedProjectIdsForBillingTeam(ownerTeamId), - getOwnedTenancyIdsForBillingTeam(ownerTeamId), + getOwnedProjectAndTenancyIdsForBillingTeam(ownerTeamId), countDashboardAdmins(internalTenancy, ownerTeamId, now), - getTeamWideNonAnonymousUserCount(ownerTeamId), ]); - const [emails, analyticsEvents, sessionReplays] = await Promise.all([ - sumTenancyUsage(ownedTenancyIds, async (tenancyId) => await countEmailsForTenancy(tenancyId, period)), - countAnalyticsEventsForProjects(ownedProjectIds, period), - sumTenancyUsage(ownedTenancyIds, async (tenancyId) => await countSessionReplaysForTenancy(tenancyId, period)), + const [authUsers, meteredUsage, analyticsEvents] = await Promise.all([ + getNonAnonymousUserCountForTenancies(ownedScope.tenancyIds), + sumTenancyMeteredUsage(ownedScope.tenancyIds, period), + countAnalyticsEventsForProjects(ownedScope.projectIds, period), ]); return { @@ -362,9 +427,9 @@ export async function getPlanUsageForProject(project: UsageSourceProject, now: D planId, dashboardAdmins, authUsers, - emails, + emails: meteredUsage.emails, analyticsEvents, - sessionReplays, + sessionReplays: meteredUsage.sessionReplays, }), }; } diff --git a/apps/backend/src/lib/stripe-webhook-events.test.ts b/apps/backend/src/lib/stripe-webhook-events.test.ts new file mode 100644 index 000000000..5918d01c6 --- /dev/null +++ b/apps/backend/src/lib/stripe-webhook-events.test.ts @@ -0,0 +1,122 @@ +import { randomUUID } from "node:crypto"; +import type Stripe from "stripe"; +import { describe, expect, it } from "vitest"; +import { StripeWebhookEventStatus } from "@/generated/prisma/client"; +import { globalPrismaClient } from "@/prisma-client"; +import { claimStripeEvent, markStripeEventFailed, markStripeEventProcessed } from "./stripe-webhook-events"; + +// Test fixtures only need the fields the helper reads (id/type/account) plus a +// JSON-serializable body. Building a full Stripe.Event is impractical, so we +// cast a minimal object — any drift in the fields we actually use is still +// caught because the helper reads them directly. +function makeEvent(): Stripe.Event { + return { + id: `evt_${randomUUID()}`, + type: "invoice.payment_succeeded", + account: "acct_test_123", + data: { object: { id: "in_test", note: "fixture" } }, + } as unknown as Stripe.Event; +} + +describe("stripe webhook event idempotency (real DB)", () => { + it("claims a brand new event and persists it as PENDING", async ({ expect }) => { + const event = makeEvent(); + + const { shouldProcess } = await claimStripeEvent(event); + expect(shouldProcess).toBe(true); + + const row = await globalPrismaClient.stripeWebhookEvent.findUnique({ + where: { stripeEventId: event.id }, + }); + expect(row).not.toBeNull(); + expect(row?.status).toBe(StripeWebhookEventStatus.PENDING); + expect(row?.eventType).toBe(event.type); + expect(row?.stripeAccountId).toBe(event.account); + expect(row?.processedAt).toBeNull(); + expect(row?.lastError).toBeNull(); + // The full event payload is stored so dropped/failed events can be replayed. + expect(row?.payload).toMatchObject({ id: event.id, type: event.type }); + }); + + it("skips a redelivery while the prior delivery is still in-flight (PENDING)", async ({ expect }) => { + const event = makeEvent(); + + const first = await claimStripeEvent(event); + expect(first.shouldProcess).toBe(true); + + // Single-flight: a redelivery that arrives while the first attempt is still + // PENDING must not spin up a second processor (that would double the fan-out). + const second = await claimStripeEvent(event); + expect(second.shouldProcess).toBe(false); + }); + + it("deduplicates once the event has been fully PROCESSED", async ({ expect }) => { + const event = makeEvent(); + + await claimStripeEvent(event); + await markStripeEventProcessed(event.id); + + const processedRow = await globalPrismaClient.stripeWebhookEvent.findUnique({ + where: { stripeEventId: event.id }, + }); + expect(processedRow?.status).toBe(StripeWebhookEventStatus.PROCESSED); + expect(processedRow?.processedAt).not.toBeNull(); + expect(processedRow?.lastError).toBeNull(); + + // A Stripe redelivery of an already-processed event must be a no-op. + const redelivery = await claimStripeEvent(event); + expect(redelivery.shouldProcess).toBe(false); + }); + + it("records the error on failure and allows recovery via redelivery", async ({ expect }) => { + const event = makeEvent(); + + await claimStripeEvent(event); + await markStripeEventFailed(event.id, new Error("boom while processing")); + + const failedRow = await globalPrismaClient.stripeWebhookEvent.findUnique({ + where: { stripeEventId: event.id }, + }); + expect(failedRow?.status).toBe(StripeWebhookEventStatus.FAILED); + expect(failedRow?.lastError).toContain("boom while processing"); + + // FAILED rows must reprocess so a manual Stripe "Resend" can recover them. + const recovery = await claimStripeEvent(event); + expect(recovery.shouldProcess).toBe(true); + + // ...but reclaiming a FAILED row flips it back to in-flight (PENDING), so a + // further redelivery during that retry is once again skipped (single-flight). + const concurrentRetry = await claimStripeEvent(event); + expect(concurrentRetry.shouldProcess).toBe(false); + }); + + it("scrubs a stale processedAt when a row leaves the PROCESSED state", async ({ expect }) => { + const event = makeEvent(); + + await claimStripeEvent(event); + await markStripeEventProcessed(event.id); + + // markStripeEventFailed must clear processedAt so a recovered/re-failed row is + // never readable as "completed at