This commit is contained in:
Bilal Godil 2026-02-03 17:53:36 -08:00
parent c0a3f7af6d
commit d34a2c7fa4
11 changed files with 391 additions and 51 deletions

View File

@ -0,0 +1,25 @@
-- DropIndex
DROP INDEX "ContactChannel_shouldUpdateSequenceId_idx";
-- DropIndex
DROP INDEX "DeletedRow_shouldUpdateSequenceId_idx";
-- DropIndex
DROP INDEX "ProjectUser_shouldUpdateSequenceId_idx";
-- CreateTable
CREATE TABLE "ExternalDbSyncMetadata" (
"id" TEXT NOT NULL DEFAULT gen_random_uuid(),
"singleton" "BooleanTrue" NOT NULL DEFAULT 'TRUE',
"sequencerEnabled" BOOLEAN NOT NULL DEFAULT true,
"pollerEnabled" BOOLEAN NOT NULL DEFAULT true,
"syncEngineEnabled" BOOLEAN NOT NULL DEFAULT true,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
CONSTRAINT "ExternalDbSyncMetadata_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "ExternalDbSyncMetadata_singleton_key" ON "ExternalDbSyncMetadata"("singleton");

View File

@ -94,6 +94,19 @@ model EnvironmentConfigOverride {
@@id([projectId, branchId])
}
model ExternalDbSyncMetadata {
id String @id @default(dbgenerated("gen_random_uuid()"))
singleton BooleanTrue @unique @default(TRUE)
sequencerEnabled Boolean @default(true)
pollerEnabled Boolean @default(true)
syncEngineEnabled Boolean @default(true)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}
model Team {
tenancyId String @db.Uuid
teamId String @default(uuid()) @db.Uuid

View File

@ -91,9 +91,9 @@ const generateMigrationFile = async () => {
'prisma',
'migrate',
'diff',
'--from-url',
'--from-config-datasource',
diffUrl,
'--to-schema-datamodel',
'--to-schema',
'prisma/schema.prisma',
'--script',
],

View File

@ -0,0 +1,103 @@
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import {
adaptSchema,
adminAuthTypeSchema,
yupBoolean,
yupNumber,
yupObject,
yupString,
} from "@stackframe/stack-shared/dist/schema-fields";
import { KnownErrors } from "@stackframe/stack-shared";
import { getExternalDbSyncFusebox, updateExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
const fuseboxResponseSchema = yupObject({
statusCode: yupNumber().oneOf([200]).defined(),
bodyType: yupString().oneOf(["json"]).defined(),
body: yupObject({
ok: yupBoolean().defined(),
sequencer_enabled: yupBoolean().defined(),
poller_enabled: yupBoolean().defined(),
sync_engine_enabled: yupBoolean().defined(),
}).defined(),
});
const fuseboxRequestSchema = yupObject({
auth: yupObject({
type: adminAuthTypeSchema,
tenancy: adaptSchema,
}).defined(),
body: yupObject({
sequencer_enabled: yupBoolean().defined(),
poller_enabled: yupBoolean().defined(),
sync_engine_enabled: yupBoolean().defined(),
}).defined(),
method: yupString().oneOf(["POST"]).defined(),
});
const fuseboxGetRequestSchema = yupObject({
auth: yupObject({
type: adminAuthTypeSchema,
tenancy: adaptSchema,
}).defined(),
method: yupString().oneOf(["GET"]).defined(),
});
function ensureInternalProject(projectId: string) {
if (projectId !== "internal") {
throw new KnownErrors.ExpectedInternalProject();
}
}
export const GET = createSmartRouteHandler({
metadata: {
summary: "Get external DB sync fusebox settings",
description: "Returns enablement flags for the external DB sync pipeline.",
tags: ["External DB Sync"],
hidden: true,
},
request: fuseboxGetRequestSchema,
response: fuseboxResponseSchema,
handler: async ({ auth }) => {
ensureInternalProject(auth.tenancy.project.id);
const fusebox = await getExternalDbSyncFusebox();
return {
statusCode: 200,
bodyType: "json" as const,
body: {
ok: true,
sequencer_enabled: fusebox.sequencerEnabled,
poller_enabled: fusebox.pollerEnabled,
sync_engine_enabled: fusebox.syncEngineEnabled,
},
};
},
});
export const POST = createSmartRouteHandler({
metadata: {
summary: "Update external DB sync fusebox settings",
description: "Updates enablement flags for the external DB sync pipeline.",
tags: ["External DB Sync"],
hidden: true,
},
request: fuseboxRequestSchema,
response: fuseboxResponseSchema,
handler: async ({ auth, body }) => {
ensureInternalProject(auth.tenancy.project.id);
const fusebox = await updateExternalDbSyncFusebox({
sequencerEnabled: body.sequencer_enabled,
pollerEnabled: body.poller_enabled,
syncEngineEnabled: body.sync_engine_enabled,
});
return {
statusCode: 200,
bodyType: "json" as const,
body: {
ok: true,
sequencer_enabled: fusebox.sequencerEnabled,
poller_enabled: fusebox.pollerEnabled,
sync_engine_enabled: fusebox.syncEngineEnabled,
},
};
},
});

View File

@ -13,6 +13,7 @@ import {
import { getEnvVariable, getNodeEnvironment } from "@stackframe/stack-shared/dist/utils/env";
import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors";
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000;
const DIRECT_SYNC_ENV = "STACK_EXTERNAL_DB_SYNC_DIRECT";
@ -161,13 +162,13 @@ export const GET = createSmartRouteHandler({
// In dev/test, QStash runs in Docker so "localhost" won't work.
// Replace with "host.docker.internal" to reach the host machine.
if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) {
const url = new URL(fullUrl);
if (url.hostname === "localhost" || url.hostname === "127.0.0.1") {
url.hostname = "host.docker.internal";
fullUrl = url.toString();
}
}
// if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) {
// const url = new URL(fullUrl);
// if (url.hostname === "localhost" || url.hostname === "127.0.0.1") {
// url.hostname = "host.docker.internal";
// fullUrl = url.toString();
// }
// }
const flowControl = options.flowControl as UpstashRequest["flowControl"];
@ -208,6 +209,11 @@ export const GET = createSmartRouteHandler({
}
while (performance.now() - startTime < maxDurationMs) {
const fusebox = await getExternalDbSyncFusebox();
if (!fusebox.pollerEnabled) {
break;
}
const pendingRequests = await claimPendingRequests();
if (stopWhenIdle && pendingRequests.length === 0) {

View File

@ -11,6 +11,7 @@ import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors";
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue";
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000;
const SEQUENCER_BATCH_SIZE_ENV = "STACK_EXTERNAL_DB_SYNC_SEQUENCER_BATCH_SIZE";
@ -177,6 +178,11 @@ export const GET = createSmartRouteHandler({
let iterations = 0;
while (performance.now() - startTime < maxDurationMs) {
const fusebox = await getExternalDbSyncFusebox();
if (!fusebox.sequencerEnabled) {
break;
}
try {
const didUpdate = await backfillSequenceIds(batchSize);
if (stopWhenIdle && !didUpdate) {

View File

@ -3,6 +3,7 @@ import { enqueueExternalDbSync } from "@/lib/external-db-sync-queue";
import { getTenancy } from "@/lib/tenancies";
import { ensureUpstashSignature } from "@/lib/upstash";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
import { yupNumber, yupObject, yupString, yupTuple } from "@stackframe/stack-shared/dist/schema-fields";
import { StatusError } from "@stackframe/stack-shared/dist/utils/errors";
@ -29,6 +30,14 @@ export const POST = createSmartRouteHandler({
handler: async ({ body }, fullReq) => {
await ensureUpstashSignature(fullReq);
const fusebox = await getExternalDbSyncFusebox();
if (!fusebox.syncEngineEnabled) {
return {
statusCode: 200,
bodyType: "success",
};
}
const { tenancyId } = body;
const tenancy = await getTenancy(tenancyId);

View File

@ -0,0 +1,34 @@
import { BooleanTrue } from "@/generated/prisma/client";
import { globalPrismaClient } from "@/prisma-client";
export type ExternalDbSyncFusebox = {
sequencerEnabled: boolean,
pollerEnabled: boolean,
syncEngineEnabled: boolean,
};
const fuseboxSelect = {
sequencerEnabled: true,
pollerEnabled: true,
syncEngineEnabled: true,
};
export async function getExternalDbSyncFusebox(): Promise<ExternalDbSyncFusebox> {
return await globalPrismaClient.externalDbSyncMetadata.upsert({
where: { singleton: BooleanTrue.TRUE },
create: { singleton: BooleanTrue.TRUE },
update: {},
select: fuseboxSelect,
});
}
export async function updateExternalDbSyncFusebox(
updates: ExternalDbSyncFusebox,
): Promise<ExternalDbSyncFusebox> {
return await globalPrismaClient.externalDbSyncMetadata.upsert({
where: { singleton: BooleanTrue.TRUE },
create: { singleton: BooleanTrue.TRUE, ...updates },
update: updates,
select: fuseboxSelect,
});
}

View File

@ -25,9 +25,9 @@ export async function ensureUpstashSignature(fullReq: SmartRequest): Promise<voi
}
const url = new URL(fullReq.url);
if ((nodeEnv.includes("development") || nodeEnv.includes("test")) && url.hostname === "localhost") {
url.hostname = "host.docker.internal";
}
// if ((nodeEnv.includes("development") || nodeEnv.includes("test")) && url.hostname === "localhost") {
// url.hostname = "host.docker.internal";
// }
const isValid = await upstashReceiver.verify({
signature: upstashSignature,

View File

@ -22,6 +22,7 @@ import {
} from "@/components/ui";
import { Result } from "@stackframe/stack-shared/dist/utils/results";
import { runAsynchronously, runAsynchronouslyWithAlert } from "@stackframe/stack-shared/dist/utils/promises";
import { urlString } from "@stackframe/stack-shared/dist/utils/urls";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { notFound } from "next/navigation";
@ -123,6 +124,19 @@ type ExternalDbSyncStatus = {
},
};
type ExternalDbSyncFusebox = {
sequencerEnabled: boolean,
pollerEnabled: boolean,
syncEngineEnabled: boolean,
};
type ExternalDbSyncFuseboxResponse = {
ok: true,
sequencer_enabled: boolean,
poller_enabled: boolean,
sync_engine_enabled: boolean,
};
type AdminAppInternals = {
sendRequest: (path: string, requestOptions: RequestInit, requestType?: "client" | "server" | "admin") => Promise<Response>,
};
@ -208,9 +222,12 @@ function DataDate(props: { value: number | null | undefined, loading: boolean })
export default function PageClient() {
const adminApp = useAdminApp() as AdminAppWithInternals;
const [status, setStatus] = useState<ExternalDbSyncStatus | null>(null);
const [fusebox, setFusebox] = useState<ExternalDbSyncFusebox | null>(null);
const [savedFusebox, setSavedFusebox] = useState<ExternalDbSyncFusebox | null>(null);
const [error, setError] = useState<string | null>(null);
const [loading, setLoading] = useState(false);
const [autoRefresh, setAutoRefresh] = useState(true);
const [savingFusebox, setSavingFusebox] = useState(false);
const inFlightRef = useRef(false);
const summarySamplesRef = useRef<Array<{
timestampMillis: number,
@ -252,6 +269,79 @@ export default function PageClient() {
inFlightRef.current = false;
}, [adminApp]);
const loadFusebox = useCallback(async () => {
const result = await Result.fromPromise((async () => {
const response = await adminApp[stackAppInternalsSymbol].sendRequest(
urlString`/internal/external-db-sync/fusebox`,
{ method: "GET" },
"admin",
);
const body = await response.json();
if (!response.ok) {
const message = typeof body?.error === "string" ? body.error : "Failed to load external DB sync fusebox.";
throw new Error(message);
}
return body as ExternalDbSyncFuseboxResponse;
})());
if (result.status === "error") {
const message = result.error instanceof Error ? result.error.message : String(result.error);
setError(message);
return;
}
const nextFusebox = {
sequencerEnabled: result.data.sequencer_enabled,
pollerEnabled: result.data.poller_enabled,
syncEngineEnabled: result.data.sync_engine_enabled,
};
setFusebox(nextFusebox);
setSavedFusebox(nextFusebox);
setError(null);
}, [adminApp]);
const saveFusebox = useCallback(async () => {
if (!fusebox) return;
setSavingFusebox(true);
const result = await Result.fromPromise((async () => {
const response = await adminApp[stackAppInternalsSymbol].sendRequest(
urlString`/internal/external-db-sync/fusebox`,
{
method: "POST",
body: JSON.stringify({
sequencer_enabled: fusebox.sequencerEnabled,
poller_enabled: fusebox.pollerEnabled,
sync_engine_enabled: fusebox.syncEngineEnabled,
}),
headers: { "content-type": "application/json" },
},
"admin",
);
const body = await response.json();
if (!response.ok) {
const message = typeof body?.error === "string" ? body.error : "Failed to update external DB sync fusebox.";
throw new Error(message);
}
return body as ExternalDbSyncFuseboxResponse;
})());
setSavingFusebox(false);
if (result.status === "error") {
const message = result.error instanceof Error ? result.error.message : String(result.error);
setError(message);
return;
}
const nextFusebox = {
sequencerEnabled: result.data.sequencer_enabled,
pollerEnabled: result.data.poller_enabled,
syncEngineEnabled: result.data.sync_engine_enabled,
};
setFusebox(nextFusebox);
setSavedFusebox(nextFusebox);
setError(null);
}, [adminApp, fusebox]);
const refreshWithAlert = useCallback(() => {
runAsynchronouslyWithAlert(loadStatus);
}, [loadStatus]);
@ -260,6 +350,10 @@ export default function PageClient() {
runAsynchronously(loadStatus);
}, [loadStatus]);
useEffect(() => {
runAsynchronously(loadFusebox);
}, [loadFusebox]);
useEffect(() => {
if (!autoRefresh) return undefined;
const interval = setInterval(() => {
@ -342,6 +436,12 @@ export default function PageClient() {
const globalStatus = status?.global ?? null;
const deletedRowsByTable = status?.sequencer.deleted_rows.by_table ?? [];
const mappingRows = status?.sync_engine.mappings ?? [];
const fuseboxDirty = useMemo(() => {
if (!fusebox || !savedFusebox) return false;
return fusebox.sequencerEnabled !== savedFusebox.sequencerEnabled
|| fusebox.pollerEnabled !== savedFusebox.pollerEnabled
|| fusebox.syncEngineEnabled !== savedFusebox.syncEngineEnabled;
}, [fusebox, savedFusebox]);
if (adminApp.projectId !== "internal") {
return notFound();
@ -377,6 +477,7 @@ export default function PageClient() {
<span>Last updated: {status ? formatMillis(status.generated_at_millis) : "—"}</span>
</div>
<div className="grid gap-4 md:grid-cols-3">
<Card>
<CardHeader>
@ -587,6 +688,60 @@ export default function PageClient() {
</CardContent>
</Card>
<Card>
<CardHeader>
<CardTitle>Fusebox</CardTitle>
</CardHeader>
<CardContent className="space-y-4">
{!fusebox ? (
<div className="space-y-3">
<Skeleton className="h-6 w-40" />
<Skeleton className="h-6 w-40" />
<Skeleton className="h-6 w-48" />
<Skeleton className="h-9 w-24" />
</div>
) : (
<>
<div className="flex items-center justify-between gap-6">
<div>
<Typography type="p" className="text-sm font-medium">Sequencer</Typography>
<Typography type="p" className="text-xs text-muted-foreground">Assigns sequence IDs and queues sync work.</Typography>
</div>
<Switch
checked={fusebox.sequencerEnabled}
onCheckedChange={(checked) => setFusebox((current) => current ? { ...current, sequencerEnabled: checked } : current)}
/>
</div>
<div className="flex items-center justify-between gap-6">
<div>
<Typography type="p" className="text-sm font-medium">Poller</Typography>
<Typography type="p" className="text-xs text-muted-foreground">Dispatches queued sync jobs to QStash.</Typography>
</div>
<Switch
checked={fusebox.pollerEnabled}
onCheckedChange={(checked) => setFusebox((current) => current ? { ...current, pollerEnabled: checked } : current)}
/>
</div>
<div className="flex items-center justify-between gap-6">
<div>
<Typography type="p" className="text-sm font-medium">Sync engine</Typography>
<Typography type="p" className="text-xs text-muted-foreground">Processes mapping batches for tenants.</Typography>
</div>
<Switch
checked={fusebox.syncEngineEnabled}
onCheckedChange={(checked) => setFusebox((current) => current ? { ...current, syncEngineEnabled: checked } : current)}
/>
</div>
<div className="flex justify-end">
<Button onClick={saveFusebox} disabled={!fuseboxDirty || savingFusebox} loading={savingFusebox}>
Save
</Button>
</div>
</>
)}
</CardContent>
</Card>
</PageLayout>
);
}

View File

@ -449,55 +449,44 @@ describe.sequential('External DB Sync - Basic Tests', () => {
expect(seq2).toBeGreaterThan(seq1);
}, TEST_TIMEOUT);
/**
* What it does:
* - Creates a project with external DB sync enabled.
* - Calls the internal status endpoint and validates the response shape.
* - Reads the external DB sync fusebox settings.
* - Writes the same values back to confirm the update endpoint.
*
* Why it matters:
* - Confirms the dashboard status API exposes sequencer, poller, and sync-engine metrics.
* - Ensures internal fusebox controls are reachable and validated.
*/
test('Status endpoint exposes sync pipeline metrics', async () => {
const dbName = 'status_endpoint_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
}, {
display_name: '📈 External DB Sync Status',
description: 'Validating sync status endpoint shape',
});
const response = await niceBackendFetch('/api/latest/internal/external-db-sync/status', {
test('Fusebox endpoint returns and accepts enablement flags', async () => {
const getResponse = await niceBackendFetch('/api/latest/internal/external-db-sync/fusebox', {
accessType: 'admin',
});
expect(response.status).toBe(200);
expect(response.body).toMatchObject({
expect(getResponse.status).toBe(200);
expect(getResponse.body).toMatchObject({
ok: true,
tenancy: {
id: expect.any(String),
project_id: expect.any(String),
branch_id: expect.any(String),
},
sequencer: {
project_users: expect.any(Object),
contact_channels: expect.any(Object),
deleted_rows: expect.any(Object),
},
poller: {
total: expect.any(String),
pending: expect.any(String),
in_flight: expect.any(String),
stale: expect.any(String),
},
sync_engine: {
mappings: expect.any(Array),
external_databases: expect.any(Array),
sequencer_enabled: expect.any(Boolean),
poller_enabled: expect.any(Boolean),
sync_engine_enabled: expect.any(Boolean),
});
const postResponse = await niceBackendFetch('/api/latest/internal/external-db-sync/fusebox', {
accessType: 'admin',
method: 'POST',
body: {
sequencer_enabled: getResponse.body.sequencer_enabled,
poller_enabled: getResponse.body.poller_enabled,
sync_engine_enabled: getResponse.body.sync_engine_enabled,
},
});
expect(postResponse.status).toBe(200);
expect(postResponse.body).toMatchObject({
ok: true,
sequencer_enabled: getResponse.body.sequencer_enabled,
poller_enabled: getResponse.body.poller_enabled,
sync_engine_enabled: getResponse.body.sync_engine_enabled,
});
}, TEST_TIMEOUT);
});