clickhouse sync session and connected accounts

This commit is contained in:
Bilal Godil 2026-03-18 12:20:23 -07:00
parent 66cff4d125
commit bf008e21b2
15 changed files with 1036 additions and 3 deletions

View File

@ -0,0 +1,25 @@
-- AlterTable
ALTER TABLE "ProjectUserRefreshToken" ADD COLUMN "sequenceId" BIGINT,
ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true;
-- CreateIndex
CREATE UNIQUE INDEX "ProjectUserRefreshToken_sequenceId_key" ON "ProjectUserRefreshToken"("sequenceId");
-- CreateIndex
CREATE INDEX "ProjectUserRefreshToken_shouldUpdateSequenceId_idx" ON "ProjectUserRefreshToken"("shouldUpdateSequenceId", "tenancyId");
-- CreateIndex
CREATE INDEX "ProjectUserRefreshToken_tenancyId_sequenceId_idx" ON "ProjectUserRefreshToken"("tenancyId", "sequenceId");
-- AlterTable
ALTER TABLE "ProjectUserOAuthAccount" ADD COLUMN "sequenceId" BIGINT,
ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true;
-- CreateIndex
CREATE UNIQUE INDEX "ProjectUserOAuthAccount_sequenceId_key" ON "ProjectUserOAuthAccount"("sequenceId");
-- CreateIndex
CREATE INDEX "ProjectUserOAuthAccount_shouldUpdateSequenceId_idx" ON "ProjectUserOAuthAccount"("shouldUpdateSequenceId", "tenancyId");
-- CreateIndex
CREATE INDEX "ProjectUserOAuthAccount_tenancyId_sequenceId_idx" ON "ProjectUserOAuthAccount"("tenancyId", "sequenceId");

View File

