mirror of
https://github.com/stack-auth/stack.git
synced 2026-06-30 21:01:54 +08:00
Usage page performance improvements (#1650)
<!--
Make sure you've read the CONTRIBUTING.md guidelines:
https://github.com/hexclave/hexclave/blob/dev/CONTRIBUTING.md
-->
<!-- This is an auto-generated description by cubic. -->
---
## Summary by cubic
Speed up the Usage page by aggregating metered usage across owned
projects/tenancies with fewer queries and new indexes. Adds E2E tests to
verify team-owned rollups and calendar‑month windows.
- **Performance**
- Added concurrent indexes for `EmailOutbox(tenancyId,
startedSendingAt)` and `SessionReplay(tenancyId, startedAt)`; updated
Prisma schema.
- Group tenancies by (DB client, schema) and run one SQL per group that
counts both emails and session replays; uses `mapWithConcurrency` from
`@hexclave/shared` (concurrency 4, aborts on first error).
- Added helpers `getOwnedProjectAndTenancyIdsForBillingTeam` and
`getNonAnonymousUserCountForTenancies`; made `mapWithConcurrency`
null‑safe with bounds checks.
- **Tests**
- Added E2E tests for the internal plan-usage endpoint covering
team-owned rollups, calendar‑month boundaries, and zero‑usage cases.
- Added unit tests for ownership scope resolution and non‑anonymous user
counting.
<sup>Written for commit 5d6098006c.
Summary will update on new commits.</sup>
<a
href="https://cubic.dev/pr/hexclave/hexclave/pull/1650?utm_source=github"
target="_blank" rel="noopener noreferrer"
data-no-image-dialog="true"><picture><source
media="(prefers-color-scheme: dark)"
srcset="https://www.cubic.dev/buttons/review-in-cubic-dark.svg"><source
media="(prefers-color-scheme: light)"
srcset="https://www.cubic.dev/buttons/review-in-cubic-light.svg"><img
alt="Review in cubic"
src="https://www.cubic.dev/buttons/review-in-cubic-dark.svg"></picture></a>
<!-- End of auto-generated description by cubic. -->
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit
* **Performance Improvements**
* Improved plan usage rollups by aggregating metered emails and session
replays together across an owned scope.
* Added database indexes to speed up time-window metering lookups for
email outbox and session replays.
* **Tests**
* Extended unit tests for billing-team entitlement aggregation and
non-anonymous user counting.
* Added end-to-end coverage for the internal plan-usage endpoint,
including seeded scenarios and period validation.
* **Refactor**
* Reworked entitlement and usage calculations to reuse shared logic for
more consistent results.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
---------
Co-authored-by: armaan <armaan@stack-auth.com>
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
parent
6883d83ad1
commit
81723c3d55
@ -0,0 +1,11 @@
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
-- SINGLE_STATEMENT_SENTINEL
|
||||
-- RUN_OUTSIDE_TRANSACTION_SENTINEL
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS "EmailOutbox_tenancyId_startedSendingAt_idx"
|
||||
ON /* SCHEMA_NAME_SENTINEL */."EmailOutbox"("tenancyId", "startedSendingAt");
|
||||
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
-- SINGLE_STATEMENT_SENTINEL
|
||||
-- RUN_OUTSIDE_TRANSACTION_SENTINEL
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS "SessionReplay_tenancyId_startedAt_idx"
|
||||
ON /* SCHEMA_NAME_SENTINEL */."SessionReplay"("tenancyId", "startedAt");
|
||||
@ -407,6 +407,7 @@ model SessionReplay {
|
||||
|
||||
@@id([tenancyId, id])
|
||||
@@index([tenancyId, projectUserId, startedAt])
|
||||
@@index([tenancyId, startedAt], name: "SessionReplay_tenancyId_startedAt_idx")
|
||||
@@index([tenancyId, lastEventAt])
|
||||
// index by updatedAt instead of lastEventAt because event timing can be spoofed
|
||||
@@index([tenancyId, refreshTokenId, updatedAt])
|
||||
@ -1069,6 +1070,7 @@ model EmailOutbox {
|
||||
|
||||
@@id([tenancyId, id])
|
||||
@@index([tenancyId, finishedSendingAt(sort: Desc), scheduledAtIfNotYetQueued(sort: Desc), priority, id], map: "EmailOutbox_ordering_idx")
|
||||
@@index([tenancyId, startedSendingAt], name: "EmailOutbox_tenancyId_startedSendingAt_idx")
|
||||
@@index([tenancyId, simpleStatus], map: "EmailOutbox_simple_status_tenancy_idx")
|
||||
@@index([tenancyId, status], map: "EmailOutbox_status_tenancy_idx")
|
||||
@@index([isQueued], map: "EmailOutbox_isQueued_idx")
|
||||
|
||||
@ -4,6 +4,8 @@ import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
arePlanLimitsEnforced,
|
||||
getBillingTeamId,
|
||||
getNonAnonymousUserCountForTenancies,
|
||||
getOwnedProjectAndTenancyIdsForBillingTeam,
|
||||
getOwnedProjectIdsForBillingTeam,
|
||||
getOwnedTenancyIdsForBillingTeam,
|
||||
getTeamWideItemCapacityForTests,
|
||||
@ -98,6 +100,28 @@ describe("team-wide ownership aggregation", () => {
|
||||
expect(tenancyIds).toEqual(["tenancy-a-main", "tenancy-a-dev", "tenancy-b-main"]);
|
||||
});
|
||||
|
||||
it("lists owned project and tenancy ids from one ownership scope", async () => {
|
||||
const scope = await getOwnedProjectAndTenancyIdsForBillingTeam("team-1", globalPrisma);
|
||||
expect(scope).toMatchInlineSnapshot(`
|
||||
{
|
||||
"projectIds": [
|
||||
"project-a",
|
||||
"project-b",
|
||||
],
|
||||
"tenancyIds": [
|
||||
"tenancy-a-main",
|
||||
"tenancy-a-dev",
|
||||
"tenancy-b-main",
|
||||
],
|
||||
}
|
||||
`);
|
||||
});
|
||||
|
||||
it("counts non-anonymous users from already-resolved tenancies", async () => {
|
||||
const usage = await getNonAnonymousUserCountForTenancies(["tenancy-a-main", "tenancy-b-main"], globalPrisma);
|
||||
expect(usage).toBe(2);
|
||||
});
|
||||
|
||||
it("counts only non-anonymous users across all owned tenancies", async () => {
|
||||
const usage = await getTeamWideNonAnonymousUserCount("team-1", globalPrisma);
|
||||
expect(usage).toBe(3);
|
||||
|
||||
@ -39,6 +39,11 @@ type GlobalPrismaLike = {
|
||||
},
|
||||
};
|
||||
|
||||
type OwnedBillingScope = {
|
||||
projectIds: string[],
|
||||
tenancyIds: string[],
|
||||
};
|
||||
|
||||
type ItemCapacityReaders = {
|
||||
getPrismaForTenancy: (tenancy: Tenancy) => Promise<unknown>,
|
||||
getItemQuantityForCustomer: (options: {
|
||||
@ -85,13 +90,16 @@ export async function getOwnedProjectIdsForBillingTeam(
|
||||
return projects.map((project) => project.id);
|
||||
}
|
||||
|
||||
export async function getOwnedTenancyIdsForBillingTeam(
|
||||
export async function getOwnedProjectAndTenancyIdsForBillingTeam(
|
||||
billingTeamId: string,
|
||||
globalPrisma: GlobalPrismaLike = globalPrismaClient,
|
||||
): Promise<string[]> {
|
||||
): Promise<OwnedBillingScope> {
|
||||
const projectIds = await getOwnedProjectIdsForBillingTeam(billingTeamId, globalPrisma);
|
||||
if (projectIds.length === 0) {
|
||||
return [];
|
||||
return {
|
||||
projectIds,
|
||||
tenancyIds: [],
|
||||
};
|
||||
}
|
||||
const tenancies = await globalPrisma.tenancy.findMany({
|
||||
where: {
|
||||
@ -103,16 +111,23 @@ export async function getOwnedTenancyIdsForBillingTeam(
|
||||
id: true,
|
||||
},
|
||||
});
|
||||
return tenancies.map((tenancy) => tenancy.id);
|
||||
return {
|
||||
projectIds,
|
||||
tenancyIds: tenancies.map((tenancy) => tenancy.id),
|
||||
};
|
||||
}
|
||||
|
||||
export async function getTeamWideNonAnonymousUserCount(
|
||||
export async function getOwnedTenancyIdsForBillingTeam(
|
||||
billingTeamId: string,
|
||||
globalPrisma: GlobalPrismaLike = globalPrismaClient,
|
||||
): Promise<string[]> {
|
||||
return (await getOwnedProjectAndTenancyIdsForBillingTeam(billingTeamId, globalPrisma)).tenancyIds;
|
||||
}
|
||||
|
||||
export async function getNonAnonymousUserCountForTenancies(
|
||||
tenancyIds: string[],
|
||||
globalPrisma: GlobalPrismaLike = globalPrismaClient,
|
||||
): Promise<number> {
|
||||
// Usage metric: how many non-anonymous users are currently consumed by this billing team.
|
||||
// This is compared against auth user capacity to determine over-limit conditions.
|
||||
const tenancyIds = await getOwnedTenancyIdsForBillingTeam(billingTeamId, globalPrisma);
|
||||
if (tenancyIds.length === 0) {
|
||||
return 0;
|
||||
}
|
||||
@ -126,6 +141,16 @@ export async function getTeamWideNonAnonymousUserCount(
|
||||
});
|
||||
}
|
||||
|
||||
export async function getTeamWideNonAnonymousUserCount(
|
||||
billingTeamId: string,
|
||||
globalPrisma: GlobalPrismaLike = globalPrismaClient,
|
||||
): Promise<number> {
|
||||
// Usage metric: how many non-anonymous users are currently consumed by this billing team.
|
||||
// This is compared against auth user capacity to determine over-limit conditions.
|
||||
const tenancyIds = await getOwnedTenancyIdsForBillingTeam(billingTeamId, globalPrisma);
|
||||
return await getNonAnonymousUserCountForTenancies(tenancyIds, globalPrisma);
|
||||
}
|
||||
|
||||
async function getTeamWideItemCapacity(
|
||||
billingTeamId: string,
|
||||
itemId: string,
|
||||
|
||||
@ -4,20 +4,24 @@ import { getSubscriptionMapForCustomer } from "@/lib/payments/customer-data";
|
||||
import { isActiveSubscription } from "@/lib/payments";
|
||||
import {
|
||||
getBillingTeamId,
|
||||
getOwnedProjectIdsForBillingTeam,
|
||||
getOwnedTenancyIdsForBillingTeam,
|
||||
getTeamWideNonAnonymousUserCount,
|
||||
getNonAnonymousUserCountForTenancies,
|
||||
getOwnedProjectAndTenancyIdsForBillingTeam,
|
||||
} from "@/lib/plan-entitlements";
|
||||
import { DEFAULT_BRANCH_ID, getSoleTenancyFromProjectBranch, getTenancy, type Tenancy } from "@/lib/tenancies";
|
||||
import { getPrismaClientForTenancy, getPrismaSchemaForTenancy, globalPrismaClient, sqlQuoteIdent } from "@/prisma-client";
|
||||
import { BASE_PLAN_IDS_BY_TIER, ITEM_IDS, PLAN_LIMITS, UNLIMITED, type ItemId, type PlanId } from "@hexclave/shared/dist/plans";
|
||||
import type { PlanUsageResponse } from "@hexclave/shared/dist/interface/admin-interface";
|
||||
import { HexclaveAssertionError, throwErr } from "@hexclave/shared/dist/utils/errors";
|
||||
import { mapWithConcurrency } from "@hexclave/shared/dist/utils/promises";
|
||||
import type { SubscriptionRow } from "./payments/schema/types";
|
||||
|
||||
type PlanUsageKind = PlanUsageResponse["rows"][number]["kind"];
|
||||
type PlanUsageRow = PlanUsageResponse["rows"][number];
|
||||
type UsageLimit = number | null;
|
||||
type TenancyMeteredUsage = {
|
||||
emails: number,
|
||||
sessionReplays: number,
|
||||
};
|
||||
|
||||
type UsagePeriod = {
|
||||
start: Date,
|
||||
@ -46,6 +50,8 @@ const PLAN_LABELS = new Map<PlanId, string>([
|
||||
["growth", "Growth"],
|
||||
]);
|
||||
|
||||
const PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY = 4;
|
||||
|
||||
export function getNextPlanId(planId: PlanId): "team" | "growth" | null {
|
||||
if (planId === "free") {
|
||||
return "team";
|
||||
@ -202,38 +208,99 @@ async function getOwnerTeamDisplayName(internalTenancy: Tenancy, ownerTeamId: st
|
||||
return team?.displayName ?? throwErr(`Owner team ${ownerTeamId} not found in the internal tenancy`);
|
||||
}
|
||||
|
||||
async function countEmailsForTenancy(tenancyId: string, period: UsagePeriod): Promise<number> {
|
||||
const tenancy = await getTenancy(tenancyId) ?? throwErr(`Tenancy ${tenancyId} not found while counting email usage`);
|
||||
const schema = await getPrismaSchemaForTenancy(tenancy);
|
||||
const prisma = await getPrismaClientForTenancy(tenancy);
|
||||
const rows = await prisma.$replica().$queryRaw<[{ count: number }]>`
|
||||
SELECT COUNT(*)::int AS count
|
||||
FROM ${sqlQuoteIdent(schema)}."EmailOutbox"
|
||||
WHERE "tenancyId" = ${tenancy.id}::uuid
|
||||
AND "startedSendingAt" IS NOT NULL
|
||||
AND "startedSendingAt" >= ${period.start}
|
||||
AND "startedSendingAt" < ${period.end}
|
||||
`;
|
||||
return Number(rows[0].count);
|
||||
type TenancyPrismaClient = Awaited<ReturnType<typeof getPrismaClientForTenancy>>;
|
||||
|
||||
type TenancyMeteredUsageGroup = {
|
||||
prisma: TenancyPrismaClient,
|
||||
schema: string,
|
||||
tenancyIds: string[],
|
||||
};
|
||||
|
||||
// Tenancies can route to different source-of-truth databases/schemas, so we can't assume a single
|
||||
// query covers every tenancy. We group tenancies that share a (client, schema) and run one aggregate
|
||||
// COUNT per group: the common case (all projects on one database) collapses to a single round trip,
|
||||
// while multi-database teams fan out to one query per distinct database instead of one per tenancy.
|
||||
async function groupTenanciesByMeteredUsageSource(tenancyIds: string[]): Promise<TenancyMeteredUsageGroup[]> {
|
||||
const resolved = await mapWithConcurrency(tenancyIds, PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY, async (tenancyId) => {
|
||||
const tenancy = await getTenancy(tenancyId) ?? throwErr(`Tenancy ${tenancyId} not found while counting plan usage`);
|
||||
const [schema, prisma] = await Promise.all([
|
||||
getPrismaSchemaForTenancy(tenancy),
|
||||
getPrismaClientForTenancy(tenancy),
|
||||
]);
|
||||
return { tenancyId: tenancy.id, schema, prisma };
|
||||
});
|
||||
|
||||
const byClient = new Map<TenancyPrismaClient, Map<string, string[]>>();
|
||||
for (const { tenancyId, schema, prisma } of resolved) {
|
||||
let bySchema = byClient.get(prisma);
|
||||
if (bySchema == null) {
|
||||
bySchema = new Map<string, string[]>();
|
||||
byClient.set(prisma, bySchema);
|
||||
}
|
||||
const existing = bySchema.get(schema);
|
||||
if (existing == null) {
|
||||
bySchema.set(schema, [tenancyId]);
|
||||
} else {
|
||||
existing.push(tenancyId);
|
||||
}
|
||||
}
|
||||
|
||||
const groups: TenancyMeteredUsageGroup[] = [];
|
||||
for (const [prisma, bySchema] of byClient) {
|
||||
for (const [schema, groupTenancyIds] of bySchema) {
|
||||
groups.push({ prisma, schema, tenancyIds: groupTenancyIds });
|
||||
}
|
||||
}
|
||||
return groups;
|
||||
}
|
||||
|
||||
async function countSessionReplaysForTenancy(tenancyId: string, period: UsagePeriod): Promise<number> {
|
||||
const tenancy = await getTenancy(tenancyId) ?? throwErr(`Tenancy ${tenancyId} not found while counting session replay usage`);
|
||||
const schema = await getPrismaSchemaForTenancy(tenancy);
|
||||
const prisma = await getPrismaClientForTenancy(tenancy);
|
||||
const rows = await prisma.$replica().$queryRaw<[{ count: number }]>`
|
||||
SELECT COUNT(*)::int AS count
|
||||
FROM ${sqlQuoteIdent(schema)}."SessionReplay"
|
||||
WHERE "tenancyId" = ${tenancy.id}::uuid
|
||||
AND "startedAt" >= ${period.start}
|
||||
AND "startedAt" < ${period.end}
|
||||
async function countMeteredUsageForGroup(group: TenancyMeteredUsageGroup, period: UsagePeriod): Promise<TenancyMeteredUsage> {
|
||||
const rows = await group.prisma.$replica().$queryRaw<Array<{ emails: number, sessionReplays: number }>>`
|
||||
SELECT
|
||||
(
|
||||
SELECT COUNT(*)::int
|
||||
FROM ${sqlQuoteIdent(group.schema)}."EmailOutbox"
|
||||
WHERE "tenancyId" = ANY(${group.tenancyIds}::uuid[])
|
||||
AND "startedSendingAt" IS NOT NULL
|
||||
AND "startedSendingAt" >= ${period.start}
|
||||
AND "startedSendingAt" < ${period.end}
|
||||
) AS "emails",
|
||||
(
|
||||
SELECT COUNT(*)::int
|
||||
FROM ${sqlQuoteIdent(group.schema)}."SessionReplay"
|
||||
WHERE "tenancyId" = ANY(${group.tenancyIds}::uuid[])
|
||||
AND "startedAt" >= ${period.start}
|
||||
AND "startedAt" < ${period.end}
|
||||
) AS "sessionReplays"
|
||||
`;
|
||||
return Number(rows[0].count);
|
||||
const row = rows[0] ?? throwErr(`Missing plan usage count row for metered usage group on schema ${group.schema}`);
|
||||
return {
|
||||
emails: Number(row.emails),
|
||||
sessionReplays: Number(row.sessionReplays),
|
||||
};
|
||||
}
|
||||
|
||||
async function sumTenancyUsage(tenancyIds: string[], counter: (tenancyId: string) => Promise<number>): Promise<number> {
|
||||
const counts = await Promise.all(tenancyIds.map(counter));
|
||||
return counts.reduce((sum, count) => sum + count, 0);
|
||||
async function sumTenancyMeteredUsage(tenancyIds: string[], period: UsagePeriod): Promise<TenancyMeteredUsage> {
|
||||
if (tenancyIds.length === 0) {
|
||||
return { emails: 0, sessionReplays: 0 };
|
||||
}
|
||||
|
||||
const groups = await groupTenanciesByMeteredUsageSource(tenancyIds);
|
||||
// The group count equals the number of distinct databases (usually 1), so concurrency mostly guards
|
||||
// the pathological multi-database team rather than the per-tenancy fan-out it used to.
|
||||
const subtotals = await mapWithConcurrency(
|
||||
groups,
|
||||
PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY,
|
||||
(group) => countMeteredUsageForGroup(group, period),
|
||||
);
|
||||
|
||||
return subtotals.reduce<TenancyMeteredUsage>(
|
||||
(totals, subtotal) => ({
|
||||
emails: totals.emails + subtotal.emails,
|
||||
sessionReplays: totals.sessionReplays + subtotal.sessionReplays,
|
||||
}),
|
||||
{ emails: 0, sessionReplays: 0 },
|
||||
);
|
||||
}
|
||||
|
||||
async function countAnalyticsEventsForProjects(projectIds: string[], period: UsagePeriod): Promise<number> {
|
||||
@ -336,18 +403,16 @@ export async function getPlanUsageForProject(project: UsageSourceProject, now: D
|
||||
const planId = resolveActivePlanId(activePlanSubscription);
|
||||
const period = getPlanUsagePeriod(activePlanSubscription, now);
|
||||
|
||||
const [ownerTeamDisplayName, ownedProjectIds, ownedTenancyIds, dashboardAdmins, authUsers] = await Promise.all([
|
||||
const [ownerTeamDisplayName, ownedScope, dashboardAdmins] = await Promise.all([
|
||||
getOwnerTeamDisplayName(internalTenancy, ownerTeamId),
|
||||
getOwnedProjectIdsForBillingTeam(ownerTeamId),
|
||||
getOwnedTenancyIdsForBillingTeam(ownerTeamId),
|
||||
getOwnedProjectAndTenancyIdsForBillingTeam(ownerTeamId),
|
||||
countDashboardAdmins(internalTenancy, ownerTeamId, now),
|
||||
getTeamWideNonAnonymousUserCount(ownerTeamId),
|
||||
]);
|
||||
|
||||
const [emails, analyticsEvents, sessionReplays] = await Promise.all([
|
||||
sumTenancyUsage(ownedTenancyIds, async (tenancyId) => await countEmailsForTenancy(tenancyId, period)),
|
||||
countAnalyticsEventsForProjects(ownedProjectIds, period),
|
||||
sumTenancyUsage(ownedTenancyIds, async (tenancyId) => await countSessionReplaysForTenancy(tenancyId, period)),
|
||||
const [authUsers, meteredUsage, analyticsEvents] = await Promise.all([
|
||||
getNonAnonymousUserCountForTenancies(ownedScope.tenancyIds),
|
||||
sumTenancyMeteredUsage(ownedScope.tenancyIds, period),
|
||||
countAnalyticsEventsForProjects(ownedScope.projectIds, period),
|
||||
]);
|
||||
|
||||
return {
|
||||
@ -362,9 +427,9 @@ export async function getPlanUsageForProject(project: UsageSourceProject, now: D
|
||||
planId,
|
||||
dashboardAdmins,
|
||||
authUsers,
|
||||
emails,
|
||||
emails: meteredUsage.emails,
|
||||
analyticsEvents,
|
||||
sessionReplays,
|
||||
sessionReplays: meteredUsage.sessionReplays,
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
@ -0,0 +1,407 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { Client } from "pg";
|
||||
import { describe } from "vitest";
|
||||
import { ITEM_IDS } from "@hexclave/shared/dist/plans";
|
||||
import { getEnvVariable } from "@hexclave/shared/dist/utils/env";
|
||||
import { HexclaveAssertionError, throwErr } from "@hexclave/shared/dist/utils/errors";
|
||||
import { wait } from "@hexclave/shared/dist/utils/promises";
|
||||
import { planUsageResponseSchema, type PlanUsageResponse } from "@hexclave/shared/dist/interface/plan-usage";
|
||||
import { it } from "../../../../../helpers";
|
||||
import { Auth, InternalProjectKeys, Project, backendContext, niceBackendFetch } from "../../../../backend-helpers";
|
||||
|
||||
type ProjectUsageContext = {
|
||||
projectId: string,
|
||||
tenancyId: string,
|
||||
};
|
||||
|
||||
type JsonValue = null | boolean | number | string | JsonValue[] | { [key: string]: JsonValue };
|
||||
|
||||
function getInternalDatabaseConnectionString(): string {
|
||||
const connectionString = getEnvVariable(
|
||||
"HEXCLAVE_DATABASE_CONNECTION_STRING",
|
||||
getEnvVariable("STACK_DATABASE_CONNECTION_STRING", ""),
|
||||
);
|
||||
if (connectionString === "") {
|
||||
throw new HexclaveAssertionError("Plan usage E2E tests require a configured internal database connection string");
|
||||
}
|
||||
return connectionString;
|
||||
}
|
||||
|
||||
async function withInternalDatabase<T>(fn: (client: Client) => Promise<T>): Promise<T> {
|
||||
const client = new Client({
|
||||
connectionString: getInternalDatabaseConnectionString(),
|
||||
connectionTimeoutMillis: 10_000,
|
||||
query_timeout: 30_000,
|
||||
});
|
||||
await client.connect();
|
||||
try {
|
||||
return await fn(client);
|
||||
} finally {
|
||||
await client.end();
|
||||
}
|
||||
}
|
||||
|
||||
async function getMainTenancyId(client: Client, projectId: string): Promise<string> {
|
||||
const tenancies = await client.query<{ id: string }>(
|
||||
`SELECT "id" FROM "Tenancy" WHERE "projectId" = $1 AND "branchId" = 'main' LIMIT 1`,
|
||||
[projectId],
|
||||
);
|
||||
return tenancies.rows[0]?.id ?? throwErr(`Could not find main tenancy for project ${projectId}`);
|
||||
}
|
||||
|
||||
async function getProjectUsageContext(client: Client, projectId: string): Promise<ProjectUsageContext> {
|
||||
return {
|
||||
projectId,
|
||||
tenancyId: await getMainTenancyId(client, projectId),
|
||||
};
|
||||
}
|
||||
|
||||
async function clearSeededUsageRows(client: Client, tenancies: readonly ProjectUsageContext[]): Promise<void> {
|
||||
const tenancyIds = tenancies.map((tenancy) => tenancy.tenancyId);
|
||||
await client.query(`DELETE FROM "SessionReplay" WHERE "tenancyId" = ANY($1::uuid[])`, [tenancyIds]);
|
||||
await client.query(`DELETE FROM "EmailOutbox" WHERE "tenancyId" = ANY($1::uuid[])`, [tenancyIds]);
|
||||
await client.query(`DELETE FROM "ProjectUser" WHERE "tenancyId" = ANY($1::uuid[])`, [tenancyIds]);
|
||||
}
|
||||
|
||||
function normalizeSubscriptionPeriodInJson(value: unknown, ownerTeamId: string, period: {
|
||||
start: Date,
|
||||
end: Date,
|
||||
}): JsonValue {
|
||||
if (value === null || typeof value === "boolean" || typeof value === "number" || typeof value === "string") {
|
||||
return value;
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
return value.map((item) => normalizeSubscriptionPeriodInJson(item, ownerTeamId, period));
|
||||
}
|
||||
if (typeof value === "object") {
|
||||
const normalizedObject: { [key: string]: JsonValue } = {};
|
||||
for (const [key, entryValue] of Object.entries(value)) {
|
||||
normalizedObject[key] = normalizeSubscriptionPeriodInJson(entryValue, ownerTeamId, period);
|
||||
}
|
||||
if (
|
||||
normalizedObject.customerId === ownerTeamId
|
||||
&& typeof normalizedObject.currentPeriodStartMillis === "number"
|
||||
&& typeof normalizedObject.currentPeriodEndMillis === "number"
|
||||
) {
|
||||
return {
|
||||
...normalizedObject,
|
||||
currentPeriodStartMillis: period.start.getTime(),
|
||||
currentPeriodEndMillis: period.end.getTime(),
|
||||
};
|
||||
}
|
||||
return normalizedObject;
|
||||
}
|
||||
throw new HexclaveAssertionError("Unexpected non-JSON value in payment storage", { value });
|
||||
}
|
||||
|
||||
async function normalizeBillingTeamSubscriptionMapPeriod(client: Client, ownerTeamId: string, period: {
|
||||
start: Date,
|
||||
end: Date,
|
||||
}): Promise<void> {
|
||||
// The E2E seed data can create zero-length payment periods in the Bulldozer LFold output.
|
||||
// Plan usage reads that output directly, so normalize only this fresh test team's emitted
|
||||
// subscription-map rows to make the metered usage period deterministic.
|
||||
const rows = await client.query<{ id: string, value: unknown }>(
|
||||
`
|
||||
SELECT "id", "value"
|
||||
FROM "BulldozerStorageEngine"
|
||||
WHERE "keyPath"[1] = to_jsonb('table'::text)
|
||||
AND "keyPath"[2] = to_jsonb('external:payments-subscription-map-by-customer'::text)
|
||||
AND "keyPath"::text LIKE $1
|
||||
AND "value" <> 'null'::jsonb
|
||||
`,
|
||||
[`%${ownerTeamId}%`],
|
||||
);
|
||||
if (rows.rows.length === 0) {
|
||||
throw new HexclaveAssertionError("Expected payment subscription-map rows for billing team", { ownerTeamId });
|
||||
}
|
||||
for (const row of rows.rows) {
|
||||
await client.query(
|
||||
`UPDATE "BulldozerStorageEngine" SET "value" = $2::jsonb WHERE "id" = $1::uuid`,
|
||||
[row.id, JSON.stringify(normalizeSubscriptionPeriodInJson(row.value, ownerTeamId, period))],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async function insertProjectUsers(client: Client, context: ProjectUsageContext, options: {
|
||||
nonAnonymousCount: number,
|
||||
anonymousCount: number,
|
||||
}): Promise<string[]> {
|
||||
const nonAnonymousUsers = await client.query<{ projectUserId: string }>(
|
||||
`
|
||||
INSERT INTO "ProjectUser"
|
||||
("tenancyId", "projectUserId", "mirroredProjectId", "mirroredBranchId",
|
||||
"displayName", "createdAt", "updatedAt", "isAnonymous",
|
||||
"signedUpAt", "signUpRiskScoreBot", "signUpRiskScoreFreeTrialAbuse")
|
||||
SELECT
|
||||
$1::uuid,
|
||||
gen_random_uuid(),
|
||||
$2,
|
||||
'main',
|
||||
'Plan Usage User ' || gs,
|
||||
now(),
|
||||
now(),
|
||||
false,
|
||||
now(),
|
||||
0,
|
||||
0
|
||||
FROM generate_series(1, $3::int) AS gs
|
||||
RETURNING "projectUserId"
|
||||
`,
|
||||
[context.tenancyId, context.projectId, options.nonAnonymousCount],
|
||||
);
|
||||
|
||||
await client.query(
|
||||
`
|
||||
INSERT INTO "ProjectUser"
|
||||
("tenancyId", "projectUserId", "mirroredProjectId", "mirroredBranchId",
|
||||
"displayName", "createdAt", "updatedAt", "isAnonymous",
|
||||
"signedUpAt", "signUpRiskScoreBot", "signUpRiskScoreFreeTrialAbuse")
|
||||
SELECT
|
||||
$1::uuid,
|
||||
gen_random_uuid(),
|
||||
$2,
|
||||
'main',
|
||||
'Plan Usage Anonymous User ' || gs,
|
||||
now(),
|
||||
now(),
|
||||
true,
|
||||
now(),
|
||||
0,
|
||||
0
|
||||
FROM generate_series(1, $3::int) AS gs
|
||||
`,
|
||||
[context.tenancyId, context.projectId, options.anonymousCount],
|
||||
);
|
||||
|
||||
return nonAnonymousUsers.rows.map((row) => row.projectUserId);
|
||||
}
|
||||
|
||||
async function insertEmailOutboxRow(client: Client, tenancyId: string, startedSendingAt: Date | null): Promise<void> {
|
||||
const renderedAt = new Date();
|
||||
await client.query(
|
||||
`
|
||||
INSERT INTO "EmailOutbox"
|
||||
("tenancyId", "id", "createdAt", "updatedAt", "tsxSource", "isHighPriority", "to", "extraRenderVariables",
|
||||
"shouldSkipDeliverabilityCheck", "createdWith", "renderedByWorkerId", "startedRenderingAt",
|
||||
"finishedRenderingAt", "renderedHtml", "renderedSubject", "renderedIsTransactional",
|
||||
"scheduledAt", "isQueued", "startedSendingAt", "finishedSendingAt", "canHaveDeliveryInfo")
|
||||
VALUES
|
||||
($1::uuid, gen_random_uuid(), $4, $4, '', false, $2::jsonb, '{}'::jsonb,
|
||||
true, 'PROGRAMMATIC_CALL', $3::uuid, $4, $4, '<p>usage test</p>',
|
||||
'Plan usage test email', true, $4, true, $5, $5, $6)
|
||||
`,
|
||||
[
|
||||
tenancyId,
|
||||
JSON.stringify({ type: "custom-emails", emails: ["usage-test@example.com"] }),
|
||||
randomUUID(),
|
||||
renderedAt,
|
||||
startedSendingAt,
|
||||
startedSendingAt == null ? null : false,
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
async function insertSessionReplayRow(client: Client, context: ProjectUsageContext, projectUserId: string, startedAt: Date): Promise<void> {
|
||||
await client.query(
|
||||
`
|
||||
INSERT INTO "SessionReplay"
|
||||
("tenancyId", "id", "projectUserId", "refreshTokenId", "startedAt", "lastEventAt", "createdAt", "updatedAt")
|
||||
VALUES
|
||||
($1::uuid, gen_random_uuid(), $2::uuid, $3::uuid, $4, $4, $4, $4)
|
||||
`,
|
||||
[context.tenancyId, projectUserId, randomUUID(), startedAt],
|
||||
);
|
||||
}
|
||||
|
||||
async function getPlanUsage(): Promise<PlanUsageResponse> {
|
||||
const response = await niceBackendFetch("/api/latest/internal/plan-usage", {
|
||||
accessType: "admin",
|
||||
});
|
||||
if (response.status !== 200) {
|
||||
throw new HexclaveAssertionError("Expected plan usage request to succeed", { response });
|
||||
}
|
||||
return await planUsageResponseSchema.validate(response.body);
|
||||
}
|
||||
|
||||
async function purchaseTeamPlanForBillingTeam(ownerTeamId: string): Promise<void> {
|
||||
const createUrlResponse = await niceBackendFetch("/api/latest/payments/purchases/create-purchase-url", {
|
||||
method: "POST",
|
||||
accessType: "client",
|
||||
body: {
|
||||
customer_type: "team",
|
||||
customer_id: ownerTeamId,
|
||||
product_id: "team",
|
||||
},
|
||||
});
|
||||
if (createUrlResponse.status !== 200 || typeof createUrlResponse.body?.url !== "string") {
|
||||
throw new HexclaveAssertionError("Expected team plan purchase URL creation to succeed", { createUrlResponse });
|
||||
}
|
||||
|
||||
const fullCode = createUrlResponse.body.url.match(/\/purchase\/([a-z0-9_-]+)/)?.[1]
|
||||
?? throwErr("Could not parse purchase code from team plan purchase URL", { createUrlResponse });
|
||||
const purchaseResponse = await niceBackendFetch("/api/latest/internal/payments/test-mode-purchase-session", {
|
||||
method: "POST",
|
||||
accessType: "admin",
|
||||
body: {
|
||||
full_code: fullCode,
|
||||
price_id: "monthly",
|
||||
quantity: 1,
|
||||
},
|
||||
});
|
||||
if (purchaseResponse.status !== 200) {
|
||||
throw new HexclaveAssertionError("Expected test-mode team plan purchase to succeed", { purchaseResponse });
|
||||
}
|
||||
}
|
||||
|
||||
function getUsedUsageValue(usage: PlanUsageResponse, itemId: string): number {
|
||||
const row = usage.rows.find((candidate) => candidate.item_id === itemId) ?? throwErr(`Missing usage row for ${itemId}`);
|
||||
return row.used ?? throwErr(`Expected usage row ${itemId} to have a used value`);
|
||||
}
|
||||
|
||||
function getCalendarMonthBounds(now: Date): { start: Date, end: Date } {
|
||||
return {
|
||||
start: new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), 1)),
|
||||
end: new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth() + 1, 1)),
|
||||
};
|
||||
}
|
||||
|
||||
async function waitForPlanUsageValues(expected: {
|
||||
authUsers: number,
|
||||
emails: number,
|
||||
sessionReplays: number,
|
||||
analyticsEvents: number,
|
||||
}): Promise<PlanUsageResponse> {
|
||||
const startedAt = performance.now();
|
||||
let latestUsage: PlanUsageResponse | undefined;
|
||||
while (performance.now() - startedAt < 15_000) {
|
||||
latestUsage = await getPlanUsage();
|
||||
if (
|
||||
getUsedUsageValue(latestUsage, ITEM_IDS.authUsers) === expected.authUsers
|
||||
&& getUsedUsageValue(latestUsage, ITEM_IDS.emailsPerMonth) === expected.emails
|
||||
&& getUsedUsageValue(latestUsage, ITEM_IDS.sessionReplays) === expected.sessionReplays
|
||||
&& getUsedUsageValue(latestUsage, ITEM_IDS.analyticsEvents) === expected.analyticsEvents
|
||||
) {
|
||||
return latestUsage;
|
||||
}
|
||||
await wait(250);
|
||||
}
|
||||
throw new HexclaveAssertionError("Timed out waiting for seeded plan usage to be visible", {
|
||||
latestUsage,
|
||||
expected,
|
||||
});
|
||||
}
|
||||
|
||||
describe("internal plan usage", () => {
|
||||
it("returns zero usage for a fresh owned project with no seeded usage rows", async ({ expect }) => {
|
||||
const { projectId } = await Project.createAndSwitch({
|
||||
display_name: "Plan Usage Empty Project",
|
||||
});
|
||||
|
||||
await withInternalDatabase(async (client) => {
|
||||
const context = await getProjectUsageContext(client, projectId);
|
||||
await clearSeededUsageRows(client, [context]);
|
||||
});
|
||||
|
||||
const usage = await getPlanUsage();
|
||||
|
||||
expect(usage.owner_team_id).toBeTruthy();
|
||||
expect(usage.plan_id).toBe("free");
|
||||
expect(getUsedUsageValue(usage, ITEM_IDS.authUsers)).toBe(0);
|
||||
expect(getUsedUsageValue(usage, ITEM_IDS.emailsPerMonth)).toBe(0);
|
||||
expect(getUsedUsageValue(usage, ITEM_IDS.sessionReplays)).toBe(0);
|
||||
expect(getUsedUsageValue(usage, ITEM_IDS.analyticsEvents)).toBe(0);
|
||||
});
|
||||
|
||||
it("rolls up metered usage across all projects owned by the billing team", async ({ expect }) => {
|
||||
backendContext.set({ projectKeys: InternalProjectKeys, userAuth: null });
|
||||
await Auth.fastSignUp();
|
||||
const internalUserAuth = backendContext.value.userAuth ?? throwErr("Expected internal user auth after sign-up");
|
||||
|
||||
const primaryProject = await Project.createAndSwitch({
|
||||
display_name: "Plan Usage Primary Project",
|
||||
}, true);
|
||||
const primaryProjectKeys = backendContext.value.projectKeys;
|
||||
const ownerTeamId = primaryProject.createProjectResponse.body.owner_team_id;
|
||||
if (typeof ownerTeamId !== "string") {
|
||||
throw new HexclaveAssertionError("Expected created project to include an owner team ID", { primaryProject });
|
||||
}
|
||||
|
||||
backendContext.set({ projectKeys: InternalProjectKeys, userAuth: internalUserAuth });
|
||||
await purchaseTeamPlanForBillingTeam(ownerTeamId);
|
||||
const secondaryProject = await Project.create({
|
||||
display_name: "Plan Usage Secondary Project",
|
||||
owner_team_id: ownerTeamId,
|
||||
});
|
||||
|
||||
backendContext.set({ projectKeys: InternalProjectKeys, userAuth: null });
|
||||
const unrelatedProject = await Project.createAndSwitch({
|
||||
display_name: "Plan Usage Unrelated Project",
|
||||
});
|
||||
|
||||
backendContext.set({ projectKeys: primaryProjectKeys, userAuth: null });
|
||||
|
||||
const { start, end } = getCalendarMonthBounds(new Date());
|
||||
const outsideBefore = new Date(start.getTime() - 2 * 24 * 60 * 60 * 1000);
|
||||
const insidePrimary = new Date(start.getTime() + 2 * 24 * 60 * 60 * 1000);
|
||||
const insideSecondaryA = new Date(start.getTime() + 3 * 24 * 60 * 60 * 1000);
|
||||
const insideSecondaryB = new Date(start.getTime() + 4 * 24 * 60 * 60 * 1000);
|
||||
const outsideAfter = new Date(end.getTime() + 2 * 24 * 60 * 60 * 1000);
|
||||
|
||||
await withInternalDatabase(async (client) => {
|
||||
const primary = await getProjectUsageContext(client, primaryProject.projectId);
|
||||
const secondary = await getProjectUsageContext(client, secondaryProject.projectId);
|
||||
const unrelated = await getProjectUsageContext(client, unrelatedProject.projectId);
|
||||
await clearSeededUsageRows(client, [primary, secondary, unrelated]);
|
||||
await normalizeBillingTeamSubscriptionMapPeriod(client, ownerTeamId, { start, end });
|
||||
|
||||
const primaryUserIds = await insertProjectUsers(client, primary, {
|
||||
nonAnonymousCount: 2,
|
||||
anonymousCount: 1,
|
||||
});
|
||||
const secondaryUserIds = await insertProjectUsers(client, secondary, {
|
||||
nonAnonymousCount: 1,
|
||||
anonymousCount: 0,
|
||||
});
|
||||
const unrelatedUserIds = await insertProjectUsers(client, unrelated, {
|
||||
nonAnonymousCount: 2,
|
||||
anonymousCount: 0,
|
||||
});
|
||||
const firstPrimaryUserId = primaryUserIds[0] ?? throwErr("Expected seeded primary project user");
|
||||
const secondPrimaryUserId = primaryUserIds[1] ?? throwErr("Expected second seeded primary project user");
|
||||
const firstSecondaryUserId = secondaryUserIds[0] ?? throwErr("Expected seeded secondary project user");
|
||||
const firstUnrelatedUserId = unrelatedUserIds[0] ?? throwErr("Expected seeded unrelated project user");
|
||||
|
||||
await insertEmailOutboxRow(client, primary.tenancyId, insidePrimary);
|
||||
await insertEmailOutboxRow(client, primary.tenancyId, insideSecondaryA);
|
||||
await insertEmailOutboxRow(client, secondary.tenancyId, insideSecondaryB);
|
||||
await insertEmailOutboxRow(client, primary.tenancyId, outsideBefore);
|
||||
await insertEmailOutboxRow(client, secondary.tenancyId, outsideAfter);
|
||||
await insertEmailOutboxRow(client, primary.tenancyId, null);
|
||||
await insertEmailOutboxRow(client, unrelated.tenancyId, insidePrimary);
|
||||
|
||||
await insertSessionReplayRow(client, primary, firstPrimaryUserId, insidePrimary);
|
||||
await insertSessionReplayRow(client, primary, secondPrimaryUserId, outsideBefore);
|
||||
await insertSessionReplayRow(client, secondary, firstSecondaryUserId, insideSecondaryA);
|
||||
await insertSessionReplayRow(client, secondary, firstSecondaryUserId, insideSecondaryB);
|
||||
await insertSessionReplayRow(client, secondary, firstSecondaryUserId, outsideAfter);
|
||||
await insertSessionReplayRow(client, unrelated, firstUnrelatedUserId, insidePrimary);
|
||||
});
|
||||
|
||||
const usage = await waitForPlanUsageValues({
|
||||
authUsers: 3,
|
||||
emails: 3,
|
||||
sessionReplays: 3,
|
||||
analyticsEvents: 0,
|
||||
});
|
||||
|
||||
expect(usage.owner_team_id).toBe(ownerTeamId);
|
||||
expect(usage.plan_id).toBe("team");
|
||||
expect(usage.period_start_millis).toBe(start.getTime());
|
||||
expect(usage.period_end_millis).toBe(end.getTime());
|
||||
expect(getUsedUsageValue(usage, ITEM_IDS.authUsers)).toBe(3);
|
||||
expect(getUsedUsageValue(usage, ITEM_IDS.emailsPerMonth)).toBe(3);
|
||||
expect(getUsedUsageValue(usage, ITEM_IDS.sessionReplays)).toBe(3);
|
||||
expect(getUsedUsageValue(usage, ITEM_IDS.analyticsEvents)).toBe(0);
|
||||
});
|
||||
});
|
||||
@ -434,6 +434,71 @@ import.meta.vitest?.test("timeoutThrow", async ({ expect }) => {
|
||||
});
|
||||
|
||||
|
||||
/**
|
||||
* Maps over `items` with `fn`, running at most `concurrency` invocations at a time.
|
||||
*
|
||||
* Unlike `Promise.all(items.map(fn))`, this bounds the number of in-flight
|
||||
* promises, which matters when `fn` hits a shared resource (e.g. a database) and
|
||||
* an unbounded fan-out could exhaust connections or overload a replica. Results
|
||||
* are returned in input order regardless of completion order, and the first
|
||||
* rejection aborts further scheduling — already in-flight workers still settle
|
||||
* but no new items are started.
|
||||
*/
|
||||
export async function mapWithConcurrency<T, R>(
|
||||
items: readonly T[],
|
||||
concurrency: number,
|
||||
fn: (item: T, index: number) => Promise<R>,
|
||||
): Promise<R[]> {
|
||||
if (!Number.isInteger(concurrency) || concurrency < 1) {
|
||||
throw new HexclaveAssertionError(`mapWithConcurrency requires a positive integer concurrency, got ${concurrency}`);
|
||||
}
|
||||
const results = new Array<R>(items.length);
|
||||
let nextIndex = 0;
|
||||
let aborted = false;
|
||||
const worker = async () => {
|
||||
while (!aborted) {
|
||||
// Claim an index synchronously before awaiting so workers never process the same item.
|
||||
const index = nextIndex++;
|
||||
if (index >= items.length) return;
|
||||
try {
|
||||
// Bounds-checked above; `?? throwErr(…)` is unsuitable because T may legitimately be null/undefined
|
||||
results[index] = await fn(items[index] as T, index);
|
||||
} catch (error) {
|
||||
aborted = true;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
};
|
||||
const workerCount = Math.min(concurrency, items.length);
|
||||
await Promise.all(Array.from({ length: workerCount }, () => worker()));
|
||||
return results;
|
||||
}
|
||||
import.meta.vitest?.test("mapWithConcurrency", async ({ expect }) => {
|
||||
// Preserves input order regardless of completion order.
|
||||
const ordered = await mapWithConcurrency([30, 10, 20], 3, async (ms, index) => {
|
||||
await wait(ms);
|
||||
return `${index}:${ms}`;
|
||||
});
|
||||
expect(ordered).toEqual(["0:30", "1:10", "2:20"]);
|
||||
|
||||
// Never exceeds the configured concurrency.
|
||||
let inFlight = 0;
|
||||
let maxInFlight = 0;
|
||||
await mapWithConcurrency(Array.from({ length: 10 }, (_, i) => i), 3, async () => {
|
||||
inFlight++;
|
||||
maxInFlight = Math.max(maxInFlight, inFlight);
|
||||
await wait(5);
|
||||
inFlight--;
|
||||
});
|
||||
expect(maxInFlight).toBe(3);
|
||||
|
||||
// Empty input spawns no workers and returns an empty array.
|
||||
expect(await mapWithConcurrency([], 4, async () => 1)).toEqual([]);
|
||||
|
||||
// Invalid concurrency fails loudly.
|
||||
await expect(mapWithConcurrency([1], 0, async (x) => x)).rejects.toThrow("positive integer concurrency");
|
||||
});
|
||||
|
||||
export type RateLimitOptions = {
|
||||
/**
|
||||
* The number of requests to process in parallel. Currently only 1 is supported.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user