add tracing

This commit is contained in:
Bilal Godil 2026-02-03 18:13:19 -08:00
parent d34a2c7fa4
commit c91998ef73
6 changed files with 461 additions and 317 deletions

View File

@ -94,7 +94,7 @@ ALTER TABLE "ContactChannel" ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NUL
ALTER TABLE "DeletedRow" ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT TRUE;
-- SPLIT_STATEMENT_SENTINEL
-- Creates partial indexes on (shouldUpdateSequenceId, tenancyId) to quickly find rows that need updates
-- Creates indexes on (shouldUpdateSequenceId, tenancyId) to quickly find rows that need updates
-- and support ORDER BY tenancyId for less fragmented updates.
CREATE INDEX "ProjectUser_shouldUpdateSequenceId_idx" ON "ProjectUser"("shouldUpdateSequenceId", "tenancyId");

View File

@ -14,6 +14,7 @@ import { getEnvVariable, getNodeEnvironment } from "@stackframe/stack-shared/dis
import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors";
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
import { traceSpan } from "@/utils/telemetry";
const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000;
const DIRECT_SYNC_ENV = "STACK_EXTERNAL_DB_SYNC_DIRECT";
@ -90,148 +91,205 @@ export const GET = createSmartRouteHandler({
throw new StatusError(401, "Unauthorized");
}
const startTime = performance.now();
const maxDurationMs = parseMaxDurationMs(query.maxDurationMs);
const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle);
const pollIntervalMs = 50;
const staleClaimIntervalMinutes = 5;
const pollerClaimLimit = getPollerClaimLimit();
return await traceSpan("external-db-sync.poller", async (span) => {
const startTime = performance.now();
const maxDurationMs = parseMaxDurationMs(query.maxDurationMs);
const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle);
const pollIntervalMs = 50;
const staleClaimIntervalMinutes = 5;
const pollerClaimLimit = getPollerClaimLimit();
let totalRequestsProcessed = 0;
async function claimPendingRequests(): Promise<OutgoingRequest[]> {
return await globalPrismaClient.$queryRaw<OutgoingRequest[]>`
UPDATE "OutgoingRequest"
SET "startedFulfillingAt" = NOW()
WHERE "id" IN (
SELECT id
FROM "OutgoingRequest"
WHERE "startedFulfillingAt" IS NULL
ORDER BY "createdAt"
LIMIT ${pollerClaimLimit}
FOR UPDATE SKIP LOCKED
)
RETURNING *;
`;
}
span.setAttribute("stack.external-db-sync.max-duration-ms", maxDurationMs);
span.setAttribute("stack.external-db-sync.stop-when-idle", stopWhenIdle);
span.setAttribute("stack.external-db-sync.poll-interval-ms", pollIntervalMs);
span.setAttribute("stack.external-db-sync.poller-claim-limit", pollerClaimLimit);
span.setAttribute("stack.external-db-sync.direct-sync", directSyncEnabled());
span.setAttribute("stack.external-db-sync.stale-claim-minutes", staleClaimIntervalMinutes);
async function deleteOutgoingRequest(id: string): Promise<void> {
await retryTransaction(globalPrismaClient, async (tx) => {
await tx.outgoingRequest.delete({ where: { id } });
});
}
let totalRequestsProcessed = 0;
let iterationCount = 0;
async function deleteOutgoingRequests(ids: string[]): Promise<void> {
if (ids.length === 0) return;
await retryTransaction(globalPrismaClient, async (tx) => {
await tx.outgoingRequest.deleteMany({ where: { id: { in: ids } } });
});
}
async function processRequest(request: OutgoingRequest): Promise<void> {
// Prisma JsonValue doesn't carry a precise shape for this JSON blob.
const options = request.qstashOptions as any;
const baseUrl = getEnvVariable("NEXT_PUBLIC_STACK_API_URL");
async function claimPendingRequests(): Promise<OutgoingRequest[]> {
return await traceSpan("external-db-sync.poller.claimPendingRequests", async (claimSpan) => {
const requests = await globalPrismaClient.$queryRaw<OutgoingRequest[]>`
UPDATE "OutgoingRequest"
SET "startedFulfillingAt" = NOW()
WHERE "id" IN (
SELECT id
FROM "OutgoingRequest"
WHERE "startedFulfillingAt" IS NULL
ORDER BY "createdAt"
LIMIT ${pollerClaimLimit}
FOR UPDATE SKIP LOCKED
)
RETURNING *;
`;
claimSpan.setAttribute("stack.external-db-sync.claimed-count", requests.length);
return requests;
});
}
let fullUrl = new URL(options.url, baseUrl).toString();
async function deleteOutgoingRequest(id: string): Promise<void> {
await retryTransaction(globalPrismaClient, async (tx) => {
await tx.outgoingRequest.delete({ where: { id } });
});
}
// In dev/test, QStash runs in Docker so "localhost" won't work.
// Replace with "host.docker.internal" to reach the host machine.
// if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) {
// const url = new URL(fullUrl);
// if (url.hostname === "localhost" || url.hostname === "127.0.0.1") {
// url.hostname = "host.docker.internal";
// fullUrl = url.toString();
// }
// }
async function deleteOutgoingRequests(ids: string[]): Promise<void> {
if (ids.length === 0) return;
await retryTransaction(globalPrismaClient, async (tx) => {
await tx.outgoingRequest.deleteMany({ where: { id: { in: ids } } });
});
}
async function processRequest(request: OutgoingRequest): Promise<void> {
// Prisma JsonValue doesn't carry a precise shape for this JSON blob.
const options = request.qstashOptions as any;
const baseUrl = getEnvVariable("NEXT_PUBLIC_STACK_API_URL");
await upstash.publishJSON({
url: fullUrl,
body: options.body,
flowControl: options.flowControl,
});
await deleteOutgoingRequest(request.id);
}
let fullUrl = new URL(options.url, baseUrl).toString();
type UpstashRequest = PublishBatchRequest<unknown>;
function buildUpstashRequest(request: OutgoingRequest): UpstashRequest {
// Prisma JsonValue doesn't carry a precise shape for this JSON blob.
const options = request.qstashOptions as any;
const baseUrl = getEnvVariable("NEXT_PUBLIC_STACK_API_URL");
let fullUrl = new URL(options.url, baseUrl).toString();
// In dev/test, QStash runs in Docker so "localhost" won't work.
// Replace with "host.docker.internal" to reach the host machine.
// if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) {
// const url = new URL(fullUrl);
// if (url.hostname === "localhost" || url.hostname === "127.0.0.1") {
// url.hostname = "host.docker.internal";
// fullUrl = url.toString();
// }
// }
const flowControl = options.flowControl as UpstashRequest["flowControl"];
return {
url: fullUrl,
body: options.body,
...(flowControl ? { flowControl } : {}),
};
}
async function processRequests(requests: OutgoingRequest[]): Promise<number> {
let processed = 0;
if (directSyncEnabled()) {
for (const request of requests) {
try {
await processRequest(request);
processed++;
} catch (error) {
captureError("poller-iteration-error", error);
// In dev/test, QStash runs in Docker so "localhost" won't work.
// Replace with "host.docker.internal" to reach the host machine.
if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) {
const url = new URL(fullUrl);
if (url.hostname === "localhost" || url.hostname === "127.0.0.1") {
url.hostname = "host.docker.internal";
fullUrl = url.toString();
}
}
return processed;
await upstash.publishJSON({
url: fullUrl,
body: options.body,
flowControl: options.flowControl,
});
await deleteOutgoingRequest(request.id);
}
if (requests.length === 0) return 0;
type UpstashRequest = PublishBatchRequest<unknown>;
try {
const batchPayload = requests.map(buildUpstashRequest);
console.log("publishing to QStash batch", { count: batchPayload.length });
await upstash.batchJSON(batchPayload);
await deleteOutgoingRequests(requests.map((request) => request.id));
return requests.length;
} catch (error) {
captureError("poller-iteration-error", error);
return 0;
}
}
function buildUpstashRequest(request: OutgoingRequest): UpstashRequest {
// Prisma JsonValue doesn't carry a precise shape for this JSON blob.
const options = request.qstashOptions as any;
const baseUrl = getEnvVariable("NEXT_PUBLIC_STACK_API_URL");
while (performance.now() - startTime < maxDurationMs) {
const fusebox = await getExternalDbSyncFusebox();
if (!fusebox.pollerEnabled) {
break;
let fullUrl = new URL(options.url, baseUrl).toString();
// In dev/test, QStash runs in Docker so "localhost" won't work.
// Replace with "host.docker.internal" to reach the host machine.
if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) {
const url = new URL(fullUrl);
if (url.hostname === "localhost" || url.hostname === "127.0.0.1") {
url.hostname = "host.docker.internal";
fullUrl = url.toString();
}
}
const flowControl = options.flowControl as UpstashRequest["flowControl"];
return {
url: fullUrl,
body: options.body,
...(flowControl ? { flowControl } : {}),
};
}
const pendingRequests = await claimPendingRequests();
async function processRequests(requests: OutgoingRequest[]): Promise<number> {
return await traceSpan({
description: "external-db-sync.poller.processRequests",
attributes: {
"stack.external-db-sync.pending-count": requests.length,
"stack.external-db-sync.direct-sync": directSyncEnabled(),
},
}, async (processSpan) => {
let processed = 0;
if (stopWhenIdle && pendingRequests.length === 0) {
break;
if (directSyncEnabled()) {
for (const request of requests) {
try {
await processRequest(request);
processed++;
} catch (error) {
processSpan.setAttribute("stack.external-db-sync.iteration-error", true);
captureError("poller-iteration-error", error);
}
}
processSpan.setAttribute("stack.external-db-sync.processed-count", processed);
return processed;
}
if (requests.length === 0) {
processSpan.setAttribute("stack.external-db-sync.processed-count", 0);
return 0;
}
try {
const batchPayload = requests.map(buildUpstashRequest);
console.log("publishing to QStash batch", { count: batchPayload.length });
await upstash.batchJSON(batchPayload);
await deleteOutgoingRequests(requests.map((request) => request.id));
processSpan.setAttribute("stack.external-db-sync.processed-count", requests.length);
return requests.length;
} catch (error) {
processSpan.setAttribute("stack.external-db-sync.iteration-error", true);
captureError("poller-iteration-error", error);
processSpan.setAttribute("stack.external-db-sync.processed-count", 0);
return 0;
}
});
}
totalRequestsProcessed += await processRequests(pendingRequests);
type PollerIterationResult = {
stopReason: "disabled" | "idle" | null,
processed: number,
};
await wait(pollIntervalMs);
}
while (performance.now() - startTime < maxDurationMs) {
const iterationResult = await traceSpan<PollerIterationResult>({
description: "external-db-sync.poller.iteration",
attributes: {
"stack.external-db-sync.iteration": iterationCount + 1,
},
}, async (iterationSpan) => {
const fusebox = await getExternalDbSyncFusebox();
iterationSpan.setAttribute("stack.external-db-sync.poller-enabled", fusebox.pollerEnabled);
if (!fusebox.pollerEnabled) {
return { stopReason: "disabled", processed: 0 };
}
return {
statusCode: 200,
bodyType: "json" as const,
body: {
ok: true,
requests_processed: totalRequestsProcessed,
},
};
const pendingRequests = await claimPendingRequests();
iterationSpan.setAttribute("stack.external-db-sync.pending-count", pendingRequests.length);
if (stopWhenIdle && pendingRequests.length === 0) {
return { stopReason: "idle", processed: 0 };
}
const processed = await processRequests(pendingRequests);
iterationSpan.setAttribute("stack.external-db-sync.processed-count", processed);
return { stopReason: null, processed };
});
if (iterationResult.stopReason) {
break;
}
iterationCount++;
totalRequestsProcessed += iterationResult.processed;
await wait(pollIntervalMs);
}
span.setAttribute("stack.external-db-sync.requests-processed", totalRequestsProcessed);
span.setAttribute("stack.external-db-sync.iterations", iterationCount);
return {
statusCode: 200,
bodyType: "json" as const,
body: {
ok: true,
requests_processed: totalRequestsProcessed,
},
};
});
},
});

