clickhouse sync email outbox table

This commit is contained in:
Bilal Godil 2026-03-16 12:36:07 -07:00
parent 3c88950419
commit 9f9c9a46dc
11 changed files with 725 additions and 8 deletions

View File

@ -0,0 +1,12 @@
-- AlterTable
ALTER TABLE "EmailOutbox" ADD COLUMN "sequenceId" BIGINT,
ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true;
-- CreateIndex
CREATE UNIQUE INDEX "EmailOutbox_sequenceId_key" ON "EmailOutbox"("sequenceId");
-- CreateIndex
CREATE INDEX "EmailOutbox_tenancyId_sequenceId_idx" ON "EmailOutbox"("tenancyId", "sequenceId");
-- CreateIndex
CREATE INDEX "EmailOutbox_shouldUpdateSequenceId_idx" ON "EmailOutbox"("shouldUpdateSequenceId", "tenancyId");

View File

@ -1004,6 +1004,9 @@ model EmailOutbox {
unsubscribedAt DateTime?
markedAsSpamAt DateTime?
sequenceId BigInt? @unique
shouldUpdateSequenceId Boolean @default(true)
tenancy Tenancy @relation(fields: [tenancyId], references: [id], onDelete: Cascade)
@@id([tenancyId, id])
@ -1011,6 +1014,8 @@ model EmailOutbox {
@@index([tenancyId, simpleStatus], map: "EmailOutbox_simple_status_tenancy_idx")
@@index([tenancyId, status], map: "EmailOutbox_status_tenancy_idx")
@@index([isQueued], map: "EmailOutbox_isQueued_idx")
@@index([tenancyId, sequenceId], name: "EmailOutbox_tenancyId_sequenceId_idx")
@@index([shouldUpdateSequenceId, tenancyId], name: "EmailOutbox_shouldUpdateSequenceId_idx")
}
model EmailOutboxProcessingMetadata {

View File

@ -22,6 +22,8 @@ export async function runClickhouseMigrations() {
await client.exec({ query: TEAMS_VIEW_SQL });
await client.exec({ query: TEAM_MEMBERS_TABLE_BASE_SQL });
await client.exec({ query: TEAM_MEMBERS_VIEW_SQL });
await client.exec({ query: EMAIL_OUTBOXES_TABLE_BASE_SQL });
await client.exec({ query: EMAIL_OUTBOXES_VIEW_SQL });
await client.exec({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL });
await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL });
await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL });
@ -36,6 +38,7 @@ export async function runClickhouseMigrations() {
"GRANT SELECT ON default.contact_channels TO limited_user;",
"GRANT SELECT ON default.teams TO limited_user;",
"GRANT SELECT ON default.team_members TO limited_user;",
"GRANT SELECT ON default.email_outboxes TO limited_user;",
];
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS events_project_isolation ON default.events FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
@ -52,6 +55,9 @@ export async function runClickhouseMigrations() {
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS team_members_project_isolation ON default.team_members FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS email_outboxes_project_isolation ON default.email_outboxes FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
for (const query of queries) {
await client.exec({ query });
}
@ -330,6 +336,81 @@ FINAL
WHERE sync_is_deleted = 0;
`;
const EMAIL_OUTBOXES_TABLE_BASE_SQL = `
CREATE TABLE IF NOT EXISTS analytics_internal.email_outboxes (
project_id String,
branch_id String,
id UUID,
status LowCardinality(String),
simple_status LowCardinality(String),
created_with LowCardinality(String),
email_draft_id Nullable(String),
email_programmatic_call_template_id Nullable(String),
is_high_priority UInt8,
rendered_is_transactional Nullable(UInt8),
rendered_subject Nullable(String),
rendered_notification_category_id Nullable(String),
scheduled_at DateTime64(3, 'UTC'),
created_at DateTime64(3, 'UTC'),
started_sending_at Nullable(DateTime64(3, 'UTC')),
finished_sending_at Nullable(DateTime64(3, 'UTC')),
sent_at Nullable(DateTime64(3, 'UTC')),
delivered_at Nullable(DateTime64(3, 'UTC')),
opened_at Nullable(DateTime64(3, 'UTC')),
clicked_at Nullable(DateTime64(3, 'UTC')),
unsubscribed_at Nullable(DateTime64(3, 'UTC')),
marked_as_spam_at Nullable(DateTime64(3, 'UTC')),
bounced_at Nullable(DateTime64(3, 'UTC')),
can_have_delivery_info Nullable(UInt8),
skipped_reason LowCardinality(Nullable(String)),
send_retries Int32,
is_paused UInt8,
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, id);
`;
const EMAIL_OUTBOXES_VIEW_SQL = `
CREATE OR REPLACE VIEW default.email_outboxes
SQL SECURITY DEFINER
AS
SELECT
project_id,
branch_id,
id,
status,
simple_status,
created_with,
email_draft_id,
email_programmatic_call_template_id,
is_high_priority,
rendered_is_transactional,
rendered_subject,
rendered_notification_category_id,
scheduled_at,
created_at,
started_sending_at,
finished_sending_at,
sent_at,
delivered_at,
opened_at,
clicked_at,
unsubscribed_at,
marked_as_spam_at,
bounced_at,
can_have_delivery_info,
skipped_reason,
send_retries,
is_paused
FROM analytics_internal.email_outboxes
FINAL
WHERE sync_is_deleted = 0;
`;
const EXTERNAL_ANALYTICS_DB_SQL = `
CREATE DATABASE IF NOT EXISTS analytics_internal;
`;

View File

@ -447,6 +447,9 @@ export const emailOutboxCrudHandlers = createLazyProxy(() => createCrudHandlers(
set("updatedAt", Prisma.sql`NOW()`);
}
// Mark for external DB sync
set("shouldUpdateSequenceId", Prisma.sql`TRUE`);
const updateQuery: RawQuery<EmailOutbox | null> = {
supportedPrismaClients: ["global"],
readOnlyQuery: false,
@ -543,6 +546,8 @@ function parseEmailOutboxFromJson(j: Record<string, unknown>): EmailOutbox {
clickedAt: dateOrNull("clickedAt"),
unsubscribedAt: dateOrNull("unsubscribedAt"),
markedAsSpamAt: dateOrNull("markedAsSpamAt"),
sequenceId: j.sequenceId != null ? BigInt(j.sequenceId as string | number) : null,
shouldUpdateSequenceId: j.shouldUpdateSequenceId as boolean,
};
}

View File

@ -166,6 +166,34 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
didUpdate = true;
}
const emailOutboxTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "tenancyId", "id"
FROM "EmailOutbox"
WHERE "shouldUpdateSequenceId" = TRUE
ORDER BY "tenancyId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "EmailOutbox" eo
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE eo."tenancyId" = r."tenancyId"
AND eo."id" = r."id"
RETURNING eo."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
`;
span.setAttribute("stack.external-db-sync.email-outbox-tenants", emailOutboxTenants.length);
if (emailOutboxTenants.length > 0) {
await enqueueExternalDbSyncBatch(emailOutboxTenants.map(t => t.tenancyId));
didUpdate = true;
}
const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "id", "tenancyId"
@ -195,7 +223,7 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
span.setAttribute("stack.external-db-sync.did-update", didUpdate);
if (didUpdate) {
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, DR=${deletedRowTenants.length}`);
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, EO=${emailOutboxTenants.length}, DR=${deletedRowTenants.length}`);
}
return didUpdate;

