mirror of
https://github.com/stack-auth/stack.git
synced 2026-06-13 21:01:21 +08:00
fixes
This commit is contained in:
parent
686a1e6d2f
commit
8910138be9
12
.github/workflows/e2e-api-tests.yaml
vendored
12
.github/workflows/e2e-api-tests.yaml
vendored
@ -176,26 +176,26 @@ jobs:
|
||||
done
|
||||
|
||||
- name: Run tests (excluding external DB sync)
|
||||
run: pnpm test run ${{ matrix.freestyle-mode == 'prod' && '--min-workers=1 --max-workers=1' || '' }} --exclude "**/external-db-sync*.test.ts"
|
||||
run: pnpm test run ${{ matrix.freestyle-mode == 'prod' && '--min-workers=1 --max-workers=1' || '' }} --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync"
|
||||
|
||||
- name: Run external DB sync tests (single worker)
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync
|
||||
|
||||
- name: Run tests again (excluding external DB sync, attempt 1)
|
||||
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
|
||||
run: pnpm test run ${{ matrix.freestyle-mode == 'prod' && '--min-workers=1 --max-workers=1' || '' }} --exclude "**/external-db-sync*.test.ts"
|
||||
run: pnpm test run ${{ matrix.freestyle-mode == 'prod' && '--min-workers=1 --max-workers=1' || '' }} --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync"
|
||||
|
||||
- name: Run external DB sync tests again (single worker, attempt 1)
|
||||
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync
|
||||
|
||||
- name: Run tests again (excluding external DB sync, attempt 2)
|
||||
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
|
||||
run: pnpm test run ${{ matrix.freestyle-mode == 'prod' && '--min-workers=1 --max-workers=1' || '' }} --exclude "**/external-db-sync*.test.ts"
|
||||
run: pnpm test run ${{ matrix.freestyle-mode == 'prod' && '--min-workers=1 --max-workers=1' || '' }} --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync"
|
||||
|
||||
- name: Run external DB sync tests again (single worker, attempt 2)
|
||||
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync
|
||||
|
||||
- name: Verify data integrity
|
||||
run: pnpm run verify-data-integrity --no-bail
|
||||
|
||||
@ -169,26 +169,26 @@ jobs:
|
||||
done
|
||||
|
||||
- name: Run tests (excluding external DB sync)
|
||||
run: pnpm test run --exclude "**/external-db-sync*.test.ts"
|
||||
run: pnpm test run --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync"
|
||||
|
||||
- name: Run external DB sync tests (single worker)
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync
|
||||
|
||||
- name: Run tests again (excluding external DB sync, attempt 1)
|
||||
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
|
||||
run: pnpm test run --exclude "**/external-db-sync*.test.ts"
|
||||
run: pnpm test run --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync"
|
||||
|
||||
- name: Run external DB sync tests again (single worker, attempt 1)
|
||||
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync
|
||||
|
||||
- name: Run tests again (excluding external DB sync, attempt 2)
|
||||
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
|
||||
run: pnpm test run --exclude "**/external-db-sync*.test.ts"
|
||||
run: pnpm test run --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync"
|
||||
|
||||
- name: Run external DB sync tests again (single worker, attempt 2)
|
||||
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync
|
||||
|
||||
- name: Verify data integrity
|
||||
run: pnpm run verify-data-integrity --no-bail
|
||||
|
||||
@ -176,26 +176,26 @@ jobs:
|
||||
done
|
||||
|
||||
- name: Run tests (excluding external DB sync)
|
||||
run: pnpm test run --exclude "**/external-db-sync*.test.ts"
|
||||
run: pnpm test run --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync"
|
||||
|
||||
- name: Run external DB sync tests (single worker)
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync
|
||||
|
||||
- name: Run tests again (excluding external DB sync, attempt 1)
|
||||
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
|
||||
run: pnpm test run --exclude "**/external-db-sync*.test.ts"
|
||||
run: pnpm test run --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync"
|
||||
|
||||
- name: Run external DB sync tests again (single worker, attempt 1)
|
||||
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync
|
||||
|
||||
- name: Run tests again (excluding external DB sync, attempt 2)
|
||||
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
|
||||
run: pnpm test run --exclude "**/external-db-sync*.test.ts"
|
||||
run: pnpm test run --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync"
|
||||
|
||||
- name: Run external DB sync tests again (single worker, attempt 2)
|
||||
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts
|
||||
run: pnpm test run --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync
|
||||
|
||||
- name: Verify data integrity
|
||||
run: pnpm run verify-data-integrity --no-bail
|
||||
|
||||
@ -41,10 +41,10 @@ jobs:
|
||||
run: pnpm run restart-dev-environment
|
||||
|
||||
- name: Run tests (excluding external DB sync)
|
||||
run: pnpm run test run --reporter=verbose --exclude "**/external-db-sync*.test.ts"
|
||||
run: pnpm run test run --reporter=verbose --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync"
|
||||
|
||||
- name: Run external DB sync tests (single worker)
|
||||
run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts
|
||||
run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync
|
||||
|
||||
- name: Print dev server logs
|
||||
run: cat dev-server.log.untracked.txt
|
||||
|
||||
4
.github/workflows/restart-dev-and-test.yaml
vendored
4
.github/workflows/restart-dev-and-test.yaml
vendored
@ -40,10 +40,10 @@ jobs:
|
||||
run: pnpm run restart-dev-environment
|
||||
|
||||
- name: Run tests (excluding external DB sync)
|
||||
run: pnpm run test run --reporter=verbose --exclude "**/external-db-sync*.test.ts"
|
||||
run: pnpm run test run --reporter=verbose --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync"
|
||||
|
||||
- name: Run external DB sync tests (single worker)
|
||||
run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts
|
||||
run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync
|
||||
|
||||
- name: Print dev server logs
|
||||
run: cat dev-server.log.untracked.txt
|
||||
|
||||
@ -50,7 +50,7 @@ jobs:
|
||||
wait-for: 120s
|
||||
log-output-if: true
|
||||
- name: Run tests (excluding external DB sync)
|
||||
run: pnpm run test run --reporter=verbose --exclude "**/external-db-sync*.test.ts"
|
||||
run: pnpm run test run --reporter=verbose --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync"
|
||||
|
||||
- name: Run external DB sync tests (single worker)
|
||||
run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts
|
||||
run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync
|
||||
|
||||
4
.github/workflows/setup-tests.yaml
vendored
4
.github/workflows/setup-tests.yaml
vendored
@ -48,7 +48,7 @@ jobs:
|
||||
wait-for: 120s
|
||||
log-output-if: true
|
||||
- name: Run tests (excluding external DB sync)
|
||||
run: pnpm run test run --reporter=verbose --exclude "**/external-db-sync*.test.ts"
|
||||
run: pnpm run test run --reporter=verbose --exclude "apps/e2e/tests/backend/endpoints/api/v1/external-db-sync"
|
||||
|
||||
- name: Run external DB sync tests (single worker)
|
||||
run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 **/external-db-sync*.test.ts
|
||||
run: pnpm run test run --reporter=verbose --min-workers=1 --max-workers=1 apps/e2e/tests/backend/endpoints/api/v1/external-db-sync
|
||||
|
||||
@ -23,6 +23,14 @@ CREATE UNIQUE INDEX "ContactChannel_sequenceId_key" ON "ContactChannel"("sequen
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
CREATE UNIQUE INDEX "ProjectUser_sequenceId_key" ON "ProjectUser"("sequenceId");
|
||||
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
-- Creates composite indexes on (tenancyId, sequenceId) for efficient sync-engine queries.
|
||||
-- These allow fast lookups of rows by tenant ordered by sequence number.
|
||||
CREATE INDEX "ProjectUser_tenancyId_sequenceId_idx" ON "ProjectUser"("tenancyId", "sequenceId");
|
||||
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
CREATE INDEX "ContactChannel_tenancyId_sequenceId_idx" ON "ContactChannel"("tenancyId", "sequenceId");
|
||||
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
-- Creates OutgoingRequest table to queue sync requests to external databases.
|
||||
-- Each request stores the QStash options for making HTTP requests and tracks when fulfillment started.
|
||||
@ -81,14 +89,15 @@ ALTER TABLE "ContactChannel" ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NUL
|
||||
ALTER TABLE "DeletedRow" ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT TRUE;
|
||||
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
-- Creates partial indexes on shouldUpdateSequenceId to quickly find rows that need updates.
|
||||
CREATE INDEX "ProjectUser_shouldUpdateSequenceId_idx" ON "ProjectUser"("shouldUpdateSequenceId") WHERE "shouldUpdateSequenceId" = TRUE;
|
||||
-- Creates partial indexes on (shouldUpdateSequenceId, tenancyId) to quickly find rows that need updates
|
||||
-- and support ORDER BY tenancyId for less fragmented updates.
|
||||
CREATE INDEX "ProjectUser_shouldUpdateSequenceId_idx" ON "ProjectUser"("shouldUpdateSequenceId", "tenancyId") WHERE "shouldUpdateSequenceId" = TRUE;
|
||||
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
CREATE INDEX "ContactChannel_shouldUpdateSequenceId_idx" ON "ContactChannel"("shouldUpdateSequenceId") WHERE "shouldUpdateSequenceId" = TRUE;
|
||||
CREATE INDEX "ContactChannel_shouldUpdateSequenceId_idx" ON "ContactChannel"("shouldUpdateSequenceId", "tenancyId") WHERE "shouldUpdateSequenceId" = TRUE;
|
||||
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
CREATE INDEX "DeletedRow_shouldUpdateSequenceId_idx" ON "DeletedRow"("shouldUpdateSequenceId") WHERE "shouldUpdateSequenceId" = TRUE;
|
||||
CREATE INDEX "DeletedRow_shouldUpdateSequenceId_idx" ON "DeletedRow"("shouldUpdateSequenceId", "tenancyId") WHERE "shouldUpdateSequenceId" = TRUE;
|
||||
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
-- SINGLE_STATEMENT_SENTINEL
|
||||
|
||||
@ -225,6 +225,7 @@ model ProjectUser {
|
||||
@@index([tenancyId, displayName(sort: Desc)], name: "ProjectUser_displayName_desc")
|
||||
@@index([tenancyId, createdAt(sort: Asc)], name: "ProjectUser_createdAt_asc")
|
||||
@@index([tenancyId, createdAt(sort: Desc)], name: "ProjectUser_createdAt_desc")
|
||||
@@index([tenancyId, sequenceId], name: "ProjectUser_tenancyId_sequenceId_idx")
|
||||
// Partial index for external db sync backfill lives in migration SQL.
|
||||
}
|
||||
|
||||
@ -290,6 +291,7 @@ model ContactChannel {
|
||||
@@unique([tenancyId, projectUserId, type, value])
|
||||
// only one contact channel per project with the same value and type can be used for auth
|
||||
@@unique([tenancyId, type, value, usedForAuth])
|
||||
@@index([tenancyId, sequenceId], name: "ContactChannel_tenancyId_sequenceId_idx")
|
||||
// Partial index for external db sync backfill lives in migration SQL (WHERE shouldUpdateSequenceId = TRUE).
|
||||
}
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { getPrismaClientForTenancy, globalPrismaClient, type PrismaClientTransaction } from "@/prisma-client";
|
||||
import { globalPrismaClient } from "@/prisma-client";
|
||||
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
|
||||
import {
|
||||
yupBoolean,
|
||||
@ -10,10 +10,8 @@ import {
|
||||
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
|
||||
import { captureError, StatusError } from "@stackframe/stack-shared/dist/utils/errors";
|
||||
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
|
||||
import { getTenancy, type Tenancy } from "@/lib/tenancies";
|
||||
import { enqueueExternalDbSync } from "@/lib/external-db-sync-queue";
|
||||
import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue";
|
||||
|
||||
const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
|
||||
const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000;
|
||||
|
||||
function parseMaxDurationMs(value: string | undefined): number {
|
||||
@ -32,11 +30,6 @@ function parseStopWhenIdle(value: string | undefined): boolean {
|
||||
throw new StatusError(400, "stopWhenIdle must be 'true' or 'false'");
|
||||
}
|
||||
|
||||
function assertUuid(value: unknown, label: string): asserts value is string {
|
||||
if (typeof value !== "string" || value.trim().length === 0 || !UUID_REGEX.test(value)) {
|
||||
throw new StatusError(500, `${label} must be a valid UUID. Received: ${JSON.stringify(value)}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Assigns sequence IDs to rows that need them and queues sync requests for affected tenants.
|
||||
// Processes up to 1000 rows at a time from each table.
|
||||
@ -48,6 +41,7 @@ async function backfillSequenceIds(): Promise<boolean> {
|
||||
FROM "ProjectUser"
|
||||
WHERE "shouldUpdateSequenceId" = TRUE
|
||||
OR "sequenceId" IS NULL
|
||||
ORDER BY "tenancyId"
|
||||
LIMIT 1000
|
||||
FOR UPDATE SKIP LOCKED
|
||||
),
|
||||
@ -63,12 +57,9 @@ async function backfillSequenceIds(): Promise<boolean> {
|
||||
SELECT DISTINCT "tenancyId" FROM updated_rows
|
||||
`;
|
||||
|
||||
// Enqueue sync for each affected tenant
|
||||
for (const { tenancyId } of projectUserTenants) {
|
||||
assertUuid(tenancyId, "projectUserTenants.tenancyId");
|
||||
await enqueueExternalDbSync(tenancyId);
|
||||
}
|
||||
// Enqueue sync for all affected tenants in a single batch query
|
||||
if (projectUserTenants.length > 0) {
|
||||
await enqueueExternalDbSyncBatch(projectUserTenants.map(t => t.tenancyId));
|
||||
didUpdate = true;
|
||||
}
|
||||
|
||||
@ -78,6 +69,7 @@ async function backfillSequenceIds(): Promise<boolean> {
|
||||
FROM "ContactChannel"
|
||||
WHERE "shouldUpdateSequenceId" = TRUE
|
||||
OR "sequenceId" IS NULL
|
||||
ORDER BY "tenancyId"
|
||||
LIMIT 1000
|
||||
FOR UPDATE SKIP LOCKED
|
||||
),
|
||||
@ -94,11 +86,8 @@ async function backfillSequenceIds(): Promise<boolean> {
|
||||
SELECT DISTINCT "tenancyId" FROM updated_rows
|
||||
`;
|
||||
|
||||
for (const { tenancyId } of contactChannelTenants) {
|
||||
assertUuid(tenancyId, "contactChannelTenants.tenancyId");
|
||||
await enqueueExternalDbSync(tenancyId);
|
||||
}
|
||||
if (contactChannelTenants.length > 0) {
|
||||
await enqueueExternalDbSyncBatch(contactChannelTenants.map(t => t.tenancyId));
|
||||
didUpdate = true;
|
||||
}
|
||||
|
||||
@ -108,6 +97,7 @@ async function backfillSequenceIds(): Promise<boolean> {
|
||||
FROM "DeletedRow"
|
||||
WHERE "shouldUpdateSequenceId" = TRUE
|
||||
OR "sequenceId" IS NULL
|
||||
ORDER BY "tenancyId"
|
||||
LIMIT 1000
|
||||
FOR UPDATE SKIP LOCKED
|
||||
),
|
||||
@ -122,125 +112,17 @@ async function backfillSequenceIds(): Promise<boolean> {
|
||||
SELECT DISTINCT "tenancyId" FROM updated_rows
|
||||
`;
|
||||
|
||||
for (const { tenancyId } of deletedRowTenants) {
|
||||
assertUuid(tenancyId, "deletedRowTenants.tenancyId");
|
||||
await enqueueExternalDbSync(tenancyId);
|
||||
}
|
||||
if (deletedRowTenants.length > 0) {
|
||||
await enqueueExternalDbSyncBatch(deletedRowTenants.map(t => t.tenancyId));
|
||||
didUpdate = true;
|
||||
}
|
||||
|
||||
return didUpdate;
|
||||
}
|
||||
|
||||
async function backfillSequenceIdsForTenancy(prisma: PrismaClientTransaction, tenancyId: string): Promise<boolean> {
|
||||
assertUuid(tenancyId, "tenancyId");
|
||||
let didUpdate = false;
|
||||
|
||||
const projectUserRows = await prisma.$queryRaw<{ tenancyId: string }[]>`
|
||||
WITH rows_to_update AS (
|
||||
SELECT "tenancyId", "projectUserId"
|
||||
FROM "ProjectUser"
|
||||
WHERE ("shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)
|
||||
AND "tenancyId" = ${tenancyId}::uuid
|
||||
LIMIT 1000
|
||||
FOR UPDATE SKIP LOCKED
|
||||
),
|
||||
updated_rows AS (
|
||||
UPDATE "ProjectUser" pu
|
||||
SET "sequenceId" = nextval('global_seq_id'),
|
||||
"shouldUpdateSequenceId" = FALSE
|
||||
FROM rows_to_update r
|
||||
WHERE pu."tenancyId" = r."tenancyId"
|
||||
AND pu."projectUserId" = r."projectUserId"
|
||||
RETURNING pu."tenancyId"
|
||||
)
|
||||
SELECT DISTINCT "tenancyId" FROM updated_rows
|
||||
`;
|
||||
if (projectUserRows.length > 0) {
|
||||
didUpdate = true;
|
||||
}
|
||||
|
||||
const contactChannelRows = await prisma.$queryRaw<{ tenancyId: string }[]>`
|
||||
WITH rows_to_update AS (
|
||||
SELECT "tenancyId", "projectUserId", "id"
|
||||
FROM "ContactChannel"
|
||||
WHERE ("shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)
|
||||
AND "tenancyId" = ${tenancyId}::uuid
|
||||
LIMIT 1000
|
||||
FOR UPDATE SKIP LOCKED
|
||||
),
|
||||
updated_rows AS (
|
||||
UPDATE "ContactChannel" cc
|
||||
SET "sequenceId" = nextval('global_seq_id'),
|
||||
"shouldUpdateSequenceId" = FALSE
|
||||
FROM rows_to_update r
|
||||
WHERE cc."tenancyId" = r."tenancyId"
|
||||
AND cc."projectUserId" = r."projectUserId"
|
||||
AND cc."id" = r."id"
|
||||
RETURNING cc."tenancyId"
|
||||
)
|
||||
SELECT DISTINCT "tenancyId" FROM updated_rows
|
||||
`;
|
||||
if (contactChannelRows.length > 0) {
|
||||
didUpdate = true;
|
||||
}
|
||||
|
||||
const deletedRowRows = await prisma.$queryRaw<{ tenancyId: string }[]>`
|
||||
WITH rows_to_update AS (
|
||||
SELECT "id", "tenancyId"
|
||||
FROM "DeletedRow"
|
||||
WHERE ("shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)
|
||||
AND "tenancyId" = ${tenancyId}::uuid
|
||||
LIMIT 1000
|
||||
FOR UPDATE SKIP LOCKED
|
||||
),
|
||||
updated_rows AS (
|
||||
UPDATE "DeletedRow" dr
|
||||
SET "sequenceId" = nextval('global_seq_id'),
|
||||
"shouldUpdateSequenceId" = FALSE
|
||||
FROM rows_to_update r
|
||||
WHERE dr."id" = r."id"
|
||||
RETURNING dr."tenancyId"
|
||||
)
|
||||
SELECT DISTINCT "tenancyId" FROM updated_rows
|
||||
`;
|
||||
if (deletedRowRows.length > 0) {
|
||||
didUpdate = true;
|
||||
}
|
||||
|
||||
return didUpdate;
|
||||
}
|
||||
|
||||
async function getNonHostedTenancies(): Promise<Tenancy[]> {
|
||||
const tenancyIds = await globalPrismaClient.tenancy.findMany({
|
||||
select: { id: true },
|
||||
});
|
||||
|
||||
const tenancies: Tenancy[] = [];
|
||||
for (const { id } of tenancyIds) {
|
||||
const tenancy = await getTenancy(id);
|
||||
if (!tenancy) continue;
|
||||
if (tenancy.config.sourceOfTruth.type !== "hosted") {
|
||||
tenancies.push(tenancy);
|
||||
}
|
||||
}
|
||||
|
||||
return tenancies;
|
||||
}
|
||||
|
||||
async function backfillSequenceIdsForNonHostedTenancies(tenancies: Tenancy[]): Promise<boolean> {
|
||||
let didUpdate = false;
|
||||
for (const tenancy of tenancies) {
|
||||
const prisma = await getPrismaClientForTenancy(tenancy);
|
||||
const tenancyDidUpdate = await backfillSequenceIdsForTenancy(prisma, tenancy.id);
|
||||
if (tenancyDidUpdate) {
|
||||
await enqueueExternalDbSync(tenancy.id);
|
||||
didUpdate = true;
|
||||
}
|
||||
}
|
||||
return didUpdate;
|
||||
}
|
||||
// TODO: If we ever need to support non-hosted source-of-truth tenancies again,
|
||||
// we'll need to implement a scalable way to iterate over them (pagination, etc.)
|
||||
// instead of loading all tenancies into memory at once.
|
||||
|
||||
export const GET = createSmartRouteHandler({
|
||||
metadata: {
|
||||
@ -275,10 +157,6 @@ export const GET = createSmartRouteHandler({
|
||||
throw new StatusError(401, "Unauthorized");
|
||||
}
|
||||
|
||||
let nonHostedTenancies = await getNonHostedTenancies();
|
||||
let lastTenancyRefreshMs = performance.now();
|
||||
const tenancyRefreshIntervalMs = 5_000;
|
||||
|
||||
const startTime = performance.now();
|
||||
const maxDurationMs = parseMaxDurationMs(query.maxDurationMs);
|
||||
const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle);
|
||||
@ -288,13 +166,8 @@ export const GET = createSmartRouteHandler({
|
||||
|
||||
while (performance.now() - startTime < maxDurationMs) {
|
||||
try {
|
||||
if (performance.now() - lastTenancyRefreshMs >= tenancyRefreshIntervalMs) {
|
||||
nonHostedTenancies = await getNonHostedTenancies();
|
||||
lastTenancyRefreshMs = performance.now();
|
||||
}
|
||||
const didUpdateHosted = await backfillSequenceIds();
|
||||
const didUpdateNonHosted = await backfillSequenceIdsForNonHostedTenancies(nonHostedTenancies);
|
||||
if (stopWhenIdle && !didUpdateHosted && !didUpdateNonHosted) {
|
||||
const didUpdate = await backfillSequenceIds();
|
||||
if (stopWhenIdle && !didUpdate) {
|
||||
break;
|
||||
}
|
||||
} catch (error) {
|
||||
|
||||
@ -32,8 +32,8 @@ export const POST = createSmartRouteHandler({
|
||||
|
||||
const tenancy = await getTenancy(tenancyId);
|
||||
if (!tenancy) {
|
||||
console.warn(`[sync-engine] Tenancy ${tenancyId} in queue but not found.`);
|
||||
throw new StatusError(404, `Tenancy ${tenancyId} not found.`);
|
||||
console.warn(`[sync-engine] Tenancy ${tenancyId} in queue but not found, assuming it was deleted.`);
|
||||
throw new StatusError(400, `Tenancy ${tenancyId} not found.`);
|
||||
}
|
||||
|
||||
await syncExternalDatabases(tenancy);
|
||||
|
||||
@ -12,6 +12,19 @@ function assertUuid(value: unknown, label: string): asserts value is string {
|
||||
// Queues a sync request for a specific tenant if one isn't already pending.
|
||||
export async function enqueueExternalDbSync(tenancyId: string): Promise<void> {
|
||||
assertUuid(tenancyId, "tenancyId");
|
||||
await enqueueExternalDbSyncBatch([tenancyId]);
|
||||
}
|
||||
|
||||
// Queues sync requests for multiple tenants in a single query.
|
||||
// Only inserts for tenants that don't already have a pending request.
|
||||
export async function enqueueExternalDbSyncBatch(tenancyIds: string[]): Promise<void> {
|
||||
if (tenancyIds.length === 0) return;
|
||||
|
||||
for (const id of tenancyIds) {
|
||||
assertUuid(id, "tenancyId");
|
||||
}
|
||||
|
||||
// Use unnest to pass array of UUIDs and insert all in one query
|
||||
await globalPrismaClient.$executeRaw`
|
||||
INSERT INTO "OutgoingRequest" ("id", "createdAt", "qstashOptions", "startedFulfillingAt")
|
||||
SELECT
|
||||
@ -19,14 +32,15 @@ export async function enqueueExternalDbSync(tenancyId: string): Promise<void> {
|
||||
NOW(),
|
||||
json_build_object(
|
||||
'url', '/api/latest/internal/external-db-sync/sync-engine',
|
||||
'body', json_build_object('tenancyId', ${tenancyId}::uuid)
|
||||
'body', json_build_object('tenancyId', t.tenancy_id)
|
||||
),
|
||||
NULL
|
||||
FROM unnest(${tenancyIds}::uuid[]) AS t(tenancy_id)
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM "OutgoingRequest"
|
||||
WHERE "startedFulfillingAt" IS NULL
|
||||
AND ("qstashOptions"->'body'->>'tenancyId')::uuid = ${tenancyId}::uuid
|
||||
AND ("qstashOptions"->'body'->>'tenancyId')::uuid = t.tenancy_id
|
||||
)
|
||||
`;
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user