diff --git a/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql b/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql index 1e19c2105..5bd53c09f 100644 --- a/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql +++ b/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql @@ -37,12 +37,17 @@ CREATE INDEX "ContactChannel_tenancyId_sequenceId_idx" ON "ContactChannel"("tena CREATE TABLE "OutgoingRequest" ( "id" UUID NOT NULL, "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "deduplicationKey" TEXT, "qstashOptions" JSONB NOT NULL, "startedFulfillingAt" TIMESTAMP(3), - CONSTRAINT "OutgoingRequest_pkey" PRIMARY KEY ("id") + CONSTRAINT "OutgoingRequest_pkey" PRIMARY KEY ("id"), + CONSTRAINT "OutgoingRequest_deduplicationKey_key" UNIQUE ("deduplicationKey") ); +-- SPLIT_STATEMENT_SENTINEL +CREATE INDEX "OutgoingRequest_startedFulfillingAt_deduplicationKey_idx" ON "OutgoingRequest"("startedFulfillingAt", "deduplicationKey"); + -- SPLIT_STATEMENT_SENTINEL -- Creates composite index on startedFulfillingAt and createdAt for efficient querying of pending requests in order. -- This allows fast lookups of pending requests (WHERE startedFulfillingAt IS NULL) ordered by createdAt. @@ -91,13 +96,13 @@ ALTER TABLE "DeletedRow" ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DE -- SPLIT_STATEMENT_SENTINEL -- Creates partial indexes on (shouldUpdateSequenceId, tenancyId) to quickly find rows that need updates -- and support ORDER BY tenancyId for less fragmented updates. -CREATE INDEX "ProjectUser_shouldUpdateSequenceId_idx" ON "ProjectUser"("shouldUpdateSequenceId", "tenancyId") WHERE "shouldUpdateSequenceId" = TRUE; +CREATE INDEX "ProjectUser_shouldUpdateSequenceId_idx" ON "ProjectUser"("shouldUpdateSequenceId", "tenancyId"); -- SPLIT_STATEMENT_SENTINEL -CREATE INDEX "ContactChannel_shouldUpdateSequenceId_idx" ON "ContactChannel"("shouldUpdateSequenceId", "tenancyId") WHERE "shouldUpdateSequenceId" = TRUE; +CREATE INDEX "ContactChannel_shouldUpdateSequenceId_idx" ON "ContactChannel"("shouldUpdateSequenceId", "tenancyId"); -- SPLIT_STATEMENT_SENTINEL -CREATE INDEX "DeletedRow_shouldUpdateSequenceId_idx" ON "DeletedRow"("shouldUpdateSequenceId", "tenancyId") WHERE "shouldUpdateSequenceId" = TRUE; +CREATE INDEX "DeletedRow_shouldUpdateSequenceId_idx" ON "DeletedRow"("shouldUpdateSequenceId", "tenancyId"); -- SPLIT_STATEMENT_SENTINEL -- SINGLE_STATEMENT_SENTINEL @@ -240,4 +245,3 @@ CREATE TRIGGER log_deleted_row_contact_channel BEFORE DELETE ON "ContactChannel" FOR EACH ROW EXECUTE FUNCTION log_deleted_row(); - diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index 375aa15d1..76ee2cde2 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -1071,8 +1071,11 @@ model OutgoingRequest { qstashOptions Json startedFulfillingAt DateTime? + deduplicationKey String? + @@unique([deduplicationKey]) @@index([startedFulfillingAt, createdAt]) + @@index([startedFulfillingAt, deduplicationKey]) } model DeletedRow { diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts index ada34c105..822980596 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts @@ -1,4 +1,5 @@ import { upstash } from "@/lib/upstash"; +import type { PublishBatchRequest } from "@upstash/qstash"; import { globalPrismaClient, retryTransaction } from "@/prisma-client"; import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; import type { OutgoingRequest } from "@/generated/prisma/client"; @@ -10,11 +11,13 @@ import { yupTuple, } from "@stackframe/stack-shared/dist/schema-fields"; import { getEnvVariable, getNodeEnvironment } from "@stackframe/stack-shared/dist/utils/env"; -import { captureError, StatusError } from "@stackframe/stack-shared/dist/utils/errors"; +import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors"; import { wait } from "@stackframe/stack-shared/dist/utils/promises"; const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000; const DIRECT_SYNC_ENV = "STACK_EXTERNAL_DB_SYNC_DIRECT"; +const POLLER_CLAIM_LIMIT_ENV = "STACK_EXTERNAL_DB_SYNC_POLL_CLAIM_LIMIT"; +const DEFAULT_POLL_CLAIM_LIMIT = 100; function parseMaxDurationMs(value: string | undefined): number { if (!value) return DEFAULT_MAX_DURATION_MS; @@ -36,6 +39,18 @@ function directSyncEnabled(): boolean { return getEnvVariable(DIRECT_SYNC_ENV, "") === "true"; } +function getPollerClaimLimit(): number { + const rawValue = getEnvVariable(POLLER_CLAIM_LIMIT_ENV, ""); + if (!rawValue) return DEFAULT_POLL_CLAIM_LIMIT; + const parsed = Number.parseInt(rawValue, 10); + if (!Number.isFinite(parsed) || parsed <= 0) { + throw new StackAssertionError( + `${POLLER_CLAIM_LIMIT_ENV} must be a positive integer. Received: ${JSON.stringify(rawValue)}` + ); + } + return parsed; +} + function getLocalApiBaseUrl(): string { const prefix = getEnvVariable("NEXT_PUBLIC_STACK_PORT_PREFIX", "81"); return `http://localhost:${prefix}02`; @@ -79,32 +94,37 @@ export const GET = createSmartRouteHandler({ const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle); const pollIntervalMs = 50; const staleClaimIntervalMinutes = 5; + const pollerClaimLimit = getPollerClaimLimit(); let totalRequestsProcessed = 0; async function claimPendingRequests(): Promise { - return await retryTransaction(globalPrismaClient, async (tx) => { - const rows = await tx.$queryRaw` + return await globalPrismaClient.$queryRaw` UPDATE "OutgoingRequest" SET "startedFulfillingAt" = NOW() WHERE "id" IN ( SELECT id FROM "OutgoingRequest" WHERE "startedFulfillingAt" IS NULL - OR "startedFulfillingAt" < NOW() - (${staleClaimIntervalMinutes} * INTERVAL '1 minute') ORDER BY "createdAt" - LIMIT 100 + LIMIT ${pollerClaimLimit} FOR UPDATE SKIP LOCKED ) RETURNING *; `; - return rows; - }); } + async function deleteOutgoingRequest(id: string): Promise { await retryTransaction(globalPrismaClient, async (tx) => { await tx.outgoingRequest.delete({ where: { id } }); }); } + + async function deleteOutgoingRequests(ids: string[]): Promise { + if (ids.length === 0) return; + await retryTransaction(globalPrismaClient, async (tx) => { + await tx.outgoingRequest.deleteMany({ where: { id: { in: ids } } }); + }); + } async function processRequest(request: OutgoingRequest): Promise { // Prisma JsonValue doesn't carry a precise shape for this JSON blob. const options = request.qstashOptions as any; @@ -112,6 +132,33 @@ export const GET = createSmartRouteHandler({ let fullUrl = new URL(options.url, baseUrl).toString(); + // In dev/test, QStash runs in Docker so "localhost" won't work. + // Replace with "host.docker.internal" to reach the host machine. + // if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) { + // const url = new URL(fullUrl); + // if (url.hostname === "localhost" || url.hostname === "127.0.0.1") { + // url.hostname = "host.docker.internal"; + // fullUrl = url.toString(); + // } + // } + + await upstash.publishJSON({ + url: fullUrl, + body: options.body, + flowControl: options.flowControl, + }); + await deleteOutgoingRequest(request.id); + } + + type UpstashRequest = PublishBatchRequest; + + function buildUpstashRequest(request: OutgoingRequest): UpstashRequest { + // Prisma JsonValue doesn't carry a precise shape for this JSON blob. + const options = request.qstashOptions as any; + const baseUrl = getEnvVariable("NEXT_PUBLIC_STACK_API_URL"); + + let fullUrl = new URL(options.url, baseUrl).toString(); + // In dev/test, QStash runs in Docker so "localhost" won't work. // Replace with "host.docker.internal" to reach the host machine. if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) { @@ -122,27 +169,13 @@ export const GET = createSmartRouteHandler({ } } - if (directSyncEnabled()) { - const directUrl = new URL(options.url, getLocalApiBaseUrl()).toString(); - const res = await fetch(directUrl, { - method: "POST", - headers: { - "content-type": "application/json", - "upstash-signature": "test-bypass", - }, - body: JSON.stringify(options.body), - }); - if (!res.ok) { - throw new StatusError(res.status, `Direct sync failed: ${res.status} ${res.statusText}`); - } - } else { - await upstash.publishJSON({ - url: fullUrl, - body: options.body, - }); - } + const flowControl = options.flowControl as UpstashRequest["flowControl"]; - await deleteOutgoingRequest(request.id); + return { + url: fullUrl, + body: options.body, + ...(flowControl ? { flowControl } : {}), + }; } async function processRequests(requests: OutgoingRequest[]): Promise { @@ -160,19 +193,22 @@ export const GET = createSmartRouteHandler({ return processed; } - const results = await Promise.allSettled(requests.map(processRequest)); - for (const result of results) { - if (result.status === "fulfilled") { - processed++; - continue; - } - captureError("poller-iteration-error", result.reason); - } + if (requests.length === 0) return 0; - return processed; + try { + const batchPayload = requests.map(buildUpstashRequest); + console.log("publishing to QStash batch", { count: batchPayload.length }); + await upstash.batchJSON(batchPayload); + await deleteOutgoingRequests(requests.map((request) => request.id)); + return requests.length; + } catch (error) { + captureError("poller-iteration-error", error); + return 0; + } } while (performance.now() - startTime < maxDurationMs) { + console.log("poller-iteration", performance.now() - startTime); const pendingRequests = await claimPendingRequests(); if (stopWhenIdle && pendingRequests.length === 0) { diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts index bad8d3fb6..4b71243d5 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts @@ -8,11 +8,13 @@ import { yupTuple, } from "@stackframe/stack-shared/dist/schema-fields"; import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env"; -import { captureError, StatusError } from "@stackframe/stack-shared/dist/utils/errors"; +import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors"; import { wait } from "@stackframe/stack-shared/dist/utils/promises"; import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue"; const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000; +const SEQUENCER_BATCH_SIZE_ENV = "STACK_EXTERNAL_DB_SYNC_SEQUENCER_BATCH_SIZE"; +const DEFAULT_BATCH_SIZE = 1000; function parseMaxDurationMs(value: string | undefined): number { if (!value) return DEFAULT_MAX_DURATION_MS; @@ -30,19 +32,30 @@ function parseStopWhenIdle(value: string | undefined): boolean { throw new StatusError(400, "stopWhenIdle must be 'true' or 'false'"); } +function getSequencerBatchSize(): number { + const rawValue = getEnvVariable(SEQUENCER_BATCH_SIZE_ENV, ""); + if (!rawValue) return DEFAULT_BATCH_SIZE; + const parsed = Number.parseInt(rawValue, 10); + if (!Number.isFinite(parsed) || parsed <= 0) { + throw new StackAssertionError( + `${SEQUENCER_BATCH_SIZE_ENV} must be a positive integer. Received: ${JSON.stringify(rawValue)}` + ); + } + return parsed; +} + // Assigns sequence IDs to rows that need them and queues sync requests for affected tenants. -// Processes up to 1000 rows at a time from each table. -async function backfillSequenceIds(): Promise { +// Processes up to batchSize rows at a time from each table. +async function backfillSequenceIds(batchSize: number): Promise { let didUpdate = false; const projectUserTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` WITH rows_to_update AS ( SELECT "tenancyId", "projectUserId" FROM "ProjectUser" WHERE "shouldUpdateSequenceId" = TRUE - OR "sequenceId" IS NULL ORDER BY "tenancyId" - LIMIT 1000 + LIMIT ${batchSize} FOR UPDATE SKIP LOCKED ), updated_rows AS ( @@ -68,9 +81,8 @@ async function backfillSequenceIds(): Promise { SELECT "tenancyId", "projectUserId", "id" FROM "ContactChannel" WHERE "shouldUpdateSequenceId" = TRUE - OR "sequenceId" IS NULL ORDER BY "tenancyId" - LIMIT 1000 + LIMIT ${batchSize} FOR UPDATE SKIP LOCKED ), updated_rows AS ( @@ -96,9 +108,8 @@ async function backfillSequenceIds(): Promise { SELECT "id", "tenancyId" FROM "DeletedRow" WHERE "shouldUpdateSequenceId" = TRUE - OR "sequenceId" IS NULL ORDER BY "tenancyId" - LIMIT 1000 + LIMIT ${batchSize} FOR UPDATE SKIP LOCKED ), updated_rows AS ( @@ -161,12 +172,13 @@ export const GET = createSmartRouteHandler({ const maxDurationMs = parseMaxDurationMs(query.maxDurationMs); const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle); const pollIntervalMs = 50; + const batchSize = getSequencerBatchSize(); let iterations = 0; while (performance.now() - startTime < maxDurationMs) { try { - const didUpdate = await backfillSequenceIds(); + const didUpdate = await backfillSequenceIds(batchSize); if (stopWhenIdle && !didUpdate) { break; } diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts new file mode 100644 index 000000000..1b7086706 --- /dev/null +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts @@ -0,0 +1,796 @@ +import { globalPrismaClient } from "@/prisma-client"; +import { Prisma } from "@/generated/prisma/client"; +import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; +import type { CompleteConfig } from "@stackframe/stack-shared/dist/config/schema"; +import { + adaptSchema, + adminAuthTypeSchema, + yupArray, + yupBoolean, + yupNumber, + yupObject, + yupString, +} from "@stackframe/stack-shared/dist/schema-fields"; +import { errorToNiceString, StackAssertionError, throwErr } from "@stackframe/stack-shared/dist/utils/errors"; +import { Result } from "@stackframe/stack-shared/dist/utils/results"; +import { Client } from "pg"; +import { KnownErrors } from "@stackframe/stack-shared"; + +const STALE_CLAIM_INTERVAL_MINUTES = 5; + +const sequenceStatsSchema = yupObject({ + total: yupString().defined(), + pending: yupString().defined(), + null_sequence_id: yupString().defined(), + min_sequence_id: yupString().nullable().defined(), + max_sequence_id: yupString().nullable().defined(), +}); + +const deletedRowByTableSchema = yupObject({ + table_name: yupString().defined(), + total: yupString().defined(), + pending: yupString().defined(), + null_sequence_id: yupString().defined(), + min_sequence_id: yupString().nullable().defined(), + max_sequence_id: yupString().nullable().defined(), +}); + +const externalDbMetadataSchema = yupObject({ + mapping_name: yupString().defined(), + last_synced_sequence_id: yupString().defined(), + updated_at_millis: yupNumber().nullable().defined(), +}); + +const externalDbMappingStatusSchema = yupObject({ + mapping_id: yupString().defined(), + internal_max_sequence_id: yupString().nullable().defined(), + last_synced_sequence_id: yupString().nullable().defined(), + updated_at_millis: yupNumber().nullable().defined(), + backlog: yupString().nullable().defined(), +}); + +const externalDbSchema = yupObject({ + id: yupString().defined(), + type: yupString().defined(), + connection: yupObject({ + redacted: yupString().nullable().defined(), + host: yupString().nullable().defined(), + port: yupNumber().nullable().defined(), + database: yupString().nullable().defined(), + user: yupString().nullable().defined(), + }).defined(), + status: yupString().oneOf(["ok", "error"]).defined(), + error: yupString().nullable().defined(), + metadata: yupArray(externalDbMetadataSchema).defined(), + users_table: yupObject({ + exists: yupBoolean().defined(), + total_rows: yupString().nullable().defined(), + min_signed_up_at_millis: yupNumber().nullable().defined(), + max_signed_up_at_millis: yupNumber().nullable().defined(), + }).defined(), + mapping_status: yupArray(externalDbMappingStatusSchema).defined(), +}); + +const mappingSchema = yupObject({ + mapping_id: yupString().defined(), + internal_min_sequence_id: yupString().nullable().defined(), + internal_max_sequence_id: yupString().nullable().defined(), + internal_pending_count: yupString().defined(), +}); + +const globalSchema = yupObject({ + tenancies_total: yupString().defined(), + tenancies_with_db_sync: yupString().defined(), + sequencer: yupObject({ + project_users: sequenceStatsSchema.defined(), + contact_channels: sequenceStatsSchema.defined(), + deleted_rows: sequenceStatsSchema.shape({ + by_table: yupArray(deletedRowByTableSchema).defined(), + }).defined(), + }).defined(), + poller: yupObject({ + total: yupString().defined(), + pending: yupString().defined(), + in_flight: yupString().defined(), + stale: yupString().defined(), + oldest_created_at_millis: yupNumber().nullable().defined(), + newest_created_at_millis: yupNumber().nullable().defined(), + }).defined(), + sync_engine: yupObject({ + mappings: yupArray(mappingSchema).defined(), + }).defined(), +}); + +const responseSchema = yupObject({ + statusCode: yupNumber().oneOf([200]).defined(), + bodyType: yupString().oneOf(["json"]).defined(), + body: yupObject({ + ok: yupBoolean().defined(), + generated_at_millis: yupNumber().defined(), + global: globalSchema.nullable().defined(), + tenancy: yupObject({ + id: yupString().defined(), + project_id: yupString().defined(), + branch_id: yupString().defined(), + }).defined(), + sequencer: yupObject({ + project_users: sequenceStatsSchema.defined(), + contact_channels: sequenceStatsSchema.defined(), + deleted_rows: sequenceStatsSchema.shape({ + by_table: yupArray(deletedRowByTableSchema).defined(), + }).defined(), + }).defined(), + poller: yupObject({ + total: yupString().defined(), + pending: yupString().defined(), + in_flight: yupString().defined(), + stale: yupString().defined(), + oldest_created_at_millis: yupNumber().nullable().defined(), + newest_created_at_millis: yupNumber().nullable().defined(), + }).defined(), + sync_engine: yupObject({ + mappings: yupArray(mappingSchema).defined(), + external_databases: yupArray(externalDbSchema).defined(), + }).defined(), + }).defined(), +}); + +type SequenceStatsRow = { + total: unknown, + pending: unknown, + null_sequence_id: unknown, + min_sequence_id: unknown, + max_sequence_id: unknown, +}; + +type DeletedRowStatsRow = SequenceStatsRow & { + table_name: string, +}; + +type OutgoingStatsRow = { + total: unknown, + pending: unknown, + in_flight: unknown, + stale: unknown, + oldest_created_at: unknown, + newest_created_at: unknown, +}; + +type ExternalDbMetadataRow = { + mapping_name: string, + last_synced_sequence_id: unknown, + updated_at: unknown, +}; + +type UsersTableStatsRow = { + total_rows: unknown, + min_signed_up_at: unknown, + max_signed_up_at: unknown, +}; + +type CountRow = { + total: unknown, +}; + +type SequenceStats = ReturnType; +type DeletedRowSummary = SequenceStats & { table_name: string }; + +function toBigIntString(value: unknown): string | null { + if (value === null || value === undefined) return null; + if (typeof value === "bigint") return value.toString(); + if (typeof value === "number" && Number.isFinite(value)) return Math.trunc(value).toString(); + if (typeof value === "string") return value; + return null; +} + +function toBigIntStringOrThrow(value: unknown, label: string): string { + return toBigIntString(value) ?? throwErr(`Expected ${label} to be a bigint-compatible value.`, { value }); +} + +function toMillis(value: unknown): number | null { + if (value === null || value === undefined) return null; + if (value instanceof Date) return value.getTime(); + if (typeof value === "number" && Number.isFinite(value)) return value; + if (typeof value === "string") { + const parsed = new Date(value); + return Number.isNaN(parsed.getTime()) ? null : parsed.getTime(); + } + return null; +} + +function addBigIntStrings(a: string | null | undefined, b: string | null | undefined): string { + const first = a ? BigInt(a) : 0n; + const second = b ? BigInt(b) : 0n; + return (first + second).toString(); +} + +function minBigIntString(values: Array): string | null { + let minValue: bigint | null = null; + for (const value of values) { + if (!value) continue; + const parsed = BigInt(value); + if (minValue === null || parsed < minValue) { + minValue = parsed; + } + } + return minValue === null ? null : minValue.toString(); +} + +function maxBigIntString(values: Array): string | null { + let maxValue: bigint | null = null; + for (const value of values) { + if (!value) continue; + const parsed = BigInt(value); + if (maxValue === null || parsed > maxValue) { + maxValue = parsed; + } + } + return maxValue === null ? null : maxValue.toString(); +} + +function buildMappingInternalStats( + projectUsersStats: SequenceStats, + deletedRowsByTable: DeletedRowSummary[], +) { + const deletedProjectUserStats = deletedRowsByTable.find((row) => row.table_name === "ProjectUser") ?? null; + + const mappingInternalStats = new Map(); + + const usersMappingMin = minBigIntString([ + projectUsersStats.min_sequence_id, + deletedProjectUserStats?.min_sequence_id, + ]); + const usersMappingMax = maxBigIntString([ + projectUsersStats.max_sequence_id, + deletedProjectUserStats?.max_sequence_id, + ]); + const usersMappingPending = addBigIntStrings( + projectUsersStats.pending, + deletedProjectUserStats?.pending, + ); + + mappingInternalStats.set("users", { + mapping_id: "users", + internal_min_sequence_id: usersMappingMin, + internal_max_sequence_id: usersMappingMax, + internal_pending_count: usersMappingPending, + }); + + const mappings = Array.from(mappingInternalStats.values()); + const mappingStatuses = mappings.map((mapping) => ({ + mapping_id: mapping.mapping_id, + internal_max_sequence_id: mapping.internal_max_sequence_id, + })); + + return { mappings, mappingStatuses }; +} + +async function fetchInternalStats(tenancyId: string | null) { + const tenancyWhere = tenancyId + ? Prisma.sql`WHERE "tenancyId" = ${tenancyId}::uuid` + : Prisma.sql``; + + const projectUserStatsRow = (await globalPrismaClient.$queryRaw` + SELECT + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id", + MIN("sequenceId") AS "min_sequence_id", + MAX("sequenceId") AS "max_sequence_id" + FROM "ProjectUser" + ${tenancyWhere} + `).at(0) ?? throwErr("Project user stats query returned no rows."); + + const contactChannelStatsRow = (await globalPrismaClient.$queryRaw` + SELECT + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id", + MIN("sequenceId") AS "min_sequence_id", + MAX("sequenceId") AS "max_sequence_id" + FROM "ContactChannel" + ${tenancyWhere} + `).at(0) ?? throwErr("Contact channel stats query returned no rows."); + + const deletedRowStatsRow = (await globalPrismaClient.$queryRaw` + SELECT + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id", + MIN("sequenceId") AS "min_sequence_id", + MAX("sequenceId") AS "max_sequence_id" + FROM "DeletedRow" + ${tenancyWhere} + `).at(0) ?? throwErr("Deleted row stats query returned no rows."); + + const deletedRowsByTableRows = await globalPrismaClient.$queryRaw` + SELECT + "tableName" AS "table_name", + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id", + MIN("sequenceId") AS "min_sequence_id", + MAX("sequenceId") AS "max_sequence_id" + FROM "DeletedRow" + ${tenancyWhere} + GROUP BY "tableName" + ORDER BY "tableName" ASC + `; + + const outgoingTenancyFilter = tenancyId + ? Prisma.sql`AND ("qstashOptions"->'body'->>'tenancyId') = ${tenancyId}` + : Prisma.sql``; + + const outgoingStatsRow = (await globalPrismaClient.$queryRaw` + SELECT + COUNT(*)::bigint AS "total", + COUNT(*) FILTER (WHERE "startedFulfillingAt" IS NULL)::bigint AS "pending", + COUNT(*) FILTER (WHERE "startedFulfillingAt" IS NOT NULL)::bigint AS "in_flight", + COUNT(*) FILTER ( + WHERE "startedFulfillingAt" < NOW() - (${STALE_CLAIM_INTERVAL_MINUTES} * INTERVAL '1 minute') + )::bigint AS "stale", + MIN("createdAt") AS "oldest_created_at", + MAX("createdAt") AS "newest_created_at" + FROM "OutgoingRequest" + WHERE ("qstashOptions"->>'url') = '/api/latest/internal/external-db-sync/sync-engine' + ${outgoingTenancyFilter} + `).at(0) ?? throwErr("Outgoing request stats query returned no rows."); + + const projectUsersStats = formatSequenceStats(projectUserStatsRow); + const contactChannelStats = formatSequenceStats(contactChannelStatsRow); + const deletedRowStats = formatSequenceStats(deletedRowStatsRow); + + const deletedRowsByTable = deletedRowsByTableRows.map((row) => ({ + table_name: row.table_name, + ...formatSequenceStats(row), + })); + + const { mappings, mappingStatuses } = buildMappingInternalStats(projectUsersStats, deletedRowsByTable); + + return { + projectUsersStats, + contactChannelStats, + deletedRowStats, + deletedRowsByTable, + outgoingStatsRow, + mappings, + mappingStatuses, + }; +} + +function formatPollerStats(outgoingStats: OutgoingStatsRow) { + return { + total: toBigIntStringOrThrow(outgoingStats.total, "outgoing total"), + pending: toBigIntStringOrThrow(outgoingStats.pending, "outgoing pending"), + in_flight: toBigIntStringOrThrow(outgoingStats.in_flight, "outgoing in_flight"), + stale: toBigIntStringOrThrow(outgoingStats.stale, "outgoing stale"), + oldest_created_at_millis: toMillis(outgoingStats.oldest_created_at), + newest_created_at_millis: toMillis(outgoingStats.newest_created_at), + }; +} + +function formatSequenceStats(row: SequenceStatsRow) { + return { + total: toBigIntStringOrThrow(row.total, "sequence stats total"), + pending: toBigIntStringOrThrow(row.pending, "sequence stats pending"), + null_sequence_id: toBigIntStringOrThrow(row.null_sequence_id, "sequence stats null_sequence_id"), + min_sequence_id: toBigIntString(row.min_sequence_id), + max_sequence_id: toBigIntString(row.max_sequence_id), + }; +} + +function formatError(error: unknown): string { + return errorToNiceString(error); +} + +function parseConnectionString(connectionString: string | null | undefined) { + if (!connectionString) { + return { + redacted: null, + host: null, + port: null, + database: null, + user: null, + }; + } + + const parsed = Result.fromThrowing(() => new URL(connectionString)); + if (parsed.status === "error") { + return { + redacted: null, + host: null, + port: null, + database: null, + user: null, + }; + } + + const url = parsed.data; + const user = url.username ? decodeURIComponent(url.username) : null; + const host = url.hostname || null; + const port = url.port ? Number.parseInt(url.port, 10) : null; + const database = url.pathname ? url.pathname.replace(/^\//, "") : null; + const redacted = `${url.protocol}//${url.username ? encodeURIComponent(url.username) : ""}${url.username ? ":" : ""}${url.password ? "***" : ""}${url.username ? "@" : ""}${url.hostname}${url.port ? ":" + url.port : ""}${url.pathname}${url.search}`; + + return { + redacted, + host, + port: Number.isFinite(port ?? NaN) ? port : null, + database, + user, + }; +} + +async function fetchExternalDatabaseStatus( + dbId: string, + dbConfig: CompleteConfig["dbSync"]["externalDatabases"][string], + mappingStatuses: Array<{ + mapping_id: string, + internal_max_sequence_id: string | null, + }>, +) { + const connection = parseConnectionString(dbConfig.connectionString ?? null); + + if (dbConfig.type !== "postgres") { + return { + id: dbId, + type: String(dbConfig.type), + connection, + status: "error" as const, + error: `Unsupported database type: ${String(dbConfig.type)}`, + metadata: [], + users_table: { + exists: false, + total_rows: null, + min_signed_up_at_millis: null, + max_signed_up_at_millis: null, + }, + mapping_status: mappingStatuses.map((mapping) => ({ + mapping_id: mapping.mapping_id, + internal_max_sequence_id: mapping.internal_max_sequence_id, + last_synced_sequence_id: null, + updated_at_millis: null, + backlog: null, + })), + }; + } + + if (!dbConfig.connectionString) { + return { + id: dbId, + type: dbConfig.type, + connection, + status: "error" as const, + error: "Missing connection string", + metadata: [], + users_table: { + exists: false, + total_rows: null, + min_signed_up_at_millis: null, + max_signed_up_at_millis: null, + }, + mapping_status: mappingStatuses.map((mapping) => ({ + mapping_id: mapping.mapping_id, + internal_max_sequence_id: mapping.internal_max_sequence_id, + last_synced_sequence_id: null, + updated_at_millis: null, + backlog: null, + })), + }; + } + + const client = new Client({ connectionString: dbConfig.connectionString }); + const connectResult = await Result.fromPromise(client.connect()); + if (connectResult.status === "error") { + return { + id: dbId, + type: dbConfig.type, + connection, + status: "error" as const, + error: formatError(connectResult.error), + metadata: [], + users_table: { + exists: false, + total_rows: null, + min_signed_up_at_millis: null, + max_signed_up_at_millis: null, + }, + mapping_status: mappingStatuses.map((mapping) => ({ + mapping_id: mapping.mapping_id, + internal_max_sequence_id: mapping.internal_max_sequence_id, + last_synced_sequence_id: null, + updated_at_millis: null, + backlog: null, + })), + }; + } + + let metadata: ExternalDbMetadataRow[] = []; + let metadataExists = false; + let usersExists = false; + let usersStats: UsersTableStatsRow | null = null; + + try { + const metadataExistsResult = await Result.fromPromise(client.query<{ exists: boolean }>(` + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = '_stack_sync_metadata' + ) AS "exists"; + `)); + if (metadataExistsResult.status === "error") { + return { + id: dbId, + type: dbConfig.type, + connection, + status: "error" as const, + error: formatError(metadataExistsResult.error), + metadata: [], + users_table: { + exists: false, + total_rows: null, + min_signed_up_at_millis: null, + max_signed_up_at_millis: null, + }, + mapping_status: mappingStatuses.map((mapping) => ({ + mapping_id: mapping.mapping_id, + internal_max_sequence_id: mapping.internal_max_sequence_id, + last_synced_sequence_id: null, + updated_at_millis: null, + backlog: null, + })), + }; + } + metadataExists = metadataExistsResult.data.rows[0]?.exists === true; + + const usersExistsResult = await Result.fromPromise(client.query<{ exists: boolean }>(` + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = 'users' + ) AS "exists"; + `)); + if (usersExistsResult.status === "error") { + return { + id: dbId, + type: dbConfig.type, + connection, + status: "error" as const, + error: formatError(usersExistsResult.error), + metadata: [], + users_table: { + exists: false, + total_rows: null, + min_signed_up_at_millis: null, + max_signed_up_at_millis: null, + }, + mapping_status: mappingStatuses.map((mapping) => ({ + mapping_id: mapping.mapping_id, + internal_max_sequence_id: mapping.internal_max_sequence_id, + last_synced_sequence_id: null, + updated_at_millis: null, + backlog: null, + })), + }; + } + usersExists = usersExistsResult.data.rows[0]?.exists === true; + + if (metadataExists) { + const metadataResult = await Result.fromPromise(client.query(` + SELECT "mapping_name", "last_synced_sequence_id", "updated_at" + FROM "_stack_sync_metadata" + ORDER BY "mapping_name" ASC; + `)); + if (metadataResult.status === "error") { + return { + id: dbId, + type: dbConfig.type, + connection, + status: "error" as const, + error: formatError(metadataResult.error), + metadata: [], + users_table: { + exists: usersExists, + total_rows: null, + min_signed_up_at_millis: null, + max_signed_up_at_millis: null, + }, + mapping_status: mappingStatuses.map((mapping) => ({ + mapping_id: mapping.mapping_id, + internal_max_sequence_id: mapping.internal_max_sequence_id, + last_synced_sequence_id: null, + updated_at_millis: null, + backlog: null, + })), + }; + } + metadata = metadataResult.data.rows; + } + + if (usersExists) { + const usersStatsResult = await Result.fromPromise(client.query(` + SELECT + COUNT(*)::bigint AS "total_rows", + MIN("signed_up_at") AS "min_signed_up_at", + MAX("signed_up_at") AS "max_signed_up_at" + FROM "users"; + `)); + if (usersStatsResult.status === "error") { + return { + id: dbId, + type: dbConfig.type, + connection, + status: "error" as const, + error: formatError(usersStatsResult.error), + metadata: metadata.map((row) => ({ + mapping_name: row.mapping_name, + last_synced_sequence_id: toBigIntString(row.last_synced_sequence_id) ?? "-1", + updated_at_millis: toMillis(row.updated_at), + })), + users_table: { + exists: usersExists, + total_rows: null, + min_signed_up_at_millis: null, + max_signed_up_at_millis: null, + }, + mapping_status: mappingStatuses.map((mapping) => ({ + mapping_id: mapping.mapping_id, + internal_max_sequence_id: mapping.internal_max_sequence_id, + last_synced_sequence_id: null, + updated_at_millis: null, + backlog: null, + })), + }; + } + usersStats = usersStatsResult.data.rows[0] ?? null; + } + } finally { + await Result.fromPromise(client.end()); + } + + const metadataMap = new Map(); + const formattedMetadata = metadata.map((row) => { + const lastSynced = toBigIntString(row.last_synced_sequence_id) ?? "-1"; + const updatedAt = toMillis(row.updated_at); + metadataMap.set(row.mapping_name, { last_synced_sequence_id: lastSynced, updated_at_millis: updatedAt }); + return { + mapping_name: row.mapping_name, + last_synced_sequence_id: lastSynced, + updated_at_millis: updatedAt, + }; + }); + + const mappingStatus = mappingStatuses.map((mapping) => { + const external = metadataMap.get(mapping.mapping_id); + const lastSynced = external?.last_synced_sequence_id ?? null; + const updatedAt = external?.updated_at_millis ?? null; + let backlog: string | null = null; + if (mapping.internal_max_sequence_id && lastSynced) { + backlog = (BigInt(mapping.internal_max_sequence_id) - BigInt(lastSynced)).toString(); + } + return { + mapping_id: mapping.mapping_id, + internal_max_sequence_id: mapping.internal_max_sequence_id, + last_synced_sequence_id: lastSynced, + updated_at_millis: updatedAt, + backlog, + }; + }); + + return { + id: dbId, + type: dbConfig.type, + connection, + status: "ok" as const, + error: null, + metadata: formattedMetadata, + users_table: { + exists: usersExists, + total_rows: toBigIntString(usersStats?.total_rows ?? null), + min_signed_up_at_millis: toMillis(usersStats?.min_signed_up_at ?? null), + max_signed_up_at_millis: toMillis(usersStats?.max_signed_up_at ?? null), + }, + mapping_status: mappingStatus, + }; +} + +export const GET = createSmartRouteHandler({ + metadata: { + summary: "External DB sync status", + description: "Returns sequencing, queue, and external sync progress for the current tenancy. Optional global aggregate when scope=all.", + tags: ["External DB Sync"], + hidden: true, + }, + request: yupObject({ + auth: yupObject({ + type: adminAuthTypeSchema, + tenancy: adaptSchema, + }).defined(), + query: yupObject({ + scope: yupString().oneOf(["tenancy", "all"]).default("tenancy"), + }).defined(), + method: yupString().oneOf(["GET"]).defined(), + }), + response: responseSchema, + handler: async ({ auth, query }) => { + if (auth.tenancy.project.id !== "internal") { + throw new KnownErrors.ExpectedInternalProject(); + } + const tenancyId = auth.tenancy.id; + + const shouldIncludeGlobal = query.scope === "all"; + const currentStats = shouldIncludeGlobal ? await fetchInternalStats(null) : await fetchInternalStats(tenancyId); + const globalStats = shouldIncludeGlobal ? currentStats : null; + const globalTenanciesCount = shouldIncludeGlobal + ? (await globalPrismaClient.$queryRaw` + SELECT COUNT(*)::bigint AS "total" + FROM "Tenancy" + `).at(0) ?? throwErr("Tenancy count query returned no rows.") + : null; + const globalDbSyncCount = shouldIncludeGlobal + ? (await globalPrismaClient.$queryRaw` + SELECT COUNT(*)::bigint AS "total" + FROM "EnvironmentConfigOverride" + WHERE ("config"->'dbSync'->'externalDatabases') IS NOT NULL + `).at(0) ?? throwErr("DB sync config count query returned no rows.") + : null; + + const externalDbStatuses = shouldIncludeGlobal + ? [] + : await Promise.all( + Object.entries( + auth.tenancy.config.dbSync.externalDatabases as CompleteConfig["dbSync"]["externalDatabases"], + ).map(([dbId, dbConfig]) => fetchExternalDatabaseStatus(dbId, dbConfig, currentStats.mappingStatuses)), + ); + + const outgoingStats = currentStats.outgoingStatsRow; + + return { + statusCode: 200 as const, + bodyType: "json" as const, + body: { + ok: true, + generated_at_millis: Date.now(), + global: shouldIncludeGlobal && globalStats && globalTenanciesCount && globalDbSyncCount ? { + tenancies_total: toBigIntStringOrThrow(globalTenanciesCount.total, "tenancies total"), + tenancies_with_db_sync: toBigIntStringOrThrow(globalDbSyncCount.total, "tenancies with db sync"), + sequencer: { + project_users: globalStats.projectUsersStats, + contact_channels: globalStats.contactChannelStats, + deleted_rows: { + ...globalStats.deletedRowStats, + by_table: globalStats.deletedRowsByTable, + }, + }, + poller: formatPollerStats(globalStats.outgoingStatsRow), + sync_engine: { + mappings: globalStats.mappings, + }, + } : null, + tenancy: { + id: tenancyId, + project_id: auth.tenancy.project.id, + branch_id: auth.tenancy.branchId, + }, + sequencer: { + project_users: currentStats.projectUsersStats, + contact_channels: currentStats.contactChannelStats, + deleted_rows: { + ...currentStats.deletedRowStats, + by_table: currentStats.deletedRowsByTable, + }, + }, + poller: formatPollerStats(outgoingStats), + sync_engine: { + mappings: currentStats.mappings, + external_databases: externalDbStatuses, + }, + }, + }; + }, +}); diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx b/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx index 9ac3cf659..c78ec2662 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx @@ -1,4 +1,5 @@ import { syncExternalDatabases } from "@/lib/external-db-sync"; +import { enqueueExternalDbSync } from "@/lib/external-db-sync-queue"; import { getTenancy } from "@/lib/tenancies"; import { ensureUpstashSignature } from "@/lib/upstash"; import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; @@ -36,7 +37,10 @@ export const POST = createSmartRouteHandler({ throw new StatusError(400, `Tenancy ${tenancyId} not found.`); } - await syncExternalDatabases(tenancy); + const needsResync = await syncExternalDatabases(tenancy); + if (needsResync) { + await enqueueExternalDbSync(tenancy.id); + } return { statusCode: 200, diff --git a/apps/backend/src/lib/external-db-sync-queue.ts b/apps/backend/src/lib/external-db-sync-queue.ts index db5de626b..ab666893e 100644 --- a/apps/backend/src/lib/external-db-sync-queue.ts +++ b/apps/backend/src/lib/external-db-sync-queue.ts @@ -26,21 +26,18 @@ export async function enqueueExternalDbSyncBatch(tenancyIds: string[]): Promise< // Use unnest to pass array of UUIDs and insert all in one query await globalPrismaClient.$executeRaw` - INSERT INTO "OutgoingRequest" ("id", "createdAt", "qstashOptions", "startedFulfillingAt") + INSERT INTO "OutgoingRequest" ("id", "createdAt", "qstashOptions", "startedFulfillingAt", "deduplicationKey") SELECT gen_random_uuid(), NOW(), json_build_object( 'url', '/api/latest/internal/external-db-sync/sync-engine', - 'body', json_build_object('tenancyId', t.tenancy_id) + 'body', json_build_object('tenancyId', t.tenancy_id), + 'flowControl', json_build_object('key', 'sentinel-sync-key', 'parallelism', 20) ), - NULL + NULL, + 'sentinel-sync-key-' || t.tenancy_id FROM unnest(${tenancyIds}::uuid[]) AS t(tenancy_id) - WHERE NOT EXISTS ( - SELECT 1 - FROM "OutgoingRequest" - WHERE "startedFulfillingAt" IS NULL - AND ("qstashOptions"->'body'->>'tenancyId')::uuid = t.tenancy_id - ) + ON CONFLICT ("deduplicationKey") DO NOTHING `; } diff --git a/apps/backend/src/lib/external-db-sync.ts b/apps/backend/src/lib/external-db-sync.ts index 2df15a94b..2fcab363e 100644 --- a/apps/backend/src/lib/external-db-sync.ts +++ b/apps/backend/src/lib/external-db-sync.ts @@ -1,13 +1,15 @@ import { Tenancy } from "@/lib/tenancies"; -import { getPrismaClientForTenancy, PrismaClientTransaction } from "@/prisma-client"; +import { getPrismaClientForTenancy, PrismaClientWithReplica } from "@/prisma-client"; import { DEFAULT_DB_SYNC_MAPPINGS } from "@stackframe/stack-shared/dist/config/db-sync-mappings"; import type { CompleteConfig } from "@stackframe/stack-shared/dist/config/schema"; import { captureError, StackAssertionError, throwErr } from "@stackframe/stack-shared/dist/utils/errors"; +import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env"; import { omit } from "@stackframe/stack-shared/dist/utils/objects"; import { Result } from "@stackframe/stack-shared/dist/utils/results"; import { Client } from 'pg'; const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; +const MAX_BATCHES_PER_MAPPING_ENV = "STACK_EXTERNAL_DB_SYNC_MAX_BATCHES_PER_MAPPING"; function assertNonEmptyString(value: unknown, label: string): asserts value is string { if (typeof value !== "string" || value.trim().length === 0) { @@ -42,6 +44,18 @@ function isConcurrentUpdateError(error: unknown): error is PgErrorLike { return typeof pgError.message === "string" && pgError.message.includes("tuple concurrently updated"); } +function getMaxBatchesPerMapping(): number | null { + const rawValue = getEnvVariable(MAX_BATCHES_PER_MAPPING_ENV, ""); + if (!rawValue) return null; + const parsed = Number.parseInt(rawValue, 10); + if (!Number.isFinite(parsed) || parsed <= 0) { + throw new StackAssertionError( + `${MAX_BATCHES_PER_MAPPING_ENV} must be a positive integer. Received: ${JSON.stringify(rawValue)}` + ); + } + return parsed; +} + async function ensureExternalSchema( externalClient: Client, tableSchemaSql: string, @@ -143,11 +157,11 @@ async function syncMapping( externalClient: Client, mappingId: string, mapping: typeof DEFAULT_DB_SYNC_MAPPINGS[keyof typeof DEFAULT_DB_SYNC_MAPPINGS], - internalPrisma: PrismaClientTransaction, + internalPrisma: PrismaClientWithReplica, dbId: string, tenancyId: string, dbType: 'postgres', -) { +): Promise { assertNonEmptyString(mappingId, "mappingId"); assertNonEmptyString(mapping.targetTable, "mapping.targetTable"); assertUuid(tenancyId, "tenancyId"); @@ -180,13 +194,16 @@ async function syncMapping( } const BATCH_LIMIT = 1000; + const maxBatchesPerMapping = getMaxBatchesPerMapping(); + let batchesProcessed = 0; + let throttled = false; while (true) { assertUuid(tenancyId, "tenancyId"); if (!Number.isFinite(lastSequenceId)) { throw new StackAssertionError(`lastSequenceId must be a finite number for mapping ${mappingId}.`); } - const rows = await internalPrisma.$queryRawUnsafe(fetchQuery, tenancyId, lastSequenceId); + const rows = await internalPrisma.$replica().$queryRawUnsafe(fetchQuery, tenancyId, lastSequenceId); if (rows.length === 0) { break; @@ -216,16 +233,24 @@ async function syncMapping( if (rows.length < BATCH_LIMIT) { break; } + + batchesProcessed++; + if (maxBatchesPerMapping !== null && batchesProcessed >= maxBatchesPerMapping) { + throttled = true; + break; + } } + + return throttled; } async function syncDatabase( dbId: string, dbConfig: CompleteConfig["dbSync"]["externalDatabases"][string], - internalPrisma: PrismaClientTransaction, + internalPrisma: PrismaClientWithReplica, tenancyId: string, -) { +): Promise { assertNonEmptyString(dbId, "dbId"); assertUuid(tenancyId, "tenancyId"); const dbType = dbConfig.type; @@ -246,13 +271,14 @@ async function syncDatabase( connectionString: dbConfig.connectionString, }); + let needsResync = false; const syncResult = await Result.fromPromise((async () => { await externalClient.connect(); // Always use DEFAULT_DB_SYNC_MAPPINGS - users cannot customize mappings // because internalDbFetchQuery runs against Stack Auth's internal DB for (const [mappingId, mapping] of Object.entries(DEFAULT_DB_SYNC_MAPPINGS)) { - await syncMapping( + const mappingThrottled = await syncMapping( externalClient, mappingId, mapping, @@ -261,6 +287,9 @@ async function syncDatabase( tenancyId, dbType, ); + if (mappingThrottled) { + needsResync = true; + } } })()); @@ -271,23 +300,31 @@ async function syncDatabase( if (syncResult.status === "error") { captureError(`external-db-sync-${dbId}`, syncResult.error); - return; + return false; } + + return needsResync; } -export async function syncExternalDatabases(tenancy: Tenancy) { +export async function syncExternalDatabases(tenancy: Tenancy): Promise { assertUuid(tenancy.id, "tenancy.id"); const externalDatabases = tenancy.config.dbSync.externalDatabases; const internalPrisma = await getPrismaClientForTenancy(tenancy); + let needsResync = false; for (const [dbId, dbConfig] of Object.entries(externalDatabases)) { try { - await syncDatabase(dbId, dbConfig, internalPrisma, tenancy.id); + const databaseThrottled = await syncDatabase(dbId, dbConfig, internalPrisma, tenancy.id); + if (databaseThrottled) { + needsResync = true; + } } catch (error) { // Log the error but continue syncing other databases // This ensures one bad database config doesn't block successful syncs to other databases captureError(`external-db-sync-${dbId}`, error); } } + + return needsResync; } diff --git a/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/external-db-sync/page-client.tsx b/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/external-db-sync/page-client.tsx new file mode 100644 index 000000000..cd65e458b --- /dev/null +++ b/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/external-db-sync/page-client.tsx @@ -0,0 +1,592 @@ +"use client"; + +import { PageLayout } from "../page-layout"; +import { useAdminApp } from "../use-admin-app"; +import { + Alert, + Button, + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, + Skeleton, + Switch, + Table, + TableBody, + TableCell, + TableHead, + TableHeader, + TableRow, + Typography, +} from "@/components/ui"; +import { Result } from "@stackframe/stack-shared/dist/utils/results"; +import { runAsynchronously, runAsynchronouslyWithAlert } from "@stackframe/stack-shared/dist/utils/promises"; +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; +import { notFound } from "next/navigation"; + +const stackAppInternalsSymbol = Symbol.for("StackAuth--DO-NOT-USE-OR-YOU-WILL-BE-FIRED--StackAppInternals"); +const AUTO_REFRESH_INTERVAL_MS = 5000; + +type SequenceStats = { + total: string, + pending: string, + null_sequence_id: string, + min_sequence_id: string | null, + max_sequence_id: string | null, +}; + +type DeletedRowStats = SequenceStats & { + by_table: Array, +}; + +type PollerStats = { + total: string, + pending: string, + in_flight: string, + stale: string, + oldest_created_at_millis: number | null, + newest_created_at_millis: number | null, +}; + +type MappingStats = { + mapping_id: string, + internal_min_sequence_id: string | null, + internal_max_sequence_id: string | null, + internal_pending_count: string, +}; + +type ExternalDbMetadata = { + mapping_name: string, + last_synced_sequence_id: string, + updated_at_millis: number | null, +}; + +type ExternalDbMappingStatus = { + mapping_id: string, + internal_max_sequence_id: string | null, + last_synced_sequence_id: string | null, + updated_at_millis: number | null, + backlog: string | null, +}; + +type ExternalDbSyncStatus = { + ok: true, + generated_at_millis: number, + global: { + tenancies_total: string, + tenancies_with_db_sync: string, + sequencer: { + project_users: SequenceStats, + contact_channels: SequenceStats, + deleted_rows: DeletedRowStats, + }, + poller: PollerStats, + sync_engine: { + mappings: MappingStats[], + }, + } | null, + tenancy: { + id: string, + project_id: string, + branch_id: string, + }, + sequencer: { + project_users: SequenceStats, + contact_channels: SequenceStats, + deleted_rows: DeletedRowStats, + }, + poller: PollerStats, + sync_engine: { + mappings: MappingStats[], + external_databases: Array<{ + id: string, + type: string, + connection: { + redacted: string | null, + host: string | null, + port: number | null, + database: string | null, + user: string | null, + }, + status: "ok" | "error", + error: string | null, + metadata: ExternalDbMetadata[], + users_table: { + exists: boolean, + total_rows: string | null, + min_signed_up_at_millis: number | null, + max_signed_up_at_millis: number | null, + }, + mapping_status: ExternalDbMappingStatus[], + }>, + }, +}; + +type AdminAppInternals = { + sendRequest: (path: string, requestOptions: RequestInit, requestType?: "client" | "server" | "admin") => Promise, +}; + +type AdminAppWithInternals = ReturnType & { + [stackAppInternalsSymbol]: AdminAppInternals, +}; + +function formatBigInt(value: string | null) { + if (value === null) return "—"; + if (value.length > 15) return value; + const asNumber = Number(value); + return Number.isFinite(asNumber) ? new Intl.NumberFormat().format(asNumber) : value; +} + +function formatMillis(value: number | null) { + if (!value) return "—"; + return new Date(value).toLocaleString(); +} + +function sumBigIntStrings(values: Array) { + let total = BigInt(0); + for (const value of values) { + if (!value) continue; + if (!/^[-]?\d+$/.test(value)) continue; + total += BigInt(value); + } + return total.toString(); +} + +function parseBigIntString(value: string | null | undefined) { + if (value === null || value === undefined) return null; + if (!/^[-]?\d+$/.test(value)) return null; + return BigInt(value); +} + +const BIGINT_ZERO = BigInt(0); +const BIGINT_HUNDRED = BigInt(100); +const BIGINT_THOUSAND = BigInt(1000); + +function formatThroughput(value: bigint | number | null) { + if (value === null) return "—"; + if (typeof value === "number") { + if (!Number.isFinite(value)) return "—"; + if (value === 0) return "0/s"; + const sign = value > 0 ? "+" : ""; + const abs = Math.abs(value); + const display = abs >= 100 ? abs.toFixed(0) : abs >= 10 ? abs.toFixed(1) : abs.toFixed(2); + return `${sign}${display}/s`; + } + if (value === BIGINT_ZERO) return "0/s"; + const sign = value > BIGINT_ZERO ? "+" : ""; + const abs = value > BIGINT_ZERO ? value : -value; + const intPart = abs / BIGINT_HUNDRED; + const fracPart = abs % BIGINT_HUNDRED; + const intDisplay = new Intl.NumberFormat().format(intPart); + const fracDisplay = fracPart.toString().padStart(2, "0"); + const display = intPart === BIGINT_ZERO ? `0.${fracDisplay}` : `${intDisplay}.${fracDisplay}`; + return `${sign}${display}/s`; +} + +function calculateThroughputScaled(prev: bigint | null, current: bigint | null, deltaMillis: number) { + if (prev === null || current === null) return null; + if (deltaMillis <= 0) return null; + const deltaMillisBigInt = BigInt(deltaMillis); + return (prev - current) * BIGINT_THOUSAND * BIGINT_HUNDRED / deltaMillisBigInt; +} + +function DataValue(props: { value: string | null | undefined, loading: boolean }) { + if (props.loading) { + return ; + } + return {formatBigInt(props.value ?? null)}; +} + +function DataDate(props: { value: number | null | undefined, loading: boolean }) { + if (props.loading) { + return ; + } + return {formatMillis(props.value ?? null)}; +} + +export default function PageClient() { + const adminApp = useAdminApp() as AdminAppWithInternals; + const [status, setStatus] = useState(null); + const [error, setError] = useState(null); + const [loading, setLoading] = useState(false); + const [autoRefresh, setAutoRefresh] = useState(true); + const inFlightRef = useRef(false); + const summarySamplesRef = useRef>([]); + + const loadStatus = useCallback(async () => { + if (inFlightRef.current) return; + inFlightRef.current = true; + setLoading(true); + + const result = await Result.fromPromise((async () => { + const response = await adminApp[stackAppInternalsSymbol].sendRequest( + "/internal/external-db-sync/status?scope=all", + { method: "GET" }, + "admin", + ); + const body = await response.json(); + if (!response.ok) { + const message = typeof body?.error === "string" ? body.error : "Failed to load external DB sync status."; + throw new Error(message); + } + return body as ExternalDbSyncStatus; + })()); + + if (result.status === "error") { + const message = result.error instanceof Error ? result.error.message : String(result.error); + setError(message); + setLoading(false); + inFlightRef.current = false; + return; + } + + setStatus(result.data); + setError(null); + setLoading(false); + inFlightRef.current = false; + }, [adminApp]); + + const refreshWithAlert = useCallback(() => { + runAsynchronouslyWithAlert(loadStatus); + }, [loadStatus]); + + useEffect(() => { + runAsynchronously(loadStatus); + }, [loadStatus]); + + useEffect(() => { + if (!autoRefresh) return undefined; + const interval = setInterval(() => { + runAsynchronously(loadStatus); + }, AUTO_REFRESH_INTERVAL_MS); + return () => clearInterval(interval); + }, [autoRefresh, loadStatus]); + + const summaryStats = useMemo(() => { + if (!status) return null; + const summarySource = status.global ?? status; + const sequencerPending = sumBigIntStrings([ + summarySource.sequencer.project_users.pending, + summarySource.sequencer.contact_channels.pending, + summarySource.sequencer.deleted_rows.pending, + ]); + const mappingPending = sumBigIntStrings( + summarySource.sync_engine.mappings.map((mapping) => mapping.internal_pending_count), + ); + + return { + sequencerPending, + pollerPending: summarySource.poller.pending, + mappingPending, + isGlobal: Boolean(status.global), + }; + }, [status]); + + const throughputStats = useMemo(() => { + if (!status || !summaryStats) return null; + const currentSample = { + timestampMillis: status.generated_at_millis, + sequencerPending: summaryStats.sequencerPending, + pollerPending: summaryStats.pollerPending, + mappingPending: summaryStats.mappingPending, + }; + const samples = summarySamplesRef.current; + const samplesWithCurrent = samples.length === 0 || samples[samples.length - 1].timestampMillis !== currentSample.timestampMillis + ? [...samples, currentSample] + : samples; + const windowStart = status.generated_at_millis - 20000; + const windowedSamples = samplesWithCurrent.filter((sample) => sample.timestampMillis >= windowStart); + if (windowedSamples.length < 2) return null; + const oldest = windowedSamples[0]; + const deltaMillis = status.generated_at_millis - oldest.timestampMillis; + if (deltaMillis <= 0) return null; + + return { + sequencer: calculateThroughputScaled( + parseBigIntString(oldest.sequencerPending), + parseBigIntString(summaryStats.sequencerPending), + deltaMillis, + ), + poller: calculateThroughputScaled( + parseBigIntString(oldest.pollerPending), + parseBigIntString(summaryStats.pollerPending), + deltaMillis, + ), + mapping: calculateThroughputScaled( + parseBigIntString(oldest.mappingPending), + parseBigIntString(summaryStats.mappingPending), + deltaMillis, + ), + }; + }, [status, summaryStats]); + + useEffect(() => { + if (!status || !summaryStats) return; + const nextSamples = [...summarySamplesRef.current, { + timestampMillis: status.generated_at_millis, + sequencerPending: summaryStats.sequencerPending, + pollerPending: summaryStats.pollerPending, + mappingPending: summaryStats.mappingPending, + }]; + const windowStart = status.generated_at_millis - 20000; + summarySamplesRef.current = nextSamples.filter((sample) => sample.timestampMillis >= windowStart); + }, [status, summaryStats]); + + const loadingState = loading && !status; + const globalStatus = status?.global ?? null; + const deletedRowsByTable = status?.sequencer.deleted_rows.by_table ?? []; + const mappingRows = status?.sync_engine.mappings ?? []; + + if (adminApp.projectId !== "internal") { + return notFound(); + } + + return ( + +
+ + Auto refresh +
+ + + } + fillWidth + > + {error && {error}} + +
+ Scope: {globalStatus ? "All tenancies" : "Current tenancy"} + {globalStatus && ( + <> + Tenancies: {formatBigInt(globalStatus.tenancies_total)} + DB sync configs: {formatBigInt(globalStatus.tenancies_with_db_sync)} + + )} + Last updated: {status ? formatMillis(status.generated_at_millis) : "—"} +
+ +
+ + + Sequencer pending rows + + {loadingState ? : formatBigInt(summaryStats?.sequencerPending ?? null)} + + + +
ProjectUser + ContactChannel + DeletedRow rows waiting for sequence IDs.
+
+ Throughput + {loadingState ? "—" : formatThroughput(throughputStats?.sequencer ?? null)} +
+
+
+ + + + Outgoing sync requests + + {loadingState ? : formatBigInt(summaryStats?.pollerPending ?? null)} + + + +
Requests still queued for the poller to dispatch.
+
+ Throughput + {loadingState ? "—" : formatThroughput(throughputStats?.poller ?? null)} +
+
+
+ + + + Mapping pending rows + + {loadingState ? : formatBigInt(summaryStats?.mappingPending ?? null)} + + + +
Pending internal rows waiting for sync across mappings.
+
+ Throughput + {loadingState ? "—" : formatThroughput(throughputStats?.mapping ?? null)} +
+
+
+
+ +
+ + + Sequencer + Rows awaiting sequence ID backfill per table. + + + + + + Table + Total + Pending + Null Seq + Min Seq + Max Seq + + + + + ProjectUser + + + + + + + + ContactChannel + + + + + + + + DeletedRow + + + + + + + +
+ +
+ + Deleted rows by table + + + + + Table + Total + Pending + Null Seq + Min Seq + Max Seq + + + + {deletedRowsByTable.map((row) => ( + + {row.table_name} + + + + + + + ))} + {!loadingState && deletedRowsByTable.length === 0 && ( + + + No deleted rows recorded yet. + + + )} + +
+
+
+
+ + + + Poller + OutgoingRequest queue and processing overview. + + + + + + Total + Pending + In Flight + Stale + + + + + + + + + + +
+ +
+
+ Oldest request + +
+
+ Newest request + +
+
+
+
+
+ + + + Sync Engine + Internal mapping checkpoints before external sync. + + + + + + Mapping + Min Seq + Max Seq + Pending Rows + + + + {mappingRows.map((mapping) => ( + + {mapping.mapping_id} + + + + + ))} + {!loadingState && mappingRows.length === 0 && ( + + + No mappings configured. + + + )} + +
+
+
+ +
+ ); +} diff --git a/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/external-db-sync/page.tsx b/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/external-db-sync/page.tsx new file mode 100644 index 000000000..60d3832f9 --- /dev/null +++ b/apps/dashboard/src/app/(main)/(protected)/projects/[projectId]/external-db-sync/page.tsx @@ -0,0 +1,9 @@ +import PageClient from "./page-client"; + +export const metadata = { + title: "External DB Sync", +}; + +export default function Page() { + return ; +} diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts index 8e751b091..4e5f7345c 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts @@ -448,4 +448,56 @@ describe.sequential('External DB Sync - Basic Tests', () => { const seq2 = Number(metadata2.rows[0].last_synced_sequence_id); expect(seq2).toBeGreaterThan(seq1); }, TEST_TIMEOUT); + + /** + * What it does: + * - Creates a project with external DB sync enabled. + * - Calls the internal status endpoint and validates the response shape. + * + * Why it matters: + * - Confirms the dashboard status API exposes sequencer, poller, and sync-engine metrics. + */ + test('Status endpoint exposes sync pipeline metrics', async () => { + const dbName = 'status_endpoint_test'; + const connectionString = await dbManager.createDatabase(dbName); + + await createProjectWithExternalDb({ + main: { + type: 'postgres', + connectionString, + } + }, { + display_name: '📈 External DB Sync Status', + description: 'Validating sync status endpoint shape', + }); + + const response = await niceBackendFetch('/api/latest/internal/external-db-sync/status', { + accessType: 'admin', + }); + + expect(response.status).toBe(200); + expect(response.body).toMatchObject({ + ok: true, + tenancy: { + id: expect.any(String), + project_id: expect.any(String), + branch_id: expect.any(String), + }, + sequencer: { + project_users: expect.any(Object), + contact_channels: expect.any(Object), + deleted_rows: expect.any(Object), + }, + poller: { + total: expect.any(String), + pending: expect.any(String), + in_flight: expect.any(String), + stale: expect.any(String), + }, + sync_engine: { + mappings: expect.any(Array), + external_databases: expect.any(Array), + }, + }); + }, TEST_TIMEOUT); }); diff --git a/apps/e2e/tests/backend/performance/mock-external-db-sync-projects.sql b/apps/e2e/tests/backend/performance/mock-external-db-sync-projects.sql new file mode 100644 index 000000000..269d06179 --- /dev/null +++ b/apps/e2e/tests/backend/performance/mock-external-db-sync-projects.sql @@ -0,0 +1,262 @@ +--set -a; source apps/backend/.env.development; set +a; psql "$STACK_DATABASE_CONNECTION_STRING" -v ON_ERROR_STOP=1 -f apps/e2e/tests/backend/performance/mock-external-db-sync-projects.sql + +BEGIN; + +CREATE EXTENSION IF NOT EXISTS "pgcrypto"; + +-- NOTE: +-- - This script is intentionally heavy (1,000,000 projects + 3,000,000 users). +-- - Update BOTH settings blocks if you need a different external DB connection string. +-- - The external DB should be reachable from the backend (default uses docker postgres on port 8128). + +-- ===================================================================================== +-- 1) One million projects, one user each +-- ===================================================================================== +WITH settings AS ( + SELECT + 'postgresql://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@localhost:8128/loadtest'::text AS external_connection_string, + 1000000::int AS project_count +), +config AS ( + SELECT jsonb_build_object( + 'dbSync', + jsonb_build_object( + 'externalDatabases', + jsonb_build_object( + 'main', + jsonb_build_object( + 'type', 'postgres', + 'connectionString', external_connection_string + ) + ) + ) + ) AS config_json + FROM settings +), +small_projects AS ( + SELECT + gen_random_uuid() AS project_id, + gen_random_uuid() AS tenancy_id, + gen_random_uuid() AS project_user_id, + gen_random_uuid() AS auth_method_id, + gen_random_uuid() AS contact_id, + gs AS idx, + lpad(gs::text, 7, '0') AS padded_idx, + now() AS ts + FROM settings + CROSS JOIN generate_series(1, settings.project_count) AS gs +), +insert_projects AS ( + INSERT INTO "Project" ("id", "displayName", "description", "isProductionMode", "ownerTeamId", "createdAt", "updatedAt") + SELECT + project_id, + 'External DB Sync Project ' || padded_idx, + 'External DB sync load test project', + FALSE, + NULL, + ts, + ts + FROM small_projects + RETURNING "id" +), +insert_tenancies AS ( + INSERT INTO "Tenancy" ("id", "projectId", "branchId", "organizationId", "hasNoOrganization", "createdAt", "updatedAt") + SELECT + tenancy_id, + project_id, + 'main', + NULL, + 'TRUE'::"BooleanTrue", + ts, + ts + FROM small_projects + RETURNING "id" +), +insert_env_config AS ( + INSERT INTO "EnvironmentConfigOverride" ("projectId", "branchId", "config", "createdAt", "updatedAt") + SELECT + project_id, + 'main', + (SELECT config_json FROM config), + ts, + ts + FROM small_projects + ON CONFLICT ("projectId", "branchId") DO UPDATE SET + "config" = EXCLUDED."config", + "updatedAt" = EXCLUDED."updatedAt" + RETURNING "projectId" +), +insert_users AS ( + INSERT INTO "ProjectUser" + ("tenancyId", "projectUserId", "mirroredProjectId", "mirroredBranchId", "displayName", "projectId", "createdAt", "updatedAt") + SELECT + tenancy_id, + project_user_id, + project_id, + 'main', + 'External Sync User ' || padded_idx, + project_id, + ts, + ts + FROM small_projects + RETURNING "tenancyId", "projectUserId" +), +insert_contacts AS ( + INSERT INTO "ContactChannel" + ("tenancyId", "projectUserId", "id", "type", "isPrimary", "usedForAuth", "isVerified", "value", "createdAt", "updatedAt") + SELECT + tenancy_id, + project_user_id, + contact_id, + 'EMAIL', + 'TRUE'::"BooleanTrue", + 'TRUE'::"BooleanTrue", + false, + 'external-sync-user-' || padded_idx || '@load.local', + ts, + ts + FROM small_projects + RETURNING "tenancyId", "projectUserId" +), +insert_auth_methods AS ( + INSERT INTO "AuthMethod" + ("tenancyId", "id", "projectUserId", "createdAt", "updatedAt") + SELECT + tenancy_id, + auth_method_id, + project_user_id, + ts, + ts + FROM small_projects + RETURNING "tenancyId", "id", "projectUserId" +) +INSERT INTO "PasswordAuthMethod" + ("tenancyId", "authMethodId", "projectUserId", "passwordHash", "createdAt", "updatedAt") +SELECT + tenancy_id, + auth_method_id, + project_user_id, + '$2a$13$TVyY/gpw9Db/w1fBeJkCgeNg2Rae2JfNqrPnSACtj.ufAO5cVF13.', + ts, + ts +FROM small_projects; + +COMMIT; + +BEGIN; + +-- ===================================================================================== +-- 2) Three projects, one million users each +-- ===================================================================================== +SET LOCAL synchronous_commit = off; + +CREATE TEMP TABLE tmp_large_projects AS +SELECT + gen_random_uuid() AS project_id, + gen_random_uuid() AS tenancy_id, + gs AS project_idx, + lpad(gs::text, 2, '0') AS padded_project_idx, + now() AS ts +FROM generate_series(1, 3) AS gs; + +INSERT INTO "Project" ("id", "displayName", "description", "isProductionMode", "ownerTeamId", "createdAt", "updatedAt") +SELECT + project_id, + 'External DB Sync Mega Project ' || padded_project_idx, + 'External DB sync load test project (mega)', + FALSE, + NULL, + ts, + ts +FROM tmp_large_projects; + +INSERT INTO "Tenancy" ("id", "projectId", "branchId", "organizationId", "hasNoOrganization", "createdAt", "updatedAt") +SELECT + tenancy_id, + project_id, + 'main', + NULL, + 'TRUE'::"BooleanTrue", + ts, + ts +FROM tmp_large_projects; + +WITH settings AS ( + SELECT + 'postgresql://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@localhost:8128/loadtest'::text AS external_connection_string +), +config AS ( + SELECT jsonb_build_object( + 'dbSync', + jsonb_build_object( + 'externalDatabases', + jsonb_build_object( + 'main', + jsonb_build_object( + 'type', 'postgres', + 'connectionString', external_connection_string + ) + ) + ) + ) AS config_json + FROM settings +) +INSERT INTO "EnvironmentConfigOverride" ("projectId", "branchId", "config", "createdAt", "updatedAt") +SELECT + project_id, + 'main', + (SELECT config_json FROM config), + ts, + ts +FROM tmp_large_projects +ON CONFLICT ("projectId", "branchId") DO UPDATE SET + "config" = EXCLUDED."config", + "updatedAt" = EXCLUDED."updatedAt"; + +-- ALTER TABLE "ProjectUser" DISABLE TRIGGER project_user_insert_trigger; + +DO $$ +DECLARE + users_per_project int := 1000000; + batch_size int := 10000; + batch_start int := 1; + batch_end int; +BEGIN + WHILE batch_start <= users_per_project LOOP + batch_end := LEAST(batch_start + batch_size - 1, users_per_project); + + WITH mega_users AS ( + SELECT + lp.project_id, + lp.tenancy_id, + lp.project_idx, + lp.padded_project_idx, + gs AS user_idx, + lpad(gs::text, 7, '0') AS padded_user_idx, + gen_random_uuid() AS project_user_id, + lp.ts AS ts + FROM tmp_large_projects lp + CROSS JOIN generate_series(batch_start, batch_end) AS gs + ) + INSERT INTO "ProjectUser" + ("tenancyId", "projectUserId", "mirroredProjectId", "mirroredBranchId", "displayName", "projectId", "createdAt", "updatedAt") + SELECT + tenancy_id, + project_user_id, + project_id, + 'main', + 'Mega User ' || padded_project_idx || '-' || padded_user_idx, + project_id, + ts, + ts + FROM mega_users; + + RAISE NOTICE 'Inserted users %-% of % per project', batch_start, batch_end, users_per_project; + + batch_start := batch_end + 1; + END LOOP; +END $$; + +-- ALTER TABLE "ProjectUser" ENABLE TRIGGER project_user_insert_trigger; + +COMMIT; diff --git a/claude/CLAUDE-KNOWLEDGE.md b/claude/CLAUDE-KNOWLEDGE.md index f0a7b1c0b..4211a604d 100644 --- a/claude/CLAUDE-KNOWLEDGE.md +++ b/claude/CLAUDE-KNOWLEDGE.md @@ -17,3 +17,27 @@ A: Use `poolMatchGlobs` to route the external DB sync test globs to the `forks` Q: How can CI keep most tests parallel while isolating external DB sync tests? A: Split workflow test runs into two steps: run the full suite with `--exclude "**/external-db-sync*.test.ts"`, then run only external DB sync tests with `--min-workers=1 --max-workers=1`. + +Q: How do I call a custom internal admin endpoint from the dashboard without adding SDK methods? +A: Grab the Stack internals symbol (`Symbol.for("StackAuth--DO-NOT-USE-OR-YOU-WILL-BE-FIRED--StackAppInternals")`) and call `adminApp[stackAppInternalsSymbol].sendRequest(path, options, "admin")` to issue an authenticated admin request. + +Q: How do I read project and branch IDs from admin auth in backend route handlers? +A: Use `auth.tenancy.project.id` and `auth.tenancy.branchId` when `adminAuthTypeSchema` is in use; `auth.project` and `auth.branchId` do not exist there. + +Q: How do I avoid BigInt literal errors in dashboard typecheck? +A: Avoid `0n` in client code and use `BigInt(0)` instead, since BigInt literals require an ES2020 target. + +Q: How can I get external DB sync status across all tenancies? +A: Call `/api/latest/internal/external-db-sync/status?scope=all` with admin auth; the response includes a `global` aggregate alongside the current tenancy details. + +Q: How can I throttle external DB sync in dev without pausing it? +A: Use `STACK_EXTERNAL_DB_SYNC_POLL_CLAIM_LIMIT` to cap poller throughput, `STACK_EXTERNAL_DB_SYNC_SEQUENCER_BATCH_SIZE` to reduce backfill batch size, and `STACK_EXTERNAL_DB_SYNC_MAX_BATCHES_PER_MAPPING` to limit sync-engine work per request (it re-enqueues when throttled). + +Q: How can the external DB sync dashboard show global stats only? +A: When `/api/latest/internal/external-db-sync/status?scope=all` is used, the route can return global aggregates for the main stats and an empty `external_databases` array; the dashboard should avoid tenancy-specific fields and external DB cards in that mode. + +Q: How can I make the mega-user load in `mock-external-db-sync-projects.sql` show progress and avoid huge single-statement inserts? +A: Create a temp table for the three mega projects, then use a `DO $$` loop to insert users in batches (e.g., 10k per project) with a `RAISE NOTICE` after each batch; this keeps memory pressure lower and gives progress feedback. + +Q: Why is the mega-user load still slow even with batching? +A: `ProjectUser` inserts fire the `project_user_insert_trigger` (from `apps/backend/prisma/migrations/20250304200822_add_project_user_count/migration.sql`) which updates `Project.userCount` on every insert; with 1M users per project, that means 1M updates to the same project row, causing huge write amplification. ContactChannel inserts also trigger `mark_project_user_on_contact_channel_*` updates. For fast bulk loads, disable those triggers during the load and recompute `Project.userCount` afterward.