Fix negative email queue step deltas

This commit is contained in:
Konstantin Wohlwend 2026-06-27 14:54:24 -07:00
parent 9c438fa604
commit c6d162e5c8
2 changed files with 66 additions and 20 deletions

View File

@ -1,10 +1,48 @@
import { EmailOutboxCreatedWith } from "@/generated/prisma/client";
import { globalPrismaClient } from "@/prisma-client";
import { afterAll, describe, expect, it } from "vitest";
import { afterAll, describe, expect, it, vi } from "vitest";
import { _forTesting } from "./email-queue-step";
import { DEFAULT_BRANCH_ID, getSoleTenancyFromProjectBranch } from "./tenancies";
const { failEmailsStuckInSending, STUCK_EMAIL_TIMEOUT_MS } = _forTesting;
const { failEmailsStuckInSending, STUCK_EMAIL_TIMEOUT_MS, updateLastExecutionTime } = _forTesting;
describe.sequential("updateLastExecutionTime", () => {
  // Every metadata key this suite inserts is recorded here so the rows can be
  // removed in one go once the suite has finished.
  const createdKeys: string[] = [];

  afterAll(async () => {
    const cleanupFilter = { key: { in: createdKeys } };
    await globalPrismaClient.emailOutboxProcessingMetadata.deleteMany({ where: cleanupFilter });
  });

  it("does not move lastExecutedAt backwards when the stored timestamp is ahead", async () => {
    // Unique key per run so this test never touches a metadata row it did not create.
    const key = `email-queue-step-delta-test-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
    createdKeys.push(key);

    // Seed a row whose lastExecutedAt lies one minute in the future relative to this process.
    const seededTimestamp = new Date(Date.now() + 60_000);
    const seededIso = seededTimestamp.toISOString();
    await globalPrismaClient.emailOutboxProcessingMetadata.create({
      data: { key, lastExecutedAt: seededTimestamp },
    });

    // Silence console.warn for the duration of the call and record whether it fires.
    const consoleWarn = vi.spyOn(console, "warn").mockImplementation(() => {});
    try {
      const reportedDelta = await updateLastExecutionTime(key);
      expect(reportedDelta).toBe(0);
      expect(consoleWarn).not.toHaveBeenCalled();

      // The stored timestamp must still be the seeded one, i.e. it was not rewound.
      const storedRow = await globalPrismaClient.emailOutboxProcessingMetadata.findUniqueOrThrow({
        where: { key },
      });
      const storedIso = storedRow.lastExecutedAt?.toISOString();
      expect(storedIso).toBe(seededIso);
    } finally {
      // Always put the real console.warn back, even when an expectation above throws.
      consoleWarn.mockRestore();
    }
  });
});
// These tests connect to the real dev DB (like payments.test.tsx) and create real EmailOutbox
// rows against the seeded `internal` tenancy. Each row is tagged with a unique tsxSource so we

View File

@ -203,32 +203,40 @@ async function failEmailsStuckInSending(additionalWhere?: Prisma.EmailOutboxWher
/**
 * Module internals bundled into a single export so they can be reached from
 * outside this file without each one being exported individually.
 */
export const _forTesting = { failEmailsStuckInSending, STUCK_EMAIL_TIMEOUT_MS, updateLastExecutionTime };
async function updateLastExecutionTime(): Promise<number> {
const key = "EMAIL_QUEUE_METADATA_KEY";
async function updateLastExecutionTime(key = "EMAIL_QUEUE_METADATA_KEY"): Promise<number> {
// This query atomically claims the next execution slot and returns the delta.
// It uses FOR UPDATE to lock the row, preventing concurrent workers from reading
// the same previous timestamp. The pattern is:
// It uses FOR UPDATE to lock the row, preventing concurrent workers from reading the
// same previous timestamp. Use clock_timestamp(), not NOW(): NOW() is fixed at the
// transaction start, so a transaction that started earlier but acquired the row lock
// later could otherwise move lastExecutedAt backwards by a few milliseconds.
// The pattern is:
// 1. Try UPDATE first (locks row with FOR UPDATE, returns old and new timestamps)
// 2. If no row exists, INSERT (with ON CONFLICT DO NOTHING for race handling)
// 3. Compute delta based on the result
const [{ delta }] = await globalPrismaClient.$queryRaw<{ delta: number }[]>`
WITH now_ts AS (
SELECT NOW() AS now
),
do_update AS (
WITH do_update AS (
-- Update existing row, locking it first and capturing the old timestamp
UPDATE "EmailOutboxProcessingMetadata" AS m
SET
"updatedAt" = (SELECT now FROM now_ts),
"lastExecutedAt" = (SELECT now FROM now_ts)
"updatedAt" = old.next_timestamp,
"lastExecutedAt" = old.next_timestamp
FROM (
SELECT "key", "lastExecutedAt" AS previous_timestamp
FROM "EmailOutboxProcessingMetadata"
WHERE "key" = ${key}
FOR UPDATE
SELECT
locked."key",
locked."lastExecutedAt" AS previous_timestamp,
GREATEST(locked.observed_timestamp, COALESCE(locked."lastExecutedAt", locked.observed_timestamp)) AS next_timestamp
FROM (
SELECT
"key",
"lastExecutedAt",
clock_timestamp()::timestamp(3) AS observed_timestamp
FROM "EmailOutboxProcessingMetadata"
WHERE "key" = ${key}
FOR UPDATE
) AS locked
) AS old
WHERE m."key" = old."key"
RETURNING old.previous_timestamp, m."lastExecutedAt" AS new_timestamp
@ -236,7 +244,8 @@ async function updateLastExecutionTime(): Promise<number> {
do_insert AS (
-- Insert new row if no existing row was updated
INSERT INTO "EmailOutboxProcessingMetadata" ("key", "lastExecutedAt", "updatedAt")
SELECT ${key}, (SELECT now FROM now_ts), (SELECT now FROM now_ts)
SELECT ${key}, observed_timestamp, observed_timestamp
FROM (SELECT clock_timestamp()::timestamp(3) AS observed_timestamp) AS now_ts
WHERE NOT EXISTS (SELECT 1 FROM do_update)
ON CONFLICT ("key") DO NOTHING
RETURNING NULL::timestamp AS previous_timestamp, "lastExecutedAt" AS new_timestamp
@ -261,8 +270,7 @@ async function updateLastExecutionTime(): Promise<number> {
`;
if (delta < 0) {
// TODO: why does this happen, actually? investigate.
console.warn("Email queue step delta is negative. Not sure why it happened. Ignoring the delta. TODO investigate", { delta });
console.warn("Email queue step delta is negative after monotonic timestamp update; ignoring the delta so the send quota cannot go negative", { delta });
return 0;
}