diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts index d37ef118d..1c704ea21 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts @@ -20,7 +20,15 @@ const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000; const DIRECT_SYNC_ENV = "STACK_EXTERNAL_DB_SYNC_DIRECT"; const POLLER_CLAIM_LIMIT_ENV = "STACK_EXTERNAL_DB_SYNC_POLL_CLAIM_LIMIT"; const DEFAULT_POLL_CLAIM_LIMIT = 1000; -const STALE_REQUEST_THRESHOLD_MS = 60 * 1000; +const STALE_CLAIM_INTERVAL_MINUTES = 5; + +type ClaimedOutgoingRequest = OutgoingRequest & { + wasStale: boolean, +}; + +function getClaimedStaleRequestIds(claimedRequests: ClaimedOutgoingRequest[]): string[] { + return claimedRequests.filter((request) => request.wasStale).map((request) => request.id); +} function parseMaxDurationMs(value: string | undefined): number { if (!value) return DEFAULT_MAX_DURATION_MS; @@ -85,32 +93,63 @@ export const GET = createSmartRouteHandler({ const startTime = performance.now(); const maxDurationMs = parseMaxDurationMs(query.maxDurationMs); const pollIntervalMs = 50; - const staleClaimIntervalMinutes = 5; const pollerClaimLimit = getPollerClaimLimit(); span.setAttribute("stack.external-db-sync.max-duration-ms", maxDurationMs); span.setAttribute("stack.external-db-sync.poll-interval-ms", pollIntervalMs); span.setAttribute("stack.external-db-sync.poller-claim-limit", pollerClaimLimit); - span.setAttribute("stack.external-db-sync.stale-claim-minutes", staleClaimIntervalMinutes); + span.setAttribute("stack.external-db-sync.stale-claim-minutes", STALE_CLAIM_INTERVAL_MINUTES); let totalRequestsProcessed = 0; let iterationCount = 0; - async function claimPendingRequests(): Promise { + async function claimPendingRequests(): Promise { return await traceSpan("external-db-sync.poller.claimPendingRequests", async (claimSpan) => { - const requests = await globalPrismaClient.$queryRaw` - UPDATE "OutgoingRequest" - SET "startedFulfillingAt" = NOW() - WHERE "id" IN ( - SELECT id - FROM "OutgoingRequest" - WHERE "startedFulfillingAt" IS NULL - LIMIT ${pollerClaimLimit} - FOR UPDATE SKIP LOCKED - ) - RETURNING *; - `; + const requests = await retryTransaction(globalPrismaClient, async (tx) => { + const claimTime = new Date(); + const reclaimBefore = new Date(claimTime.getTime() - STALE_CLAIM_INTERVAL_MINUTES * 60 * 1000); + + const candidateRequests = await tx.outgoingRequest.findMany({ + where: { + OR: [ + { startedFulfillingAt: null }, + { startedFulfillingAt: { lt: reclaimBefore } }, + ], + }, + orderBy: [ + { createdAt: "asc" }, + { id: "asc" }, + ], + take: pollerClaimLimit, + }); + + const claimedRequests: ClaimedOutgoingRequest[] = []; + for (const candidateRequest of candidateRequests) { + const claimResult = await tx.outgoingRequest.updateMany({ + where: { + id: candidateRequest.id, + startedFulfillingAt: candidateRequest.startedFulfillingAt, + }, + data: { + startedFulfillingAt: claimTime, + }, + }); + + if (claimResult.count !== 1) { + continue; + } + + claimedRequests.push({ + ...candidateRequest, + startedFulfillingAt: claimTime, + wasStale: candidateRequest.startedFulfillingAt != null, + }); + } + + return claimedRequests; + }, { level: "serializable" }); claimSpan.setAttribute("stack.external-db-sync.claimed-count", requests.length); + claimSpan.setAttribute("stack.external-db-sync.claimed-stale-count", getClaimedStaleRequestIds(requests).length); return requests; }); } @@ -127,6 +166,7 @@ export const GET = createSmartRouteHandler({ await tx.outgoingRequest.deleteMany({ where: { id: { in: ids } } }); }); } + async function processRequest(request: OutgoingRequest): Promise { // Prisma JsonValue doesn't carry a precise shape for this JSON blob. const options = request.qstashOptions as any; @@ -243,25 +283,16 @@ export const GET = createSmartRouteHandler({ return { stopReason: "disabled", processed: 0 }; } - const staleRequests = await globalPrismaClient.$queryRaw<{ id: string, startedFulfillingAt: Date }[]>` - SELECT "id", "startedFulfillingAt" - FROM "OutgoingRequest" - WHERE "startedFulfillingAt" IS NOT NULL - AND "startedFulfillingAt" < NOW() - ${STALE_REQUEST_THRESHOLD_MS} * INTERVAL '1 millisecond' - LIMIT 10 - `; - iterationSpan.setAttribute("stack.external-db-sync.stale-count", staleRequests.length); - if (staleRequests.length > 0) { - captureError( - "poller-stale-outgoing-requests", - new StackAssertionError( - `Found ${staleRequests.length} outgoing request(s) with startedFulfillingAt older than ${STALE_REQUEST_THRESHOLD_MS}ms`, - { staleRequestIds: staleRequests.map(r => r.id) }, - ), + const pendingRequests = await claimPendingRequests(); + const reclaimedStaleRequestIds = getClaimedStaleRequestIds(pendingRequests); + iterationSpan.setAttribute("stack.external-db-sync.reclaimed-count", reclaimedStaleRequestIds.length); + iterationSpan.setAttribute("stack.external-db-sync.stale-count", reclaimedStaleRequestIds.length); + if (reclaimedStaleRequestIds.length > 0) { + console.warn( + `[Poller] Reclaimed ${reclaimedStaleRequestIds.length} stale outgoing request(s) older than ${STALE_CLAIM_INTERVAL_MINUTES} minutes.`, + { staleRequestIds: reclaimedStaleRequestIds }, ); } - - const pendingRequests = await claimPendingRequests(); iterationSpan.setAttribute("stack.external-db-sync.pending-count", pendingRequests.length); const processed = await processRequests(pendingRequests); @@ -289,3 +320,25 @@ export const GET = createSmartRouteHandler({ }); }, }); + +import.meta.vitest?.describe("getClaimedStaleRequestIds(...)", () => { + import.meta.vitest?.test("returns only requests reclaimed from stale claims", ({ expect }) => { + expect(getClaimedStaleRequestIds([ + { + id: "fresh-claim", + wasStale: false, + } as ClaimedOutgoingRequest, + { + id: "reclaimed-stale-1", + wasStale: true, + } as ClaimedOutgoingRequest, + { + id: "reclaimed-stale-2", + wasStale: true, + } as ClaimedOutgoingRequest, + ])).toEqual([ + "reclaimed-stale-1", + "reclaimed-stale-2", + ]); + }); +});