replace trigger

This commit is contained in:
Bilal Godil 2026-02-03 17:20:10 -08:00
parent 3b9c22e175
commit c0a3f7af6d
8 changed files with 244 additions and 162 deletions

View File

@ -103,145 +103,3 @@ CREATE INDEX "ContactChannel_shouldUpdateSequenceId_idx" ON "ContactChannel"("sh
-- SPLIT_STATEMENT_SENTINEL
CREATE INDEX "DeletedRow_shouldUpdateSequenceId_idx" ON "DeletedRow"("shouldUpdateSequenceId", "tenancyId");
-- SPLIT_STATEMENT_SENTINEL
-- SINGLE_STATEMENT_SENTINEL
-- Creates function that sets shouldUpdateSequenceId to TRUE whenever a row is updated.
-- This marks the row for re-syncing to external databases after any change.
CREATE FUNCTION reset_sequence_id_on_update()
RETURNS TRIGGER AS $$
BEGIN
NEW."shouldUpdateSequenceId" := TRUE;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- SPLIT_STATEMENT_SENTINEL
-- Creates triggers that automatically mark rows for re-syncing when they are updated.
-- Only triggers when shouldUpdateSequenceId is currently FALSE to avoid unnecessary updates.
CREATE TRIGGER mark_should_update_sequence_id_project_user
BEFORE UPDATE ON "ProjectUser"
FOR EACH ROW
WHEN (OLD."shouldUpdateSequenceId" = FALSE)
EXECUTE FUNCTION reset_sequence_id_on_update();
-- SPLIT_STATEMENT_SENTINEL
CREATE TRIGGER mark_should_update_sequence_id_contact_channel
BEFORE UPDATE ON "ContactChannel"
FOR EACH ROW
WHEN (OLD."shouldUpdateSequenceId" = FALSE)
EXECUTE FUNCTION reset_sequence_id_on_update();
-- SPLIT_STATEMENT_SENTINEL
CREATE TRIGGER mark_should_update_sequence_id_deleted_row
BEFORE UPDATE ON "DeletedRow"
FOR EACH ROW
WHEN (OLD."shouldUpdateSequenceId" = FALSE)
EXECUTE FUNCTION reset_sequence_id_on_update();
-- SPLIT_STATEMENT_SENTINEL
-- SINGLE_STATEMENT_SENTINEL
-- Marks the related ProjectUser for re-sync when a ContactChannel changes.
CREATE FUNCTION mark_project_user_on_contact_channel_change()
RETURNS TRIGGER AS $function$
BEGIN
UPDATE "ProjectUser"
SET "shouldUpdateSequenceId" = TRUE
WHERE "tenancyId" = NEW."tenancyId"
AND "projectUserId" = NEW."projectUserId";
RETURN NEW;
END;
$function$ LANGUAGE plpgsql;
-- SPLIT_STATEMENT_SENTINEL
CREATE TRIGGER mark_project_user_on_contact_channel_insert
AFTER INSERT ON "ContactChannel"
FOR EACH ROW
EXECUTE FUNCTION mark_project_user_on_contact_channel_change();
-- SPLIT_STATEMENT_SENTINEL
CREATE TRIGGER mark_project_user_on_contact_channel_update
AFTER UPDATE ON "ContactChannel"
FOR EACH ROW
WHEN (OLD."tenancyId" = NEW."tenancyId" AND OLD."projectUserId" = NEW."projectUserId")
EXECUTE FUNCTION mark_project_user_on_contact_channel_change();
-- SPLIT_STATEMENT_SENTINEL
-- SINGLE_STATEMENT_SENTINEL
-- Marks the related ProjectUser for re-sync when a ContactChannel is deleted.
CREATE FUNCTION mark_project_user_on_contact_channel_delete()
RETURNS TRIGGER AS $function$
BEGIN
UPDATE "ProjectUser"
SET "shouldUpdateSequenceId" = TRUE
WHERE "tenancyId" = OLD."tenancyId"
AND "projectUserId" = OLD."projectUserId";
RETURN OLD;
END;
$function$ LANGUAGE plpgsql;
-- SPLIT_STATEMENT_SENTINEL
CREATE TRIGGER mark_project_user_on_contact_channel_delete
AFTER DELETE ON "ContactChannel"
FOR EACH ROW
EXECUTE FUNCTION mark_project_user_on_contact_channel_delete();
-- SPLIT_STATEMENT_SENTINEL
-- SINGLE_STATEMENT_SENTINEL
-- Creates function that logs deleted rows to the DeletedRow table with their full data.
-- Extracts the primary key and row data so external databases can process the deletion.
CREATE FUNCTION log_deleted_row()
RETURNS TRIGGER AS $function$
DECLARE
row_data jsonb;
pk jsonb := '{}'::jsonb;
col record;
BEGIN
row_data := to_jsonb(OLD);
FOR col IN
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = TG_RELID
AND i.indisprimary
LOOP
pk := pk || jsonb_build_object(col.attname, row_data -> col.attname);
END LOOP;
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
VALUES (
gen_random_uuid(),
OLD."tenancyId",
TG_TABLE_NAME,
pk,
row_data,
NOW(),
TRUE
);
RETURN OLD;
END;
$function$ LANGUAGE plpgsql;
-- SPLIT_STATEMENT_SENTINEL
-- Creates triggers that automatically log deleted rows to DeletedRow table before deletion.
-- Runs before the row is deleted so all data is still available to be logged.
CREATE TRIGGER log_deleted_row_project_user
BEFORE DELETE ON "ProjectUser"
FOR EACH ROW
EXECUTE FUNCTION log_deleted_row();
-- SPLIT_STATEMENT_SENTINEL
CREATE TRIGGER log_deleted_row_contact_channel
BEFORE DELETE ON "ContactChannel"
FOR EACH ROW
EXECUTE FUNCTION log_deleted_row();

View File

@ -1,5 +1,6 @@
import { demoteAllContactChannelsToNonPrimary, setContactChannelAsPrimaryById } from "@/lib/contact-channel";
import { normalizeEmail } from "@/lib/emails";
import { markProjectUserForExternalDbSync, recordExternalDbSyncDeletion, withExternalDbSyncUpdate } from "@/lib/external-db-sync";
import { ensureContactChannelDoesNotExists, ensureContactChannelExists } from "@/lib/request-checks";
import { getPrismaClientForTenancy, retryTransaction } from "@/prisma-client";
import { createCrudHandlers } from "@/route-handlers/crud-handler";
@ -121,6 +122,11 @@ export const contactChannelsCrudHandlers = createLazyProxy(() => createCrudHandl
});
}
await markProjectUserForExternalDbSync(tx, {
tenancyId: auth.tenancy.id,
projectUserId: data.user_id,
});
return await tx.contactChannel.findUnique({
where: {
tenancyId_projectUserId_id: {
@ -188,7 +194,7 @@ export const contactChannelsCrudHandlers = createLazyProxy(() => createCrudHandl
});
}
return await tx.contactChannel.update({
const updated = await tx.contactChannel.update({
where: {
tenancyId_projectUserId_id: {
tenancyId: auth.tenancy.id,
@ -196,13 +202,20 @@ export const contactChannelsCrudHandlers = createLazyProxy(() => createCrudHandl
id: params.contact_channel_id || throwErr("Missing contact channel id"),
},
},
data: {
data: withExternalDbSyncUpdate({
value: value,
isVerified: data.is_verified ?? (value ? false : undefined), // if value is updated and is_verified is not provided, set to false
usedForAuth: data.used_for_auth !== undefined ? (data.used_for_auth ? 'TRUE' : null) : undefined,
isPrimary: data.is_primary !== undefined ? (data.is_primary ? 'TRUE' : null) : undefined,
},
}),
});
await markProjectUserForExternalDbSync(tx, {
tenancyId: auth.tenancy.id,
projectUserId: params.user_id,
});
return updated;
});
return contactChannelToCrud(updatedContactChannel);
@ -224,6 +237,13 @@ export const contactChannelsCrudHandlers = createLazyProxy(() => createCrudHandl
contactChannelId: params.contact_channel_id || throwErr("Missing contact channel id"),
});
await recordExternalDbSyncDeletion(tx, {
tableName: "ContactChannel",
tenancyId: auth.tenancy.id,
projectUserId: params.user_id,
contactChannelId: params.contact_channel_id || throwErr("Missing contact channel id"),
});
await tx.contactChannel.delete({
where: {
tenancyId_projectUserId_id: {
@ -233,6 +253,11 @@ export const contactChannelsCrudHandlers = createLazyProxy(() => createCrudHandl
},
},
});
await markProjectUserForExternalDbSync(tx, {
tenancyId: auth.tenancy.id,
projectUserId: params.user_id,
});
});
},
onList: async ({ query, auth }) => {

View File

@ -1,4 +1,5 @@
import { sendEmailFromDefaultTemplate } from "@/lib/emails";
import { markProjectUserForExternalDbSync, withExternalDbSyncUpdate } from "@/lib/external-db-sync";
import { getSoleTenancyFromProjectBranch } from "@/lib/tenancies";
import { getPrismaClientForTenancy } from "@/prisma-client";
import { createVerificationCodeHandler } from "@/route-handlers/verification-code-handler";
@ -68,9 +69,14 @@ export const contactChannelVerificationCodeHandler = createVerificationCodeHandl
await prisma.contactChannel.update({
where: uniqueKeys,
data: {
data: withExternalDbSyncUpdate({
isVerified: true,
}
}),
});
await markProjectUserForExternalDbSync(prisma, {
tenancyId: tenancy.id,
projectUserId: data.user_id,
});
return {

View File

@ -208,7 +208,6 @@ export const GET = createSmartRouteHandler({
}
while (performance.now() - startTime < maxDurationMs) {
console.log("poller-iteration", performance.now() - startTime);
const pendingRequests = await claimPendingRequests();
if (stopWhenIdle && pendingRequests.length === 0) {

View File

@ -2,6 +2,7 @@ import { BooleanTrue, Prisma } from "@/generated/prisma/client";
import { getRenderedOrganizationConfigQuery, getRenderedProjectConfigQuery } from "@/lib/config";
import { demoteAllContactChannelsToNonPrimary, setContactChannelAsPrimaryByValue } from "@/lib/contact-channel";
import { normalizeEmail } from "@/lib/emails";
import { recordExternalDbSyncContactChannelDeletionsForUser, recordExternalDbSyncDeletion, withExternalDbSyncUpdate } from "@/lib/external-db-sync";
import { grantDefaultProjectPermissions } from "@/lib/permissions";
import { ensureTeamMembershipExists, ensureUserExists } from "@/lib/request-checks";
import { Tenancy } from "@/lib/tenancies";
@ -934,9 +935,9 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC
isPrimary: "TRUE",
},
},
data: {
data: withExternalDbSyncUpdate({
isVerified: data.primary_email_verified,
},
}),
});
}
@ -952,9 +953,9 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC
isPrimary: "TRUE",
},
},
data: {
data: withExternalDbSyncUpdate({
usedForAuth: primaryEmailAuthEnabled ? BooleanTrue.TRUE : null,
},
}),
});
}
@ -1130,7 +1131,7 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC
projectUserId: params.user_id,
},
},
data: {
data: withExternalDbSyncUpdate({
displayName: data.display_name === undefined ? undefined : (data.display_name || null),
clientMetadata: data.client_metadata === null ? Prisma.JsonNull : data.client_metadata,
clientReadOnlyMetadata: data.client_read_only_metadata === null ? Prisma.JsonNull : data.client_read_only_metadata,
@ -1142,7 +1143,7 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC
restrictedByAdmin: data.restricted_by_admin ?? undefined,
restrictedByAdminReason: restrictedByAdminReason,
restrictedByAdminPrivateDetails: restrictedByAdminPrivateDetails,
},
}),
include: userFullInclude,
});
@ -1189,6 +1190,17 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC
},
});
await recordExternalDbSyncDeletion(tx, {
tableName: "ProjectUser",
tenancyId: auth.tenancy.id,
projectUserId: params.user_id,
});
await recordExternalDbSyncContactChannelDeletionsForUser(tx, {
tenancyId: auth.tenancy.id,
projectUserId: params.user_id,
});
await tx.projectUser.delete({
where: {
tenancyId_projectUserId: {

View File

@ -1,4 +1,5 @@
import { BooleanTrue, ContactChannelType } from "@/generated/prisma/client";
import { markProjectUserForExternalDbSync, withExternalDbSyncUpdate } from "@/lib/external-db-sync";
import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors";
import { normalizeEmail } from "./emails";
import { PrismaTransaction } from "./types";
@ -34,9 +35,13 @@ export async function demoteAllContactChannelsToNonPrimary(
type: options.type,
isPrimary: BooleanTrue.TRUE,
},
data: {
data: withExternalDbSyncUpdate({
isPrimary: null,
},
}),
});
await markProjectUserForExternalDbSync(tx, {
tenancyId: options.tenancyId,
projectUserId: options.projectUserId,
});
}
@ -100,10 +105,14 @@ export async function setContactChannelAsPrimaryById(
id: options.contactChannelId,
},
},
data: {
data: withExternalDbSyncUpdate({
isPrimary: BooleanTrue.TRUE,
...options.additionalUpdates,
},
}),
});
await markProjectUserForExternalDbSync(tx, {
tenancyId: options.tenancyId,
projectUserId: options.projectUserId,
});
}
@ -141,10 +150,14 @@ export async function setContactChannelAsPrimaryByValue(
value: options.value,
},
},
data: {
data: withExternalDbSyncUpdate({
isPrimary: BooleanTrue.TRUE,
...options.additionalUpdates,
},
}),
});
await markProjectUserForExternalDbSync(tx, {
tenancyId: options.tenancyId,
projectUserId: options.projectUserId,
});
}