View File

@ -12,6 +12,7 @@ import { captureError, StackAssertionError, StatusError } from "@stackframe/stac
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue";
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
import { traceSpan } from "@/utils/telemetry";
const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000;
const SEQUENCER_BATCH_SIZE_ENV = "STACK_EXTERNAL_DB_SYNC_SEQUENCER_BATCH_SIZE";
@ -49,87 +50,103 @@ function getSequencerBatchSize(): number {
// Assigns sequence IDs to rows that need them and queues sync requests for affected tenants.
// Processes up to batchSize rows at a time from each table.
async function backfillSequenceIds(batchSize: number): Promise<boolean> {
let didUpdate = false;
const projectUserTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "tenancyId", "projectUserId"
FROM "ProjectUser"
WHERE "shouldUpdateSequenceId" = TRUE
ORDER BY "tenancyId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "ProjectUser" pu
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE pu."tenancyId" = r."tenancyId"
AND pu."projectUserId" = r."projectUserId"
RETURNING pu."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
`;
return await traceSpan({
description: "external-db-sync.sequencer.backfill",
attributes: {
"stack.external-db-sync.batch-size": batchSize,
},
}, async (span) => {
let didUpdate = false;
// Enqueue sync for all affected tenants in a single batch query
if (projectUserTenants.length > 0) {
await enqueueExternalDbSyncBatch(projectUserTenants.map(t => t.tenancyId));
didUpdate = true;
}
const projectUserTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "tenancyId", "projectUserId"
FROM "ProjectUser"
WHERE "shouldUpdateSequenceId" = TRUE
ORDER BY "tenancyId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "ProjectUser" pu
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE pu."tenancyId" = r."tenancyId"
AND pu."projectUserId" = r."projectUserId"
RETURNING pu."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
`;
const contactChannelTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "tenancyId", "projectUserId", "id"
FROM "ContactChannel"
WHERE "shouldUpdateSequenceId" = TRUE
ORDER BY "tenancyId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "ContactChannel" cc
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE cc."tenancyId" = r."tenancyId"
AND cc."projectUserId" = r."projectUserId"
AND cc."id" = r."id"
RETURNING cc."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
`;
span.setAttribute("stack.external-db-sync.project-user-tenants", projectUserTenants.length);
if (contactChannelTenants.length > 0) {
await enqueueExternalDbSyncBatch(contactChannelTenants.map(t => t.tenancyId));
didUpdate = true;
}
// Enqueue sync for all affected tenants in a single batch query
if (projectUserTenants.length > 0) {
await enqueueExternalDbSyncBatch(projectUserTenants.map(t => t.tenancyId));
didUpdate = true;
}
const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "id", "tenancyId"
FROM "DeletedRow"
WHERE "shouldUpdateSequenceId" = TRUE
ORDER BY "tenancyId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "DeletedRow" dr
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE dr."id" = r."id"
RETURNING dr."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
`;
const contactChannelTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "tenancyId", "projectUserId", "id"
FROM "ContactChannel"
WHERE "shouldUpdateSequenceId" = TRUE
ORDER BY "tenancyId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "ContactChannel" cc
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE cc."tenancyId" = r."tenancyId"
AND cc."projectUserId" = r."projectUserId"
AND cc."id" = r."id"
RETURNING cc."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
`;
if (deletedRowTenants.length > 0) {
await enqueueExternalDbSyncBatch(deletedRowTenants.map(t => t.tenancyId));
didUpdate = true;
}
span.setAttribute("stack.external-db-sync.contact-channel-tenants", contactChannelTenants.length);
return didUpdate;
if (contactChannelTenants.length > 0) {
await enqueueExternalDbSyncBatch(contactChannelTenants.map(t => t.tenancyId));
didUpdate = true;
}
const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "id", "tenancyId"
FROM "DeletedRow"
WHERE "shouldUpdateSequenceId" = TRUE
ORDER BY "tenancyId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "DeletedRow" dr
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE dr."id" = r."id"
RETURNING dr."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
`;
span.setAttribute("stack.external-db-sync.deleted-row-tenants", deletedRowTenants.length);
if (deletedRowTenants.length > 0) {
await enqueueExternalDbSyncBatch(deletedRowTenants.map(t => t.tenancyId));
didUpdate = true;
}
span.setAttribute("stack.external-db-sync.did-update", didUpdate);
return didUpdate;
});
}
// TODO: If we ever need to support non-hosted source-of-truth tenancies again,
@ -169,43 +186,72 @@ export const GET = createSmartRouteHandler({
throw new StatusError(401, "Unauthorized");
}
const startTime = performance.now();
const maxDurationMs = parseMaxDurationMs(query.maxDurationMs);
const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle);
const pollIntervalMs = 50;
const batchSize = getSequencerBatchSize();
return await traceSpan("external-db-sync.sequencer", async (span) => {
const startTime = performance.now();
const maxDurationMs = parseMaxDurationMs(query.maxDurationMs);
const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle);
const pollIntervalMs = 50;
const batchSize = getSequencerBatchSize();
let iterations = 0;
span.setAttribute("stack.external-db-sync.max-duration-ms", maxDurationMs);
span.setAttribute("stack.external-db-sync.stop-when-idle", stopWhenIdle);
span.setAttribute("stack.external-db-sync.poll-interval-ms", pollIntervalMs);
span.setAttribute("stack.external-db-sync.batch-size", batchSize);
while (performance.now() - startTime < maxDurationMs) {
const fusebox = await getExternalDbSyncFusebox();
if (!fusebox.sequencerEnabled) {
break;
}
let iterations = 0;
try {
const didUpdate = await backfillSequenceIds(batchSize);
if (stopWhenIdle && !didUpdate) {
type SequencerIterationResult = {
stopReason: "disabled" | "idle" | null,
};
while (performance.now() - startTime < maxDurationMs) {
const iterationResult = await traceSpan<SequencerIterationResult>({
description: "external-db-sync.sequencer.iteration",
attributes: {
"stack.external-db-sync.iteration": iterations + 1,
},
}, async (iterationSpan) => {
const fusebox = await getExternalDbSyncFusebox();
iterationSpan.setAttribute("stack.external-db-sync.sequencer-enabled", fusebox.sequencerEnabled);
if (!fusebox.sequencerEnabled) {
return { stopReason: "disabled" };
}
try {
const didUpdate = await backfillSequenceIds(batchSize);
iterationSpan.setAttribute("stack.external-db-sync.did-update", didUpdate);
if (stopWhenIdle && !didUpdate) {
return { stopReason: "idle" };
}
} catch (error) {
iterationSpan.setAttribute("stack.external-db-sync.iteration-error", true);
captureError(
`sequencer-iteration-error`,
error,
);
}
return { stopReason: null };
});
if (iterationResult.stopReason) {
break;
}
} catch (error) {
captureError(
`sequencer-iteration-error`,
error,
);
iterations++;
await wait(pollIntervalMs);
}
iterations++;
await wait(pollIntervalMs);
}
span.setAttribute("stack.external-db-sync.iterations", iterations);
return {
statusCode: 200,
bodyType: "json" as const,
body: {
ok: true,
iterations,
},
};
return {
statusCode: 200,
bodyType: "json" as const,
body: {
ok: true,
iterations,
},
};
});
},
});

