mirror of
https://github.com/stack-auth/stack.git
synced 2026-06-30 21:01:54 +08:00
cleanup: abort mapWithConcurrency on rejection, fix regex char class, remove redundant async/await
Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
parent
5a11c8504c
commit
9e4ac14153
@ -291,7 +291,7 @@ async function sumTenancyMeteredUsage(tenancyIds: string[], period: UsagePeriod)
|
||||
const subtotals = await mapWithConcurrency(
|
||||
groups,
|
||||
PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY,
|
||||
async (group) => await countMeteredUsageForGroup(group, period),
|
||||
(group) => countMeteredUsageForGroup(group, period),
|
||||
);
|
||||
|
||||
return subtotals.reduce<TenancyMeteredUsage>(
|
||||
|
||||
@ -238,7 +238,7 @@ async function purchaseTeamPlanForBillingTeam(ownerTeamId: string): Promise<void
|
||||
throw new HexclaveAssertionError("Expected team plan purchase URL creation to succeed", { createUrlResponse });
|
||||
}
|
||||
|
||||
const fullCode = createUrlResponse.body.url.match(/\/purchase\/([a-z0-9-_]+)/)?.[1]
|
||||
const fullCode = createUrlResponse.body.url.match(/\/purchase\/([a-z0-9_-]+)/)?.[1]
|
||||
?? throwErr("Could not parse purchase code from team plan purchase URL", { createUrlResponse });
|
||||
const purchaseResponse = await niceBackendFetch("/api/latest/internal/payments/test-mode-purchase-session", {
|
||||
method: "POST",
|
||||
|
||||
@ -441,8 +441,8 @@ import.meta.vitest?.test("timeoutThrow", async ({ expect }) => {
|
||||
* promises, which matters when `fn` hits a shared resource (e.g. a database) and
|
||||
* an unbounded fan-out could exhaust connections or overload a replica. Results
|
||||
* are returned in input order regardless of completion order, and the first
|
||||
* rejection propagates (in-flight workers still settle but their results are
|
||||
* discarded).
|
||||
* rejection aborts further scheduling — already in-flight workers still settle
|
||||
* but no new items are started.
|
||||
*/
|
||||
export async function mapWithConcurrency<T, R>(
|
||||
items: readonly T[],
|
||||
@ -454,12 +454,18 @@ export async function mapWithConcurrency<T, R>(
|
||||
}
|
||||
const results = new Array<R>(items.length);
|
||||
let nextIndex = 0;
|
||||
let aborted = false;
|
||||
const worker = async () => {
|
||||
while (true) {
|
||||
while (!aborted) {
|
||||
// Claim an index synchronously before awaiting so workers never process the same item.
|
||||
const index = nextIndex++;
|
||||
if (index >= items.length) return;
|
||||
results[index] = await fn(items[index]!, index);
|
||||
try {
|
||||
results[index] = await fn(items[index]!, index);
|
||||
} catch (error) {
|
||||
aborted = true;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
};
|
||||
const workerCount = Math.min(concurrency, items.length);
|
||||
|
||||
Loading…
Reference in New Issue
Block a user