diff --git a/apps/backend/src/lib/plan-usage.ts b/apps/backend/src/lib/plan-usage.ts index 99d7de8f2..6c28c398e 100644 --- a/apps/backend/src/lib/plan-usage.ts +++ b/apps/backend/src/lib/plan-usage.ts @@ -291,7 +291,7 @@ async function sumTenancyMeteredUsage(tenancyIds: string[], period: UsagePeriod) const subtotals = await mapWithConcurrency( groups, PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY, - async (group) => await countMeteredUsageForGroup(group, period), + (group) => countMeteredUsageForGroup(group, period), ); return subtotals.reduce( diff --git a/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts index db979923c..e4bafaf40 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts @@ -238,7 +238,7 @@ async function purchaseTeamPlanForBillingTeam(ownerTeamId: string): Promise { * promises, which matters when `fn` hits a shared resource (e.g. a database) and * an unbounded fan-out could exhaust connections or overload a replica. Results * are returned in input order regardless of completion order, and the first - * rejection propagates (in-flight workers still settle but their results are - * discarded). + * rejection aborts further scheduling — already in-flight workers still settle + * but no new items are started. */ export async function mapWithConcurrency( items: readonly T[], @@ -454,12 +454,18 @@ export async function mapWithConcurrency( } const results = new Array(items.length); let nextIndex = 0; + let aborted = false; const worker = async () => { - while (true) { + while (!aborted) { // Claim an index synchronously before awaiting so workers never process the same item. const index = nextIndex++; if (index >= items.length) return; - results[index] = await fn(items[index]!, index); + try { + results[index] = await fn(items[index]!, index); + } catch (error) { + aborted = true; + throw error; + } } }; const workerCount = Math.min(concurrency, items.length);