diff --git a/apps/backend/src/lib/email-queue-step.test.tsx b/apps/backend/src/lib/email-queue-step.test.tsx index bb255e77e..be7172666 100644 --- a/apps/backend/src/lib/email-queue-step.test.tsx +++ b/apps/backend/src/lib/email-queue-step.test.tsx @@ -1,10 +1,48 @@ import { EmailOutboxCreatedWith } from "@/generated/prisma/client"; import { globalPrismaClient } from "@/prisma-client"; -import { afterAll, describe, expect, it } from "vitest"; +import { afterAll, describe, expect, it, vi } from "vitest"; import { _forTesting } from "./email-queue-step"; import { DEFAULT_BRANCH_ID, getSoleTenancyFromProjectBranch } from "./tenancies"; -const { failEmailsStuckInSending, STUCK_EMAIL_TIMEOUT_MS } = _forTesting; +const { failEmailsStuckInSending, STUCK_EMAIL_TIMEOUT_MS, updateLastExecutionTime } = _forTesting; + +describe.sequential("updateLastExecutionTime", () => { + const metadataKeys: string[] = []; + + afterAll(async () => { + await globalPrismaClient.emailOutboxProcessingMetadata.deleteMany({ + where: { key: { in: metadataKeys } }, + }); + }); + + it("does not move lastExecutedAt backwards when the stored timestamp is ahead", async () => { + const key = `email-queue-step-delta-test-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + metadataKeys.push(key); + + const futureTimestamp = new Date(Date.now() + 60_000); + await globalPrismaClient.emailOutboxProcessingMetadata.create({ + data: { + key, + lastExecutedAt: futureTimestamp, + }, + }); + + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + try { + const delta = await updateLastExecutionTime(key); + + expect(delta).toBe(0); + expect(warnSpy).not.toHaveBeenCalled(); + + const after = await globalPrismaClient.emailOutboxProcessingMetadata.findUniqueOrThrow({ + where: { key }, + }); + expect(after.lastExecutedAt?.toISOString()).toBe(futureTimestamp.toISOString()); + } finally { + warnSpy.mockRestore(); + } + }); +}); // These tests connect to the real dev DB (like payments.test.tsx) and create real EmailOutbox // rows against the seeded `internal` tenancy. Each row is tagged with a unique tsxSource so we diff --git a/apps/backend/src/lib/email-queue-step.tsx b/apps/backend/src/lib/email-queue-step.tsx index f9584c1cc..3b73e1478 100644 --- a/apps/backend/src/lib/email-queue-step.tsx +++ b/apps/backend/src/lib/email-queue-step.tsx @@ -203,32 +203,40 @@ async function failEmailsStuckInSending(additionalWhere?: Prisma.EmailOutboxWher export const _forTesting = { failEmailsStuckInSending, STUCK_EMAIL_TIMEOUT_MS, + updateLastExecutionTime, }; -async function updateLastExecutionTime(): Promise { - const key = "EMAIL_QUEUE_METADATA_KEY"; - +async function updateLastExecutionTime(key = "EMAIL_QUEUE_METADATA_KEY"): Promise { // This query atomically claims the next execution slot and returns the delta. - // It uses FOR UPDATE to lock the row, preventing concurrent workers from reading - // the same previous timestamp. The pattern is: + // It uses FOR UPDATE to lock the row, preventing concurrent workers from reading the + // same previous timestamp. Use clock_timestamp(), not NOW(): NOW() is fixed at the + // transaction start, so a transaction that started earlier but acquired the row lock + // later could otherwise move lastExecutedAt backwards by a few milliseconds. + // The pattern is: // 1. Try UPDATE first (locks row with FOR UPDATE, returns old and new timestamps) // 2. If no row exists, INSERT (with ON CONFLICT DO NOTHING for race handling) // 3. Compute delta based on the result const [{ delta }] = await globalPrismaClient.$queryRaw<{ delta: number }[]>` - WITH now_ts AS ( - SELECT NOW() AS now - ), - do_update AS ( + WITH do_update AS ( -- Update existing row, locking it first and capturing the old timestamp UPDATE "EmailOutboxProcessingMetadata" AS m SET - "updatedAt" = (SELECT now FROM now_ts), - "lastExecutedAt" = (SELECT now FROM now_ts) + "updatedAt" = old.next_timestamp, + "lastExecutedAt" = old.next_timestamp FROM ( - SELECT "key", "lastExecutedAt" AS previous_timestamp - FROM "EmailOutboxProcessingMetadata" - WHERE "key" = ${key} - FOR UPDATE + SELECT + locked."key", + locked."lastExecutedAt" AS previous_timestamp, + GREATEST(locked.observed_timestamp, COALESCE(locked."lastExecutedAt", locked.observed_timestamp)) AS next_timestamp + FROM ( + SELECT + "key", + "lastExecutedAt", + clock_timestamp()::timestamp(3) AS observed_timestamp + FROM "EmailOutboxProcessingMetadata" + WHERE "key" = ${key} + FOR UPDATE + ) AS locked ) AS old WHERE m."key" = old."key" RETURNING old.previous_timestamp, m."lastExecutedAt" AS new_timestamp @@ -236,7 +244,8 @@ async function updateLastExecutionTime(): Promise { do_insert AS ( -- Insert new row if no existing row was updated INSERT INTO "EmailOutboxProcessingMetadata" ("key", "lastExecutedAt", "updatedAt") - SELECT ${key}, (SELECT now FROM now_ts), (SELECT now FROM now_ts) + SELECT ${key}, observed_timestamp, observed_timestamp + FROM (SELECT clock_timestamp()::timestamp(3) AS observed_timestamp) AS now_ts WHERE NOT EXISTS (SELECT 1 FROM do_update) ON CONFLICT ("key") DO NOTHING RETURNING NULL::timestamp AS previous_timestamp, "lastExecutedAt" AS new_timestamp @@ -261,8 +270,7 @@ async function updateLastExecutionTime(): Promise { `; if (delta < 0) { - // TODO: why does this happen, actually? investigate. - console.warn("Email queue step delta is negative. Not sure why it happened. Ignoring the delta. TODO investigate", { delta }); + console.warn("Email queue step delta is negative after monotonic timestamp update; ignoring the delta so the send quota cannot go negative", { delta }); return 0; }