clickhouse sync session replays

This commit is contained in:
Bilal Godil 2026-03-16 13:47:35 -07:00
parent 9f9c9a46dc
commit 008c6b083f
9 changed files with 361 additions and 2 deletions

View File

@ -0,0 +1,12 @@
-- AlterTable
-- Adds the two sync bookkeeping columns to "SessionReplay":
--   "sequenceId"             nullable; stays NULL until a value is assigned to the row.
--   "shouldUpdateSequenceId" NOT NULL DEFAULT true, so every pre-existing row starts out flagged
--                            as needing a (new) sequence id.
ALTER TABLE "SessionReplay" ADD COLUMN "sequenceId" BIGINT,
ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true;
-- CreateIndex
-- Enforces that no two session replays share a sequence id (NULLs are not considered equal by a
-- Postgres unique index, so rows without a sequence id do not conflict).
CREATE UNIQUE INDEX "SessionReplay_sequenceId_key" ON "SessionReplay"("sequenceId");
-- CreateIndex
-- Supports per-tenancy scans ordered/filtered by sequence id
-- ("tenancyId" = ... AND "sequenceId" > ... ORDER BY "sequenceId").
CREATE INDEX "SessionReplay_tenancyId_sequenceId_idx" ON "SessionReplay"("tenancyId", "sequenceId");
-- CreateIndex
-- Supports looking up rows that are still flagged with "shouldUpdateSequenceId" = TRUE, ordered by tenancy.
CREATE INDEX "SessionReplay_shouldUpdateSequenceId_idx" ON "SessionReplay"("shouldUpdateSequenceId", "tenancyId");

View File

