diff --git a/apps/backend/prisma/migrations/20260318000000_add_sequence_id_to_refresh_tokens_and_oauth_accounts/migration.sql b/apps/backend/prisma/migrations/20260318000000_add_sequence_id_to_refresh_tokens_and_oauth_accounts/migration.sql new file mode 100644 index 000000000..003b0df2f --- /dev/null +++ b/apps/backend/prisma/migrations/20260318000000_add_sequence_id_to_refresh_tokens_and_oauth_accounts/migration.sql @@ -0,0 +1,25 @@ +-- AlterTable +ALTER TABLE "ProjectUserRefreshToken" ADD COLUMN "sequenceId" BIGINT, +ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; + +-- CreateIndex +CREATE UNIQUE INDEX "ProjectUserRefreshToken_sequenceId_key" ON "ProjectUserRefreshToken"("sequenceId"); + +-- CreateIndex +CREATE INDEX "ProjectUserRefreshToken_shouldUpdateSequenceId_idx" ON "ProjectUserRefreshToken"("shouldUpdateSequenceId", "tenancyId"); + +-- CreateIndex +CREATE INDEX "ProjectUserRefreshToken_tenancyId_sequenceId_idx" ON "ProjectUserRefreshToken"("tenancyId", "sequenceId"); + +-- AlterTable +ALTER TABLE "ProjectUserOAuthAccount" ADD COLUMN "sequenceId" BIGINT, +ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; + +-- CreateIndex +CREATE UNIQUE INDEX "ProjectUserOAuthAccount_sequenceId_key" ON "ProjectUserOAuthAccount"("sequenceId"); + +-- CreateIndex +CREATE INDEX "ProjectUserOAuthAccount_shouldUpdateSequenceId_idx" ON "ProjectUserOAuthAccount"("shouldUpdateSequenceId", "tenancyId"); + +-- CreateIndex +CREATE INDEX "ProjectUserOAuthAccount_tenancyId_sequenceId_idx" ON "ProjectUserOAuthAccount"("tenancyId", "sequenceId"); diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index e87ebf21d..d655132ad 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -348,9 +348,14 @@ model ProjectUserOAuthAccount { allowConnectedAccounts Boolean @default(true) allowSignIn Boolean @default(true) + sequenceId BigInt? @unique + shouldUpdateSequenceId Boolean @default(true) + @@id([tenancyId, id]) @@unique([tenancyId, configOAuthProviderId, projectUserId, providerAccountId]) @@index([tenancyId, projectUserId]) + @@index([tenancyId, sequenceId], name: "ProjectUserOAuthAccount_tenancyId_sequenceId_idx") + @@index([shouldUpdateSequenceId, tenancyId], name: "ProjectUserOAuthAccount_shouldUpdateSequenceId_idx") } model SessionReplay { @@ -622,7 +627,12 @@ model ProjectUserRefreshToken { expiresAt DateTime? isImpersonation Boolean @default(false) + sequenceId BigInt? @unique + shouldUpdateSequenceId Boolean @default(true) + @@id([tenancyId, id]) + @@index([tenancyId, sequenceId], name: "ProjectUserRefreshToken_tenancyId_sequenceId_idx") + @@index([shouldUpdateSequenceId, tenancyId], name: "ProjectUserRefreshToken_shouldUpdateSequenceId_idx") } model ProjectUserAuthorizationCode { diff --git a/apps/backend/scripts/clickhouse-migrations.ts b/apps/backend/scripts/clickhouse-migrations.ts index 0f821b090..e16bd86d6 100644 --- a/apps/backend/scripts/clickhouse-migrations.ts +++ b/apps/backend/scripts/clickhouse-migrations.ts @@ -34,6 +34,10 @@ export async function runClickhouseMigrations() { await client.exec({ query: PROJECT_PERMISSIONS_VIEW_SQL }); await client.exec({ query: NOTIFICATION_PREFERENCES_TABLE_BASE_SQL }); await client.exec({ query: NOTIFICATION_PREFERENCES_VIEW_SQL }); + await client.exec({ query: REFRESH_TOKENS_TABLE_BASE_SQL }); + await client.exec({ query: REFRESH_TOKENS_VIEW_SQL }); + await client.exec({ query: CONNECTED_ACCOUNTS_TABLE_BASE_SQL }); + await client.exec({ query: CONNECTED_ACCOUNTS_VIEW_SQL }); await client.exec({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL }); await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL }); await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL }); @@ -54,6 +58,8 @@ export async function runClickhouseMigrations() { "GRANT SELECT ON default.session_replays TO limited_user;", "GRANT SELECT ON default.project_permissions TO limited_user;", "GRANT SELECT ON default.notification_preferences TO limited_user;", + "GRANT SELECT ON default.refresh_tokens TO limited_user;", + "GRANT SELECT ON default.connected_accounts TO limited_user;", ]; await client.exec({ query: "CREATE ROW POLICY IF NOT EXISTS events_project_isolation ON default.events FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", @@ -88,6 +94,12 @@ export async function runClickhouseMigrations() { await client.exec({ query: "CREATE ROW POLICY IF NOT EXISTS notification_preferences_project_isolation ON default.notification_preferences FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", }); + await client.exec({ + query: "CREATE ROW POLICY IF NOT EXISTS refresh_tokens_project_isolation ON default.refresh_tokens FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", + }); + await client.exec({ + query: "CREATE ROW POLICY IF NOT EXISTS connected_accounts_project_isolation ON default.connected_accounts FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", + }); for (const query of queries) { await client.exec({ query }); } @@ -621,6 +633,80 @@ FINAL WHERE sync_is_deleted = 0; `; +const REFRESH_TOKENS_TABLE_BASE_SQL = ` +CREATE TABLE IF NOT EXISTS analytics_internal.refresh_tokens ( + project_id String, + branch_id String, + id UUID, + user_id UUID, + created_at DateTime64(3, 'UTC'), + last_used_at DateTime64(3, 'UTC'), + is_impersonation UInt8, + expires_at Nullable(DateTime64(3, 'UTC')), + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) +) +ENGINE ReplacingMergeTree(sync_sequence_id) +PARTITION BY toYYYYMM(created_at) +ORDER BY (project_id, branch_id, id); +`; + +const REFRESH_TOKENS_VIEW_SQL = ` +CREATE OR REPLACE VIEW default.refresh_tokens +SQL SECURITY DEFINER +AS +SELECT + project_id, + branch_id, + id, + user_id, + created_at, + last_used_at, + is_impersonation, + expires_at +FROM analytics_internal.refresh_tokens +FINAL +WHERE sync_is_deleted = 0; +`; + +const CONNECTED_ACCOUNTS_TABLE_BASE_SQL = ` +CREATE TABLE IF NOT EXISTS analytics_internal.connected_accounts ( + project_id String, + branch_id String, + id UUID, + user_id UUID, + provider String, + provider_account_id String, + email Nullable(String), + created_at DateTime64(3, 'UTC'), + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) +) +ENGINE ReplacingMergeTree(sync_sequence_id) +PARTITION BY toYYYYMM(created_at) +ORDER BY (project_id, branch_id, id); +`; + +const CONNECTED_ACCOUNTS_VIEW_SQL = ` +CREATE OR REPLACE VIEW default.connected_accounts +SQL SECURITY DEFINER +AS +SELECT + project_id, + branch_id, + id, + user_id, + provider, + provider_account_id, + email, + created_at +FROM analytics_internal.connected_accounts +FINAL +WHERE sync_is_deleted = 0; +`; + const EXTERNAL_ANALYTICS_DB_SQL = ` CREATE DATABASE IF NOT EXISTS analytics_internal; `; diff --git a/apps/backend/scripts/run-cron-jobs.ts b/apps/backend/scripts/run-cron-jobs.ts index 3353d9381..abd221495 100644 --- a/apps/backend/scripts/run-cron-jobs.ts +++ b/apps/backend/scripts/run-cron-jobs.ts @@ -30,7 +30,7 @@ async function main() { if (runResult.status === "error") { captureError("run-cron-jobs", runResult.error); } - await wait(1000) + await wait(1000); } }); } diff --git a/apps/backend/src/app/api/latest/auth/password/update/route.tsx b/apps/backend/src/app/api/latest/auth/password/update/route.tsx index db6d43f24..75756e478 100644 --- a/apps/backend/src/app/api/latest/auth/password/update/route.tsx +++ b/apps/backend/src/app/api/latest/auth/password/update/route.tsx @@ -1,3 +1,4 @@ +import { recordExternalDbSyncRefreshTokenDeletionsForUser } from "@/lib/external-db-sync"; import { getPrismaClientForTenancy, globalPrismaClient, retryTransaction } from "@/prisma-client"; import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; import { KnownErrors } from "@stackframe/stack-shared"; @@ -78,6 +79,12 @@ export const POST = createSmartRouteHandler({ }); // reset all other refresh tokens + await recordExternalDbSyncRefreshTokenDeletionsForUser(globalPrismaClient, { + tenancyId: tenancy.id, + projectUserId: user.id, + excludeRefreshToken: refreshToken?.[0], + }); + await globalPrismaClient.projectUserRefreshToken.deleteMany({ where: { tenancyId: tenancy.id, diff --git a/apps/backend/src/app/api/latest/auth/sessions/crud.tsx b/apps/backend/src/app/api/latest/auth/sessions/crud.tsx index 5f7d9d126..759a2f81e 100644 --- a/apps/backend/src/app/api/latest/auth/sessions/crud.tsx +++ b/apps/backend/src/app/api/latest/auth/sessions/crud.tsx @@ -1,3 +1,4 @@ +import { recordExternalDbSyncDeletion } from "@/lib/external-db-sync"; import { globalPrismaClient } from "@/prisma-client"; import { createCrudHandlers } from "@/route-handlers/crud-handler"; import { SmartRequestAuth } from "@/route-handlers/smart-request"; @@ -71,6 +72,12 @@ export const sessionsCrudHandlers = createLazyProxy(() => createCrudHandlers(ses throw new KnownErrors.CannotDeleteCurrentSession(); } + await recordExternalDbSyncDeletion(globalPrismaClient, { + tableName: "ProjectUserRefreshToken", + tenancyId: auth.tenancy.id, + refreshTokenId: params.id, + }); + await globalPrismaClient.projectUserRefreshToken.deleteMany({ where: { tenancyId: auth.tenancy.id, diff --git a/apps/backend/src/app/api/latest/auth/sessions/current/route.tsx b/apps/backend/src/app/api/latest/auth/sessions/current/route.tsx index 6df9ab3a5..9ab3716ee 100644 --- a/apps/backend/src/app/api/latest/auth/sessions/current/route.tsx +++ b/apps/backend/src/app/api/latest/auth/sessions/current/route.tsx @@ -1,3 +1,4 @@ +import { recordExternalDbSyncDeletion } from "@/lib/external-db-sync"; import { getPrismaClientForTenancy, globalPrismaClient } from "@/prisma-client"; import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; import { Prisma } from "@/generated/prisma/client"; @@ -32,6 +33,13 @@ export const DELETE = createSmartRouteHandler({ try { const prisma = await getPrismaClientForTenancy(tenancy); + + await recordExternalDbSyncDeletion(globalPrismaClient, { + tableName: "ProjectUserRefreshToken", + tenancyId: tenancy.id, + refreshTokenId, + }); + const result = await globalPrismaClient.projectUserRefreshToken.deleteMany({ where: { tenancyId: tenancy.id, diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts index 9c506db17..470e89a55 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts @@ -374,6 +374,62 @@ async function backfillSequenceIds(batchSize: number): Promise { didUpdate = true; } + const refreshTokenTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` + WITH rows_to_update AS ( + SELECT "tenancyId", "id" + FROM "ProjectUserRefreshToken" + WHERE "shouldUpdateSequenceId" = TRUE + ORDER BY "tenancyId" + LIMIT ${batchSize} + FOR UPDATE SKIP LOCKED + ), + updated_rows AS ( + UPDATE "ProjectUserRefreshToken" rt + SET "sequenceId" = nextval('global_seq_id'), + "shouldUpdateSequenceId" = FALSE + FROM rows_to_update r + WHERE rt."tenancyId" = r."tenancyId" + AND rt."id" = r."id" + RETURNING rt."tenancyId" + ) + SELECT DISTINCT "tenancyId" FROM updated_rows + `; + + span.setAttribute("stack.external-db-sync.refresh-token-tenants", refreshTokenTenants.length); + + if (refreshTokenTenants.length > 0) { + await enqueueExternalDbSyncBatch(refreshTokenTenants.map(t => t.tenancyId)); + didUpdate = true; + } + + const oauthAccountTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` + WITH rows_to_update AS ( + SELECT "tenancyId", "id" + FROM "ProjectUserOAuthAccount" + WHERE "shouldUpdateSequenceId" = TRUE + ORDER BY "tenancyId" + LIMIT ${batchSize} + FOR UPDATE SKIP LOCKED + ), + updated_rows AS ( + UPDATE "ProjectUserOAuthAccount" oa + SET "sequenceId" = nextval('global_seq_id'), + "shouldUpdateSequenceId" = FALSE + FROM rows_to_update r + WHERE oa."tenancyId" = r."tenancyId" + AND oa."id" = r."id" + RETURNING oa."tenancyId" + ) + SELECT DISTINCT "tenancyId" FROM updated_rows + `; + + span.setAttribute("stack.external-db-sync.oauth-account-tenants", oauthAccountTenants.length); + + if (oauthAccountTenants.length > 0) { + await enqueueExternalDbSyncBatch(oauthAccountTenants.map(t => t.tenancyId)); + didUpdate = true; + } + const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` WITH rows_to_update AS ( SELECT "id", "tenancyId" @@ -403,7 +459,7 @@ async function backfillSequenceIds(batchSize: number): Promise { span.setAttribute("stack.external-db-sync.did-update", didUpdate); if (didUpdate) { - console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, TP=${teamPermissionTenants.length}, TI=${teamInvitationTenants.length}, EO=${emailOutboxTenants.length}, SR=${sessionReplayTenants.length}, PP=${projectPermissionTenants.length}, NP=${notificationPreferenceTenants.length}, DR=${deletedRowTenants.length}`); + console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, TP=${teamPermissionTenants.length}, TI=${teamInvitationTenants.length}, EO=${emailOutboxTenants.length}, SR=${sessionReplayTenants.length}, PP=${projectPermissionTenants.length}, NP=${notificationPreferenceTenants.length}, RT=${refreshTokenTenants.length}, CA=${oauthAccountTenants.length}, DR=${deletedRowTenants.length}`); } return didUpdate; diff --git a/apps/backend/src/app/api/latest/oauth-providers/crud.tsx b/apps/backend/src/app/api/latest/oauth-providers/crud.tsx index 08eb0d681..e636e6b2c 100644 --- a/apps/backend/src/app/api/latest/oauth-providers/crud.tsx +++ b/apps/backend/src/app/api/latest/oauth-providers/crud.tsx @@ -1,3 +1,4 @@ +import { recordExternalDbSyncDeletion } from "@/lib/external-db-sync"; import { ensureUserExists } from "@/lib/request-checks"; import { Tenancy } from "@/lib/tenancies"; import { getPrismaClientForTenancy, retryTransaction } from "@/prisma-client"; @@ -356,6 +357,12 @@ export const oauthProviderCrudHandlers = createLazyProxy(() => createCrudHandler }); } + await recordExternalDbSyncDeletion(tx, { + tableName: "ProjectUserOAuthAccount", + tenancyId: auth.tenancy.id, + oauthAccountId: params.provider_id, + }); + await tx.projectUserOAuthAccount.delete({ where: { tenancyId_id: { diff --git a/apps/backend/src/app/api/latest/users/crud.tsx b/apps/backend/src/app/api/latest/users/crud.tsx index 7f0b1249f..36b9d275c 100644 --- a/apps/backend/src/app/api/latest/users/crud.tsx +++ b/apps/backend/src/app/api/latest/users/crud.tsx @@ -2,7 +2,7 @@ import { BooleanTrue, Prisma } from "@/generated/prisma/client"; import { getRenderedOrganizationConfigQuery, getRenderedProjectConfigQuery } from "@/lib/config"; import { demoteAllContactChannelsToNonPrimary, setContactChannelAsPrimaryByValue } from "@/lib/contact-channel"; import { normalizeEmail } from "@/lib/emails"; -import { recordExternalDbSyncContactChannelDeletionsForUser, recordExternalDbSyncDeletion, recordExternalDbSyncNotificationPreferenceDeletionsForUser, recordExternalDbSyncProjectPermissionDeletionsForUser, recordExternalDbSyncTeamMemberDeletionsForUser, recordExternalDbSyncTeamPermissionDeletionsForUser, withExternalDbSyncUpdate } from "@/lib/external-db-sync"; +import { recordExternalDbSyncContactChannelDeletionsForUser, recordExternalDbSyncDeletion, recordExternalDbSyncNotificationPreferenceDeletionsForUser, recordExternalDbSyncOAuthAccountDeletionsForUser, recordExternalDbSyncProjectPermissionDeletionsForUser, recordExternalDbSyncRefreshTokenDeletionsForUser, recordExternalDbSyncTeamMemberDeletionsForUser, recordExternalDbSyncTeamPermissionDeletionsForUser, withExternalDbSyncUpdate } from "@/lib/external-db-sync"; import { grantDefaultProjectPermissions } from "@/lib/permissions"; import { ensureTeamMembershipExists, ensureUserExists } from "@/lib/request-checks"; import { Tenancy } from "@/lib/tenancies"; @@ -1156,6 +1156,11 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC // if user password changed, reset all refresh tokens if (passwordHash !== undefined) { + await recordExternalDbSyncRefreshTokenDeletionsForUser(globalPrismaClient, { + tenancyId: auth.tenancy.id, + projectUserId: params.user_id, + }); + await globalPrismaClient.projectUserRefreshToken.deleteMany({ where: { tenancyId: auth.tenancy.id, @@ -1222,6 +1227,16 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC projectUserId: params.user_id, }); + await recordExternalDbSyncRefreshTokenDeletionsForUser(tx, { + tenancyId: auth.tenancy.id, + projectUserId: params.user_id, + }); + + await recordExternalDbSyncOAuthAccountDeletionsForUser(tx, { + tenancyId: auth.tenancy.id, + projectUserId: params.user_id, + }); + await tx.projectUser.delete({ where: { tenancyId_projectUserId: { diff --git a/apps/backend/src/lib/external-db-sync.ts b/apps/backend/src/lib/external-db-sync.ts index 2c4f6e7f3..885952530 100644 --- a/apps/backend/src/lib/external-db-sync.ts +++ b/apps/backend/src/lib/external-db-sync.ts @@ -74,6 +74,16 @@ type ExternalDbSyncTarget = verificationCodeProjectId: string, verificationCodeBranchId: string, verificationCodeId: string, + } + | { + tableName: "ProjectUserRefreshToken", + tenancyId: string, + refreshTokenId: string, + } + | { + tableName: "ProjectUserOAuthAccount", + tenancyId: string, + oauthAccountId: string, }; type ExternalDbType = NonNullable["type"]>; @@ -373,6 +383,74 @@ export async function recordExternalDbSyncDeletion( return; } + if (target.tableName === "ProjectUserRefreshToken") { + assertUuid(target.refreshTokenId, "refreshTokenId"); + const insertedCount = await tx.$executeRaw(Prisma.sql` + INSERT INTO "DeletedRow" ( + "id", + "tenancyId", + "tableName", + "primaryKey", + "data", + "deletedAt", + "shouldUpdateSequenceId" + ) + SELECT + gen_random_uuid(), + "tenancyId", + 'ProjectUserRefreshToken', + jsonb_build_object('tenancyId', "tenancyId", 'id', "id"), + to_jsonb("ProjectUserRefreshToken".*), + NOW(), + TRUE + FROM "ProjectUserRefreshToken" + WHERE "tenancyId" = ${target.tenancyId}::uuid + AND "id" = ${target.refreshTokenId}::uuid + FOR UPDATE + `); + + if (insertedCount !== 1) { + throw new StackAssertionError( + `Expected to insert 1 DeletedRow entry for ProjectUserRefreshToken, got ${insertedCount}.` + ); + } + return; + } + + if (target.tableName === "ProjectUserOAuthAccount") { + assertUuid(target.oauthAccountId, "oauthAccountId"); + const insertedCount = await tx.$executeRaw(Prisma.sql` + INSERT INTO "DeletedRow" ( + "id", + "tenancyId", + "tableName", + "primaryKey", + "data", + "deletedAt", + "shouldUpdateSequenceId" + ) + SELECT + gen_random_uuid(), + "tenancyId", + 'ProjectUserOAuthAccount', + jsonb_build_object('tenancyId', "tenancyId", 'id', "id"), + to_jsonb("ProjectUserOAuthAccount".*), + NOW(), + TRUE + FROM "ProjectUserOAuthAccount" + WHERE "tenancyId" = ${target.tenancyId}::uuid + AND "id" = ${target.oauthAccountId}::uuid + FOR UPDATE + `); + + if (insertedCount !== 1) { + throw new StackAssertionError( + `Expected to insert 1 DeletedRow entry for ProjectUserOAuthAccount, got ${insertedCount}.` + ); + } + return; + } + { const _verificationCodeTarget: { tableName: "VerificationCode_TEAM_INVITATION" } = target; assertNonEmptyString(target.verificationCodeProjectId, "verificationCodeProjectId"); @@ -765,6 +843,82 @@ export async function recordExternalDbSyncNotificationPreferenceDeletionsForUser `); } +export async function recordExternalDbSyncRefreshTokenDeletionsForUser( + tx: ExternalDbSyncClient, + options: { + tenancyId: string, + projectUserId: string, + excludeRefreshToken?: string, + }, +): Promise { + assertUuid(options.tenancyId, "tenancyId"); + assertUuid(options.projectUserId, "projectUserId"); + + const excludeCondition = options.excludeRefreshToken + ? Prisma.sql`AND "refreshToken" != ${options.excludeRefreshToken}` + : Prisma.sql``; + + await tx.$executeRaw(Prisma.sql` + INSERT INTO "DeletedRow" ( + "id", + "tenancyId", + "tableName", + "primaryKey", + "data", + "deletedAt", + "shouldUpdateSequenceId" + ) + SELECT + gen_random_uuid(), + "tenancyId", + 'ProjectUserRefreshToken', + jsonb_build_object('tenancyId', "tenancyId", 'id', "id"), + to_jsonb("ProjectUserRefreshToken".*), + NOW(), + TRUE + FROM "ProjectUserRefreshToken" + WHERE "tenancyId" = ${options.tenancyId}::uuid + AND "projectUserId" = ${options.projectUserId}::uuid + ${excludeCondition} + FOR UPDATE + `); +} + +export async function recordExternalDbSyncOAuthAccountDeletionsForUser( + tx: ExternalDbSyncClient, + options: { + tenancyId: string, + projectUserId: string, + }, +): Promise { + assertUuid(options.tenancyId, "tenancyId"); + assertUuid(options.projectUserId, "projectUserId"); + + await tx.$executeRaw(Prisma.sql` + INSERT INTO "DeletedRow" ( + "id", + "tenancyId", + "tableName", + "primaryKey", + "data", + "deletedAt", + "shouldUpdateSequenceId" + ) + SELECT + gen_random_uuid(), + "tenancyId", + 'ProjectUserOAuthAccount', + jsonb_build_object('tenancyId', "tenancyId", 'id', "id"), + to_jsonb("ProjectUserOAuthAccount".*), + NOW(), + TRUE + FROM "ProjectUserOAuthAccount" + WHERE "tenancyId" = ${options.tenancyId}::uuid + AND "projectUserId" = ${options.projectUserId}::uuid + FOR UPDATE + `); +} + type PgErrorLike = { code?: string, constraint?: string, @@ -993,6 +1147,20 @@ const CLICKHOUSE_COLUMN_NORMALIZERS: Record { { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "REVOKE TABLE ENGINE ON SQLite FROM limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "REVOKE TABLE ENGINE ON URL FROM limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW DATABASES ON default.* TO limited_user" }, + { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.connected_accounts TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.contact_channels TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.email_outboxes TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.events TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.notification_preferences TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.project_permissions TO limited_user" }, + { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.refresh_tokens TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.session_replays TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.team_invitations TO limited_user" }, { "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.team_member_profiles TO limited_user" }, @@ -570,6 +572,10 @@ it("can see only some tables", async ({ expect }) => { "status": 200, "body": { "result": [ + { + "database": "default", + "name": "connected_accounts", + }, { "database": "default", "name": "contact_channels", @@ -590,6 +596,10 @@ it("can see only some tables", async ({ expect }) => { "database": "default", "name": "project_permissions", }, + { + "database": "default", + "name": "refresh_tokens", + }, { "database": "default", "name": "session_replays", @@ -631,11 +641,13 @@ it("SHOW TABLES should have the correct tables", async ({ expect }) => { "status": 200, "body": { "result": [ + { "name": "connected_accounts" }, { "name": "contact_channels" }, { "name": "email_outboxes" }, { "name": "events" }, { "name": "notification_preferences" }, { "name": "project_permissions" }, + { "name": "refresh_tokens" }, { "name": "session_replays" }, { "name": "team_invitations" }, { "name": "team_member_profiles" }, @@ -1122,11 +1134,13 @@ it("shows grants", async ({ expect }) => { "status": 200, "body": { "result": [ + { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.connected_accounts TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.contact_channels TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.email_outboxes TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.events TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.notification_preferences TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.project_permissions TO limited_user" }, + { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.refresh_tokens TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.session_replays TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.team_invitations TO limited_user" }, { "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.team_member_profiles TO limited_user" }, diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts index 7519648a8..752b68563 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts @@ -13,10 +13,14 @@ import { verifyNotInExternalDb, waitForSyncedContactChannel, waitForSyncedContactChannelDeletion, + waitForSyncedConnectedAccount, + waitForSyncedConnectedAccountDeletion, waitForSyncedData, waitForSyncedDeletion, waitForSyncedEmailOutbox, waitForSyncedEmailOutboxByStatus, + waitForSyncedRefreshToken, + waitForSyncedRefreshTokenDeletion, waitForSyncedSessionReplay, waitForSyncedTeam, waitForSyncedTeamDeletion, @@ -1689,4 +1693,204 @@ describe.sequential('External DB Sync - Basic Tests', () => { }); }, TEST_TIMEOUT); + /** + * What it does: + * - Signs up a user (which creates a refresh token), waits for it to sync to the external DB. + * + * Why it matters: + * - Validates that refresh tokens are synced to external databases. + */ + test('Refresh token sync to external DB', async ({ expect }) => { + const dbName = 'refresh_token_sync'; + const connectionString = await dbManager.createDatabase(dbName); + + await createProjectWithExternalDb({ + main: { + type: "postgres", + connectionString, + }, + }, { config: { magic_link_enabled: true } }); + + const signUpRes = await Auth.Otp.signIn(); + + // List sessions to get the session (refresh token) ID + const listRes = await niceBackendFetch("/api/v1/auth/sessions", { + accessType: "client", + method: "GET", + query: { user_id: signUpRes.userId }, + }); + expect(listRes.status).toBe(200); + expect(listRes.body.items.length).toBeGreaterThanOrEqual(1); + const sessionId = listRes.body.items[0].id; + + const client = dbManager.getClient(dbName); + await waitForSyncedRefreshToken(client, sessionId); + + const res = await client.query(`SELECT * FROM "refresh_tokens" WHERE "id" = $1`, [sessionId]); + expect(res.rows.length).toBe(1); + expect(res.rows[0].user_id).toBe(signUpRes.userId); + expect(res.rows[0].is_impersonation).toBe(false); + expect(res.rows[0].created_at).toBeInstanceOf(Date); + expect(res.rows[0].last_used_at).toBeInstanceOf(Date); + }, TEST_TIMEOUT); + + /** + * What it does: + * - Signs up a user, revokes the session, and waits for the deletion to sync. + * + * Why it matters: + * - Validates that refresh token deletions are synced to external databases. + */ + test('Refresh token deletion sync to external DB', async ({ expect }) => { + const dbName = 'refresh_token_delete_sync'; + const connectionString = await dbManager.createDatabase(dbName); + + await createProjectWithExternalDb({ + main: { + type: "postgres", + connectionString, + }, + }, { config: { magic_link_enabled: true } }); + + const signUpRes = await Auth.Otp.signIn(); + + // Create a second session so we can revoke one + const newSession = await niceBackendFetch("/api/v1/auth/sessions", { + accessType: "server", + method: "POST", + body: { user_id: signUpRes.userId }, + }); + expect(newSession.status).toBe(200); + + // List sessions to find the second session ID + const listRes = await niceBackendFetch("/api/v1/auth/sessions", { + accessType: "client", + method: "GET", + query: { user_id: signUpRes.userId }, + }); + expect(listRes.status).toBe(200); + const nonCurrentSession = listRes.body.items.find((s: any) => !s.is_current_session); + expect(nonCurrentSession).toBeDefined(); + + const client = dbManager.getClient(dbName); + await waitForSyncedRefreshToken(client, nonCurrentSession.id); + + // Revoke the non-current session + const deleteRes = await niceBackendFetch(`/api/v1/auth/sessions/${nonCurrentSession.id}`, { + accessType: "client", + method: "DELETE", + query: { user_id: signUpRes.userId }, + }); + expect(deleteRes.status).toBe(200); + + await waitForSyncedRefreshTokenDeletion(client, nonCurrentSession.id); + }, TEST_TIMEOUT); + + /** + * What it does: + * - Signs up a user, verifies refresh token appears in ClickHouse. + * + * Why it matters: + * - Validates ClickHouse refresh_tokens table sync. + */ + test('Refresh token sync to ClickHouse', async ({ expect }) => { + await Project.createAndSwitch({ config: { magic_link_enabled: true } }); + await InternalApiKey.createAndSetProjectKeys(); + + const signUpRes = await Auth.Otp.signIn(); + + const listRes = await niceBackendFetch("/api/v1/auth/sessions", { + accessType: "client", + method: "GET", + query: { user_id: signUpRes.userId }, + }); + expect(listRes.status).toBe(200); + const sessionId = listRes.body.items[0].id; + + const timeoutMs = 180_000; + const intervalMs = 2_000; + const start = performance.now(); + + let response; + while (performance.now() - start < timeoutMs) { + response = await runQueryForCurrentProject({ + query: "SELECT id, user_id, is_impersonation FROM refresh_tokens WHERE id = {session_id:UUID}", + params: { session_id: sessionId }, + }); + expect(response.status).toBe(200); + if (response.body.result.length === 1) { + expect(response.body.result[0]).toMatchObject({ + id: sessionId, + user_id: signUpRes.userId, + is_impersonation: 0, + }); + return; + } + await wait(intervalMs); + } + throw new StackAssertionError(`Timed out waiting for ClickHouse refresh token to sync.`, { response }); + }, TEST_TIMEOUT); + + /** + * What it does: + * - Signs up a user, verifies connected account appears in ClickHouse. + * + * Why it matters: + * - Validates ClickHouse connected_accounts table sync. + */ + test('Connected account sync to ClickHouse', async ({ expect }) => { + // Use default project (has spotify configured) with analytics keys + await Auth.OAuth.signIn(); + await InternalApiKey.createAndSetProjectKeys(); + + // Get the user ID + const userRes = await niceBackendFetch("/api/v1/users/me", { + accessType: "client", + method: "GET", + }); + expect(userRes.status).toBe(200); + const userId = userRes.body.id; + + // Create an additional connected account via the oauth-providers API so we have a known ID + const createRes = await niceBackendFetch("/api/v1/oauth-providers", { + accessType: "server", + method: "POST", + body: { + user_id: userId, + provider_config_id: "spotify", + account_id: "ch-test-account-12345", + email: "chuser@example.com", + allow_sign_in: false, + allow_connected_accounts: true, + }, + }); + expect(createRes.status).toBe(201); + const accountId = createRes.body.id; + + const timeoutMs = 180_000; + const intervalMs = 2_000; + const start = performance.now(); + + let response; + while (performance.now() - start < timeoutMs) { + response = await runQueryForCurrentProject({ + query: "SELECT id, user_id, provider, provider_account_id, email FROM connected_accounts WHERE id = {account_id:UUID}", + params: { account_id: accountId }, + }); + expect(response.status).toBe(200); + if (response.body.result.length === 1) { + expect(response.body.result[0]).toMatchObject({ + id: accountId, + user_id: userId, + provider: "spotify", + provider_account_id: "ch-test-account-12345", + email: "chuser@example.com", + }); + return; + } + await wait(intervalMs); + } + throw new StackAssertionError(`Timed out waiting for ClickHouse connected account to sync.`, { response }); + }, TEST_TIMEOUT); + }); diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts index b51baf7c6..4c91ae93a 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts @@ -396,6 +396,54 @@ export async function waitForSyncedNotificationPreferenceDeletion(client: Client }); } +export async function waitForSyncedRefreshToken(client: Client, refreshTokenId: string) { + await waitForExternalDbRow( + client, + `SELECT * FROM "refresh_tokens" WHERE "id" = $1`, + [refreshTokenId], + { + shouldExist: true, + description: `refresh token "${refreshTokenId}" to appear in external DB`, + }, + ); +} + +export async function waitForSyncedRefreshTokenDeletion(client: Client, refreshTokenId: string) { + await waitForExternalDbRow( + client, + `SELECT * FROM "refresh_tokens" WHERE "id" = $1`, + [refreshTokenId], + { + shouldExist: false, + description: `refresh token "${refreshTokenId}" to be removed from external DB`, + }, + ); +} + +export async function waitForSyncedConnectedAccount(client: Client, accountId: string) { + await waitForExternalDbRow( + client, + `SELECT * FROM "connected_accounts" WHERE "id" = $1`, + [accountId], + { + shouldExist: true, + description: `connected account "${accountId}" to appear in external DB`, + }, + ); +} + +export async function waitForSyncedConnectedAccountDeletion(client: Client, accountId: string) { + await waitForExternalDbRow( + client, + `SELECT * FROM "connected_accounts" WHERE "id" = $1`, + [accountId], + { + shouldExist: false, + description: `connected account "${accountId}" to be removed from external DB`, + }, + ); +} + export async function waitForSyncedEmailOutboxByStatus(client: Client, status: string) { await waitForExternalDbRow( client, diff --git a/packages/stack-shared/src/config/db-sync-mappings.ts b/packages/stack-shared/src/config/db-sync-mappings.ts index 76566d245..0f2ca5fc3 100644 --- a/packages/stack-shared/src/config/db-sync-mappings.ts +++ b/packages/stack-shared/src/config/db-sync-mappings.ts @@ -2059,4 +2059,382 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { `.trim(), }, }, + "refresh_tokens": { + sourceTables: { "ProjectUserRefreshToken": "ProjectUserRefreshToken" }, + targetTable: "refresh_tokens", + targetTableSchemas: { + postgres: ` + CREATE TABLE IF NOT EXISTS "refresh_tokens" ( + "id" uuid PRIMARY KEY NOT NULL, + "user_id" uuid NOT NULL, + "created_at" timestamp without time zone NOT NULL, + "last_used_at" timestamp without time zone NOT NULL, + "is_impersonation" boolean NOT NULL DEFAULT false, + "expires_at" timestamp without time zone + ); + REVOKE ALL ON "refresh_tokens" FROM PUBLIC; + GRANT SELECT ON "refresh_tokens" TO PUBLIC; + + CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" ( + "mapping_name" text PRIMARY KEY NOT NULL, + "last_synced_sequence_id" bigint NOT NULL DEFAULT -1, + "updated_at" timestamp without time zone NOT NULL DEFAULT now() + ); + `.trim(), + clickhouse: ` + CREATE TABLE IF NOT EXISTS analytics_internal.refresh_tokens ( + project_id String, + branch_id String, + id UUID, + user_id UUID, + created_at DateTime64(3, 'UTC'), + last_used_at DateTime64(3, 'UTC'), + is_impersonation UInt8, + expires_at Nullable(DateTime64(3, 'UTC')), + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) + ) + ENGINE ReplacingMergeTree(sync_sequence_id) + PARTITION BY toYYYYMM(created_at) + ORDER BY (project_id, branch_id, id); + + CREATE TABLE IF NOT EXISTS analytics_internal._stack_sync_metadata ( + tenancy_id UUID, + mapping_name String, + last_synced_sequence_id Int64, + updated_at DateTime64(3, 'UTC') DEFAULT now64(3) + ) + ENGINE ReplacingMergeTree(updated_at) + ORDER BY (tenancy_id, mapping_name); + `.trim(), + }, + internalDbFetchQueries: { + clickhouse: ` + SELECT * + FROM ( + SELECT + "Tenancy"."projectId" AS "project_id", + "Tenancy"."branchId" AS "branch_id", + "ProjectUserRefreshToken"."id" AS "id", + "ProjectUserRefreshToken"."projectUserId" AS "user_id", + "ProjectUserRefreshToken"."createdAt" AS "created_at", + "ProjectUserRefreshToken"."lastActiveAt" AS "last_used_at", + "ProjectUserRefreshToken"."isImpersonation" AS "is_impersonation", + "ProjectUserRefreshToken"."expiresAt" AS "expires_at", + "ProjectUserRefreshToken"."sequenceId" AS "sync_sequence_id", + "ProjectUserRefreshToken"."tenancyId" AS "tenancyId", + false AS "sync_is_deleted" + FROM "ProjectUserRefreshToken" + JOIN "Tenancy" ON "Tenancy"."id" = "ProjectUserRefreshToken"."tenancyId" + WHERE "ProjectUserRefreshToken"."tenancyId" = $1::uuid + + UNION ALL + + SELECT + "Tenancy"."projectId" AS "project_id", + "Tenancy"."branchId" AS "branch_id", + ("DeletedRow"."primaryKey"->>'id')::uuid AS "id", + ("DeletedRow"."data"->>'projectUserId')::uuid AS "user_id", + "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", + "DeletedRow"."deletedAt"::timestamp without time zone AS "last_used_at", + false AS "is_impersonation", + NULL::timestamp without time zone AS "expires_at", + "DeletedRow"."sequenceId" AS "sync_sequence_id", + "DeletedRow"."tenancyId" AS "tenancyId", + true AS "sync_is_deleted" + FROM "DeletedRow" + JOIN "Tenancy" ON "Tenancy"."id" = "DeletedRow"."tenancyId" + WHERE + "DeletedRow"."tenancyId" = $1::uuid + AND "DeletedRow"."tableName" = 'ProjectUserRefreshToken' + ) AS "_src" + WHERE "sync_sequence_id" IS NOT NULL + AND "sync_sequence_id" > $2::bigint + ORDER BY "sync_sequence_id" ASC + LIMIT 1000 + `.trim(), + }, + internalDbFetchQuery: ` + SELECT * + FROM ( + SELECT + "ProjectUserRefreshToken"."id" AS "id", + "ProjectUserRefreshToken"."projectUserId" AS "user_id", + "ProjectUserRefreshToken"."createdAt" AS "created_at", + "ProjectUserRefreshToken"."lastActiveAt" AS "last_used_at", + "ProjectUserRefreshToken"."isImpersonation" AS "is_impersonation", + "ProjectUserRefreshToken"."expiresAt" AS "expires_at", + "ProjectUserRefreshToken"."sequenceId" AS "sequence_id", + "ProjectUserRefreshToken"."tenancyId", + false AS "is_deleted" + FROM "ProjectUserRefreshToken" + WHERE "ProjectUserRefreshToken"."tenancyId" = $1::uuid + + UNION ALL + + SELECT + ("DeletedRow"."primaryKey"->>'id')::uuid AS "id", + ("DeletedRow"."data"->>'projectUserId')::uuid AS "user_id", + "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", + "DeletedRow"."deletedAt"::timestamp without time zone AS "last_used_at", + false AS "is_impersonation", + NULL::timestamp without time zone AS "expires_at", + "DeletedRow"."sequenceId" AS "sequence_id", + "DeletedRow"."tenancyId", + true AS "is_deleted" + FROM "DeletedRow" + WHERE + "DeletedRow"."tenancyId" = $1::uuid + AND "DeletedRow"."tableName" = 'ProjectUserRefreshToken' + ) AS "_src" + WHERE "sequence_id" IS NOT NULL + AND "sequence_id" > $2::bigint + ORDER BY "sequence_id" ASC + LIMIT 1000 + `.trim(), + externalDbUpdateQueries: { + postgres: ` + WITH params AS ( + SELECT + $1::uuid AS "id", + $2::uuid AS "user_id", + $3::timestamp without time zone AS "created_at", + $4::timestamp without time zone AS "last_used_at", + $5::boolean AS "is_impersonation", + $6::timestamp without time zone AS "expires_at", + $7::bigint AS "sequence_id", + $8::boolean AS "is_deleted", + $9::text AS "mapping_name" + ), + deleted AS ( + DELETE FROM "refresh_tokens" rt + USING params p + WHERE p."is_deleted" = true AND rt."id" = p."id" + RETURNING 1 + ), + upserted AS ( + INSERT INTO "refresh_tokens" ( + "id", + "user_id", + "created_at", + "last_used_at", + "is_impersonation", + "expires_at" + ) + SELECT + p."id", + p."user_id", + p."created_at", + p."last_used_at", + p."is_impersonation", + p."expires_at" + FROM params p + WHERE p."is_deleted" = false + ON CONFLICT ("id") DO UPDATE SET + "user_id" = EXCLUDED."user_id", + "created_at" = EXCLUDED."created_at", + "last_used_at" = EXCLUDED."last_used_at", + "is_impersonation" = EXCLUDED."is_impersonation", + "expires_at" = EXCLUDED."expires_at" + RETURNING 1 + ) + INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at") + SELECT p."mapping_name", p."sequence_id", now() FROM params p + ON CONFLICT ("mapping_name") DO UPDATE SET + "last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"), + "updated_at" = now(); + `.trim(), + }, + }, + "connected_accounts": { + sourceTables: { "ProjectUserOAuthAccount": "ProjectUserOAuthAccount" }, + targetTable: "connected_accounts", + targetTableSchemas: { + postgres: ` + CREATE TABLE IF NOT EXISTS "connected_accounts" ( + "id" uuid PRIMARY KEY NOT NULL, + "user_id" uuid NOT NULL, + "provider" text NOT NULL, + "provider_account_id" text NOT NULL, + "email" text, + "created_at" timestamp without time zone NOT NULL + ); + REVOKE ALL ON "connected_accounts" FROM PUBLIC; + GRANT SELECT ON "connected_accounts" TO PUBLIC; + + CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" ( + "mapping_name" text PRIMARY KEY NOT NULL, + "last_synced_sequence_id" bigint NOT NULL DEFAULT -1, + "updated_at" timestamp without time zone NOT NULL DEFAULT now() + ); + `.trim(), + clickhouse: ` + CREATE TABLE IF NOT EXISTS analytics_internal.connected_accounts ( + project_id String, + branch_id String, + id UUID, + user_id UUID, + provider String, + provider_account_id String, + email Nullable(String), + created_at DateTime64(3, 'UTC'), + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) + ) + ENGINE ReplacingMergeTree(sync_sequence_id) + PARTITION BY toYYYYMM(created_at) + ORDER BY (project_id, branch_id, id); + + CREATE TABLE IF NOT EXISTS analytics_internal._stack_sync_metadata ( + tenancy_id UUID, + mapping_name String, + last_synced_sequence_id Int64, + updated_at DateTime64(3, 'UTC') DEFAULT now64(3) + ) + ENGINE ReplacingMergeTree(updated_at) + ORDER BY (tenancy_id, mapping_name); + `.trim(), + }, + internalDbFetchQueries: { + clickhouse: ` + SELECT * + FROM ( + SELECT + "Tenancy"."projectId" AS "project_id", + "Tenancy"."branchId" AS "branch_id", + "ProjectUserOAuthAccount"."id" AS "id", + "ProjectUserOAuthAccount"."projectUserId" AS "user_id", + "ProjectUserOAuthAccount"."configOAuthProviderId" AS "provider", + "ProjectUserOAuthAccount"."providerAccountId" AS "provider_account_id", + "ProjectUserOAuthAccount"."email" AS "email", + "ProjectUserOAuthAccount"."createdAt" AS "created_at", + "ProjectUserOAuthAccount"."sequenceId" AS "sync_sequence_id", + "ProjectUserOAuthAccount"."tenancyId" AS "tenancyId", + false AS "sync_is_deleted" + FROM "ProjectUserOAuthAccount" + JOIN "Tenancy" ON "Tenancy"."id" = "ProjectUserOAuthAccount"."tenancyId" + WHERE "ProjectUserOAuthAccount"."tenancyId" = $1::uuid + AND "ProjectUserOAuthAccount"."projectUserId" IS NOT NULL + + UNION ALL + + SELECT + "Tenancy"."projectId" AS "project_id", + "Tenancy"."branchId" AS "branch_id", + ("DeletedRow"."primaryKey"->>'id')::uuid AS "id", + ("DeletedRow"."data"->>'projectUserId')::uuid AS "user_id", + NULL::text AS "provider", + NULL::text AS "provider_account_id", + NULL::text AS "email", + "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", + "DeletedRow"."sequenceId" AS "sync_sequence_id", + "DeletedRow"."tenancyId" AS "tenancyId", + true AS "sync_is_deleted" + FROM "DeletedRow" + JOIN "Tenancy" ON "Tenancy"."id" = "DeletedRow"."tenancyId" + WHERE + "DeletedRow"."tenancyId" = $1::uuid + AND "DeletedRow"."tableName" = 'ProjectUserOAuthAccount' + ) AS "_src" + WHERE "sync_sequence_id" IS NOT NULL + AND "sync_sequence_id" > $2::bigint + ORDER BY "sync_sequence_id" ASC + LIMIT 1000 + `.trim(), + }, + internalDbFetchQuery: ` + SELECT * + FROM ( + SELECT + "ProjectUserOAuthAccount"."id" AS "id", + "ProjectUserOAuthAccount"."projectUserId" AS "user_id", + "ProjectUserOAuthAccount"."configOAuthProviderId" AS "provider", + "ProjectUserOAuthAccount"."providerAccountId" AS "provider_account_id", + "ProjectUserOAuthAccount"."email" AS "email", + "ProjectUserOAuthAccount"."createdAt" AS "created_at", + "ProjectUserOAuthAccount"."sequenceId" AS "sequence_id", + "ProjectUserOAuthAccount"."tenancyId", + false AS "is_deleted" + FROM "ProjectUserOAuthAccount" + WHERE "ProjectUserOAuthAccount"."tenancyId" = $1::uuid + AND "ProjectUserOAuthAccount"."projectUserId" IS NOT NULL + + UNION ALL + + SELECT + ("DeletedRow"."primaryKey"->>'id')::uuid AS "id", + ("DeletedRow"."data"->>'projectUserId')::uuid AS "user_id", + NULL::text AS "provider", + NULL::text AS "provider_account_id", + NULL::text AS "email", + "DeletedRow"."deletedAt"::timestamp without time zone AS "created_at", + "DeletedRow"."sequenceId" AS "sequence_id", + "DeletedRow"."tenancyId", + true AS "is_deleted" + FROM "DeletedRow" + WHERE + "DeletedRow"."tenancyId" = $1::uuid + AND "DeletedRow"."tableName" = 'ProjectUserOAuthAccount' + ) AS "_src" + WHERE "sequence_id" IS NOT NULL + AND "sequence_id" > $2::bigint + ORDER BY "sequence_id" ASC + LIMIT 1000 + `.trim(), + externalDbUpdateQueries: { + postgres: ` + WITH params AS ( + SELECT + $1::uuid AS "id", + $2::uuid AS "user_id", + $3::text AS "provider", + $4::text AS "provider_account_id", + $5::text AS "email", + $6::timestamp without time zone AS "created_at", + $7::bigint AS "sequence_id", + $8::boolean AS "is_deleted", + $9::text AS "mapping_name" + ), + deleted AS ( + DELETE FROM "connected_accounts" ca + USING params p + WHERE p."is_deleted" = true AND ca."id" = p."id" + RETURNING 1 + ), + upserted AS ( + INSERT INTO "connected_accounts" ( + "id", + "user_id", + "provider", + "provider_account_id", + "email", + "created_at" + ) + SELECT + p."id", + p."user_id", + p."provider", + p."provider_account_id", + p."email", + p."created_at" + FROM params p + WHERE p."is_deleted" = false + ON CONFLICT ("id") DO UPDATE SET + "user_id" = EXCLUDED."user_id", + "provider" = EXCLUDED."provider", + "provider_account_id" = EXCLUDED."provider_account_id", + "email" = EXCLUDED."email", + "created_at" = EXCLUDED."created_at" + RETURNING 1 + ) + INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at") + SELECT p."mapping_name", p."sequence_id", now() FROM params p + ON CONFLICT ("mapping_name") DO UPDATE SET + "last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"), + "updated_at" = now(); + `.trim(), + }, + }, } as const;