diff --git a/apps/backend/prisma/migrations/20260623010000_add_plan_usage_range_indexes/migration.sql b/apps/backend/prisma/migrations/20260623010000_add_plan_usage_range_indexes/migration.sql new file mode 100644 index 000000000..cebc53f24 --- /dev/null +++ b/apps/backend/prisma/migrations/20260623010000_add_plan_usage_range_indexes/migration.sql @@ -0,0 +1,11 @@ +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "EmailOutbox_tenancyId_startedSendingAt_idx" + ON /* SCHEMA_NAME_SENTINEL */."EmailOutbox"("tenancyId", "startedSendingAt"); + +-- SPLIT_STATEMENT_SENTINEL +-- SINGLE_STATEMENT_SENTINEL +-- RUN_OUTSIDE_TRANSACTION_SENTINEL +CREATE INDEX CONCURRENTLY IF NOT EXISTS "SessionReplay_tenancyId_startedAt_idx" + ON /* SCHEMA_NAME_SENTINEL */."SessionReplay"("tenancyId", "startedAt"); diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index 7a9421c90..f830b2989 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -407,6 +407,7 @@ model SessionReplay { @@id([tenancyId, id]) @@index([tenancyId, projectUserId, startedAt]) + @@index([tenancyId, startedAt], name: "SessionReplay_tenancyId_startedAt_idx") @@index([tenancyId, lastEventAt]) // index by updatedAt instead of lastEventAt because event timing can be spoofed @@index([tenancyId, refreshTokenId, updatedAt]) @@ -1069,6 +1070,7 @@ model EmailOutbox { @@id([tenancyId, id]) @@index([tenancyId, finishedSendingAt(sort: Desc), scheduledAtIfNotYetQueued(sort: Desc), priority, id], map: "EmailOutbox_ordering_idx") + @@index([tenancyId, startedSendingAt], name: "EmailOutbox_tenancyId_startedSendingAt_idx") @@index([tenancyId, simpleStatus], map: "EmailOutbox_simple_status_tenancy_idx") @@index([tenancyId, status], map: "EmailOutbox_status_tenancy_idx") @@index([isQueued], map: "EmailOutbox_isQueued_idx") diff --git a/apps/backend/src/lib/plan-entitlements.test.ts b/apps/backend/src/lib/plan-entitlements.test.ts index aec03993e..7d9f0d50c 100644 --- a/apps/backend/src/lib/plan-entitlements.test.ts +++ b/apps/backend/src/lib/plan-entitlements.test.ts @@ -4,6 +4,8 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { arePlanLimitsEnforced, getBillingTeamId, + getNonAnonymousUserCountForTenancies, + getOwnedProjectAndTenancyIdsForBillingTeam, getOwnedProjectIdsForBillingTeam, getOwnedTenancyIdsForBillingTeam, getTeamWideItemCapacityForTests, @@ -98,6 +100,28 @@ describe("team-wide ownership aggregation", () => { expect(tenancyIds).toEqual(["tenancy-a-main", "tenancy-a-dev", "tenancy-b-main"]); }); + it("lists owned project and tenancy ids from one ownership scope", async () => { + const scope = await getOwnedProjectAndTenancyIdsForBillingTeam("team-1", globalPrisma); + expect(scope).toMatchInlineSnapshot(` + { + "projectIds": [ + "project-a", + "project-b", + ], + "tenancyIds": [ + "tenancy-a-main", + "tenancy-a-dev", + "tenancy-b-main", + ], + } + `); + }); + + it("counts non-anonymous users from already-resolved tenancies", async () => { + const usage = await getNonAnonymousUserCountForTenancies(["tenancy-a-main", "tenancy-b-main"], globalPrisma); + expect(usage).toBe(2); + }); + it("counts only non-anonymous users across all owned tenancies", async () => { const usage = await getTeamWideNonAnonymousUserCount("team-1", globalPrisma); expect(usage).toBe(3); diff --git a/apps/backend/src/lib/plan-entitlements.ts b/apps/backend/src/lib/plan-entitlements.ts index 932ab977c..1b8527061 100644 --- a/apps/backend/src/lib/plan-entitlements.ts +++ b/apps/backend/src/lib/plan-entitlements.ts @@ -39,6 +39,11 @@ type GlobalPrismaLike = { }, }; +type OwnedBillingScope = { + projectIds: string[], + tenancyIds: string[], +}; + type ItemCapacityReaders = { getPrismaForTenancy: (tenancy: Tenancy) => Promise, getItemQuantityForCustomer: (options: { @@ -85,13 +90,16 @@ export async function getOwnedProjectIdsForBillingTeam( return projects.map((project) => project.id); } -export async function getOwnedTenancyIdsForBillingTeam( +export async function getOwnedProjectAndTenancyIdsForBillingTeam( billingTeamId: string, globalPrisma: GlobalPrismaLike = globalPrismaClient, -): Promise { +): Promise { const projectIds = await getOwnedProjectIdsForBillingTeam(billingTeamId, globalPrisma); if (projectIds.length === 0) { - return []; + return { + projectIds, + tenancyIds: [], + }; } const tenancies = await globalPrisma.tenancy.findMany({ where: { @@ -103,16 +111,23 @@ export async function getOwnedTenancyIdsForBillingTeam( id: true, }, }); - return tenancies.map((tenancy) => tenancy.id); + return { + projectIds, + tenancyIds: tenancies.map((tenancy) => tenancy.id), + }; } -export async function getTeamWideNonAnonymousUserCount( +export async function getOwnedTenancyIdsForBillingTeam( billingTeamId: string, globalPrisma: GlobalPrismaLike = globalPrismaClient, +): Promise { + return (await getOwnedProjectAndTenancyIdsForBillingTeam(billingTeamId, globalPrisma)).tenancyIds; +} + +export async function getNonAnonymousUserCountForTenancies( + tenancyIds: string[], + globalPrisma: GlobalPrismaLike = globalPrismaClient, ): Promise { - // Usage metric: how many non-anonymous users are currently consumed by this billing team. - // This is compared against auth user capacity to determine over-limit conditions. - const tenancyIds = await getOwnedTenancyIdsForBillingTeam(billingTeamId, globalPrisma); if (tenancyIds.length === 0) { return 0; } @@ -126,6 +141,16 @@ export async function getTeamWideNonAnonymousUserCount( }); } +export async function getTeamWideNonAnonymousUserCount( + billingTeamId: string, + globalPrisma: GlobalPrismaLike = globalPrismaClient, +): Promise { + // Usage metric: how many non-anonymous users are currently consumed by this billing team. + // This is compared against auth user capacity to determine over-limit conditions. + const tenancyIds = await getOwnedTenancyIdsForBillingTeam(billingTeamId, globalPrisma); + return await getNonAnonymousUserCountForTenancies(tenancyIds, globalPrisma); +} + async function getTeamWideItemCapacity( billingTeamId: string, itemId: string, diff --git a/apps/backend/src/lib/plan-usage.ts b/apps/backend/src/lib/plan-usage.ts index b53c01391..6c28c398e 100644 --- a/apps/backend/src/lib/plan-usage.ts +++ b/apps/backend/src/lib/plan-usage.ts @@ -4,20 +4,24 @@ import { getSubscriptionMapForCustomer } from "@/lib/payments/customer-data"; import { isActiveSubscription } from "@/lib/payments"; import { getBillingTeamId, - getOwnedProjectIdsForBillingTeam, - getOwnedTenancyIdsForBillingTeam, - getTeamWideNonAnonymousUserCount, + getNonAnonymousUserCountForTenancies, + getOwnedProjectAndTenancyIdsForBillingTeam, } from "@/lib/plan-entitlements"; import { DEFAULT_BRANCH_ID, getSoleTenancyFromProjectBranch, getTenancy, type Tenancy } from "@/lib/tenancies"; import { getPrismaClientForTenancy, getPrismaSchemaForTenancy, globalPrismaClient, sqlQuoteIdent } from "@/prisma-client"; import { BASE_PLAN_IDS_BY_TIER, ITEM_IDS, PLAN_LIMITS, UNLIMITED, type ItemId, type PlanId } from "@hexclave/shared/dist/plans"; import type { PlanUsageResponse } from "@hexclave/shared/dist/interface/admin-interface"; import { HexclaveAssertionError, throwErr } from "@hexclave/shared/dist/utils/errors"; +import { mapWithConcurrency } from "@hexclave/shared/dist/utils/promises"; import type { SubscriptionRow } from "./payments/schema/types"; type PlanUsageKind = PlanUsageResponse["rows"][number]["kind"]; type PlanUsageRow = PlanUsageResponse["rows"][number]; type UsageLimit = number | null; +type TenancyMeteredUsage = { + emails: number, + sessionReplays: number, +}; type UsagePeriod = { start: Date, @@ -46,6 +50,8 @@ const PLAN_LABELS = new Map([ ["growth", "Growth"], ]); +const PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY = 4; + export function getNextPlanId(planId: PlanId): "team" | "growth" | null { if (planId === "free") { return "team"; @@ -202,38 +208,99 @@ async function getOwnerTeamDisplayName(internalTenancy: Tenancy, ownerTeamId: st return team?.displayName ?? throwErr(`Owner team ${ownerTeamId} not found in the internal tenancy`); } -async function countEmailsForTenancy(tenancyId: string, period: UsagePeriod): Promise { - const tenancy = await getTenancy(tenancyId) ?? throwErr(`Tenancy ${tenancyId} not found while counting email usage`); - const schema = await getPrismaSchemaForTenancy(tenancy); - const prisma = await getPrismaClientForTenancy(tenancy); - const rows = await prisma.$replica().$queryRaw<[{ count: number }]>` - SELECT COUNT(*)::int AS count - FROM ${sqlQuoteIdent(schema)}."EmailOutbox" - WHERE "tenancyId" = ${tenancy.id}::uuid - AND "startedSendingAt" IS NOT NULL - AND "startedSendingAt" >= ${period.start} - AND "startedSendingAt" < ${period.end} - `; - return Number(rows[0].count); +type TenancyPrismaClient = Awaited>; + +type TenancyMeteredUsageGroup = { + prisma: TenancyPrismaClient, + schema: string, + tenancyIds: string[], +}; + +// Tenancies can route to different source-of-truth databases/schemas, so we can't assume a single +// query covers every tenancy. We group tenancies that share a (client, schema) and run one aggregate +// COUNT per group: the common case (all projects on one database) collapses to a single round trip, +// while multi-database teams fan out to one query per distinct database instead of one per tenancy. +async function groupTenanciesByMeteredUsageSource(tenancyIds: string[]): Promise { + const resolved = await mapWithConcurrency(tenancyIds, PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY, async (tenancyId) => { + const tenancy = await getTenancy(tenancyId) ?? throwErr(`Tenancy ${tenancyId} not found while counting plan usage`); + const [schema, prisma] = await Promise.all([ + getPrismaSchemaForTenancy(tenancy), + getPrismaClientForTenancy(tenancy), + ]); + return { tenancyId: tenancy.id, schema, prisma }; + }); + + const byClient = new Map>(); + for (const { tenancyId, schema, prisma } of resolved) { + let bySchema = byClient.get(prisma); + if (bySchema == null) { + bySchema = new Map(); + byClient.set(prisma, bySchema); + } + const existing = bySchema.get(schema); + if (existing == null) { + bySchema.set(schema, [tenancyId]); + } else { + existing.push(tenancyId); + } + } + + const groups: TenancyMeteredUsageGroup[] = []; + for (const [prisma, bySchema] of byClient) { + for (const [schema, groupTenancyIds] of bySchema) { + groups.push({ prisma, schema, tenancyIds: groupTenancyIds }); + } + } + return groups; } -async function countSessionReplaysForTenancy(tenancyId: string, period: UsagePeriod): Promise { - const tenancy = await getTenancy(tenancyId) ?? throwErr(`Tenancy ${tenancyId} not found while counting session replay usage`); - const schema = await getPrismaSchemaForTenancy(tenancy); - const prisma = await getPrismaClientForTenancy(tenancy); - const rows = await prisma.$replica().$queryRaw<[{ count: number }]>` - SELECT COUNT(*)::int AS count - FROM ${sqlQuoteIdent(schema)}."SessionReplay" - WHERE "tenancyId" = ${tenancy.id}::uuid - AND "startedAt" >= ${period.start} - AND "startedAt" < ${period.end} +async function countMeteredUsageForGroup(group: TenancyMeteredUsageGroup, period: UsagePeriod): Promise { + const rows = await group.prisma.$replica().$queryRaw>` + SELECT + ( + SELECT COUNT(*)::int + FROM ${sqlQuoteIdent(group.schema)}."EmailOutbox" + WHERE "tenancyId" = ANY(${group.tenancyIds}::uuid[]) + AND "startedSendingAt" IS NOT NULL + AND "startedSendingAt" >= ${period.start} + AND "startedSendingAt" < ${period.end} + ) AS "emails", + ( + SELECT COUNT(*)::int + FROM ${sqlQuoteIdent(group.schema)}."SessionReplay" + WHERE "tenancyId" = ANY(${group.tenancyIds}::uuid[]) + AND "startedAt" >= ${period.start} + AND "startedAt" < ${period.end} + ) AS "sessionReplays" `; - return Number(rows[0].count); + const row = rows[0] ?? throwErr(`Missing plan usage count row for metered usage group on schema ${group.schema}`); + return { + emails: Number(row.emails), + sessionReplays: Number(row.sessionReplays), + }; } -async function sumTenancyUsage(tenancyIds: string[], counter: (tenancyId: string) => Promise): Promise { - const counts = await Promise.all(tenancyIds.map(counter)); - return counts.reduce((sum, count) => sum + count, 0); +async function sumTenancyMeteredUsage(tenancyIds: string[], period: UsagePeriod): Promise { + if (tenancyIds.length === 0) { + return { emails: 0, sessionReplays: 0 }; + } + + const groups = await groupTenanciesByMeteredUsageSource(tenancyIds); + // The group count equals the number of distinct databases (usually 1), so concurrency mostly guards + // the pathological multi-database team rather than the per-tenancy fan-out it used to. + const subtotals = await mapWithConcurrency( + groups, + PLAN_USAGE_TENANCY_COUNTER_CONCURRENCY, + (group) => countMeteredUsageForGroup(group, period), + ); + + return subtotals.reduce( + (totals, subtotal) => ({ + emails: totals.emails + subtotal.emails, + sessionReplays: totals.sessionReplays + subtotal.sessionReplays, + }), + { emails: 0, sessionReplays: 0 }, + ); } async function countAnalyticsEventsForProjects(projectIds: string[], period: UsagePeriod): Promise { @@ -336,18 +403,16 @@ export async function getPlanUsageForProject(project: UsageSourceProject, now: D const planId = resolveActivePlanId(activePlanSubscription); const period = getPlanUsagePeriod(activePlanSubscription, now); - const [ownerTeamDisplayName, ownedProjectIds, ownedTenancyIds, dashboardAdmins, authUsers] = await Promise.all([ + const [ownerTeamDisplayName, ownedScope, dashboardAdmins] = await Promise.all([ getOwnerTeamDisplayName(internalTenancy, ownerTeamId), - getOwnedProjectIdsForBillingTeam(ownerTeamId), - getOwnedTenancyIdsForBillingTeam(ownerTeamId), + getOwnedProjectAndTenancyIdsForBillingTeam(ownerTeamId), countDashboardAdmins(internalTenancy, ownerTeamId, now), - getTeamWideNonAnonymousUserCount(ownerTeamId), ]); - const [emails, analyticsEvents, sessionReplays] = await Promise.all([ - sumTenancyUsage(ownedTenancyIds, async (tenancyId) => await countEmailsForTenancy(tenancyId, period)), - countAnalyticsEventsForProjects(ownedProjectIds, period), - sumTenancyUsage(ownedTenancyIds, async (tenancyId) => await countSessionReplaysForTenancy(tenancyId, period)), + const [authUsers, meteredUsage, analyticsEvents] = await Promise.all([ + getNonAnonymousUserCountForTenancies(ownedScope.tenancyIds), + sumTenancyMeteredUsage(ownedScope.tenancyIds, period), + countAnalyticsEventsForProjects(ownedScope.projectIds, period), ]); return { @@ -362,9 +427,9 @@ export async function getPlanUsageForProject(project: UsageSourceProject, now: D planId, dashboardAdmins, authUsers, - emails, + emails: meteredUsage.emails, analyticsEvents, - sessionReplays, + sessionReplays: meteredUsage.sessionReplays, }), }; } diff --git a/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts new file mode 100644 index 000000000..e4bafaf40 --- /dev/null +++ b/apps/e2e/tests/backend/endpoints/api/v1/internal/plan-usage.test.ts @@ -0,0 +1,407 @@ +import { randomUUID } from "node:crypto"; +import { Client } from "pg"; +import { describe } from "vitest"; +import { ITEM_IDS } from "@hexclave/shared/dist/plans"; +import { getEnvVariable } from "@hexclave/shared/dist/utils/env"; +import { HexclaveAssertionError, throwErr } from "@hexclave/shared/dist/utils/errors"; +import { wait } from "@hexclave/shared/dist/utils/promises"; +import { planUsageResponseSchema, type PlanUsageResponse } from "@hexclave/shared/dist/interface/plan-usage"; +import { it } from "../../../../../helpers"; +import { Auth, InternalProjectKeys, Project, backendContext, niceBackendFetch } from "../../../../backend-helpers"; + +type ProjectUsageContext = { + projectId: string, + tenancyId: string, +}; + +type JsonValue = null | boolean | number | string | JsonValue[] | { [key: string]: JsonValue }; + +function getInternalDatabaseConnectionString(): string { + const connectionString = getEnvVariable( + "HEXCLAVE_DATABASE_CONNECTION_STRING", + getEnvVariable("STACK_DATABASE_CONNECTION_STRING", ""), + ); + if (connectionString === "") { + throw new HexclaveAssertionError("Plan usage E2E tests require a configured internal database connection string"); + } + return connectionString; +} + +async function withInternalDatabase(fn: (client: Client) => Promise): Promise { + const client = new Client({ + connectionString: getInternalDatabaseConnectionString(), + connectionTimeoutMillis: 10_000, + query_timeout: 30_000, + }); + await client.connect(); + try { + return await fn(client); + } finally { + await client.end(); + } +} + +async function getMainTenancyId(client: Client, projectId: string): Promise { + const tenancies = await client.query<{ id: string }>( + `SELECT "id" FROM "Tenancy" WHERE "projectId" = $1 AND "branchId" = 'main' LIMIT 1`, + [projectId], + ); + return tenancies.rows[0]?.id ?? throwErr(`Could not find main tenancy for project ${projectId}`); +} + +async function getProjectUsageContext(client: Client, projectId: string): Promise { + return { + projectId, + tenancyId: await getMainTenancyId(client, projectId), + }; +} + +async function clearSeededUsageRows(client: Client, tenancies: readonly ProjectUsageContext[]): Promise { + const tenancyIds = tenancies.map((tenancy) => tenancy.tenancyId); + await client.query(`DELETE FROM "SessionReplay" WHERE "tenancyId" = ANY($1::uuid[])`, [tenancyIds]); + await client.query(`DELETE FROM "EmailOutbox" WHERE "tenancyId" = ANY($1::uuid[])`, [tenancyIds]); + await client.query(`DELETE FROM "ProjectUser" WHERE "tenancyId" = ANY($1::uuid[])`, [tenancyIds]); +} + +function normalizeSubscriptionPeriodInJson(value: unknown, ownerTeamId: string, period: { + start: Date, + end: Date, +}): JsonValue { + if (value === null || typeof value === "boolean" || typeof value === "number" || typeof value === "string") { + return value; + } + if (Array.isArray(value)) { + return value.map((item) => normalizeSubscriptionPeriodInJson(item, ownerTeamId, period)); + } + if (typeof value === "object") { + const normalizedObject: { [key: string]: JsonValue } = {}; + for (const [key, entryValue] of Object.entries(value)) { + normalizedObject[key] = normalizeSubscriptionPeriodInJson(entryValue, ownerTeamId, period); + } + if ( + normalizedObject.customerId === ownerTeamId + && typeof normalizedObject.currentPeriodStartMillis === "number" + && typeof normalizedObject.currentPeriodEndMillis === "number" + ) { + return { + ...normalizedObject, + currentPeriodStartMillis: period.start.getTime(), + currentPeriodEndMillis: period.end.getTime(), + }; + } + return normalizedObject; + } + throw new HexclaveAssertionError("Unexpected non-JSON value in payment storage", { value }); +} + +async function normalizeBillingTeamSubscriptionMapPeriod(client: Client, ownerTeamId: string, period: { + start: Date, + end: Date, +}): Promise { + // The E2E seed data can create zero-length payment periods in the Bulldozer LFold output. + // Plan usage reads that output directly, so normalize only this fresh test team's emitted + // subscription-map rows to make the metered usage period deterministic. + const rows = await client.query<{ id: string, value: unknown }>( + ` + SELECT "id", "value" + FROM "BulldozerStorageEngine" + WHERE "keyPath"[1] = to_jsonb('table'::text) + AND "keyPath"[2] = to_jsonb('external:payments-subscription-map-by-customer'::text) + AND "keyPath"::text LIKE $1 + AND "value" <> 'null'::jsonb + `, + [`%${ownerTeamId}%`], + ); + if (rows.rows.length === 0) { + throw new HexclaveAssertionError("Expected payment subscription-map rows for billing team", { ownerTeamId }); + } + for (const row of rows.rows) { + await client.query( + `UPDATE "BulldozerStorageEngine" SET "value" = $2::jsonb WHERE "id" = $1::uuid`, + [row.id, JSON.stringify(normalizeSubscriptionPeriodInJson(row.value, ownerTeamId, period))], + ); + } +} + +async function insertProjectUsers(client: Client, context: ProjectUsageContext, options: { + nonAnonymousCount: number, + anonymousCount: number, +}): Promise { + const nonAnonymousUsers = await client.query<{ projectUserId: string }>( + ` + INSERT INTO "ProjectUser" + ("tenancyId", "projectUserId", "mirroredProjectId", "mirroredBranchId", + "displayName", "createdAt", "updatedAt", "isAnonymous", + "signedUpAt", "signUpRiskScoreBot", "signUpRiskScoreFreeTrialAbuse") + SELECT + $1::uuid, + gen_random_uuid(), + $2, + 'main', + 'Plan Usage User ' || gs, + now(), + now(), + false, + now(), + 0, + 0 + FROM generate_series(1, $3::int) AS gs + RETURNING "projectUserId" + `, + [context.tenancyId, context.projectId, options.nonAnonymousCount], + ); + + await client.query( + ` + INSERT INTO "ProjectUser" + ("tenancyId", "projectUserId", "mirroredProjectId", "mirroredBranchId", + "displayName", "createdAt", "updatedAt", "isAnonymous", + "signedUpAt", "signUpRiskScoreBot", "signUpRiskScoreFreeTrialAbuse") + SELECT + $1::uuid, + gen_random_uuid(), + $2, + 'main', + 'Plan Usage Anonymous User ' || gs, + now(), + now(), + true, + now(), + 0, + 0 + FROM generate_series(1, $3::int) AS gs + `, + [context.tenancyId, context.projectId, options.anonymousCount], + ); + + return nonAnonymousUsers.rows.map((row) => row.projectUserId); +} + +async function insertEmailOutboxRow(client: Client, tenancyId: string, startedSendingAt: Date | null): Promise { + const renderedAt = new Date(); + await client.query( + ` + INSERT INTO "EmailOutbox" + ("tenancyId", "id", "createdAt", "updatedAt", "tsxSource", "isHighPriority", "to", "extraRenderVariables", + "shouldSkipDeliverabilityCheck", "createdWith", "renderedByWorkerId", "startedRenderingAt", + "finishedRenderingAt", "renderedHtml", "renderedSubject", "renderedIsTransactional", + "scheduledAt", "isQueued", "startedSendingAt", "finishedSendingAt", "canHaveDeliveryInfo") + VALUES + ($1::uuid, gen_random_uuid(), $4, $4, '', false, $2::jsonb, '{}'::jsonb, + true, 'PROGRAMMATIC_CALL', $3::uuid, $4, $4, '

usage test

', + 'Plan usage test email', true, $4, true, $5, $5, $6) + `, + [ + tenancyId, + JSON.stringify({ type: "custom-emails", emails: ["usage-test@example.com"] }), + randomUUID(), + renderedAt, + startedSendingAt, + startedSendingAt == null ? null : false, + ], + ); +} + +async function insertSessionReplayRow(client: Client, context: ProjectUsageContext, projectUserId: string, startedAt: Date): Promise { + await client.query( + ` + INSERT INTO "SessionReplay" + ("tenancyId", "id", "projectUserId", "refreshTokenId", "startedAt", "lastEventAt", "createdAt", "updatedAt") + VALUES + ($1::uuid, gen_random_uuid(), $2::uuid, $3::uuid, $4, $4, $4, $4) + `, + [context.tenancyId, projectUserId, randomUUID(), startedAt], + ); +} + +async function getPlanUsage(): Promise { + const response = await niceBackendFetch("/api/latest/internal/plan-usage", { + accessType: "admin", + }); + if (response.status !== 200) { + throw new HexclaveAssertionError("Expected plan usage request to succeed", { response }); + } + return await planUsageResponseSchema.validate(response.body); +} + +async function purchaseTeamPlanForBillingTeam(ownerTeamId: string): Promise { + const createUrlResponse = await niceBackendFetch("/api/latest/payments/purchases/create-purchase-url", { + method: "POST", + accessType: "client", + body: { + customer_type: "team", + customer_id: ownerTeamId, + product_id: "team", + }, + }); + if (createUrlResponse.status !== 200 || typeof createUrlResponse.body?.url !== "string") { + throw new HexclaveAssertionError("Expected team plan purchase URL creation to succeed", { createUrlResponse }); + } + + const fullCode = createUrlResponse.body.url.match(/\/purchase\/([a-z0-9_-]+)/)?.[1] + ?? throwErr("Could not parse purchase code from team plan purchase URL", { createUrlResponse }); + const purchaseResponse = await niceBackendFetch("/api/latest/internal/payments/test-mode-purchase-session", { + method: "POST", + accessType: "admin", + body: { + full_code: fullCode, + price_id: "monthly", + quantity: 1, + }, + }); + if (purchaseResponse.status !== 200) { + throw new HexclaveAssertionError("Expected test-mode team plan purchase to succeed", { purchaseResponse }); + } +} + +function getUsedUsageValue(usage: PlanUsageResponse, itemId: string): number { + const row = usage.rows.find((candidate) => candidate.item_id === itemId) ?? throwErr(`Missing usage row for ${itemId}`); + return row.used ?? throwErr(`Expected usage row ${itemId} to have a used value`); +} + +function getCalendarMonthBounds(now: Date): { start: Date, end: Date } { + return { + start: new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), 1)), + end: new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth() + 1, 1)), + }; +} + +async function waitForPlanUsageValues(expected: { + authUsers: number, + emails: number, + sessionReplays: number, + analyticsEvents: number, +}): Promise { + const startedAt = performance.now(); + let latestUsage: PlanUsageResponse | undefined; + while (performance.now() - startedAt < 15_000) { + latestUsage = await getPlanUsage(); + if ( + getUsedUsageValue(latestUsage, ITEM_IDS.authUsers) === expected.authUsers + && getUsedUsageValue(latestUsage, ITEM_IDS.emailsPerMonth) === expected.emails + && getUsedUsageValue(latestUsage, ITEM_IDS.sessionReplays) === expected.sessionReplays + && getUsedUsageValue(latestUsage, ITEM_IDS.analyticsEvents) === expected.analyticsEvents + ) { + return latestUsage; + } + await wait(250); + } + throw new HexclaveAssertionError("Timed out waiting for seeded plan usage to be visible", { + latestUsage, + expected, + }); +} + +describe("internal plan usage", () => { + it("returns zero usage for a fresh owned project with no seeded usage rows", async ({ expect }) => { + const { projectId } = await Project.createAndSwitch({ + display_name: "Plan Usage Empty Project", + }); + + await withInternalDatabase(async (client) => { + const context = await getProjectUsageContext(client, projectId); + await clearSeededUsageRows(client, [context]); + }); + + const usage = await getPlanUsage(); + + expect(usage.owner_team_id).toBeTruthy(); + expect(usage.plan_id).toBe("free"); + expect(getUsedUsageValue(usage, ITEM_IDS.authUsers)).toBe(0); + expect(getUsedUsageValue(usage, ITEM_IDS.emailsPerMonth)).toBe(0); + expect(getUsedUsageValue(usage, ITEM_IDS.sessionReplays)).toBe(0); + expect(getUsedUsageValue(usage, ITEM_IDS.analyticsEvents)).toBe(0); + }); + + it("rolls up metered usage across all projects owned by the billing team", async ({ expect }) => { + backendContext.set({ projectKeys: InternalProjectKeys, userAuth: null }); + await Auth.fastSignUp(); + const internalUserAuth = backendContext.value.userAuth ?? throwErr("Expected internal user auth after sign-up"); + + const primaryProject = await Project.createAndSwitch({ + display_name: "Plan Usage Primary Project", + }, true); + const primaryProjectKeys = backendContext.value.projectKeys; + const ownerTeamId = primaryProject.createProjectResponse.body.owner_team_id; + if (typeof ownerTeamId !== "string") { + throw new HexclaveAssertionError("Expected created project to include an owner team ID", { primaryProject }); + } + + backendContext.set({ projectKeys: InternalProjectKeys, userAuth: internalUserAuth }); + await purchaseTeamPlanForBillingTeam(ownerTeamId); + const secondaryProject = await Project.create({ + display_name: "Plan Usage Secondary Project", + owner_team_id: ownerTeamId, + }); + + backendContext.set({ projectKeys: InternalProjectKeys, userAuth: null }); + const unrelatedProject = await Project.createAndSwitch({ + display_name: "Plan Usage Unrelated Project", + }); + + backendContext.set({ projectKeys: primaryProjectKeys, userAuth: null }); + + const { start, end } = getCalendarMonthBounds(new Date()); + const outsideBefore = new Date(start.getTime() - 2 * 24 * 60 * 60 * 1000); + const insidePrimary = new Date(start.getTime() + 2 * 24 * 60 * 60 * 1000); + const insideSecondaryA = new Date(start.getTime() + 3 * 24 * 60 * 60 * 1000); + const insideSecondaryB = new Date(start.getTime() + 4 * 24 * 60 * 60 * 1000); + const outsideAfter = new Date(end.getTime() + 2 * 24 * 60 * 60 * 1000); + + await withInternalDatabase(async (client) => { + const primary = await getProjectUsageContext(client, primaryProject.projectId); + const secondary = await getProjectUsageContext(client, secondaryProject.projectId); + const unrelated = await getProjectUsageContext(client, unrelatedProject.projectId); + await clearSeededUsageRows(client, [primary, secondary, unrelated]); + await normalizeBillingTeamSubscriptionMapPeriod(client, ownerTeamId, { start, end }); + + const primaryUserIds = await insertProjectUsers(client, primary, { + nonAnonymousCount: 2, + anonymousCount: 1, + }); + const secondaryUserIds = await insertProjectUsers(client, secondary, { + nonAnonymousCount: 1, + anonymousCount: 0, + }); + const unrelatedUserIds = await insertProjectUsers(client, unrelated, { + nonAnonymousCount: 2, + anonymousCount: 0, + }); + const firstPrimaryUserId = primaryUserIds[0] ?? throwErr("Expected seeded primary project user"); + const secondPrimaryUserId = primaryUserIds[1] ?? throwErr("Expected second seeded primary project user"); + const firstSecondaryUserId = secondaryUserIds[0] ?? throwErr("Expected seeded secondary project user"); + const firstUnrelatedUserId = unrelatedUserIds[0] ?? throwErr("Expected seeded unrelated project user"); + + await insertEmailOutboxRow(client, primary.tenancyId, insidePrimary); + await insertEmailOutboxRow(client, primary.tenancyId, insideSecondaryA); + await insertEmailOutboxRow(client, secondary.tenancyId, insideSecondaryB); + await insertEmailOutboxRow(client, primary.tenancyId, outsideBefore); + await insertEmailOutboxRow(client, secondary.tenancyId, outsideAfter); + await insertEmailOutboxRow(client, primary.tenancyId, null); + await insertEmailOutboxRow(client, unrelated.tenancyId, insidePrimary); + + await insertSessionReplayRow(client, primary, firstPrimaryUserId, insidePrimary); + await insertSessionReplayRow(client, primary, secondPrimaryUserId, outsideBefore); + await insertSessionReplayRow(client, secondary, firstSecondaryUserId, insideSecondaryA); + await insertSessionReplayRow(client, secondary, firstSecondaryUserId, insideSecondaryB); + await insertSessionReplayRow(client, secondary, firstSecondaryUserId, outsideAfter); + await insertSessionReplayRow(client, unrelated, firstUnrelatedUserId, insidePrimary); + }); + + const usage = await waitForPlanUsageValues({ + authUsers: 3, + emails: 3, + sessionReplays: 3, + analyticsEvents: 0, + }); + + expect(usage.owner_team_id).toBe(ownerTeamId); + expect(usage.plan_id).toBe("team"); + expect(usage.period_start_millis).toBe(start.getTime()); + expect(usage.period_end_millis).toBe(end.getTime()); + expect(getUsedUsageValue(usage, ITEM_IDS.authUsers)).toBe(3); + expect(getUsedUsageValue(usage, ITEM_IDS.emailsPerMonth)).toBe(3); + expect(getUsedUsageValue(usage, ITEM_IDS.sessionReplays)).toBe(3); + expect(getUsedUsageValue(usage, ITEM_IDS.analyticsEvents)).toBe(0); + }); +}); diff --git a/packages/shared/src/utils/promises.tsx b/packages/shared/src/utils/promises.tsx index 0cfb7df3b..36f7c77c1 100644 --- a/packages/shared/src/utils/promises.tsx +++ b/packages/shared/src/utils/promises.tsx @@ -434,6 +434,71 @@ import.meta.vitest?.test("timeoutThrow", async ({ expect }) => { }); +/** + * Maps over `items` with `fn`, running at most `concurrency` invocations at a time. + * + * Unlike `Promise.all(items.map(fn))`, this bounds the number of in-flight + * promises, which matters when `fn` hits a shared resource (e.g. a database) and + * an unbounded fan-out could exhaust connections or overload a replica. Results + * are returned in input order regardless of completion order, and the first + * rejection aborts further scheduling — already in-flight workers still settle + * but no new items are started. + */ +export async function mapWithConcurrency( + items: readonly T[], + concurrency: number, + fn: (item: T, index: number) => Promise, +): Promise { + if (!Number.isInteger(concurrency) || concurrency < 1) { + throw new HexclaveAssertionError(`mapWithConcurrency requires a positive integer concurrency, got ${concurrency}`); + } + const results = new Array(items.length); + let nextIndex = 0; + let aborted = false; + const worker = async () => { + while (!aborted) { + // Claim an index synchronously before awaiting so workers never process the same item. + const index = nextIndex++; + if (index >= items.length) return; + try { + // Bounds-checked above; `?? throwErr(…)` is unsuitable because T may legitimately be null/undefined + results[index] = await fn(items[index] as T, index); + } catch (error) { + aborted = true; + throw error; + } + } + }; + const workerCount = Math.min(concurrency, items.length); + await Promise.all(Array.from({ length: workerCount }, () => worker())); + return results; +} +import.meta.vitest?.test("mapWithConcurrency", async ({ expect }) => { + // Preserves input order regardless of completion order. + const ordered = await mapWithConcurrency([30, 10, 20], 3, async (ms, index) => { + await wait(ms); + return `${index}:${ms}`; + }); + expect(ordered).toEqual(["0:30", "1:10", "2:20"]); + + // Never exceeds the configured concurrency. + let inFlight = 0; + let maxInFlight = 0; + await mapWithConcurrency(Array.from({ length: 10 }, (_, i) => i), 3, async () => { + inFlight++; + maxInFlight = Math.max(maxInFlight, inFlight); + await wait(5); + inFlight--; + }); + expect(maxInFlight).toBe(3); + + // Empty input spawns no workers and returns an empty array. + expect(await mapWithConcurrency([], 4, async () => 1)).toEqual([]); + + // Invalid concurrency fails loudly. + await expect(mapWithConcurrency([1], 0, async (x) => x)).rejects.toThrow("positive integer concurrency"); +}); + export type RateLimitOptions = { /** * The number of requests to process in parallel. Currently only 1 is supported.