From 9e4ac14153a7e35b871801a9b40bbe3562616ea0 Mon Sep 17 00:00:00 2001 From: armaan Date: Tue, 23 Jun 2026 20:24:55 +0000 Subject: [PATCH] cleanup: abort mapWithConcurrency on rejection, fix regex char class, remove redundant async/await Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- apps/backend/src/lib/plan-usage.ts | 2 +- .../endpoints/api/v1/internal/plan-usage.test.ts | 2 +- packages/shared/src/utils/promises.tsx | 14 ++++++++++---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/apps/backend/src/lib/plan-usage.ts b/apps/backend/src/lib/plan-usage.ts index 99d7de8f2..6c28c398e 100644 --- a/apps/backend/src/lib/plan-usage.ts +++ b/apps/backend/src/lib/plan-usage.ts @@ -291,7 +291,7 @@ async function sumTenancyMeteredUsage(tenancyIds: string[], period: UsagePeriod) const subtotals = await mapWithConcurrency( groups, PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY, - async (group) => await countMeteredUsageForGroup(group, period), + (group) => countMeteredUsageForGroup(group, period), ); return subtotals.reduce( diff --git a/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts index db979923c..e4bafaf40 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts @@ -238,7 +238,7 @@ async function purchaseTeamPlanForBillingTeam(ownerTeamId: string): Promise { * promises, which matters when `fn` hits a shared resource (e.g. a database) and * an unbounded fan-out could exhaust connections or overload a replica. Results * are returned in input order regardless of completion order, and the first - * rejection propagates (in-flight workers still settle but their results are - * discarded). + * rejection aborts further scheduling — already in-flight workers still settle + * but no new items are started. */ export async function mapWithConcurrency( items: readonly T[], @@ -454,12 +454,18 @@ export async function mapWithConcurrency( } const results = new Array(items.length); let nextIndex = 0; + let aborted = false; const worker = async () => { - while (true) { + while (!aborted) { // Claim an index synchronously before awaiting so workers never process the same item. const index = nextIndex++; if (index >= items.length) return; - results[index] = await fn(items[index]!, index); + try { + results[index] = await fn(items[index]!, index); + } catch (error) { + aborted = true; + throw error; + } } }; const workerCount = Math.min(concurrency, items.length);