From 9f9c9a46dcea6657e77fb137915437e85cd8c2ca Mon Sep 17 00:00:00 2001 From: Bilal Godil Date: Mon, 16 Mar 2026 12:36:07 -0700 Subject: [PATCH] clickhouse sync email outbox table --- .../migration.sql | 12 + apps/backend/prisma/schema.prisma | 5 + apps/backend/scripts/clickhouse-migrations.ts | 81 +++++ .../src/app/api/latest/emails/outbox/crud.tsx | 5 + .../external-db-sync/sequencer/route.ts | 30 +- .../internal/external-db-sync/status/route.ts | 27 +- apps/backend/src/lib/email-queue-step.tsx | 12 +- apps/backend/src/lib/external-db-sync.ts | 18 +- .../api/v1/external-db-sync-basics.test.ts | 228 +++++++++++++- .../api/v1/external-db-sync-utils.ts | 27 +- .../src/config/db-sync-mappings.ts | 288 ++++++++++++++++++ 11 files changed, 725 insertions(+), 8 deletions(-) create mode 100644 apps/backend/prisma/migrations/20260316000001_add_email_outbox_sequence_columns/migration.sql diff --git a/apps/backend/prisma/migrations/20260316000001_add_email_outbox_sequence_columns/migration.sql b/apps/backend/prisma/migrations/20260316000001_add_email_outbox_sequence_columns/migration.sql new file mode 100644 index 000000000..74b5681c0 --- /dev/null +++ b/apps/backend/prisma/migrations/20260316000001_add_email_outbox_sequence_columns/migration.sql @@ -0,0 +1,12 @@ +-- AlterTable +ALTER TABLE "EmailOutbox" ADD COLUMN "sequenceId" BIGINT, +ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; + +-- CreateIndex +CREATE UNIQUE INDEX "EmailOutbox_sequenceId_key" ON "EmailOutbox"("sequenceId"); + +-- CreateIndex +CREATE INDEX "EmailOutbox_tenancyId_sequenceId_idx" ON "EmailOutbox"("tenancyId", "sequenceId"); + +-- CreateIndex +CREATE INDEX "EmailOutbox_shouldUpdateSequenceId_idx" ON "EmailOutbox"("shouldUpdateSequenceId", "tenancyId"); diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index d6077f303..dad91817f 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -1004,6 +1004,9 @@ model EmailOutbox { unsubscribedAt DateTime? markedAsSpamAt DateTime? + sequenceId BigInt? @unique + shouldUpdateSequenceId Boolean @default(true) + tenancy Tenancy @relation(fields: [tenancyId], references: [id], onDelete: Cascade) @@id([tenancyId, id]) @@ -1011,6 +1014,8 @@ model EmailOutbox { @@index([tenancyId, simpleStatus], map: "EmailOutbox_simple_status_tenancy_idx") @@index([tenancyId, status], map: "EmailOutbox_status_tenancy_idx") @@index([isQueued], map: "EmailOutbox_isQueued_idx") + @@index([tenancyId, sequenceId], name: "EmailOutbox_tenancyId_sequenceId_idx") + @@index([shouldUpdateSequenceId, tenancyId], name: "EmailOutbox_shouldUpdateSequenceId_idx") } model EmailOutboxProcessingMetadata { diff --git a/apps/backend/scripts/clickhouse-migrations.ts b/apps/backend/scripts/clickhouse-migrations.ts index f0b533c03..f56870c00 100644 --- a/apps/backend/scripts/clickhouse-migrations.ts +++ b/apps/backend/scripts/clickhouse-migrations.ts @@ -22,6 +22,8 @@ export async function runClickhouseMigrations() { await client.exec({ query: TEAMS_VIEW_SQL }); await client.exec({ query: TEAM_MEMBERS_TABLE_BASE_SQL }); await client.exec({ query: TEAM_MEMBERS_VIEW_SQL }); + await client.exec({ query: EMAIL_OUTBOXES_TABLE_BASE_SQL }); + await client.exec({ query: EMAIL_OUTBOXES_VIEW_SQL }); await client.exec({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL }); await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL }); await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL }); @@ -36,6 +38,7 @@ export async function runClickhouseMigrations() { "GRANT SELECT ON default.contact_channels TO limited_user;", "GRANT SELECT ON default.teams TO limited_user;", "GRANT SELECT ON default.team_members TO limited_user;", + "GRANT SELECT ON default.email_outboxes TO limited_user;", ]; await client.exec({ query: "CREATE ROW POLICY IF NOT EXISTS events_project_isolation ON default.events FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", @@ -52,6 +55,9 @@ export async function runClickhouseMigrations() { await client.exec({ query: "CREATE ROW POLICY IF NOT EXISTS team_members_project_isolation ON default.team_members FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", }); + await client.exec({ + query: "CREATE ROW POLICY IF NOT EXISTS email_outboxes_project_isolation ON default.email_outboxes FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", + }); for (const query of queries) { await client.exec({ query }); } @@ -330,6 +336,81 @@ FINAL WHERE sync_is_deleted = 0; `; +const EMAIL_OUTBOXES_TABLE_BASE_SQL = ` +CREATE TABLE IF NOT EXISTS analytics_internal.email_outboxes ( + project_id String, + branch_id String, + id UUID, + status LowCardinality(String), + simple_status LowCardinality(String), + created_with LowCardinality(String), + email_draft_id Nullable(String), + email_programmatic_call_template_id Nullable(String), + is_high_priority UInt8, + rendered_is_transactional Nullable(UInt8), + rendered_subject Nullable(String), + rendered_notification_category_id Nullable(String), + scheduled_at DateTime64(3, 'UTC'), + created_at DateTime64(3, 'UTC'), + started_sending_at Nullable(DateTime64(3, 'UTC')), + finished_sending_at Nullable(DateTime64(3, 'UTC')), + sent_at Nullable(DateTime64(3, 'UTC')), + delivered_at Nullable(DateTime64(3, 'UTC')), + opened_at Nullable(DateTime64(3, 'UTC')), + clicked_at Nullable(DateTime64(3, 'UTC')), + unsubscribed_at Nullable(DateTime64(3, 'UTC')), + marked_as_spam_at Nullable(DateTime64(3, 'UTC')), + bounced_at Nullable(DateTime64(3, 'UTC')), + can_have_delivery_info Nullable(UInt8), + skipped_reason LowCardinality(Nullable(String)), + send_retries Int32, + is_paused UInt8, + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) +) +ENGINE ReplacingMergeTree(sync_sequence_id) +PARTITION BY toYYYYMM(created_at) +ORDER BY (project_id, branch_id, id); +`; + +const EMAIL_OUTBOXES_VIEW_SQL = ` +CREATE OR REPLACE VIEW default.email_outboxes +SQL SECURITY DEFINER +AS +SELECT + project_id, + branch_id, + id, + status, + simple_status, + created_with, + email_draft_id, + email_programmatic_call_template_id, + is_high_priority, + rendered_is_transactional, + rendered_subject, + rendered_notification_category_id, + scheduled_at, + created_at, + started_sending_at, + finished_sending_at, + sent_at, + delivered_at, + opened_at, + clicked_at, + unsubscribed_at, + marked_as_spam_at, + bounced_at, + can_have_delivery_info, + skipped_reason, + send_retries, + is_paused +FROM analytics_internal.email_outboxes +FINAL +WHERE sync_is_deleted = 0; +`; + const EXTERNAL_ANALYTICS_DB_SQL = ` CREATE DATABASE IF NOT EXISTS analytics_internal; `; diff --git a/apps/backend/src/app/api/latest/emails/outbox/crud.tsx b/apps/backend/src/app/api/latest/emails/outbox/crud.tsx index 2d36b499d..73ec22702 100644 --- a/apps/backend/src/app/api/latest/emails/outbox/crud.tsx +++ b/apps/backend/src/app/api/latest/emails/outbox/crud.tsx @@ -447,6 +447,9 @@ export const emailOutboxCrudHandlers = createLazyProxy(() => createCrudHandlers( set("updatedAt", Prisma.sql`NOW()`); } + // Mark for external DB sync + set("shouldUpdateSequenceId", Prisma.sql`TRUE`); + const updateQuery: RawQuery = { supportedPrismaClients: ["global"], readOnlyQuery: false, @@ -543,6 +546,8 @@ function parseEmailOutboxFromJson(j: Record): EmailOutbox { clickedAt: dateOrNull("clickedAt"), unsubscribedAt: dateOrNull("unsubscribedAt"), markedAsSpamAt: dateOrNull("markedAsSpamAt"), + sequenceId: j.sequenceId != null ? BigInt(j.sequenceId as string | number) : null, + shouldUpdateSequenceId: j.shouldUpdateSequenceId as boolean, }; } diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts index 78bbb99ff..a7d30bdc1 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts @@ -166,6 +166,34 @@ async function backfillSequenceIds(batchSize: number): Promise { didUpdate = true; } + const emailOutboxTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` + WITH rows_to_update AS ( + SELECT "tenancyId", "id" + FROM "EmailOutbox" + WHERE "shouldUpdateSequenceId" = TRUE + ORDER BY "tenancyId" + LIMIT ${batchSize} + FOR UPDATE SKIP LOCKED + ), + updated_rows AS ( + UPDATE "EmailOutbox" eo + SET "sequenceId" = nextval('global_seq_id'), + "shouldUpdateSequenceId" = FALSE + FROM rows_to_update r + WHERE eo."tenancyId" = r."tenancyId" + AND eo."id" = r."id" + RETURNING eo."tenancyId" + ) + SELECT DISTINCT "tenancyId" FROM updated_rows + `; + + span.setAttribute("stack.external-db-sync.email-outbox-tenants", emailOutboxTenants.length); + + if (emailOutboxTenants.length > 0) { + await enqueueExternalDbSyncBatch(emailOutboxTenants.map(t => t.tenancyId)); + didUpdate = true; + } + const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` WITH rows_to_update AS ( SELECT "id", "tenancyId" @@ -195,7 +223,7 @@ async function backfillSequenceIds(batchSize: number): Promise { span.setAttribute("stack.external-db-sync.did-update", didUpdate); if (didUpdate) { - console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, DR=${deletedRowTenants.length}`); + console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, EO=${emailOutboxTenants.length}, DR=${deletedRowTenants.length}`); } return didUpdate; diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts index 701c8818a..f8de41b24 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts @@ -87,6 +87,7 @@ const globalSchema = yupObject({ sequencer: yupObject({ project_users: sequenceStatsSchema.defined(), contact_channels: sequenceStatsSchema.defined(), + email_outboxes: sequenceStatsSchema.defined(), deleted_rows: sequenceStatsSchema.shape({ by_table: yupArray(deletedRowByTableSchema).defined(), }).defined(), @@ -119,6 +120,7 @@ const responseSchema = yupObject({ sequencer: yupObject({ project_users: sequenceStatsSchema.defined(), contact_channels: sequenceStatsSchema.defined(), + email_outboxes: sequenceStatsSchema.defined(), deleted_rows: sequenceStatsSchema.shape({ by_table: yupArray(deletedRowByTableSchema).defined(), }).defined(), @@ -233,6 +235,7 @@ function maxBigIntString(values: Array): string | nul function buildMappingInternalStats( projectUsersStats: SequenceStats, + emailOutboxStats: SequenceStats, deletedRowsByTable: DeletedRowSummary[], ) { const deletedProjectUserStats = deletedRowsByTable.find((row) => row.table_name === "ProjectUser") ?? null; @@ -264,6 +267,13 @@ function buildMappingInternalStats( internal_pending_count: usersMappingPending, }); + mappingInternalStats.set("email_outboxes", { + mapping_id: "email_outboxes", + internal_min_sequence_id: emailOutboxStats.min_sequence_id, + internal_max_sequence_id: emailOutboxStats.max_sequence_id, + internal_pending_count: emailOutboxStats.pending, + }); + const mappings = Array.from(mappingInternalStats.values()); const mappingStatuses = mappings.map((mapping) => ({ mapping_id: mapping.mapping_id, @@ -300,6 +310,17 @@ async function fetchInternalStats(tenancyId: string | null) { ${tenancyWhere} `).at(0) ?? throwErr("Contact channel stats query returned no rows."); + const emailOutboxStatsRow = (await globalPrismaClient.$queryRaw` + SELECT + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id", + MIN("sequenceId") AS "min_sequence_id", + MAX("sequenceId") AS "max_sequence_id" + FROM "EmailOutbox" + ${tenancyWhere} + `).at(0) ?? throwErr("Email outbox stats query returned no rows."); + const deletedRowStatsRow = (await globalPrismaClient.$queryRaw` SELECT COUNT(*)::bigint AS "total", @@ -346,6 +367,7 @@ async function fetchInternalStats(tenancyId: string | null) { const projectUsersStats = formatSequenceStats(projectUserStatsRow); const contactChannelStats = formatSequenceStats(contactChannelStatsRow); + const emailOutboxStats = formatSequenceStats(emailOutboxStatsRow); const deletedRowStats = formatSequenceStats(deletedRowStatsRow); const deletedRowsByTable = deletedRowsByTableRows.map((row) => ({ @@ -353,11 +375,12 @@ async function fetchInternalStats(tenancyId: string | null) { ...formatSequenceStats(row), })); - const { mappings, mappingStatuses } = buildMappingInternalStats(projectUsersStats, deletedRowsByTable); + const { mappings, mappingStatuses } = buildMappingInternalStats(projectUsersStats, emailOutboxStats, deletedRowsByTable); return { projectUsersStats, contactChannelStats, + emailOutboxStats, deletedRowStats, deletedRowsByTable, outgoingStatsRow, @@ -1003,6 +1026,7 @@ export const GET = createSmartRouteHandler({ sequencer: { project_users: globalStats.projectUsersStats, contact_channels: globalStats.contactChannelStats, + email_outboxes: globalStats.emailOutboxStats, deleted_rows: { ...globalStats.deletedRowStats, by_table: globalStats.deletedRowsByTable, @@ -1021,6 +1045,7 @@ export const GET = createSmartRouteHandler({ sequencer: { project_users: currentStats.projectUsersStats, contact_channels: currentStats.contactChannelStats, + email_outboxes: currentStats.emailOutboxStats, deleted_rows: { ...currentStats.deletedRowStats, by_table: currentStats.deletedRowsByTable, diff --git a/apps/backend/src/lib/email-queue-step.tsx b/apps/backend/src/lib/email-queue-step.tsx index b98ebb812..17456c772 100644 --- a/apps/backend/src/lib/email-queue-step.tsx +++ b/apps/backend/src/lib/email-queue-step.tsx @@ -195,6 +195,7 @@ async function retryEmailsStuckInRendering(): Promise { data: { renderedByWorkerId: null, startedRenderingAt: null, + shouldUpdateSequenceId: true, }, }); if (res.length > 0) { @@ -398,6 +399,7 @@ async function renderTenancyEmails(workerId: string, tenancyId: string, group: E renderErrorInternalMessage: error, renderErrorInternalDetails: { error }, finishedRenderingAt: new Date(), + shouldUpdateSequenceId: true, }, }); }; @@ -418,6 +420,7 @@ async function renderTenancyEmails(workerId: string, tenancyId: string, group: E renderErrorInternalMessage: null, renderErrorInternalDetails: Prisma.DbNull, finishedRenderingAt: new Date(), + shouldUpdateSequenceId: true, }, }); }; @@ -508,7 +511,7 @@ async function queueReadyEmails(): Promise<{ queuedCount: number }> { // Query 1: Fresh emails (scheduledAt has passed, no retry pending) const freshEmails = await globalPrismaClient.$queryRaw<{ id: string }[]>` UPDATE "EmailOutbox" - SET "isQueued" = TRUE + SET "isQueued" = TRUE, "shouldUpdateSequenceId" = TRUE WHERE "isQueued" = FALSE AND "isPaused" = FALSE AND "skippedReason" IS NULL @@ -523,7 +526,7 @@ async function queueReadyEmails(): Promise<{ queuedCount: number }> { // Clear nextSendRetryAt when queuing so the email is in a clean "queued" state. const retryEmails = await globalPrismaClient.$queryRaw<{ id: string }[]>` UPDATE "EmailOutbox" - SET "isQueued" = TRUE, "nextSendRetryAt" = NULL + SET "isQueued" = TRUE, "nextSendRetryAt" = NULL, "shouldUpdateSequenceId" = TRUE WHERE "isQueued" = FALSE AND "isPaused" = FALSE AND "skippedReason" IS NULL @@ -749,6 +752,7 @@ async function processSingleEmail(context: TenancyProcessingContext, row: EmailO sendRetries: newAttemptCount, nextSendRetryAt: new Date(Date.now() + backoffMs), sendAttemptErrors: updatedErrors as Prisma.InputJsonArray, + shouldUpdateSequenceId: true, }, }); } else { @@ -789,6 +793,7 @@ async function processSingleEmail(context: TenancyProcessingContext, row: EmailO failureReason, allAttemptErrors: updatedErrors as Json[], }, + shouldUpdateSequenceId: true, }, }); } @@ -809,6 +814,7 @@ async function processSingleEmail(context: TenancyProcessingContext, row: EmailO sendServerErrorExternalDetails: Prisma.DbNull, sendServerErrorInternalMessage: null, sendServerErrorInternalDetails: Prisma.DbNull, + shouldUpdateSequenceId: true, }, }); } @@ -829,6 +835,7 @@ async function processSingleEmail(context: TenancyProcessingContext, row: EmailO sendServerErrorExternalDetails: {}, sendServerErrorInternalMessage: errorToNiceString(error), sendServerErrorInternalDetails: {}, + shouldUpdateSequenceId: true, }, }); } @@ -914,6 +921,7 @@ async function markSkipped(row: EmailOutbox, reason: EmailOutboxSkippedReason, d data: { skippedReason: reason, skippedDetails: details as Prisma.InputJsonValue, + shouldUpdateSequenceId: true, }, }); } diff --git a/apps/backend/src/lib/external-db-sync.ts b/apps/backend/src/lib/external-db-sync.ts index bd9b0b1ca..519930537 100644 --- a/apps/backend/src/lib/external-db-sync.ts +++ b/apps/backend/src/lib/external-db-sync.ts @@ -499,6 +499,13 @@ function normalizeClickhouseBoolean(value: unknown, label: string): number { throw new StackAssertionError(`${label} must be a boolean or 0/1. Received: ${JSON.stringify(value)}`); } +function normalizeClickhouseNullableBoolean(value: unknown, label: string): number | null { + if (value === null || value === undefined) { + return null; + } + return normalizeClickhouseBoolean(value, label); +} + function parseSequenceId(value: unknown, mappingId: string): number | null { if (value == null) { return null; @@ -530,7 +537,7 @@ async function ensureClickhouseSchema( // Map of target table name -> column normalizers for ClickHouse // 'json' columns get JSON.stringify, 'boolean' columns get normalizeClickhouseBoolean -const CLICKHOUSE_COLUMN_NORMALIZERS: Record> = { +const CLICKHOUSE_COLUMN_NORMALIZERS: Record> = { users: { client_metadata: 'json', client_read_only_metadata: 'json', @@ -555,6 +562,13 @@ const CLICKHOUSE_COLUMN_NORMALIZERS: Record { let dbManager: TestDbManager; const createProjectWithExternalDb = ( externalDatabases: any, - projectOptions?: { display_name?: string, description?: string } + projectOptions?: { display_name?: string, description?: string, config?: Record } ) => { return createProjectWithExternalDbRaw( externalDatabases, @@ -837,6 +840,227 @@ describe.sequential('External DB Sync - Basic Tests', () => { await waitForSyncedTeamMemberDeletion(client, teamId, user.userId); }, TEST_TIMEOUT); + /** + * What it does: + * - Creates a project with email config, sends an email, and verifies + * the email outbox row is synced to the external Postgres DB. + */ + test('EmailOutbox sync (Postgres)', async () => { + const dbName = 'email_outbox_pg_test'; + const connectionString = await dbManager.createDatabase(dbName); + + await createProjectWithExternalDb({ + main: { + type: 'postgres', + connectionString, + } + }, { + display_name: 'Email Outbox Sync Test', + config: { + email_config: { + type: "standard", + host: "localhost", + port: Number(withPortPrefix("29")), + username: "test", + password: "test", + sender_name: "Test Project", + sender_email: "test@example.com", + }, + }, + }); + + // Create a user + const createUserResponse = await niceBackendFetch("/api/v1/users", { + method: "POST", + accessType: "server", + body: { + primary_email: backendContext.value.mailbox.emailAddress, + primary_email_verified: true, + }, + }); + expect(createUserResponse.status).toBe(201); + const userId = createUserResponse.body.id; + + // Send an email + const sendResponse = await niceBackendFetch("/api/v1/emails/send-email", { + method: "POST", + accessType: "server", + body: { + user_ids: [userId], + html: "

Sync test email

", + subject: "DB Sync Test Email", + notification_category_name: "Transactional", + }, + }); + expect(sendResponse.status).toBe(200); + + // Wait for the email to be processed (rendered + sent) + await wait(8_000); + + // Get the email ID from the outbox API + const listResponse = await niceBackendFetch("/api/v1/emails/outbox", { + method: "GET", + accessType: "server", + }); + expect(listResponse.status).toBe(200); + expect(listResponse.body.items.length).toBeGreaterThanOrEqual(1); + const emailId = listResponse.body.items[0].id; + + const client = dbManager.getClient(dbName); + + // Wait for the email outbox row to appear in external DB + await waitForSyncedEmailOutbox(client, emailId); + + // Verify the synced row has expected columns + const res = await client.query(`SELECT * FROM "email_outboxes" WHERE "id" = $1`, [emailId]); + expect(res.rows.length).toBe(1); + const row = res.rows[0]; + expect(row.created_with).toBe('PROGRAMMATIC_CALL'); + expect(row.is_high_priority).toBe(false); + expect(row.is_paused).toBe(false); + }, TEST_TIMEOUT); + + /** + * What it does: + * - Creates a project, sends an email, and verifies the email outbox row + * is synced to ClickHouse. + */ + test('EmailOutbox sync (ClickHouse)', async ({ expect }) => { + await Project.createAndSwitch({ + config: { + magic_link_enabled: true, + email_config: { + type: "standard", + host: "localhost", + port: Number(withPortPrefix("29")), + username: "test", + password: "test", + sender_name: "Test Project", + sender_email: "test@example.com", + }, + }, + }); + + // Create a user + const createUserResponse = await niceBackendFetch("/api/v1/users", { + method: "POST", + accessType: "server", + body: { + primary_email: backendContext.value.mailbox.emailAddress, + primary_email_verified: true, + }, + }); + expect(createUserResponse.status).toBe(201); + const userId = createUserResponse.body.id; + + // Send an email + const sendResponse = await niceBackendFetch("/api/v1/emails/send-email", { + method: "POST", + accessType: "server", + body: { + user_ids: [userId], + html: "

ClickHouse sync test email

", + subject: "CH Sync Test Email", + notification_category_name: "Transactional", + }, + }); + expect(sendResponse.status).toBe(200); + + // Wait for the email to be processed + await wait(8_000); + + await InternalApiKey.createAndSetProjectKeys(); + + // Poll ClickHouse until the email_outboxes row appears + const timeoutMs = 180_000; + const intervalMs = 2_000; + const start = performance.now(); + + let response; + while (performance.now() - start < timeoutMs) { + response = await runQueryForCurrentProject({ + query: "SELECT id, status, simple_status, created_with, is_high_priority FROM email_outboxes LIMIT 10", + }); + expect(response.status).toBe(200); + if (response.body.result.length >= 1) { + break; + } + await wait(intervalMs); + } + + expect(response!.body.result.length).toBeGreaterThanOrEqual(1); + const row = response!.body.result[0]; + expect(row.created_with).toBe('PROGRAMMATIC_CALL'); + }, TEST_TIMEOUT); + + /** + * What it does: + * - Sends an email, waits for it to reach a terminal state, then verifies + * the status update is reflected in the external Postgres DB. + */ + test('EmailOutbox status updates are synced (Postgres)', async () => { + const dbName = 'email_outbox_status_test'; + const connectionString = await dbManager.createDatabase(dbName); + + await createProjectWithExternalDb({ + main: { + type: 'postgres', + connectionString, + } + }, { + config: { + email_config: { + type: "standard", + host: "localhost", + port: Number(withPortPrefix("29")), + username: "test", + password: "test", + sender_name: "Test Project", + sender_email: "test@example.com", + }, + }, + }); + + const createUserResponse = await niceBackendFetch("/api/v1/users", { + method: "POST", + accessType: "server", + body: { + primary_email: backendContext.value.mailbox.emailAddress, + primary_email_verified: true, + }, + }); + expect(createUserResponse.status).toBe(201); + const userId = createUserResponse.body.id; + + const sendResponse = await niceBackendFetch("/api/v1/emails/send-email", { + method: "POST", + accessType: "server", + body: { + user_ids: [userId], + html: "

Status sync test

", + subject: "Status Sync Test", + notification_category_name: "Transactional", + }, + }); + expect(sendResponse.status).toBe(200); + + // Wait for the email to finish sending + await wait(8_000); + + const client = dbManager.getClient(dbName); + + // The email should eventually reach SENT status in the external DB + await waitForSyncedEmailOutboxByStatus(client, 'SENT'); + + const res = await client.query(`SELECT * FROM "email_outboxes" WHERE "status" = 'SENT'`); + expect(res.rows.length).toBeGreaterThanOrEqual(1); + const row = res.rows[0]; + expect(row.simple_status).toBe('OK'); + expect(row.finished_sending_at).not.toBeNull(); + expect(row.sent_at).not.toBeNull(); + expect(row.send_retries).toBe(0); + }, TEST_TIMEOUT); + /** * What it does: * - Reads the external DB sync fusebox settings. diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts index ae830bbe6..73a2f29b9 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts @@ -314,13 +314,38 @@ export async function waitForSyncedContactChannelDeletion(client: Client, value: }); } +export async function waitForSyncedEmailOutbox(client: Client, emailId: string, expectedStatus?: string) { + await waitForExternalDbRow( + client, + `SELECT * FROM "email_outboxes" WHERE "id" = $1`, + [emailId], + { + shouldExist: true, + description: `email outbox "${emailId}" to appear in external DB`, + checkRow: expectedStatus ? (row) => row.status === expectedStatus : undefined, + }, + ); +} + +export async function waitForSyncedEmailOutboxByStatus(client: Client, status: string) { + await waitForExternalDbRow( + client, + `SELECT * FROM "email_outboxes" WHERE "status" = $1`, + [status], + { + shouldExist: true, + description: `email outbox with status "${status}" to appear in external DB`, + }, + ); +} + /** * Helper to create a project and update its config with external DB settings. * Tracks the project for cleanup later. */ export async function createProjectWithExternalDb( externalDatabases: any, - projectOptions?: { display_name?: string, description?: string }, + projectOptions?: { display_name?: string, description?: string, config?: Record }, options?: { projectTracker?: ProjectContext[] } ) { const project = await Project.createAndSwitch(projectOptions); diff --git a/packages/stack-shared/src/config/db-sync-mappings.ts b/packages/stack-shared/src/config/db-sync-mappings.ts index 92d639997..d77641e1a 100644 --- a/packages/stack-shared/src/config/db-sync-mappings.ts +++ b/packages/stack-shared/src/config/db-sync-mappings.ts @@ -832,4 +832,292 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { `.trim(), }, }, + "email_outboxes": { + sourceTables: { "EmailOutbox": "EmailOutbox" }, + targetTable: "email_outboxes", + targetTableSchemas: { + postgres: ` + CREATE TABLE IF NOT EXISTS "email_outboxes" ( + "id" uuid PRIMARY KEY NOT NULL, + "status" text NOT NULL, + "simple_status" text NOT NULL, + "created_with" text NOT NULL, + "email_draft_id" text, + "email_programmatic_call_template_id" text, + "is_high_priority" boolean NOT NULL DEFAULT false, + "rendered_is_transactional" boolean, + "rendered_subject" text, + "rendered_notification_category_id" text, + "scheduled_at" timestamp without time zone NOT NULL, + "created_at" timestamp without time zone NOT NULL, + "started_sending_at" timestamp without time zone, + "finished_sending_at" timestamp without time zone, + "sent_at" timestamp without time zone, + "delivered_at" timestamp without time zone, + "opened_at" timestamp without time zone, + "clicked_at" timestamp without time zone, + "unsubscribed_at" timestamp without time zone, + "marked_as_spam_at" timestamp without time zone, + "bounced_at" timestamp without time zone, + "can_have_delivery_info" boolean, + "skipped_reason" text, + "send_retries" integer NOT NULL DEFAULT 0, + "is_paused" boolean NOT NULL DEFAULT false + ); + REVOKE ALL ON "email_outboxes" FROM PUBLIC; + GRANT SELECT ON "email_outboxes" TO PUBLIC; + + CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" ( + "mapping_name" text PRIMARY KEY NOT NULL, + "last_synced_sequence_id" bigint NOT NULL DEFAULT -1, + "updated_at" timestamp without time zone NOT NULL DEFAULT now() + ); + `.trim(), + clickhouse: ` + CREATE TABLE IF NOT EXISTS analytics_internal.email_outboxes ( + project_id String, + branch_id String, + id UUID, + status LowCardinality(String), + simple_status LowCardinality(String), + created_with LowCardinality(String), + email_draft_id Nullable(String), + email_programmatic_call_template_id Nullable(String), + is_high_priority UInt8, + rendered_is_transactional Nullable(UInt8), + rendered_subject Nullable(String), + rendered_notification_category_id Nullable(String), + scheduled_at DateTime64(3, 'UTC'), + created_at DateTime64(3, 'UTC'), + started_sending_at Nullable(DateTime64(3, 'UTC')), + finished_sending_at Nullable(DateTime64(3, 'UTC')), + sent_at Nullable(DateTime64(3, 'UTC')), + delivered_at Nullable(DateTime64(3, 'UTC')), + opened_at Nullable(DateTime64(3, 'UTC')), + clicked_at Nullable(DateTime64(3, 'UTC')), + unsubscribed_at Nullable(DateTime64(3, 'UTC')), + marked_as_spam_at Nullable(DateTime64(3, 'UTC')), + bounced_at Nullable(DateTime64(3, 'UTC')), + can_have_delivery_info Nullable(UInt8), + skipped_reason LowCardinality(Nullable(String)), + send_retries Int32, + is_paused UInt8, + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) + ) + ENGINE ReplacingMergeTree(sync_sequence_id) + PARTITION BY toYYYYMM(created_at) + ORDER BY (project_id, branch_id, id); + `.trim(), + }, + internalDbFetchQueries: { + clickhouse: ` + SELECT + "Tenancy"."projectId" AS "project_id", + "Tenancy"."branchId" AS "branch_id", + "EmailOutbox"."id" AS "id", + "EmailOutbox"."status"::text AS "status", + "EmailOutbox"."simpleStatus"::text AS "simple_status", + "EmailOutbox"."createdWith"::text AS "created_with", + "EmailOutbox"."emailDraftId" AS "email_draft_id", + "EmailOutbox"."emailProgrammaticCallTemplateId" AS "email_programmatic_call_template_id", + "EmailOutbox"."isHighPriority" AS "is_high_priority", + "EmailOutbox"."renderedIsTransactional" AS "rendered_is_transactional", + "EmailOutbox"."renderedSubject" AS "rendered_subject", + "EmailOutbox"."renderedNotificationCategoryId" AS "rendered_notification_category_id", + "EmailOutbox"."scheduledAt" AS "scheduled_at", + "EmailOutbox"."createdAt" AS "created_at", + "EmailOutbox"."startedSendingAt" AS "started_sending_at", + "EmailOutbox"."finishedSendingAt" AS "finished_sending_at", + "EmailOutbox"."sentAt" AS "sent_at", + "EmailOutbox"."deliveredAt" AS "delivered_at", + "EmailOutbox"."openedAt" AS "opened_at", + "EmailOutbox"."clickedAt" AS "clicked_at", + "EmailOutbox"."unsubscribedAt" AS "unsubscribed_at", + "EmailOutbox"."markedAsSpamAt" AS "marked_as_spam_at", + "EmailOutbox"."bouncedAt" AS "bounced_at", + "EmailOutbox"."canHaveDeliveryInfo" AS "can_have_delivery_info", + "EmailOutbox"."skippedReason"::text AS "skipped_reason", + "EmailOutbox"."sendRetries" AS "send_retries", + "EmailOutbox"."isPaused" AS "is_paused", + "EmailOutbox"."sequenceId" AS "sync_sequence_id", + "EmailOutbox"."tenancyId" AS "tenancyId", + false AS "sync_is_deleted" + FROM "EmailOutbox" + JOIN "Tenancy" ON "Tenancy"."id" = "EmailOutbox"."tenancyId" + WHERE "EmailOutbox"."tenancyId" = $1::uuid + AND "EmailOutbox"."sequenceId" IS NOT NULL + AND "EmailOutbox"."sequenceId" > $2::bigint + ORDER BY "EmailOutbox"."sequenceId" ASC + LIMIT 1000 + `.trim(), + }, + internalDbFetchQuery: ` + SELECT + "EmailOutbox"."id" AS "id", + "EmailOutbox"."status"::text AS "status", + "EmailOutbox"."simpleStatus"::text AS "simple_status", + "EmailOutbox"."createdWith"::text AS "created_with", + "EmailOutbox"."emailDraftId" AS "email_draft_id", + "EmailOutbox"."emailProgrammaticCallTemplateId" AS "email_programmatic_call_template_id", + "EmailOutbox"."isHighPriority" AS "is_high_priority", + "EmailOutbox"."renderedIsTransactional" AS "rendered_is_transactional", + "EmailOutbox"."renderedSubject" AS "rendered_subject", + "EmailOutbox"."renderedNotificationCategoryId" AS "rendered_notification_category_id", + "EmailOutbox"."scheduledAt" AS "scheduled_at", + "EmailOutbox"."createdAt" AS "created_at", + "EmailOutbox"."startedSendingAt" AS "started_sending_at", + "EmailOutbox"."finishedSendingAt" AS "finished_sending_at", + "EmailOutbox"."sentAt" AS "sent_at", + "EmailOutbox"."deliveredAt" AS "delivered_at", + "EmailOutbox"."openedAt" AS "opened_at", + "EmailOutbox"."clickedAt" AS "clicked_at", + "EmailOutbox"."unsubscribedAt" AS "unsubscribed_at", + "EmailOutbox"."markedAsSpamAt" AS "marked_as_spam_at", + "EmailOutbox"."bouncedAt" AS "bounced_at", + "EmailOutbox"."canHaveDeliveryInfo" AS "can_have_delivery_info", + "EmailOutbox"."skippedReason"::text AS "skipped_reason", + "EmailOutbox"."sendRetries" AS "send_retries", + "EmailOutbox"."isPaused" AS "is_paused", + "EmailOutbox"."sequenceId" AS "sequence_id", + "EmailOutbox"."tenancyId", + false AS "is_deleted" + FROM "EmailOutbox" + WHERE "EmailOutbox"."tenancyId" = $1::uuid + AND "EmailOutbox"."sequenceId" IS NOT NULL + AND "EmailOutbox"."sequenceId" > $2::bigint + ORDER BY "EmailOutbox"."sequenceId" ASC + LIMIT 1000 + `.trim(), + externalDbUpdateQueries: { + postgres: ` + WITH params AS ( + SELECT + $1::uuid AS "id", + $2::text AS "status", + $3::text AS "simple_status", + $4::text AS "created_with", + $5::text AS "email_draft_id", + $6::text AS "email_programmatic_call_template_id", + $7::boolean AS "is_high_priority", + $8::boolean AS "rendered_is_transactional", + $9::text AS "rendered_subject", + $10::text AS "rendered_notification_category_id", + $11::timestamp without time zone AS "scheduled_at", + $12::timestamp without time zone AS "created_at", + $13::timestamp without time zone AS "started_sending_at", + $14::timestamp without time zone AS "finished_sending_at", + $15::timestamp without time zone AS "sent_at", + $16::timestamp without time zone AS "delivered_at", + $17::timestamp without time zone AS "opened_at", + $18::timestamp without time zone AS "clicked_at", + $19::timestamp without time zone AS "unsubscribed_at", + $20::timestamp without time zone AS "marked_as_spam_at", + $21::timestamp without time zone AS "bounced_at", + $22::boolean AS "can_have_delivery_info", + $23::text AS "skipped_reason", + $24::integer AS "send_retries", + $25::boolean AS "is_paused", + $26::bigint AS "sequence_id", + $27::boolean AS "is_deleted", + $28::text AS "mapping_name" + ), + deleted AS ( + DELETE FROM "email_outboxes" eo + USING params p + WHERE p."is_deleted" = true AND eo."id" = p."id" + RETURNING 1 + ), + upserted AS ( + INSERT INTO "email_outboxes" ( + "id", + "status", + "simple_status", + "created_with", + "email_draft_id", + "email_programmatic_call_template_id", + "is_high_priority", + "rendered_is_transactional", + "rendered_subject", + "rendered_notification_category_id", + "scheduled_at", + "created_at", + "started_sending_at", + "finished_sending_at", + "sent_at", + "delivered_at", + "opened_at", + "clicked_at", + "unsubscribed_at", + "marked_as_spam_at", + "bounced_at", + "can_have_delivery_info", + "skipped_reason", + "send_retries", + "is_paused" + ) + SELECT + p."id", + p."status", + p."simple_status", + p."created_with", + p."email_draft_id", + p."email_programmatic_call_template_id", + p."is_high_priority", + p."rendered_is_transactional", + p."rendered_subject", + p."rendered_notification_category_id", + p."scheduled_at", + p."created_at", + p."started_sending_at", + p."finished_sending_at", + p."sent_at", + p."delivered_at", + p."opened_at", + p."clicked_at", + p."unsubscribed_at", + p."marked_as_spam_at", + p."bounced_at", + p."can_have_delivery_info", + p."skipped_reason", + p."send_retries", + p."is_paused" + FROM params p + WHERE p."is_deleted" = false + ON CONFLICT ("id") DO UPDATE SET + "status" = EXCLUDED."status", + "simple_status" = EXCLUDED."simple_status", + "created_with" = EXCLUDED."created_with", + "email_draft_id" = EXCLUDED."email_draft_id", + "email_programmatic_call_template_id" = EXCLUDED."email_programmatic_call_template_id", + "is_high_priority" = EXCLUDED."is_high_priority", + "rendered_is_transactional" = EXCLUDED."rendered_is_transactional", + "rendered_subject" = EXCLUDED."rendered_subject", + "rendered_notification_category_id" = EXCLUDED."rendered_notification_category_id", + "scheduled_at" = EXCLUDED."scheduled_at", + "created_at" = EXCLUDED."created_at", + "started_sending_at" = EXCLUDED."started_sending_at", + "finished_sending_at" = EXCLUDED."finished_sending_at", + "sent_at" = EXCLUDED."sent_at", + "delivered_at" = EXCLUDED."delivered_at", + "opened_at" = EXCLUDED."opened_at", + "clicked_at" = EXCLUDED."clicked_at", + "unsubscribed_at" = EXCLUDED."unsubscribed_at", + "marked_as_spam_at" = EXCLUDED."marked_as_spam_at", + "bounced_at" = EXCLUDED."bounced_at", + "can_have_delivery_info" = EXCLUDED."can_have_delivery_info", + "skipped_reason" = EXCLUDED."skipped_reason", + "send_retries" = EXCLUDED."send_retries", + "is_paused" = EXCLUDED."is_paused" + RETURNING 1 + ) + INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at") + SELECT p."mapping_name", p."sequence_id", now() FROM params p + ON CONFLICT ("mapping_name") DO UPDATE SET + "last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"), + "updated_at" = now(); + `.trim(), + }, + }, } as const;