From c91998ef73a28113e848968e59e156afafca62a9 Mon Sep 17 00:00:00 2001 From: Bilal Godil Date: Tue, 3 Feb 2026 18:13:19 -0800 Subject: [PATCH] add tracing --- .../migration.sql | 2 +- .../internal/external-db-sync/poller/route.ts | 304 +++++++++++------- .../external-db-sync/sequencer/route.ts | 256 +++++++++------ .../internal/external-db-sync/status/route.ts | 147 +++++---- .../external-db-sync/sync-engine/route.tsx | 63 ++-- apps/backend/src/lib/upstash.tsx | 6 +- 6 files changed, 461 insertions(+), 317 deletions(-) diff --git a/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql b/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql index 7b24254d5..e90bcf245 100644 --- a/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql +++ b/apps/backend/prisma/migrations/20251125030551_external_db_sync/migration.sql @@ -94,7 +94,7 @@ ALTER TABLE "ContactChannel" ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NUL ALTER TABLE "DeletedRow" ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT TRUE; -- SPLIT_STATEMENT_SENTINEL --- Creates partial indexes on (shouldUpdateSequenceId, tenancyId) to quickly find rows that need updates +-- Creates indexes on (shouldUpdateSequenceId, tenancyId) to quickly find rows that need updates -- and support ORDER BY tenancyId for less fragmented updates. CREATE INDEX "ProjectUser_shouldUpdateSequenceId_idx" ON "ProjectUser"("shouldUpdateSequenceId", "tenancyId"); diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts index 1c3353077..9e06c176a 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts @@ -14,6 +14,7 @@ import { getEnvVariable, getNodeEnvironment } from "@stackframe/stack-shared/dis import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors"; import { wait } from "@stackframe/stack-shared/dist/utils/promises"; import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata"; +import { traceSpan } from "@/utils/telemetry"; const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000; const DIRECT_SYNC_ENV = "STACK_EXTERNAL_DB_SYNC_DIRECT"; @@ -90,148 +91,205 @@ export const GET = createSmartRouteHandler({ throw new StatusError(401, "Unauthorized"); } - const startTime = performance.now(); - const maxDurationMs = parseMaxDurationMs(query.maxDurationMs); - const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle); - const pollIntervalMs = 50; - const staleClaimIntervalMinutes = 5; - const pollerClaimLimit = getPollerClaimLimit(); + return await traceSpan("external-db-sync.poller", async (span) => { + const startTime = performance.now(); + const maxDurationMs = parseMaxDurationMs(query.maxDurationMs); + const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle); + const pollIntervalMs = 50; + const staleClaimIntervalMinutes = 5; + const pollerClaimLimit = getPollerClaimLimit(); - let totalRequestsProcessed = 0; - async function claimPendingRequests(): Promise { - return await globalPrismaClient.$queryRaw` - UPDATE "OutgoingRequest" - SET "startedFulfillingAt" = NOW() - WHERE "id" IN ( - SELECT id - FROM "OutgoingRequest" - WHERE "startedFulfillingAt" IS NULL - ORDER BY "createdAt" - LIMIT ${pollerClaimLimit} - FOR UPDATE SKIP LOCKED - ) - RETURNING *; - `; - } + span.setAttribute("stack.external-db-sync.max-duration-ms", maxDurationMs); + span.setAttribute("stack.external-db-sync.stop-when-idle", stopWhenIdle); + span.setAttribute("stack.external-db-sync.poll-interval-ms", pollIntervalMs); + span.setAttribute("stack.external-db-sync.poller-claim-limit", pollerClaimLimit); + span.setAttribute("stack.external-db-sync.direct-sync", directSyncEnabled()); + span.setAttribute("stack.external-db-sync.stale-claim-minutes", staleClaimIntervalMinutes); - async function deleteOutgoingRequest(id: string): Promise { - await retryTransaction(globalPrismaClient, async (tx) => { - await tx.outgoingRequest.delete({ where: { id } }); - }); - } + let totalRequestsProcessed = 0; + let iterationCount = 0; - async function deleteOutgoingRequests(ids: string[]): Promise { - if (ids.length === 0) return; - await retryTransaction(globalPrismaClient, async (tx) => { - await tx.outgoingRequest.deleteMany({ where: { id: { in: ids } } }); - }); - } - async function processRequest(request: OutgoingRequest): Promise { - // Prisma JsonValue doesn't carry a precise shape for this JSON blob. - const options = request.qstashOptions as any; - const baseUrl = getEnvVariable("NEXT_PUBLIC_STACK_API_URL"); + async function claimPendingRequests(): Promise { + return await traceSpan("external-db-sync.poller.claimPendingRequests", async (claimSpan) => { + const requests = await globalPrismaClient.$queryRaw` + UPDATE "OutgoingRequest" + SET "startedFulfillingAt" = NOW() + WHERE "id" IN ( + SELECT id + FROM "OutgoingRequest" + WHERE "startedFulfillingAt" IS NULL + ORDER BY "createdAt" + LIMIT ${pollerClaimLimit} + FOR UPDATE SKIP LOCKED + ) + RETURNING *; + `; + claimSpan.setAttribute("stack.external-db-sync.claimed-count", requests.length); + return requests; + }); + } - let fullUrl = new URL(options.url, baseUrl).toString(); + async function deleteOutgoingRequest(id: string): Promise { + await retryTransaction(globalPrismaClient, async (tx) => { + await tx.outgoingRequest.delete({ where: { id } }); + }); + } - // In dev/test, QStash runs in Docker so "localhost" won't work. - // Replace with "host.docker.internal" to reach the host machine. - // if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) { - // const url = new URL(fullUrl); - // if (url.hostname === "localhost" || url.hostname === "127.0.0.1") { - // url.hostname = "host.docker.internal"; - // fullUrl = url.toString(); - // } - // } + async function deleteOutgoingRequests(ids: string[]): Promise { + if (ids.length === 0) return; + await retryTransaction(globalPrismaClient, async (tx) => { + await tx.outgoingRequest.deleteMany({ where: { id: { in: ids } } }); + }); + } + async function processRequest(request: OutgoingRequest): Promise { + // Prisma JsonValue doesn't carry a precise shape for this JSON blob. + const options = request.qstashOptions as any; + const baseUrl = getEnvVariable("NEXT_PUBLIC_STACK_API_URL"); - await upstash.publishJSON({ - url: fullUrl, - body: options.body, - flowControl: options.flowControl, - }); - await deleteOutgoingRequest(request.id); - } + let fullUrl = new URL(options.url, baseUrl).toString(); - type UpstashRequest = PublishBatchRequest; - - function buildUpstashRequest(request: OutgoingRequest): UpstashRequest { - // Prisma JsonValue doesn't carry a precise shape for this JSON blob. - const options = request.qstashOptions as any; - const baseUrl = getEnvVariable("NEXT_PUBLIC_STACK_API_URL"); - - let fullUrl = new URL(options.url, baseUrl).toString(); - - // In dev/test, QStash runs in Docker so "localhost" won't work. - // Replace with "host.docker.internal" to reach the host machine. - // if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) { - // const url = new URL(fullUrl); - // if (url.hostname === "localhost" || url.hostname === "127.0.0.1") { - // url.hostname = "host.docker.internal"; - // fullUrl = url.toString(); - // } - // } - - const flowControl = options.flowControl as UpstashRequest["flowControl"]; - - return { - url: fullUrl, - body: options.body, - ...(flowControl ? { flowControl } : {}), - }; - } - - async function processRequests(requests: OutgoingRequest[]): Promise { - let processed = 0; - - if (directSyncEnabled()) { - for (const request of requests) { - try { - await processRequest(request); - processed++; - } catch (error) { - captureError("poller-iteration-error", error); + // In dev/test, QStash runs in Docker so "localhost" won't work. + // Replace with "host.docker.internal" to reach the host machine. + if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) { + const url = new URL(fullUrl); + if (url.hostname === "localhost" || url.hostname === "127.0.0.1") { + url.hostname = "host.docker.internal"; + fullUrl = url.toString(); } } - return processed; + + await upstash.publishJSON({ + url: fullUrl, + body: options.body, + flowControl: options.flowControl, + }); + await deleteOutgoingRequest(request.id); } - if (requests.length === 0) return 0; + type UpstashRequest = PublishBatchRequest; - try { - const batchPayload = requests.map(buildUpstashRequest); - console.log("publishing to QStash batch", { count: batchPayload.length }); - await upstash.batchJSON(batchPayload); - await deleteOutgoingRequests(requests.map((request) => request.id)); - return requests.length; - } catch (error) { - captureError("poller-iteration-error", error); - return 0; - } - } + function buildUpstashRequest(request: OutgoingRequest): UpstashRequest { + // Prisma JsonValue doesn't carry a precise shape for this JSON blob. + const options = request.qstashOptions as any; + const baseUrl = getEnvVariable("NEXT_PUBLIC_STACK_API_URL"); - while (performance.now() - startTime < maxDurationMs) { - const fusebox = await getExternalDbSyncFusebox(); - if (!fusebox.pollerEnabled) { - break; + let fullUrl = new URL(options.url, baseUrl).toString(); + + // In dev/test, QStash runs in Docker so "localhost" won't work. + // Replace with "host.docker.internal" to reach the host machine. + if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) { + const url = new URL(fullUrl); + if (url.hostname === "localhost" || url.hostname === "127.0.0.1") { + url.hostname = "host.docker.internal"; + fullUrl = url.toString(); + } + } + + const flowControl = options.flowControl as UpstashRequest["flowControl"]; + + return { + url: fullUrl, + body: options.body, + ...(flowControl ? { flowControl } : {}), + }; } - const pendingRequests = await claimPendingRequests(); + async function processRequests(requests: OutgoingRequest[]): Promise { + return await traceSpan({ + description: "external-db-sync.poller.processRequests", + attributes: { + "stack.external-db-sync.pending-count": requests.length, + "stack.external-db-sync.direct-sync": directSyncEnabled(), + }, + }, async (processSpan) => { + let processed = 0; - if (stopWhenIdle && pendingRequests.length === 0) { - break; + if (directSyncEnabled()) { + for (const request of requests) { + try { + await processRequest(request); + processed++; + } catch (error) { + processSpan.setAttribute("stack.external-db-sync.iteration-error", true); + captureError("poller-iteration-error", error); + } + } + processSpan.setAttribute("stack.external-db-sync.processed-count", processed); + return processed; + } + + if (requests.length === 0) { + processSpan.setAttribute("stack.external-db-sync.processed-count", 0); + return 0; + } + + try { + const batchPayload = requests.map(buildUpstashRequest); + console.log("publishing to QStash batch", { count: batchPayload.length }); + await upstash.batchJSON(batchPayload); + await deleteOutgoingRequests(requests.map((request) => request.id)); + processSpan.setAttribute("stack.external-db-sync.processed-count", requests.length); + return requests.length; + } catch (error) { + processSpan.setAttribute("stack.external-db-sync.iteration-error", true); + captureError("poller-iteration-error", error); + processSpan.setAttribute("stack.external-db-sync.processed-count", 0); + return 0; + } + }); } - totalRequestsProcessed += await processRequests(pendingRequests); + type PollerIterationResult = { + stopReason: "disabled" | "idle" | null, + processed: number, + }; - await wait(pollIntervalMs); - } + while (performance.now() - startTime < maxDurationMs) { + const iterationResult = await traceSpan({ + description: "external-db-sync.poller.iteration", + attributes: { + "stack.external-db-sync.iteration": iterationCount + 1, + }, + }, async (iterationSpan) => { + const fusebox = await getExternalDbSyncFusebox(); + iterationSpan.setAttribute("stack.external-db-sync.poller-enabled", fusebox.pollerEnabled); + if (!fusebox.pollerEnabled) { + return { stopReason: "disabled", processed: 0 }; + } - return { - statusCode: 200, - bodyType: "json" as const, - body: { - ok: true, - requests_processed: totalRequestsProcessed, - }, - }; + const pendingRequests = await claimPendingRequests(); + iterationSpan.setAttribute("stack.external-db-sync.pending-count", pendingRequests.length); + + if (stopWhenIdle && pendingRequests.length === 0) { + return { stopReason: "idle", processed: 0 }; + } + + const processed = await processRequests(pendingRequests); + iterationSpan.setAttribute("stack.external-db-sync.processed-count", processed); + return { stopReason: null, processed }; + }); + + if (iterationResult.stopReason) { + break; + } + + iterationCount++; + totalRequestsProcessed += iterationResult.processed; + + await wait(pollIntervalMs); + } + + span.setAttribute("stack.external-db-sync.requests-processed", totalRequestsProcessed); + span.setAttribute("stack.external-db-sync.iterations", iterationCount); + + return { + statusCode: 200, + bodyType: "json" as const, + body: { + ok: true, + requests_processed: totalRequestsProcessed, + }, + }; + }); }, }); diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts index b2672d908..a59229e34 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts @@ -12,6 +12,7 @@ import { captureError, StackAssertionError, StatusError } from "@stackframe/stac import { wait } from "@stackframe/stack-shared/dist/utils/promises"; import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue"; import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata"; +import { traceSpan } from "@/utils/telemetry"; const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000; const SEQUENCER_BATCH_SIZE_ENV = "STACK_EXTERNAL_DB_SYNC_SEQUENCER_BATCH_SIZE"; @@ -49,87 +50,103 @@ function getSequencerBatchSize(): number { // Assigns sequence IDs to rows that need them and queues sync requests for affected tenants. // Processes up to batchSize rows at a time from each table. async function backfillSequenceIds(batchSize: number): Promise { - let didUpdate = false; - const projectUserTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` - WITH rows_to_update AS ( - SELECT "tenancyId", "projectUserId" - FROM "ProjectUser" - WHERE "shouldUpdateSequenceId" = TRUE - ORDER BY "tenancyId" - LIMIT ${batchSize} - FOR UPDATE SKIP LOCKED - ), - updated_rows AS ( - UPDATE "ProjectUser" pu - SET "sequenceId" = nextval('global_seq_id'), - "shouldUpdateSequenceId" = FALSE - FROM rows_to_update r - WHERE pu."tenancyId" = r."tenancyId" - AND pu."projectUserId" = r."projectUserId" - RETURNING pu."tenancyId" - ) - SELECT DISTINCT "tenancyId" FROM updated_rows - `; + return await traceSpan({ + description: "external-db-sync.sequencer.backfill", + attributes: { + "stack.external-db-sync.batch-size": batchSize, + }, + }, async (span) => { + let didUpdate = false; - // Enqueue sync for all affected tenants in a single batch query - if (projectUserTenants.length > 0) { - await enqueueExternalDbSyncBatch(projectUserTenants.map(t => t.tenancyId)); - didUpdate = true; - } + const projectUserTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` + WITH rows_to_update AS ( + SELECT "tenancyId", "projectUserId" + FROM "ProjectUser" + WHERE "shouldUpdateSequenceId" = TRUE + ORDER BY "tenancyId" + LIMIT ${batchSize} + FOR UPDATE SKIP LOCKED + ), + updated_rows AS ( + UPDATE "ProjectUser" pu + SET "sequenceId" = nextval('global_seq_id'), + "shouldUpdateSequenceId" = FALSE + FROM rows_to_update r + WHERE pu."tenancyId" = r."tenancyId" + AND pu."projectUserId" = r."projectUserId" + RETURNING pu."tenancyId" + ) + SELECT DISTINCT "tenancyId" FROM updated_rows + `; - const contactChannelTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` - WITH rows_to_update AS ( - SELECT "tenancyId", "projectUserId", "id" - FROM "ContactChannel" - WHERE "shouldUpdateSequenceId" = TRUE - ORDER BY "tenancyId" - LIMIT ${batchSize} - FOR UPDATE SKIP LOCKED - ), - updated_rows AS ( - UPDATE "ContactChannel" cc - SET "sequenceId" = nextval('global_seq_id'), - "shouldUpdateSequenceId" = FALSE - FROM rows_to_update r - WHERE cc."tenancyId" = r."tenancyId" - AND cc."projectUserId" = r."projectUserId" - AND cc."id" = r."id" - RETURNING cc."tenancyId" - ) - SELECT DISTINCT "tenancyId" FROM updated_rows - `; + span.setAttribute("stack.external-db-sync.project-user-tenants", projectUserTenants.length); - if (contactChannelTenants.length > 0) { - await enqueueExternalDbSyncBatch(contactChannelTenants.map(t => t.tenancyId)); - didUpdate = true; - } + // Enqueue sync for all affected tenants in a single batch query + if (projectUserTenants.length > 0) { + await enqueueExternalDbSyncBatch(projectUserTenants.map(t => t.tenancyId)); + didUpdate = true; + } - const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` - WITH rows_to_update AS ( - SELECT "id", "tenancyId" - FROM "DeletedRow" - WHERE "shouldUpdateSequenceId" = TRUE - ORDER BY "tenancyId" - LIMIT ${batchSize} - FOR UPDATE SKIP LOCKED - ), - updated_rows AS ( - UPDATE "DeletedRow" dr - SET "sequenceId" = nextval('global_seq_id'), - "shouldUpdateSequenceId" = FALSE - FROM rows_to_update r - WHERE dr."id" = r."id" - RETURNING dr."tenancyId" - ) - SELECT DISTINCT "tenancyId" FROM updated_rows - `; + const contactChannelTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` + WITH rows_to_update AS ( + SELECT "tenancyId", "projectUserId", "id" + FROM "ContactChannel" + WHERE "shouldUpdateSequenceId" = TRUE + ORDER BY "tenancyId" + LIMIT ${batchSize} + FOR UPDATE SKIP LOCKED + ), + updated_rows AS ( + UPDATE "ContactChannel" cc + SET "sequenceId" = nextval('global_seq_id'), + "shouldUpdateSequenceId" = FALSE + FROM rows_to_update r + WHERE cc."tenancyId" = r."tenancyId" + AND cc."projectUserId" = r."projectUserId" + AND cc."id" = r."id" + RETURNING cc."tenancyId" + ) + SELECT DISTINCT "tenancyId" FROM updated_rows + `; - if (deletedRowTenants.length > 0) { - await enqueueExternalDbSyncBatch(deletedRowTenants.map(t => t.tenancyId)); - didUpdate = true; - } + span.setAttribute("stack.external-db-sync.contact-channel-tenants", contactChannelTenants.length); - return didUpdate; + if (contactChannelTenants.length > 0) { + await enqueueExternalDbSyncBatch(contactChannelTenants.map(t => t.tenancyId)); + didUpdate = true; + } + + const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` + WITH rows_to_update AS ( + SELECT "id", "tenancyId" + FROM "DeletedRow" + WHERE "shouldUpdateSequenceId" = TRUE + ORDER BY "tenancyId" + LIMIT ${batchSize} + FOR UPDATE SKIP LOCKED + ), + updated_rows AS ( + UPDATE "DeletedRow" dr + SET "sequenceId" = nextval('global_seq_id'), + "shouldUpdateSequenceId" = FALSE + FROM rows_to_update r + WHERE dr."id" = r."id" + RETURNING dr."tenancyId" + ) + SELECT DISTINCT "tenancyId" FROM updated_rows + `; + + span.setAttribute("stack.external-db-sync.deleted-row-tenants", deletedRowTenants.length); + + if (deletedRowTenants.length > 0) { + await enqueueExternalDbSyncBatch(deletedRowTenants.map(t => t.tenancyId)); + didUpdate = true; + } + + span.setAttribute("stack.external-db-sync.did-update", didUpdate); + + return didUpdate; + }); } // TODO: If we ever need to support non-hosted source-of-truth tenancies again, @@ -169,43 +186,72 @@ export const GET = createSmartRouteHandler({ throw new StatusError(401, "Unauthorized"); } - const startTime = performance.now(); - const maxDurationMs = parseMaxDurationMs(query.maxDurationMs); - const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle); - const pollIntervalMs = 50; - const batchSize = getSequencerBatchSize(); + return await traceSpan("external-db-sync.sequencer", async (span) => { + const startTime = performance.now(); + const maxDurationMs = parseMaxDurationMs(query.maxDurationMs); + const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle); + const pollIntervalMs = 50; + const batchSize = getSequencerBatchSize(); - let iterations = 0; + span.setAttribute("stack.external-db-sync.max-duration-ms", maxDurationMs); + span.setAttribute("stack.external-db-sync.stop-when-idle", stopWhenIdle); + span.setAttribute("stack.external-db-sync.poll-interval-ms", pollIntervalMs); + span.setAttribute("stack.external-db-sync.batch-size", batchSize); - while (performance.now() - startTime < maxDurationMs) { - const fusebox = await getExternalDbSyncFusebox(); - if (!fusebox.sequencerEnabled) { - break; - } + let iterations = 0; - try { - const didUpdate = await backfillSequenceIds(batchSize); - if (stopWhenIdle && !didUpdate) { + type SequencerIterationResult = { + stopReason: "disabled" | "idle" | null, + }; + + while (performance.now() - startTime < maxDurationMs) { + const iterationResult = await traceSpan({ + description: "external-db-sync.sequencer.iteration", + attributes: { + "stack.external-db-sync.iteration": iterations + 1, + }, + }, async (iterationSpan) => { + const fusebox = await getExternalDbSyncFusebox(); + iterationSpan.setAttribute("stack.external-db-sync.sequencer-enabled", fusebox.sequencerEnabled); + if (!fusebox.sequencerEnabled) { + return { stopReason: "disabled" }; + } + + try { + const didUpdate = await backfillSequenceIds(batchSize); + iterationSpan.setAttribute("stack.external-db-sync.did-update", didUpdate); + if (stopWhenIdle && !didUpdate) { + return { stopReason: "idle" }; + } + } catch (error) { + iterationSpan.setAttribute("stack.external-db-sync.iteration-error", true); + captureError( + `sequencer-iteration-error`, + error, + ); + } + + return { stopReason: null }; + }); + + if (iterationResult.stopReason) { break; } - } catch (error) { - captureError( - `sequencer-iteration-error`, - error, - ); + + iterations++; + await wait(pollIntervalMs); } - iterations++; - await wait(pollIntervalMs); - } + span.setAttribute("stack.external-db-sync.iterations", iterations); - return { - statusCode: 200, - bodyType: "json" as const, - body: { - ok: true, - iterations, - }, - }; + return { + statusCode: 200, + bodyType: "json" as const, + body: { + ok: true, + iterations, + }, + }; + }); }, }); diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts index 1b7086706..cf1c89156 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/status/route.ts @@ -15,6 +15,7 @@ import { errorToNiceString, StackAssertionError, throwErr } from "@stackframe/st import { Result } from "@stackframe/stack-shared/dist/utils/results"; import { Client } from "pg"; import { KnownErrors } from "@stackframe/stack-shared"; +import { traceSpan } from "@/utils/telemetry"; const STALE_CLAIM_INTERVAL_MINUTES = 5; @@ -718,79 +719,99 @@ export const GET = createSmartRouteHandler({ }), response: responseSchema, handler: async ({ auth, query }) => { - if (auth.tenancy.project.id !== "internal") { - throw new KnownErrors.ExpectedInternalProject(); - } - const tenancyId = auth.tenancy.id; + return await traceSpan({ + description: "external-db-sync.status", + attributes: { + "stack.external-db-sync.scope": query.scope, + "stack.external-db-sync.tenancy-id": auth.tenancy.id, + }, + }, async (span) => { + if (auth.tenancy.project.id !== "internal") { + throw new KnownErrors.ExpectedInternalProject(); + } + const tenancyId = auth.tenancy.id; - const shouldIncludeGlobal = query.scope === "all"; - const currentStats = shouldIncludeGlobal ? await fetchInternalStats(null) : await fetchInternalStats(tenancyId); - const globalStats = shouldIncludeGlobal ? currentStats : null; - const globalTenanciesCount = shouldIncludeGlobal - ? (await globalPrismaClient.$queryRaw` - SELECT COUNT(*)::bigint AS "total" - FROM "Tenancy" - `).at(0) ?? throwErr("Tenancy count query returned no rows.") - : null; - const globalDbSyncCount = shouldIncludeGlobal - ? (await globalPrismaClient.$queryRaw` - SELECT COUNT(*)::bigint AS "total" - FROM "EnvironmentConfigOverride" - WHERE ("config"->'dbSync'->'externalDatabases') IS NOT NULL - `).at(0) ?? throwErr("DB sync config count query returned no rows.") - : null; + const shouldIncludeGlobal = query.scope === "all"; + span.setAttribute("stack.external-db-sync.include-global", shouldIncludeGlobal); - const externalDbStatuses = shouldIncludeGlobal - ? [] - : await Promise.all( - Object.entries( - auth.tenancy.config.dbSync.externalDatabases as CompleteConfig["dbSync"]["externalDatabases"], - ).map(([dbId, dbConfig]) => fetchExternalDatabaseStatus(dbId, dbConfig, currentStats.mappingStatuses)), - ); + const currentStats = await traceSpan({ + description: "external-db-sync.status.fetchInternalStats", + attributes: { + "stack.external-db-sync.scope": shouldIncludeGlobal ? "all" : "tenancy", + }, + }, async () => shouldIncludeGlobal ? await fetchInternalStats(null) : await fetchInternalStats(tenancyId)); - const outgoingStats = currentStats.outgoingStatsRow; + const globalStats = shouldIncludeGlobal ? currentStats : null; + const globalTenanciesCount = shouldIncludeGlobal + ? (await globalPrismaClient.$queryRaw` + SELECT COUNT(*)::bigint AS "total" + FROM "Tenancy" + `).at(0) ?? throwErr("Tenancy count query returned no rows.") + : null; + const globalDbSyncCount = shouldIncludeGlobal + ? (await globalPrismaClient.$queryRaw` + SELECT COUNT(*)::bigint AS "total" + FROM "EnvironmentConfigOverride" + WHERE ("config"->'dbSync'->'externalDatabases') IS NOT NULL + `).at(0) ?? throwErr("DB sync config count query returned no rows.") + : null; - return { - statusCode: 200 as const, - bodyType: "json" as const, - body: { - ok: true, - generated_at_millis: Date.now(), - global: shouldIncludeGlobal && globalStats && globalTenanciesCount && globalDbSyncCount ? { - tenancies_total: toBigIntStringOrThrow(globalTenanciesCount.total, "tenancies total"), - tenancies_with_db_sync: toBigIntStringOrThrow(globalDbSyncCount.total, "tenancies with db sync"), + const externalDbStatuses = shouldIncludeGlobal + ? [] + : await traceSpan("external-db-sync.status.fetchExternalDatabaseStatuses", async (externalSpan) => { + const statuses = await Promise.all( + Object.entries( + auth.tenancy.config.dbSync.externalDatabases as CompleteConfig["dbSync"]["externalDatabases"], + ).map(([dbId, dbConfig]) => fetchExternalDatabaseStatus(dbId, dbConfig, currentStats.mappingStatuses)), + ); + externalSpan.setAttribute("stack.external-db-sync.external-db-count", statuses.length); + return statuses; + }); + + const outgoingStats = currentStats.outgoingStatsRow; + + return { + statusCode: 200 as const, + bodyType: "json" as const, + body: { + ok: true, + generated_at_millis: Date.now(), + global: shouldIncludeGlobal && globalStats && globalTenanciesCount && globalDbSyncCount ? { + tenancies_total: toBigIntStringOrThrow(globalTenanciesCount.total, "tenancies total"), + tenancies_with_db_sync: toBigIntStringOrThrow(globalDbSyncCount.total, "tenancies with db sync"), + sequencer: { + project_users: globalStats.projectUsersStats, + contact_channels: globalStats.contactChannelStats, + deleted_rows: { + ...globalStats.deletedRowStats, + by_table: globalStats.deletedRowsByTable, + }, + }, + poller: formatPollerStats(globalStats.outgoingStatsRow), + sync_engine: { + mappings: globalStats.mappings, + }, + } : null, + tenancy: { + id: tenancyId, + project_id: auth.tenancy.project.id, + branch_id: auth.tenancy.branchId, + }, sequencer: { - project_users: globalStats.projectUsersStats, - contact_channels: globalStats.contactChannelStats, + project_users: currentStats.projectUsersStats, + contact_channels: currentStats.contactChannelStats, deleted_rows: { - ...globalStats.deletedRowStats, - by_table: globalStats.deletedRowsByTable, + ...currentStats.deletedRowStats, + by_table: currentStats.deletedRowsByTable, }, }, - poller: formatPollerStats(globalStats.outgoingStatsRow), + poller: formatPollerStats(outgoingStats), sync_engine: { - mappings: globalStats.mappings, - }, - } : null, - tenancy: { - id: tenancyId, - project_id: auth.tenancy.project.id, - branch_id: auth.tenancy.branchId, - }, - sequencer: { - project_users: currentStats.projectUsersStats, - contact_channels: currentStats.contactChannelStats, - deleted_rows: { - ...currentStats.deletedRowStats, - by_table: currentStats.deletedRowsByTable, + mappings: currentStats.mappings, + external_databases: externalDbStatuses, }, }, - poller: formatPollerStats(outgoingStats), - sync_engine: { - mappings: currentStats.mappings, - external_databases: externalDbStatuses, - }, - }, - }; + }; + }); }, }); diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx b/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx index b25c6ed33..f61751d15 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sync-engine/route.tsx @@ -6,6 +6,7 @@ import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata"; import { yupNumber, yupObject, yupString, yupTuple } from "@stackframe/stack-shared/dist/schema-fields"; import { StatusError } from "@stackframe/stack-shared/dist/utils/errors"; +import { traceSpan } from "@/utils/telemetry"; export const POST = createSmartRouteHandler({ metadata: { @@ -28,32 +29,50 @@ export const POST = createSmartRouteHandler({ bodyType: yupString().oneOf(["success"]).defined(), }), handler: async ({ body }, fullReq) => { - await ensureUpstashSignature(fullReq); + return await traceSpan({ + description: "external-db-sync.sync-engine", + attributes: { + "stack.external-db-sync.tenancy-id": body.tenancyId, + }, + }, async (span) => { + await ensureUpstashSignature(fullReq); + + const fusebox = await getExternalDbSyncFusebox(); + span.setAttribute("stack.external-db-sync.sync-engine-enabled", fusebox.syncEngineEnabled); + if (!fusebox.syncEngineEnabled) { + return { + statusCode: 200, + bodyType: "success", + }; + } + + const { tenancyId } = body; + + const tenancy = await traceSpan("external-db-sync.sync-engine.loadTenancy", async (tenancySpan) => { + const foundTenancy = await getTenancy(tenancyId); + tenancySpan.setAttribute("stack.external-db-sync.tenancy-found", !!foundTenancy); + return foundTenancy; + }); + if (!tenancy) { + console.warn(`[sync-engine] Tenancy ${tenancyId} in queue but not found, assuming it was deleted.`); + throw new StatusError(400, `Tenancy ${tenancyId} not found.`); + } + + const needsResync = await traceSpan("external-db-sync.sync-engine.syncExternalDatabases", async (syncSpan) => { + const resync = await syncExternalDatabases(tenancy); + syncSpan.setAttribute("stack.external-db-sync.needs-resync", resync); + return resync; + }); + if (needsResync) { + await traceSpan("external-db-sync.sync-engine.enqueueResync", async () => { + await enqueueExternalDbSync(tenancy.id); + }); + } - const fusebox = await getExternalDbSyncFusebox(); - if (!fusebox.syncEngineEnabled) { return { statusCode: 200, bodyType: "success", }; - } - - const { tenancyId } = body; - - const tenancy = await getTenancy(tenancyId); - if (!tenancy) { - console.warn(`[sync-engine] Tenancy ${tenancyId} in queue but not found, assuming it was deleted.`); - throw new StatusError(400, `Tenancy ${tenancyId} not found.`); - } - - const needsResync = await syncExternalDatabases(tenancy); - if (needsResync) { - await enqueueExternalDbSync(tenancy.id); - } - - return { - statusCode: 200, - bodyType: "success", - }; + }); }, }); diff --git a/apps/backend/src/lib/upstash.tsx b/apps/backend/src/lib/upstash.tsx index 2688fcc23..6b4f48fec 100644 --- a/apps/backend/src/lib/upstash.tsx +++ b/apps/backend/src/lib/upstash.tsx @@ -25,9 +25,9 @@ export async function ensureUpstashSignature(fullReq: SmartRequest): Promise