View File

@ -1,5 +1,7 @@
import { Tenancy } from "@/lib/tenancies";
import type { PrismaTransaction } from "@/lib/types";
import { getPrismaClientForTenancy, PrismaClientWithReplica } from "@/prisma-client";
import { Prisma } from "@/generated/prisma/client";
import { DEFAULT_DB_SYNC_MAPPINGS } from "@stackframe/stack-shared/dist/config/db-sync-mappings";
import type { CompleteConfig } from "@stackframe/stack-shared/dist/config/schema";
import { captureError, StackAssertionError, throwErr } from "@stackframe/stack-shared/dist/utils/errors";
@ -24,6 +26,172 @@ function assertUuid(value: unknown, label: string): asserts value is string {
}
}
type ExternalDbSyncClient = PrismaTransaction | PrismaClientWithReplica;
type ExternalDbSyncTarget =
| {
tableName: "ProjectUser",
tenancyId: string,
projectUserId: string,
}
| {
tableName: "ContactChannel",
tenancyId: string,
projectUserId: string,
contactChannelId: string,
};
export function withExternalDbSyncUpdate<T extends object>(data: T): T & { shouldUpdateSequenceId: true } {
return {
...data,
shouldUpdateSequenceId: true,
};
}
export async function markProjectUserForExternalDbSync(
tx: ExternalDbSyncClient,
options: {
tenancyId: string,
projectUserId: string,
}
): Promise<void> {
assertUuid(options.tenancyId, "tenancyId");
assertUuid(options.projectUserId, "projectUserId");
await tx.projectUser.update({
where: {
tenancyId_projectUserId: {
tenancyId: options.tenancyId,
projectUserId: options.projectUserId,
},
},
data: {
shouldUpdateSequenceId: true,
},
});
}
export async function recordExternalDbSyncDeletion(
tx: ExternalDbSyncClient,
target: ExternalDbSyncTarget,
): Promise<void> {
assertUuid(target.tenancyId, "tenancyId");
assertUuid(target.projectUserId, "projectUserId");
if (target.tableName === "ProjectUser") {
const insertedCount = await tx.$executeRaw(Prisma.sql`
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
SELECT
gen_random_uuid(),
"tenancyId",
'ProjectUser',
jsonb_build_object('tenancyId', "tenancyId", 'projectUserId', "projectUserId"),
to_jsonb("ProjectUser".*),
NOW(),
TRUE
FROM "ProjectUser"
WHERE "tenancyId" = ${target.tenancyId}::uuid
AND "projectUserId" = ${target.projectUserId}::uuid
FOR UPDATE
`);
if (insertedCount !== 1) {
throw new StackAssertionError(
`Expected to insert 1 DeletedRow entry for ProjectUser, got ${insertedCount}.`
);
}
return;
}
assertUuid(target.contactChannelId, "contactChannelId");
const insertedCount = await tx.$executeRaw(Prisma.sql`
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
SELECT
gen_random_uuid(),
"tenancyId",
'ContactChannel',
jsonb_build_object(
'tenancyId',
"tenancyId",
'projectUserId',
"projectUserId",
'id',
"id"
),
to_jsonb("ContactChannel".*),
NOW(),
TRUE
FROM "ContactChannel"
WHERE "tenancyId" = ${target.tenancyId}::uuid
AND "projectUserId" = ${target.projectUserId}::uuid
AND "id" = ${target.contactChannelId}::uuid
FOR UPDATE
`);
if (insertedCount !== 1) {
throw new StackAssertionError(
`Expected to insert 1 DeletedRow entry for ContactChannel, got ${insertedCount}.`
);
}
}
export async function recordExternalDbSyncContactChannelDeletionsForUser(
tx: ExternalDbSyncClient,
options: {
tenancyId: string,
projectUserId: string,
},
): Promise<void> {
assertUuid(options.tenancyId, "tenancyId");
assertUuid(options.projectUserId, "projectUserId");
await tx.$executeRaw(Prisma.sql`
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
SELECT
gen_random_uuid(),
"tenancyId",
'ContactChannel',
jsonb_build_object(
'tenancyId',
"tenancyId",
'projectUserId',
"projectUserId",
'id',
"id"
),
to_jsonb("ContactChannel".*),
NOW(),
TRUE
FROM "ContactChannel"
WHERE "tenancyId" = ${options.tenancyId}::uuid
AND "projectUserId" = ${options.projectUserId}::uuid
FOR UPDATE
`);
}
type PgErrorLike = {
code?: string,
constraint?: string,

View File

@ -1,5 +1,6 @@
import { usersCrudHandlers } from '@/app/api/latest/users/crud';
import { getPrismaClientForTenancy, globalPrismaClient } from '@/prisma-client';
import { withExternalDbSyncUpdate } from '@/lib/external-db-sync';
import { KnownErrors } from '@stackframe/stack-shared';
import { yupBoolean, yupNumber, yupObject, yupString } from "@stackframe/stack-shared/dist/schema-fields";
import { AccessTokenPayload } from '@stackframe/stack-shared/dist/sessions';
@ -244,9 +245,9 @@ export async function generateAccessTokenFromRefreshTokenIfValid(options: Refres
projectUserId: options.refreshTokenObj.projectUserId,
},
},
data: {
data: withExternalDbSyncUpdate({
lastActiveAt: now,
},
}),
}),
globalPrismaClient.projectUserRefreshToken.update({
where: {