diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts index e78b0c9ea..dfcd36941 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/poller/route.ts @@ -47,6 +47,7 @@ export const GET = createSmartRouteHandler({ const startTime = performance.now(); const maxDurationMs = 3 * 60 * 1000; const pollIntervalMs = 50; + const staleClaimIntervalMinutes = 5; let totalRequestsProcessed = 0; async function claimPendingRequests(): Promise { @@ -58,6 +59,7 @@ export const GET = createSmartRouteHandler({ SELECT id FROM "OutgoingRequest" WHERE "startedFulfillingAt" IS NULL + OR "startedFulfillingAt" < NOW() - (${staleClaimIntervalMinutes} * INTERVAL '1 minute') ORDER BY "createdAt" LIMIT 100 FOR UPDATE SKIP LOCKED @@ -67,11 +69,22 @@ export const GET = createSmartRouteHandler({ return rows; }); } + async function deleteOutgoingRequest(id: string): Promise { + await retryTransaction(globalPrismaClient, async (tx) => { + await tx.outgoingRequest.delete({ where: { id } }); + }); + } + async function releaseOutgoingRequest(id: string): Promise { + await retryTransaction(globalPrismaClient, async (tx) => { + await tx.outgoingRequest.updateMany({ + where: { id, startedFulfillingAt: { not: null } }, + data: { startedFulfillingAt: null }, + }); + }); + } async function processRequests(requests: OutgoingRequest[]): Promise { - let processed = 0; - - for (const request of requests) { - try { + const results = await Promise.allSettled( + requests.map(async (request) => { // Prisma JsonValue doesn't carry a precise shape for this JSON blob. const options = request.qstashOptions as any; const baseUrl = getEnvVariable("NEXT_PUBLIC_STACK_API_URL"); @@ -94,13 +107,21 @@ export const GET = createSmartRouteHandler({ body: options.body, }); + await deleteOutgoingRequest(request.id); + }), + ); + + let processed = 0; + for (const [index, result] of results.entries()) { + if (result.status === "fulfilled") { processed++; - } catch (error) { - captureError( - `poller-iteration-error`, - error, - ); + continue; } + captureError( + "poller-iteration-error", + result.reason, + ); + await releaseOutgoingRequest(requests[index].id); } return processed; diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-advanced.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-advanced.test.ts index b7e39aa29..b0c6413f0 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-advanced.test.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-advanced.test.ts @@ -329,7 +329,7 @@ describe.sequential('External DB Sync - Advanced Tests', () => { const res = await client.query(`SELECT * FROM "users" WHERE "primary_email" = $1`, ['special@example.com']); expect(res.rows.length).toBe(1); expect(res.rows[0].display_name).toBe(specialName); - }); + }, TEST_TIMEOUT); /** * What it does: diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts index 3c114e675..b366a47c1 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts @@ -288,7 +288,7 @@ describe.sequential('External DB Sync - Basic Tests', () => { `); expect(tableCheck.rows[0].exists).toBe(true); await verifyInExternalDb(client, 'auto-create@example.com', 'Auto Create User'); - }); + }, TEST_TIMEOUT); /** * What it does: