From d34a2c7fa416fc7991df89076cb4c2ab3e06fb3b Mon Sep 17 00:00:00 2001 From: Bilal Godil Date: Tue, 3 Feb 2026 17:53:36 -0800 Subject: [PATCH] fusebox --- .../migration.sql | 25 +++ apps/backend/prisma/schema.prisma | 13 ++ apps/backend/scripts/db-migrations.ts | 4 +- .../external-db-sync/fusebox/route.ts | 103 ++++++++++++ .../internal/external-db-sync/poller/route.ts | 20 ++- .../external-db-sync/sequencer/route.ts | 6 + .../external-db-sync/sync-engine/route.tsx | 9 + .../src/lib/external-db-sync-metadata.ts | 34 ++++ apps/backend/src/lib/upstash.tsx | 6 +- .../external-db-sync/page-client.tsx | 155 ++++++++++++++++++ .../api/v1/external-db-sync-basics.test.ts | 67 ++++---- 11 files changed, 391 insertions(+), 51 deletions(-) create mode 100644 apps/backend/prisma/migrations/20260204014127_external_db_metadata/migration.sql create mode 100644 apps/backend/src/app/api/latest/internal/external-db-sync/fusebox/route.ts create mode 100644 apps/backend/src/lib/external-db-sync-metadata.ts diff --git a/apps/backend/prisma/migrations/20260204014127_external_db_metadata/migration.sql b/apps/backend/prisma/migrations/20260204014127_external_db_metadata/migration.sql new file mode 100644 index 000000000..d31ceb3a4 --- /dev/null +++ b/apps/backend/prisma/migrations/20260204014127_external_db_metadata/migration.sql @@ -0,0 +1,25 @@ +-- DropIndex +DROP INDEX "ContactChannel_shouldUpdateSequenceId_idx"; + +-- DropIndex +DROP INDEX "DeletedRow_shouldUpdateSequenceId_idx"; + +-- DropIndex +DROP INDEX "ProjectUser_shouldUpdateSequenceId_idx"; + +-- CreateTable +CREATE TABLE "ExternalDbSyncMetadata" ( + "id" TEXT NOT NULL DEFAULT gen_random_uuid(), + "singleton" "BooleanTrue" NOT NULL DEFAULT 'TRUE', + "sequencerEnabled" BOOLEAN NOT NULL DEFAULT true, + "pollerEnabled" BOOLEAN NOT NULL DEFAULT true, + "syncEngineEnabled" BOOLEAN NOT NULL DEFAULT true, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "ExternalDbSyncMetadata_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "ExternalDbSyncMetadata_singleton_key" ON "ExternalDbSyncMetadata"("singleton"); + diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index c36cf8101..1ad18873c 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -94,6 +94,19 @@ model EnvironmentConfigOverride { @@id([projectId, branchId]) } +model ExternalDbSyncMetadata { + id String @id @default(dbgenerated("gen_random_uuid()")) + + singleton BooleanTrue @unique @default(TRUE) + + sequencerEnabled Boolean @default(true) + pollerEnabled Boolean @default(true) + syncEngineEnabled Boolean @default(true) + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt +} + model Team { tenancyId String @db.Uuid teamId String @default(uuid()) @db.Uuid diff --git a/apps/backend/scripts/db-migrations.ts b/apps/backend/scripts/db-migrations.ts index 14733a687..4f997ec7c 100644 --- a/apps/backend/scripts/db-migrations.ts +++ b/apps/backend/scripts/db-migrations.ts @@ -91,9 +91,9 @@ const generateMigrationFile = async () => { 'prisma', 'migrate', 'diff', - '--from-url', + '--from-config-datasource', diffUrl, - '--to-schema-datamodel', + '--to-schema', 'prisma/schema.prisma', '--script', ], diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/fusebox/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/fusebox/route.ts new file mode 100644 index 000000000..99c9d9f08 --- /dev/null +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/fusebox/route.ts @@ -0,0 +1,103 @@ +import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; +import { + adaptSchema, + adminAuthTypeSchema, + yupBoolean, + yupNumber, + yupObject, + yupString, +} from "@stackframe/stack-shared/dist/schema-fields"; +import { KnownErrors } from "@stackframe/stack-shared"; +import { getExternalDbSyncFusebox, updateExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata"; + +const fuseboxResponseSchema = yupObject({ + statusCode: yupNumber().oneOf([200]).defined(), + bodyType: yupString().oneOf(["json"]).defined(), + body: yupObject({ + ok: yupBoolean().defined(), + sequencer_enabled: yupBoolean().defined(), + poller_enabled: yupBoolean().defined(), + sync_engine_enabled: yupBoolean().defined(), + }).defined(), +}); + +const fuseboxRequestSchema = yupObject({ + auth: yupObject({ + type: adminAuthTypeSchema, + tenancy: adaptSchema, + }).defined(), + body: yupObject({ + sequencer_enabled: yupBoolean().defined(), + poller_enabled: yupBoolean().defined(), + sync_engine_enabled: yupBoolean().defined(), + }).defined(), + method: yupString().oneOf(["POST"]).defined(), +}); + +const fuseboxGetRequestSchema = yupObject({ + auth: yupObject({ + type: adminAuthTypeSchema, + tenancy: adaptSchema, + }).defined(), + method: yupString().oneOf(["GET"]).defined(), +}); + +function ensureInternalProject(projectId: string) { + if (projectId !== "internal") { + throw new KnownErrors.ExpectedInternalProject(); + } +} + +export const GET = createSmartRouteHandler({ + metadata: { + summary: "Get external DB sync fusebox settings", + description: "Returns enablement flags for the external DB sync pipeline.", + tags: ["External DB Sync"], + hidden: true, + }, + request: fuseboxGetRequestSchema, + response: fuseboxResponseSchema, + handler: async ({ auth }) => { + ensureInternalProject(auth.tenancy.project.id); + const fusebox = await getExternalDbSyncFusebox(); + return { + statusCode: 200, + bodyType: "json" as const, + body: { + ok: true, + sequencer_enabled: fusebox.sequencerEnabled, + poller_enabled: fusebox.pollerEnabled, + sync_engine_enabled: fusebox.syncEngineEnabled, + }, + }; + }, +}); + +export const POST = createSmartRouteHandler({ + metadata: { + summary: "Update external DB sync fusebox settings", + description: "Updates enablement flags for the external DB sync pipeline.", + tags: ["External DB Sync"], + hidden: true, + }, + request: fuseboxRequestSchema, + response: fuseboxResponseSchema, + handler: async ({ auth, body }) => { + ensureInternalProject(auth.tenancy.project.id); + const fusebox = await updateExternalDbSyncFusebox({ + sequencerEnabled: body.sequencer_enabled, + pollerEnabled: body.poller_enabled, + syncEngineEnabled: body.sync_engine_enabled, + }); + return { + statusCode: 200, + bodyType: "json" as const, + body: { + ok: true, + sequencer_enabled: fusebox.sequencerEnabled, + poller_enabled: fusebox.pollerEnabled, + sync_engine_enabled: fusebox.syncEngineEnabled, + }, + }; + }, +}); diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts index b027e5ce0..1c3353077 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts @@ -13,6 +13,7 @@ import { import { getEnvVariable, getNodeEnvironment } from "@stackframe/stack-shared/dist/utils/env"; import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors"; import { wait } from "@stackframe/stack-shared/dist/utils/promises"; +import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata"; const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000; const DIRECT_SYNC_ENV = "STACK_EXTERNAL_DB_SYNC_DIRECT"; @@ -161,13 +162,13 @@ export const GET = createSmartRouteHandler({ // In dev/test, QStash runs in Docker so "localhost" won't work. // Replace with "host.docker.internal" to reach the host machine. - if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) { - const url = new URL(fullUrl); - if (url.hostname === "localhost" || url.hostname === "127.0.0.1") { - url.hostname = "host.docker.internal"; - fullUrl = url.toString(); - } - } + // if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) { + // const url = new URL(fullUrl); + // if (url.hostname === "localhost" || url.hostname === "127.0.0.1") { + // url.hostname = "host.docker.internal"; + // fullUrl = url.toString(); + // } + // } const flowControl = options.flowControl as UpstashRequest["flowControl"]; @@ -208,6 +209,11 @@ export const GET = createSmartRouteHandler({ } while (performance.now() - startTime < maxDurationMs) { + const fusebox = await getExternalDbSyncFusebox(); + if (!fusebox.pollerEnabled) { + break; + } + const pendingRequests = await claimPendingRequests(); if (stopWhenIdle && pendingRequests.length === 0) { diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts index 4b71243d5..b2672d908 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts @@ -11,6 +11,7 @@ import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env"; import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors"; import { wait } from "@stackframe/stack-shared/dist/utils/promises"; import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue"; +import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata"; const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000; const SEQUENCER_BATCH_SIZE_ENV = "STACK_EXTERNAL_DB_SYNC_SEQUENCER_BATCH_SIZE"; @@ -177,6 +178,11 @@ export const GET = createSmartRouteHandler({ let iterations = 0; while (performance.now() - startTime < maxDurationMs) { + const fusebox = await getExternalDbSyncFusebox(); + if (!fusebox.sequencerEnabled) { + break; + } + try { const didUpdate = await backfillSequenceIds(batchSize); if (stopWhenIdle && !didUpdate) { diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx b/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx index c78ec2662..b25c6ed33 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx @@ -3,6 +3,7 @@ import { enqueueExternalDbSync } from "@/lib/external-db-sync-queue"; import { getTenancy } from "@/lib/tenancies"; import { ensureUpstashSignature } from "@/lib/upstash"; import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; +import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata"; import { yupNumber, yupObject, yupString, yupTuple } from "@stackframe/stack-shared/dist/schema-fields"; import { StatusError } from "@stackframe/stack-shared/dist/utils/errors"; @@ -29,6 +30,14 @@ export const POST = createSmartRouteHandler({ handler: async ({ body }, fullReq) => { await ensureUpstashSignature(fullReq); + const fusebox = await getExternalDbSyncFusebox(); + if (!fusebox.syncEngineEnabled) { + return { + statusCode: 200, + bodyType: "success", + }; + } + const { tenancyId } = body; const tenancy = await getTenancy(tenancyId); diff --git a/apps/backend/src/lib/external-db-sync-metadata.ts b/apps/backend/src/lib/external-db-sync-metadata.ts new file mode 100644 index 000000000..1bed7226f --- /dev/null +++ b/apps/backend/src/lib/external-db-sync-metadata.ts @@ -0,0 +1,34 @@ +import { BooleanTrue } from "@/generated/prisma/client"; +import { globalPrismaClient } from "@/prisma-client"; + +export type ExternalDbSyncFusebox = { + sequencerEnabled: boolean, + pollerEnabled: boolean, + syncEngineEnabled: boolean, +}; + +const fuseboxSelect = { + sequencerEnabled: true, + pollerEnabled: true, + syncEngineEnabled: true, +}; + +export async function getExternalDbSyncFusebox(): Promise { + return await globalPrismaClient.externalDbSyncMetadata.upsert({ + where: { singleton: BooleanTrue.TRUE }, + create: { singleton: BooleanTrue.TRUE }, + update: {}, + select: fuseboxSelect, + }); +} + +export async function updateExternalDbSyncFusebox( + updates: ExternalDbSyncFusebox, +): Promise { + return await globalPrismaClient.externalDbSyncMetadata.upsert({ + where: { singleton: BooleanTrue.TRUE }, + create: { singleton: BooleanTrue.TRUE, ...updates }, + update: updates, + select: fuseboxSelect, + }); +} diff --git a/apps/backend/src/lib/upstash.tsx b/apps/backend/src/lib/upstash.tsx index 6b4f48fec..2688fcc23 100644 --- a/apps/backend/src/lib/upstash.tsx +++ b/apps/backend/src/lib/upstash.tsx @@ -25,9 +25,9 @@ export async function ensureUpstashSignature(fullReq: SmartRequest): Promise Promise, }; @@ -208,9 +222,12 @@ function DataDate(props: { value: number | null | undefined, loading: boolean }) export default function PageClient() { const adminApp = useAdminApp() as AdminAppWithInternals; const [status, setStatus] = useState(null); + const [fusebox, setFusebox] = useState(null); + const [savedFusebox, setSavedFusebox] = useState(null); const [error, setError] = useState(null); const [loading, setLoading] = useState(false); const [autoRefresh, setAutoRefresh] = useState(true); + const [savingFusebox, setSavingFusebox] = useState(false); const inFlightRef = useRef(false); const summarySamplesRef = useRef { + const result = await Result.fromPromise((async () => { + const response = await adminApp[stackAppInternalsSymbol].sendRequest( + urlString`/internal/external-db-sync/fusebox`, + { method: "GET" }, + "admin", + ); + const body = await response.json(); + if (!response.ok) { + const message = typeof body?.error === "string" ? body.error : "Failed to load external DB sync fusebox."; + throw new Error(message); + } + return body as ExternalDbSyncFuseboxResponse; + })()); + + if (result.status === "error") { + const message = result.error instanceof Error ? result.error.message : String(result.error); + setError(message); + return; + } + + const nextFusebox = { + sequencerEnabled: result.data.sequencer_enabled, + pollerEnabled: result.data.poller_enabled, + syncEngineEnabled: result.data.sync_engine_enabled, + }; + setFusebox(nextFusebox); + setSavedFusebox(nextFusebox); + setError(null); + }, [adminApp]); + + const saveFusebox = useCallback(async () => { + if (!fusebox) return; + setSavingFusebox(true); + const result = await Result.fromPromise((async () => { + const response = await adminApp[stackAppInternalsSymbol].sendRequest( + urlString`/internal/external-db-sync/fusebox`, + { + method: "POST", + body: JSON.stringify({ + sequencer_enabled: fusebox.sequencerEnabled, + poller_enabled: fusebox.pollerEnabled, + sync_engine_enabled: fusebox.syncEngineEnabled, + }), + headers: { "content-type": "application/json" }, + }, + "admin", + ); + const body = await response.json(); + if (!response.ok) { + const message = typeof body?.error === "string" ? body.error : "Failed to update external DB sync fusebox."; + throw new Error(message); + } + return body as ExternalDbSyncFuseboxResponse; + })()); + setSavingFusebox(false); + + if (result.status === "error") { + const message = result.error instanceof Error ? result.error.message : String(result.error); + setError(message); + return; + } + + const nextFusebox = { + sequencerEnabled: result.data.sequencer_enabled, + pollerEnabled: result.data.poller_enabled, + syncEngineEnabled: result.data.sync_engine_enabled, + }; + setFusebox(nextFusebox); + setSavedFusebox(nextFusebox); + setError(null); + }, [adminApp, fusebox]); + const refreshWithAlert = useCallback(() => { runAsynchronouslyWithAlert(loadStatus); }, [loadStatus]); @@ -260,6 +350,10 @@ export default function PageClient() { runAsynchronously(loadStatus); }, [loadStatus]); + useEffect(() => { + runAsynchronously(loadFusebox); + }, [loadFusebox]); + useEffect(() => { if (!autoRefresh) return undefined; const interval = setInterval(() => { @@ -342,6 +436,12 @@ export default function PageClient() { const globalStatus = status?.global ?? null; const deletedRowsByTable = status?.sequencer.deleted_rows.by_table ?? []; const mappingRows = status?.sync_engine.mappings ?? []; + const fuseboxDirty = useMemo(() => { + if (!fusebox || !savedFusebox) return false; + return fusebox.sequencerEnabled !== savedFusebox.sequencerEnabled + || fusebox.pollerEnabled !== savedFusebox.pollerEnabled + || fusebox.syncEngineEnabled !== savedFusebox.syncEngineEnabled; + }, [fusebox, savedFusebox]); if (adminApp.projectId !== "internal") { return notFound(); @@ -377,6 +477,7 @@ export default function PageClient() { Last updated: {status ? formatMillis(status.generated_at_millis) : "—"} +
@@ -587,6 +688,60 @@ export default function PageClient() { + + + Fusebox + + + {!fusebox ? ( +
+ + + + +
+ ) : ( + <> +
+
+ Sequencer + Assigns sequence IDs and queues sync work. +
+ setFusebox((current) => current ? { ...current, sequencerEnabled: checked } : current)} + /> +
+
+
+ Poller + Dispatches queued sync jobs to QStash. +
+ setFusebox((current) => current ? { ...current, pollerEnabled: checked } : current)} + /> +
+
+
+ Sync engine + Processes mapping batches for tenants. +
+ setFusebox((current) => current ? { ...current, syncEngineEnabled: checked } : current)} + /> +
+
+ +
+ + )} +
+
+ ); } diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts index 4e5f7345c..4779a6ca2 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts @@ -449,55 +449,44 @@ describe.sequential('External DB Sync - Basic Tests', () => { expect(seq2).toBeGreaterThan(seq1); }, TEST_TIMEOUT); + /** * What it does: - * - Creates a project with external DB sync enabled. - * - Calls the internal status endpoint and validates the response shape. + * - Reads the external DB sync fusebox settings. + * - Writes the same values back to confirm the update endpoint. * * Why it matters: - * - Confirms the dashboard status API exposes sequencer, poller, and sync-engine metrics. + * - Ensures internal fusebox controls are reachable and validated. */ - test('Status endpoint exposes sync pipeline metrics', async () => { - const dbName = 'status_endpoint_test'; - const connectionString = await dbManager.createDatabase(dbName); - - await createProjectWithExternalDb({ - main: { - type: 'postgres', - connectionString, - } - }, { - display_name: '📈 External DB Sync Status', - description: 'Validating sync status endpoint shape', - }); - - const response = await niceBackendFetch('/api/latest/internal/external-db-sync/status', { + test('Fusebox endpoint returns and accepts enablement flags', async () => { + const getResponse = await niceBackendFetch('/api/latest/internal/external-db-sync/fusebox', { accessType: 'admin', }); - expect(response.status).toBe(200); - expect(response.body).toMatchObject({ + expect(getResponse.status).toBe(200); + expect(getResponse.body).toMatchObject({ ok: true, - tenancy: { - id: expect.any(String), - project_id: expect.any(String), - branch_id: expect.any(String), - }, - sequencer: { - project_users: expect.any(Object), - contact_channels: expect.any(Object), - deleted_rows: expect.any(Object), - }, - poller: { - total: expect.any(String), - pending: expect.any(String), - in_flight: expect.any(String), - stale: expect.any(String), - }, - sync_engine: { - mappings: expect.any(Array), - external_databases: expect.any(Array), + sequencer_enabled: expect.any(Boolean), + poller_enabled: expect.any(Boolean), + sync_engine_enabled: expect.any(Boolean), + }); + + const postResponse = await niceBackendFetch('/api/latest/internal/external-db-sync/fusebox', { + accessType: 'admin', + method: 'POST', + body: { + sequencer_enabled: getResponse.body.sequencer_enabled, + poller_enabled: getResponse.body.poller_enabled, + sync_engine_enabled: getResponse.body.sync_engine_enabled, }, }); + + expect(postResponse.status).toBe(200); + expect(postResponse.body).toMatchObject({ + ok: true, + sequencer_enabled: getResponse.body.sequencer_enabled, + poller_enabled: getResponse.body.poller_enabled, + sync_engine_enabled: getResponse.body.sync_engine_enabled, + }); }, TEST_TIMEOUT); });