diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts index 9e06c176a..6e82f8bb4 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts @@ -1,8 +1,9 @@ +import type { OutgoingRequest } from "@/generated/prisma/client"; +import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata"; import { upstash } from "@/lib/upstash"; -import type { PublishBatchRequest } from "@upstash/qstash"; import { globalPrismaClient, retryTransaction } from "@/prisma-client"; import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; -import type { OutgoingRequest } from "@/generated/prisma/client"; +import { traceSpan } from "@/utils/telemetry"; import { yupBoolean, yupNumber, @@ -13,13 +14,12 @@ import { import { getEnvVariable, getNodeEnvironment } from "@stackframe/stack-shared/dist/utils/env"; import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors"; import { wait } from "@stackframe/stack-shared/dist/utils/promises"; -import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata"; -import { traceSpan } from "@/utils/telemetry"; +import type { PublishBatchRequest } from "@upstash/qstash"; const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000; const DIRECT_SYNC_ENV = "STACK_EXTERNAL_DB_SYNC_DIRECT"; const POLLER_CLAIM_LIMIT_ENV = "STACK_EXTERNAL_DB_SYNC_POLL_CLAIM_LIMIT"; -const DEFAULT_POLL_CLAIM_LIMIT = 100; +const DEFAULT_POLL_CLAIM_LIMIT = 1000; function parseMaxDurationMs(value: string | undefined): number { if (!value) return DEFAULT_MAX_DURATION_MS; @@ -118,7 +118,6 @@ export const GET = createSmartRouteHandler({ SELECT id FROM "OutgoingRequest" WHERE "startedFulfillingAt" IS NULL - ORDER BY "createdAt" LIMIT ${pollerClaimLimit} FOR UPDATE SKIP LOCKED ) @@ -225,7 +224,6 @@ export const GET = createSmartRouteHandler({ try { const batchPayload = requests.map(buildUpstashRequest); - console.log("publishing to QStash batch", { count: batchPayload.length }); await upstash.batchJSON(batchPayload); await deleteOutgoingRequests(requests.map((request) => request.id)); processSpan.setAttribute("stack.external-db-sync.processed-count", requests.length); @@ -269,10 +267,6 @@ export const GET = createSmartRouteHandler({ return { stopReason: null, processed }; }); - if (iterationResult.stopReason) { - break; - } - iterationCount++; totalRequestsProcessed += iterationResult.processed; diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts index a59229e34..2a40b3e0e 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts @@ -1,5 +1,8 @@ +import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata"; +import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue"; import { globalPrismaClient } from "@/prisma-client"; import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; +import { traceSpan } from "@/utils/telemetry"; import { yupBoolean, yupNumber, @@ -10,9 +13,6 @@ import { import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env"; import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors"; import { wait } from "@stackframe/stack-shared/dist/utils/promises"; -import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue"; -import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata"; -import { traceSpan } from "@/utils/telemetry"; const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000; const SEQUENCER_BATCH_SIZE_ENV = "STACK_EXTERNAL_DB_SYNC_SEQUENCER_BATCH_SIZE"; @@ -234,10 +234,6 @@ export const GET = createSmartRouteHandler({ return { stopReason: null }; }); - if (iterationResult.stopReason) { - break; - } - iterations++; await wait(pollIntervalMs); } diff --git a/apps/backend/src/proxy.tsx b/apps/backend/src/proxy.tsx index a413d2952..1171e7342 100644 --- a/apps/backend/src/proxy.tsx +++ b/apps/backend/src/proxy.tsx @@ -77,7 +77,7 @@ export async function proxy(request: NextRequest) { } : undefined; // ensure our clients can handle 429 responses - if (isApiRequest && !request.headers.get('x-stack-disable-artificial-development-delay') && getNodeEnvironment() === 'development' && request.method !== 'OPTIONS' && !request.url.includes(".well-known")) { + if (isApiRequest && !request.headers.get('x-stack-disable-artificial-development-delay') && getNodeEnvironment() === 'development' && request.method !== 'OPTIONS' && !request.url.includes(".well-known") && !request.url.includes("/api/latest/internal/external-db-sync/")) { const now = Date.now(); while (devRateLimitTimestamps.length > 0 && now - devRateLimitTimestamps[0] > DEV_RATE_LIMIT_WINDOW_MS) { devRateLimitTimestamps.shift(); diff --git a/apps/backend/src/route-handlers/smart-route-handler.tsx b/apps/backend/src/route-handlers/smart-route-handler.tsx index 8770ddd5f..fd81039fa 100644 --- a/apps/backend/src/route-handlers/smart-route-handler.tsx +++ b/apps/backend/src/route-handlers/smart-route-handler.tsx @@ -103,7 +103,15 @@ export function handleApiRequest(handler: (req: NextRequest, options: any, reque } // request duration warning - const allowedLongRequestPaths = ["/api/latest/internal/email-queue-step", "/api/latest/internal/analytics/query", "/health/email", "/api/latest/internal/metrics"]; + const allowedLongRequestPaths = [ + "/api/latest/internal/email-queue-step", + "/api/latest/internal/analytics/query", + "/health/email", + "/api/latest/internal/metrics", + "/api/latest/internal/external-db-sync/poller", + "/api/latest/internal/external-db-sync/sequencer", + "/api/latest/internal/external-db-sync/sync-engine", + ]; if (!allowedLongRequestPaths.includes(req.nextUrl.pathname)) { const warnAfterSeconds = 12; runAsynchronously(async () => { diff --git a/docker/dev-postgres-replica/entrypoint.sh b/docker/dev-postgres-replica/entrypoint.sh index 748c2ccd0..d96a4dc64 100644 --- a/docker/dev-postgres-replica/entrypoint.sh +++ b/docker/dev-postgres-replica/entrypoint.sh @@ -42,6 +42,10 @@ if [ -z "$(ls -A ${PGDATA} 2>/dev/null)" ]; then primary_conninfo = 'host=${PRIMARY_HOST} port=${PRIMARY_PORT} user=${REPLICATOR_USER} password=${REPLICATOR_PASSWORD}' recovery_min_apply_delay = ${RECOVERY_MIN_APPLY_DELAY} hot_standby = on + +# pg_stat_statements for query stats +shared_preload_libraries = 'pg_stat_statements' +pg_stat_statements.track = all EOF # Create standby.signal to indicate this is a standby