From 8910138be9caf4b8038234333a4cdf273c05d596 Mon Sep 17 00:00:00 2001 From: Bilal Godil Date: Tue, 3 Feb 2026 09:19:58 -0800 Subject: [PATCH] fixes --- .github/workflows/e2e-api-tests.yaml | 12 +- .../e2e-custom-base-port-api-tests.yaml | 12 +- .../e2e-source-of-truth-api-tests.yaml | 12 +- ...rt-dev-and-test-with-custom-base-port.yaml | 4 +- .github/workflows/restart-dev-and-test.yaml | 4 +- .../setup-tests-with-custom-base-port.yaml | 4 +- .github/workflows/setup-tests.yaml | 4 +- .../migration.sql | 17 +- apps/backend/prisma/schema.prisma | 2 + .../external-db-sync/sequencer/route.ts | 155 ++---------------- .../external-db-sync/sync-engine/route.tsx | 4 +- .../backend/src/lib/external-db-sync-queue.ts | 18 +- 12 files changed, 73 insertions(+), 175 deletions(-) diff --git a/.github/workflows/e2e-api-tests.yaml b/.github/workflows/e2e-api-tests.yaml index 825d47df5..6f0698dbd 100644 --- a/.github/workflows/e2e-api-tests.yaml +++ b/.github/workflows/e2e-api-tests.yaml @@ -176,26 +176,26 @@ jobs: done - name: Run tests (excluding external DB sync) - run: pnpm test run ${{ matrix.freestyle-mode == 'prod' && '--min-workers=1 --max-workers=1' || '' }} --exclude "**/external-db-sync*.test.ts" + run: pnpm test run ${{ matrix.freestyle-mode == 'prod' && '--min-workers=1 --max-workers=1' || '' }} --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync" - name: Run external DB sync tests (single worker) - run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts + run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync - name: Run tests again (excluding external DB sync, attempt 1) if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' - run: pnpm test run ${{ matrix.freestyle-mode == 'prod' && '--min-workers=1 --max-workers=1' || '' }} --exclude "**/external-db-sync*.test.ts" + run: pnpm test run ${{ matrix.freestyle-mode == 'prod' && '--min-workers=1 --max-workers=1' || '' }} --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync" - name: Run external DB sync tests again (single worker, attempt 1) if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' - run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts + run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync - name: Run tests again (excluding external DB sync, attempt 2) if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' - run: pnpm test run ${{ matrix.freestyle-mode == 'prod' && '--min-workers=1 --max-workers=1' || '' }} --exclude "**/external-db-sync*.test.ts" + run: pnpm test run ${{ matrix.freestyle-mode == 'prod' && '--min-workers=1 --max-workers=1' || '' }} --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync" - name: Run external DB sync tests again (single worker, attempt 2) if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' - run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts + run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync - name: Verify data integrity run: pnpm run verify-data-integrity --no-bail diff --git a/.github/workflows/e2e-custom-base-port-api-tests.yaml b/.github/workflows/e2e-custom-base-port-api-tests.yaml index 62eb55da0..1928bc6f0 100644 --- a/.github/workflows/e2e-custom-base-port-api-tests.yaml +++ b/.github/workflows/e2e-custom-base-port-api-tests.yaml @@ -169,26 +169,26 @@ jobs: done - name: Run tests (excluding external DB sync) - run: pnpm test run --exclude "**/external-db-sync*.test.ts" + run: pnpm test run --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync" - name: Run external DB sync tests (single worker) - run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts + run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync - name: Run tests again (excluding external DB sync, attempt 1) if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' - run: pnpm test run --exclude "**/external-db-sync*.test.ts" + run: pnpm test run --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync" - name: Run external DB sync tests again (single worker, attempt 1) if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' - run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts + run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync - name: Run tests again (excluding external DB sync, attempt 2) if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' - run: pnpm test run --exclude "**/external-db-sync*.test.ts" + run: pnpm test run --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync" - name: Run external DB sync tests again (single worker, attempt 2) if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' - run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts + run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync - name: Verify data integrity run: pnpm run verify-data-integrity --no-bail diff --git a/.github/workflows/e2e-source-of-truth-api-tests.yaml b/.github/workflows/e2e-source-of-truth-api-tests.yaml index a50c9fdc2..8fe5ce674 100644 --- a/.github/workflows/e2e-source-of-truth-api-tests.yaml +++ b/.github/workflows/e2e-source-of-truth-api-tests.yaml @@ -176,26 +176,26 @@ jobs: done - name: Run tests (excluding external DB sync) - run: pnpm test run --exclude "**/external-db-sync*.test.ts" + run: pnpm test run --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync" - name: Run external DB sync tests (single worker) - run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts + run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync - name: Run tests again (excluding external DB sync, attempt 1) if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' - run: pnpm test run --exclude "**/external-db-sync*.test.ts" + run: pnpm test run --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync" - name: Run external DB sync tests again (single worker, attempt 1) if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' - run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts + run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync - name: Run tests again (excluding external DB sync, attempt 2) if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' - run: pnpm test run --exclude "**/external-db-sync*.test.ts" + run: pnpm test run --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync" - name: Run external DB sync tests again (single worker, attempt 2) if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' - run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts + run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync - name: Verify data integrity run: pnpm run verify-data-integrity --no-bail diff --git a/.github/workflows/restart-dev-and-test-with-custom-base-port.yaml b/.github/workflows/restart-dev-and-test-with-custom-base-port.yaml index f6433004d..5d3ce47ca 100644 --- a/.github/workflows/restart-dev-and-test-with-custom-base-port.yaml +++ b/.github/workflows/restart-dev-and-test-with-custom-base-port.yaml @@ -41,10 +41,10 @@ jobs: run: pnpm run restart-dev-environment - name: Run tests (excluding external DB sync) - run: pnpm run test run --reporter=verbose --exclude "**/external-db-sync*.test.ts" + run: pnpm run test run --reporter=verbose --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync" - name: Run external DB sync tests (single worker) - run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts + run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync - name: Print dev server logs run: cat dev-server.log.untracked.txt diff --git a/.github/workflows/restart-dev-and-test.yaml b/.github/workflows/restart-dev-and-test.yaml index bf66e9d01..a3339b161 100644 --- a/.github/workflows/restart-dev-and-test.yaml +++ b/.github/workflows/restart-dev-and-test.yaml @@ -40,10 +40,10 @@ jobs: run: pnpm run restart-dev-environment - name: Run tests (excluding external DB sync) - run: pnpm run test run --reporter=verbose --exclude "**/external-db-sync*.test.ts" + run: pnpm run test run --reporter=verbose --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync" - name: Run external DB sync tests (single worker) - run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts + run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync - name: Print dev server logs run: cat dev-server.log.untracked.txt diff --git a/.github/workflows/setup-tests-with-custom-base-port.yaml b/.github/workflows/setup-tests-with-custom-base-port.yaml index ceb325d73..376a1bd79 100644 --- a/.github/workflows/setup-tests-with-custom-base-port.yaml +++ b/.github/workflows/setup-tests-with-custom-base-port.yaml @@ -50,7 +50,7 @@ jobs: wait-for: 120s log-output-if: true - name: Run tests (excluding external DB sync) - run: pnpm run test run --reporter=verbose --exclude "**/external-db-sync*.test.ts" + run: pnpm run test run --reporter=verbose --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync" - name: Run external DB sync tests (single worker) - run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts + run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync diff --git a/.github/workflows/setup-tests.yaml b/.github/workflows/setup-tests.yaml index 84b4bacc6..8a0629820 100644 --- a/.github/workflows/setup-tests.yaml +++ b/.github/workflows/setup-tests.yaml @@ -48,7 +48,7 @@ jobs: wait-for: 120s log-output-if: true - name: Run tests (excluding external DB sync) - run: pnpm run test run --reporter=verbose --exclude "**/external-db-sync*.test.ts" + run: pnpm run test run --reporter=verbose --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync" - name: Run external DB sync tests (single worker) - run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts + run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync diff --git a/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql b/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql index 902e910d4..1e19c2105 100644 --- a/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql +++ b/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql @@ -23,6 +23,14 @@ CREATE UNIQUE INDEX "ContactChannel_sequenceId_key" ON "ContactChannel"("sequen -- SPLIT_STATEMENT_SENTINEL CREATE UNIQUE INDEX "ProjectUser_sequenceId_key" ON "ProjectUser"("sequenceId"); +-- SPLIT_STATEMENT_SENTINEL +-- Creates composite indexes on (tenancyId, sequenceId) for efficient sync-engine queries. +-- These allow fast lookups of rows by tenant ordered by sequence number. +CREATE INDEX "ProjectUser_tenancyId_sequenceId_idx" ON "ProjectUser"("tenancyId", "sequenceId"); + +-- SPLIT_STATEMENT_SENTINEL +CREATE INDEX "ContactChannel_tenancyId_sequenceId_idx" ON "ContactChannel"("tenancyId", "sequenceId"); + -- SPLIT_STATEMENT_SENTINEL -- Creates OutgoingRequest table to queue sync requests to external databases. -- Each request stores the QStash options for making HTTP requests and tracks when fulfillment started. @@ -81,14 +89,15 @@ ALTER TABLE "ContactChannel" ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NUL ALTER TABLE "DeletedRow" ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT TRUE; -- SPLIT_STATEMENT_SENTINEL --- Creates partial indexes on shouldUpdateSequenceId to quickly find rows that need updates. -CREATE INDEX "ProjectUser_shouldUpdateSequenceId_idx" ON "ProjectUser"("shouldUpdateSequenceId") WHERE "shouldUpdateSequenceId" = TRUE; +-- Creates partial indexes on (shouldUpdateSequenceId, tenancyId) to quickly find rows that need updates +-- and support ORDER BY tenancyId for less fragmented updates. +CREATE INDEX "ProjectUser_shouldUpdateSequenceId_idx" ON "ProjectUser"("shouldUpdateSequenceId", "tenancyId") WHERE "shouldUpdateSequenceId" = TRUE; -- SPLIT_STATEMENT_SENTINEL -CREATE INDEX "ContactChannel_shouldUpdateSequenceId_idx" ON "ContactChannel"("shouldUpdateSequenceId") WHERE "shouldUpdateSequenceId" = TRUE; +CREATE INDEX "ContactChannel_shouldUpdateSequenceId_idx" ON "ContactChannel"("shouldUpdateSequenceId", "tenancyId") WHERE "shouldUpdateSequenceId" = TRUE; -- SPLIT_STATEMENT_SENTINEL -CREATE INDEX "DeletedRow_shouldUpdateSequenceId_idx" ON "DeletedRow"("shouldUpdateSequenceId") WHERE "shouldUpdateSequenceId" = TRUE; +CREATE INDEX "DeletedRow_shouldUpdateSequenceId_idx" ON "DeletedRow"("shouldUpdateSequenceId", "tenancyId") WHERE "shouldUpdateSequenceId" = TRUE; -- SPLIT_STATEMENT_SENTINEL -- SINGLE_STATEMENT_SENTINEL diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index 7c4e5cb4e..375aa15d1 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -225,6 +225,7 @@ model ProjectUser { @@index([tenancyId, displayName(sort: Desc)], name: "ProjectUser_displayName_desc") @@index([tenancyId, createdAt(sort: Asc)], name: "ProjectUser_createdAt_asc") @@index([tenancyId, createdAt(sort: Desc)], name: "ProjectUser_createdAt_desc") + @@index([tenancyId, sequenceId], name: "ProjectUser_tenancyId_sequenceId_idx") // Partial index for external db sync backfill lives in migration SQL. } @@ -290,6 +291,7 @@ model ContactChannel { @@unique([tenancyId, projectUserId, type, value]) // only one contact channel per project with the same value and type can be used for auth @@unique([tenancyId, type, value, usedForAuth]) + @@index([tenancyId, sequenceId], name: "ContactChannel_tenancyId_sequenceId_idx") // Partial index for external db sync backfill lives in migration SQL (WHERE shouldUpdateSequenceId = TRUE). } diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts index 74009516f..bad8d3fb6 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts @@ -1,4 +1,4 @@ -import { getPrismaClientForTenancy, globalPrismaClient, type PrismaClientTransaction } from "@/prisma-client"; +import { globalPrismaClient } from "@/prisma-client"; import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; import { yupBoolean, @@ -10,10 +10,8 @@ import { import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env"; import { captureError, StatusError } from "@stackframe/stack-shared/dist/utils/errors"; import { wait } from "@stackframe/stack-shared/dist/utils/promises"; -import { getTenancy, type Tenancy } from "@/lib/tenancies"; -import { enqueueExternalDbSync } from "@/lib/external-db-sync-queue"; +import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue"; -const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000; function parseMaxDurationMs(value: string | undefined): number { @@ -32,11 +30,6 @@ function parseStopWhenIdle(value: string | undefined): boolean { throw new StatusError(400, "stopWhenIdle must be 'true' or 'false'"); } -function assertUuid(value: unknown, label: string): asserts value is string { - if (typeof value !== "string" || value.trim().length === 0 || !UUID_REGEX.test(value)) { - throw new StatusError(500, `${label} must be a valid UUID. Received: ${JSON.stringify(value)}`); - } -} // Assigns sequence IDs to rows that need them and queues sync requests for affected tenants. // Processes up to 1000 rows at a time from each table. @@ -48,6 +41,7 @@ async function backfillSequenceIds(): Promise { FROM "ProjectUser" WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL + ORDER BY "tenancyId" LIMIT 1000 FOR UPDATE SKIP LOCKED ), @@ -63,12 +57,9 @@ async function backfillSequenceIds(): Promise { SELECT DISTINCT "tenancyId" FROM updated_rows `; - // Enqueue sync for each affected tenant - for (const { tenancyId } of projectUserTenants) { - assertUuid(tenancyId, "projectUserTenants.tenancyId"); - await enqueueExternalDbSync(tenancyId); - } + // Enqueue sync for all affected tenants in a single batch query if (projectUserTenants.length > 0) { + await enqueueExternalDbSyncBatch(projectUserTenants.map(t => t.tenancyId)); didUpdate = true; } @@ -78,6 +69,7 @@ async function backfillSequenceIds(): Promise { FROM "ContactChannel" WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL + ORDER BY "tenancyId" LIMIT 1000 FOR UPDATE SKIP LOCKED ), @@ -94,11 +86,8 @@ async function backfillSequenceIds(): Promise { SELECT DISTINCT "tenancyId" FROM updated_rows `; - for (const { tenancyId } of contactChannelTenants) { - assertUuid(tenancyId, "contactChannelTenants.tenancyId"); - await enqueueExternalDbSync(tenancyId); - } if (contactChannelTenants.length > 0) { + await enqueueExternalDbSyncBatch(contactChannelTenants.map(t => t.tenancyId)); didUpdate = true; } @@ -108,6 +97,7 @@ async function backfillSequenceIds(): Promise { FROM "DeletedRow" WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL + ORDER BY "tenancyId" LIMIT 1000 FOR UPDATE SKIP LOCKED ), @@ -122,125 +112,17 @@ async function backfillSequenceIds(): Promise { SELECT DISTINCT "tenancyId" FROM updated_rows `; - for (const { tenancyId } of deletedRowTenants) { - assertUuid(tenancyId, "deletedRowTenants.tenancyId"); - await enqueueExternalDbSync(tenancyId); - } if (deletedRowTenants.length > 0) { + await enqueueExternalDbSyncBatch(deletedRowTenants.map(t => t.tenancyId)); didUpdate = true; } return didUpdate; } -async function backfillSequenceIdsForTenancy(prisma: PrismaClientTransaction, tenancyId: string): Promise { - assertUuid(tenancyId, "tenancyId"); - let didUpdate = false; - - const projectUserRows = await prisma.$queryRaw<{ tenancyId: string }[]>` - WITH rows_to_update AS ( - SELECT "tenancyId", "projectUserId" - FROM "ProjectUser" - WHERE ("shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL) - AND "tenancyId" = ${tenancyId}::uuid - LIMIT 1000 - FOR UPDATE SKIP LOCKED - ), - updated_rows AS ( - UPDATE "ProjectUser" pu - SET "sequenceId" = nextval('global_seq_id'), - "shouldUpdateSequenceId" = FALSE - FROM rows_to_update r - WHERE pu."tenancyId" = r."tenancyId" - AND pu."projectUserId" = r."projectUserId" - RETURNING pu."tenancyId" - ) - SELECT DISTINCT "tenancyId" FROM updated_rows - `; - if (projectUserRows.length > 0) { - didUpdate = true; - } - - const contactChannelRows = await prisma.$queryRaw<{ tenancyId: string }[]>` - WITH rows_to_update AS ( - SELECT "tenancyId", "projectUserId", "id" - FROM "ContactChannel" - WHERE ("shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL) - AND "tenancyId" = ${tenancyId}::uuid - LIMIT 1000 - FOR UPDATE SKIP LOCKED - ), - updated_rows AS ( - UPDATE "ContactChannel" cc - SET "sequenceId" = nextval('global_seq_id'), - "shouldUpdateSequenceId" = FALSE - FROM rows_to_update r - WHERE cc."tenancyId" = r."tenancyId" - AND cc."projectUserId" = r."projectUserId" - AND cc."id" = r."id" - RETURNING cc."tenancyId" - ) - SELECT DISTINCT "tenancyId" FROM updated_rows - `; - if (contactChannelRows.length > 0) { - didUpdate = true; - } - - const deletedRowRows = await prisma.$queryRaw<{ tenancyId: string }[]>` - WITH rows_to_update AS ( - SELECT "id", "tenancyId" - FROM "DeletedRow" - WHERE ("shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL) - AND "tenancyId" = ${tenancyId}::uuid - LIMIT 1000 - FOR UPDATE SKIP LOCKED - ), - updated_rows AS ( - UPDATE "DeletedRow" dr - SET "sequenceId" = nextval('global_seq_id'), - "shouldUpdateSequenceId" = FALSE - FROM rows_to_update r - WHERE dr."id" = r."id" - RETURNING dr."tenancyId" - ) - SELECT DISTINCT "tenancyId" FROM updated_rows - `; - if (deletedRowRows.length > 0) { - didUpdate = true; - } - - return didUpdate; -} - -async function getNonHostedTenancies(): Promise { - const tenancyIds = await globalPrismaClient.tenancy.findMany({ - select: { id: true }, - }); - - const tenancies: Tenancy[] = []; - for (const { id } of tenancyIds) { - const tenancy = await getTenancy(id); - if (!tenancy) continue; - if (tenancy.config.sourceOfTruth.type !== "hosted") { - tenancies.push(tenancy); - } - } - - return tenancies; -} - -async function backfillSequenceIdsForNonHostedTenancies(tenancies: Tenancy[]): Promise { - let didUpdate = false; - for (const tenancy of tenancies) { - const prisma = await getPrismaClientForTenancy(tenancy); - const tenancyDidUpdate = await backfillSequenceIdsForTenancy(prisma, tenancy.id); - if (tenancyDidUpdate) { - await enqueueExternalDbSync(tenancy.id); - didUpdate = true; - } - } - return didUpdate; -} +// TODO: If we ever need to support non-hosted source-of-truth tenancies again, +// we'll need to implement a scalable way to iterate over them (pagination, etc.) +// instead of loading all tenancies into memory at once. export const GET = createSmartRouteHandler({ metadata: { @@ -275,10 +157,6 @@ export const GET = createSmartRouteHandler({ throw new StatusError(401, "Unauthorized"); } - let nonHostedTenancies = await getNonHostedTenancies(); - let lastTenancyRefreshMs = performance.now(); - const tenancyRefreshIntervalMs = 5_000; - const startTime = performance.now(); const maxDurationMs = parseMaxDurationMs(query.maxDurationMs); const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle); @@ -288,13 +166,8 @@ export const GET = createSmartRouteHandler({ while (performance.now() - startTime < maxDurationMs) { try { - if (performance.now() - lastTenancyRefreshMs >= tenancyRefreshIntervalMs) { - nonHostedTenancies = await getNonHostedTenancies(); - lastTenancyRefreshMs = performance.now(); - } - const didUpdateHosted = await backfillSequenceIds(); - const didUpdateNonHosted = await backfillSequenceIdsForNonHostedTenancies(nonHostedTenancies); - if (stopWhenIdle && !didUpdateHosted && !didUpdateNonHosted) { + const didUpdate = await backfillSequenceIds(); + if (stopWhenIdle && !didUpdate) { break; } } catch (error) { diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx b/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx index 8b39b8207..9ac3cf659 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx @@ -32,8 +32,8 @@ export const POST = createSmartRouteHandler({ const tenancy = await getTenancy(tenancyId); if (!tenancy) { -console.warn(`[sync-engine] Tenancy ${tenancyId} in queue but not found.`); -throw new StatusError(404, `Tenancy ${tenancyId} not found.`); + console.warn(`[sync-engine] Tenancy ${tenancyId} in queue but not found, assuming it was deleted.`); + throw new StatusError(400, `Tenancy ${tenancyId} not found.`); } await syncExternalDatabases(tenancy); diff --git a/apps/backend/src/lib/external-db-sync-queue.ts b/apps/backend/src/lib/external-db-sync-queue.ts index 256d01615..db5de626b 100644 --- a/apps/backend/src/lib/external-db-sync-queue.ts +++ b/apps/backend/src/lib/external-db-sync-queue.ts @@ -12,6 +12,19 @@ function assertUuid(value: unknown, label: string): asserts value is string { // Queues a sync request for a specific tenant if one isn't already pending. export async function enqueueExternalDbSync(tenancyId: string): Promise { assertUuid(tenancyId, "tenancyId"); + await enqueueExternalDbSyncBatch([tenancyId]); +} + +// Queues sync requests for multiple tenants in a single query. +// Only inserts for tenants that don't already have a pending request. +export async function enqueueExternalDbSyncBatch(tenancyIds: string[]): Promise { + if (tenancyIds.length === 0) return; + + for (const id of tenancyIds) { + assertUuid(id, "tenancyId"); + } + + // Use unnest to pass array of UUIDs and insert all in one query await globalPrismaClient.$executeRaw` INSERT INTO "OutgoingRequest" ("id", "createdAt", "qstashOptions", "startedFulfillingAt") SELECT @@ -19,14 +32,15 @@ export async function enqueueExternalDbSync(tenancyId: string): Promise { NOW(), json_build_object( 'url', '/api/latest/internal/external-db-sync/sync-engine', - 'body', json_build_object('tenancyId', ${tenancyId}::uuid) + 'body', json_build_object('tenancyId', t.tenancy_id) ), NULL + FROM unnest(${tenancyIds}::uuid[]) AS t(tenancy_id) WHERE NOT EXISTS ( SELECT 1 FROM "OutgoingRequest" WHERE "startedFulfillingAt" IS NULL - AND ("qstashOptions"->'body'->>'tenancyId')::uuid = ${tenancyId}::uuid + AND ("qstashOptions"->'body'->>'tenancyId')::uuid = t.tenancy_id ) `; }