Fix email outbox race condition

This commit is contained in:
Konstantin Wohlwend 2026-01-09 11:13:39 -08:00
parent 66b2e65b93
commit f43c238ced
3 changed files with 164 additions and 121 deletions

View File

@ -1,7 +1,7 @@
import { EmailOutbox, EmailOutboxSkippedReason, Prisma } from "@/generated/prisma/client";
import { EmailOutbox, Prisma } from "@/generated/prisma/client";
import { serializeRecipient } from "@/lib/email-queue-step";
import { EmailOutboxRecipient } from "@/lib/emails";
import { globalPrismaClient } from "@/prisma-client";
import { globalPrismaClient, RawQuery, rawQueryAll } from "@/prisma-client";
import { createCrudHandlers } from "@/route-handlers/crud-handler";
import { KnownErrors } from "@stackframe/stack-shared";
import { emailOutboxCrud, EmailOutboxCrud } from "@stackframe/stack-shared/dist/interface/crud/email-outbox";
@ -353,132 +353,166 @@ export const emailOutboxCrudHandlers = createLazyProxy(() => createCrudHandlers(
throw new StatusError(400, "Email ID is required");
}
const email = await globalPrismaClient.emailOutbox.findUnique({
where: {
tenancyId_id: {
tenancyId: auth.tenancy.id,
id: params.id,
},
},
});
// Build SET clause parts for the SQL update
const sets: Prisma.Sql[] = [];
const set = (col: string, val: Prisma.Sql) => sets.push(Prisma.sql`${Prisma.raw(`"${col}"`)} = ${val}`);
const setNull = (...cols: string[]) => cols.forEach(c => set(c, Prisma.sql`NULL`));
if (!email) {
throw new StatusError(404, "Email not found");
}
// Check if email is in an editable state
if (!EDITABLE_STATUSES.has(email.status)) {
throw new KnownErrors.EmailNotEditable(email.id, email.status);
}
// Handle cancel action
// SKIPPED can now happen at any time, so we just set the skipped reason
if (data.cancel) {
const updateData: Prisma.EmailOutboxUpdateInput = {
// Ensure email is not paused (so status can become SKIPPED, not PAUSED)
isPaused: false,
// Reset isQueued so the email won't be picked up by the queue worker
isQueued: false,
// Set skip reason - this alone will make the status become SKIPPED
skippedReason: EmailOutboxSkippedReason.MANUALLY_CANCELLED,
skippedDetails: {},
};
// Cancel action - mark as skipped
set("isPaused", Prisma.sql`false`);
set("isQueued", Prisma.sql`false`);
set("skippedReason", Prisma.sql`'MANUALLY_CANCELLED'::"EmailOutboxSkippedReason"`);
set("skippedDetails", Prisma.sql`'{}'::jsonb`);
} else {
// Normal update path
let needsRerenderReset = false;
const updated = await globalPrismaClient.emailOutbox.update({
where: {
tenancyId_id: {
tenancyId: auth.tenancy.id,
id: params.id,
},
skippedReason: null,
},
data: updateData,
});
return prismaModelToCrud(updated);
if (data.tsx_source !== undefined) {
set("tsxSource", Prisma.sql`${data.tsx_source}`);
needsRerenderReset = true;
}
if (data.theme_id !== undefined) {
set("themeId", Prisma.sql`${data.theme_id}`);
needsRerenderReset = true;
}
if (data.to !== undefined) {
const serialized = serializeRecipient(apiRecipientToDb(data.to));
set("to", Prisma.sql`${JSON.stringify(serialized)}::jsonb`);
needsRerenderReset = true;
}
if (data.variables !== undefined) {
set("extraRenderVariables", Prisma.sql`${JSON.stringify(data.variables)}::jsonb`);
needsRerenderReset = true;
}
if (data.skip_deliverability_check !== undefined) {
set("shouldSkipDeliverabilityCheck", Prisma.sql`${data.skip_deliverability_check}`);
}
if (data.scheduled_at_millis !== undefined) {
set("scheduledAt", Prisma.sql`${new Date(data.scheduled_at_millis)}`);
set("isQueued", Prisma.sql`false`);
}
if (data.is_paused !== undefined) {
set("isPaused", Prisma.sql`${data.is_paused}`);
}
// If content changed, reset rendering and sending state
if (needsRerenderReset) {
set("isQueued", Prisma.sql`false`);
setNull(
"renderedByWorkerId", "startedRenderingAt", "finishedRenderingAt",
"renderErrorExternalMessage", "renderErrorExternalDetails",
"renderErrorInternalMessage", "renderErrorInternalDetails",
"renderedHtml", "renderedText", "renderedSubject",
"renderedIsTransactional", "renderedNotificationCategoryId",
"startedSendingAt", "finishedSendingAt",
"sendServerErrorExternalMessage", "sendServerErrorExternalDetails",
"sendServerErrorInternalMessage", "sendServerErrorInternalDetails",
"skippedReason", "skippedDetails", "canHaveDeliveryInfo",
"deliveredAt", "deliveryDelayedAt", "bouncedAt",
"openedAt", "clickedAt", "unsubscribedAt", "markedAsSpamAt"
);
}
}
// Build update data
const updateData: Prisma.EmailOutboxUpdateInput = {};
let needsRerenderReset = false;
if (data.tsx_source !== undefined) {
updateData.tsxSource = data.tsx_source;
needsRerenderReset = true;
}
if (data.theme_id !== undefined) {
updateData.themeId = data.theme_id;
needsRerenderReset = true;
}
if (data.to !== undefined) {
// Convert API format (snake_case: user_id) to DB format (camelCase: userId)
const internalRecipient = apiRecipientToDb(data.to);
// serializeRecipient always returns a valid JSON object for valid recipients
updateData.to = serializeRecipient(internalRecipient) as Prisma.InputJsonValue;
needsRerenderReset = true;
}
if (data.variables !== undefined) {
updateData.extraRenderVariables = data.variables as any;
needsRerenderReset = true;
}
if (data.skip_deliverability_check !== undefined) {
updateData.shouldSkipDeliverabilityCheck = data.skip_deliverability_check;
}
if (data.scheduled_at_millis !== undefined) {
updateData.scheduledAt = new Date(data.scheduled_at_millis);
updateData.isQueued = false;
}
if (data.is_paused !== undefined) {
updateData.isPaused = data.is_paused;
// If no fields to update, just touch updatedAt
if (sets.length === 0) {
set("updatedAt", Prisma.sql`NOW()`);
}
// If content changed, reset rendering state
if (needsRerenderReset) {
updateData.renderedByWorkerId = null;
updateData.startedRenderingAt = null;
updateData.finishedRenderingAt = null;
updateData.renderErrorExternalMessage = null;
updateData.renderErrorExternalDetails = Prisma.DbNull;
updateData.renderErrorInternalMessage = null;
updateData.renderErrorInternalDetails = Prisma.DbNull;
updateData.renderedHtml = null;
updateData.renderedText = null;
updateData.renderedSubject = null;
updateData.renderedIsTransactional = null;
updateData.renderedNotificationCategoryId = null;
updateData.isQueued = false;
// Also reset sending state if applicable
updateData.startedSendingAt = null;
updateData.finishedSendingAt = null;
updateData.sendServerErrorExternalMessage = null;
updateData.sendServerErrorExternalDetails = Prisma.DbNull;
updateData.sendServerErrorInternalMessage = null;
updateData.sendServerErrorInternalDetails = Prisma.DbNull;
updateData.skippedReason = null;
updateData.skippedDetails = Prisma.DbNull;
updateData.canHaveDeliveryInfo = null;
updateData.deliveredAt = null;
updateData.deliveryDelayedAt = null;
updateData.bouncedAt = null;
updateData.openedAt = null;
updateData.clickedAt = null;
updateData.unsubscribedAt = null;
updateData.markedAsSpamAt = null;
}
const updated = await globalPrismaClient.emailOutbox.update({
where: {
tenancyId_id: {
tenancyId: auth.tenancy.id,
id: params.id,
},
status: {
in: [...EDITABLE_STATUSES as any],
},
const updateQuery: RawQuery<EmailOutbox | null> = {
supportedPrismaClients: ["global"],
readOnlyQuery: false,
sql: Prisma.sql`
UPDATE "EmailOutbox"
SET ${Prisma.join(sets, ", ")}
WHERE "tenancyId" = ${auth.tenancy.id}::uuid
AND "id" = ${params.id}::uuid
AND "status" = ANY(${[...EDITABLE_STATUSES]}::"EmailOutboxStatus"[])
${data.cancel ? Prisma.sql`AND "skippedReason" IS NULL` : Prisma.empty}
RETURNING *
`,
postProcess: (rows): EmailOutbox | null => {
if (rows.length === 0) return null;
return parseEmailOutboxFromJson(rows[0]);
},
data: updateData,
});
};
return prismaModelToCrud(updated);
const checkQuery: RawQuery<{ id: string, status: string } | null> = {
supportedPrismaClients: ["global"],
readOnlyQuery: true,
sql: Prisma.sql`
SELECT "id", "status" FROM "EmailOutbox"
WHERE "tenancyId" = ${auth.tenancy.id}::uuid AND "id" = ${params.id}::uuid
`,
postProcess: (rows) => rows.length > 0 ? { id: rows[0].id, status: rows[0].status } : null,
};
const { updated, existing } = await rawQueryAll(globalPrismaClient, { updated: updateQuery, existing: checkQuery });
if (updated) return prismaModelToCrud(updated);
if (!existing) throw new StatusError(404, "Email not found");
throw new KnownErrors.EmailNotEditable(existing.id, existing.status);
},
}));
/** Parses row_to_json output back to EmailOutbox with proper Date types */
function parseEmailOutboxFromJson(j: Record<string, unknown>): EmailOutbox {
const date = (k: string) => new Date(j[k] + "Z");
const dateOrNull = (k: string) => j[k] ? date(k) : null;
return {
tenancyId: j.tenancyId as string,
id: j.id as string,
createdAt: date("createdAt"),
updatedAt: date("updatedAt"),
tsxSource: j.tsxSource as string,
themeId: j.themeId as string | null,
isHighPriority: j.isHighPriority as boolean,
to: j.to as Prisma.JsonValue,
extraRenderVariables: j.extraRenderVariables as Prisma.JsonValue,
overrideSubject: j.overrideSubject as string | null,
overrideNotificationCategoryId: j.overrideNotificationCategoryId as string | null,
shouldSkipDeliverabilityCheck: j.shouldSkipDeliverabilityCheck as boolean,
createdWith: j.createdWith as EmailOutbox["createdWith"],
emailDraftId: j.emailDraftId as string | null,
emailProgrammaticCallTemplateId: j.emailProgrammaticCallTemplateId as string | null,
status: j.status as EmailOutbox["status"],
simpleStatus: j.simpleStatus as EmailOutbox["simpleStatus"],
priority: j.priority as number,
isPaused: j.isPaused as boolean,
renderedByWorkerId: j.renderedByWorkerId as string | null,
startedRenderingAt: dateOrNull("startedRenderingAt"),
finishedRenderingAt: dateOrNull("finishedRenderingAt"),
renderErrorExternalMessage: j.renderErrorExternalMessage as string | null,
renderErrorExternalDetails: j.renderErrorExternalDetails as Prisma.JsonValue,
renderErrorInternalMessage: j.renderErrorInternalMessage as string | null,
renderErrorInternalDetails: j.renderErrorInternalDetails as Prisma.JsonValue,
renderedHtml: j.renderedHtml as string | null,
renderedText: j.renderedText as string | null,
renderedSubject: j.renderedSubject as string | null,
renderedIsTransactional: j.renderedIsTransactional as boolean | null,
renderedNotificationCategoryId: j.renderedNotificationCategoryId as string | null,
scheduledAt: date("scheduledAt"),
isQueued: j.isQueued as boolean,
scheduledAtIfNotYetQueued: dateOrNull("scheduledAtIfNotYetQueued"),
startedSendingAt: dateOrNull("startedSendingAt"),
finishedSendingAt: dateOrNull("finishedSendingAt"),
sentAt: dateOrNull("sentAt"),
sendServerErrorExternalMessage: j.sendServerErrorExternalMessage as string | null,
sendServerErrorExternalDetails: j.sendServerErrorExternalDetails as Prisma.JsonValue,
sendServerErrorInternalMessage: j.sendServerErrorInternalMessage as string | null,
sendServerErrorInternalDetails: j.sendServerErrorInternalDetails as Prisma.JsonValue,
skippedReason: j.skippedReason as EmailOutbox["skippedReason"],
skippedDetails: j.skippedDetails as Prisma.JsonValue,
canHaveDeliveryInfo: j.canHaveDeliveryInfo as boolean | null,
deliveredAt: dateOrNull("deliveredAt"),
deliveryDelayedAt: dateOrNull("deliveryDelayedAt"),
bouncedAt: dateOrNull("bouncedAt"),
openedAt: dateOrNull("openedAt"),
clickedAt: dateOrNull("clickedAt"),
unsubscribedAt: dateOrNull("unsubscribedAt"),
markedAsSpamAt: dateOrNull("markedAsSpamAt"),
};
}