View File

@ -15,6 +15,7 @@ import { errorToNiceString, StackAssertionError, throwErr } from "@stackframe/st
import { Result } from "@stackframe/stack-shared/dist/utils/results";
import { Client } from "pg";
import { KnownErrors } from "@stackframe/stack-shared";
import { traceSpan } from "@/utils/telemetry";
const STALE_CLAIM_INTERVAL_MINUTES = 5;
@ -718,79 +719,99 @@ export const GET = createSmartRouteHandler({
}),
response: responseSchema,
handler: async ({ auth, query }) => {
if (auth.tenancy.project.id !== "internal") {
throw new KnownErrors.ExpectedInternalProject();
}
const tenancyId = auth.tenancy.id;
return await traceSpan({
description: "external-db-sync.status",
attributes: {
"stack.external-db-sync.scope": query.scope,
"stack.external-db-sync.tenancy-id": auth.tenancy.id,
},
}, async (span) => {
if (auth.tenancy.project.id !== "internal") {
throw new KnownErrors.ExpectedInternalProject();
}
const tenancyId = auth.tenancy.id;
const shouldIncludeGlobal = query.scope === "all";
const currentStats = shouldIncludeGlobal ? await fetchInternalStats(null) : await fetchInternalStats(tenancyId);
const globalStats = shouldIncludeGlobal ? currentStats : null;
const globalTenanciesCount = shouldIncludeGlobal
? (await globalPrismaClient.$queryRaw<CountRow[]>`
SELECT COUNT(*)::bigint AS "total"
FROM "Tenancy"
`).at(0) ?? throwErr("Tenancy count query returned no rows.")
: null;
const globalDbSyncCount = shouldIncludeGlobal
? (await globalPrismaClient.$queryRaw<CountRow[]>`
SELECT COUNT(*)::bigint AS "total"
FROM "EnvironmentConfigOverride"
WHERE ("config"->'dbSync'->'externalDatabases') IS NOT NULL
`).at(0) ?? throwErr("DB sync config count query returned no rows.")
: null;
const shouldIncludeGlobal = query.scope === "all";
span.setAttribute("stack.external-db-sync.include-global", shouldIncludeGlobal);
const externalDbStatuses = shouldIncludeGlobal
? []
: await Promise.all(
Object.entries(
auth.tenancy.config.dbSync.externalDatabases as CompleteConfig["dbSync"]["externalDatabases"],
).map(([dbId, dbConfig]) => fetchExternalDatabaseStatus(dbId, dbConfig, currentStats.mappingStatuses)),
);
const currentStats = await traceSpan({
description: "external-db-sync.status.fetchInternalStats",
attributes: {
"stack.external-db-sync.scope": shouldIncludeGlobal ? "all" : "tenancy",
},
}, async () => shouldIncludeGlobal ? await fetchInternalStats(null) : await fetchInternalStats(tenancyId));
const outgoingStats = currentStats.outgoingStatsRow;
const globalStats = shouldIncludeGlobal ? currentStats : null;
const globalTenanciesCount = shouldIncludeGlobal
? (await globalPrismaClient.$queryRaw<CountRow[]>`
SELECT COUNT(*)::bigint AS "total"
FROM "Tenancy"
`).at(0) ?? throwErr("Tenancy count query returned no rows.")
: null;
const globalDbSyncCount = shouldIncludeGlobal
? (await globalPrismaClient.$queryRaw<CountRow[]>`
SELECT COUNT(*)::bigint AS "total"
FROM "EnvironmentConfigOverride"
WHERE ("config"->'dbSync'->'externalDatabases') IS NOT NULL
`).at(0) ?? throwErr("DB sync config count query returned no rows.")
: null;
return {
statusCode: 200 as const,
bodyType: "json" as const,
body: {
ok: true,
generated_at_millis: Date.now(),
global: shouldIncludeGlobal && globalStats && globalTenanciesCount && globalDbSyncCount ? {
tenancies_total: toBigIntStringOrThrow(globalTenanciesCount.total, "tenancies total"),
tenancies_with_db_sync: toBigIntStringOrThrow(globalDbSyncCount.total, "tenancies with db sync"),
const externalDbStatuses = shouldIncludeGlobal
? []
: await traceSpan("external-db-sync.status.fetchExternalDatabaseStatuses", async (externalSpan) => {
const statuses = await Promise.all(
Object.entries(
auth.tenancy.config.dbSync.externalDatabases as CompleteConfig["dbSync"]["externalDatabases"],
).map(([dbId, dbConfig]) => fetchExternalDatabaseStatus(dbId, dbConfig, currentStats.mappingStatuses)),
);
externalSpan.setAttribute("stack.external-db-sync.external-db-count", statuses.length);
return statuses;
});
const outgoingStats = currentStats.outgoingStatsRow;
return {
statusCode: 200 as const,
bodyType: "json" as const,
body: {
ok: true,
generated_at_millis: Date.now(),
global: shouldIncludeGlobal && globalStats && globalTenanciesCount && globalDbSyncCount ? {
tenancies_total: toBigIntStringOrThrow(globalTenanciesCount.total, "tenancies total"),
tenancies_with_db_sync: toBigIntStringOrThrow(globalDbSyncCount.total, "tenancies with db sync"),
sequencer: {
project_users: globalStats.projectUsersStats,
contact_channels: globalStats.contactChannelStats,
deleted_rows: {
...globalStats.deletedRowStats,
by_table: globalStats.deletedRowsByTable,
},
},
poller: formatPollerStats(globalStats.outgoingStatsRow),
sync_engine: {
mappings: globalStats.mappings,
},
} : null,
tenancy: {
id: tenancyId,
project_id: auth.tenancy.project.id,
branch_id: auth.tenancy.branchId,
},
sequencer: {
project_users: globalStats.projectUsersStats,
contact_channels: globalStats.contactChannelStats,
project_users: currentStats.projectUsersStats,
contact_channels: currentStats.contactChannelStats,
deleted_rows: {
...globalStats.deletedRowStats,
by_table: globalStats.deletedRowsByTable,
...currentStats.deletedRowStats,
by_table: currentStats.deletedRowsByTable,
},
},
poller: formatPollerStats(globalStats.outgoingStatsRow),
poller: formatPollerStats(outgoingStats),
sync_engine: {
mappings: globalStats.mappings,
},
} : null,
tenancy: {
id: tenancyId,
project_id: auth.tenancy.project.id,
branch_id: auth.tenancy.branchId,
},
sequencer: {
project_users: currentStats.projectUsersStats,
contact_channels: currentStats.contactChannelStats,
deleted_rows: {
...currentStats.deletedRowStats,
by_table: currentStats.deletedRowsByTable,
mappings: currentStats.mappings,
external_databases: externalDbStatuses,
},
},
poller: formatPollerStats(outgoingStats),
sync_engine: {
mappings: currentStats.mappings,
external_databases: externalDbStatuses,
},
},
};
};
});
},
});

