Various fixes

This commit is contained in:
Konstantin Wohlwend 2026-02-04 13:30:49 -08:00
parent 04970c25cb
commit 4abd410087
5 changed files with 22 additions and 20 deletions

View File

@ -1,8 +1,9 @@
import type { OutgoingRequest } from "@/generated/prisma/client";
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
import { upstash } from "@/lib/upstash";
import type { PublishBatchRequest } from "@upstash/qstash";
import { globalPrismaClient, retryTransaction } from "@/prisma-client";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import type { OutgoingRequest } from "@/generated/prisma/client";
import { traceSpan } from "@/utils/telemetry";
import {
yupBoolean,
yupNumber,
@ -13,13 +14,12 @@ import {
import { getEnvVariable, getNodeEnvironment } from "@stackframe/stack-shared/dist/utils/env";
import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors";
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
import { traceSpan } from "@/utils/telemetry";
import type { PublishBatchRequest } from "@upstash/qstash";
const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000;
const DIRECT_SYNC_ENV = "STACK_EXTERNAL_DB_SYNC_DIRECT";
const POLLER_CLAIM_LIMIT_ENV = "STACK_EXTERNAL_DB_SYNC_POLL_CLAIM_LIMIT";
const DEFAULT_POLL_CLAIM_LIMIT = 100;
const DEFAULT_POLL_CLAIM_LIMIT = 1000;
function parseMaxDurationMs(value: string | undefined): number {
if (!value) return DEFAULT_MAX_DURATION_MS;
@ -118,7 +118,6 @@ export const GET = createSmartRouteHandler({
SELECT id
FROM "OutgoingRequest"
WHERE "startedFulfillingAt" IS NULL
ORDER BY "createdAt"
LIMIT ${pollerClaimLimit}
FOR UPDATE SKIP LOCKED
)
@ -225,7 +224,6 @@ export const GET = createSmartRouteHandler({
try {
const batchPayload = requests.map(buildUpstashRequest);
console.log("publishing to QStash batch", { count: batchPayload.length });
await upstash.batchJSON(batchPayload);
await deleteOutgoingRequests(requests.map((request) => request.id));
processSpan.setAttribute("stack.external-db-sync.processed-count", requests.length);
@ -269,10 +267,6 @@ export const GET = createSmartRouteHandler({
return { stopReason: null, processed };
});
if (iterationResult.stopReason) {
break;
}
iterationCount++;
totalRequestsProcessed += iterationResult.processed;

View File

@ -1,5 +1,8 @@
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue";
import { globalPrismaClient } from "@/prisma-client";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { traceSpan } from "@/utils/telemetry";
import {
yupBoolean,
yupNumber,
@ -10,9 +13,6 @@ import {
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors";
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue";
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
import { traceSpan } from "@/utils/telemetry";
const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000;
const SEQUENCER_BATCH_SIZE_ENV = "STACK_EXTERNAL_DB_SYNC_SEQUENCER_BATCH_SIZE";
@ -234,10 +234,6 @@ export const GET = createSmartRouteHandler({
return { stopReason: null };
});
if (iterationResult.stopReason) {
break;
}
iterations++;
await wait(pollIntervalMs);
}

View File

@ -77,7 +77,7 @@ export async function proxy(request: NextRequest) {
} : undefined;
// ensure our clients can handle 429 responses
if (isApiRequest && !request.headers.get('x-stack-disable-artificial-development-delay') && getNodeEnvironment() === 'development' && request.method !== 'OPTIONS' && !request.url.includes(".well-known")) {
if (isApiRequest && !request.headers.get('x-stack-disable-artificial-development-delay') && getNodeEnvironment() === 'development' && request.method !== 'OPTIONS' && !request.url.includes(".well-known") && !request.url.includes("/api/latest/internal/external-db-sync/")) {
const now = Date.now();
while (devRateLimitTimestamps.length > 0 && now - devRateLimitTimestamps[0] > DEV_RATE_LIMIT_WINDOW_MS) {
devRateLimitTimestamps.shift();

View File

@ -103,7 +103,15 @@ export function handleApiRequest(handler: (req: NextRequest, options: any, reque
}
// request duration warning
const allowedLongRequestPaths = ["/api/latest/internal/email-queue-step", "/api/latest/internal/analytics/query", "/health/email", "/api/latest/internal/metrics"];
const allowedLongRequestPaths = [
"/api/latest/internal/email-queue-step",
"/api/latest/internal/analytics/query",
"/health/email",
"/api/latest/internal/metrics",
"/api/latest/internal/external-db-sync/poller",
"/api/latest/internal/external-db-sync/sequencer",
"/api/latest/internal/external-db-sync/sync-engine",
];
if (!allowedLongRequestPaths.includes(req.nextUrl.pathname)) {
const warnAfterSeconds = 12;
runAsynchronously(async () => {

View File

@ -42,6 +42,10 @@ if [ -z "$(ls -A ${PGDATA} 2>/dev/null)" ]; then
primary_conninfo = 'host=${PRIMARY_HOST} port=${PRIMARY_PORT} user=${REPLICATOR_USER} password=${REPLICATOR_PASSWORD}'
recovery_min_apply_delay = ${RECOVERY_MIN_APPLY_DELAY}
hot_standby = on
# pg_stat_statements for query stats
shared_preload_libraries = 'pg_stat_statements'
pg_stat_statements.track = all
EOF
# Create standby.signal to indicate this is a standby