diff --git a/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql b/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql index 5bd53c09f..7b24254d5 100644 --- a/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql +++ b/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql @@ -103,145 +103,3 @@ CREATE INDEX "ContactChannel_shouldUpdateSequenceId_idx" ON "ContactChannel"("sh -- SPLIT_STATEMENT_SENTINEL CREATE INDEX "DeletedRow_shouldUpdateSequenceId_idx" ON "DeletedRow"("shouldUpdateSequenceId", "tenancyId"); - --- SPLIT_STATEMENT_SENTINEL --- SINGLE_STATEMENT_SENTINEL --- Creates function that sets shouldUpdateSequenceId to TRUE whenever a row is updated. --- This marks the row for re-syncing to external databases after any change. -CREATE FUNCTION reset_sequence_id_on_update() -RETURNS TRIGGER AS $$ -BEGIN - NEW."shouldUpdateSequenceId" := TRUE; - RETURN NEW; -END; -$$ LANGUAGE plpgsql; - --- SPLIT_STATEMENT_SENTINEL --- Creates triggers that automatically mark rows for re-syncing when they are updated. --- Only triggers when shouldUpdateSequenceId is currently FALSE to avoid unnecessary updates. -CREATE TRIGGER mark_should_update_sequence_id_project_user -BEFORE UPDATE ON "ProjectUser" -FOR EACH ROW -WHEN (OLD."shouldUpdateSequenceId" = FALSE) -EXECUTE FUNCTION reset_sequence_id_on_update(); - --- SPLIT_STATEMENT_SENTINEL -CREATE TRIGGER mark_should_update_sequence_id_contact_channel -BEFORE UPDATE ON "ContactChannel" -FOR EACH ROW -WHEN (OLD."shouldUpdateSequenceId" = FALSE) -EXECUTE FUNCTION reset_sequence_id_on_update(); - --- SPLIT_STATEMENT_SENTINEL -CREATE TRIGGER mark_should_update_sequence_id_deleted_row -BEFORE UPDATE ON "DeletedRow" -FOR EACH ROW -WHEN (OLD."shouldUpdateSequenceId" = FALSE) -EXECUTE FUNCTION reset_sequence_id_on_update(); - --- SPLIT_STATEMENT_SENTINEL --- SINGLE_STATEMENT_SENTINEL --- Marks the related ProjectUser for re-sync when a ContactChannel changes. -CREATE FUNCTION mark_project_user_on_contact_channel_change() -RETURNS TRIGGER AS $function$ -BEGIN - UPDATE "ProjectUser" - SET "shouldUpdateSequenceId" = TRUE - WHERE "tenancyId" = NEW."tenancyId" - AND "projectUserId" = NEW."projectUserId"; - RETURN NEW; -END; -$function$ LANGUAGE plpgsql; - --- SPLIT_STATEMENT_SENTINEL -CREATE TRIGGER mark_project_user_on_contact_channel_insert -AFTER INSERT ON "ContactChannel" -FOR EACH ROW -EXECUTE FUNCTION mark_project_user_on_contact_channel_change(); - --- SPLIT_STATEMENT_SENTINEL -CREATE TRIGGER mark_project_user_on_contact_channel_update -AFTER UPDATE ON "ContactChannel" -FOR EACH ROW -WHEN (OLD."tenancyId" = NEW."tenancyId" AND OLD."projectUserId" = NEW."projectUserId") -EXECUTE FUNCTION mark_project_user_on_contact_channel_change(); - --- SPLIT_STATEMENT_SENTINEL --- SINGLE_STATEMENT_SENTINEL --- Marks the related ProjectUser for re-sync when a ContactChannel is deleted. -CREATE FUNCTION mark_project_user_on_contact_channel_delete() -RETURNS TRIGGER AS $function$ -BEGIN - UPDATE "ProjectUser" - SET "shouldUpdateSequenceId" = TRUE - WHERE "tenancyId" = OLD."tenancyId" - AND "projectUserId" = OLD."projectUserId"; - RETURN OLD; -END; -$function$ LANGUAGE plpgsql; - --- SPLIT_STATEMENT_SENTINEL -CREATE TRIGGER mark_project_user_on_contact_channel_delete -AFTER DELETE ON "ContactChannel" -FOR EACH ROW -EXECUTE FUNCTION mark_project_user_on_contact_channel_delete(); - --- SPLIT_STATEMENT_SENTINEL --- SINGLE_STATEMENT_SENTINEL --- Creates function that logs deleted rows to the DeletedRow table with their full data. --- Extracts the primary key and row data so external databases can process the deletion. -CREATE FUNCTION log_deleted_row() -RETURNS TRIGGER AS $function$ -DECLARE - row_data jsonb; - pk jsonb := '{}'::jsonb; - col record; -BEGIN - row_data := to_jsonb(OLD); - - FOR col IN - SELECT a.attname - FROM pg_index i - JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) - WHERE i.indrelid = TG_RELID - AND i.indisprimary - LOOP - pk := pk || jsonb_build_object(col.attname, row_data -> col.attname); - END LOOP; - - INSERT INTO "DeletedRow" ( - "id", - "tenancyId", - "tableName", - "primaryKey", - "data", - "deletedAt", - "shouldUpdateSequenceId" - ) - VALUES ( - gen_random_uuid(), - OLD."tenancyId", - TG_TABLE_NAME, - pk, - row_data, - NOW(), - TRUE - ); - - RETURN OLD; -END; -$function$ LANGUAGE plpgsql; - --- SPLIT_STATEMENT_SENTINEL --- Creates triggers that automatically log deleted rows to DeletedRow table before deletion. --- Runs before the row is deleted so all data is still available to be logged. -CREATE TRIGGER log_deleted_row_project_user -BEFORE DELETE ON "ProjectUser" -FOR EACH ROW -EXECUTE FUNCTION log_deleted_row(); - --- SPLIT_STATEMENT_SENTINEL -CREATE TRIGGER log_deleted_row_contact_channel -BEFORE DELETE ON "ContactChannel" -FOR EACH ROW -EXECUTE FUNCTION log_deleted_row(); diff --git a/apps/backend/src/app/api/latest/contact-channels/crud.tsx b/apps/backend/src/app/api/latest/contact-channels/crud.tsx index abc0edaa1..d5f3081f9 100644 --- a/apps/backend/src/app/api/latest/contact-channels/crud.tsx +++ b/apps/backend/src/app/api/latest/contact-channels/crud.tsx @@ -1,5 +1,6 @@ import { demoteAllContactChannelsToNonPrimary, setContactChannelAsPrimaryById } from "@/lib/contact-channel"; import { normalizeEmail } from "@/lib/emails"; +import { markProjectUserForExternalDbSync, recordExternalDbSyncDeletion, withExternalDbSyncUpdate } from "@/lib/external-db-sync"; import { ensureContactChannelDoesNotExists, ensureContactChannelExists } from "@/lib/request-checks"; import { getPrismaClientForTenancy, retryTransaction } from "@/prisma-client"; import { createCrudHandlers } from "@/route-handlers/crud-handler"; @@ -121,6 +122,11 @@ export const contactChannelsCrudHandlers = createLazyProxy(() => createCrudHandl }); } + await markProjectUserForExternalDbSync(tx, { + tenancyId: auth.tenancy.id, + projectUserId: data.user_id, + }); + return await tx.contactChannel.findUnique({ where: { tenancyId_projectUserId_id: { @@ -188,7 +194,7 @@ export const contactChannelsCrudHandlers = createLazyProxy(() => createCrudHandl }); } - return await tx.contactChannel.update({ + const updated = await tx.contactChannel.update({ where: { tenancyId_projectUserId_id: { tenancyId: auth.tenancy.id, @@ -196,13 +202,20 @@ export const contactChannelsCrudHandlers = createLazyProxy(() => createCrudHandl id: params.contact_channel_id || throwErr("Missing contact channel id"), }, }, - data: { + data: withExternalDbSyncUpdate({ value: value, isVerified: data.is_verified ?? (value ? false : undefined), // if value is updated and is_verified is not provided, set to false usedForAuth: data.used_for_auth !== undefined ? (data.used_for_auth ? 'TRUE' : null) : undefined, isPrimary: data.is_primary !== undefined ? (data.is_primary ? 'TRUE' : null) : undefined, - }, + }), }); + + await markProjectUserForExternalDbSync(tx, { + tenancyId: auth.tenancy.id, + projectUserId: params.user_id, + }); + + return updated; }); return contactChannelToCrud(updatedContactChannel); @@ -224,6 +237,13 @@ export const contactChannelsCrudHandlers = createLazyProxy(() => createCrudHandl contactChannelId: params.contact_channel_id || throwErr("Missing contact channel id"), }); + await recordExternalDbSyncDeletion(tx, { + tableName: "ContactChannel", + tenancyId: auth.tenancy.id, + projectUserId: params.user_id, + contactChannelId: params.contact_channel_id || throwErr("Missing contact channel id"), + }); + await tx.contactChannel.delete({ where: { tenancyId_projectUserId_id: { @@ -233,6 +253,11 @@ export const contactChannelsCrudHandlers = createLazyProxy(() => createCrudHandl }, }, }); + + await markProjectUserForExternalDbSync(tx, { + tenancyId: auth.tenancy.id, + projectUserId: params.user_id, + }); }); }, onList: async ({ query, auth }) => { diff --git a/apps/backend/src/app/api/latest/contact-channels/verify/verification-code-handler.tsx b/apps/backend/src/app/api/latest/contact-channels/verify/verification-code-handler.tsx index 53d981fcf..fa2dfef55 100644 --- a/apps/backend/src/app/api/latest/contact-channels/verify/verification-code-handler.tsx +++ b/apps/backend/src/app/api/latest/contact-channels/verify/verification-code-handler.tsx @@ -1,4 +1,5 @@ import { sendEmailFromDefaultTemplate } from "@/lib/emails"; +import { markProjectUserForExternalDbSync, withExternalDbSyncUpdate } from "@/lib/external-db-sync"; import { getSoleTenancyFromProjectBranch } from "@/lib/tenancies"; import { getPrismaClientForTenancy } from "@/prisma-client"; import { createVerificationCodeHandler } from "@/route-handlers/verification-code-handler"; @@ -68,9 +69,14 @@ export const contactChannelVerificationCodeHandler = createVerificationCodeHandl await prisma.contactChannel.update({ where: uniqueKeys, - data: { + data: withExternalDbSyncUpdate({ isVerified: true, - } + }), + }); + + await markProjectUserForExternalDbSync(prisma, { + tenancyId: tenancy.id, + projectUserId: data.user_id, }); return { diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts index 822980596..b027e5ce0 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts @@ -208,7 +208,6 @@ export const GET = createSmartRouteHandler({ } while (performance.now() - startTime < maxDurationMs) { - console.log("poller-iteration", performance.now() - startTime); const pendingRequests = await claimPendingRequests(); if (stopWhenIdle && pendingRequests.length === 0) { diff --git a/apps/backend/src/app/api/latest/users/crud.tsx b/apps/backend/src/app/api/latest/users/crud.tsx index 49f4641b0..d76b80e62 100644 --- a/apps/backend/src/app/api/latest/users/crud.tsx +++ b/apps/backend/src/app/api/latest/users/crud.tsx @@ -2,6 +2,7 @@ import { BooleanTrue, Prisma } from "@/generated/prisma/client"; import { getRenderedOrganizationConfigQuery, getRenderedProjectConfigQuery } from "@/lib/config"; import { demoteAllContactChannelsToNonPrimary, setContactChannelAsPrimaryByValue } from "@/lib/contact-channel"; import { normalizeEmail } from "@/lib/emails"; +import { recordExternalDbSyncContactChannelDeletionsForUser, recordExternalDbSyncDeletion, withExternalDbSyncUpdate } from "@/lib/external-db-sync"; import { grantDefaultProjectPermissions } from "@/lib/permissions"; import { ensureTeamMembershipExists, ensureUserExists } from "@/lib/request-checks"; import { Tenancy } from "@/lib/tenancies"; @@ -934,9 +935,9 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC isPrimary: "TRUE", }, }, - data: { + data: withExternalDbSyncUpdate({ isVerified: data.primary_email_verified, - }, + }), }); } @@ -952,9 +953,9 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC isPrimary: "TRUE", }, }, - data: { + data: withExternalDbSyncUpdate({ usedForAuth: primaryEmailAuthEnabled ? BooleanTrue.TRUE : null, - }, + }), }); } @@ -1130,7 +1131,7 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC projectUserId: params.user_id, }, }, - data: { + data: withExternalDbSyncUpdate({ displayName: data.display_name === undefined ? undefined : (data.display_name || null), clientMetadata: data.client_metadata === null ? Prisma.JsonNull : data.client_metadata, clientReadOnlyMetadata: data.client_read_only_metadata === null ? Prisma.JsonNull : data.client_read_only_metadata, @@ -1142,7 +1143,7 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC restrictedByAdmin: data.restricted_by_admin ?? undefined, restrictedByAdminReason: restrictedByAdminReason, restrictedByAdminPrivateDetails: restrictedByAdminPrivateDetails, - }, + }), include: userFullInclude, }); @@ -1189,6 +1190,17 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC }, }); + await recordExternalDbSyncDeletion(tx, { + tableName: "ProjectUser", + tenancyId: auth.tenancy.id, + projectUserId: params.user_id, + }); + + await recordExternalDbSyncContactChannelDeletionsForUser(tx, { + tenancyId: auth.tenancy.id, + projectUserId: params.user_id, + }); + await tx.projectUser.delete({ where: { tenancyId_projectUserId: { diff --git a/apps/backend/src/lib/contact-channel.tsx b/apps/backend/src/lib/contact-channel.tsx index 7b90457a4..1a8836a1e 100644 --- a/apps/backend/src/lib/contact-channel.tsx +++ b/apps/backend/src/lib/contact-channel.tsx @@ -1,4 +1,5 @@ import { BooleanTrue, ContactChannelType } from "@/generated/prisma/client"; +import { markProjectUserForExternalDbSync, withExternalDbSyncUpdate } from "@/lib/external-db-sync"; import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors"; import { normalizeEmail } from "./emails"; import { PrismaTransaction } from "./types"; @@ -34,9 +35,13 @@ export async function demoteAllContactChannelsToNonPrimary( type: options.type, isPrimary: BooleanTrue.TRUE, }, - data: { + data: withExternalDbSyncUpdate({ isPrimary: null, - }, + }), + }); + await markProjectUserForExternalDbSync(tx, { + tenancyId: options.tenancyId, + projectUserId: options.projectUserId, }); } @@ -100,10 +105,14 @@ export async function setContactChannelAsPrimaryById( id: options.contactChannelId, }, }, - data: { + data: withExternalDbSyncUpdate({ isPrimary: BooleanTrue.TRUE, ...options.additionalUpdates, - }, + }), + }); + await markProjectUserForExternalDbSync(tx, { + tenancyId: options.tenancyId, + projectUserId: options.projectUserId, }); } @@ -141,10 +150,14 @@ export async function setContactChannelAsPrimaryByValue( value: options.value, }, }, - data: { + data: withExternalDbSyncUpdate({ isPrimary: BooleanTrue.TRUE, ...options.additionalUpdates, - }, + }), + }); + await markProjectUserForExternalDbSync(tx, { + tenancyId: options.tenancyId, + projectUserId: options.projectUserId, }); } diff --git a/apps/backend/src/lib/external-db-sync.ts b/apps/backend/src/lib/external-db-sync.ts index 2fcab363e..57f89127a 100644 --- a/apps/backend/src/lib/external-db-sync.ts +++ b/apps/backend/src/lib/external-db-sync.ts @@ -1,5 +1,7 @@ import { Tenancy } from "@/lib/tenancies"; +import type { PrismaTransaction } from "@/lib/types"; import { getPrismaClientForTenancy, PrismaClientWithReplica } from "@/prisma-client"; +import { Prisma } from "@/generated/prisma/client"; import { DEFAULT_DB_SYNC_MAPPINGS } from "@stackframe/stack-shared/dist/config/db-sync-mappings"; import type { CompleteConfig } from "@stackframe/stack-shared/dist/config/schema"; import { captureError, StackAssertionError, throwErr } from "@stackframe/stack-shared/dist/utils/errors"; @@ -24,6 +26,172 @@ function assertUuid(value: unknown, label: string): asserts value is string { } } +type ExternalDbSyncClient = PrismaTransaction | PrismaClientWithReplica; + +type ExternalDbSyncTarget = + | { + tableName: "ProjectUser", + tenancyId: string, + projectUserId: string, + } + | { + tableName: "ContactChannel", + tenancyId: string, + projectUserId: string, + contactChannelId: string, + }; + +export function withExternalDbSyncUpdate(data: T): T & { shouldUpdateSequenceId: true } { + return { + ...data, + shouldUpdateSequenceId: true, + }; +} + +export async function markProjectUserForExternalDbSync( + tx: ExternalDbSyncClient, + options: { + tenancyId: string, + projectUserId: string, + } +): Promise { + assertUuid(options.tenancyId, "tenancyId"); + assertUuid(options.projectUserId, "projectUserId"); + await tx.projectUser.update({ + where: { + tenancyId_projectUserId: { + tenancyId: options.tenancyId, + projectUserId: options.projectUserId, + }, + }, + data: { + shouldUpdateSequenceId: true, + }, + }); +} + +export async function recordExternalDbSyncDeletion( + tx: ExternalDbSyncClient, + target: ExternalDbSyncTarget, +): Promise { + assertUuid(target.tenancyId, "tenancyId"); + assertUuid(target.projectUserId, "projectUserId"); + + if (target.tableName === "ProjectUser") { + const insertedCount = await tx.$executeRaw(Prisma.sql` + INSERT INTO "DeletedRow" ( + "id", + "tenancyId", + "tableName", + "primaryKey", + "data", + "deletedAt", + "shouldUpdateSequenceId" + ) + SELECT + gen_random_uuid(), + "tenancyId", + 'ProjectUser', + jsonb_build_object('tenancyId', "tenancyId", 'projectUserId', "projectUserId"), + to_jsonb("ProjectUser".*), + NOW(), + TRUE + FROM "ProjectUser" + WHERE "tenancyId" = ${target.tenancyId}::uuid + AND "projectUserId" = ${target.projectUserId}::uuid + FOR UPDATE + `); + + if (insertedCount !== 1) { + throw new StackAssertionError( + `Expected to insert 1 DeletedRow entry for ProjectUser, got ${insertedCount}.` + ); + } + return; + } + + assertUuid(target.contactChannelId, "contactChannelId"); + const insertedCount = await tx.$executeRaw(Prisma.sql` + INSERT INTO "DeletedRow" ( + "id", + "tenancyId", + "tableName", + "primaryKey", + "data", + "deletedAt", + "shouldUpdateSequenceId" + ) + SELECT + gen_random_uuid(), + "tenancyId", + 'ContactChannel', + jsonb_build_object( + 'tenancyId', + "tenancyId", + 'projectUserId', + "projectUserId", + 'id', + "id" + ), + to_jsonb("ContactChannel".*), + NOW(), + TRUE + FROM "ContactChannel" + WHERE "tenancyId" = ${target.tenancyId}::uuid + AND "projectUserId" = ${target.projectUserId}::uuid + AND "id" = ${target.contactChannelId}::uuid + FOR UPDATE + `); + + if (insertedCount !== 1) { + throw new StackAssertionError( + `Expected to insert 1 DeletedRow entry for ContactChannel, got ${insertedCount}.` + ); + } +} + +export async function recordExternalDbSyncContactChannelDeletionsForUser( + tx: ExternalDbSyncClient, + options: { + tenancyId: string, + projectUserId: string, + }, +): Promise { + assertUuid(options.tenancyId, "tenancyId"); + assertUuid(options.projectUserId, "projectUserId"); + + await tx.$executeRaw(Prisma.sql` + INSERT INTO "DeletedRow" ( + "id", + "tenancyId", + "tableName", + "primaryKey", + "data", + "deletedAt", + "shouldUpdateSequenceId" + ) + SELECT + gen_random_uuid(), + "tenancyId", + 'ContactChannel', + jsonb_build_object( + 'tenancyId', + "tenancyId", + 'projectUserId', + "projectUserId", + 'id', + "id" + ), + to_jsonb("ContactChannel".*), + NOW(), + TRUE + FROM "ContactChannel" + WHERE "tenancyId" = ${options.tenancyId}::uuid + AND "projectUserId" = ${options.projectUserId}::uuid + FOR UPDATE + `); +} + type PgErrorLike = { code?: string, constraint?: string, diff --git a/apps/backend/src/lib/tokens.tsx b/apps/backend/src/lib/tokens.tsx index deda644dd..0f5f96eea 100644 --- a/apps/backend/src/lib/tokens.tsx +++ b/apps/backend/src/lib/tokens.tsx @@ -1,5 +1,6 @@ import { usersCrudHandlers } from '@/app/api/latest/users/crud'; import { getPrismaClientForTenancy, globalPrismaClient } from '@/prisma-client'; +import { withExternalDbSyncUpdate } from '@/lib/external-db-sync'; import { KnownErrors } from '@stackframe/stack-shared'; import { yupBoolean, yupNumber, yupObject, yupString } from "@stackframe/stack-shared/dist/schema-fields"; import { AccessTokenPayload } from '@stackframe/stack-shared/dist/sessions'; @@ -244,9 +245,9 @@ export async function generateAccessTokenFromRefreshTokenIfValid(options: Refres projectUserId: options.refreshTokenObj.projectUserId, }, }, - data: { + data: withExternalDbSyncUpdate({ lastActiveAt: now, - }, + }), }), globalPrismaClient.projectUserRefreshToken.update({ where: {