diff --git a/apps/backend/prisma/migrations/20260316000000_add_team_team_member_sequence_columns/migration.sql b/apps/backend/prisma/migrations/20260316000000_add_team_team_member_sequence_columns/migration.sql index 358cf0514..fc5562113 100644 --- a/apps/backend/prisma/migrations/20260316000000_add_team_team_member_sequence_columns/migration.sql +++ b/apps/backend/prisma/migrations/20260316000000_add_team_team_member_sequence_columns/migration.sql @@ -2,24 +2,6 @@ ALTER TABLE "Team" ADD COLUMN "sequenceId" BIGINT, ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; --- CreateIndex -CREATE UNIQUE INDEX "Team_sequenceId_key" ON "Team"("sequenceId"); - --- CreateIndex -CREATE INDEX "Team_tenancyId_sequenceId_idx" ON "Team"("tenancyId", "sequenceId"); - --- CreateIndex -CREATE INDEX "Team_shouldUpdateSequenceId_idx" ON "Team"("shouldUpdateSequenceId", "tenancyId"); - -- AlterTable ALTER TABLE "TeamMember" ADD COLUMN "sequenceId" BIGINT, ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; - --- CreateIndex -CREATE UNIQUE INDEX "TeamMember_sequenceId_key" ON "TeamMember"("sequenceId"); - --- CreateIndex -CREATE INDEX "TeamMember_tenancyId_sequenceId_idx" ON "TeamMember"("tenancyId", "sequenceId"); - --- CreateIndex -CREATE INDEX "TeamMember_shouldUpdateSequenceId_idx" ON "TeamMember"("shouldUpdateSequenceId", "tenancyId"); diff --git a/apps/backend/prisma/migrations/20260316000001_add_email_outbox_sequence_columns/migration.sql b/apps/backend/prisma/migrations/20260316000001_add_email_outbox_sequence_columns/migration.sql index 74b5681c0..c296a6658 100644 --- a/apps/backend/prisma/migrations/20260316000001_add_email_outbox_sequence_columns/migration.sql +++ b/apps/backend/prisma/migrations/20260316000001_add_email_outbox_sequence_columns/migration.sql @@ -1,12 +1,3 @@ -- AlterTable ALTER TABLE "EmailOutbox" ADD COLUMN "sequenceId" BIGINT, ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; - --- CreateIndex -CREATE UNIQUE INDEX "EmailOutbox_sequenceId_key" ON "EmailOutbox"("sequenceId"); - --- CreateIndex -CREATE INDEX "EmailOutbox_tenancyId_sequenceId_idx" ON "EmailOutbox"("tenancyId", "sequenceId"); - --- CreateIndex -CREATE INDEX "EmailOutbox_shouldUpdateSequenceId_idx" ON "EmailOutbox"("shouldUpdateSequenceId", "tenancyId"); diff --git a/apps/backend/prisma/migrations/20260316000002_add_session_replay_sequence_columns/migration.sql b/apps/backend/prisma/migrations/20260316000002_add_session_replay_sequence_columns/migration.sql index 88e0751fd..50a85170e 100644 --- a/apps/backend/prisma/migrations/20260316000002_add_session_replay_sequence_columns/migration.sql +++ b/apps/backend/prisma/migrations/20260316000002_add_session_replay_sequence_columns/migration.sql @@ -1,12 +1,3 @@ -- AlterTable ALTER TABLE "SessionReplay" ADD COLUMN "sequenceId" BIGINT, ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; - --- CreateIndex -CREATE UNIQUE INDEX "SessionReplay_sequenceId_key" ON "SessionReplay"("sequenceId"); - --- CreateIndex -CREATE INDEX "SessionReplay_tenancyId_sequenceId_idx" ON "SessionReplay"("tenancyId", "sequenceId"); - --- CreateIndex -CREATE INDEX "SessionReplay_shouldUpdateSequenceId_idx" ON "SessionReplay"("shouldUpdateSequenceId", "tenancyId"); diff --git a/apps/backend/prisma/migrations/20260317000000_add_team_permission_invitation_sequence_columns/migration.sql b/apps/backend/prisma/migrations/20260317000000_add_team_permission_invitation_sequence_columns/migration.sql index b13d10e6c..d3158401a 100644 --- a/apps/backend/prisma/migrations/20260317000000_add_team_permission_invitation_sequence_columns/migration.sql +++ b/apps/backend/prisma/migrations/20260317000000_add_team_permission_invitation_sequence_columns/migration.sql @@ -2,21 +2,6 @@ ALTER TABLE "TeamMemberDirectPermission" ADD COLUMN "sequenceId" BIGINT, ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; --- CreateIndex -CREATE UNIQUE INDEX "TeamMemberDirectPermission_sequenceId_key" ON "TeamMemberDirectPermission"("sequenceId"); - --- CreateIndex -CREATE INDEX "TeamMemberDirectPermission_shouldUpdateSequenceId_idx" ON "TeamMemberDirectPermission"("shouldUpdateSequenceId", "tenancyId"); - --- CreateIndex -CREATE INDEX "TeamMemberDirectPermission_tenancyId_sequenceId_idx" ON "TeamMemberDirectPermission"("tenancyId", "sequenceId"); - -- AlterTable ALTER TABLE "VerificationCode" ADD COLUMN "sequenceId" BIGINT, ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; - --- CreateIndex -CREATE UNIQUE INDEX "VerificationCode_sequenceId_key" ON "VerificationCode"("sequenceId"); - --- CreateIndex -CREATE INDEX "VerificationCode_shouldUpdateSequenceId_type_idx" ON "VerificationCode"("shouldUpdateSequenceId", "type"); diff --git a/apps/backend/prisma/migrations/20260317000001_add_project_permission_notification_preference_sequence_columns/migration.sql b/apps/backend/prisma/migrations/20260317000001_add_project_permission_notification_preference_sequence_columns/migration.sql index 5c40d540c..d39d03dd2 100644 --- a/apps/backend/prisma/migrations/20260317000001_add_project_permission_notification_preference_sequence_columns/migration.sql +++ b/apps/backend/prisma/migrations/20260317000001_add_project_permission_notification_preference_sequence_columns/migration.sql @@ -2,24 +2,6 @@ ALTER TABLE "ProjectUserDirectPermission" ADD COLUMN "sequenceId" BIGINT, ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; --- CreateIndex -CREATE UNIQUE INDEX "ProjectUserDirectPermission_sequenceId_key" ON "ProjectUserDirectPermission"("sequenceId"); - --- CreateIndex -CREATE INDEX "ProjectUserDirectPermission_shouldUpdateSequenceId_idx" ON "ProjectUserDirectPermission"("shouldUpdateSequenceId", "tenancyId"); - --- CreateIndex -CREATE INDEX "ProjectUserDirectPermission_tenancyId_sequenceId_idx" ON "ProjectUserDirectPermission"("tenancyId", "sequenceId"); - -- AlterTable ALTER TABLE "UserNotificationPreference" ADD COLUMN "sequenceId" BIGINT, ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; - --- CreateIndex -CREATE UNIQUE INDEX "UserNotificationPreference_sequenceId_key" ON "UserNotificationPreference"("sequenceId"); - --- CreateIndex -CREATE INDEX "UserNotificationPreference_shouldUpdateSequenceId_idx" ON "UserNotificationPreference"("shouldUpdateSequenceId", "tenancyId"); - --- CreateIndex -CREATE INDEX "UserNotificationPreference_tenancyId_sequenceId_idx" ON "UserNotificationPreference"("tenancyId", "sequenceId"); diff --git a/apps/backend/prisma/migrations/20260318000000_add_sequence_id_to_refresh_tokens_and_oauth_accounts/migration.sql b/apps/backend/prisma/migrations/20260318000000_add_sequence_id_to_refresh_tokens_and_oauth_accounts/migration.sql index 003b0df2f..61906b1ba 100644 --- a/apps/backend/prisma/migrations/20260318000000_add_sequence_id_to_refresh_tokens_and_oauth_accounts/migration.sql +++ b/apps/backend/prisma/migrations/20260318000000_add_sequence_id_to_refresh_tokens_and_oauth_accounts/migration.sql @@ -2,24 +2,6 @@ ALTER TABLE "ProjectUserRefreshToken" ADD COLUMN "sequenceId" BIGINT, ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; --- CreateIndex -CREATE UNIQUE INDEX "ProjectUserRefreshToken_sequenceId_key" ON "ProjectUserRefreshToken"("sequenceId"); - --- CreateIndex -CREATE INDEX "ProjectUserRefreshToken_shouldUpdateSequenceId_idx" ON "ProjectUserRefreshToken"("shouldUpdateSequenceId", "tenancyId"); - --- CreateIndex -CREATE INDEX "ProjectUserRefreshToken_tenancyId_sequenceId_idx" ON "ProjectUserRefreshToken"("tenancyId", "sequenceId"); - -- AlterTable ALTER TABLE "ProjectUserOAuthAccount" ADD COLUMN "sequenceId" BIGINT, ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; - --- CreateIndex -CREATE UNIQUE INDEX "ProjectUserOAuthAccount_sequenceId_key" ON "ProjectUserOAuthAccount"("sequenceId"); - --- CreateIndex -CREATE INDEX "ProjectUserOAuthAccount_shouldUpdateSequenceId_idx" ON "ProjectUserOAuthAccount"("shouldUpdateSequenceId", "tenancyId"); - --- CreateIndex -CREATE INDEX "ProjectUserOAuthAccount_tenancyId_sequenceId_idx" ON "ProjectUserOAuthAccount"("tenancyId", "sequenceId"); diff --git a/apps/backend/prisma/migrations/20260318000001_add_sequence_indexes_concurrently/migration.sql b/apps/backend/prisma/migrations/20260318000001_add_sequence_indexes_concurrently/migration.sql new file mode 100644 index 000000000..c96f0f564 --- /dev/null +++ b/apps/backend/prisma/migrations/20260318000001_add_sequence_indexes_concurrently/migration.sql @@ -0,0 +1,154 @@ +-- Team indexes +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "Team_sequenceId_key" ON /* SCHEMA_NAME_SENTINEL */."Team"("sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "Team_tenancyId_sequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."Team"("tenancyId", "sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "Team_shouldUpdateSequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."Team"("shouldUpdateSequenceId", "tenancyId"); + +-- TeamMember indexes +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "TeamMember_sequenceId_key" ON /* SCHEMA_NAME_SENTINEL */."TeamMember"("sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "TeamMember_tenancyId_sequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."TeamMember"("tenancyId", "sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "TeamMember_shouldUpdateSequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."TeamMember"("shouldUpdateSequenceId", "tenancyId"); + +-- EmailOutbox indexes +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "EmailOutbox_sequenceId_key" ON /* SCHEMA_NAME_SENTINEL */."EmailOutbox"("sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "EmailOutbox_tenancyId_sequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."EmailOutbox"("tenancyId", "sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "EmailOutbox_shouldUpdateSequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."EmailOutbox"("shouldUpdateSequenceId", "tenancyId"); + +-- SessionReplay indexes +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "SessionReplay_sequenceId_key" ON /* SCHEMA_NAME_SENTINEL */."SessionReplay"("sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "SessionReplay_tenancyId_sequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."SessionReplay"("tenancyId", "sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "SessionReplay_shouldUpdateSequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."SessionReplay"("shouldUpdateSequenceId", "tenancyId"); + +-- TeamMemberDirectPermission indexes +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "TeamMemberDirectPermission_sequenceId_key" ON /* SCHEMA_NAME_SENTINEL */."TeamMemberDirectPermission"("sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "TeamMemberDirectPermission_shouldUpdateSequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."TeamMemberDirectPermission"("shouldUpdateSequenceId", "tenancyId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "TeamMemberDirectPermission_tenancyId_sequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."TeamMemberDirectPermission"("tenancyId", "sequenceId"); + +-- VerificationCode indexes +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "VerificationCode_sequenceId_key" ON /* SCHEMA_NAME_SENTINEL */."VerificationCode"("sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "VerificationCode_shouldUpdateSequenceId_type_idx" ON /* SCHEMA_NAME_SENTINEL */."VerificationCode"("shouldUpdateSequenceId", "type"); + +-- ProjectUserDirectPermission indexes +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "ProjectUserDirectPermission_sequenceId_key" ON /* SCHEMA_NAME_SENTINEL */."ProjectUserDirectPermission"("sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "ProjectUserDirectPermission_shouldUpdateSequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."ProjectUserDirectPermission"("shouldUpdateSequenceId", "tenancyId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "ProjectUserDirectPermission_tenancyId_sequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."ProjectUserDirectPermission"("tenancyId", "sequenceId"); + +-- UserNotificationPreference indexes +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "UserNotificationPreference_sequenceId_key" ON /* SCHEMA_NAME_SENTINEL */."UserNotificationPreference"("sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "UserNotificationPreference_shouldUpdateSequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."UserNotificationPreference"("shouldUpdateSequenceId", "tenancyId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "UserNotificationPreference_tenancyId_sequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."UserNotificationPreference"("tenancyId", "sequenceId"); + +-- ProjectUserRefreshToken indexes +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "ProjectUserRefreshToken_sequenceId_key" ON /* SCHEMA_NAME_SENTINEL */."ProjectUserRefreshToken"("sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "ProjectUserRefreshToken_shouldUpdateSequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."ProjectUserRefreshToken"("shouldUpdateSequenceId", "tenancyId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "ProjectUserRefreshToken_tenancyId_sequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."ProjectUserRefreshToken"("tenancyId", "sequenceId"); + +-- ProjectUserOAuthAccount indexes +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "ProjectUserOAuthAccount_sequenceId_key" ON /* SCHEMA_NAME_SENTINEL */."ProjectUserOAuthAccount"("sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "ProjectUserOAuthAccount_shouldUpdateSequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."ProjectUserOAuthAccount"("shouldUpdateSequenceId", "tenancyId"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "ProjectUserOAuthAccount_tenancyId_sequenceId_idx" ON /* SCHEMA_NAME_SENTINEL */."ProjectUserOAuthAccount"("tenancyId", "sequenceId"); diff --git a/apps/backend/scripts/clickhouse-migrations.ts b/apps/backend/scripts/clickhouse-migrations.ts index e16bd86d6..8db2a4077 100644 --- a/apps/backend/scripts/clickhouse-migrations.ts +++ b/apps/backend/scripts/clickhouse-migrations.ts @@ -2,108 +2,86 @@ import { getClickhouseAdminClient } from "@/lib/clickhouse"; import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env"; export async function runClickhouseMigrations() { + const start = performance.now(); console.log("[Clickhouse] Running Clickhouse migrations..."); const client = getClickhouseAdminClient(); const clickhouseExternalPassword = getEnvVariable("STACK_CLICKHOUSE_EXTERNAL_PASSWORD"); - await client.exec({ - query: "CREATE USER IF NOT EXISTS limited_user IDENTIFIED WITH sha256_password BY {clickhouseExternalPassword:String}", - query_params: { clickhouseExternalPassword }, - }); - // todo: create migration files - await client.exec({ query: EXTERNAL_ANALYTICS_DB_SQL }); - await client.exec({ query: SYNC_METADATA_TABLE_SQL }); - await client.exec({ query: EVENTS_TABLE_BASE_SQL }); - await client.exec({ query: EVENTS_VIEW_SQL }); - await client.exec({ query: USERS_TABLE_BASE_SQL }); - await client.exec({ query: USERS_VIEW_SQL }); - await client.exec({ query: CONTACT_CHANNELS_TABLE_BASE_SQL }); - await client.exec({ query: CONTACT_CHANNELS_VIEW_SQL }); - await client.exec({ query: TEAMS_TABLE_BASE_SQL }); - await client.exec({ query: TEAMS_VIEW_SQL }); - await client.exec({ query: TEAM_MEMBER_PROFILES_TABLE_BASE_SQL }); - await client.exec({ query: TEAM_MEMBER_PROFILES_VIEW_SQL }); - await client.exec({ query: TEAM_PERMISSIONS_TABLE_BASE_SQL }); - await client.exec({ query: TEAM_PERMISSIONS_VIEW_SQL }); - await client.exec({ query: TEAM_INVITATIONS_TABLE_BASE_SQL }); - await client.exec({ query: TEAM_INVITATIONS_VIEW_SQL }); - await client.exec({ query: EMAIL_OUTBOXES_TABLE_BASE_SQL }); - await client.exec({ query: EMAIL_OUTBOXES_VIEW_SQL }); - await client.exec({ query: SESSION_REPLAYS_TABLE_BASE_SQL }); - await client.exec({ query: SESSION_REPLAYS_VIEW_SQL }); - await client.exec({ query: PROJECT_PERMISSIONS_TABLE_BASE_SQL }); - await client.exec({ query: PROJECT_PERMISSIONS_VIEW_SQL }); - await client.exec({ query: NOTIFICATION_PREFERENCES_TABLE_BASE_SQL }); - await client.exec({ query: NOTIFICATION_PREFERENCES_VIEW_SQL }); - await client.exec({ query: REFRESH_TOKENS_TABLE_BASE_SQL }); - await client.exec({ query: REFRESH_TOKENS_VIEW_SQL }); - await client.exec({ query: CONNECTED_ACCOUNTS_TABLE_BASE_SQL }); - await client.exec({ query: CONNECTED_ACCOUNTS_VIEW_SQL }); - await client.exec({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL }); - await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL }); - await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL }); - await client.exec({ query: SIGN_UP_RULE_TRIGGER_EVENT_ROW_FORMAT_MUTATION_SQL }); - // Recreate the events view so SELECT * picks up columns added by EVENTS_ADD_REPLAY_COLUMNS_SQL - await client.exec({ query: EVENTS_VIEW_SQL }); - const queries = [ - "REVOKE ALL PRIVILEGES ON *.* FROM limited_user;", - "REVOKE ALL FROM limited_user;", - "GRANT SELECT ON default.events TO limited_user;", - "GRANT SELECT ON default.users TO limited_user;", - "GRANT SELECT ON default.contact_channels TO limited_user;", - "GRANT SELECT ON default.teams TO limited_user;", - "GRANT SELECT ON default.team_member_profiles TO limited_user;", - "GRANT SELECT ON default.team_permissions TO limited_user;", - "GRANT SELECT ON default.team_invitations TO limited_user;", - "GRANT SELECT ON default.email_outboxes TO limited_user;", - "GRANT SELECT ON default.session_replays TO limited_user;", - "GRANT SELECT ON default.project_permissions TO limited_user;", - "GRANT SELECT ON default.notification_preferences TO limited_user;", - "GRANT SELECT ON default.refresh_tokens TO limited_user;", - "GRANT SELECT ON default.connected_accounts TO limited_user;", + + // Setup — database, user, sync metadata + await client.command({ query: EXTERNAL_ANALYTICS_DB_SQL }); + await Promise.all([ + client.command({ + query: "CREATE USER IF NOT EXISTS limited_user IDENTIFIED WITH sha256_password BY {clickhouseExternalPassword:String}", + query_params: { clickhouseExternalPassword }, + }), + client.command({ query: SYNC_METADATA_TABLE_SQL }), + ]); + + // Create all tables in parallel + await Promise.all([ + client.command({ query: EVENTS_TABLE_BASE_SQL }), + client.command({ query: USERS_TABLE_BASE_SQL }), + client.command({ query: CONTACT_CHANNELS_TABLE_BASE_SQL }), + client.command({ query: TEAMS_TABLE_BASE_SQL }), + client.command({ query: TEAM_MEMBER_PROFILES_TABLE_BASE_SQL }), + client.command({ query: TEAM_PERMISSIONS_TABLE_BASE_SQL }), + client.command({ query: TEAM_INVITATIONS_TABLE_BASE_SQL }), + client.command({ query: EMAIL_OUTBOXES_TABLE_BASE_SQL }), + + client.command({ query: PROJECT_PERMISSIONS_TABLE_BASE_SQL }), + client.command({ query: NOTIFICATION_PREFERENCES_TABLE_BASE_SQL }), + client.command({ query: REFRESH_TOKENS_TABLE_BASE_SQL }), + client.command({ query: CONNECTED_ACCOUNTS_TABLE_BASE_SQL }), + ]); + + // Alter events table (must come before views that reference new columns) + await client.command({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL }); + + // Create all views in parallel + await Promise.all([ + client.command({ query: EVENTS_VIEW_SQL }), + client.command({ query: USERS_VIEW_SQL }), + client.command({ query: CONTACT_CHANNELS_VIEW_SQL }), + client.command({ query: TEAMS_VIEW_SQL }), + client.command({ query: TEAM_MEMBER_PROFILES_VIEW_SQL }), + client.command({ query: TEAM_PERMISSIONS_VIEW_SQL }), + client.command({ query: TEAM_INVITATIONS_VIEW_SQL }), + client.command({ query: EMAIL_OUTBOXES_VIEW_SQL }), + + client.command({ query: PROJECT_PERMISSIONS_VIEW_SQL }), + client.command({ query: NOTIFICATION_PREFERENCES_VIEW_SQL }), + client.command({ query: REFRESH_TOKENS_VIEW_SQL }), + client.command({ query: CONNECTED_ACCOUNTS_VIEW_SQL }), + ]); + + // Data migrations (mutations) + await Promise.all([ + client.command({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL }), + client.command({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL }), + client.command({ query: SIGN_UP_RULE_TRIGGER_EVENT_ROW_FORMAT_MUTATION_SQL }), + ]); + + // Row policies in parallel + const tables = [ + "events", "users", "contact_channels", "teams", "team_member_profiles", + "team_permissions", "team_invitations", "email_outboxes", + "project_permissions", "notification_preferences", "refresh_tokens", "connected_accounts", ]; - await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS events_project_isolation ON default.events FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", - }); - await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS users_project_isolation ON default.users FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", - }); - await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS contact_channels_project_isolation ON default.contact_channels FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", - }); - await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS teams_project_isolation ON default.teams FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", - }); - await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS team_member_profiles_project_isolation ON default.team_member_profiles FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", - }); - await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS team_permissions_project_isolation ON default.team_permissions FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", - }); - await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS team_invitations_project_isolation ON default.team_invitations FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", - }); - await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS email_outboxes_project_isolation ON default.email_outboxes FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", - }); - await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS session_replays_project_isolation ON default.session_replays FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", - }); - await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS project_permissions_project_isolation ON default.project_permissions FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", - }); - await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS notification_preferences_project_isolation ON default.notification_preferences FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", - }); - await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS refresh_tokens_project_isolation ON default.refresh_tokens FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", - }); - await client.exec({ - query: "CREATE ROW POLICY IF NOT EXISTS connected_accounts_project_isolation ON default.connected_accounts FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", - }); - for (const query of queries) { - await client.exec({ query }); - } - console.log("[Clickhouse] Clickhouse migrations complete"); + await Promise.all(tables.map(table => + client.command({ + query: `CREATE ROW POLICY IF NOT EXISTS ${table}_project_isolation ON default.${table} FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user`, + }) + )); + + // Grants + await client.command({ query: "REVOKE ALL PRIVILEGES ON *.* FROM limited_user;" }); + await client.command({ query: "REVOKE ALL FROM limited_user;" }); + await Promise.all(tables.map(table => + client.command({ query: `GRANT SELECT ON default.${table} TO limited_user;` }) + )); + + const elapsed = ((performance.now() - start) / 1000).toFixed(1); + console.log(`[Clickhouse] Clickhouse migrations complete (${elapsed}s)`); await client.close(); } @@ -351,7 +329,6 @@ CREATE TABLE IF NOT EXISTS analytics_internal.team_member_profiles ( user_id UUID, display_name Nullable(String), profile_image_url Nullable(String), - user JSON, created_at DateTime64(3, 'UTC'), sync_sequence_id Int64, sync_is_deleted UInt8, @@ -373,7 +350,6 @@ SELECT user_id, display_name, profile_image_url, - user, created_at FROM analytics_internal.team_member_profiles FINAL @@ -386,7 +362,7 @@ CREATE TABLE IF NOT EXISTS analytics_internal.team_permissions ( branch_id String, team_id UUID, user_id UUID, - permission_id String, + id String, created_at DateTime64(3, 'UTC'), sync_sequence_id Int64, sync_is_deleted UInt8, @@ -394,7 +370,7 @@ CREATE TABLE IF NOT EXISTS analytics_internal.team_permissions ( ) ENGINE ReplacingMergeTree(sync_sequence_id) PARTITION BY toYYYYMM(created_at) -ORDER BY (project_id, branch_id, team_id, user_id, permission_id); +ORDER BY (project_id, branch_id, team_id, user_id, id); `; const TEAM_PERMISSIONS_VIEW_SQL = ` @@ -406,7 +382,7 @@ SELECT branch_id, team_id, user_id, - permission_id, + id, created_at FROM analytics_internal.team_permissions FINAL @@ -462,18 +438,17 @@ CREATE TABLE IF NOT EXISTS analytics_internal.email_outboxes ( email_programmatic_call_template_id Nullable(String), theme_id Nullable(String), is_high_priority UInt8, - rendered_is_transactional Nullable(UInt8), - rendered_subject Nullable(String), - rendered_notification_category_id Nullable(String), + is_transactional Nullable(UInt8), + subject Nullable(String), + notification_category_id Nullable(String), started_rendering_at Nullable(DateTime64(3, 'UTC')), - finished_rendering_at Nullable(DateTime64(3, 'UTC')), + rendered_at Nullable(DateTime64(3, 'UTC')), render_error Nullable(String), scheduled_at DateTime64(3, 'UTC'), created_at DateTime64(3, 'UTC'), + updated_at DateTime64(3, 'UTC'), started_sending_at Nullable(DateTime64(3, 'UTC')), - finished_sending_at Nullable(DateTime64(3, 'UTC')), server_error Nullable(String), - sent_at Nullable(DateTime64(3, 'UTC')), delivered_at Nullable(DateTime64(3, 'UTC')), opened_at Nullable(DateTime64(3, 'UTC')), clicked_at Nullable(DateTime64(3, 'UTC')), @@ -510,18 +485,17 @@ SELECT email_programmatic_call_template_id, theme_id, is_high_priority, - rendered_is_transactional, - rendered_subject, - rendered_notification_category_id, + is_transactional, + subject, + notification_category_id, started_rendering_at, - finished_rendering_at, + rendered_at, render_error, scheduled_at, created_at, + updated_at, started_sending_at, - finished_sending_at, server_error, - sent_at, delivered_at, opened_at, clicked_at, @@ -539,43 +513,13 @@ FINAL WHERE sync_is_deleted = 0; `; -const SESSION_REPLAYS_TABLE_BASE_SQL = ` -CREATE TABLE IF NOT EXISTS analytics_internal.session_replays ( - project_id String, - branch_id String, - id UUID, - user_id UUID, - refresh_token_id String, - started_at DateTime64(3, 'UTC'), - last_event_at DateTime64(3, 'UTC'), - created_at DateTime64(3, 'UTC'), - chunk_count UInt64, - sync_sequence_id Int64, - sync_is_deleted UInt8, - sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) -) -ENGINE ReplacingMergeTree(sync_sequence_id) -PARTITION BY toYYYYMM(started_at) -ORDER BY (project_id, branch_id, id); -`; - -const SESSION_REPLAYS_VIEW_SQL = ` -CREATE OR REPLACE VIEW default.session_replays -SQL SECURITY DEFINER -AS -SELECT project_id, branch_id, id, user_id, refresh_token_id, - started_at, last_event_at, created_at, chunk_count -FROM analytics_internal.session_replays -FINAL -WHERE sync_is_deleted = 0; -`; const PROJECT_PERMISSIONS_TABLE_BASE_SQL = ` CREATE TABLE IF NOT EXISTS analytics_internal.project_permissions ( project_id String, branch_id String, user_id UUID, - permission_id String, + id String, created_at DateTime64(3, 'UTC'), sync_sequence_id Int64, sync_is_deleted UInt8, @@ -583,7 +527,7 @@ CREATE TABLE IF NOT EXISTS analytics_internal.project_permissions ( ) ENGINE ReplacingMergeTree(sync_sequence_id) PARTITION BY toYYYYMM(created_at) -ORDER BY (project_id, branch_id, user_id, permission_id); +ORDER BY (project_id, branch_id, user_id, id); `; const PROJECT_PERMISSIONS_VIEW_SQL = ` @@ -594,7 +538,7 @@ SELECT project_id, branch_id, user_id, - permission_id, + id, created_at FROM analytics_internal.project_permissions FINAL @@ -605,7 +549,6 @@ const NOTIFICATION_PREFERENCES_TABLE_BASE_SQL = ` CREATE TABLE IF NOT EXISTS analytics_internal.notification_preferences ( project_id String, branch_id String, - id UUID, user_id UUID, notification_category_id String, enabled UInt8, @@ -614,7 +557,7 @@ CREATE TABLE IF NOT EXISTS analytics_internal.notification_preferences ( sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) ) ENGINE ReplacingMergeTree(sync_sequence_id) -ORDER BY (project_id, branch_id, id); +ORDER BY (project_id, branch_id, user_id, notification_category_id); `; const NOTIFICATION_PREFERENCES_VIEW_SQL = ` @@ -624,7 +567,6 @@ AS SELECT project_id, branch_id, - id, user_id, notification_category_id, enabled @@ -674,11 +616,9 @@ const CONNECTED_ACCOUNTS_TABLE_BASE_SQL = ` CREATE TABLE IF NOT EXISTS analytics_internal.connected_accounts ( project_id String, branch_id String, - id UUID, user_id UUID, provider String, provider_account_id String, - email Nullable(String), created_at DateTime64(3, 'UTC'), sync_sequence_id Int64, sync_is_deleted UInt8, @@ -686,7 +626,7 @@ CREATE TABLE IF NOT EXISTS analytics_internal.connected_accounts ( ) ENGINE ReplacingMergeTree(sync_sequence_id) PARTITION BY toYYYYMM(created_at) -ORDER BY (project_id, branch_id, id); +ORDER BY (project_id, branch_id, user_id, provider, provider_account_id); `; const CONNECTED_ACCOUNTS_VIEW_SQL = ` @@ -696,11 +636,9 @@ AS SELECT project_id, branch_id, - id, user_id, provider, provider_account_id, - email, created_at FROM analytics_internal.connected_accounts FINAL diff --git a/apps/backend/src/app/api/latest/emails/unsubscribe-link/route.tsx b/apps/backend/src/app/api/latest/emails/unsubscribe-link/route.tsx index 9e0438477..19e5696a1 100644 --- a/apps/backend/src/app/api/latest/emails/unsubscribe-link/route.tsx +++ b/apps/backend/src/app/api/latest/emails/unsubscribe-link/route.tsx @@ -1,3 +1,4 @@ +import { withExternalDbSyncUpdate } from "@/lib/external-db-sync"; import { getSoleTenancyFromProjectBranch } from "@/lib/tenancies"; import { getPrismaClientForTenancy, globalPrismaClient } from "@/prisma-client"; import { VerificationCodeType } from "@/generated/prisma/client"; @@ -51,15 +52,15 @@ export async function GET(request: NextRequest) { notificationCategoryId: notification_category_id, }, }, - update: { + update: withExternalDbSyncUpdate({ enabled: false, - }, - create: { + }), + create: withExternalDbSyncUpdate({ tenancyId: tenancy.id, projectUserId: user_id, notificationCategoryId: notification_category_id, enabled: false, - }, + }), }); return new Response('

Successfully unsubscribed from notification group

', { diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts index 470e89a55..bae534572 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts @@ -79,23 +79,6 @@ async function backfillSequenceIds(batchSize: number): Promise { if (projectUserTenants.length > 0) { await enqueueExternalDbSyncBatch(projectUserTenants.map(t => t.tenancyId)); didUpdate = true; - - // Cascade: when a user changes, mark their TeamMember rows for re-sync - // so the embedded user JSON in team_member_profiles stays fresh - await globalPrismaClient.$executeRaw` - UPDATE "TeamMember" - SET "shouldUpdateSequenceId" = TRUE - FROM ( - SELECT DISTINCT "tenancyId", "projectUserId" - FROM "ProjectUser" - WHERE "tenancyId" IN (${Prisma.join(projectUserTenants.map(t => t.tenancyId))}) - AND "shouldUpdateSequenceId" = FALSE - AND "sequenceId" IS NOT NULL - ) AS changed_users - WHERE "TeamMember"."tenancyId" = changed_users."tenancyId" - AND "TeamMember"."projectUserId" = changed_users."projectUserId" - AND "TeamMember"."shouldUpdateSequenceId" = FALSE - `; } const contactChannelTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts index f8de41b24..48890173a 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts @@ -87,7 +87,15 @@ const globalSchema = yupObject({ sequencer: yupObject({ project_users: sequenceStatsSchema.defined(), contact_channels: sequenceStatsSchema.defined(), + teams: sequenceStatsSchema.defined(), + team_members: sequenceStatsSchema.defined(), + team_permissions: sequenceStatsSchema.defined(), + team_invitations: sequenceStatsSchema.defined(), email_outboxes: sequenceStatsSchema.defined(), + project_permissions: sequenceStatsSchema.defined(), + notification_preferences: sequenceStatsSchema.defined(), + refresh_tokens: sequenceStatsSchema.defined(), + connected_accounts: sequenceStatsSchema.defined(), deleted_rows: sequenceStatsSchema.shape({ by_table: yupArray(deletedRowByTableSchema).defined(), }).defined(), @@ -120,7 +128,15 @@ const responseSchema = yupObject({ sequencer: yupObject({ project_users: sequenceStatsSchema.defined(), contact_channels: sequenceStatsSchema.defined(), + teams: sequenceStatsSchema.defined(), + team_members: sequenceStatsSchema.defined(), + team_permissions: sequenceStatsSchema.defined(), + team_invitations: sequenceStatsSchema.defined(), email_outboxes: sequenceStatsSchema.defined(), + project_permissions: sequenceStatsSchema.defined(), + notification_preferences: sequenceStatsSchema.defined(), + refresh_tokens: sequenceStatsSchema.defined(), + connected_accounts: sequenceStatsSchema.defined(), deleted_rows: sequenceStatsSchema.shape({ by_table: yupArray(deletedRowByTableSchema).defined(), }).defined(), @@ -310,6 +326,52 @@ async function fetchInternalStats(tenancyId: string | null) { ${tenancyWhere} `).at(0) ?? throwErr("Contact channel stats query returned no rows."); + const teamStatsRow = (await globalPrismaClient.$queryRaw` + SELECT + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id", + MIN("sequenceId") AS "min_sequence_id", + MAX("sequenceId") AS "max_sequence_id" + FROM "Team" + ${tenancyWhere} + `).at(0) ?? throwErr("Team stats query returned no rows."); + + const teamMemberStatsRow = (await globalPrismaClient.$queryRaw` + SELECT + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id", + MIN("sequenceId") AS "min_sequence_id", + MAX("sequenceId") AS "max_sequence_id" + FROM "TeamMember" + ${tenancyWhere} + `).at(0) ?? throwErr("Team member stats query returned no rows."); + + const teamPermissionStatsRow = (await globalPrismaClient.$queryRaw` + SELECT + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id", + MIN("sequenceId") AS "min_sequence_id", + MAX("sequenceId") AS "max_sequence_id" + FROM "TeamMemberDirectPermission" + ${tenancyWhere} + `).at(0) ?? throwErr("Team permission stats query returned no rows."); + + const teamInvitationStatsRow = (await globalPrismaClient.$queryRaw` + SELECT + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id", + MIN("sequenceId") AS "min_sequence_id", + MAX("sequenceId") AS "max_sequence_id" + FROM "VerificationCode" + ${tenancyId + ? Prisma.sql`JOIN "Tenancy" ON "Tenancy"."projectId" = "VerificationCode"."projectId" AND "Tenancy"."branchId" = "VerificationCode"."branchId" WHERE "type" = 'TEAM_INVITATION' AND "Tenancy"."id" = ${tenancyId}::uuid` + : Prisma.sql`WHERE "type" = 'TEAM_INVITATION'`} + `).at(0) ?? throwErr("Team invitation stats query returned no rows."); + const emailOutboxStatsRow = (await globalPrismaClient.$queryRaw` SELECT COUNT(*)::bigint AS "total", @@ -321,6 +383,50 @@ async function fetchInternalStats(tenancyId: string | null) { ${tenancyWhere} `).at(0) ?? throwErr("Email outbox stats query returned no rows."); + const projectPermissionStatsRow = (await globalPrismaClient.$queryRaw` + SELECT + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id", + MIN("sequenceId") AS "min_sequence_id", + MAX("sequenceId") AS "max_sequence_id" + FROM "ProjectUserDirectPermission" + ${tenancyWhere} + `).at(0) ?? throwErr("Project permission stats query returned no rows."); + + const notificationPreferenceStatsRow = (await globalPrismaClient.$queryRaw` + SELECT + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id", + MIN("sequenceId") AS "min_sequence_id", + MAX("sequenceId") AS "max_sequence_id" + FROM "UserNotificationPreference" + ${tenancyWhere} + `).at(0) ?? throwErr("Notification preference stats query returned no rows."); + + const refreshTokenStatsRow = (await globalPrismaClient.$queryRaw` + SELECT + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id", + MIN("sequenceId") AS "min_sequence_id", + MAX("sequenceId") AS "max_sequence_id" + FROM "ProjectUserRefreshToken" + ${tenancyWhere} + `).at(0) ?? throwErr("Refresh token stats query returned no rows."); + + const connectedAccountStatsRow = (await globalPrismaClient.$queryRaw` + SELECT + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id", + MIN("sequenceId") AS "min_sequence_id", + MAX("sequenceId") AS "max_sequence_id" + FROM "ProjectUserOAuthAccount" + ${tenancyWhere} + `).at(0) ?? throwErr("Connected account stats query returned no rows."); + const deletedRowStatsRow = (await globalPrismaClient.$queryRaw` SELECT COUNT(*)::bigint AS "total", @@ -367,7 +473,15 @@ async function fetchInternalStats(tenancyId: string | null) { const projectUsersStats = formatSequenceStats(projectUserStatsRow); const contactChannelStats = formatSequenceStats(contactChannelStatsRow); + const teamStats = formatSequenceStats(teamStatsRow); + const teamMemberStats = formatSequenceStats(teamMemberStatsRow); + const teamPermissionStats = formatSequenceStats(teamPermissionStatsRow); + const teamInvitationStats = formatSequenceStats(teamInvitationStatsRow); const emailOutboxStats = formatSequenceStats(emailOutboxStatsRow); + const projectPermissionStats = formatSequenceStats(projectPermissionStatsRow); + const notificationPreferenceStats = formatSequenceStats(notificationPreferenceStatsRow); + const refreshTokenStats = formatSequenceStats(refreshTokenStatsRow); + const connectedAccountStats = formatSequenceStats(connectedAccountStatsRow); const deletedRowStats = formatSequenceStats(deletedRowStatsRow); const deletedRowsByTable = deletedRowsByTableRows.map((row) => ({ @@ -380,7 +494,15 @@ async function fetchInternalStats(tenancyId: string | null) { return { projectUsersStats, contactChannelStats, + teamStats, + teamMemberStats, + teamPermissionStats, + teamInvitationStats, emailOutboxStats, + projectPermissionStats, + notificationPreferenceStats, + refreshTokenStats, + connectedAccountStats, deletedRowStats, deletedRowsByTable, outgoingStatsRow, @@ -1026,7 +1148,15 @@ export const GET = createSmartRouteHandler({ sequencer: { project_users: globalStats.projectUsersStats, contact_channels: globalStats.contactChannelStats, + teams: globalStats.teamStats, + team_members: globalStats.teamMemberStats, + team_permissions: globalStats.teamPermissionStats, + team_invitations: globalStats.teamInvitationStats, email_outboxes: globalStats.emailOutboxStats, + project_permissions: globalStats.projectPermissionStats, + notification_preferences: globalStats.notificationPreferenceStats, + refresh_tokens: globalStats.refreshTokenStats, + connected_accounts: globalStats.connectedAccountStats, deleted_rows: { ...globalStats.deletedRowStats, by_table: globalStats.deletedRowsByTable, @@ -1045,7 +1175,15 @@ export const GET = createSmartRouteHandler({ sequencer: { project_users: currentStats.projectUsersStats, contact_channels: currentStats.contactChannelStats, + teams: currentStats.teamStats, + team_members: currentStats.teamMemberStats, + team_permissions: currentStats.teamPermissionStats, + team_invitations: currentStats.teamInvitationStats, email_outboxes: currentStats.emailOutboxStats, + project_permissions: currentStats.projectPermissionStats, + notification_preferences: currentStats.notificationPreferenceStats, + refresh_tokens: currentStats.refreshTokenStats, + connected_accounts: currentStats.connectedAccountStats, deleted_rows: { ...currentStats.deletedRowStats, by_table: currentStats.deletedRowsByTable, diff --git a/apps/backend/src/app/api/latest/oauth-providers/crud.tsx b/apps/backend/src/app/api/latest/oauth-providers/crud.tsx index e636e6b2c..4a0163937 100644 --- a/apps/backend/src/app/api/latest/oauth-providers/crud.tsx +++ b/apps/backend/src/app/api/latest/oauth-providers/crud.tsx @@ -1,4 +1,4 @@ -import { recordExternalDbSyncDeletion } from "@/lib/external-db-sync"; +import { recordExternalDbSyncDeletion, withExternalDbSyncUpdate } from "@/lib/external-db-sync"; import { ensureUserExists } from "@/lib/request-checks"; import { Tenancy } from "@/lib/tenancies"; import { getPrismaClientForTenancy, retryTransaction } from "@/prisma-client"; @@ -310,10 +310,10 @@ export const oauthProviderCrudHandlers = createLazyProxy(() => createCrudHandler id: params.provider_id, }, }, - data: { + data: withExternalDbSyncUpdate({ email: data.email, providerAccountId: data.account_id, - }, + }), }); const providerType = resolveProviderType(auth.tenancy, existingOAuthAccount.configOAuthProviderId) diff --git a/apps/backend/src/app/api/latest/team-member-profiles/crud.tsx b/apps/backend/src/app/api/latest/team-member-profiles/crud.tsx index 1e3909880..552a49484 100644 --- a/apps/backend/src/app/api/latest/team-member-profiles/crud.tsx +++ b/apps/backend/src/app/api/latest/team-member-profiles/crud.tsx @@ -1,4 +1,5 @@ import { Prisma } from "@/generated/prisma/client"; +import { withExternalDbSyncUpdate } from "@/lib/external-db-sync"; import { ensureTeamExists, ensureTeamMembershipExists, ensureUserExists, ensureUserTeamPermissionExists } from "@/lib/request-checks"; import { getPrismaClientForTenancy, retryTransaction } from "@/prisma-client"; import { createCrudHandlers } from "@/route-handlers/crud-handler"; @@ -146,10 +147,10 @@ export const teamMemberProfilesCrudHandlers = createLazyProxy(() => createCrudHa teamId: params.team_id, }, }, - data: { + data: withExternalDbSyncUpdate({ displayName: data.display_name, profileImageUrl: await uploadAndGetUrl(data.profile_image_url, "team-member-profile-images") - }, + }), include: fullInclude, }); diff --git a/apps/backend/src/lib/email-queue-step.tsx b/apps/backend/src/lib/email-queue-step.tsx index 28002cb2a..453c1f0bd 100644 --- a/apps/backend/src/lib/email-queue-step.tsx +++ b/apps/backend/src/lib/email-queue-step.tsx @@ -242,7 +242,8 @@ async function claimEmailsForRendering(workerId: string): Promise UPDATE "EmailOutbox" AS e SET "renderedByWorkerId" = ${workerId}::uuid, - "startedRenderingAt" = NOW() + "startedRenderingAt" = NOW(), + "shouldUpdateSequenceId" = TRUE FROM selected WHERE e."tenancyId" = selected."tenancyId" AND e."id" = selected."id" RETURNING e.*; @@ -531,7 +532,8 @@ async function claimEmailsForSending(tx: PrismaClientTransaction, tenancyId: str FOR UPDATE SKIP LOCKED ) UPDATE "EmailOutbox" AS e - SET "startedSendingAt" = NOW() + SET "startedSendingAt" = NOW(), + "shouldUpdateSequenceId" = TRUE FROM selected WHERE e."tenancyId" = selected."tenancyId" AND e."id" = selected."id" RETURNING e.*; diff --git a/apps/backend/src/lib/external-db-sync.ts b/apps/backend/src/lib/external-db-sync.ts index d276e79dc..b3a3431d8 100644 --- a/apps/backend/src/lib/external-db-sync.ts +++ b/apps/backend/src/lib/external-db-sync.ts @@ -298,6 +298,7 @@ export async function recordExternalDbSyncDeletion( TRUE FROM "TeamMemberDirectPermission" WHERE "id" = ${target.permissionDbId}::uuid + AND "tenancyId" = ${target.tenancyId}::uuid FOR UPDATE `); @@ -335,6 +336,7 @@ export async function recordExternalDbSyncDeletion( TRUE FROM "ProjectUserDirectPermission" WHERE "id" = ${target.permissionDbId}::uuid + AND "tenancyId" = ${target.tenancyId}::uuid FOR UPDATE `); @@ -477,7 +479,8 @@ export async function recordExternalDbSyncDeletion( FROM "VerificationCode" JOIN "Tenancy" ON "Tenancy"."projectId" = "VerificationCode"."projectId" AND "Tenancy"."branchId" = "VerificationCode"."branchId" - WHERE "VerificationCode"."projectId" = ${target.verificationCodeProjectId} + WHERE "Tenancy"."id" = ${target.tenancyId}::uuid + AND "VerificationCode"."projectId" = ${target.verificationCodeProjectId} AND "VerificationCode"."branchId" = ${target.verificationCodeBranchId} AND "VerificationCode"."id" = ${target.verificationCodeId}::uuid AND "VerificationCode"."type" = 'TEAM_INVITATION' diff --git a/apps/backend/src/lib/permissions.tsx b/apps/backend/src/lib/permissions.tsx index 2bec32937..7429ead3a 100644 --- a/apps/backend/src/lib/permissions.tsx +++ b/apps/backend/src/lib/permissions.tsx @@ -332,9 +332,9 @@ export async function updatePermissionDefinition( tenancyId: options.tenancy.id, permissionId: options.oldId, }, - data: { + data: withExternalDbSyncUpdate({ permissionId: newId, - }, + }), }); await sourceOfTruthTx.projectUserDirectPermission.updateMany({ @@ -342,9 +342,9 @@ export async function updatePermissionDefinition( tenancyId: options.tenancy.id, permissionId: options.oldId, }, - data: { + data: withExternalDbSyncUpdate({ permissionId: newId, - }, + }), }); return { diff --git a/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/external-db-sync/page-client.tsx b/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/external-db-sync/page-client.tsx index c1e7d5795..02c2bd30a 100644 --- a/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/external-db-sync/page-client.tsx +++ b/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/external-db-sync/page-client.tsx @@ -80,6 +80,15 @@ type ExternalDbSyncStatus = { sequencer: { project_users: SequenceStats, contact_channels: SequenceStats, + teams: SequenceStats, + team_members: SequenceStats, + team_permissions: SequenceStats, + team_invitations: SequenceStats, + email_outboxes: SequenceStats, + project_permissions: SequenceStats, + notification_preferences: SequenceStats, + refresh_tokens: SequenceStats, + connected_accounts: SequenceStats, deleted_rows: DeletedRowStats, }, poller: PollerStats, @@ -95,6 +104,15 @@ type ExternalDbSyncStatus = { sequencer: { project_users: SequenceStats, contact_channels: SequenceStats, + teams: SequenceStats, + team_members: SequenceStats, + team_permissions: SequenceStats, + team_invitations: SequenceStats, + email_outboxes: SequenceStats, + project_permissions: SequenceStats, + notification_preferences: SequenceStats, + refresh_tokens: SequenceStats, + connected_accounts: SequenceStats, deleted_rows: DeletedRowStats, }, poller: PollerStats, @@ -400,6 +418,15 @@ export default function PageClient() { const sequencerPending = sumBigIntStrings([ summarySource.sequencer.project_users.pending, summarySource.sequencer.contact_channels.pending, + summarySource.sequencer.teams.pending, + summarySource.sequencer.team_members.pending, + summarySource.sequencer.team_permissions.pending, + summarySource.sequencer.team_invitations.pending, + summarySource.sequencer.email_outboxes.pending, + summarySource.sequencer.project_permissions.pending, + summarySource.sequencer.notification_preferences.pending, + summarySource.sequencer.refresh_tokens.pending, + summarySource.sequencer.connected_accounts.pending, summarySource.sequencer.deleted_rows.pending, ]); const mappingPending = sumBigIntStrings( @@ -518,7 +545,7 @@ export default function PageClient() { -
ProjectUser + ContactChannel + DeletedRow rows waiting for sequence IDs.
+
All synced table rows waiting for sequence IDs.
Throughput {loadingState ? "—" : formatThroughput(throughputStats?.sequencer ?? null)} @@ -578,30 +605,29 @@ export default function PageClient() { - - ProjectUser - - - - - - - - ContactChannel - - - - - - - - DeletedRow - - - - - - + {([ + ["ProjectUser", status?.sequencer.project_users], + ["ContactChannel", status?.sequencer.contact_channels], + ["Team", status?.sequencer.teams], + ["TeamMember", status?.sequencer.team_members], + ["TeamPermission", status?.sequencer.team_permissions], + ["TeamInvitation", status?.sequencer.team_invitations], + ["EmailOutbox", status?.sequencer.email_outboxes], + ["ProjectPermission", status?.sequencer.project_permissions], + ["NotificationPref", status?.sequencer.notification_preferences], + ["RefreshToken", status?.sequencer.refresh_tokens], + ["ConnectedAccount", status?.sequencer.connected_accounts], + ["DeletedRow", status?.sequencer.deleted_rows], + ] as const).map(([name, stats]) => ( + + {name} + + + + + + + ))} diff --git a/apps/e2e/tests/backend/endpoints/api/v1/analytics-query.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/analytics-query.test.ts index bfd0b1610..afcdc9324 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/analytics-query.test.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/analytics-query.test.ts @@ -529,7 +529,6 @@ it("has limited grants", async ({ expect }) => { { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.notification_preferences TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.project_permissions TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.refresh_tokens TO limited_user" }, - { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.session_replays TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.team_invitations TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.team_member_profiles TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.team_permissions TO limited_user" }, @@ -600,10 +599,6 @@ it("can see only some tables", async ({ expect }) => { "database": "default", "name": "refresh_tokens", }, - { - "database": "default", - "name": "session_replays", - }, { "database": "default", "name": "team_invitations", @@ -648,7 +643,6 @@ it("SHOW TABLES should have the correct tables", async ({ expect }) => { { "name": "notification_preferences" }, { "name": "project_permissions" }, { "name": "refresh_tokens" }, - { "name": "session_replays" }, { "name": "team_invitations" }, { "name": "team_member_profiles" }, { "name": "team_permissions" }, @@ -1141,7 +1135,6 @@ it("shows grants", async ({ expect }) => { { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.notification_preferences TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.project_permissions TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.refresh_tokens TO limited_user" }, - { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.session_replays TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.team_invitations TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.team_member_profiles TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.team_permissions TO limited_user" }, diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts index 752b68563..191c31068 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts @@ -21,7 +21,7 @@ import { waitForSyncedEmailOutboxByStatus, waitForSyncedRefreshToken, waitForSyncedRefreshTokenDeletion, - waitForSyncedSessionReplay, + waitForSyncedTeam, waitForSyncedTeamDeletion, waitForSyncedTeamInvitation, @@ -948,7 +948,7 @@ describe.sequential('External DB Sync - Basic Tests', () => { let response; while (performance.now() - start < timeoutMs) { response = await runQueryForCurrentProject({ - query: "SELECT team_id, user_id, permission_id FROM team_permissions WHERE permission_id = {perm:String}", + query: "SELECT team_id, user_id, id FROM team_permissions WHERE id = {perm:String}", params: { perm: '$read_members' }, }); expect(response.status).toBe(200); @@ -959,7 +959,7 @@ describe.sequential('External DB Sync - Basic Tests', () => { } expect(response!.body.result.length).toBe(1); - expect(response!.body.result[0].permission_id).toBe('$read_members'); + expect(response!.body.result[0].id).toBe('$read_members'); }, TEST_TIMEOUT); /** @@ -1038,7 +1038,7 @@ describe.sequential('External DB Sync - Basic Tests', () => { let response; while (performance.now() - start < timeoutMs) { response = await runQueryForCurrentProject({ - query: "SELECT user_id, permission_id FROM project_permissions WHERE permission_id = {perm:String}", + query: "SELECT user_id, id FROM project_permissions WHERE id = {perm:String}", params: { perm: 'ch_test_perm' }, }); expect(response.status).toBe(200); @@ -1049,7 +1049,7 @@ describe.sequential('External DB Sync - Basic Tests', () => { } expect(response!.body.result.length).toBe(1); - expect(response!.body.result[0].permission_id).toBe('ch_test_perm'); + expect(response!.body.result[0].id).toBe('ch_test_perm'); }, TEST_TIMEOUT); /** @@ -1419,7 +1419,7 @@ describe.sequential('External DB Sync - Basic Tests', () => { expect(response!.body.result.length).toBeGreaterThanOrEqual(1); const row = response!.body.result[0]; - expect(row.created_with).toBe('PROGRAMMATIC_CALL'); + expect(row.created_with).toBe('programmatic-call'); }, TEST_TIMEOUT); /** @@ -1487,175 +1487,6 @@ describe.sequential('External DB Sync - Basic Tests', () => { expect(row.send_retries).toBe(0); }, TEST_TIMEOUT); - /** - * What it does: - * - Creates a project with analytics, signs in a user, uploads a session replay batch, - * and verifies the session replay row is synced to ClickHouse. - */ - test('SessionReplay sync (ClickHouse)', async ({ expect }) => { - await Project.createAndSwitch({ - config: { - magic_link_enabled: true, - }, - }); - await Project.updateConfig({ apps: { installed: { analytics: { enabled: true } } } }); - await Auth.Otp.signIn(); - - const now = Date.now(); - const browserSessionId = randomUUID(); - const batchId = randomUUID(); - - const uploadRes = await niceBackendFetch("/api/v1/session-replays/batch", { - method: "POST", - accessType: "client", - body: { - browser_session_id: browserSessionId, - session_replay_segment_id: randomUUID(), - batch_id: batchId, - started_at_ms: now, - sent_at_ms: now + 500, - events: [ - { timestamp: now + 100, type: 2 }, - { timestamp: now + 200, type: 3 }, - ], - }, - }); - expect(uploadRes.status).toBe(200); - expect(uploadRes.body.deduped).toBe(false); - const replayId = uploadRes.body.session_replay_id; - - const secondUploadRes = await niceBackendFetch("/api/v1/session-replays/batch", { - method: "POST", - accessType: "client", - body: { - browser_session_id: browserSessionId, - session_replay_segment_id: randomUUID(), - batch_id: randomUUID(), - started_at_ms: now + 1_000, - sent_at_ms: now + 1_500, - events: [ - { timestamp: now + 1_100, type: 2 }, - ], - }, - }); - expect(secondUploadRes.status).toBe(200); - expect(secondUploadRes.body.session_replay_id).toBe(replayId); - - await InternalApiKey.createAndSetProjectKeys(); - - // Poll ClickHouse until the session_replays row appears - const timeoutMs = 180_000; - const intervalMs = 2_000; - const start = performance.now(); - - let response; - while (performance.now() - start < timeoutMs) { - response = await runQueryForCurrentProject({ - query: "SELECT id, user_id, refresh_token_id, started_at, last_event_at, chunk_count FROM session_replays LIMIT 10", - }); - expect(response.status).toBe(200); - const syncedRow = response.body.result.find((resultRow: Record) => resultRow.id === replayId); - if (syncedRow && Number(syncedRow.chunk_count) === 2) { - break; - } - await wait(intervalMs); - } - - const row = response!.body.result.find((resultRow: Record) => resultRow.id === replayId); - expect(row).toBeDefined(); - if (!row) { - throw new Error("Expected synced ClickHouse session replay row to be present."); - } - expect(row.id).toBe(replayId); - expect(row.user_id).toBeDefined(); - expect(row.refresh_token_id).toBeDefined(); - expect(row.started_at).toBeDefined(); - expect(row.last_event_at).toBeDefined(); - expect(Number(row.chunk_count)).toBe(2); - }, TEST_TIMEOUT); - - /** - * What it does: - * - Creates a project with an external Postgres DB, signs in a user, - * uploads a session replay batch, and verifies the row is synced to external Postgres. - */ - test('SessionReplay sync (Postgres)', async () => { - const dbName = 'session_replay_pg_test'; - const connectionString = await dbManager.createDatabase(dbName); - - await createProjectWithExternalDb({ - main: { - type: 'postgres', - connectionString, - } - }, { - display_name: 'Session Replay Sync Test', - config: { - magic_link_enabled: true, - }, - }); - await Project.updateConfig({ apps: { installed: { analytics: { enabled: true } } } }); - await Auth.Otp.signIn(); - - const now = Date.now(); - const browserSessionId = randomUUID(); - const batchId = randomUUID(); - - const uploadRes = await niceBackendFetch("/api/v1/session-replays/batch", { - method: "POST", - accessType: "client", - body: { - browser_session_id: browserSessionId, - session_replay_segment_id: randomUUID(), - batch_id: batchId, - started_at_ms: now, - sent_at_ms: now + 500, - events: [ - { timestamp: now + 100, type: 2 }, - { timestamp: now + 200, type: 3 }, - ], - }, - }); - expect(uploadRes.status).toBe(200); - const replayId = uploadRes.body.session_replay_id; - - const client = dbManager.getClient(dbName); - - // Wait for the session replay row to appear in external DB - const secondUploadRes = await niceBackendFetch("/api/v1/session-replays/batch", { - method: "POST", - accessType: "client", - body: { - browser_session_id: browserSessionId, - session_replay_segment_id: randomUUID(), - batch_id: randomUUID(), - started_at_ms: now + 1_000, - sent_at_ms: now + 1_500, - events: [ - { timestamp: now + 1_100, type: 2 }, - ], - }, - }); - expect(secondUploadRes.status).toBe(200); - expect(secondUploadRes.body.session_replay_id).toBe(replayId); - - await waitForSyncedSessionReplay(client, replayId, 2); - - // Verify the synced row has expected columns - const res = await client.query(`SELECT * FROM "session_replays" WHERE "id" = $1`, [replayId]); - expect(res.rows.length).toBe(1); - const row = res.rows[0]; - expect(row.user_id).toBeDefined(); - expect(row.refresh_token_id).toBeDefined(); - expect(row.created_at).toBeInstanceOf(Date); - expect(row.started_at).toBeInstanceOf(Date); - expect(row.last_event_at).toBeInstanceOf(Date); - expect(row).toMatchObject({ - id: replayId, - chunk_count: "2", - }); - }, TEST_TIMEOUT); - /** * What it does: * - Reads the external DB sync fusebox settings. @@ -1874,17 +1705,15 @@ describe.sequential('External DB Sync - Basic Tests', () => { let response; while (performance.now() - start < timeoutMs) { response = await runQueryForCurrentProject({ - query: "SELECT id, user_id, provider, provider_account_id, email FROM connected_accounts WHERE id = {account_id:UUID}", - params: { account_id: accountId }, + query: "SELECT user_id, provider, provider_account_id FROM connected_accounts WHERE provider_account_id = {account_id:String} AND user_id = {user_id:UUID}", + params: { account_id: "ch-test-account-12345", user_id: userId }, }); expect(response.status).toBe(200); if (response.body.result.length === 1) { expect(response.body.result[0]).toMatchObject({ - id: accountId, user_id: userId, provider: "spotify", provider_account_id: "ch-test-account-12345", - email: "chuser@example.com", }); return; } diff --git a/packages/stack-shared/src/config/db-sync-mappings.ts b/packages/stack-shared/src/config/db-sync-mappings.ts index 88de1db9f..bbca0df6b 100644 --- a/packages/stack-shared/src/config/db-sync-mappings.ts +++ b/packages/stack-shared/src/config/db-sync-mappings.ts @@ -664,7 +664,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { }, }, "team_member_profiles": { - sourceTables: { "TeamMember": "TeamMember", "ProjectUser": "ProjectUser" }, + sourceTables: { "TeamMember": "TeamMember" }, targetTable: "team_member_profiles", targetTableSchemas: { postgres: ` @@ -673,7 +673,6 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { "user_id" uuid NOT NULL, "display_name" text, "profile_image_url" text, - "user" jsonb NOT NULL DEFAULT '{}'::jsonb, "created_at" timestamp without time zone NOT NULL, PRIMARY KEY ("team_id", "user_id") ); @@ -694,7 +693,6 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { user_id UUID, display_name Nullable(String), profile_image_url Nullable(String), - user JSON, created_at DateTime64(3, 'UTC'), sync_sequence_id Int64, sync_is_deleted UInt8, @@ -716,46 +714,12 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { "TeamMember"."projectUserId" AS "user_id", "TeamMember"."displayName" AS "display_name", "TeamMember"."profileImageUrl" AS "profile_image_url", - jsonb_build_object( - 'id', "ProjectUser"."projectUserId", - 'display_name', "ProjectUser"."displayName", - 'primary_email', ( - SELECT "ContactChannel"."value" - FROM "ContactChannel" - WHERE "ContactChannel"."projectUserId" = "ProjectUser"."projectUserId" - AND "ContactChannel"."tenancyId" = "ProjectUser"."tenancyId" - AND "ContactChannel"."type" = 'EMAIL' - AND "ContactChannel"."isPrimary" = 'TRUE' - LIMIT 1 - ), - 'primary_email_verified', COALESCE( - ( - SELECT "ContactChannel"."isVerified" - FROM "ContactChannel" - WHERE "ContactChannel"."projectUserId" = "ProjectUser"."projectUserId" - AND "ContactChannel"."tenancyId" = "ProjectUser"."tenancyId" - AND "ContactChannel"."type" = 'EMAIL' - AND "ContactChannel"."isPrimary" = 'TRUE' - LIMIT 1 - ), - false - ), - 'profile_image_url', "ProjectUser"."profileImageUrl", - 'signed_up_at_millis', EXTRACT(EPOCH FROM "ProjectUser"."createdAt") * 1000, - 'client_metadata', COALESCE("ProjectUser"."clientMetadata", '{}'::jsonb), - 'client_read_only_metadata', COALESCE("ProjectUser"."clientReadOnlyMetadata", '{}'::jsonb), - 'server_metadata', COALESCE("ProjectUser"."serverMetadata", '{}'::jsonb), - 'is_anonymous', "ProjectUser"."isAnonymous", - 'last_active_at_millis', CASE WHEN "ProjectUser"."lastActiveAt" IS NOT NULL THEN EXTRACT(EPOCH FROM "ProjectUser"."lastActiveAt") * 1000 ELSE NULL END - ) AS "user", "TeamMember"."createdAt" AS "created_at", "TeamMember"."sequenceId" AS "sync_sequence_id", "TeamMember"."tenancyId" AS "tenancyId", false AS "sync_is_deleted" FROM "TeamMember" JOIN "Tenancy" ON "Tenancy"."id" = "TeamMember"."tenancyId" - JOIN "ProjectUser" ON "ProjectUser"."projectUserId" = "TeamMember"."projectUserId" - AND "ProjectUser"."tenancyId" = "TeamMember"."tenancyId" WHERE "TeamMember"."tenancyId" = $1::uuid UNION ALL @@ -767,7 +731,6 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { ("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id", NULL::text AS "display_name", NULL::text AS "profile_image_url", - '{}'::jsonb AS "user", "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", "DeletedRow"."sequenceId" AS "sync_sequence_id", "DeletedRow"."tenancyId" AS "tenancyId", @@ -792,45 +755,11 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { "TeamMember"."projectUserId" AS "user_id", "TeamMember"."displayName" AS "display_name", "TeamMember"."profileImageUrl" AS "profile_image_url", - jsonb_build_object( - 'id', "ProjectUser"."projectUserId", - 'display_name', "ProjectUser"."displayName", - 'primary_email', ( - SELECT "ContactChannel"."value" - FROM "ContactChannel" - WHERE "ContactChannel"."projectUserId" = "ProjectUser"."projectUserId" - AND "ContactChannel"."tenancyId" = "ProjectUser"."tenancyId" - AND "ContactChannel"."type" = 'EMAIL' - AND "ContactChannel"."isPrimary" = 'TRUE' - LIMIT 1 - ), - 'primary_email_verified', COALESCE( - ( - SELECT "ContactChannel"."isVerified" - FROM "ContactChannel" - WHERE "ContactChannel"."projectUserId" = "ProjectUser"."projectUserId" - AND "ContactChannel"."tenancyId" = "ProjectUser"."tenancyId" - AND "ContactChannel"."type" = 'EMAIL' - AND "ContactChannel"."isPrimary" = 'TRUE' - LIMIT 1 - ), - false - ), - 'profile_image_url', "ProjectUser"."profileImageUrl", - 'signed_up_at_millis', EXTRACT(EPOCH FROM "ProjectUser"."createdAt") * 1000, - 'client_metadata', COALESCE("ProjectUser"."clientMetadata", '{}'::jsonb), - 'client_read_only_metadata', COALESCE("ProjectUser"."clientReadOnlyMetadata", '{}'::jsonb), - 'server_metadata', COALESCE("ProjectUser"."serverMetadata", '{}'::jsonb), - 'is_anonymous', "ProjectUser"."isAnonymous", - 'last_active_at_millis', CASE WHEN "ProjectUser"."lastActiveAt" IS NOT NULL THEN EXTRACT(EPOCH FROM "ProjectUser"."createdAt") * 1000 ELSE NULL END - ) AS "user", "TeamMember"."createdAt" AS "created_at", "TeamMember"."sequenceId" AS "sequence_id", "TeamMember"."tenancyId", false AS "is_deleted" FROM "TeamMember" - JOIN "ProjectUser" ON "ProjectUser"."projectUserId" = "TeamMember"."projectUserId" - AND "ProjectUser"."tenancyId" = "TeamMember"."tenancyId" WHERE "TeamMember"."tenancyId" = $1::uuid UNION ALL @@ -840,7 +769,6 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { ("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id", NULL::text AS "display_name", NULL::text AS "profile_image_url", - '{}'::jsonb AS "user", "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", "DeletedRow"."sequenceId" AS "sequence_id", "DeletedRow"."tenancyId", @@ -863,11 +791,10 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { $2::uuid AS "user_id", $3::text AS "display_name", $4::text AS "profile_image_url", - $5::jsonb AS "user", - $6::timestamp without time zone AS "created_at", - $7::bigint AS "sequence_id", - $8::boolean AS "is_deleted", - $9::text AS "mapping_name" + $5::timestamp without time zone AS "created_at", + $6::bigint AS "sequence_id", + $7::boolean AS "is_deleted", + $8::text AS "mapping_name" ), deleted AS ( DELETE FROM "team_member_profiles" tm @@ -881,7 +808,6 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { "user_id", "display_name", "profile_image_url", - "user", "created_at" ) SELECT @@ -889,14 +815,12 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { p."user_id", p."display_name", p."profile_image_url", - p."user", p."created_at" FROM params p WHERE p."is_deleted" = false ON CONFLICT ("team_id", "user_id") DO UPDATE SET "display_name" = EXCLUDED."display_name", "profile_image_url" = EXCLUDED."profile_image_url", - "user" = EXCLUDED."user", "created_at" = EXCLUDED."created_at" RETURNING 1 ) @@ -935,7 +859,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { branch_id String, team_id UUID, user_id UUID, - permission_id String, + id String, created_at DateTime64(3, 'UTC'), sync_sequence_id Int64, sync_is_deleted UInt8, @@ -943,7 +867,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { ) ENGINE ReplacingMergeTree(sync_sequence_id) PARTITION BY toYYYYMM(created_at) - ORDER BY (project_id, branch_id, team_id, user_id, permission_id); + ORDER BY (project_id, branch_id, team_id, user_id, id); `.trim(), }, internalDbFetchQueries: { @@ -955,7 +879,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { "Tenancy"."branchId" AS "branch_id", "TeamMemberDirectPermission"."teamId" AS "team_id", "TeamMemberDirectPermission"."projectUserId" AS "user_id", - "TeamMemberDirectPermission"."permissionId" AS "permission_id", + "TeamMemberDirectPermission"."permissionId" AS "id", "TeamMemberDirectPermission"."createdAt" AS "created_at", "TeamMemberDirectPermission"."sequenceId" AS "sync_sequence_id", "TeamMemberDirectPermission"."tenancyId" AS "tenancyId", @@ -971,7 +895,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { "Tenancy"."branchId" AS "branch_id", ("DeletedRow"."primaryKey"->>'teamId')::uuid AS "team_id", ("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id", - "DeletedRow"."primaryKey"->>'permissionId' AS "permission_id", + "DeletedRow"."primaryKey"->>'permissionId' AS "id", "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", "DeletedRow"."sequenceId" AS "sync_sequence_id", "DeletedRow"."tenancyId" AS "tenancyId", @@ -1007,7 +931,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { SELECT ("DeletedRow"."primaryKey"->>'teamId')::uuid AS "team_id", ("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id", - "DeletedRow"."primaryKey"->>'permissionId' AS "permission_id", + "DeletedRow"."primaryKey"->>'permissionId' AS "id", "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", "DeletedRow"."sequenceId" AS "sequence_id", "DeletedRow"."tenancyId", @@ -1314,18 +1238,17 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { email_programmatic_call_template_id Nullable(String), theme_id Nullable(String), is_high_priority UInt8, - rendered_is_transactional Nullable(UInt8), - rendered_subject Nullable(String), - rendered_notification_category_id Nullable(String), + is_transactional Nullable(UInt8), + subject Nullable(String), + notification_category_id Nullable(String), started_rendering_at Nullable(DateTime64(3, 'UTC')), - finished_rendering_at Nullable(DateTime64(3, 'UTC')), + rendered_at Nullable(DateTime64(3, 'UTC')), render_error Nullable(String), scheduled_at DateTime64(3, 'UTC'), created_at DateTime64(3, 'UTC'), + updated_at DateTime64(3, 'UTC'), started_sending_at Nullable(DateTime64(3, 'UTC')), - finished_sending_at Nullable(DateTime64(3, 'UTC')), server_error Nullable(String), - sent_at Nullable(DateTime64(3, 'UTC')), delivered_at Nullable(DateTime64(3, 'UTC')), opened_at Nullable(DateTime64(3, 'UTC')), clicked_at Nullable(DateTime64(3, 'UTC')), @@ -1353,25 +1276,24 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { "Tenancy"."projectId" AS "project_id", "Tenancy"."branchId" AS "branch_id", "EmailOutbox"."id" AS "id", - "EmailOutbox"."status"::text AS "status", - "EmailOutbox"."simpleStatus"::text AS "simple_status", - "EmailOutbox"."createdWith"::text AS "created_with", + LOWER(REPLACE("EmailOutbox"."status"::text, '_', '-')) AS "status", + LOWER(REPLACE("EmailOutbox"."simpleStatus"::text, '_', '-')) AS "simple_status", + CASE WHEN "EmailOutbox"."createdWith"::text = 'DRAFT' THEN 'draft' ELSE 'programmatic-call' END AS "created_with", "EmailOutbox"."emailDraftId" AS "email_draft_id", "EmailOutbox"."emailProgrammaticCallTemplateId" AS "email_programmatic_call_template_id", "EmailOutbox"."themeId" AS "theme_id", "EmailOutbox"."isHighPriority" AS "is_high_priority", - "EmailOutbox"."renderedIsTransactional" AS "rendered_is_transactional", - "EmailOutbox"."renderedSubject" AS "rendered_subject", - "EmailOutbox"."renderedNotificationCategoryId" AS "rendered_notification_category_id", + "EmailOutbox"."renderedIsTransactional" AS "is_transactional", + "EmailOutbox"."renderedSubject" AS "subject", + "EmailOutbox"."renderedNotificationCategoryId" AS "notification_category_id", "EmailOutbox"."startedRenderingAt" AS "started_rendering_at", - "EmailOutbox"."finishedRenderingAt" AS "finished_rendering_at", + "EmailOutbox"."finishedRenderingAt" AS "rendered_at", "EmailOutbox"."renderErrorExternalMessage" AS "render_error", "EmailOutbox"."scheduledAt" AS "scheduled_at", "EmailOutbox"."createdAt" AS "created_at", + "EmailOutbox"."updatedAt" AS "updated_at", "EmailOutbox"."startedSendingAt" AS "started_sending_at", - "EmailOutbox"."finishedSendingAt" AS "finished_sending_at", "EmailOutbox"."sendServerErrorExternalMessage" AS "server_error", - "EmailOutbox"."sentAt" AS "sent_at", "EmailOutbox"."deliveredAt" AS "delivered_at", "EmailOutbox"."openedAt" AS "opened_at", "EmailOutbox"."clickedAt" AS "clicked_at", @@ -1380,7 +1302,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { "EmailOutbox"."bouncedAt" AS "bounced_at", "EmailOutbox"."deliveryDelayedAt" AS "delivery_delayed_at", "EmailOutbox"."canHaveDeliveryInfo" AS "can_have_delivery_info", - "EmailOutbox"."skippedReason"::text AS "skipped_reason", + LOWER(REPLACE("EmailOutbox"."skippedReason"::text, '_', '-')) AS "skipped_reason", "EmailOutbox"."skippedDetails" AS "skipped_details", "EmailOutbox"."sendRetries" AS "send_retries", "EmailOutbox"."isPaused" AS "is_paused", @@ -1598,160 +1520,6 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { `.trim(), }, }, - "session_replays": { - sourceTables: { "SessionReplay": "SessionReplay" }, - targetTable: "session_replays", - targetTableSchemas: { - postgres: ` - CREATE TABLE IF NOT EXISTS "session_replays" ( - "id" uuid PRIMARY KEY NOT NULL, - "user_id" uuid NOT NULL, - "refresh_token_id" text NOT NULL, - "started_at" timestamp without time zone NOT NULL, - "last_event_at" timestamp without time zone NOT NULL, - "created_at" timestamp without time zone NOT NULL, - "chunk_count" bigint NOT NULL DEFAULT 0 - ); - REVOKE ALL ON "session_replays" FROM PUBLIC; - GRANT SELECT ON "session_replays" TO PUBLIC; - - CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" ( - "mapping_name" text PRIMARY KEY NOT NULL, - "last_synced_sequence_id" bigint NOT NULL DEFAULT -1, - "updated_at" timestamp without time zone NOT NULL DEFAULT now() - ); - `.trim(), - clickhouse: ` - CREATE TABLE IF NOT EXISTS analytics_internal.session_replays ( - project_id String, - branch_id String, - id UUID, - user_id UUID, - refresh_token_id String, - started_at DateTime64(3, 'UTC'), - last_event_at DateTime64(3, 'UTC'), - created_at DateTime64(3, 'UTC'), - chunk_count UInt64, - sync_sequence_id Int64, - sync_is_deleted UInt8, - sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) - ) - ENGINE ReplacingMergeTree(sync_sequence_id) - PARTITION BY toYYYYMM(started_at) - ORDER BY (project_id, branch_id, id); - `.trim(), - }, - internalDbFetchQueries: { - clickhouse: ` - SELECT - "Tenancy"."projectId" AS "project_id", - "Tenancy"."branchId" AS "branch_id", - "SessionReplay"."id" AS "id", - "SessionReplay"."projectUserId" AS "user_id", - "SessionReplay"."refreshTokenId" AS "refresh_token_id", - "SessionReplay"."startedAt" AS "started_at", - "SessionReplay"."lastEventAt" AS "last_event_at", - "SessionReplay"."createdAt" AS "created_at", - ( - SELECT COUNT(*) - FROM "SessionReplayChunk" - WHERE "SessionReplayChunk"."tenancyId" = "SessionReplay"."tenancyId" - AND "SessionReplayChunk"."sessionReplayId" = "SessionReplay"."id" - ) AS "chunk_count", - "SessionReplay"."sequenceId" AS "sync_sequence_id", - "SessionReplay"."tenancyId" AS "tenancyId", - false AS "sync_is_deleted" - FROM "SessionReplay" - JOIN "Tenancy" ON "Tenancy"."id" = "SessionReplay"."tenancyId" - WHERE "SessionReplay"."tenancyId" = $1::uuid - AND "SessionReplay"."sequenceId" IS NOT NULL - AND "SessionReplay"."sequenceId" > $2::bigint - ORDER BY "SessionReplay"."sequenceId" ASC - LIMIT 1000 - `.trim(), - }, - internalDbFetchQuery: ` - SELECT - "SessionReplay"."id" AS "id", - "SessionReplay"."projectUserId" AS "user_id", - "SessionReplay"."refreshTokenId" AS "refresh_token_id", - "SessionReplay"."startedAt" AS "started_at", - "SessionReplay"."lastEventAt" AS "last_event_at", - "SessionReplay"."createdAt" AS "created_at", - ( - SELECT COUNT(*) - FROM "SessionReplayChunk" - WHERE "SessionReplayChunk"."tenancyId" = "SessionReplay"."tenancyId" - AND "SessionReplayChunk"."sessionReplayId" = "SessionReplay"."id" - ) AS "chunk_count", - "SessionReplay"."sequenceId" AS "sequence_id", - "SessionReplay"."tenancyId", - false AS "is_deleted" - FROM "SessionReplay" - WHERE "SessionReplay"."tenancyId" = $1::uuid - AND "SessionReplay"."sequenceId" IS NOT NULL - AND "SessionReplay"."sequenceId" > $2::bigint - ORDER BY "SessionReplay"."sequenceId" ASC - LIMIT 1000 - `.trim(), - externalDbUpdateQueries: { - postgres: ` - WITH params AS ( - SELECT - $1::uuid AS "id", - $2::uuid AS "user_id", - $3::text AS "refresh_token_id", - $4::timestamp without time zone AS "started_at", - $5::timestamp without time zone AS "last_event_at", - $6::timestamp without time zone AS "created_at", - $7::bigint AS "chunk_count", - $8::bigint AS "sequence_id", - $9::boolean AS "is_deleted", - $10::text AS "mapping_name" - ), - deleted AS ( - DELETE FROM "session_replays" sr - USING params p - WHERE p."is_deleted" = true AND sr."id" = p."id" - RETURNING 1 - ), - upserted AS ( - INSERT INTO "session_replays" ( - "id", - "user_id", - "refresh_token_id", - "started_at", - "last_event_at", - "created_at", - "chunk_count" - ) - SELECT - p."id", - p."user_id", - p."refresh_token_id", - p."started_at", - p."last_event_at", - p."created_at", - p."chunk_count" - FROM params p - WHERE p."is_deleted" = false - ON CONFLICT ("id") DO UPDATE SET - "user_id" = EXCLUDED."user_id", - "refresh_token_id" = EXCLUDED."refresh_token_id", - "started_at" = EXCLUDED."started_at", - "last_event_at" = EXCLUDED."last_event_at", - "created_at" = EXCLUDED."created_at", - "chunk_count" = EXCLUDED."chunk_count" - RETURNING 1 - ) - INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at") - SELECT p."mapping_name", p."sequence_id", now() FROM params p - ON CONFLICT ("mapping_name") DO UPDATE SET - "last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"), - "updated_at" = now(); - `.trim(), - }, - }, "project_permissions": { sourceTables: { "ProjectUserDirectPermission": "ProjectUserDirectPermission" }, targetTable: "project_permissions", @@ -1777,7 +1545,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { project_id String, branch_id String, user_id UUID, - permission_id String, + id String, created_at DateTime64(3, 'UTC'), sync_sequence_id Int64, sync_is_deleted UInt8, @@ -1785,7 +1553,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { ) ENGINE ReplacingMergeTree(sync_sequence_id) PARTITION BY toYYYYMM(created_at) - ORDER BY (project_id, branch_id, user_id, permission_id); + ORDER BY (project_id, branch_id, user_id, id); `.trim(), }, internalDbFetchQueries: { @@ -1796,7 +1564,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { "Tenancy"."projectId" AS "project_id", "Tenancy"."branchId" AS "branch_id", "ProjectUserDirectPermission"."projectUserId" AS "user_id", - "ProjectUserDirectPermission"."permissionId" AS "permission_id", + "ProjectUserDirectPermission"."permissionId" AS "id", "ProjectUserDirectPermission"."createdAt" AS "created_at", "ProjectUserDirectPermission"."sequenceId" AS "sync_sequence_id", "ProjectUserDirectPermission"."tenancyId" AS "tenancyId", @@ -1811,7 +1579,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { "Tenancy"."projectId" AS "project_id", "Tenancy"."branchId" AS "branch_id", ("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id", - "DeletedRow"."primaryKey"->>'permissionId' AS "permission_id", + "DeletedRow"."primaryKey"->>'permissionId' AS "id", "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", "DeletedRow"."sequenceId" AS "sync_sequence_id", "DeletedRow"."tenancyId" AS "tenancyId", @@ -1845,7 +1613,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { SELECT ("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id", - "DeletedRow"."primaryKey"->>'permissionId' AS "permission_id", + "DeletedRow"."primaryKey"->>'permissionId' AS "id", "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", "DeletedRow"."sequenceId" AS "sequence_id", "DeletedRow"."tenancyId", @@ -1925,7 +1693,6 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { CREATE TABLE IF NOT EXISTS analytics_internal.notification_preferences ( project_id String, branch_id String, - id UUID, user_id UUID, notification_category_id String, enabled UInt8, @@ -1934,7 +1701,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) ) ENGINE ReplacingMergeTree(sync_sequence_id) - ORDER BY (project_id, branch_id, id); + ORDER BY (project_id, branch_id, user_id, notification_category_id); `.trim(), }, internalDbFetchQueries: { @@ -1944,7 +1711,6 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { SELECT "Tenancy"."projectId" AS "project_id", "Tenancy"."branchId" AS "branch_id", - "UserNotificationPreference"."id" AS "id", "UserNotificationPreference"."projectUserId" AS "user_id", "UserNotificationPreference"."notificationCategoryId" AS "notification_category_id", "UserNotificationPreference"."enabled" AS "enabled", @@ -1960,7 +1726,6 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { SELECT "Tenancy"."projectId" AS "project_id", "Tenancy"."branchId" AS "branch_id", - ("DeletedRow"."primaryKey"->>'id')::uuid AS "id", ("DeletedRow"."data"->>'projectUserId')::uuid AS "user_id", ("DeletedRow"."data"->>'notificationCategoryId')::uuid AS "notification_category_id", ("DeletedRow"."data"->>'enabled')::boolean AS "enabled", @@ -2273,11 +2038,9 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { CREATE TABLE IF NOT EXISTS analytics_internal.connected_accounts ( project_id String, branch_id String, - id UUID, user_id UUID, provider String, provider_account_id String, - email Nullable(String), created_at DateTime64(3, 'UTC'), sync_sequence_id Int64, sync_is_deleted UInt8, @@ -2285,7 +2048,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { ) ENGINE ReplacingMergeTree(sync_sequence_id) PARTITION BY toYYYYMM(created_at) - ORDER BY (project_id, branch_id, id); + ORDER BY (project_id, branch_id, user_id, provider, provider_account_id); CREATE TABLE IF NOT EXISTS analytics_internal._stack_sync_metadata ( tenancy_id UUID, @@ -2304,11 +2067,9 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { SELECT "Tenancy"."projectId" AS "project_id", "Tenancy"."branchId" AS "branch_id", - "ProjectUserOAuthAccount"."id" AS "id", "ProjectUserOAuthAccount"."projectUserId" AS "user_id", "ProjectUserOAuthAccount"."configOAuthProviderId" AS "provider", "ProjectUserOAuthAccount"."providerAccountId" AS "provider_account_id", - "ProjectUserOAuthAccount"."email" AS "email", "ProjectUserOAuthAccount"."createdAt" AS "created_at", "ProjectUserOAuthAccount"."sequenceId" AS "sync_sequence_id", "ProjectUserOAuthAccount"."tenancyId" AS "tenancyId", @@ -2323,11 +2084,9 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { SELECT "Tenancy"."projectId" AS "project_id", "Tenancy"."branchId" AS "branch_id", - ("DeletedRow"."primaryKey"->>'id')::uuid AS "id", ("DeletedRow"."data"->>'projectUserId')::uuid AS "user_id", - NULL::text AS "provider", - NULL::text AS "provider_account_id", - NULL::text AS "email", + "DeletedRow"."data"->>'configOAuthProviderId' AS "provider", + "DeletedRow"."data"->>'providerAccountId' AS "provider_account_id", "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", "DeletedRow"."sequenceId" AS "sync_sequence_id", "DeletedRow"."tenancyId" AS "tenancyId",