View File

@ -413,13 +413,18 @@ async function rawQueryArray<Q extends RawQuery<any>[]>(tx: PrismaClientTransact
// TODO: check that combinedQuery supports the prisma client that created tx
// Supabase's index advisor only analyzes rows that start with "SELECT" (for some reason)
// Since ours starts with "WITH", we prepend a SELECT to it
const sqlQuery = Prisma.sql`SELECT * FROM (${combinedQuery.sql}) AS _`;
// Since ours starts with "WITH", we prepend a SELECT to it.
// However, we can't do this for data-modifying queries because PostgreSQL requires
// CTEs with UPDATE/INSERT/DELETE to be at the top level, not inside a subquery.
const sqlQuery = allReadOnly
? Prisma.sql`SELECT * FROM (${combinedQuery.sql}) AS _`
: combinedQuery.sql;
// Use the read replica if all queries are read-only and a replica is available
const queryClient = allReadOnly && '$replica' in tx
? (tx as any).$replica()
: tx;
// eslint-disable-next-line no-restricted-syntax -- $queryRaw is allowed here
const rawResult = await queryClient.$queryRaw(sqlQuery);
const postProcessed = combinedQuery.postProcess(rawResult as any);

View File

@ -111,6 +111,10 @@ module.exports = {
selector: "ImportDeclaration[source.value='react'] ImportSpecifier[imported.name='use']",
message: "Use `use` from @stack-shared/dist/utils/react instead (as it also supports React 18).",
},
{
selector: "CallExpression > MemberExpression[property.name='$queryRaw']",
message: "use rawQuery from prisma-client.tsx instead",
},
],
"@typescript-eslint/no-misused-promises": [
"error",