From 81723c3d5515af1ea43e88f8d89389e4c8a34cb8 Mon Sep 17 00:00:00 2001
From: Armaan Jain <84474476+Developing-Gamer@users.noreply.github.com>
Date: Wed, 24 Jun 2026 12:25:20 -0700
Subject: [PATCH] Usage page performance improvements (#1650)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
## Summary by cubic
Speed up the Usage page by aggregating metered usage across owned
projects/tenancies with fewer queries and new indexes. Adds E2E tests to
verify team-owned rollups and calendar‑month windows.
- **Performance**
- Added concurrent indexes for `EmailOutbox(tenancyId,
startedSendingAt)` and `SessionReplay(tenancyId, startedAt)`; updated
Prisma schema.
- Group tenancies by (DB client, schema) and run one SQL per group that
counts both emails and session replays; uses `mapWithConcurrency` from
`@hexclave/shared` (concurrency 4, aborts on first error).
- Added helpers `getOwnedProjectAndTenancyIdsForBillingTeam` and
`getNonAnonymousUserCountForTenancies`; made `mapWithConcurrency`
null‑safe with bounds checks.
- **Tests**
- Added E2E tests for the internal plan-usage endpoint covering
team-owned rollups, calendar‑month boundaries, and zero‑usage cases.
- Added unit tests for ownership scope resolution and non‑anonymous user
counting.
Written for commit 5d6098006caac804eb72429e68033ef940dbd67c.
Summary will update on new commits.
## Summary by CodeRabbit
* **Performance Improvements**
* Improved plan usage rollups by aggregating metered emails and session
replays together across an owned scope.
* Added database indexes to speed up time-window metering lookups for
email outbox and session replays.
* **Tests**
* Extended unit tests for billing-team entitlement aggregation and
non-anonymous user counting.
* Added end-to-end coverage for the internal plan-usage endpoint,
including seeded scenarios and period validation.
* **Refactor**
* Reworked entitlement and usage calculations to reuse shared logic for
more consistent results.
---------
Co-authored-by: armaan
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
---
.../migration.sql | 11 +
apps/backend/prisma/schema.prisma | 2 +
.../backend/src/lib/plan-entitlements.test.ts | 24 ++
apps/backend/src/lib/plan-entitlements.ts | 41 +-
apps/backend/src/lib/plan-usage.ts | 145 +++++--
.../api/v1/internal/plan-usage.test.ts | 407 ++++++++++++++++++
packages/shared/src/utils/promises.tsx | 65 +++
7 files changed, 647 insertions(+), 48 deletions(-)
create mode 100644 apps/backend/prisma/migrations/20260623010000_add_plan_usage_range_indexes/migration.sql
create mode 100644 apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts
diff --git a/apps/backend/prisma/migrations/20260623010000_add_plan_usage_range_indexes/migration.sql b/apps/backend/prisma/migrations/20260623010000_add_plan_usage_range_indexes/migration.sql
new file mode 100644
index 000000000..cebc53f24
--- /dev/null
+++ b/apps/backend/prisma/migrations/20260623010000_add_plan_usage_range_indexes/migration.sql
@@ -0,0 +1,11 @@
+-- SPLIT_STATEMENT_SENTINEL
+-- SINGLE_STATEMENT_SENTINEL
+-- RUN_OUTSIDE_TRANSACTION_SENTINEL
+CREATE INDEX CONCURRENTLY IF NOT EXISTS "EmailOutbox_tenancyId_startedSendingAt_idx"
+ ON /* SCHEMA_NAME_SENTINEL */."EmailOutbox"("tenancyId", "startedSendingAt");
+
+-- SPLIT_STATEMENT_SENTINEL
+-- SINGLE_STATEMENT_SENTINEL
+-- RUN_OUTSIDE_TRANSACTION_SENTINEL
+CREATE INDEX CONCURRENTLY IF NOT EXISTS "SessionReplay_tenancyId_startedAt_idx"
+ ON /* SCHEMA_NAME_SENTINEL */."SessionReplay"("tenancyId", "startedAt");
diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma
index 7a9421c90..f830b2989 100644
--- a/apps/backend/prisma/schema.prisma
+++ b/apps/backend/prisma/schema.prisma
@@ -407,6 +407,7 @@ model SessionReplay {
@@id([tenancyId, id])
@@index([tenancyId, projectUserId, startedAt])
+ @@index([tenancyId, startedAt], name: "SessionReplay_tenancyId_startedAt_idx")
@@index([tenancyId, lastEventAt])
// index by updatedAt instead of lastEventAt because event timing can be spoofed
@@index([tenancyId, refreshTokenId, updatedAt])
@@ -1069,6 +1070,7 @@ model EmailOutbox {
@@id([tenancyId, id])
@@index([tenancyId, finishedSendingAt(sort: Desc), scheduledAtIfNotYetQueued(sort: Desc), priority, id], map: "EmailOutbox_ordering_idx")
+ @@index([tenancyId, startedSendingAt], name: "EmailOutbox_tenancyId_startedSendingAt_idx")
@@index([tenancyId, simpleStatus], map: "EmailOutbox_simple_status_tenancy_idx")
@@index([tenancyId, status], map: "EmailOutbox_status_tenancy_idx")
@@index([isQueued], map: "EmailOutbox_isQueued_idx")
diff --git a/apps/backend/src/lib/plan-entitlements.test.ts b/apps/backend/src/lib/plan-entitlements.test.ts
index aec03993e..7d9f0d50c 100644
--- a/apps/backend/src/lib/plan-entitlements.test.ts
+++ b/apps/backend/src/lib/plan-entitlements.test.ts
@@ -4,6 +4,8 @@ import { afterEach, describe, expect, it, vi } from "vitest";
import {
arePlanLimitsEnforced,
getBillingTeamId,
+ getNonAnonymousUserCountForTenancies,
+ getOwnedProjectAndTenancyIdsForBillingTeam,
getOwnedProjectIdsForBillingTeam,
getOwnedTenancyIdsForBillingTeam,
getTeamWideItemCapacityForTests,
@@ -98,6 +100,28 @@ describe("team-wide ownership aggregation", () => {
expect(tenancyIds).toEqual(["tenancy-a-main", "tenancy-a-dev", "tenancy-b-main"]);
});
+ it("lists owned project and tenancy ids from one ownership scope", async () => {
+ const scope = await getOwnedProjectAndTenancyIdsForBillingTeam("team-1", globalPrisma);
+ expect(scope).toMatchInlineSnapshot(`
+ {
+ "projectIds": [
+ "project-a",
+ "project-b",
+ ],
+ "tenancyIds": [
+ "tenancy-a-main",
+ "tenancy-a-dev",
+ "tenancy-b-main",
+ ],
+ }
+ `);
+ });
+
+ it("counts non-anonymous users from already-resolved tenancies", async () => {
+ const usage = await getNonAnonymousUserCountForTenancies(["tenancy-a-main", "tenancy-b-main"], globalPrisma);
+ expect(usage).toBe(2);
+ });
+
it("counts only non-anonymous users across all owned tenancies", async () => {
const usage = await getTeamWideNonAnonymousUserCount("team-1", globalPrisma);
expect(usage).toBe(3);
diff --git a/apps/backend/src/lib/plan-entitlements.ts b/apps/backend/src/lib/plan-entitlements.ts
index 932ab977c..1b8527061 100644
--- a/apps/backend/src/lib/plan-entitlements.ts
+++ b/apps/backend/src/lib/plan-entitlements.ts
@@ -39,6 +39,11 @@ type GlobalPrismaLike = {
},
};
+type OwnedBillingScope = {
+ projectIds: string[],
+ tenancyIds: string[],
+};
+
type ItemCapacityReaders = {
getPrismaForTenancy: (tenancy: Tenancy) => Promise,
getItemQuantityForCustomer: (options: {
@@ -85,13 +90,16 @@ export async function getOwnedProjectIdsForBillingTeam(
return projects.map((project) => project.id);
}
-export async function getOwnedTenancyIdsForBillingTeam(
+export async function getOwnedProjectAndTenancyIdsForBillingTeam(
billingTeamId: string,
globalPrisma: GlobalPrismaLike = globalPrismaClient,
-): Promise {
+): Promise {
const projectIds = await getOwnedProjectIdsForBillingTeam(billingTeamId, globalPrisma);
if (projectIds.length === 0) {
- return [];
+ return {
+ projectIds,
+ tenancyIds: [],
+ };
}
const tenancies = await globalPrisma.tenancy.findMany({
where: {
@@ -103,16 +111,23 @@ export async function getOwnedTenancyIdsForBillingTeam(
id: true,
},
});
- return tenancies.map((tenancy) => tenancy.id);
+ return {
+ projectIds,
+ tenancyIds: tenancies.map((tenancy) => tenancy.id),
+ };
}
-export async function getTeamWideNonAnonymousUserCount(
+export async function getOwnedTenancyIdsForBillingTeam(
billingTeamId: string,
globalPrisma: GlobalPrismaLike = globalPrismaClient,
+): Promise {
+ return (await getOwnedProjectAndTenancyIdsForBillingTeam(billingTeamId, globalPrisma)).tenancyIds;
+}
+
+export async function getNonAnonymousUserCountForTenancies(
+ tenancyIds: string[],
+ globalPrisma: GlobalPrismaLike = globalPrismaClient,
): Promise {
- // Usage metric: how many non-anonymous users are currently consumed by this billing team.
- // This is compared against auth user capacity to determine over-limit conditions.
- const tenancyIds = await getOwnedTenancyIdsForBillingTeam(billingTeamId, globalPrisma);
if (tenancyIds.length === 0) {
return 0;
}
@@ -126,6 +141,16 @@ export async function getTeamWideNonAnonymousUserCount(
});
}
+export async function getTeamWideNonAnonymousUserCount(
+ billingTeamId: string,
+ globalPrisma: GlobalPrismaLike = globalPrismaClient,
+): Promise {
+ // Usage metric: how many non-anonymous users are currently consumed by this billing team.
+ // This is compared against auth user capacity to determine over-limit conditions.
+ const tenancyIds = await getOwnedTenancyIdsForBillingTeam(billingTeamId, globalPrisma);
+ return await getNonAnonymousUserCountForTenancies(tenancyIds, globalPrisma);
+}
+
async function getTeamWideItemCapacity(
billingTeamId: string,
itemId: string,
diff --git a/apps/backend/src/lib/plan-usage.ts b/apps/backend/src/lib/plan-usage.ts
index b53c01391..6c28c398e 100644
--- a/apps/backend/src/lib/plan-usage.ts
+++ b/apps/backend/src/lib/plan-usage.ts
@@ -4,20 +4,24 @@ import { getSubscriptionMapForCustomer } from "@/lib/payments/customer-data";
import { isActiveSubscription } from "@/lib/payments";
import {
getBillingTeamId,
- getOwnedProjectIdsForBillingTeam,
- getOwnedTenancyIdsForBillingTeam,
- getTeamWideNonAnonymousUserCount,
+ getNonAnonymousUserCountForTenancies,
+ getOwnedProjectAndTenancyIdsForBillingTeam,
} from "@/lib/plan-entitlements";
import { DEFAULT_BRANCH_ID, getSoleTenancyFromProjectBranch, getTenancy, type Tenancy } from "@/lib/tenancies";
import { getPrismaClientForTenancy, getPrismaSchemaForTenancy, globalPrismaClient, sqlQuoteIdent } from "@/prisma-client";
import { BASE_PLAN_IDS_BY_TIER, ITEM_IDS, PLAN_LIMITS, UNLIMITED, type ItemId, type PlanId } from "@hexclave/shared/dist/plans";
import type { PlanUsageResponse } from "@hexclave/shared/dist/interface/admin-interface";
import { HexclaveAssertionError, throwErr } from "@hexclave/shared/dist/utils/errors";
+import { mapWithConcurrency } from "@hexclave/shared/dist/utils/promises";
import type { SubscriptionRow } from "./payments/schema/types";
type PlanUsageKind = PlanUsageResponse["rows"][number]["kind"];
type PlanUsageRow = PlanUsageResponse["rows"][number];
type UsageLimit = number | null;
+type TenancyMeteredUsage = {
+ emails: number,
+ sessionReplays: number,
+};
type UsagePeriod = {
start: Date,
@@ -46,6 +50,8 @@ const PLAN_LABELS = new Map([
["growth", "Growth"],
]);
+const PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY = 4;
+
export function getNextPlanId(planId: PlanId): "team" | "growth" | null {
if (planId === "free") {
return "team";
@@ -202,38 +208,99 @@ async function getOwnerTeamDisplayName(internalTenancy: Tenancy, ownerTeamId: st
return team?.displayName ?? throwErr(`Owner team ${ownerTeamId} not found in the internal tenancy`);
}
-async function countEmailsForTenancy(tenancyId: string, period: UsagePeriod): Promise {
- const tenancy = await getTenancy(tenancyId) ?? throwErr(`Tenancy ${tenancyId} not found while counting email usage`);
- const schema = await getPrismaSchemaForTenancy(tenancy);
- const prisma = await getPrismaClientForTenancy(tenancy);
- const rows = await prisma.$replica().$queryRaw<[{ count: number }]>`
- SELECT COUNT(*)::int AS count
- FROM ${sqlQuoteIdent(schema)}."EmailOutbox"
- WHERE "tenancyId" = ${tenancy.id}::uuid
- AND "startedSendingAt" IS NOT NULL
- AND "startedSendingAt" >= ${period.start}
- AND "startedSendingAt" < ${period.end}
- `;
- return Number(rows[0].count);
+type TenancyPrismaClient = Awaited>;
+
+type TenancyMeteredUsageGroup = {
+ prisma: TenancyPrismaClient,
+ schema: string,
+ tenancyIds: string[],
+};
+
+// Tenancies can route to different source-of-truth databases/schemas, so we can't assume a single
+// query covers every tenancy. We group tenancies that share a (client, schema) and run one aggregate
+// COUNT per group: the common case (all projects on one database) collapses to a single round trip,
+// while multi-database teams fan out to one query per distinct database instead of one per tenancy.
+async function groupTenanciesByMeteredUsageSource(tenancyIds: string[]): Promise {
+ const resolved = await mapWithConcurrency(tenancyIds, PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY, async (tenancyId) => {
+ const tenancy = await getTenancy(tenancyId) ?? throwErr(`Tenancy ${tenancyId} not found while counting plan usage`);
+ const [schema, prisma] = await Promise.all([
+ getPrismaSchemaForTenancy(tenancy),
+ getPrismaClientForTenancy(tenancy),
+ ]);
+ return { tenancyId: tenancy.id, schema, prisma };
+ });
+
+ const byClient = new Map>();
+ for (const { tenancyId, schema, prisma } of resolved) {
+ let bySchema = byClient.get(prisma);
+ if (bySchema == null) {
+ bySchema = new Map();
+ byClient.set(prisma, bySchema);
+ }
+ const existing = bySchema.get(schema);
+ if (existing == null) {
+ bySchema.set(schema, [tenancyId]);
+ } else {
+ existing.push(tenancyId);
+ }
+ }
+
+ const groups: TenancyMeteredUsageGroup[] = [];
+ for (const [prisma, bySchema] of byClient) {
+ for (const [schema, groupTenancyIds] of bySchema) {
+ groups.push({ prisma, schema, tenancyIds: groupTenancyIds });
+ }
+ }
+ return groups;
}
-async function countSessionReplaysForTenancy(tenancyId: string, period: UsagePeriod): Promise {
- const tenancy = await getTenancy(tenancyId) ?? throwErr(`Tenancy ${tenancyId} not found while counting session replay usage`);
- const schema = await getPrismaSchemaForTenancy(tenancy);
- const prisma = await getPrismaClientForTenancy(tenancy);
- const rows = await prisma.$replica().$queryRaw<[{ count: number }]>`
- SELECT COUNT(*)::int AS count
- FROM ${sqlQuoteIdent(schema)}."SessionReplay"
- WHERE "tenancyId" = ${tenancy.id}::uuid
- AND "startedAt" >= ${period.start}
- AND "startedAt" < ${period.end}
+async function countMeteredUsageForGroup(group: TenancyMeteredUsageGroup, period: UsagePeriod): Promise {
+ const rows = await group.prisma.$replica().$queryRaw>`
+ SELECT
+ (
+ SELECT COUNT(*)::int
+ FROM ${sqlQuoteIdent(group.schema)}."EmailOutbox"
+ WHERE "tenancyId" = ANY(${group.tenancyIds}::uuid[])
+ AND "startedSendingAt" IS NOT NULL
+ AND "startedSendingAt" >= ${period.start}
+ AND "startedSendingAt" < ${period.end}
+ ) AS "emails",
+ (
+ SELECT COUNT(*)::int
+ FROM ${sqlQuoteIdent(group.schema)}."SessionReplay"
+ WHERE "tenancyId" = ANY(${group.tenancyIds}::uuid[])
+ AND "startedAt" >= ${period.start}
+ AND "startedAt" < ${period.end}
+ ) AS "sessionReplays"
`;
- return Number(rows[0].count);
+ const row = rows[0] ?? throwErr(`Missing plan usage count row for metered usage group on schema ${group.schema}`);
+ return {
+ emails: Number(row.emails),
+ sessionReplays: Number(row.sessionReplays),
+ };
}
-async function sumTenancyUsage(tenancyIds: string[], counter: (tenancyId: string) => Promise): Promise {
- const counts = await Promise.all(tenancyIds.map(counter));
- return counts.reduce((sum, count) => sum + count, 0);
+async function sumTenancyMeteredUsage(tenancyIds: string[], period: UsagePeriod): Promise {
+ if (tenancyIds.length === 0) {
+ return { emails: 0, sessionReplays: 0 };
+ }
+
+ const groups = await groupTenanciesByMeteredUsageSource(tenancyIds);
+ // The group count equals the number of distinct databases (usually 1), so concurrency mostly guards
+ // the pathological multi-database team rather than the per-tenancy fan-out it used to.
+ const subtotals = await mapWithConcurrency(
+ groups,
+ PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY,
+ (group) => countMeteredUsageForGroup(group, period),
+ );
+
+ return subtotals.reduce(
+ (totals, subtotal) => ({
+ emails: totals.emails + subtotal.emails,
+ sessionReplays: totals.sessionReplays + subtotal.sessionReplays,
+ }),
+ { emails: 0, sessionReplays: 0 },
+ );
}
async function countAnalyticsEventsForProjects(projectIds: string[], period: UsagePeriod): Promise {
@@ -336,18 +403,16 @@ export async function getPlanUsageForProject(project: UsageSourceProject, now: D
const planId = resolveActivePlanId(activePlanSubscription);
const period = getPlanUsagePeriod(activePlanSubscription, now);
- const [ownerTeamDisplayName, ownedProjectIds, ownedTenancyIds, dashboardAdmins, authUsers] = await Promise.all([
+ const [ownerTeamDisplayName, ownedScope, dashboardAdmins] = await Promise.all([
getOwnerTeamDisplayName(internalTenancy, ownerTeamId),
- getOwnedProjectIdsForBillingTeam(ownerTeamId),
- getOwnedTenancyIdsForBillingTeam(ownerTeamId),
+ getOwnedProjectAndTenancyIdsForBillingTeam(ownerTeamId),
countDashboardAdmins(internalTenancy, ownerTeamId, now),
- getTeamWideNonAnonymousUserCount(ownerTeamId),
]);
- const [emails, analyticsEvents, sessionReplays] = await Promise.all([
- sumTenancyUsage(ownedTenancyIds, async (tenancyId) => await countEmailsForTenancy(tenancyId, period)),
- countAnalyticsEventsForProjects(ownedProjectIds, period),
- sumTenancyUsage(ownedTenancyIds, async (tenancyId) => await countSessionReplaysForTenancy(tenancyId, period)),
+ const [authUsers, meteredUsage, analyticsEvents] = await Promise.all([
+ getNonAnonymousUserCountForTenancies(ownedScope.tenancyIds),
+ sumTenancyMeteredUsage(ownedScope.tenancyIds, period),
+ countAnalyticsEventsForProjects(ownedScope.projectIds, period),
]);
return {
@@ -362,9 +427,9 @@ export async function getPlanUsageForProject(project: UsageSourceProject, now: D
planId,
dashboardAdmins,
authUsers,
- emails,
+ emails: meteredUsage.emails,
analyticsEvents,
- sessionReplays,
+ sessionReplays: meteredUsage.sessionReplays,
}),
};
}
diff --git a/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts
new file mode 100644
index 000000000..e4bafaf40
--- /dev/null
+++ b/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts
@@ -0,0 +1,407 @@
+import { randomUUID } from "node:crypto";
+import { Client } from "pg";
+import { describe } from "vitest";
+import { ITEM_IDS } from "@hexclave/shared/dist/plans";
+import { getEnvVariable } from "@hexclave/shared/dist/utils/env";
+import { HexclaveAssertionError, throwErr } from "@hexclave/shared/dist/utils/errors";
+import { wait } from "@hexclave/shared/dist/utils/promises";
+import { planUsageResponseSchema, type PlanUsageResponse } from "@hexclave/shared/dist/interface/plan-usage";
+import { it } from "../../../../../helpers";
+import { Auth, InternalProjectKeys, Project, backendContext, niceBackendFetch } from "../../../../backend-helpers";
+
+type ProjectUsageContext = {
+ projectId: string,
+ tenancyId: string,
+};
+
+type JsonValue = null | boolean | number | string | JsonValue[] | { [key: string]: JsonValue };
+
+function getInternalDatabaseConnectionString(): string {
+ const connectionString = getEnvVariable(
+ "HEXCLAVE_DATABASE_CONNECTION_STRING",
+ getEnvVariable("STACK_DATABASE_CONNECTION_STRING", ""),
+ );
+ if (connectionString === "") {
+ throw new HexclaveAssertionError("Plan usage E2E tests require a configured internal database connection string");
+ }
+ return connectionString;
+}
+
+async function withInternalDatabase(fn: (client: Client) => Promise): Promise {
+ const client = new Client({
+ connectionString: getInternalDatabaseConnectionString(),
+ connectionTimeoutMillis: 10_000,
+ query_timeout: 30_000,
+ });
+ await client.connect();
+ try {
+ return await fn(client);
+ } finally {
+ await client.end();
+ }
+}
+
+async function getMainTenancyId(client: Client, projectId: string): Promise {
+ const tenancies = await client.query<{ id: string }>(
+ `SELECT "id" FROM "Tenancy" WHERE "projectId" = $1 AND "branchId" = 'main' LIMIT 1`,
+ [projectId],
+ );
+ return tenancies.rows[0]?.id ?? throwErr(`Could not find main tenancy for project ${projectId}`);
+}
+
+async function getProjectUsageContext(client: Client, projectId: string): Promise {
+ return {
+ projectId,
+ tenancyId: await getMainTenancyId(client, projectId),
+ };
+}
+
+async function clearSeededUsageRows(client: Client, tenancies: readonly ProjectUsageContext[]): Promise {
+ const tenancyIds = tenancies.map((tenancy) => tenancy.tenancyId);
+ await client.query(`DELETE FROM "SessionReplay" WHERE "tenancyId" = ANY($1::uuid[])`, [tenancyIds]);
+ await client.query(`DELETE FROM "EmailOutbox" WHERE "tenancyId" = ANY($1::uuid[])`, [tenancyIds]);
+ await client.query(`DELETE FROM "ProjectUser" WHERE "tenancyId" = ANY($1::uuid[])`, [tenancyIds]);
+}
+
+function normalizeSubscriptionPeriodInJson(value: unknown, ownerTeamId: string, period: {
+ start: Date,
+ end: Date,
+}): JsonValue {
+ if (value === null || typeof value === "boolean" || typeof value === "number" || typeof value === "string") {
+ return value;
+ }
+ if (Array.isArray(value)) {
+ return value.map((item) => normalizeSubscriptionPeriodInJson(item, ownerTeamId, period));
+ }
+ if (typeof value === "object") {
+ const normalizedObject: { [key: string]: JsonValue } = {};
+ for (const [key, entryValue] of Object.entries(value)) {
+ normalizedObject[key] = normalizeSubscriptionPeriodInJson(entryValue, ownerTeamId, period);
+ }
+ if (
+ normalizedObject.customerId === ownerTeamId
+ && typeof normalizedObject.currentPeriodStartMillis === "number"
+ && typeof normalizedObject.currentPeriodEndMillis === "number"
+ ) {
+ return {
+ ...normalizedObject,
+ currentPeriodStartMillis: period.start.getTime(),
+ currentPeriodEndMillis: period.end.getTime(),
+ };
+ }
+ return normalizedObject;
+ }
+ throw new HexclaveAssertionError("Unexpected non-JSON value in payment storage", { value });
+}
+
+async function normalizeBillingTeamSubscriptionMapPeriod(client: Client, ownerTeamId: string, period: {
+ start: Date,
+ end: Date,
+}): Promise {
+ // The E2E seed data can create zero-length payment periods in the Bulldozer LFold output.
+ // Plan usage reads that output directly, so normalize only this fresh test team's emitted
+ // subscription-map rows to make the metered usage period deterministic.
+ const rows = await client.query<{ id: string, value: unknown }>(
+ `
+ SELECT "id", "value"
+ FROM "BulldozerStorageEngine"
+ WHERE "keyPath"[1] = to_jsonb('table'::text)
+ AND "keyPath"[2] = to_jsonb('external:payments-subscription-map-by-customer'::text)
+ AND "keyPath"::text LIKE $1
+ AND "value" <> 'null'::jsonb
+ `,
+ [`%${ownerTeamId}%`],
+ );
+ if (rows.rows.length === 0) {
+ throw new HexclaveAssertionError("Expected payment subscription-map rows for billing team", { ownerTeamId });
+ }
+ for (const row of rows.rows) {
+ await client.query(
+ `UPDATE "BulldozerStorageEngine" SET "value" = $2::jsonb WHERE "id" = $1::uuid`,
+ [row.id, JSON.stringify(normalizeSubscriptionPeriodInJson(row.value, ownerTeamId, period))],
+ );
+ }
+}
+
+async function insertProjectUsers(client: Client, context: ProjectUsageContext, options: {
+ nonAnonymousCount: number,
+ anonymousCount: number,
+}): Promise {
+ const nonAnonymousUsers = await client.query<{ projectUserId: string }>(
+ `
+ INSERT INTO "ProjectUser"
+ ("tenancyId", "projectUserId", "mirroredProjectId", "mirroredBranchId",
+ "displayName", "createdAt", "updatedAt", "isAnonymous",
+ "signedUpAt", "signUpRiskScoreBot", "signUpRiskScoreFreeTrialAbuse")
+ SELECT
+ $1::uuid,
+ gen_random_uuid(),
+ $2,
+ 'main',
+ 'Plan Usage User ' || gs,
+ now(),
+ now(),
+ false,
+ now(),
+ 0,
+ 0
+ FROM generate_series(1, $3::int) AS gs
+ RETURNING "projectUserId"
+ `,
+ [context.tenancyId, context.projectId, options.nonAnonymousCount],
+ );
+
+ await client.query(
+ `
+ INSERT INTO "ProjectUser"
+ ("tenancyId", "projectUserId", "mirroredProjectId", "mirroredBranchId",
+ "displayName", "createdAt", "updatedAt", "isAnonymous",
+ "signedUpAt", "signUpRiskScoreBot", "signUpRiskScoreFreeTrialAbuse")
+ SELECT
+ $1::uuid,
+ gen_random_uuid(),
+ $2,
+ 'main',
+ 'Plan Usage Anonymous User ' || gs,
+ now(),
+ now(),
+ true,
+ now(),
+ 0,
+ 0
+ FROM generate_series(1, $3::int) AS gs
+ `,
+ [context.tenancyId, context.projectId, options.anonymousCount],
+ );
+
+ return nonAnonymousUsers.rows.map((row) => row.projectUserId);
+}
+
+async function insertEmailOutboxRow(client: Client, tenancyId: string, startedSendingAt: Date | null): Promise {
+ const renderedAt = new Date();
+ await client.query(
+ `
+ INSERT INTO "EmailOutbox"
+ ("tenancyId", "id", "createdAt", "updatedAt", "tsxSource", "isHighPriority", "to", "extraRenderVariables",
+ "shouldSkipDeliverabilityCheck", "createdWith", "renderedByWorkerId", "startedRenderingAt",
+ "finishedRenderingAt", "renderedHtml", "renderedSubject", "renderedIsTransactional",
+ "scheduledAt", "isQueued", "startedSendingAt", "finishedSendingAt", "canHaveDeliveryInfo")
+ VALUES
+ ($1::uuid, gen_random_uuid(), $4, $4, '', false, $2::jsonb, '{}'::jsonb,
+ true, 'PROGRAMMATIC_CALL', $3::uuid, $4, $4, 'usage test
',
+ 'Plan usage test email', true, $4, true, $5, $5, $6)
+ `,
+ [
+ tenancyId,
+ JSON.stringify({ type: "custom-emails", emails: ["usage-test@example.com"] }),
+ randomUUID(),
+ renderedAt,
+ startedSendingAt,
+ startedSendingAt == null ? null : false,
+ ],
+ );
+}
+
+async function insertSessionReplayRow(client: Client, context: ProjectUsageContext, projectUserId: string, startedAt: Date): Promise {
+ await client.query(
+ `
+ INSERT INTO "SessionReplay"
+ ("tenancyId", "id", "projectUserId", "refreshTokenId", "startedAt", "lastEventAt", "createdAt", "updatedAt")
+ VALUES
+ ($1::uuid, gen_random_uuid(), $2::uuid, $3::uuid, $4, $4, $4, $4)
+ `,
+ [context.tenancyId, projectUserId, randomUUID(), startedAt],
+ );
+}
+
+async function getPlanUsage(): Promise {
+ const response = await niceBackendFetch("/api/latest/internal/plan-usage", {
+ accessType: "admin",
+ });
+ if (response.status !== 200) {
+ throw new HexclaveAssertionError("Expected plan usage request to succeed", { response });
+ }
+ return await planUsageResponseSchema.validate(response.body);
+}
+
+async function purchaseTeamPlanForBillingTeam(ownerTeamId: string): Promise {
+ const createUrlResponse = await niceBackendFetch("/api/latest/payments/purchases/create-purchase-url", {
+ method: "POST",
+ accessType: "client",
+ body: {
+ customer_type: "team",
+ customer_id: ownerTeamId,
+ product_id: "team",
+ },
+ });
+ if (createUrlResponse.status !== 200 || typeof createUrlResponse.body?.url !== "string") {
+ throw new HexclaveAssertionError("Expected team plan purchase URL creation to succeed", { createUrlResponse });
+ }
+
+ const fullCode = createUrlResponse.body.url.match(/\/purchase\/([a-z0-9_-]+)/)?.[1]
+ ?? throwErr("Could not parse purchase code from team plan purchase URL", { createUrlResponse });
+ const purchaseResponse = await niceBackendFetch("/api/latest/internal/payments/test-mode-purchase-session", {
+ method: "POST",
+ accessType: "admin",
+ body: {
+ full_code: fullCode,
+ price_id: "monthly",
+ quantity: 1,
+ },
+ });
+ if (purchaseResponse.status !== 200) {
+ throw new HexclaveAssertionError("Expected test-mode team plan purchase to succeed", { purchaseResponse });
+ }
+}
+
+function getUsedUsageValue(usage: PlanUsageResponse, itemId: string): number {
+ const row = usage.rows.find((candidate) => candidate.item_id === itemId) ?? throwErr(`Missing usage row for ${itemId}`);
+ return row.used ?? throwErr(`Expected usage row ${itemId} to have a used value`);
+}
+
+function getCalendarMonthBounds(now: Date): { start: Date, end: Date } {
+ return {
+ start: new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), 1)),
+ end: new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth() + 1, 1)),
+ };
+}
+
+async function waitForPlanUsageValues(expected: {
+ authUsers: number,
+ emails: number,
+ sessionReplays: number,
+ analyticsEvents: number,
+}): Promise {
+ const startedAt = performance.now();
+ let latestUsage: PlanUsageResponse | undefined;
+ while (performance.now() - startedAt < 15_000) {
+ latestUsage = await getPlanUsage();
+ if (
+ getUsedUsageValue(latestUsage, ITEM_IDS.authUsers) === expected.authUsers
+ && getUsedUsageValue(latestUsage, ITEM_IDS.emailsPerMonth) === expected.emails
+ && getUsedUsageValue(latestUsage, ITEM_IDS.sessionReplays) === expected.sessionReplays
+ && getUsedUsageValue(latestUsage, ITEM_IDS.analyticsEvents) === expected.analyticsEvents
+ ) {
+ return latestUsage;
+ }
+ await wait(250);
+ }
+ throw new HexclaveAssertionError("Timed out waiting for seeded plan usage to be visible", {
+ latestUsage,
+ expected,
+ });
+}
+
+describe("internal plan usage", () => {
+ it("returns zero usage for a fresh owned project with no seeded usage rows", async ({ expect }) => {
+ const { projectId } = await Project.createAndSwitch({
+ display_name: "Plan Usage Empty Project",
+ });
+
+ await withInternalDatabase(async (client) => {
+ const context = await getProjectUsageContext(client, projectId);
+ await clearSeededUsageRows(client, [context]);
+ });
+
+ const usage = await getPlanUsage();
+
+ expect(usage.owner_team_id).toBeTruthy();
+ expect(usage.plan_id).toBe("free");
+ expect(getUsedUsageValue(usage, ITEM_IDS.authUsers)).toBe(0);
+ expect(getUsedUsageValue(usage, ITEM_IDS.emailsPerMonth)).toBe(0);
+ expect(getUsedUsageValue(usage, ITEM_IDS.sessionReplays)).toBe(0);
+ expect(getUsedUsageValue(usage, ITEM_IDS.analyticsEvents)).toBe(0);
+ });
+
+ it("rolls up metered usage across all projects owned by the billing team", async ({ expect }) => {
+ backendContext.set({ projectKeys: InternalProjectKeys, userAuth: null });
+ await Auth.fastSignUp();
+ const internalUserAuth = backendContext.value.userAuth ?? throwErr("Expected internal user auth after sign-up");
+
+ const primaryProject = await Project.createAndSwitch({
+ display_name: "Plan Usage Primary Project",
+ }, true);
+ const primaryProjectKeys = backendContext.value.projectKeys;
+ const ownerTeamId = primaryProject.createProjectResponse.body.owner_team_id;
+ if (typeof ownerTeamId !== "string") {
+ throw new HexclaveAssertionError("Expected created project to include an owner team ID", { primaryProject });
+ }
+
+ backendContext.set({ projectKeys: InternalProjectKeys, userAuth: internalUserAuth });
+ await purchaseTeamPlanForBillingTeam(ownerTeamId);
+ const secondaryProject = await Project.create({
+ display_name: "Plan Usage Secondary Project",
+ owner_team_id: ownerTeamId,
+ });
+
+ backendContext.set({ projectKeys: InternalProjectKeys, userAuth: null });
+ const unrelatedProject = await Project.createAndSwitch({
+ display_name: "Plan Usage Unrelated Project",
+ });
+
+ backendContext.set({ projectKeys: primaryProjectKeys, userAuth: null });
+
+ const { start, end } = getCalendarMonthBounds(new Date());
+ const outsideBefore = new Date(start.getTime() - 2 * 24 * 60 * 60 * 1000);
+ const insidePrimary = new Date(start.getTime() + 2 * 24 * 60 * 60 * 1000);
+ const insideSecondaryA = new Date(start.getTime() + 3 * 24 * 60 * 60 * 1000);
+ const insideSecondaryB = new Date(start.getTime() + 4 * 24 * 60 * 60 * 1000);
+ const outsideAfter = new Date(end.getTime() + 2 * 24 * 60 * 60 * 1000);
+
+ await withInternalDatabase(async (client) => {
+ const primary = await getProjectUsageContext(client, primaryProject.projectId);
+ const secondary = await getProjectUsageContext(client, secondaryProject.projectId);
+ const unrelated = await getProjectUsageContext(client, unrelatedProject.projectId);
+ await clearSeededUsageRows(client, [primary, secondary, unrelated]);
+ await normalizeBillingTeamSubscriptionMapPeriod(client, ownerTeamId, { start, end });
+
+ const primaryUserIds = await insertProjectUsers(client, primary, {
+ nonAnonymousCount: 2,
+ anonymousCount: 1,
+ });
+ const secondaryUserIds = await insertProjectUsers(client, secondary, {
+ nonAnonymousCount: 1,
+ anonymousCount: 0,
+ });
+ const unrelatedUserIds = await insertProjectUsers(client, unrelated, {
+ nonAnonymousCount: 2,
+ anonymousCount: 0,
+ });
+ const firstPrimaryUserId = primaryUserIds[0] ?? throwErr("Expected seeded primary project user");
+ const secondPrimaryUserId = primaryUserIds[1] ?? throwErr("Expected second seeded primary project user");
+ const firstSecondaryUserId = secondaryUserIds[0] ?? throwErr("Expected seeded secondary project user");
+ const firstUnrelatedUserId = unrelatedUserIds[0] ?? throwErr("Expected seeded unrelated project user");
+
+ await insertEmailOutboxRow(client, primary.tenancyId, insidePrimary);
+ await insertEmailOutboxRow(client, primary.tenancyId, insideSecondaryA);
+ await insertEmailOutboxRow(client, secondary.tenancyId, insideSecondaryB);
+ await insertEmailOutboxRow(client, primary.tenancyId, outsideBefore);
+ await insertEmailOutboxRow(client, secondary.tenancyId, outsideAfter);
+ await insertEmailOutboxRow(client, primary.tenancyId, null);
+ await insertEmailOutboxRow(client, unrelated.tenancyId, insidePrimary);
+
+ await insertSessionReplayRow(client, primary, firstPrimaryUserId, insidePrimary);
+ await insertSessionReplayRow(client, primary, secondPrimaryUserId, outsideBefore);
+ await insertSessionReplayRow(client, secondary, firstSecondaryUserId, insideSecondaryA);
+ await insertSessionReplayRow(client, secondary, firstSecondaryUserId, insideSecondaryB);
+ await insertSessionReplayRow(client, secondary, firstSecondaryUserId, outsideAfter);
+ await insertSessionReplayRow(client, unrelated, firstUnrelatedUserId, insidePrimary);
+ });
+
+ const usage = await waitForPlanUsageValues({
+ authUsers: 3,
+ emails: 3,
+ sessionReplays: 3,
+ analyticsEvents: 0,
+ });
+
+ expect(usage.owner_team_id).toBe(ownerTeamId);
+ expect(usage.plan_id).toBe("team");
+ expect(usage.period_start_millis).toBe(start.getTime());
+ expect(usage.period_end_millis).toBe(end.getTime());
+ expect(getUsedUsageValue(usage, ITEM_IDS.authUsers)).toBe(3);
+ expect(getUsedUsageValue(usage, ITEM_IDS.emailsPerMonth)).toBe(3);
+ expect(getUsedUsageValue(usage, ITEM_IDS.sessionReplays)).toBe(3);
+ expect(getUsedUsageValue(usage, ITEM_IDS.analyticsEvents)).toBe(0);
+ });
+});
diff --git a/packages/shared/src/utils/promises.tsx b/packages/shared/src/utils/promises.tsx
index 0cfb7df3b..36f7c77c1 100644
--- a/packages/shared/src/utils/promises.tsx
+++ b/packages/shared/src/utils/promises.tsx
@@ -434,6 +434,71 @@ import.meta.vitest?.test("timeoutThrow", async ({ expect }) => {
});
+/**
+ * Maps over `items` with `fn`, running at most `concurrency` invocations at a time.
+ *
+ * Unlike `Promise.all(items.map(fn))`, this bounds the number of in-flight
+ * promises, which matters when `fn` hits a shared resource (e.g. a database) and
+ * an unbounded fan-out could exhaust connections or overload a replica. Results
+ * are returned in input order regardless of completion order, and the first
+ * rejection aborts further scheduling — already in-flight workers still settle
+ * but no new items are started.
+ */
+export async function mapWithConcurrency(
+ items: readonly T[],
+ concurrency: number,
+ fn: (item: T, index: number) => Promise,
+): Promise {
+ if (!Number.isInteger(concurrency) || concurrency < 1) {
+ throw new HexclaveAssertionError(`mapWithConcurrency requires a positive integer concurrency, got ${concurrency}`);
+ }
+ const results = new Array(items.length);
+ let nextIndex = 0;
+ let aborted = false;
+ const worker = async () => {
+ while (!aborted) {
+ // Claim an index synchronously before awaiting so workers never process the same item.
+ const index = nextIndex++;
+ if (index >= items.length) return;
+ try {
+ // Bounds-checked above; `?? throwErr(…)` is unsuitable because T may legitimately be null/undefined
+ results[index] = await fn(items[index] as T, index);
+ } catch (error) {
+ aborted = true;
+ throw error;
+ }
+ }
+ };
+ const workerCount = Math.min(concurrency, items.length);
+ await Promise.all(Array.from({ length: workerCount }, () => worker()));
+ return results;
+}
+import.meta.vitest?.test("mapWithConcurrency", async ({ expect }) => {
+ // Preserves input order regardless of completion order.
+ const ordered = await mapWithConcurrency([30, 10, 20], 3, async (ms, index) => {
+ await wait(ms);
+ return `${index}:${ms}`;
+ });
+ expect(ordered).toEqual(["0:30", "1:10", "2:20"]);
+
+ // Never exceeds the configured concurrency.
+ let inFlight = 0;
+ let maxInFlight = 0;
+ await mapWithConcurrency(Array.from({ length: 10 }, (_, i) => i), 3, async () => {
+ inFlight++;
+ maxInFlight = Math.max(maxInFlight, inFlight);
+ await wait(5);
+ inFlight--;
+ });
+ expect(maxInFlight).toBe(3);
+
+ // Empty input spawns no workers and returns an empty array.
+ expect(await mapWithConcurrency([], 4, async () => 1)).toEqual([]);
+
+ // Invalid concurrency fails loudly.
+ await expect(mapWithConcurrency([1], 0, async (x) => x)).rejects.toThrow("positive integer concurrency");
+});
+
export type RateLimitOptions = {
/**
* The number of requests to process in parallel. Currently only 1 is supported.