View File

@ -6,6 +6,7 @@ import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
import { yupNumber, yupObject, yupString, yupTuple } from "@stackframe/stack-shared/dist/schema-fields";
import { StatusError } from "@stackframe/stack-shared/dist/utils/errors";
import { traceSpan } from "@/utils/telemetry";
export const POST = createSmartRouteHandler({
metadata: {
@ -28,32 +29,50 @@ export const POST = createSmartRouteHandler({
bodyType: yupString().oneOf(["success"]).defined(),
}),
handler: async ({ body }, fullReq) => {
await ensureUpstashSignature(fullReq);
return await traceSpan({
description: "external-db-sync.sync-engine",
attributes: {
"stack.external-db-sync.tenancy-id": body.tenancyId,
},
}, async (span) => {
await ensureUpstashSignature(fullReq);
const fusebox = await getExternalDbSyncFusebox();
span.setAttribute("stack.external-db-sync.sync-engine-enabled", fusebox.syncEngineEnabled);
if (!fusebox.syncEngineEnabled) {
return {
statusCode: 200,
bodyType: "success",
};
}
const { tenancyId } = body;
const tenancy = await traceSpan("external-db-sync.sync-engine.loadTenancy", async (tenancySpan) => {
const foundTenancy = await getTenancy(tenancyId);
tenancySpan.setAttribute("stack.external-db-sync.tenancy-found", !!foundTenancy);
return foundTenancy;
});
if (!tenancy) {
console.warn(`[sync-engine] Tenancy ${tenancyId} in queue but not found, assuming it was deleted.`);
throw new StatusError(400, `Tenancy ${tenancyId} not found.`);
}
const needsResync = await traceSpan("external-db-sync.sync-engine.syncExternalDatabases", async (syncSpan) => {
const resync = await syncExternalDatabases(tenancy);
syncSpan.setAttribute("stack.external-db-sync.needs-resync", resync);
return resync;
});
if (needsResync) {
await traceSpan("external-db-sync.sync-engine.enqueueResync", async () => {
await enqueueExternalDbSync(tenancy.id);
});
}
const fusebox = await getExternalDbSyncFusebox();
if (!fusebox.syncEngineEnabled) {
return {
statusCode: 200,
bodyType: "success",
};
}
const { tenancyId } = body;
const tenancy = await getTenancy(tenancyId);
if (!tenancy) {
console.warn(`[sync-engine] Tenancy ${tenancyId} in queue but not found, assuming it was deleted.`);
throw new StatusError(400, `Tenancy ${tenancyId} not found.`);
}
const needsResync = await syncExternalDatabases(tenancy);
if (needsResync) {
await enqueueExternalDbSync(tenancy.id);
}
return {
statusCode: 200,
bodyType: "success",
};
});
},
});

View File

@ -25,9 +25,9 @@ export async function ensureUpstashSignature(fullReq: SmartRequest): Promise<voi
}
const url = new URL(fullReq.url);
// if ((nodeEnv.includes("development") || nodeEnv.includes("test")) && url.hostname === "localhost") {
// url.hostname = "host.docker.internal";
// }
if ((nodeEnv.includes("development") || nodeEnv.includes("test")) && url.hostname === "localhost") {
url.hostname = "host.docker.internal";
}
const isValid = await upstashReceiver.verify({
signature: upstashSignature,