From 939b1a96c6c56decb86e8b4c0843bb79b9ef2f40 Mon Sep 17 00:00:00 2001 From: Bilal Godil Date: Fri, 30 Jan 2026 11:13:52 -0800 Subject: [PATCH] resolve pr comments --- .../migration.sql | 48 ++++++++++++++++++- apps/backend/scripts/run-cron-jobs.ts | 12 +++-- apps/backend/src/lib/external-db-sync.ts | 23 +++++---- 3 files changed, 69 insertions(+), 14 deletions(-) diff --git a/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql b/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql index 1bca0785e..902e910d4 100644 --- a/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql +++ b/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql @@ -125,6 +125,53 @@ FOR EACH ROW WHEN (OLD."shouldUpdateSequenceId" = FALSE) EXECUTE FUNCTION reset_sequence_id_on_update(); +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- Marks the related ProjectUser for re-sync when a ContactChannel changes. +CREATE FUNCTION mark_project_user_on_contact_channel_change() +RETURNS TRIGGER AS $function$ +BEGIN + UPDATE "ProjectUser" + SET "shouldUpdateSequenceId" = TRUE + WHERE "tenancyId" = NEW."tenancyId" + AND "projectUserId" = NEW."projectUserId"; + RETURN NEW; +END; +$function$ LANGUAGE plpgsql; + +-- SPLIT_STATEMENT_SENTINEL +CREATE TRIGGER mark_project_user_on_contact_channel_insert +AFTER INSERT ON "ContactChannel" +FOR EACH ROW +EXECUTE FUNCTION mark_project_user_on_contact_channel_change(); + +-- SPLIT_STATEMENT_SENTINEL +CREATE TRIGGER mark_project_user_on_contact_channel_update +AFTER UPDATE ON "ContactChannel" +FOR EACH ROW +WHEN (OLD."tenancyId" = NEW."tenancyId" AND OLD."projectUserId" = NEW."projectUserId") +EXECUTE FUNCTION mark_project_user_on_contact_channel_change(); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- Marks the related ProjectUser for re-sync when a ContactChannel is deleted. +CREATE FUNCTION mark_project_user_on_contact_channel_delete() +RETURNS TRIGGER AS $function$ +BEGIN + UPDATE "ProjectUser" + SET "shouldUpdateSequenceId" = TRUE + WHERE "tenancyId" = OLD."tenancyId" + AND "projectUserId" = OLD."projectUserId"; + RETURN OLD; +END; +$function$ LANGUAGE plpgsql; + +-- SPLIT_STATEMENT_SENTINEL +CREATE TRIGGER mark_project_user_on_contact_channel_delete +AFTER DELETE ON "ContactChannel" +FOR EACH ROW +EXECUTE FUNCTION mark_project_user_on_contact_channel_delete(); + -- SPLIT_STATEMENT_SENTINEL -- SINGLE_STATEMENT_SENTINEL -- Creates function that logs deleted rows to the DeletedRow table with their full data. @@ -185,4 +232,3 @@ BEFORE DELETE ON "ContactChannel" FOR EACH ROW EXECUTE FUNCTION log_deleted_row(); - diff --git a/apps/backend/scripts/run-cron-jobs.ts b/apps/backend/scripts/run-cron-jobs.ts index 4a5f57078..98b9680ce 100644 --- a/apps/backend/scripts/run-cron-jobs.ts +++ b/apps/backend/scripts/run-cron-jobs.ts @@ -1,6 +1,7 @@ import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env"; -import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors"; +import { captureError, StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors"; import { runAsynchronously, wait } from "@stackframe/stack-shared/dist/utils/promises"; +import { Result } from "@stackframe/stack-shared/dist/utils/results"; const endpoints = [ "/api/latest/internal/external-db-sync/sequencer", @@ -13,19 +14,22 @@ async function main() { const baseUrl = `http://localhost:${getEnvVariable('NEXT_PUBLIC_STACK_PORT_PREFIX', '81')}02`; - const run = (endpoint: string) => runAsynchronously(async () => { + const run = async (endpoint: string) => { console.log(`Running ${endpoint}...`); const res = await fetch(`${baseUrl}${endpoint}`, { headers: { 'Authorization': `Bearer ${cronSecret}` }, }); if (!res.ok) throw new StackAssertionError(`Failed to call ${endpoint}: ${res.status} ${res.statusText}\n${await res.text()}`, { res }); console.log(`${endpoint} completed.`); - }); + }; for (const endpoint of endpoints) { runAsynchronously(async () => { while (true) { - run(endpoint); + const runResult = await Result.fromPromise(run(endpoint)); + if (runResult.status === "error") { + captureError("run-cron-jobs", runResult.error); + } // Vercel only guarantees minute-granularity for cron jobs, so we randomize the interval await wait(Math.random() * 120_000); } diff --git a/apps/backend/src/lib/external-db-sync.ts b/apps/backend/src/lib/external-db-sync.ts index 574524075..1d83722eb 100644 --- a/apps/backend/src/lib/external-db-sync.ts +++ b/apps/backend/src/lib/external-db-sync.ts @@ -4,6 +4,7 @@ import { DEFAULT_DB_SYNC_MAPPINGS } from "@stackframe/stack-shared/dist/config/d import type { CompleteConfig } from "@stackframe/stack-shared/dist/config/schema"; import { captureError, StackAssertionError, throwErr } from "@stackframe/stack-shared/dist/utils/errors"; import { omit } from "@stackframe/stack-shared/dist/utils/objects"; +import { Result } from "@stackframe/stack-shared/dist/utils/results"; import { Client } from 'pg'; const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; @@ -215,9 +216,10 @@ async function syncDatabase( ) { assertNonEmptyString(dbId, "dbId"); assertUuid(tenancyId, "tenancyId"); - if (dbConfig.type !== 'postgres') { + const dbType = dbConfig.type; + if (dbType !== 'postgres') { throw new StackAssertionError( - `Unsupported database type '${dbConfig.type}' for external DB ${dbId}. Only 'postgres' is currently supported.` + `Unsupported database type '${String(dbType)}' for external DB ${dbId}. Only 'postgres' is currently supported.` ); } @@ -232,7 +234,7 @@ async function syncDatabase( connectionString: dbConfig.connectionString, }); - try { + const syncResult = await Result.fromPromise((async () => { await externalClient.connect(); // Always use DEFAULT_DB_SYNC_MAPPINGS - users cannot customize mappings @@ -245,17 +247,20 @@ async function syncDatabase( internalPrisma, dbId, tenancyId, - dbConfig.type, + dbType, ); } + })()); - } catch (error) { - await externalClient.end(); - captureError(`external-db-sync-${dbId}`, error); - return; + const closeResult = await Result.fromPromise(externalClient.end()); + if (closeResult.status === "error") { + captureError(`external-db-sync-${dbId}-close`, closeResult.error); } - await externalClient.end(); + if (syncResult.status === "error") { + captureError(`external-db-sync-${dbId}`, syncResult.error); + return; + } }