diff --git a/apps/backend/prisma/migrations/20260316000002_add_session_replay_sequence_columns/migration.sql b/apps/backend/prisma/migrations/20260316000002_add_session_replay_sequence_columns/migration.sql new file mode 100644 index 000000000..88e0751fd --- /dev/null +++ b/apps/backend/prisma/migrations/20260316000002_add_session_replay_sequence_columns/migration.sql @@ -0,0 +1,12 @@ +-- AlterTable +ALTER TABLE "SessionReplay" ADD COLUMN "sequenceId" BIGINT, +ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; + +-- CreateIndex +CREATE UNIQUE INDEX "SessionReplay_sequenceId_key" ON "SessionReplay"("sequenceId"); + +-- CreateIndex +CREATE INDEX "SessionReplay_tenancyId_sequenceId_idx" ON "SessionReplay"("tenancyId", "sequenceId"); + +-- CreateIndex +CREATE INDEX "SessionReplay_shouldUpdateSequenceId_idx" ON "SessionReplay"("shouldUpdateSequenceId", "tenancyId"); diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index dad91817f..b79570597 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -356,6 +356,9 @@ model SessionReplay { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt + sequenceId BigInt? @unique + shouldUpdateSequenceId Boolean @default(true) + projectUser ProjectUser @relation(fields: [tenancyId, projectUserId], references: [tenancyId, projectUserId], onDelete: Cascade) tenancy Tenancy @relation(fields: [tenancyId], references: [id], onDelete: Cascade) @@ -367,6 +370,8 @@ model SessionReplay { @@index([tenancyId, lastEventAt]) // index by updatedAt instead of lastEventAt because event timing can be spoofed @@index([tenancyId, refreshTokenId, updatedAt]) + @@index([tenancyId, sequenceId], name: "SessionReplay_tenancyId_sequenceId_idx") + @@index([shouldUpdateSequenceId, tenancyId], name: "SessionReplay_shouldUpdateSequenceId_idx") } model SessionReplayChunk { diff --git a/apps/backend/scripts/clickhouse-migrations.ts b/apps/backend/scripts/clickhouse-migrations.ts index f56870c00..f5cd50d1d 100644 --- a/apps/backend/scripts/clickhouse-migrations.ts +++ b/apps/backend/scripts/clickhouse-migrations.ts @@ -24,6 +24,8 @@ export async function runClickhouseMigrations() { await client.exec({ query: TEAM_MEMBERS_VIEW_SQL }); await client.exec({ query: EMAIL_OUTBOXES_TABLE_BASE_SQL }); await client.exec({ query: EMAIL_OUTBOXES_VIEW_SQL }); + await client.exec({ query: SESSION_REPLAYS_TABLE_BASE_SQL }); + await client.exec({ query: SESSION_REPLAYS_VIEW_SQL }); await client.exec({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL }); await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL }); await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL }); @@ -39,6 +41,7 @@ export async function runClickhouseMigrations() { "GRANT SELECT ON default.teams TO limited_user;", "GRANT SELECT ON default.team_members TO limited_user;", "GRANT SELECT ON default.email_outboxes TO limited_user;", + "GRANT SELECT ON default.session_replays TO limited_user;", ]; await client.exec({ query: "CREATE ROW POLICY IF NOT EXISTS events_project_isolation ON default.events FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", @@ -58,6 +61,9 @@ export async function runClickhouseMigrations() { await client.exec({ query: "CREATE ROW POLICY IF NOT EXISTS email_outboxes_project_isolation ON default.email_outboxes FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", }); + await client.exec({ + query: "CREATE ROW POLICY IF NOT EXISTS session_replays_project_isolation ON default.session_replays FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", + }); for (const query of queries) { await client.exec({ query }); } @@ -411,6 +417,36 @@ FINAL WHERE sync_is_deleted = 0; `; +const SESSION_REPLAYS_TABLE_BASE_SQL = ` +CREATE TABLE IF NOT EXISTS analytics_internal.session_replays ( + project_id String, + branch_id String, + id UUID, + user_id UUID, + refresh_token_id String, + started_at DateTime64(3, 'UTC'), + last_event_at DateTime64(3, 'UTC'), + created_at DateTime64(3, 'UTC'), + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) +) +ENGINE ReplacingMergeTree(sync_sequence_id) +PARTITION BY toYYYYMM(started_at) +ORDER BY (project_id, branch_id, id); +`; + +const SESSION_REPLAYS_VIEW_SQL = ` +CREATE OR REPLACE VIEW default.session_replays +SQL SECURITY DEFINER +AS +SELECT project_id, branch_id, id, user_id, refresh_token_id, + started_at, last_event_at, created_at +FROM analytics_internal.session_replays +FINAL +WHERE sync_is_deleted = 0; +`; + const EXTERNAL_ANALYTICS_DB_SQL = ` CREATE DATABASE IF NOT EXISTS analytics_internal; `; diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts index a7d30bdc1..e9d60ce88 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts @@ -194,6 +194,34 @@ async function backfillSequenceIds(batchSize: number): Promise { didUpdate = true; } + const sessionReplayTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` + WITH rows_to_update AS ( + SELECT "tenancyId", "id" + FROM "SessionReplay" + WHERE "shouldUpdateSequenceId" = TRUE + ORDER BY "tenancyId" + LIMIT ${batchSize} + FOR UPDATE SKIP LOCKED + ), + updated_rows AS ( + UPDATE "SessionReplay" sr + SET "sequenceId" = nextval('global_seq_id'), + "shouldUpdateSequenceId" = FALSE + FROM rows_to_update r + WHERE sr."tenancyId" = r."tenancyId" + AND sr."id" = r."id" + RETURNING sr."tenancyId" + ) + SELECT DISTINCT "tenancyId" FROM updated_rows + `; + + span.setAttribute("stack.external-db-sync.session-replay-tenants", sessionReplayTenants.length); + + if (sessionReplayTenants.length > 0) { + await enqueueExternalDbSyncBatch(sessionReplayTenants.map(t => t.tenancyId)); + didUpdate = true; + } + const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` WITH rows_to_update AS ( SELECT "id", "tenancyId" @@ -223,7 +251,7 @@ async function backfillSequenceIds(batchSize: number): Promise { span.setAttribute("stack.external-db-sync.did-update", didUpdate); if (didUpdate) { - console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, EO=${emailOutboxTenants.length}, DR=${deletedRowTenants.length}`); + console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, EO=${emailOutboxTenants.length}, SR=${sessionReplayTenants.length}, DR=${deletedRowTenants.length}`); } return didUpdate; diff --git a/apps/backend/src/app/api/latest/session-replays/batch/route.tsx b/apps/backend/src/app/api/latest/session-replays/batch/route.tsx index 57e1a162e..25dccb4c7 100644 --- a/apps/backend/src/app/api/latest/session-replays/batch/route.tsx +++ b/apps/backend/src/app/api/latest/session-replays/batch/route.tsx @@ -120,10 +120,12 @@ export const POST = createSmartRouteHandler({ refreshTokenId, startedAt: new Date(firstMs), lastEventAt: new Date(newLastEventAtMs), + shouldUpdateSequenceId: true, }, update: { startedAt: new Date(newStartedAtMs), lastEventAt: new Date(newLastEventAtMs), + shouldUpdateSequenceId: true, }, }); diff --git a/apps/backend/src/lib/external-db-sync.ts b/apps/backend/src/lib/external-db-sync.ts index 519930537..ffd24b687 100644 --- a/apps/backend/src/lib/external-db-sync.ts +++ b/apps/backend/src/lib/external-db-sync.ts @@ -569,6 +569,9 @@ const CLICKHOUSE_COLUMN_NORMALIZERS: Record { expect(row.send_retries).toBe(0); }, TEST_TIMEOUT); + /** + * What it does: + * - Creates a project with analytics, signs in a user, uploads a session replay batch, + * and verifies the session replay row is synced to ClickHouse. + */ + test('SessionReplay sync (ClickHouse)', async ({ expect }) => { + await Project.createAndSwitch({ + config: { + magic_link_enabled: true, + }, + }); + await Project.updateConfig({ apps: { installed: { analytics: { enabled: true } } } }); + await Auth.Otp.signIn(); + + const now = Date.now(); + const browserSessionId = randomUUID(); + const batchId = randomUUID(); + + const uploadRes = await niceBackendFetch("/api/v1/session-replays/batch", { + method: "POST", + accessType: "client", + body: { + browser_session_id: browserSessionId, + session_replay_segment_id: randomUUID(), + batch_id: batchId, + started_at_ms: now, + sent_at_ms: now + 500, + events: [ + { timestamp: now + 100, type: 2 }, + { timestamp: now + 200, type: 3 }, + ], + }, + }); + expect(uploadRes.status).toBe(200); + expect(uploadRes.body.deduped).toBe(false); + + await InternalApiKey.createAndSetProjectKeys(); + + // Poll ClickHouse until the session_replays row appears + const timeoutMs = 180_000; + const intervalMs = 2_000; + const start = performance.now(); + + let response; + while (performance.now() - start < timeoutMs) { + response = await runQueryForCurrentProject({ + query: "SELECT id, user_id, refresh_token_id, started_at, last_event_at FROM session_replays LIMIT 10", + }); + expect(response.status).toBe(200); + if (response.body.result.length >= 1) { + break; + } + await wait(intervalMs); + } + + expect(response!.body.result.length).toBeGreaterThanOrEqual(1); + const row = response!.body.result[0]; + expect(row.user_id).toBeDefined(); + expect(row.refresh_token_id).toBeDefined(); + expect(row.started_at).toBeDefined(); + expect(row.last_event_at).toBeDefined(); + }, TEST_TIMEOUT); + + /** + * What it does: + * - Creates a project with an external Postgres DB, signs in a user, + * uploads a session replay batch, and verifies the row is synced to external Postgres. + */ + test('SessionReplay sync (Postgres)', async () => { + const dbName = 'session_replay_pg_test'; + const connectionString = await dbManager.createDatabase(dbName); + + await createProjectWithExternalDb({ + main: { + type: 'postgres', + connectionString, + } + }, { + display_name: 'Session Replay Sync Test', + config: { + magic_link_enabled: true, + }, + }); + await Project.updateConfig({ apps: { installed: { analytics: { enabled: true } } } }); + await Auth.Otp.signIn(); + + const now = Date.now(); + const browserSessionId = randomUUID(); + const batchId = randomUUID(); + + const uploadRes = await niceBackendFetch("/api/v1/session-replays/batch", { + method: "POST", + accessType: "client", + body: { + browser_session_id: browserSessionId, + session_replay_segment_id: randomUUID(), + batch_id: batchId, + started_at_ms: now, + sent_at_ms: now + 500, + events: [ + { timestamp: now + 100, type: 2 }, + { timestamp: now + 200, type: 3 }, + ], + }, + }); + expect(uploadRes.status).toBe(200); + const replayId = uploadRes.body.session_replay_id; + + const client = dbManager.getClient(dbName); + + // Wait for the session replay row to appear in external DB + await waitForSyncedSessionReplay(client, replayId); + + // Verify the synced row has expected columns + const res = await client.query(`SELECT * FROM "session_replays" WHERE "id" = $1`, [replayId]); + expect(res.rows.length).toBe(1); + const row = res.rows[0]; + expect(row.user_id).toBeDefined(); + expect(row.refresh_token_id).toBeDefined(); + expect(row.started_at).toBeDefined(); + expect(row.last_event_at).toBeDefined(); + }, TEST_TIMEOUT); + /** * What it does: * - Reads the external DB sync fusebox settings. diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts index 73a2f29b9..f594691f1 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts @@ -327,6 +327,18 @@ export async function waitForSyncedEmailOutbox(client: Client, emailId: string, ); } +export async function waitForSyncedSessionReplay(client: Client, replayId: string) { + await waitForExternalDbRow( + client, + `SELECT * FROM "session_replays" WHERE "id" = $1`, + [replayId], + { + shouldExist: true, + description: `session replay "${replayId}" to appear in external DB`, + }, + ); +} + export async function waitForSyncedEmailOutboxByStatus(client: Client, status: string) { await waitForExternalDbRow( client, diff --git a/packages/stack-shared/src/config/db-sync-mappings.ts b/packages/stack-shared/src/config/db-sync-mappings.ts index d77641e1a..b9b206f7e 100644 --- a/packages/stack-shared/src/config/db-sync-mappings.ts +++ b/packages/stack-shared/src/config/db-sync-mappings.ts @@ -1120,4 +1120,140 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { `.trim(), }, }, + "session_replays": { + sourceTables: { "SessionReplay": "SessionReplay" }, + targetTable: "session_replays", + targetTableSchemas: { + postgres: ` + CREATE TABLE IF NOT EXISTS "session_replays" ( + "id" uuid PRIMARY KEY NOT NULL, + "user_id" uuid NOT NULL, + "refresh_token_id" text NOT NULL, + "started_at" timestamp without time zone NOT NULL, + "last_event_at" timestamp without time zone NOT NULL, + "created_at" timestamp without time zone NOT NULL + ); + REVOKE ALL ON "session_replays" FROM PUBLIC; + GRANT SELECT ON "session_replays" TO PUBLIC; + + CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" ( + "mapping_name" text PRIMARY KEY NOT NULL, + "last_synced_sequence_id" bigint NOT NULL DEFAULT -1, + "updated_at" timestamp without time zone NOT NULL DEFAULT now() + ); + `.trim(), + clickhouse: ` + CREATE TABLE IF NOT EXISTS analytics_internal.session_replays ( + project_id String, + branch_id String, + id UUID, + user_id UUID, + refresh_token_id String, + started_at DateTime64(3, 'UTC'), + last_event_at DateTime64(3, 'UTC'), + created_at DateTime64(3, 'UTC'), + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) + ) + ENGINE ReplacingMergeTree(sync_sequence_id) + PARTITION BY toYYYYMM(started_at) + ORDER BY (project_id, branch_id, id); + `.trim(), + }, + internalDbFetchQueries: { + clickhouse: ` + SELECT + "Tenancy"."projectId" AS "project_id", + "Tenancy"."branchId" AS "branch_id", + "SessionReplay"."id" AS "id", + "SessionReplay"."projectUserId" AS "user_id", + "SessionReplay"."refreshTokenId" AS "refresh_token_id", + "SessionReplay"."startedAt" AS "started_at", + "SessionReplay"."lastEventAt" AS "last_event_at", + "SessionReplay"."createdAt" AS "created_at", + "SessionReplay"."sequenceId" AS "sync_sequence_id", + "SessionReplay"."tenancyId" AS "tenancyId", + false AS "sync_is_deleted" + FROM "SessionReplay" + JOIN "Tenancy" ON "Tenancy"."id" = "SessionReplay"."tenancyId" + WHERE "SessionReplay"."tenancyId" = $1::uuid + AND "SessionReplay"."sequenceId" IS NOT NULL + AND "SessionReplay"."sequenceId" > $2::bigint + ORDER BY "SessionReplay"."sequenceId" ASC + LIMIT 1000 + `.trim(), + }, + internalDbFetchQuery: ` + SELECT + "SessionReplay"."id" AS "id", + "SessionReplay"."projectUserId" AS "user_id", + "SessionReplay"."refreshTokenId" AS "refresh_token_id", + "SessionReplay"."startedAt" AS "started_at", + "SessionReplay"."lastEventAt" AS "last_event_at", + "SessionReplay"."createdAt" AS "created_at", + "SessionReplay"."sequenceId" AS "sequence_id", + "SessionReplay"."tenancyId", + false AS "is_deleted" + FROM "SessionReplay" + WHERE "SessionReplay"."tenancyId" = $1::uuid + AND "SessionReplay"."sequenceId" IS NOT NULL + AND "SessionReplay"."sequenceId" > $2::bigint + ORDER BY "SessionReplay"."sequenceId" ASC + LIMIT 1000 + `.trim(), + externalDbUpdateQueries: { + postgres: ` + WITH params AS ( + SELECT + $1::uuid AS "id", + $2::uuid AS "user_id", + $3::text AS "refresh_token_id", + $4::timestamp without time zone AS "started_at", + $5::timestamp without time zone AS "last_event_at", + $6::timestamp without time zone AS "created_at", + $7::bigint AS "sequence_id", + $8::boolean AS "is_deleted", + $9::text AS "mapping_name" + ), + deleted AS ( + DELETE FROM "session_replays" sr + USING params p + WHERE p."is_deleted" = true AND sr."id" = p."id" + RETURNING 1 + ), + upserted AS ( + INSERT INTO "session_replays" ( + "id", + "user_id", + "refresh_token_id", + "started_at", + "last_event_at", + "created_at" + ) + SELECT + p."id", + p."user_id", + p."refresh_token_id", + p."started_at", + p."last_event_at", + p."created_at" + FROM params p + WHERE p."is_deleted" = false + ON CONFLICT ("id") DO UPDATE SET + "user_id" = EXCLUDED."user_id", + "refresh_token_id" = EXCLUDED."refresh_token_id", + "started_at" = EXCLUDED."started_at", + "last_event_at" = EXCLUDED."last_event_at", + "created_at" = EXCLUDED."created_at" + RETURNING 1 + ) + INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at") + SELECT p."mapping_name", p."sequence_id", now() FROM params p + ON CONFLICT ("mapping_name") DO UPDATE SET + "last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"), + "updated_at" = now(); + `.trim(), + }, + }, } as const;