@ -348,9 +348,14 @@ model ProjectUserOAuthAccount {
allowConnectedAccounts Boolean @default(true)
allowSignIn Boolean @default(true)
sequenceId BigInt? @unique
shouldUpdateSequenceId Boolean @default(true)
@@id([tenancyId, id])
@@unique([tenancyId, configOAuthProviderId, projectUserId, providerAccountId])
@@index([tenancyId, projectUserId])
@@index([tenancyId, sequenceId], name: "ProjectUserOAuthAccount_tenancyId_sequenceId_idx")
@@index([shouldUpdateSequenceId, tenancyId], name: "ProjectUserOAuthAccount_shouldUpdateSequenceId_idx")
}
model SessionReplay {
@ -622,7 +627,12 @@ model ProjectUserRefreshToken {
expiresAt DateTime?
isImpersonation Boolean @default(false)
sequenceId BigInt? @unique
shouldUpdateSequenceId Boolean @default(true)
@@id([tenancyId, id])
@@index([tenancyId, sequenceId], name: "ProjectUserRefreshToken_tenancyId_sequenceId_idx")
@@index([shouldUpdateSequenceId, tenancyId], name: "ProjectUserRefreshToken_shouldUpdateSequenceId_idx")
}
model ProjectUserAuthorizationCode {

View File

@ -34,6 +34,10 @@ export async function runClickhouseMigrations() {
await client.exec({ query: PROJECT_PERMISSIONS_VIEW_SQL });
await client.exec({ query: NOTIFICATION_PREFERENCES_TABLE_BASE_SQL });
await client.exec({ query: NOTIFICATION_PREFERENCES_VIEW_SQL });
await client.exec({ query: REFRESH_TOKENS_TABLE_BASE_SQL });
await client.exec({ query: REFRESH_TOKENS_VIEW_SQL });
await client.exec({ query: CONNECTED_ACCOUNTS_TABLE_BASE_SQL });
await client.exec({ query: CONNECTED_ACCOUNTS_VIEW_SQL });
await client.exec({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL });
await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL });
await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL });
@ -54,6 +58,8 @@ export async function runClickhouseMigrations() {
"GRANT SELECT ON default.session_replays TO limited_user;",
"GRANT SELECT ON default.project_permissions TO limited_user;",
"GRANT SELECT ON default.notification_preferences TO limited_user;",
"GRANT SELECT ON default.refresh_tokens TO limited_user;",
"GRANT SELECT ON default.connected_accounts TO limited_user;",
];
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS events_project_isolation ON default.events FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
@ -88,6 +94,12 @@ export async function runClickhouseMigrations() {
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS notification_preferences_project_isolation ON default.notification_preferences FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS refresh_tokens_project_isolation ON default.refresh_tokens FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS connected_accounts_project_isolation ON default.connected_accounts FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
for (const query of queries) {
await client.exec({ query });
}
@ -621,6 +633,80 @@ FINAL
WHERE sync_is_deleted = 0;
`;
const REFRESH_TOKENS_TABLE_BASE_SQL = `
CREATE TABLE IF NOT EXISTS analytics_internal.refresh_tokens (
project_id String,
branch_id String,
id UUID,
user_id UUID,
created_at DateTime64(3, 'UTC'),
last_used_at DateTime64(3, 'UTC'),
is_impersonation UInt8,
expires_at Nullable(DateTime64(3, 'UTC')),
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, id);
`;
const REFRESH_TOKENS_VIEW_SQL = `
CREATE OR REPLACE VIEW default.refresh_tokens
SQL SECURITY DEFINER
AS
SELECT
project_id,
branch_id,
id,
user_id,
created_at,
last_used_at,
is_impersonation,
expires_at
FROM analytics_internal.refresh_tokens
FINAL
WHERE sync_is_deleted = 0;
`;
const CONNECTED_ACCOUNTS_TABLE_BASE_SQL = `
CREATE TABLE IF NOT EXISTS analytics_internal.connected_accounts (
project_id String,
branch_id String,
id UUID,
user_id UUID,
provider String,
provider_account_id String,
email Nullable(String),
created_at DateTime64(3, 'UTC'),
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, id);
`;
const CONNECTED_ACCOUNTS_VIEW_SQL = `
CREATE OR REPLACE VIEW default.connected_accounts
SQL SECURITY DEFINER
AS
SELECT
project_id,
branch_id,
id,
user_id,
provider,
provider_account_id,
email,
created_at
FROM analytics_internal.connected_accounts
FINAL
WHERE sync_is_deleted = 0;
`;
const EXTERNAL_ANALYTICS_DB_SQL = `
CREATE DATABASE IF NOT EXISTS analytics_internal;
`;

View File

@ -30,7 +30,7 @@ async function main() {
if (runResult.status === "error") {
captureError("run-cron-jobs", runResult.error);
}
await wait(1000)
await wait(1000);
}
});
}

View File

@ -1,3 +1,4 @@
import { recordExternalDbSyncRefreshTokenDeletionsForUser } from "@/lib/external-db-sync";
import { getPrismaClientForTenancy, globalPrismaClient, retryTransaction } from "@/prisma-client";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { KnownErrors } from "@stackframe/stack-shared";
@ -78,6 +79,12 @@ export const POST = createSmartRouteHandler({
});
// reset all other refresh tokens
await recordExternalDbSyncRefreshTokenDeletionsForUser(globalPrismaClient, {
tenancyId: tenancy.id,
projectUserId: user.id,
excludeRefreshToken: refreshToken?.[0],
});
await globalPrismaClient.projectUserRefreshToken.deleteMany({
where: {
tenancyId: tenancy.id,

View File

@ -1,3 +1,4 @@
import { recordExternalDbSyncDeletion } from "@/lib/external-db-sync";
import { globalPrismaClient } from "@/prisma-client";
import { createCrudHandlers } from "@/route-handlers/crud-handler";
import { SmartRequestAuth } from "@/route-handlers/smart-request";
@ -71,6 +72,12 @@ export const sessionsCrudHandlers = createLazyProxy(() => createCrudHandlers(ses
throw new KnownErrors.CannotDeleteCurrentSession();
}
await recordExternalDbSyncDeletion(globalPrismaClient, {
tableName: "ProjectUserRefreshToken",
tenancyId: auth.tenancy.id,
refreshTokenId: params.id,
});
await globalPrismaClient.projectUserRefreshToken.deleteMany({
where: {
tenancyId: auth.tenancy.id,

View File

@ -1,3 +1,4 @@
import { recordExternalDbSyncDeletion } from "@/lib/external-db-sync";
import { getPrismaClientForTenancy, globalPrismaClient } from "@/prisma-client";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { Prisma } from "@/generated/prisma/client";
@ -32,6 +33,13 @@ export const DELETE = createSmartRouteHandler({
try {
const prisma = await getPrismaClientForTenancy(tenancy);
await recordExternalDbSyncDeletion(globalPrismaClient, {
tableName: "ProjectUserRefreshToken",
tenancyId: tenancy.id,
refreshTokenId,
});
const result = await globalPrismaClient.projectUserRefreshToken.deleteMany({
where: {
tenancyId: tenancy.id,

View File

@ -374,6 +374,62 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
didUpdate = true;
}
const refreshTokenTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "tenancyId", "id"
FROM "ProjectUserRefreshToken"
WHERE "shouldUpdateSequenceId" = TRUE
ORDER BY "tenancyId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "ProjectUserRefreshToken" rt
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE rt."tenancyId" = r."tenancyId"
AND rt."id" = r."id"
RETURNING rt."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
`;
span.setAttribute("stack.external-db-sync.refresh-token-tenants", refreshTokenTenants.length);
if (refreshTokenTenants.length > 0) {
await enqueueExternalDbSyncBatch(refreshTokenTenants.map(t => t.tenancyId));
didUpdate = true;
}
const oauthAccountTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "tenancyId", "id"
FROM "ProjectUserOAuthAccount"
WHERE "shouldUpdateSequenceId" = TRUE
ORDER BY "tenancyId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "ProjectUserOAuthAccount" oa
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE oa."tenancyId" = r."tenancyId"
AND oa."id" = r."id"
RETURNING oa."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
`;
span.setAttribute("stack.external-db-sync.oauth-account-tenants", oauthAccountTenants.length);
if (oauthAccountTenants.length > 0) {
await enqueueExternalDbSyncBatch(oauthAccountTenants.map(t => t.tenancyId));
didUpdate = true;
}
const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "id", "tenancyId"
@ -403,7 +459,7 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
span.setAttribute("stack.external-db-sync.did-update", didUpdate);
if (didUpdate) {
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, TP=${teamPermissionTenants.length}, TI=${teamInvitationTenants.length}, EO=${emailOutboxTenants.length}, SR=${sessionReplayTenants.length}, PP=${projectPermissionTenants.length}, NP=${notificationPreferenceTenants.length}, DR=${deletedRowTenants.length}`);
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, TP=${teamPermissionTenants.length}, TI=${teamInvitationTenants.length}, EO=${emailOutboxTenants.length}, SR=${sessionReplayTenants.length}, PP=${projectPermissionTenants.length}, NP=${notificationPreferenceTenants.length}, RT=${refreshTokenTenants.length}, CA=${oauthAccountTenants.length}, DR=${deletedRowTenants.length}`);
}
return didUpdate;

View File

@ -1,3 +1,4 @@
import { recordExternalDbSyncDeletion } from "@/lib/external-db-sync";
import { ensureUserExists } from "@/lib/request-checks";
import { Tenancy } from "@/lib/tenancies";
import { getPrismaClientForTenancy, retryTransaction } from "@/prisma-client";
@ -356,6 +357,12 @@ export const oauthProviderCrudHandlers = createLazyProxy(() => createCrudHandler
});
}
await recordExternalDbSyncDeletion(tx, {
tableName: "ProjectUserOAuthAccount",
tenancyId: auth.tenancy.id,
oauthAccountId: params.provider_id,
});
await tx.projectUserOAuthAccount.delete({
where: {
tenancyId_id: {

View File

@ -2,7 +2,7 @@ import { BooleanTrue, Prisma } from "@/generated/prisma/client";
import { getRenderedOrganizationConfigQuery, getRenderedProjectConfigQuery } from "@/lib/config";
import { demoteAllContactChannelsToNonPrimary, setContactChannelAsPrimaryByValue } from "@/lib/contact-channel";
import { normalizeEmail } from "@/lib/emails";
import { recordExternalDbSyncContactChannelDeletionsForUser, recordExternalDbSyncDeletion, recordExternalDbSyncNotificationPreferenceDeletionsForUser, recordExternalDbSyncProjectPermissionDeletionsForUser, recordExternalDbSyncTeamMemberDeletionsForUser, recordExternalDbSyncTeamPermissionDeletionsForUser, withExternalDbSyncUpdate } from "@/lib/external-db-sync";
import { recordExternalDbSyncContactChannelDeletionsForUser, recordExternalDbSyncDeletion, recordExternalDbSyncNotificationPreferenceDeletionsForUser, recordExternalDbSyncOAuthAccountDeletionsForUser, recordExternalDbSyncProjectPermissionDeletionsForUser, recordExternalDbSyncRefreshTokenDeletionsForUser, recordExternalDbSyncTeamMemberDeletionsForUser, recordExternalDbSyncTeamPermissionDeletionsForUser, withExternalDbSyncUpdate } from "@/lib/external-db-sync";
import { grantDefaultProjectPermissions } from "@/lib/permissions";
import { ensureTeamMembershipExists, ensureUserExists } from "@/lib/request-checks";
import { Tenancy } from "@/lib/tenancies";
@ -1156,6 +1156,11 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC
// if user password changed, reset all refresh tokens
if (passwordHash !== undefined) {
await recordExternalDbSyncRefreshTokenDeletionsForUser(globalPrismaClient, {
tenancyId: auth.tenancy.id,
projectUserId: params.user_id,
});
await globalPrismaClient.projectUserRefreshToken.deleteMany({
where: {
tenancyId: auth.tenancy.id,
@ -1222,6 +1227,16 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC
projectUserId: params.user_id,
});
await recordExternalDbSyncRefreshTokenDeletionsForUser(tx, {
tenancyId: auth.tenancy.id,
projectUserId: params.user_id,
});
await recordExternalDbSyncOAuthAccountDeletionsForUser(tx, {
tenancyId: auth.tenancy.id,
projectUserId: params.user_id,
});
await tx.projectUser.delete({
where: {
tenancyId_projectUserId: {

View File

@ -74,6 +74,16 @@ type ExternalDbSyncTarget =
verificationCodeProjectId: string,
verificationCodeBranchId: string,
verificationCodeId: string,
}
| {
tableName: "ProjectUserRefreshToken",
tenancyId: string,
refreshTokenId: string,
}
| {
tableName: "ProjectUserOAuthAccount",
tenancyId: string,
oauthAccountId: string,
};
type ExternalDbType = NonNullable<NonNullable<CompleteConfig["dbSync"]["externalDatabases"][string]>["type"]>;
@ -373,6 +383,74 @@ export async function recordExternalDbSyncDeletion(
return;
}
if (target.tableName === "ProjectUserRefreshToken") {
assertUuid(target.refreshTokenId, "refreshTokenId");
const insertedCount = await tx.$executeRaw(Prisma.sql`
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
SELECT
gen_random_uuid(),
"tenancyId",
'ProjectUserRefreshToken',
jsonb_build_object('tenancyId', "tenancyId", 'id', "id"),
to_jsonb("ProjectUserRefreshToken".*),
NOW(),
TRUE
FROM "ProjectUserRefreshToken"
WHERE "tenancyId" = ${target.tenancyId}::uuid
AND "id" = ${target.refreshTokenId}::uuid
FOR UPDATE
`);
if (insertedCount !== 1) {
throw new StackAssertionError(
`Expected to insert 1 DeletedRow entry for ProjectUserRefreshToken, got ${insertedCount}.`
);
}
return;
}
if (target.tableName === "ProjectUserOAuthAccount") {
assertUuid(target.oauthAccountId, "oauthAccountId");
const insertedCount = await tx.$executeRaw(Prisma.sql`
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
SELECT
gen_random_uuid(),
"tenancyId",
'ProjectUserOAuthAccount',
jsonb_build_object('tenancyId', "tenancyId", 'id', "id"),
to_jsonb("ProjectUserOAuthAccount".*),
NOW(),
TRUE
FROM "ProjectUserOAuthAccount"
WHERE "tenancyId" = ${target.tenancyId}::uuid
AND "id" = ${target.oauthAccountId}::uuid
FOR UPDATE
`);
if (insertedCount !== 1) {
throw new StackAssertionError(
`Expected to insert 1 DeletedRow entry for ProjectUserOAuthAccount, got ${insertedCount}.`
);
}
return;
}
{
const _verificationCodeTarget: { tableName: "VerificationCode_TEAM_INVITATION" } = target;
assertNonEmptyString(target.verificationCodeProjectId, "verificationCodeProjectId");
@ -765,6 +843,82 @@ export async function recordExternalDbSyncNotificationPreferenceDeletionsForUser
`);
}
export async function recordExternalDbSyncRefreshTokenDeletionsForUser(
tx: ExternalDbSyncClient,
options: {
tenancyId: string,
projectUserId: string,
excludeRefreshToken?: string,
},
): Promise<void> {
assertUuid(options.tenancyId, "tenancyId");
assertUuid(options.projectUserId, "projectUserId");
const excludeCondition = options.excludeRefreshToken
? Prisma.sql`AND "refreshToken" != ${options.excludeRefreshToken}`
: Prisma.sql``;
await tx.$executeRaw(Prisma.sql`
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
SELECT
gen_random_uuid(),
"tenancyId",
'ProjectUserRefreshToken',
jsonb_build_object('tenancyId', "tenancyId", 'id', "id"),
to_jsonb("ProjectUserRefreshToken".*),
NOW(),
TRUE
FROM "ProjectUserRefreshToken"
WHERE "tenancyId" = ${options.tenancyId}::uuid
AND "projectUserId" = ${options.projectUserId}::uuid
${excludeCondition}
FOR UPDATE
`);
}
export async function recordExternalDbSyncOAuthAccountDeletionsForUser(
tx: ExternalDbSyncClient,
options: {
tenancyId: string,
projectUserId: string,
},
): Promise<void> {
assertUuid(options.tenancyId, "tenancyId");
assertUuid(options.projectUserId, "projectUserId");
await tx.$executeRaw(Prisma.sql`
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
SELECT
gen_random_uuid(),
"tenancyId",
'ProjectUserOAuthAccount',
jsonb_build_object('tenancyId', "tenancyId", 'id', "id"),
to_jsonb("ProjectUserOAuthAccount".*),
NOW(),
TRUE
FROM "ProjectUserOAuthAccount"
WHERE "tenancyId" = ${options.tenancyId}::uuid
AND "projectUserId" = ${options.projectUserId}::uuid
FOR UPDATE
`);
}
type PgErrorLike = {
code?: string,
constraint?: string,
@ -993,6 +1147,20 @@ const CLICKHOUSE_COLUMN_NORMALIZERS: Record<string, Record<string, 'json' | 'boo
chunk_count: 'bigint',
sync_is_deleted: 'boolean',
},
project_permissions: {
sync_is_deleted: 'boolean',
},
notification_preferences: {
enabled: 'boolean',
sync_is_deleted: 'boolean',
},
refresh_tokens: {
is_impersonation: 'boolean',
sync_is_deleted: 'boolean',
},
connected_accounts: {
sync_is_deleted: 'boolean',
},
};
async function pushRowsToClickhouse(

View File

@ -522,11 +522,13 @@ it("has limited grants", async ({ expect }) => {
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "REVOKE TABLE ENGINE ON SQLite FROM limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "REVOKE TABLE ENGINE ON URL FROM limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW DATABASES ON default.* TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.connected_accounts TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.contact_channels TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.email_outboxes TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.events TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.notification_preferences TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.project_permissions TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.refresh_tokens TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.session_replays TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.team_invitations TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.team_member_profiles TO limited_user" },
@ -570,6 +572,10 @@ it("can see only some tables", async ({ expect }) => {
"status": 200,
"body": {
"result": [
{
"database": "default",
"name": "connected_accounts",
},
{
"database": "default",
"name": "contact_channels",
@ -590,6 +596,10 @@ it("can see only some tables", async ({ expect }) => {
"database": "default",
"name": "project_permissions",
},
{
"database": "default",
"name": "refresh_tokens",
},
{
"database": "default",
"name": "session_replays",
@ -631,11 +641,13 @@ it("SHOW TABLES should have the correct tables", async ({ expect }) => {
"status": 200,
"body": {
"result": [
{ "name": "connected_accounts" },
{ "name": "contact_channels" },
{ "name": "email_outboxes" },
{ "name": "events" },
{ "name": "notification_preferences" },
{ "name": "project_permissions" },
{ "name": "refresh_tokens" },
{ "name": "session_replays" },
{ "name": "team_invitations" },
{ "name": "team_member_profiles" },
@ -1122,11 +1134,13 @@ it("shows grants", async ({ expect }) => {
"status": 200,
"body": {
"result": [
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.connected_accounts TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.contact_channels TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.email_outboxes TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.events TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.notification_preferences TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.project_permissions TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.refresh_tokens TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.session_replays TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.team_invitations TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.team_member_profiles TO limited_user" },

View File

@ -13,10 +13,14 @@ import {
verifyNotInExternalDb,
waitForSyncedContactChannel,
waitForSyncedContactChannelDeletion,
waitForSyncedConnectedAccount,
waitForSyncedConnectedAccountDeletion,
waitForSyncedData,
waitForSyncedDeletion,
waitForSyncedEmailOutbox,
waitForSyncedEmailOutboxByStatus,
waitForSyncedRefreshToken,
waitForSyncedRefreshTokenDeletion,
waitForSyncedSessionReplay,
waitForSyncedTeam,
waitForSyncedTeamDeletion,
@ -1689,4 +1693,204 @@ describe.sequential('External DB Sync - Basic Tests', () => {
});
}, TEST_TIMEOUT);
/**
* What it does:
* - Signs up a user (which creates a refresh token), waits for it to sync to the external DB.
*
* Why it matters:
* - Validates that refresh tokens are synced to external databases.
*/
test('Refresh token sync to external DB', async ({ expect }) => {
const dbName = 'refresh_token_sync';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: "postgres",
connectionString,
},
}, { config: { magic_link_enabled: true } });
const signUpRes = await Auth.Otp.signIn();
// List sessions to get the session (refresh token) ID
const listRes = await niceBackendFetch("/api/v1/auth/sessions", {
accessType: "client",
method: "GET",
query: { user_id: signUpRes.userId },
});
expect(listRes.status).toBe(200);
expect(listRes.body.items.length).toBeGreaterThanOrEqual(1);
const sessionId = listRes.body.items[0].id;
const client = dbManager.getClient(dbName);
await waitForSyncedRefreshToken(client, sessionId);
const res = await client.query(`SELECT * FROM "refresh_tokens" WHERE "id" = $1`, [sessionId]);
expect(res.rows.length).toBe(1);
expect(res.rows[0].user_id).toBe(signUpRes.userId);
expect(res.rows[0].is_impersonation).toBe(false);
expect(res.rows[0].created_at).toBeInstanceOf(Date);
expect(res.rows[0].last_used_at).toBeInstanceOf(Date);
}, TEST_TIMEOUT);
/**
* What it does:
* - Signs up a user, revokes the session, and waits for the deletion to sync.
*
* Why it matters:
* - Validates that refresh token deletions are synced to external databases.
*/
test('Refresh token deletion sync to external DB', async ({ expect }) => {
const dbName = 'refresh_token_delete_sync';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: "postgres",
connectionString,
},
}, { config: { magic_link_enabled: true } });
const signUpRes = await Auth.Otp.signIn();
// Create a second session so we can revoke one
const newSession = await niceBackendFetch("/api/v1/auth/sessions", {
accessType: "server",
method: "POST",
body: { user_id: signUpRes.userId },
});
expect(newSession.status).toBe(200);
// List sessions to find the second session ID
const listRes = await niceBackendFetch("/api/v1/auth/sessions", {
accessType: "client",
method: "GET",
query: { user_id: signUpRes.userId },
});
expect(listRes.status).toBe(200);
const nonCurrentSession = listRes.body.items.find((s: any) => !s.is_current_session);
expect(nonCurrentSession).toBeDefined();
const client = dbManager.getClient(dbName);
await waitForSyncedRefreshToken(client, nonCurrentSession.id);
// Revoke the non-current session
const deleteRes = await niceBackendFetch(`/api/v1/auth/sessions/${nonCurrentSession.id}`, {
accessType: "client",
method: "DELETE",
query: { user_id: signUpRes.userId },
});
expect(deleteRes.status).toBe(200);
await waitForSyncedRefreshTokenDeletion(client, nonCurrentSession.id);
}, TEST_TIMEOUT);
/**
* What it does:
* - Signs up a user, verifies refresh token appears in ClickHouse.
*
* Why it matters:
* - Validates ClickHouse refresh_tokens table sync.
*/
test('Refresh token sync to ClickHouse', async ({ expect }) => {
await Project.createAndSwitch({ config: { magic_link_enabled: true } });
await InternalApiKey.createAndSetProjectKeys();
const signUpRes = await Auth.Otp.signIn();
const listRes = await niceBackendFetch("/api/v1/auth/sessions", {
accessType: "client",
method: "GET",
query: { user_id: signUpRes.userId },
});
expect(listRes.status).toBe(200);
const sessionId = listRes.body.items[0].id;
const timeoutMs = 180_000;
const intervalMs = 2_000;
const start = performance.now();
let response;
while (performance.now() - start < timeoutMs) {
response = await runQueryForCurrentProject({
query: "SELECT id, user_id, is_impersonation FROM refresh_tokens WHERE id = {session_id:UUID}",
params: { session_id: sessionId },
});
expect(response.status).toBe(200);
if (response.body.result.length === 1) {
expect(response.body.result[0]).toMatchObject({
id: sessionId,
user_id: signUpRes.userId,
is_impersonation: 0,
});
return;
}
await wait(intervalMs);
}
throw new StackAssertionError(`Timed out waiting for ClickHouse refresh token to sync.`, { response });
}, TEST_TIMEOUT);
/**
* What it does:
* - Signs up a user, verifies connected account appears in ClickHouse.
*
* Why it matters:
* - Validates ClickHouse connected_accounts table sync.
*/
test('Connected account sync to ClickHouse', async ({ expect }) => {
// Use default project (has spotify configured) with analytics keys
await Auth.OAuth.signIn();
await InternalApiKey.createAndSetProjectKeys();
// Get the user ID
const userRes = await niceBackendFetch("/api/v1/users/me", {
accessType: "client",
method: "GET",
});
expect(userRes.status).toBe(200);
const userId = userRes.body.id;
// Create an additional connected account via the oauth-providers API so we have a known ID
const createRes = await niceBackendFetch("/api/v1/oauth-providers", {
accessType: "server",
method: "POST",
body: {
user_id: userId,
provider_config_id: "spotify",
account_id: "ch-test-account-12345",
email: "chuser@example.com",
allow_sign_in: false,
allow_connected_accounts: true,
},
});
expect(createRes.status).toBe(201);
const accountId = createRes.body.id;
const timeoutMs = 180_000;
const intervalMs = 2_000;
const start = performance.now();
let response;
while (performance.now() - start < timeoutMs) {
response = await runQueryForCurrentProject({
query: "SELECT id, user_id, provider, provider_account_id, email FROM connected_accounts WHERE id = {account_id:UUID}",
params: { account_id: accountId },
});
expect(response.status).toBe(200);
if (response.body.result.length === 1) {
expect(response.body.result[0]).toMatchObject({
id: accountId,
user_id: userId,
provider: "spotify",
provider_account_id: "ch-test-account-12345",
email: "chuser@example.com",
});
return;
}
await wait(intervalMs);
}
throw new StackAssertionError(`Timed out waiting for ClickHouse connected account to sync.`, { response });
}, TEST_TIMEOUT);
});

View File

@ -396,6 +396,54 @@ export async function waitForSyncedNotificationPreferenceDeletion(client: Client
});
}
export async function waitForSyncedRefreshToken(client: Client, refreshTokenId: string) {
await waitForExternalDbRow(
client,
`SELECT * FROM "refresh_tokens" WHERE "id" = $1`,
[refreshTokenId],
{
shouldExist: true,
description: `refresh token "${refreshTokenId}" to appear in external DB`,
},
);
}
export async function waitForSyncedRefreshTokenDeletion(client: Client, refreshTokenId: string) {
await waitForExternalDbRow(
client,
`SELECT * FROM "refresh_tokens" WHERE "id" = $1`,
[refreshTokenId],
{
shouldExist: false,
description: `refresh token "${refreshTokenId}" to be removed from external DB`,
},
);
}
export async function waitForSyncedConnectedAccount(client: Client, accountId: string) {
await waitForExternalDbRow(
client,
`SELECT * FROM "connected_accounts" WHERE "id" = $1`,
[accountId],
{
shouldExist: true,
description: `connected account "${accountId}" to appear in external DB`,
},
);
}
export async function waitForSyncedConnectedAccountDeletion(client: Client, accountId: string) {
await waitForExternalDbRow(
client,
`SELECT * FROM "connected_accounts" WHERE "id" = $1`,
[accountId],
{
shouldExist: false,
description: `connected account "${accountId}" to be removed from external DB`,
},
);
}
export async function waitForSyncedEmailOutboxByStatus(client: Client, status: string) {
await waitForExternalDbRow(
client,

View File

@ -2059,4 +2059,382 @@ export const DEFAULT_DB_SYNC_MAPPINGS = {
`.trim(),
},
},
"refresh_tokens": {
sourceTables: { "ProjectUserRefreshToken": "ProjectUserRefreshToken" },
targetTable: "refresh_tokens",
targetTableSchemas: {
postgres: `
CREATE TABLE IF NOT EXISTS "refresh_tokens" (
"id" uuid PRIMARY KEY NOT NULL,
"user_id" uuid NOT NULL,
"created_at" timestamp without time zone NOT NULL,
"last_used_at" timestamp without time zone NOT NULL,
"is_impersonation" boolean NOT NULL DEFAULT false,
"expires_at" timestamp without time zone
);
REVOKE ALL ON "refresh_tokens" FROM PUBLIC;
GRANT SELECT ON "refresh_tokens" TO PUBLIC;
CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" (
"mapping_name" text PRIMARY KEY NOT NULL,
"last_synced_sequence_id" bigint NOT NULL DEFAULT -1,
"updated_at" timestamp without time zone NOT NULL DEFAULT now()
);
`.trim(),
clickhouse: `
CREATE TABLE IF NOT EXISTS analytics_internal.refresh_tokens (
project_id String,
branch_id String,
id UUID,
user_id UUID,
created_at DateTime64(3, 'UTC'),
last_used_at DateTime64(3, 'UTC'),
is_impersonation UInt8,
expires_at Nullable(DateTime64(3, 'UTC')),
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, id);
CREATE TABLE IF NOT EXISTS analytics_internal._stack_sync_metadata (
tenancy_id UUID,
mapping_name String,
last_synced_sequence_id Int64,
updated_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(updated_at)
ORDER BY (tenancy_id, mapping_name);
`.trim(),
},
internalDbFetchQueries: {
clickhouse: `
SELECT *
FROM (
SELECT
"Tenancy"."projectId" AS "project_id",
"Tenancy"."branchId" AS "branch_id",
"ProjectUserRefreshToken"."id" AS "id",
"ProjectUserRefreshToken"."projectUserId" AS "user_id",
"ProjectUserRefreshToken"."createdAt" AS "created_at",
"ProjectUserRefreshToken"."lastActiveAt" AS "last_used_at",
"ProjectUserRefreshToken"."isImpersonation" AS "is_impersonation",
"ProjectUserRefreshToken"."expiresAt" AS "expires_at",
"ProjectUserRefreshToken"."sequenceId" AS "sync_sequence_id",
"ProjectUserRefreshToken"."tenancyId" AS "tenancyId",
false AS "sync_is_deleted"
FROM "ProjectUserRefreshToken"
JOIN "Tenancy" ON "Tenancy"."id" = "ProjectUserRefreshToken"."tenancyId"
WHERE "ProjectUserRefreshToken"."tenancyId" = $1::uuid
UNION ALL
SELECT
"Tenancy"."projectId" AS "project_id",
"Tenancy"."branchId" AS "branch_id",
("DeletedRow"."primaryKey"->>'id')::uuid AS "id",
("DeletedRow"."data"->>'projectUserId')::uuid AS "user_id",
"DeletedRow"."deletedAt"::timestamp without time zone AS "created_at",
"DeletedRow"."deletedAt"::timestamp without time zone AS "last_used_at",
false AS "is_impersonation",
NULL::timestamp without time zone AS "expires_at",
"DeletedRow"."sequenceId" AS "sync_sequence_id",
"DeletedRow"."tenancyId" AS "tenancyId",
true AS "sync_is_deleted"
FROM "DeletedRow"
JOIN "Tenancy" ON "Tenancy"."id" = "DeletedRow"."tenancyId"
WHERE
"DeletedRow"."tenancyId" = $1::uuid
AND "DeletedRow"."tableName" = 'ProjectUserRefreshToken'
) AS "_src"
WHERE "sync_sequence_id" IS NOT NULL
AND "sync_sequence_id" > $2::bigint
ORDER BY "sync_sequence_id" ASC
LIMIT 1000
`.trim(),
},
internalDbFetchQuery: `
SELECT *
FROM (
SELECT
"ProjectUserRefreshToken"."id" AS "id",
"ProjectUserRefreshToken"."projectUserId" AS "user_id",
"ProjectUserRefreshToken"."createdAt" AS "created_at",
"ProjectUserRefreshToken"."lastActiveAt" AS "last_used_at",
"ProjectUserRefreshToken"."isImpersonation" AS "is_impersonation",
"ProjectUserRefreshToken"."expiresAt" AS "expires_at",
"ProjectUserRefreshToken"."sequenceId" AS "sequence_id",
"ProjectUserRefreshToken"."tenancyId",
false AS "is_deleted"
FROM "ProjectUserRefreshToken"
WHERE "ProjectUserRefreshToken"."tenancyId" = $1::uuid
UNION ALL
SELECT
("DeletedRow"."primaryKey"->>'id')::uuid AS "id",
("DeletedRow"."data"->>'projectUserId')::uuid AS "user_id",
"DeletedRow"."deletedAt"::timestamp without time zone AS "created_at",
"DeletedRow"."deletedAt"::timestamp without time zone AS "last_used_at",
false AS "is_impersonation",
NULL::timestamp without time zone AS "expires_at",
"DeletedRow"."sequenceId" AS "sequence_id",
"DeletedRow"."tenancyId",
true AS "is_deleted"
FROM "DeletedRow"
WHERE
"DeletedRow"."tenancyId" = $1::uuid
AND "DeletedRow"."tableName" = 'ProjectUserRefreshToken'
) AS "_src"
WHERE "sequence_id" IS NOT NULL
AND "sequence_id" > $2::bigint
ORDER BY "sequence_id" ASC
LIMIT 1000
`.trim(),
externalDbUpdateQueries: {
postgres: `
WITH params AS (
SELECT
$1::uuid AS "id",
$2::uuid AS "user_id",
$3::timestamp without time zone AS "created_at",
$4::timestamp without time zone AS "last_used_at",
$5::boolean AS "is_impersonation",
$6::timestamp without time zone AS "expires_at",
$7::bigint AS "sequence_id",
$8::boolean AS "is_deleted",
$9::text AS "mapping_name"
),
deleted AS (
DELETE FROM "refresh_tokens" rt
USING params p
WHERE p."is_deleted" = true AND rt."id" = p."id"
RETURNING 1
),
upserted AS (
INSERT INTO "refresh_tokens" (
"id",
"user_id",
"created_at",
"last_used_at",
"is_impersonation",
"expires_at"
)
SELECT
p."id",
p."user_id",
p."created_at",
p."last_used_at",
p."is_impersonation",
p."expires_at"
FROM params p
WHERE p."is_deleted" = false
ON CONFLICT ("id") DO UPDATE SET
"user_id" = EXCLUDED."user_id",
"created_at" = EXCLUDED."created_at",
"last_used_at" = EXCLUDED."last_used_at",
"is_impersonation" = EXCLUDED."is_impersonation",
"expires_at" = EXCLUDED."expires_at"
RETURNING 1
)
INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at")
SELECT p."mapping_name", p."sequence_id", now() FROM params p
ON CONFLICT ("mapping_name") DO UPDATE SET
"last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"),
"updated_at" = now();
`.trim(),
},
},
"connected_accounts": {
sourceTables: { "ProjectUserOAuthAccount": "ProjectUserOAuthAccount" },
targetTable: "connected_accounts",
targetTableSchemas: {
postgres: `
CREATE TABLE IF NOT EXISTS "connected_accounts" (
"id" uuid PRIMARY KEY NOT NULL,
"user_id" uuid NOT NULL,
"provider" text NOT NULL,
"provider_account_id" text NOT NULL,
"email" text,
"created_at" timestamp without time zone NOT NULL
);
REVOKE ALL ON "connected_accounts" FROM PUBLIC;
GRANT SELECT ON "connected_accounts" TO PUBLIC;
CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" (
"mapping_name" text PRIMARY KEY NOT NULL,
"last_synced_sequence_id" bigint NOT NULL DEFAULT -1,
"updated_at" timestamp without time zone NOT NULL DEFAULT now()
);
`.trim(),
clickhouse: `
CREATE TABLE IF NOT EXISTS analytics_internal.connected_accounts (
project_id String,
branch_id String,
id UUID,
user_id UUID,
provider String,
provider_account_id String,
email Nullable(String),
created_at DateTime64(3, 'UTC'),
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, id);
CREATE TABLE IF NOT EXISTS analytics_internal._stack_sync_metadata (
tenancy_id UUID,
mapping_name String,
last_synced_sequence_id Int64,
updated_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(updated_at)
ORDER BY (tenancy_id, mapping_name);
`.trim(),
},
internalDbFetchQueries: {
clickhouse: `
SELECT *
FROM (
SELECT
"Tenancy"."projectId" AS "project_id",
"Tenancy"."branchId" AS "branch_id",
"ProjectUserOAuthAccount"."id" AS "id",
"ProjectUserOAuthAccount"."projectUserId" AS "user_id",
"ProjectUserOAuthAccount"."configOAuthProviderId" AS "provider",
"ProjectUserOAuthAccount"."providerAccountId" AS "provider_account_id",
"ProjectUserOAuthAccount"."email" AS "email",
"ProjectUserOAuthAccount"."createdAt" AS "created_at",
"ProjectUserOAuthAccount"."sequenceId" AS "sync_sequence_id",
"ProjectUserOAuthAccount"."tenancyId" AS "tenancyId",
false AS "sync_is_deleted"
FROM "ProjectUserOAuthAccount"
JOIN "Tenancy" ON "Tenancy"."id" = "ProjectUserOAuthAccount"."tenancyId"
WHERE "ProjectUserOAuthAccount"."tenancyId" = $1::uuid
AND "ProjectUserOAuthAccount"."projectUserId" IS NOT NULL
UNION ALL
SELECT
"Tenancy"."projectId" AS "project_id",
"Tenancy"."branchId" AS "branch_id",
("DeletedRow"."primaryKey"->>'id')::uuid AS "id",
("DeletedRow"."data"->>'projectUserId')::uuid AS "user_id",
NULL::text AS "provider",
NULL::text AS "provider_account_id",
NULL::text AS "email",
"DeletedRow"."deletedAt"::timestamp without time zone AS "created_at",
"DeletedRow"."sequenceId" AS "sync_sequence_id",
"DeletedRow"."tenancyId" AS "tenancyId",
true AS "sync_is_deleted"
FROM "DeletedRow"
JOIN "Tenancy" ON "Tenancy"."id" = "DeletedRow"."tenancyId"
WHERE
"DeletedRow"."tenancyId" = $1::uuid
AND "DeletedRow"."tableName" = 'ProjectUserOAuthAccount'
) AS "_src"
WHERE "sync_sequence_id" IS NOT NULL
AND "sync_sequence_id" > $2::bigint
ORDER BY "sync_sequence_id" ASC
LIMIT 1000
`.trim(),
},
internalDbFetchQuery: `
SELECT *
FROM (
SELECT
"ProjectUserOAuthAccount"."id" AS "id",
"ProjectUserOAuthAccount"."projectUserId" AS "user_id",
"ProjectUserOAuthAccount"."configOAuthProviderId" AS "provider",
"ProjectUserOAuthAccount"."providerAccountId" AS "provider_account_id",
"ProjectUserOAuthAccount"."email" AS "email",
"ProjectUserOAuthAccount"."createdAt" AS "created_at",
"ProjectUserOAuthAccount"."sequenceId" AS "sequence_id",
"ProjectUserOAuthAccount"."tenancyId",
false AS "is_deleted"
FROM "ProjectUserOAuthAccount"
WHERE "ProjectUserOAuthAccount"."tenancyId" = $1::uuid
AND "ProjectUserOAuthAccount"."projectUserId" IS NOT NULL
UNION ALL
SELECT
("DeletedRow"."primaryKey"->>'id')::uuid AS "id",
("DeletedRow"."data"->>'projectUserId')::uuid AS "user_id",
NULL::text AS "provider",
NULL::text AS "provider_account_id",
NULL::text AS "email",
"DeletedRow"."deletedAt"::timestamp without time zone AS "created_at",
"DeletedRow"."sequenceId" AS "sequence_id",
"DeletedRow"."tenancyId",
true AS "is_deleted"
FROM "DeletedRow"
WHERE
"DeletedRow"."tenancyId" = $1::uuid
AND "DeletedRow"."tableName" = 'ProjectUserOAuthAccount'
) AS "_src"
WHERE "sequence_id" IS NOT NULL
AND "sequence_id" > $2::bigint
ORDER BY "sequence_id" ASC
LIMIT 1000
`.trim(),
externalDbUpdateQueries: {
postgres: `
WITH params AS (
SELECT
$1::uuid AS "id",
$2::uuid AS "user_id",
$3::text AS "provider",
$4::text AS "provider_account_id",
$5::text AS "email",
$6::timestamp without time zone AS "created_at",
$7::bigint AS "sequence_id",
$8::boolean AS "is_deleted",
$9::text AS "mapping_name"
),
deleted AS (
DELETE FROM "connected_accounts" ca
USING params p
WHERE p."is_deleted" = true AND ca."id" = p."id"
RETURNING 1
),
upserted AS (
INSERT INTO "connected_accounts" (
"id",
"user_id",
"provider",
"provider_account_id",
"email",
"created_at"
)
SELECT
p."id",
p."user_id",
p."provider",
p."provider_account_id",
p."email",
p."created_at"
FROM params p
WHERE p."is_deleted" = false
ON CONFLICT ("id") DO UPDATE SET
"user_id" = EXCLUDED."user_id",
"provider" = EXCLUDED."provider",
"provider_account_id" = EXCLUDED."provider_account_id",
"email" = EXCLUDED."email",
"created_at" = EXCLUDED."created_at"
RETURNING 1
)
INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at")
SELECT p."mapping_name", p."sequence_id", now() FROM params p
ON CONFLICT ("mapping_name") DO UPDATE SET
"last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"),
"updated_at" = now();
`.trim(),
},
},
} as const;