@ -356,6 +356,9 @@ model SessionReplay {
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
sequenceId BigInt? @unique
shouldUpdateSequenceId Boolean @default(true)
projectUser ProjectUser @relation(fields: [tenancyId, projectUserId], references: [tenancyId, projectUserId], onDelete: Cascade)
tenancy Tenancy @relation(fields: [tenancyId], references: [id], onDelete: Cascade)
@ -367,6 +370,8 @@ model SessionReplay {
@@index([tenancyId, lastEventAt])
// index by updatedAt instead of lastEventAt because event timing can be spoofed
@@index([tenancyId, refreshTokenId, updatedAt])
@@index([tenancyId, sequenceId], name: "SessionReplay_tenancyId_sequenceId_idx")
@@index([shouldUpdateSequenceId, tenancyId], name: "SessionReplay_shouldUpdateSequenceId_idx")
}
model SessionReplayChunk {

View File

@ -24,6 +24,8 @@ export async function runClickhouseMigrations() {
await client.exec({ query: TEAM_MEMBERS_VIEW_SQL });
await client.exec({ query: EMAIL_OUTBOXES_TABLE_BASE_SQL });
await client.exec({ query: EMAIL_OUTBOXES_VIEW_SQL });
await client.exec({ query: SESSION_REPLAYS_TABLE_BASE_SQL });
await client.exec({ query: SESSION_REPLAYS_VIEW_SQL });
await client.exec({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL });
await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL });
await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL });
@ -39,6 +41,7 @@ export async function runClickhouseMigrations() {
"GRANT SELECT ON default.teams TO limited_user;",
"GRANT SELECT ON default.team_members TO limited_user;",
"GRANT SELECT ON default.email_outboxes TO limited_user;",
"GRANT SELECT ON default.session_replays TO limited_user;",
];
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS events_project_isolation ON default.events FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
@ -58,6 +61,9 @@ export async function runClickhouseMigrations() {
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS email_outboxes_project_isolation ON default.email_outboxes FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS session_replays_project_isolation ON default.session_replays FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
for (const query of queries) {
await client.exec({ query });
}
@ -411,6 +417,36 @@ FINAL
WHERE sync_is_deleted = 0;
`;
/**
 * DDL for the internal ClickHouse table `analytics_internal.session_replays`.
 *
 * One row per synced version of a session replay, plus the `sync_*` bookkeeping columns.
 * The engine is `ReplacingMergeTree(sync_sequence_id)` with the sorting key
 * `(project_id, branch_id, id)`, i.e. `sync_sequence_id` is the version column used to pick
 * the surviving row among rows that share a sorting key.
 *
 * The statement is idempotent (`CREATE TABLE IF NOT EXISTS`); it does not alter an existing table.
 *
 * NOTE(review): the table is partitioned by `toYYYYMM(started_at)`, and the session replay
 * batch route writes `startedAt` on update as well as on create. If `started_at` can move to a
 * different month for an existing replay, the old and new versions land in different partitions,
 * where background merges do not deduplicate them. Confirm that every reader goes through
 * `FINAL` (the `default.session_replays` view does), or partition by an immutable column.
 */
const SESSION_REPLAYS_TABLE_BASE_SQL = `
CREATE TABLE IF NOT EXISTS analytics_internal.session_replays (
project_id String,
branch_id String,
id UUID,
user_id UUID,
refresh_token_id String,
started_at DateTime64(3, 'UTC'),
last_event_at DateTime64(3, 'UTC'),
created_at DateTime64(3, 'UTC'),
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(started_at)
ORDER BY (project_id, branch_id, id);
`;
/**
 * DDL for the `default.session_replays` view over `analytics_internal.session_replays`.
 *
 * - Reads the underlying table with `FINAL` and keeps only rows with `sync_is_deleted = 0`.
 * - Exposes the data columns only; the `sync_*` bookkeeping columns are not selected.
 * - Declared with `SQL SECURITY DEFINER`.
 * - `CREATE OR REPLACE`, so re-running it overwrites any previous definition of the view.
 */
const SESSION_REPLAYS_VIEW_SQL = `
CREATE OR REPLACE VIEW default.session_replays
SQL SECURITY DEFINER
AS
SELECT project_id, branch_id, id, user_id, refresh_token_id,
started_at, last_event_at, created_at
FROM analytics_internal.session_replays
FINAL
WHERE sync_is_deleted = 0;
`;
/** Creates the `analytics_internal` ClickHouse database if it does not already exist. */
const EXTERNAL_ANALYTICS_DB_SQL = `
CREATE DATABASE IF NOT EXISTS analytics_internal;
`;

View File

@ -194,6 +194,34 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
didUpdate = true;
}
const sessionReplayTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "tenancyId", "id"
FROM "SessionReplay"
WHERE "shouldUpdateSequenceId" = TRUE
ORDER BY "tenancyId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "SessionReplay" sr
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE sr."tenancyId" = r."tenancyId"
AND sr."id" = r."id"
RETURNING sr."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
`;
span.setAttribute("stack.external-db-sync.session-replay-tenants", sessionReplayTenants.length);
if (sessionReplayTenants.length > 0) {
await enqueueExternalDbSyncBatch(sessionReplayTenants.map(t => t.tenancyId));
didUpdate = true;
}
const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "id", "tenancyId"
@ -223,7 +251,7 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
span.setAttribute("stack.external-db-sync.did-update", didUpdate);
if (didUpdate) {
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, EO=${emailOutboxTenants.length}, DR=${deletedRowTenants.length}`);
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, EO=${emailOutboxTenants.length}, SR=${sessionReplayTenants.length}, DR=${deletedRowTenants.length}`);
}
return didUpdate;

View File

@ -120,10 +120,12 @@ export const POST = createSmartRouteHandler({
refreshTokenId,
startedAt: new Date(firstMs),
lastEventAt: new Date(newLastEventAtMs),
shouldUpdateSequenceId: true,
},
update: {
startedAt: new Date(newStartedAtMs),
lastEventAt: new Date(newLastEventAtMs),
shouldUpdateSequenceId: true,
},
});

View File