View File

@ -87,6 +87,7 @@ const globalSchema = yupObject({
sequencer: yupObject({
project_users: sequenceStatsSchema.defined(),
contact_channels: sequenceStatsSchema.defined(),
email_outboxes: sequenceStatsSchema.defined(),
deleted_rows: sequenceStatsSchema.shape({
by_table: yupArray(deletedRowByTableSchema).defined(),
}).defined(),
@ -119,6 +120,7 @@ const responseSchema = yupObject({
sequencer: yupObject({
project_users: sequenceStatsSchema.defined(),
contact_channels: sequenceStatsSchema.defined(),
email_outboxes: sequenceStatsSchema.defined(),
deleted_rows: sequenceStatsSchema.shape({
by_table: yupArray(deletedRowByTableSchema).defined(),
}).defined(),
@ -233,6 +235,7 @@ function maxBigIntString(values: Array<string | null | undefined>): string | nul
function buildMappingInternalStats(
projectUsersStats: SequenceStats,
emailOutboxStats: SequenceStats,
deletedRowsByTable: DeletedRowSummary[],
) {
const deletedProjectUserStats = deletedRowsByTable.find((row) => row.table_name === "ProjectUser") ?? null;
@ -264,6 +267,13 @@ function buildMappingInternalStats(
internal_pending_count: usersMappingPending,
});
mappingInternalStats.set("email_outboxes", {
mapping_id: "email_outboxes",
internal_min_sequence_id: emailOutboxStats.min_sequence_id,
internal_max_sequence_id: emailOutboxStats.max_sequence_id,
internal_pending_count: emailOutboxStats.pending,
});
const mappings = Array.from(mappingInternalStats.values());
const mappingStatuses = mappings.map((mapping) => ({
mapping_id: mapping.mapping_id,
@ -300,6 +310,17 @@ async function fetchInternalStats(tenancyId: string | null) {
${tenancyWhere}
`).at(0) ?? throwErr("Contact channel stats query returned no rows.");
const emailOutboxStatsRow = (await globalPrismaClient.$queryRaw<SequenceStatsRow[]>`
SELECT
COUNT(*)::bigint AS "total",
COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending",
COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id",
MIN("sequenceId") AS "min_sequence_id",
MAX("sequenceId") AS "max_sequence_id"
FROM "EmailOutbox"
${tenancyWhere}
`).at(0) ?? throwErr("Email outbox stats query returned no rows.");
const deletedRowStatsRow = (await globalPrismaClient.$queryRaw<SequenceStatsRow[]>`
SELECT
COUNT(*)::bigint AS "total",
@ -346,6 +367,7 @@ async function fetchInternalStats(tenancyId: string | null) {
const projectUsersStats = formatSequenceStats(projectUserStatsRow);
const contactChannelStats = formatSequenceStats(contactChannelStatsRow);
const emailOutboxStats = formatSequenceStats(emailOutboxStatsRow);
const deletedRowStats = formatSequenceStats(deletedRowStatsRow);
const deletedRowsByTable = deletedRowsByTableRows.map((row) => ({
@ -353,11 +375,12 @@ async function fetchInternalStats(tenancyId: string | null) {
...formatSequenceStats(row),
}));
const { mappings, mappingStatuses } = buildMappingInternalStats(projectUsersStats, deletedRowsByTable);
const { mappings, mappingStatuses } = buildMappingInternalStats(projectUsersStats, emailOutboxStats, deletedRowsByTable);
return {
projectUsersStats,
contactChannelStats,
emailOutboxStats,
deletedRowStats,
deletedRowsByTable,
outgoingStatsRow,
@ -1003,6 +1026,7 @@ export const GET = createSmartRouteHandler({
sequencer: {
project_users: globalStats.projectUsersStats,
contact_channels: globalStats.contactChannelStats,
email_outboxes: globalStats.emailOutboxStats,
deleted_rows: {
...globalStats.deletedRowStats,
by_table: globalStats.deletedRowsByTable,
@ -1021,6 +1045,7 @@ export const GET = createSmartRouteHandler({
sequencer: {
project_users: currentStats.projectUsersStats,
contact_channels: currentStats.contactChannelStats,
email_outboxes: currentStats.emailOutboxStats,
deleted_rows: {
...currentStats.deletedRowStats,
by_table: currentStats.deletedRowsByTable,

View File

@ -195,6 +195,7 @@ async function retryEmailsStuckInRendering(): Promise<void> {
data: {
renderedByWorkerId: null,
startedRenderingAt: null,
shouldUpdateSequenceId: true,
},
});
if (res.length > 0) {
@ -398,6 +399,7 @@ async function renderTenancyEmails(workerId: string, tenancyId: string, group: E
renderErrorInternalMessage: error,
renderErrorInternalDetails: { error },
finishedRenderingAt: new Date(),
shouldUpdateSequenceId: true,
},
});
};
@ -418,6 +420,7 @@ async function renderTenancyEmails(workerId: string, tenancyId: string, group: E
renderErrorInternalMessage: null,
renderErrorInternalDetails: Prisma.DbNull,
finishedRenderingAt: new Date(),
shouldUpdateSequenceId: true,
},
});
};
@ -508,7 +511,7 @@ async function queueReadyEmails(): Promise<{ queuedCount: number }> {
// Query 1: Fresh emails (scheduledAt has passed, no retry pending)
const freshEmails = await globalPrismaClient.$queryRaw<{ id: string }[]>`
UPDATE "EmailOutbox"
SET "isQueued" = TRUE
SET "isQueued" = TRUE, "shouldUpdateSequenceId" = TRUE
WHERE "isQueued" = FALSE
AND "isPaused" = FALSE
AND "skippedReason" IS NULL
@ -523,7 +526,7 @@ async function queueReadyEmails(): Promise<{ queuedCount: number }> {
// Clear nextSendRetryAt when queuing so the email is in a clean "queued" state.
const retryEmails = await globalPrismaClient.$queryRaw<{ id: string }[]>`
UPDATE "EmailOutbox"
SET "isQueued" = TRUE, "nextSendRetryAt" = NULL
SET "isQueued" = TRUE, "nextSendRetryAt" = NULL, "shouldUpdateSequenceId" = TRUE
WHERE "isQueued" = FALSE
AND "isPaused" = FALSE
AND "skippedReason" IS NULL
@ -749,6 +752,7 @@ async function processSingleEmail(context: TenancyProcessingContext, row: EmailO
sendRetries: newAttemptCount,
nextSendRetryAt: new Date(Date.now() + backoffMs),
sendAttemptErrors: updatedErrors as Prisma.InputJsonArray,
shouldUpdateSequenceId: true,
},
});
} else {
@ -789,6 +793,7 @@ async function processSingleEmail(context: TenancyProcessingContext, row: EmailO
failureReason,
allAttemptErrors: updatedErrors as Json[],
},
shouldUpdateSequenceId: true,
},
});
}
@ -809,6 +814,7 @@ async function processSingleEmail(context: TenancyProcessingContext, row: EmailO
sendServerErrorExternalDetails: Prisma.DbNull,
sendServerErrorInternalMessage: null,
sendServerErrorInternalDetails: Prisma.DbNull,
shouldUpdateSequenceId: true,
},
});
}
@ -829,6 +835,7 @@ async function processSingleEmail(context: TenancyProcessingContext, row: EmailO
sendServerErrorExternalDetails: {},
sendServerErrorInternalMessage: errorToNiceString(error),
sendServerErrorInternalDetails: {},
shouldUpdateSequenceId: true,
},
});
}
@ -914,6 +921,7 @@ async function markSkipped(row: EmailOutbox, reason: EmailOutboxSkippedReason, d
data: {
skippedReason: reason,
skippedDetails: details as Prisma.InputJsonValue,
shouldUpdateSequenceId: true,
},
});
}

View File

@ -499,6 +499,13 @@ function normalizeClickhouseBoolean(value: unknown, label: string): number {
throw new StackAssertionError(`${label} must be a boolean or 0/1. Received: ${JSON.stringify(value)}`);
}
function normalizeClickhouseNullableBoolean(value: unknown, label: string): number | null {
if (value === null || value === undefined) {
return null;
}
return normalizeClickhouseBoolean(value, label);
}
function parseSequenceId(value: unknown, mappingId: string): number | null {
if (value == null) {
return null;
@ -530,7 +537,7 @@ async function ensureClickhouseSchema(
// Map of target table name -> column normalizers for ClickHouse
// 'json' columns get JSON.stringify, 'boolean' columns get normalizeClickhouseBoolean
const CLICKHOUSE_COLUMN_NORMALIZERS: Record<string, Record<string, 'json' | 'boolean'>> = {
const CLICKHOUSE_COLUMN_NORMALIZERS: Record<string, Record<string, 'json' | 'boolean' | 'nullable_boolean'>> = {
users: {
client_metadata: 'json',
client_read_only_metadata: 'json',
@ -555,6 +562,13 @@ const CLICKHOUSE_COLUMN_NORMALIZERS: Record<string, Record<string, 'json' | 'boo
team_members: {
sync_is_deleted: 'boolean',
},
email_outboxes: {
is_high_priority: 'boolean',
rendered_is_transactional: 'nullable_boolean',
can_have_delivery_info: 'nullable_boolean',
is_paused: 'boolean',
sync_is_deleted: 'boolean',
},
};
async function pushRowsToClickhouse(
@ -626,6 +640,8 @@ async function pushRowsToClickhouse(
if (col in normalized) {
if (type === 'json') {
normalized[col] = JSON.stringify(normalized[col]);
} else if (type === 'nullable_boolean') {
normalized[col] = normalizeClickhouseNullableBoolean(normalized[col], col);
} else {
normalized[col] = normalizeClickhouseBoolean(normalized[col], col);
}

View File

@ -2,7 +2,8 @@ import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors"
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
import { afterAll, beforeAll, describe, expect } from 'vitest';
import { test } from '../../../../helpers';
import { InternalApiKey, Project, User, niceBackendFetch } from '../../../backend-helpers';
import { withPortPrefix } from '../../../../helpers/ports';
import { backendContext, InternalApiKey, Project, User, niceBackendFetch } from '../../../backend-helpers';
import {
TEST_TIMEOUT,
TestDbManager,
@ -13,6 +14,8 @@ import {
waitForSyncedContactChannelDeletion,
waitForSyncedData,
waitForSyncedDeletion,
waitForSyncedEmailOutbox,
waitForSyncedEmailOutboxByStatus,
waitForSyncedTeam,
waitForSyncedTeamDeletion,
waitForSyncedTeamMember,
@ -87,7 +90,7 @@ describe.sequential('External DB Sync - Basic Tests', () => {
let dbManager: TestDbManager;
const createProjectWithExternalDb = (
externalDatabases: any,
projectOptions?: { display_name?: string, description?: string }
projectOptions?: { display_name?: string, description?: string, config?: Record<string, unknown> }
) => {
return createProjectWithExternalDbRaw(
externalDatabases,
@ -837,6 +840,227 @@ describe.sequential('External DB Sync - Basic Tests', () => {
await waitForSyncedTeamMemberDeletion(client, teamId, user.userId);
}, TEST_TIMEOUT);
/**
* What it does:
* - Creates a project with email config, sends an email, and verifies
* the email outbox row is synced to the external Postgres DB.
*/
test('EmailOutbox sync (Postgres)', async () => {
const dbName = 'email_outbox_pg_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
}, {
display_name: 'Email Outbox Sync Test',
config: {
email_config: {
type: "standard",
host: "localhost",
port: Number(withPortPrefix("29")),
username: "test",
password: "test",
sender_name: "Test Project",
sender_email: "test@example.com",
},
},
});
// Create a user
const createUserResponse = await niceBackendFetch("/api/v1/users", {
method: "POST",
accessType: "server",
body: {
primary_email: backendContext.value.mailbox.emailAddress,
primary_email_verified: true,
},
});
expect(createUserResponse.status).toBe(201);
const userId = createUserResponse.body.id;
// Send an email
const sendResponse = await niceBackendFetch("/api/v1/emails/send-email", {
method: "POST",
accessType: "server",
body: {
user_ids: [userId],
html: "<p>Sync test email</p>",
subject: "DB Sync Test Email",
notification_category_name: "Transactional",
},
});
expect(sendResponse.status).toBe(200);
// Wait for the email to be processed (rendered + sent)
await wait(8_000);
// Get the email ID from the outbox API
const listResponse = await niceBackendFetch("/api/v1/emails/outbox", {
method: "GET",
accessType: "server",
});
expect(listResponse.status).toBe(200);
expect(listResponse.body.items.length).toBeGreaterThanOrEqual(1);
const emailId = listResponse.body.items[0].id;
const client = dbManager.getClient(dbName);
// Wait for the email outbox row to appear in external DB
await waitForSyncedEmailOutbox(client, emailId);
// Verify the synced row has expected columns
const res = await client.query(`SELECT * FROM "email_outboxes" WHERE "id" = $1`, [emailId]);
expect(res.rows.length).toBe(1);
const row = res.rows[0];
expect(row.created_with).toBe('PROGRAMMATIC_CALL');
expect(row.is_high_priority).toBe(false);
expect(row.is_paused).toBe(false);
}, TEST_TIMEOUT);
/**
* What it does:
* - Creates a project, sends an email, and verifies the email outbox row
* is synced to ClickHouse.
*/
test('EmailOutbox sync (ClickHouse)', async ({ expect }) => {
await Project.createAndSwitch({
config: {
magic_link_enabled: true,
email_config: {
type: "standard",
host: "localhost",
port: Number(withPortPrefix("29")),
username: "test",
password: "test",
sender_name: "Test Project",
sender_email: "test@example.com",
},
},
});
// Create a user
const createUserResponse = await niceBackendFetch("/api/v1/users", {
method: "POST",
accessType: "server",
body: {
primary_email: backendContext.value.mailbox.emailAddress,
primary_email_verified: true,
},
});
expect(createUserResponse.status).toBe(201);
const userId = createUserResponse.body.id;
// Send an email
const sendResponse = await niceBackendFetch("/api/v1/emails/send-email", {
method: "POST",
accessType: "server",
body: {
user_ids: [userId],
html: "<p>ClickHouse sync test email</p>",
subject: "CH Sync Test Email",
notification_category_name: "Transactional",
},
});
expect(sendResponse.status).toBe(200);
// Wait for the email to be processed
await wait(8_000);
await InternalApiKey.createAndSetProjectKeys();
// Poll ClickHouse until the email_outboxes row appears
const timeoutMs = 180_000;
const intervalMs = 2_000;
const start = performance.now();
let response;
while (performance.now() - start < timeoutMs) {
response = await runQueryForCurrentProject({
query: "SELECT id, status, simple_status, created_with, is_high_priority FROM email_outboxes LIMIT 10",
});
expect(response.status).toBe(200);
if (response.body.result.length >= 1) {
break;
}
await wait(intervalMs);
}
expect(response!.body.result.length).toBeGreaterThanOrEqual(1);
const row = response!.body.result[0];
expect(row.created_with).toBe('PROGRAMMATIC_CALL');
}, TEST_TIMEOUT);
/**
* What it does:
* - Sends an email, waits for it to reach a terminal state, then verifies
* the status update is reflected in the external Postgres DB.
*/
test('EmailOutbox status updates are synced (Postgres)', async () => {
const dbName = 'email_outbox_status_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
}, {
config: {
email_config: {
type: "standard",
host: "localhost",
port: Number(withPortPrefix("29")),
username: "test",
password: "test",
sender_name: "Test Project",
sender_email: "test@example.com",
},
},
});
const createUserResponse = await niceBackendFetch("/api/v1/users", {
method: "POST",
accessType: "server",
body: {
primary_email: backendContext.value.mailbox.emailAddress,
primary_email_verified: true,
},
});
expect(createUserResponse.status).toBe(201);
const userId = createUserResponse.body.id;
const sendResponse = await niceBackendFetch("/api/v1/emails/send-email", {
method: "POST",
accessType: "server",
body: {
user_ids: [userId],
html: "<p>Status sync test</p>",
subject: "Status Sync Test",
notification_category_name: "Transactional",
},
});
expect(sendResponse.status).toBe(200);
// Wait for the email to finish sending
await wait(8_000);
const client = dbManager.getClient(dbName);
// The email should eventually reach SENT status in the external DB
await waitForSyncedEmailOutboxByStatus(client, 'SENT');
const res = await client.query(`SELECT * FROM "email_outboxes" WHERE "status" = 'SENT'`);
expect(res.rows.length).toBeGreaterThanOrEqual(1);
const row = res.rows[0];
expect(row.simple_status).toBe('OK');
expect(row.finished_sending_at).not.toBeNull();
expect(row.sent_at).not.toBeNull();
expect(row.send_retries).toBe(0);
}, TEST_TIMEOUT);
/**
* What it does:
* - Reads the external DB sync fusebox settings.

View File

@ -314,13 +314,38 @@ export async function waitForSyncedContactChannelDeletion(client: Client, value:
});
}
export async function waitForSyncedEmailOutbox(client: Client, emailId: string, expectedStatus?: string) {
await waitForExternalDbRow(
client,
`SELECT * FROM "email_outboxes" WHERE "id" = $1`,
[emailId],
{
shouldExist: true,
description: `email outbox "${emailId}" to appear in external DB`,
checkRow: expectedStatus ? (row) => row.status === expectedStatus : undefined,
},
);
}
export async function waitForSyncedEmailOutboxByStatus(client: Client, status: string) {
await waitForExternalDbRow(
client,
`SELECT * FROM "email_outboxes" WHERE "status" = $1`,
[status],
{
shouldExist: true,
description: `email outbox with status "${status}" to appear in external DB`,
},
);
}
/**
* Helper to create a project and update its config with external DB settings.
* Tracks the project for cleanup later.
*/
export async function createProjectWithExternalDb(
externalDatabases: any,
projectOptions?: { display_name?: string, description?: string },
projectOptions?: { display_name?: string, description?: string, config?: Record<string, unknown> },
options?: { projectTracker?: ProjectContext[] }
) {
const project = await Project.createAndSwitch(projectOptions);

View File

@ -832,4 +832,292 @@ export const DEFAULT_DB_SYNC_MAPPINGS = {
`.trim(),
},
},
"email_outboxes": {
sourceTables: { "EmailOutbox": "EmailOutbox" },
targetTable: "email_outboxes",
targetTableSchemas: {
postgres: `
CREATE TABLE IF NOT EXISTS "email_outboxes" (
"id" uuid PRIMARY KEY NOT NULL,
"status" text NOT NULL,
"simple_status" text NOT NULL,
"created_with" text NOT NULL,
"email_draft_id" text,
"email_programmatic_call_template_id" text,
"is_high_priority" boolean NOT NULL DEFAULT false,
"rendered_is_transactional" boolean,
"rendered_subject" text,
"rendered_notification_category_id" text,
"scheduled_at" timestamp without time zone NOT NULL,
"created_at" timestamp without time zone NOT NULL,
"started_sending_at" timestamp without time zone,
"finished_sending_at" timestamp without time zone,
"sent_at" timestamp without time zone,
"delivered_at" timestamp without time zone,
"opened_at" timestamp without time zone,
"clicked_at" timestamp without time zone,
"unsubscribed_at" timestamp without time zone,
"marked_as_spam_at" timestamp without time zone,
"bounced_at" timestamp without time zone,
"can_have_delivery_info" boolean,
"skipped_reason" text,
"send_retries" integer NOT NULL DEFAULT 0,
"is_paused" boolean NOT NULL DEFAULT false
);
REVOKE ALL ON "email_outboxes" FROM PUBLIC;
GRANT SELECT ON "email_outboxes" TO PUBLIC;
CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" (
"mapping_name" text PRIMARY KEY NOT NULL,
"last_synced_sequence_id" bigint NOT NULL DEFAULT -1,
"updated_at" timestamp without time zone NOT NULL DEFAULT now()
);
`.trim(),
clickhouse: `
CREATE TABLE IF NOT EXISTS analytics_internal.email_outboxes (
project_id String,
branch_id String,
id UUID,
status LowCardinality(String),
simple_status LowCardinality(String),
created_with LowCardinality(String),
email_draft_id Nullable(String),
email_programmatic_call_template_id Nullable(String),
is_high_priority UInt8,
rendered_is_transactional Nullable(UInt8),
rendered_subject Nullable(String),
rendered_notification_category_id Nullable(String),
scheduled_at DateTime64(3, 'UTC'),
created_at DateTime64(3, 'UTC'),
started_sending_at Nullable(DateTime64(3, 'UTC')),
finished_sending_at Nullable(DateTime64(3, 'UTC')),
sent_at Nullable(DateTime64(3, 'UTC')),
delivered_at Nullable(DateTime64(3, 'UTC')),
opened_at Nullable(DateTime64(3, 'UTC')),
clicked_at Nullable(DateTime64(3, 'UTC')),
unsubscribed_at Nullable(DateTime64(3, 'UTC')),
marked_as_spam_at Nullable(DateTime64(3, 'UTC')),
bounced_at Nullable(DateTime64(3, 'UTC')),
can_have_delivery_info Nullable(UInt8),
skipped_reason LowCardinality(Nullable(String)),
send_retries Int32,
is_paused UInt8,
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, id);
`.trim(),
},
internalDbFetchQueries: {
clickhouse: `
SELECT
"Tenancy"."projectId" AS "project_id",
"Tenancy"."branchId" AS "branch_id",
"EmailOutbox"."id" AS "id",
"EmailOutbox"."status"::text AS "status",
"EmailOutbox"."simpleStatus"::text AS "simple_status",
"EmailOutbox"."createdWith"::text AS "created_with",
"EmailOutbox"."emailDraftId" AS "email_draft_id",
"EmailOutbox"."emailProgrammaticCallTemplateId" AS "email_programmatic_call_template_id",
"EmailOutbox"."isHighPriority" AS "is_high_priority",
"EmailOutbox"."renderedIsTransactional" AS "rendered_is_transactional",
"EmailOutbox"."renderedSubject" AS "rendered_subject",
"EmailOutbox"."renderedNotificationCategoryId" AS "rendered_notification_category_id",
"EmailOutbox"."scheduledAt" AS "scheduled_at",
"EmailOutbox"."createdAt" AS "created_at",
"EmailOutbox"."startedSendingAt" AS "started_sending_at",
"EmailOutbox"."finishedSendingAt" AS "finished_sending_at",
"EmailOutbox"."sentAt" AS "sent_at",
"EmailOutbox"."deliveredAt" AS "delivered_at",
"EmailOutbox"."openedAt" AS "opened_at",
"EmailOutbox"."clickedAt" AS "clicked_at",
"EmailOutbox"."unsubscribedAt" AS "unsubscribed_at",
"EmailOutbox"."markedAsSpamAt" AS "marked_as_spam_at",
"EmailOutbox"."bouncedAt" AS "bounced_at",
"EmailOutbox"."canHaveDeliveryInfo" AS "can_have_delivery_info",
"EmailOutbox"."skippedReason"::text AS "skipped_reason",
"EmailOutbox"."sendRetries" AS "send_retries",
"EmailOutbox"."isPaused" AS "is_paused",
"EmailOutbox"."sequenceId" AS "sync_sequence_id",
"EmailOutbox"."tenancyId" AS "tenancyId",
false AS "sync_is_deleted"
FROM "EmailOutbox"
JOIN "Tenancy" ON "Tenancy"."id" = "EmailOutbox"."tenancyId"
WHERE "EmailOutbox"."tenancyId" = $1::uuid
AND "EmailOutbox"."sequenceId" IS NOT NULL
AND "EmailOutbox"."sequenceId" > $2::bigint
ORDER BY "EmailOutbox"."sequenceId" ASC
LIMIT 1000
`.trim(),
},
internalDbFetchQuery: `
SELECT
"EmailOutbox"."id" AS "id",
"EmailOutbox"."status"::text AS "status",
"EmailOutbox"."simpleStatus"::text AS "simple_status",
"EmailOutbox"."createdWith"::text AS "created_with",
"EmailOutbox"."emailDraftId" AS "email_draft_id",
"EmailOutbox"."emailProgrammaticCallTemplateId" AS "email_programmatic_call_template_id",
"EmailOutbox"."isHighPriority" AS "is_high_priority",
"EmailOutbox"."renderedIsTransactional" AS "rendered_is_transactional",
"EmailOutbox"."renderedSubject" AS "rendered_subject",
"EmailOutbox"."renderedNotificationCategoryId" AS "rendered_notification_category_id",
"EmailOutbox"."scheduledAt" AS "scheduled_at",
"EmailOutbox"."createdAt" AS "created_at",
"EmailOutbox"."startedSendingAt" AS "started_sending_at",
"EmailOutbox"."finishedSendingAt" AS "finished_sending_at",
"EmailOutbox"."sentAt" AS "sent_at",
"EmailOutbox"."deliveredAt" AS "delivered_at",
"EmailOutbox"."openedAt" AS "opened_at",
"EmailOutbox"."clickedAt" AS "clicked_at",
"EmailOutbox"."unsubscribedAt" AS "unsubscribed_at",
"EmailOutbox"."markedAsSpamAt" AS "marked_as_spam_at",
"EmailOutbox"."bouncedAt" AS "bounced_at",
"EmailOutbox"."canHaveDeliveryInfo" AS "can_have_delivery_info",
"EmailOutbox"."skippedReason"::text AS "skipped_reason",
"EmailOutbox"."sendRetries" AS "send_retries",
"EmailOutbox"."isPaused" AS "is_paused",
"EmailOutbox"."sequenceId" AS "sequence_id",
"EmailOutbox"."tenancyId",
false AS "is_deleted"
FROM "EmailOutbox"
WHERE "EmailOutbox"."tenancyId" = $1::uuid
AND "EmailOutbox"."sequenceId" IS NOT NULL
AND "EmailOutbox"."sequenceId" > $2::bigint
ORDER BY "EmailOutbox"."sequenceId" ASC
LIMIT 1000
`.trim(),
externalDbUpdateQueries: {
postgres: `
WITH params AS (
SELECT
$1::uuid AS "id",
$2::text AS "status",
$3::text AS "simple_status",
$4::text AS "created_with",
$5::text AS "email_draft_id",
$6::text AS "email_programmatic_call_template_id",
$7::boolean AS "is_high_priority",
$8::boolean AS "rendered_is_transactional",
$9::text AS "rendered_subject",
$10::text AS "rendered_notification_category_id",
$11::timestamp without time zone AS "scheduled_at",
$12::timestamp without time zone AS "created_at",
$13::timestamp without time zone AS "started_sending_at",
$14::timestamp without time zone AS "finished_sending_at",
$15::timestamp without time zone AS "sent_at",
$16::timestamp without time zone AS "delivered_at",
$17::timestamp without time zone AS "opened_at",
$18::timestamp without time zone AS "clicked_at",
$19::timestamp without time zone AS "unsubscribed_at",
$20::timestamp without time zone AS "marked_as_spam_at",
$21::timestamp without time zone AS "bounced_at",
$22::boolean AS "can_have_delivery_info",
$23::text AS "skipped_reason",
$24::integer AS "send_retries",
$25::boolean AS "is_paused",
$26::bigint AS "sequence_id",
$27::boolean AS "is_deleted",
$28::text AS "mapping_name"
),
deleted AS (
DELETE FROM "email_outboxes" eo
USING params p
WHERE p."is_deleted" = true AND eo."id" = p."id"
RETURNING 1
),
upserted AS (
INSERT INTO "email_outboxes" (
"id",
"status",
"simple_status",
"created_with",
"email_draft_id",
"email_programmatic_call_template_id",
"is_high_priority",
"rendered_is_transactional",
"rendered_subject",
"rendered_notification_category_id",
"scheduled_at",
"created_at",
"started_sending_at",
"finished_sending_at",
"sent_at",
"delivered_at",
"opened_at",
"clicked_at",
"unsubscribed_at",
"marked_as_spam_at",
"bounced_at",
"can_have_delivery_info",
"skipped_reason",
"send_retries",
"is_paused"
)
SELECT
p."id",
p."status",
p."simple_status",
p."created_with",
p."email_draft_id",
p."email_programmatic_call_template_id",
p."is_high_priority",
p."rendered_is_transactional",
p."rendered_subject",
p."rendered_notification_category_id",
p."scheduled_at",
p."created_at",
p."started_sending_at",
p."finished_sending_at",
p."sent_at",
p."delivered_at",
p."opened_at",
p."clicked_at",
p."unsubscribed_at",
p."marked_as_spam_at",
p."bounced_at",
p."can_have_delivery_info",
p."skipped_reason",
p."send_retries",
p."is_paused"
FROM params p
WHERE p."is_deleted" = false
ON CONFLICT ("id") DO UPDATE SET
"status" = EXCLUDED."status",
"simple_status" = EXCLUDED."simple_status",
"created_with" = EXCLUDED."created_with",
"email_draft_id" = EXCLUDED."email_draft_id",
"email_programmatic_call_template_id" = EXCLUDED."email_programmatic_call_template_id",
"is_high_priority" = EXCLUDED."is_high_priority",
"rendered_is_transactional" = EXCLUDED."rendered_is_transactional",
"rendered_subject" = EXCLUDED."rendered_subject",
"rendered_notification_category_id" = EXCLUDED."rendered_notification_category_id",
"scheduled_at" = EXCLUDED."scheduled_at",
"created_at" = EXCLUDED."created_at",
"started_sending_at" = EXCLUDED."started_sending_at",
"finished_sending_at" = EXCLUDED."finished_sending_at",
"sent_at" = EXCLUDED."sent_at",
"delivered_at" = EXCLUDED."delivered_at",
"opened_at" = EXCLUDED."opened_at",
"clicked_at" = EXCLUDED."clicked_at",
"unsubscribed_at" = EXCLUDED."unsubscribed_at",
"marked_as_spam_at" = EXCLUDED."marked_as_spam_at",
"bounced_at" = EXCLUDED."bounced_at",
"can_have_delivery_info" = EXCLUDED."can_have_delivery_info",
"skipped_reason" = EXCLUDED."skipped_reason",
"send_retries" = EXCLUDED."send_retries",
"is_paused" = EXCLUDED."is_paused"
RETURNING 1
)
INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at")
SELECT p."mapping_name", p."sequence_id", now() FROM params p
ON CONFLICT ("mapping_name") DO UPDATE SET
"last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"),
"updated_at" = now();
`.trim(),
},
},
} as const;