fix(payments): race with webhook inserts

Of course, if we get two events at exactly the same time, they will both still be processed, but we can't change that.
However, a duplicate event arriving in near succession should now be dropped.
This commit is contained in:
nams1570 2026-06-23 22:22:37 -07:00
parent 3df80cf5fc
commit 59456a36e8
2 changed files with 61 additions and 35 deletions

View File

@ -38,16 +38,16 @@ describe("stripe webhook event idempotency (real DB)", () => {
expect(row?.payload).toMatchObject({ id: event.id, type: event.type });
});
it("allows reprocessing while the prior delivery is still PENDING", async ({ expect }) => {
it("skips a redelivery while the prior delivery is still in-flight (PENDING)", async ({ expect }) => {
const event = makeEvent();
const first = await claimStripeEvent(event);
expect(first.shouldProcess).toBe(true);
// Redelivery before the background work finished: we must NOT skip, otherwise
// a crash between claim and processing would silently drop the event forever.
// Single-flight: a redelivery that arrives while the first attempt is still
// PENDING must not spin up a second processor (that would double the fan-out).
const second = await claimStripeEvent(event);
expect(second.shouldProcess).toBe(true);
expect(second.shouldProcess).toBe(false);
});
it("deduplicates once the event has been fully PROCESSED", async ({ expect }) => {
@ -83,5 +83,40 @@ describe("stripe webhook event idempotency (real DB)", () => {
// FAILED rows must reprocess so a manual Stripe "Resend" can recover them.
const recovery = await claimStripeEvent(event);
expect(recovery.shouldProcess).toBe(true);
// ...but reclaiming a FAILED row flips it back to in-flight (PENDING), so a
// further redelivery during that retry is once again skipped (single-flight).
const concurrentRetry = await claimStripeEvent(event);
expect(concurrentRetry.shouldProcess).toBe(false);
});
it("scrubs a stale processedAt when a row leaves the PROCESSED state", async ({ expect }) => {
  const stripeEvent = makeEvent();

  // Re-reads the persisted row for the event under test.
  const loadRow = () =>
    globalPrismaClient.stripeWebhookEvent.findUnique({
      where: { stripeEventId: stripeEvent.id },
    });

  // Claim the event, then mark it processed.
  await claimStripeEvent(stripeEvent);
  await markStripeEventProcessed(stripeEvent.id);

  // A failure recorded after success has to wipe processedAt; otherwise the
  // row could be read as "completed at <time>" even though it is FAILED.
  await markStripeEventFailed(stripeEvent.id, new Error("late failure after success"));
  const afterFailure = await loadRow();
  expect(afterFailure?.status).toBe(StripeWebhookEventStatus.FAILED);
  expect(afterFailure?.processedAt).toBeNull();

  // Plant a stale processedAt on the FAILED row by hand, then check that
  // reclaiming it (FAILED -> PENDING) clears the timestamp as well.
  await globalPrismaClient.stripeWebhookEvent.update({
    where: { stripeEventId: stripeEvent.id },
    data: { processedAt: new Date() },
  });
  const { shouldProcess } = await claimStripeEvent(stripeEvent);
  expect(shouldProcess).toBe(true);

  const afterReclaim = await loadRow();
  expect(afterReclaim?.status).toBe(StripeWebhookEventStatus.PENDING);
  expect(afterReclaim?.processedAt).toBeNull();
});
});

View File

@ -1,6 +1,7 @@
import { Prisma, StripeWebhookEventStatus } from "@/generated/prisma/client";
import { randomUUID } from "node:crypto";
import { StripeWebhookEventStatus } from "@/generated/prisma/client";
import { globalPrismaClient } from "@/prisma-client";
import { errorToNiceString, HexclaveAssertionError } from "@hexclave/shared/dist/utils/errors";
import { errorToNiceString } from "@hexclave/shared/dist/utils/errors";
import type Stripe from "stripe";
/**
@ -14,37 +15,26 @@ import type Stripe from "stripe";
*/
/**
* Records the event (or detects a prior one) and decides whether processing
* should run. Returns `shouldProcess: false` only when the event has already
* been fully PROCESSED; PENDING/FAILED rows are allowed to reprocess so that a
* manual Stripe "Resend" can recover a dropped/failed event.
* Atomically claims an event for processing, guaranteeing single-flight: at most
* one worker processes a given event at a time. Returns `shouldProcess: true`
* only to the caller that won the claim.
*
* A new event is inserted as PENDING. On a redelivery, the claim is only handed
* out again if the previous attempt FAILED (the `WHERE status = 'FAILED'` makes
* the takeover conditional and atomic, so concurrent redeliveries can't both
* win). PENDING (in-flight) and PROCESSED (done) rows yield no row and are
* skipped. A PENDING row whose worker died is recovered manually from `payload`.
*/
export async function claimStripeEvent(event: Stripe.Event): Promise<{ shouldProcess: boolean }> {
try {
await globalPrismaClient.stripeWebhookEvent.create({
data: {
stripeEventId: event.id,
eventType: event.type,
stripeAccountId: event.account ?? null,
payload: JSON.parse(JSON.stringify(event)) as Prisma.InputJsonValue,
status: StripeWebhookEventStatus.PENDING,
},
});
return { shouldProcess: true };
} catch (error) {
// Unique violation on stripeEventId => we've seen this event before.
if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2002") {
const existing = await globalPrismaClient.stripeWebhookEvent.findUnique({
where: { stripeEventId: event.id },
});
return { shouldProcess: existing?.status !== StripeWebhookEventStatus.PROCESSED };
}
throw new HexclaveAssertionError(
`Failed to claim Stripe webhook event for idempotency: ${event.id}`,
{ cause: error, stripeEventId: event.id, eventType: event.type },
);
}
const claimed = await globalPrismaClient.$queryRaw<{ id: string }[]>`
INSERT INTO "StripeWebhookEvent" ("id", "stripeEventId", "eventType", "stripeAccountId", "payload", "status", "updatedAt")
VALUES (${randomUUID()}::uuid, ${event.id}, ${event.type}, ${event.account ?? null}, ${JSON.stringify(event)}::jsonb, 'PENDING', now())
ON CONFLICT ("stripeEventId") DO UPDATE
SET "status" = 'PENDING', "lastError" = NULL, "processedAt" = NULL, "updatedAt" = now()
WHERE "StripeWebhookEvent"."status" = 'FAILED'
RETURNING "id"
`;
return { shouldProcess: claimed.length === 1 };
}
export async function markStripeEventProcessed(stripeEventId: string): Promise<void> {
@ -64,6 +54,7 @@ export async function markStripeEventFailed(stripeEventId: string, error: unknow
data: {
status: StripeWebhookEventStatus.FAILED,
lastError: errorToNiceString(error),
processedAt: null,
},
});
}