@ -569,6 +569,9 @@ const CLICKHOUSE_COLUMN_NORMALIZERS: Record<string, Record<string, 'json' | 'boo
is_paused: 'boolean',
sync_is_deleted: 'boolean',
},
session_replays: {
sync_is_deleted: 'boolean',
},
};
async function pushRowsToClickhouse(

View File

@ -3,7 +3,8 @@ import { wait } from "@stackframe/stack-shared/dist/utils/promises";
import { afterAll, beforeAll, describe, expect } from 'vitest';
import { test } from '../../../../helpers';
import { withPortPrefix } from '../../../../helpers/ports';
import { backendContext, InternalApiKey, Project, User, niceBackendFetch } from '../../../backend-helpers';
import { Auth, backendContext, InternalApiKey, Project, User, niceBackendFetch } from '../../../backend-helpers';
import { randomUUID } from 'node:crypto';
import {
TEST_TIMEOUT,
TestDbManager,
@ -16,6 +17,7 @@ import {
waitForSyncedDeletion,
waitForSyncedEmailOutbox,
waitForSyncedEmailOutboxByStatus,
waitForSyncedSessionReplay,
waitForSyncedTeam,
waitForSyncedTeamDeletion,
waitForSyncedTeamMember,
@ -1061,6 +1063,129 @@ describe.sequential('External DB Sync - Basic Tests', () => {
expect(row.send_retries).toBe(0);
}, TEST_TIMEOUT);
/**
 * What it does:
 * - Creates a project with analytics enabled, signs in a user, uploads a session replay batch,
 *   and polls ClickHouse until the row for that exact session replay (matched by id) is visible
 *   through the `session_replays` view.
 *
 * Notes:
 * - The row is matched against `session_replay_id` from the upload response, so an unrelated row
 *   in the view can never make this test pass.
 * - Polling fails with an explicit error once the timeout elapses.
 */
test('SessionReplay sync (ClickHouse)', async ({ expect }) => {
  await Project.createAndSwitch({
    config: {
      magic_link_enabled: true,
    },
  });
  await Project.updateConfig({ apps: { installed: { analytics: { enabled: true } } } });
  await Auth.Otp.signIn();

  const now = Date.now();
  const browserSessionId = randomUUID();
  const batchId = randomUUID();
  const uploadRes = await niceBackendFetch("/api/v1/session-replays/batch", {
    method: "POST",
    accessType: "client",
    body: {
      browser_session_id: browserSessionId,
      session_replay_segment_id: randomUUID(),
      batch_id: batchId,
      started_at_ms: now,
      sent_at_ms: now + 500,
      events: [
        { timestamp: now + 100, type: 2 },
        { timestamp: now + 200, type: 3 },
      ],
    },
  });
  expect(uploadRes.status).toBe(200);
  expect(uploadRes.body.deduped).toBe(false);

  // Fail fast if the upload did not report an id, instead of polling until the timeout.
  const replayId = uploadRes.body.session_replay_id;
  expect(typeof replayId).toBe("string");

  await InternalApiKey.createAndSetProjectKeys();

  // Poll ClickHouse until the session_replays row for the uploaded replay appears
  const timeoutMs = 180_000;
  const intervalMs = 2_000;
  const deadline = performance.now() + timeoutMs;
  let syncedRow: Record<string, unknown> | undefined;
  while (syncedRow === undefined && performance.now() < deadline) {
    const response = await runQueryForCurrentProject({
      query: "SELECT id, user_id, refresh_token_id, started_at, last_event_at FROM session_replays LIMIT 10",
    });
    expect(response.status).toBe(200);
    syncedRow = response.body.result.find((candidate: Record<string, unknown>) => candidate.id === replayId);
    if (syncedRow === undefined) {
      await wait(intervalMs);
    }
  }
  if (syncedRow === undefined) {
    throw new Error(`session replay "${replayId}" did not appear in ClickHouse within ${timeoutMs}ms`);
  }

  expect(syncedRow.id).toBe(replayId);
  expect(syncedRow.user_id).toBeDefined();
  expect(syncedRow.refresh_token_id).toBeDefined();
  expect(syncedRow.started_at).toBeDefined();
  expect(syncedRow.last_event_at).toBeDefined();
}, TEST_TIMEOUT);
/**
 * What it does:
 * - Provisions a project backed by an external Postgres database, signs a user in,
 *   uploads one session replay batch, and checks that the replay row reaches the external database
 *   with its user, refresh token and timestamp columns present.
 */
test('SessionReplay sync (Postgres)', async () => {
  const dbName = 'session_replay_pg_test';
  const connectionString = await dbManager.createDatabase(dbName);

  await createProjectWithExternalDb({
    main: {
      type: 'postgres',
      connectionString,
    }
  }, {
    display_name: 'Session Replay Sync Test',
    config: {
      magic_link_enabled: true,
    },
  });
  await Project.updateConfig({ apps: { installed: { analytics: { enabled: true } } } });
  await Auth.Otp.signIn();

  // Build and send a minimal two-event replay batch.
  const now = Date.now();
  const browserSessionId = randomUUID();
  const batchId = randomUUID();
  const events = [
    { timestamp: now + 100, type: 2 },
    { timestamp: now + 200, type: 3 },
  ];
  const uploadRes = await niceBackendFetch("/api/v1/session-replays/batch", {
    method: "POST",
    accessType: "client",
    body: {
      browser_session_id: browserSessionId,
      session_replay_segment_id: randomUUID(),
      batch_id: batchId,
      started_at_ms: now,
      sent_at_ms: now + 500,
      events,
    },
  });
  expect(uploadRes.status).toBe(200);
  const replayId = uploadRes.body.session_replay_id;

  // Block until the replay row shows up in the external database.
  const client = dbManager.getClient(dbName);
  await waitForSyncedSessionReplay(client, replayId);

  // Exactly one row for the replay, with every expected column present.
  const synced = await client.query(`SELECT * FROM "session_replays" WHERE "id" = $1`, [replayId]);
  expect(synced.rows.length).toBe(1);
  const [row] = synced.rows;
  for (const column of ['user_id', 'refresh_token_id', 'started_at', 'last_event_at']) {
    expect(row[column]).toBeDefined();
  }
}, TEST_TIMEOUT);
/**
* What it does:
* - Reads the external DB sync fusebox settings.

View File

@ -327,6 +327,18 @@ export async function waitForSyncedEmailOutbox(client: Client, emailId: string,
);
}
/**
 * Resolves once `waitForExternalDbRow` reports that a row with the given id exists in the
 * external database's "session_replays" table.
 *
 * @param client - Connection to the external database to query.
 * @param replayId - Id of the session replay row to look for; bound as `$1`.
 */
export async function waitForSyncedSessionReplay(client: Client, replayId: string) {
  const lookupSql = `SELECT * FROM "session_replays" WHERE "id" = $1`;
  const lookupParams = [replayId];
  await waitForExternalDbRow(client, lookupSql, lookupParams, {
    shouldExist: true,
    description: `session replay "${replayId}" to appear in external DB`,
  });
}
export async function waitForSyncedEmailOutboxByStatus(client: Client, status: string) {
await waitForExternalDbRow(
client,

View File

@ -1120,4 +1120,140 @@ export const DEFAULT_DB_SYNC_MAPPINGS = {
`.trim(),
},
},
// Sync mapping for session replays: reads rows from the internal "SessionReplay" table and
// writes them to a "session_replays" table in the target database.
"session_replays": {
sourceTables: { "SessionReplay": "SessionReplay" },
targetTable: "session_replays",
// DDL used to create the target table, keyed by target database type.
targetTableSchemas: {
// Postgres: creates "session_replays" (PUBLIC gets SELECT only) and, if missing, the
// "_stack_sync_metadata" table that stores the last synced sequence id per mapping name.
postgres: `
CREATE TABLE IF NOT EXISTS "session_replays" (
"id" uuid PRIMARY KEY NOT NULL,
"user_id" uuid NOT NULL,
"refresh_token_id" text NOT NULL,
"started_at" timestamp without time zone NOT NULL,
"last_event_at" timestamp without time zone NOT NULL,
"created_at" timestamp without time zone NOT NULL
);
REVOKE ALL ON "session_replays" FROM PUBLIC;
GRANT SELECT ON "session_replays" TO PUBLIC;
CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" (
"mapping_name" text PRIMARY KEY NOT NULL,
"last_synced_sequence_id" bigint NOT NULL DEFAULT -1,
"updated_at" timestamp without time zone NOT NULL DEFAULT now()
);
`.trim(),
// ClickHouse: ReplacingMergeTree versioned by sync_sequence_id, sorted by (project_id, branch_id, id).
// NOTE(review): this appears to be the same table definition as the ClickHouse migration's
// analytics_internal.session_replays DDL — keep the two in sync if either changes.
clickhouse: `
CREATE TABLE IF NOT EXISTS analytics_internal.session_replays (
project_id String,
branch_id String,
id UUID,
user_id UUID,
refresh_token_id String,
started_at DateTime64(3, 'UTC'),
last_event_at DateTime64(3, 'UTC'),
created_at DateTime64(3, 'UTC'),
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(started_at)
ORDER BY (project_id, branch_id, id);
`.trim(),
},
// Target-specific fetch queries against the internal database.
internalDbFetchQueries: {
// ClickHouse variant: joins "Tenancy" to add project_id / branch_id and uses the sync_* column names.
// $1 = tenancy id, $2 = exclusive lower bound on "sequenceId". Returns at most 1000 rows in
// ascending "sequenceId" order; rows without a sequence id are excluded.
// NOTE(review): "sync_is_deleted" is always false here, so this query never reports a deleted
// session replay — confirm whether deletions need to be propagated.
clickhouse: `
SELECT
"Tenancy"."projectId" AS "project_id",
"Tenancy"."branchId" AS "branch_id",
"SessionReplay"."id" AS "id",
"SessionReplay"."projectUserId" AS "user_id",
"SessionReplay"."refreshTokenId" AS "refresh_token_id",
"SessionReplay"."startedAt" AS "started_at",
"SessionReplay"."lastEventAt" AS "last_event_at",
"SessionReplay"."createdAt" AS "created_at",
"SessionReplay"."sequenceId" AS "sync_sequence_id",
"SessionReplay"."tenancyId" AS "tenancyId",
false AS "sync_is_deleted"
FROM "SessionReplay"
JOIN "Tenancy" ON "Tenancy"."id" = "SessionReplay"."tenancyId"
WHERE "SessionReplay"."tenancyId" = $1::uuid
AND "SessionReplay"."sequenceId" IS NOT NULL
AND "SessionReplay"."sequenceId" > $2::bigint
ORDER BY "SessionReplay"."sequenceId" ASC
LIMIT 1000
`.trim(),
},
// Default fetch query: same filtering and paging as the ClickHouse variant
// ($1 = tenancy id, $2 = exclusive lower bound on "sequenceId", at most 1000 rows, ascending),
// but without the "Tenancy" join and with un-prefixed "sequence_id" / "is_deleted" column names.
// NOTE(review): "is_deleted" is always false here as well — see the note on the ClickHouse query.
internalDbFetchQuery: `
SELECT
"SessionReplay"."id" AS "id",
"SessionReplay"."projectUserId" AS "user_id",
"SessionReplay"."refreshTokenId" AS "refresh_token_id",
"SessionReplay"."startedAt" AS "started_at",
"SessionReplay"."lastEventAt" AS "last_event_at",
"SessionReplay"."createdAt" AS "created_at",
"SessionReplay"."sequenceId" AS "sequence_id",
"SessionReplay"."tenancyId",
false AS "is_deleted"
FROM "SessionReplay"
WHERE "SessionReplay"."tenancyId" = $1::uuid
AND "SessionReplay"."sequenceId" IS NOT NULL
AND "SessionReplay"."sequenceId" > $2::bigint
ORDER BY "SessionReplay"."sequenceId" ASC
LIMIT 1000
`.trim(),
// Per-row write queries for the target database.
externalDbUpdateQueries: {
// Postgres: a single statement taking nine parameters ($1..$6 row columns, $7 sequence id,
// $8 is_deleted flag, $9 mapping name). It
//   1. deletes the target row when is_deleted is true,
//   2. otherwise inserts the row, or updates every non-key column on an "id" conflict,
//   3. records the sequence id in "_stack_sync_metadata"; GREATEST keeps the stored value from
//      ever moving backwards.
// NOTE(review): presumably the parameters are bound in the fetch query's column order with
// "tenancyId" dropped and the mapping name appended — verify against the sync engine.
postgres: `
WITH params AS (
SELECT
$1::uuid AS "id",
$2::uuid AS "user_id",
$3::text AS "refresh_token_id",
$4::timestamp without time zone AS "started_at",
$5::timestamp without time zone AS "last_event_at",
$6::timestamp without time zone AS "created_at",
$7::bigint AS "sequence_id",
$8::boolean AS "is_deleted",
$9::text AS "mapping_name"
),
deleted AS (
DELETE FROM "session_replays" sr
USING params p
WHERE p."is_deleted" = true AND sr."id" = p."id"
RETURNING 1
),
upserted AS (
INSERT INTO "session_replays" (
"id",
"user_id",
"refresh_token_id",
"started_at",
"last_event_at",
"created_at"
)
SELECT
p."id",
p."user_id",
p."refresh_token_id",
p."started_at",
p."last_event_at",
p."created_at"
FROM params p
WHERE p."is_deleted" = false
ON CONFLICT ("id") DO UPDATE SET
"user_id" = EXCLUDED."user_id",
"refresh_token_id" = EXCLUDED."refresh_token_id",
"started_at" = EXCLUDED."started_at",
"last_event_at" = EXCLUDED."last_event_at",
"created_at" = EXCLUDED."created_at"
RETURNING 1
)
INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at")
SELECT p."mapping_name", p."sequence_id", now() FROM params p
ON CONFLICT ("mapping_name") DO UPDATE SET
"last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"),
"updated_at" = now();
`.trim(),
},
},
} as const;