mirror of
https://github.com/stack-auth/stack.git
synced 2026-06-30 21:01:54 +08:00
perf(platform-analytics): cut ClickHouse query peak memory
Reduce the worst-case concurrent ClickHouse memory of the internal platform-analytics route (all 17 CH queries fire in one Promise.all on the shared admin user, against a 9 GB per-user cap). - Use sipHash64(user_id) as the distinct key in the uniqExact/uniqExactIf aggregates (dauSeries, sparkByProject, mauProjects, activeByProject). Exact at this scale (64-bit, negligible collision prob over 1M users), verified byte-identical; -40% to -61% peak memory per query. - Sample the new/retained/reactivated activity split at 1-in-4 users (consistent cityHash bucket on both subqueries, counts scaled x4). The split's window function + all-history scan made it the heaviest query (~1.3 GiB at 1M users / 50M events); sampling cuts it ~78% for a ~0.4% mean error. Dashboard chart now notes it is a sampled estimate. Adds the benchmark/optimization harnesses used to validate these changes (seed isolated bench_pa DBs, measure peak memory, verify result equality).
This commit is contained in:
parent
3c803a8915
commit
81c84289b8
542
apps/backend/scripts/benchmark-platform-analytics.ts
Normal file
542
apps/backend/scripts/benchmark-platform-analytics.ts
Normal file
@ -0,0 +1,542 @@
|
||||
/**
|
||||
* Local-only benchmark harness for the queries in
|
||||
* apps/backend/src/app/api/latest/internal/platform-analytics/route.tsx.
|
||||
*
|
||||
* Seeds an ISOLATED ClickHouse database (`bench_pa`) and Postgres schema
|
||||
* (`bench_pa`) with synthetic data at a configurable scale, then runs every CH
|
||||
* and PG query from the route verbatim (table refs rewritten to the scratch
|
||||
* db/schema) and records duration / memory / rows-read from system.query_log
|
||||
* (CH) and EXPLAIN (ANALYZE, BUFFERS) (PG).
|
||||
*
|
||||
* Defaults model the scale this route is expected to face:
|
||||
* - 10,000 projects, 1,000,000 users, power-law (top project ~10% of users)
|
||||
* - 50,000,000 ClickHouse events (~50/user), single branch "main"
|
||||
*
|
||||
* Run:
|
||||
* pnpm --filter @hexclave/backend run with-env:dev tsx scripts/benchmark-platform-analytics.ts
|
||||
* Env knobs:
|
||||
* PA_PROJECTS (10000) PA_USERS (1000000) PA_EVENTS (50000000)
|
||||
* PA_SKIP_SEED=1 reuse an already-seeded bench_pa
|
||||
* PA_KEEP=1 do not drop bench_pa at the end
|
||||
* PA_OUT output json path (default /tmp/platform-analytics-bench.json)
|
||||
*
|
||||
* NOTE: this harness captures the ORIGINAL (pre-optimization) query shapes so it
|
||||
* can be used as a baseline. See optimize-platform-analytics.ts and
|
||||
* optimize-split.ts for the variant comparisons that justify the route changes.
|
||||
*/
|
||||
import { getClickhouseAdminClient, getClickhouseAdminClientForMetrics } from "@/lib/clickhouse";
|
||||
import { globalPrismaClient } from "@/prisma-client";
|
||||
import { getEnvVariable } from "@hexclave/shared/dist/utils/env";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { writeFileSync } from "node:fs";
|
||||
|
||||
function envInt(name: string, fallback: number): number {
|
||||
const v = getEnvVariable(name, "");
|
||||
if (v === "") return fallback;
|
||||
return Number(v);
|
||||
}
|
||||
function envBool(name: string): boolean {
|
||||
return ["1", "true"].includes(getEnvVariable(name, ""));
|
||||
}
|
||||
|
||||
const NUM_PROJECTS = envInt("PA_PROJECTS", 10_000);
|
||||
const NUM_USERS = envInt("PA_USERS", 1_000_000);
|
||||
const NUM_EVENTS = envInt("PA_EVENTS", 50_000_000);
|
||||
const ZIPF_K = 4; // top project ~ (1/N)^(1/k) of users => k=4 gives ~10%
|
||||
const BRANCH = "main";
|
||||
const OUT = getEnvVariable("PA_OUT", "/tmp/platform-analytics-bench.json");
|
||||
|
||||
const chAdmin = getClickhouseAdminClient();
|
||||
const chMetrics = getClickhouseAdminClientForMetrics();
|
||||
|
||||
function log(...a: unknown[]) { console.log(`[${new Date().toISOString().slice(11, 19)}]`, ...a); }
|
||||
|
||||
// ---------- window math (mirror the route) ----------
|
||||
const ONE_DAY_MS = 24 * 60 * 60 * 1000;
|
||||
const WINDOW_DAYS = 30;
|
||||
const now = new Date();
|
||||
const todayUtc = new Date(now); todayUtc.setUTCHours(0, 0, 0, 0);
|
||||
const windowStart = new Date(todayUtc.getTime() - (WINDOW_DAYS - 1) * ONE_DAY_MS);
|
||||
const priorStart = new Date(todayUtc.getTime() - (2 * WINDOW_DAYS - 1) * ONE_DAY_MS);
|
||||
const untilExclusive = new Date(todayUtc.getTime() + ONE_DAY_MS);
|
||||
const chDT = (d: Date) => d.toISOString().slice(0, 19);
|
||||
const sinceParam = chDT(windowStart);
|
||||
const midParam = chDT(windowStart);
|
||||
const priorSinceParam = chDT(priorStart);
|
||||
const untilParam = chDT(untilExclusive);
|
||||
|
||||
// =====================================================================
|
||||
// SEEDING
|
||||
// =====================================================================
|
||||
async function seedClickhouse() {
|
||||
log("CH: drop+create bench_pa");
|
||||
await chAdmin.command({ query: "DROP DATABASE IF EXISTS bench_pa" });
|
||||
await chAdmin.command({ query: "CREATE DATABASE bench_pa" });
|
||||
for (const t of ["events", "users", "contact_channels", "teams", "connected_accounts", "email_outboxes", "clickmap_events"]) {
|
||||
await chAdmin.command({ query: `CREATE TABLE bench_pa.${t} AS analytics_internal.${t}` });
|
||||
}
|
||||
|
||||
// project index for a given numeric key (power-law)
|
||||
const projExpr = (key: string) =>
|
||||
`concat('bench-proj-', toString(toUInt32(floor(${NUM_PROJECTS} * pow((cityHash64(${key}) % 1000000)/1000000.0, ${ZIPF_K})))))`;
|
||||
const uuidExpr = (key: string) => `reinterpretAsUUID(MD5(toString(${key})))`;
|
||||
const ccExpr = `['US','DE','IN','BR','GB','FR','JP','CA','AU','NL'][(cityHash64(number,'cc') % 10)+1]`;
|
||||
|
||||
// ---- events: NUM_EVENTS rows, ~NUM_EVENTS/NUM_USERS per user, spread 90d ----
|
||||
const CHUNK = 5_000_000;
|
||||
for (let off = 0; off < NUM_EVENTS; off += CHUNK) {
|
||||
const n = Math.min(CHUNK, NUM_EVENTS - off);
|
||||
await chAdmin.command({
|
||||
query: `
|
||||
INSERT INTO bench_pa.events
|
||||
SELECT
|
||||
['$token-refresh','$token-refresh','$token-refresh','$token-refresh','$token-refresh','$token-refresh','$token-refresh','$page-view','$page-view','$click'][((number+${off}) % 10)+1] AS event_type,
|
||||
now64(3,'UTC') - toIntervalSecond(cityHash64(number+${off},'t') % (90*86400)) AS event_at,
|
||||
CAST(concat('{"is_anonymous":', toString(toUInt8(cityHash64((number+${off}) % ${NUM_USERS},'a') % 10 = 0)),
|
||||
',"ip_info":{"country_code":"', ${ccExpr}, '"},"referrer":""}'), 'JSON') AS data,
|
||||
${projExpr(`(number+${off}) % ${NUM_USERS}`)} AS project_id,
|
||||
'${BRANCH}' AS branch_id,
|
||||
toString((number+${off}) % ${NUM_USERS}) AS user_id,
|
||||
NULL AS team_id, NULL AS refresh_token_id, NULL AS session_replay_id, NULL AS session_replay_segment_id,
|
||||
now64(3,'UTC') AS created_at
|
||||
FROM numbers(${n})`,
|
||||
});
|
||||
log(`CH events: ${(off + n).toLocaleString()} / ${NUM_EVENTS.toLocaleString()}`);
|
||||
}
|
||||
|
||||
// ---- users: 1 per user; signed_up spread over 365d; ~10% anonymous ----
|
||||
log("CH: seeding users");
|
||||
await chAdmin.command({
|
||||
query: `
|
||||
INSERT INTO bench_pa.users
|
||||
SELECT
|
||||
${projExpr("number")} AS project_id, '${BRANCH}' AS branch_id, ${uuidExpr("number")} AS id,
|
||||
NULL AS display_name, NULL AS profile_image_url, concat('u', toString(number), '@ex.com') AS primary_email,
|
||||
toUInt8(cityHash64(number,'v') % 10 < 7) AS primary_email_verified,
|
||||
now64(3,'UTC') - toIntervalSecond(cityHash64(number,'s') % (365*86400)) AS signed_up_at,
|
||||
'{}' AS client_metadata, '{}' AS client_read_only_metadata, '{}' AS server_metadata,
|
||||
toUInt8(cityHash64(number,'a') % 10 = 0) AS is_anonymous,
|
||||
0 AS restricted_by_admin, NULL AS restricted_by_admin_reason, NULL AS restricted_by_admin_private_details,
|
||||
toInt64(number) AS sync_sequence_id, 0 AS sync_is_deleted, now64(3,'UTC') AS sync_created_at
|
||||
FROM numbers(${NUM_USERS})`,
|
||||
});
|
||||
|
||||
// ---- contact_channels: verified EMAIL for ~70% of users (matches users.id) ----
|
||||
log("CH: seeding contact_channels");
|
||||
await chAdmin.command({
|
||||
query: `
|
||||
INSERT INTO bench_pa.contact_channels
|
||||
SELECT
|
||||
${projExpr("number")} AS project_id, '${BRANCH}' AS branch_id,
|
||||
reinterpretAsUUID(MD5(concat('cc', toString(number)))) AS id, ${uuidExpr("number")} AS user_id,
|
||||
'EMAIL' AS type, concat('u', toString(number), '@ex.com') AS value,
|
||||
1 AS is_primary, 1 AS is_verified, 1 AS used_for_auth,
|
||||
now64(3,'UTC') AS created_at, toInt64(number) AS sync_sequence_id, 0 AS sync_is_deleted, now64(3,'UTC') AS sync_created_at
|
||||
FROM numbers(${NUM_USERS}) WHERE cityHash64(number,'v') % 10 < 7`,
|
||||
});
|
||||
|
||||
// ---- teams (~150k), connected_accounts (~300k), email_outboxes (~500k) ----
|
||||
log("CH: seeding teams / connected_accounts / email_outboxes");
|
||||
await chAdmin.command({
|
||||
query: `
|
||||
INSERT INTO bench_pa.teams
|
||||
SELECT ${projExpr("number")} AS project_id, '${BRANCH}' AS branch_id,
|
||||
reinterpretAsUUID(MD5(concat('tm', toString(number)))) AS id, concat('Team ', toString(number)) AS display_name,
|
||||
NULL AS profile_image_url, now64(3,'UTC') AS created_at, '{}' AS client_metadata, '{}' AS client_read_only_metadata,
|
||||
'{}' AS server_metadata, toInt64(number) AS sync_sequence_id, 0 AS sync_is_deleted, now64(3,'UTC') AS sync_created_at
|
||||
FROM numbers(150000)`,
|
||||
});
|
||||
await chAdmin.command({
|
||||
query: `
|
||||
INSERT INTO bench_pa.connected_accounts
|
||||
SELECT ${projExpr("number")} AS project_id, '${BRANCH}' AS branch_id, ${uuidExpr("number")} AS user_id,
|
||||
['google','github','microsoft'][(number%3)+1] AS provider, concat('pa', toString(number)) AS provider_account_id,
|
||||
now64(3,'UTC') AS created_at, toInt64(number) AS sync_sequence_id, 0 AS sync_is_deleted, now64(3,'UTC') AS sync_created_at
|
||||
FROM numbers(300000)`,
|
||||
});
|
||||
await chAdmin.command({
|
||||
query: `
|
||||
INSERT INTO bench_pa.email_outboxes
|
||||
SELECT ${projExpr("number")} AS project_id, '${BRANCH}' AS branch_id,
|
||||
reinterpretAsUUID(MD5(concat('eo', toString(number)))) AS id, 'SENT' AS status, 'OK' AS simple_status,
|
||||
'API' AS created_with, NULL AS email_draft_id, NULL AS email_programmatic_call_template_id, NULL AS theme_id,
|
||||
0 AS is_high_priority, 1 AS is_transactional, 'Subj' AS subject, NULL AS notification_category_id,
|
||||
NULL AS started_rendering_at, NULL AS rendered_at, NULL AS render_error, now64(3,'UTC') AS scheduled_at,
|
||||
now64(3,'UTC') AS created_at, now64(3,'UTC') AS updated_at, NULL AS started_sending_at, NULL AS server_error,
|
||||
NULL AS delivered_at, NULL AS opened_at, NULL AS clicked_at, NULL AS unsubscribed_at, NULL AS marked_as_spam_at,
|
||||
NULL AS bounced_at, NULL AS delivery_delayed_at, NULL AS can_have_delivery_info, NULL AS skipped_reason,
|
||||
NULL AS skipped_details, 0 AS send_retries, 0 AS is_paused, toInt64(number) AS sync_sequence_id, 0 AS sync_is_deleted, now64(3,'UTC') AS sync_created_at
|
||||
FROM numbers(500000)`,
|
||||
});
|
||||
|
||||
// ---- clickmap_events (~2M, ~5% dead) ----
|
||||
log("CH: seeding clickmap_events");
|
||||
await chAdmin.command({
|
||||
query: `
|
||||
INSERT INTO bench_pa.clickmap_events
|
||||
SELECT ${projExpr("number")} AS project_id, '${BRANCH}' AS branch_id,
|
||||
now64(3,'UTC') - toIntervalSecond(cityHash64(number,'t') % (90*86400)) AS event_at,
|
||||
toString(number % ${NUM_USERS}) AS user_id, NULL AS session_replay_id,
|
||||
'https://app.example.com/x' AS url, '/x' AS path, 1280 AS viewport_width, 800 AS viewport_height,
|
||||
100 AS pointer_x, 200 AS pointer_y, 200 AS client_y, 0.1 AS pointer_relative_x, 0 AS pointer_target_fixed,
|
||||
'' AS elements_chain, 'button' AS selector, 'Click' AS elements_text, 'button' AS tag_name, NULL AS href,
|
||||
toUInt8(cityHash64(number,'d') % 20 = 0) AS is_dead
|
||||
FROM numbers(2000000)`,
|
||||
});
|
||||
|
||||
await chAdmin.command({ query: "SYSTEM FLUSH LOGS" });
|
||||
for (const t of ["events", "users", "contact_channels", "teams", "connected_accounts", "email_outboxes", "clickmap_events"]) {
|
||||
const r = await (await chAdmin.query({ query: `SELECT count() c FROM bench_pa.${t}`, format: "JSONEachRow" })).json<{ c: string }>();
|
||||
log(` bench_pa.${t}: ${Number(r[0].c).toLocaleString()} rows`);
|
||||
}
|
||||
}
|
||||
|
||||
async function seedPostgres() {
|
||||
log("PG: drop+create schema bench_pa");
|
||||
const ex = (sql: string) => globalPrismaClient.$executeRawUnsafe(sql);
|
||||
await ex("DROP SCHEMA IF EXISTS bench_pa CASCADE");
|
||||
await ex("CREATE SCHEMA bench_pa");
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."Tenancy" (id uuid PRIMARY KEY, "projectId" text, "branchId" text)`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."SubscriptionInvoice" ("tenancyId" uuid, status text, "amountTotal" int, "createdAt" timestamptz)`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."Subscription" ("tenancyId" uuid, status text, product jsonb, "priceId" text, quantity int)`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."AuthMethod" ("tenancyId" uuid, id uuid, PRIMARY KEY("tenancyId", id))`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."OAuthAuthMethod" ("tenancyId" uuid, "authMethodId" uuid, "configOAuthProviderId" text, PRIMARY KEY("tenancyId","authMethodId"))`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."PasswordAuthMethod" ("tenancyId" uuid, "authMethodId" uuid, PRIMARY KEY("tenancyId","authMethodId"))`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."PasskeyAuthMethod" ("tenancyId" uuid, "authMethodId" uuid, PRIMARY KEY("tenancyId","authMethodId"))`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."OtpAuthMethod" ("tenancyId" uuid, "authMethodId" uuid, PRIMARY KEY("tenancyId","authMethodId"))`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."EmailOutbox" ("tenancyId" uuid, "finishedSendingAt" timestamptz, "deliveredAt" timestamptz, "bouncedAt" timestamptz, "simpleStatus" text, "createdAt" timestamptz)`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."SessionReplay" ("tenancyId" uuid)`);
|
||||
|
||||
// tenancy: one per project. deterministic uuid via md5.
|
||||
await ex(`INSERT INTO bench_pa."Tenancy"(id,"projectId","branchId")
|
||||
SELECT md5('ten'||g)::uuid, 'bench-proj-'||g, '${BRANCH}' FROM generate_series(0, ${NUM_PROJECTS - 1}) g`);
|
||||
|
||||
// helper: a tenancy id drawn with power-law (concentrate on low-index projects)
|
||||
const zipfTen = `md5('ten'|| floor(${NUM_PROJECTS} * power(random(), ${ZIPF_K}))::int )::uuid`;
|
||||
const days90 = `now() - (random()*90) * interval '1 day'`;
|
||||
|
||||
log("PG: seeding SubscriptionInvoice (200k)");
|
||||
await ex(`INSERT INTO bench_pa."SubscriptionInvoice"("tenancyId",status,"amountTotal","createdAt")
|
||||
SELECT ${zipfTen}, (ARRAY['paid','paid','paid','succeeded','void','open'])[1+floor(random()*6)::int],
|
||||
(500 + floor(random()*50000))::int, ${days90} FROM generate_series(1,200000)`);
|
||||
|
||||
log("PG: seeding Subscription (60k)");
|
||||
await ex(`INSERT INTO bench_pa."Subscription"("tenancyId",status,product,"priceId",quantity)
|
||||
SELECT ${zipfTen}, (ARRAY['active','active','active','trialing','paused','canceled'])[1+floor(random()*6)::int],
|
||||
jsonb_build_object('prices', jsonb_build_object('price_main',
|
||||
jsonb_build_object('interval', jsonb_build_array('month', 1), 'USD', (5+floor(random()*95))::text || '.99'))),
|
||||
'price_main', (1+floor(random()*5))::int FROM generate_series(1,60000)`);
|
||||
|
||||
log("PG: seeding AuthMethod (1M) + subtypes");
|
||||
await ex(`INSERT INTO bench_pa."AuthMethod"("tenancyId",id)
|
||||
SELECT ${zipfTen}, md5('am'||g)::uuid FROM generate_series(0, ${NUM_USERS - 1}) g`);
|
||||
await ex(`INSERT INTO bench_pa."OAuthAuthMethod"("tenancyId","authMethodId","configOAuthProviderId")
|
||||
SELECT "tenancyId", id, (ARRAY['google','github','microsoft','gitlab'])[1+floor(random()*4)::int]
|
||||
FROM bench_pa."AuthMethod" WHERE (('x'||substr(md5(id::text),1,8))::bit(32)::int) % 2 = 0`);
|
||||
await ex(`INSERT INTO bench_pa."PasswordAuthMethod"("tenancyId","authMethodId")
|
||||
SELECT "tenancyId", id FROM bench_pa."AuthMethod" WHERE (('x'||substr(md5(id::text),1,8))::bit(32)::int) % 10 IN (1,2,3)`);
|
||||
await ex(`INSERT INTO bench_pa."OtpAuthMethod"("tenancyId","authMethodId")
|
||||
SELECT "tenancyId", id FROM bench_pa."AuthMethod" WHERE (('x'||substr(md5(id::text),1,8))::bit(32)::int) % 10 IN (4,5)`);
|
||||
await ex(`INSERT INTO bench_pa."PasskeyAuthMethod"("tenancyId","authMethodId")
|
||||
SELECT "tenancyId", id FROM bench_pa."AuthMethod" WHERE (('x'||substr(md5(id::text),1,8))::bit(32)::int) % 20 = 7`);
|
||||
|
||||
log("PG: seeding EmailOutbox (1M)");
|
||||
await ex(`INSERT INTO bench_pa."EmailOutbox"("tenancyId","finishedSendingAt","deliveredAt","bouncedAt","simpleStatus","createdAt")
|
||||
SELECT ${zipfTen},
|
||||
CASE WHEN random() < 0.95 THEN now() ELSE NULL END,
|
||||
CASE WHEN random() < 0.88 THEN now() ELSE NULL END,
|
||||
CASE WHEN random() < 0.02 THEN now() ELSE NULL END,
|
||||
(ARRAY['OK','OK','OK','OK','IN_PROGRESS','ERROR'])[1+floor(random()*6)::int], ${days90}
|
||||
FROM generate_series(1,1000000)`);
|
||||
|
||||
log("PG: seeding SessionReplay (100k)");
|
||||
await ex(`INSERT INTO bench_pa."SessionReplay"("tenancyId") SELECT ${zipfTen} FROM generate_series(1,100000)`);
|
||||
|
||||
log("PG: secondary indexes + ANALYZE");
|
||||
for (const t of ["SubscriptionInvoice", "Subscription", "EmailOutbox", "SessionReplay"]) {
|
||||
await ex(`CREATE INDEX ON bench_pa."${t}" ("tenancyId")`);
|
||||
}
|
||||
await ex(`ANALYZE bench_pa."Tenancy", bench_pa."SubscriptionInvoice", bench_pa."Subscription", bench_pa."AuthMethod", bench_pa."OAuthAuthMethod", bench_pa."PasswordAuthMethod", bench_pa."PasskeyAuthMethod", bench_pa."OtpAuthMethod", bench_pa."EmailOutbox", bench_pa."SessionReplay"`);
|
||||
const cnt = await globalPrismaClient.$queryRawUnsafe<Array<{ t: string, c: bigint }>>(`
|
||||
SELECT 'Tenancy' t, count(*) c FROM bench_pa."Tenancy"
|
||||
UNION ALL SELECT 'SubscriptionInvoice', count(*) FROM bench_pa."SubscriptionInvoice"
|
||||
UNION ALL SELECT 'Subscription', count(*) FROM bench_pa."Subscription"
|
||||
UNION ALL SELECT 'AuthMethod', count(*) FROM bench_pa."AuthMethod"
|
||||
UNION ALL SELECT 'EmailOutbox', count(*) FROM bench_pa."EmailOutbox"
|
||||
UNION ALL SELECT 'SessionReplay', count(*) FROM bench_pa."SessionReplay"`);
|
||||
for (const r of cnt) log(` bench_pa.${r.t}: ${Number(r.c).toLocaleString()} rows`);
|
||||
}
|
||||
|
||||
// =====================================================================
|
||||
// QUERIES (verbatim from route.tsx, table refs -> bench_pa)
|
||||
// =====================================================================
|
||||
const T = "bench_pa"; // CH database
|
||||
const baseParams = { branchId: BRANCH, internalProjectId: "internal" };
|
||||
const windowParams = { ...baseParams, since: sinceParam, until: untilParam };
|
||||
const twoWindowParams = { ...baseParams, priorSince: priorSinceParam, mid: midParam, until: untilParam };
|
||||
const userCountsParams = { ...baseParams, mid: midParam };
|
||||
const customerEventScope = `project_id != {internalProjectId:String}`;
|
||||
const customerUserScope = `branch_id = {branchId:String} AND sync_is_deleted = 0 AND project_id != {internalProjectId:String}`;
|
||||
const verifiedSubquery = `
|
||||
(project_id, id) IN (
|
||||
SELECT project_id, user_id FROM ${T}.contact_channels FINAL
|
||||
WHERE branch_id = {branchId:String} AND sync_is_deleted = 0 AND type = 'EMAIL' AND is_verified = 1
|
||||
)`;
|
||||
|
||||
type CountRow = { projectId: string, c: string | number };
|
||||
type ChQ = { name: string, what: string, sql: string, params: Record<string, unknown> };
|
||||
const CH_QUERIES: ChQ[] = [
|
||||
{ name: "dauSeries", what: "Daily active users/day over 30d window (uniqExact user_id on $token-refresh)", params: windowParams, sql: `
|
||||
SELECT toDate(event_at) AS day, uniqExact(assumeNotNull(user_id)) AS c FROM ${T}.events
|
||||
WHERE event_type = '$token-refresh' AND user_id IS NOT NULL AND ${customerEventScope}
|
||||
AND event_at >= {since:DateTime} AND event_at < {until:DateTime} GROUP BY day ORDER BY day ASC` },
|
||||
{ name: "pvSeries", what: "Page views + unique visitors per day over 30d ($page-view/$click)", params: windowParams, sql: `
|
||||
SELECT toDate(event_at) AS day, countIf(event_type='$page-view') AS pv,
|
||||
uniqExactIf(assumeNotNull(user_id), event_type='$page-view') AS visitors FROM ${T}.events
|
||||
WHERE event_type IN ('$page-view','$click') AND ${customerEventScope}
|
||||
AND event_at >= {since:DateTime} AND event_at < {until:DateTime} GROUP BY day ORDER BY day ASC` },
|
||||
{ name: "signupSeries", what: "Signups/day over 30d (users FINAL, non-anon)", params: windowParams, sql: `
|
||||
SELECT toDate(signed_up_at,'UTC') AS day, count() AS c FROM ${T}.users FINAL
|
||||
WHERE ${customerUserScope} AND is_anonymous = 0
|
||||
AND signed_up_at >= {since:DateTime} AND signed_up_at < {until:DateTime} GROUP BY day ORDER BY day ASC` },
|
||||
{ name: "mauProjects", what: "MAU + active projects, current vs prior 30d in one 60d pass", params: twoWindowParams, sql: `
|
||||
SELECT uniqExactIf(assumeNotNull(user_id), event_at >= {mid:DateTime}) AS mauCur,
|
||||
uniqExactIf(assumeNotNull(user_id), event_at < {mid:DateTime}) AS mauPrev,
|
||||
uniqExactIf(project_id, event_at >= {mid:DateTime}) AS projCur,
|
||||
uniqExactIf(project_id, event_at < {mid:DateTime}) AS projPrev FROM ${T}.events
|
||||
WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${customerEventScope}
|
||||
AND event_at >= {priorSince:DateTime} AND event_at < {until:DateTime}` },
|
||||
{ name: "userCounts", what: "Total/verified/anonymous user stock (users FINAL, verified via contact_channels IN-subquery)", params: userCountsParams, sql: `
|
||||
SELECT countIf(is_anonymous=0) AS total, countIf(is_anonymous=0 AND signed_up_at < {mid:DateTime}) AS totalPrev,
|
||||
countIf(is_anonymous=0 AND ${verifiedSubquery}) AS verified,
|
||||
countIf(is_anonymous=0 AND signed_up_at < {mid:DateTime} AND ${verifiedSubquery}) AS verifiedPrev,
|
||||
countIf(is_anonymous=1) AS anonymous FROM ${T}.users FINAL WHERE ${customerUserScope}` },
|
||||
{ name: "country", what: "Users by country over 30d (argMax country per user)", params: windowParams, sql: `
|
||||
SELECT country_code, count() AS c FROM (
|
||||
SELECT user_id, argMax(cc, event_at) AS country_code FROM (
|
||||
SELECT user_id, event_at, CAST(data.ip_info.country_code,'Nullable(String)') AS cc FROM ${T}.events
|
||||
WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${customerEventScope}
|
||||
AND event_at >= {since:DateTime} AND event_at < {until:DateTime}
|
||||
) WHERE cc IS NOT NULL GROUP BY user_id
|
||||
) WHERE country_code IS NOT NULL GROUP BY country_code ORDER BY c DESC` },
|
||||
{ name: "deadClicks", what: "Dead-click rate over 30d (clickmap_events)", params: windowParams, sql: `
|
||||
SELECT count() AS clicks, sum(is_dead) AS dead FROM ${T}.clickmap_events
|
||||
WHERE ${customerEventScope} AND event_at >= {since:DateTime} AND event_at < {until:DateTime}` },
|
||||
{ name: "split", what: "New/retained/reactivated split (window fn + all-history LEFT JOIN for first_date)", params: windowParams, sql: `
|
||||
SELECT toString(w.day) AS day, count() AS total_count,
|
||||
countIf(f.first_date = w.day) AS new_count,
|
||||
countIf(f.first_date < w.day AND w.prev_day = addDays(w.day,-1)) AS retained_count,
|
||||
countIf(f.first_date < w.day AND (isNull(w.prev_day) OR w.prev_day < addDays(w.day,-1))) AS reactivated_count
|
||||
FROM (
|
||||
SELECT day, entity_id, lagInFrame(day,1) OVER (PARTITION BY entity_id ORDER BY day) AS prev_day FROM (
|
||||
SELECT DISTINCT toDate(event_at) AS day, assumeNotNull(user_id) AS entity_id FROM ${T}.events
|
||||
WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${customerEventScope}
|
||||
AND event_at >= {since:DateTime} AND event_at < {until:DateTime}
|
||||
AND coalesce(CAST(data.is_anonymous,'Nullable(UInt8)'),0)=0
|
||||
)
|
||||
) AS w LEFT JOIN (
|
||||
SELECT assumeNotNull(user_id) AS entity_id, toDate(min(event_at)) AS first_date FROM ${T}.events
|
||||
WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${customerEventScope}
|
||||
AND event_at < {until:DateTime} AND coalesce(CAST(data.is_anonymous,'Nullable(UInt8)'),0)=0 GROUP BY entity_id
|
||||
) AS f USING (entity_id) GROUP BY w.day ORDER BY w.day ASC` },
|
||||
{ name: "totalsByProject", what: "Per-project total users (users FINAL)", params: baseParams, sql: `
|
||||
SELECT project_id AS projectId, count() AS c FROM ${T}.users FINAL WHERE ${customerUserScope} AND is_anonymous=0 GROUP BY project_id` },
|
||||
{ name: "verifiedByProject", what: "Per-project verified users (users FINAL + verified IN-subquery)", params: baseParams, sql: `
|
||||
SELECT project_id AS projectId, count() AS c FROM ${T}.users FINAL WHERE ${customerUserScope} AND is_anonymous=0 AND ${verifiedSubquery} GROUP BY project_id` },
|
||||
{ name: "signupsByProject", what: "Per-project signups cur vs prior (users FINAL)", params: twoWindowParams, sql: `
|
||||
SELECT project_id AS projectId, countIf(signed_up_at >= {mid:DateTime}) AS cur, countIf(signed_up_at < {mid:DateTime}) AS prev
|
||||
FROM ${T}.users FINAL WHERE ${customerUserScope} AND is_anonymous=0
|
||||
AND signed_up_at >= {priorSince:DateTime} AND signed_up_at < {until:DateTime} GROUP BY project_id` },
|
||||
{ name: "activeByProject", what: "Per-project active users cur vs prior (events)", params: twoWindowParams, sql: `
|
||||
SELECT project_id AS projectId, uniqExactIf(assumeNotNull(user_id), event_at >= {mid:DateTime}) AS cur,
|
||||
uniqExactIf(assumeNotNull(user_id), event_at < {mid:DateTime}) AS prev FROM ${T}.events
|
||||
WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${customerEventScope}
|
||||
AND event_at >= {priorSince:DateTime} AND event_at < {until:DateTime} GROUP BY project_id` },
|
||||
{ name: "sparkByProject", what: "Per-project daily active sparkline over 30d (events)", params: windowParams, sql: `
|
||||
SELECT project_id AS projectId, toDate(event_at) AS day, uniqExact(assumeNotNull(user_id)) AS c FROM ${T}.events
|
||||
WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${customerEventScope}
|
||||
AND event_at >= {since:DateTime} AND event_at < {until:DateTime} GROUP BY project_id, day` },
|
||||
{ name: "teamsByProject", what: "Feature adoption: teams per project (teams FINAL)", params: baseParams, sql: `
|
||||
SELECT project_id AS projectId, count() AS c FROM ${T}.teams FINAL WHERE ${customerUserScope} GROUP BY project_id` },
|
||||
{ name: "oauthByProject", what: "Feature adoption: connected_accounts per project (FINAL)", params: baseParams, sql: `
|
||||
SELECT project_id AS projectId, count() AS c FROM ${T}.connected_accounts FINAL WHERE ${customerUserScope} GROUP BY project_id` },
|
||||
{ name: "emailsByProject", what: "Feature adoption: email_outboxes per project (FINAL)", params: baseParams, sql: `
|
||||
SELECT project_id AS projectId, count() AS c FROM ${T}.email_outboxes FINAL WHERE ${customerUserScope} GROUP BY project_id` },
|
||||
{ name: "analyticsByProject", what: "Feature adoption: $page-view per project (events, branch-filtered)", params: baseParams, sql: `
|
||||
SELECT project_id AS projectId, count() AS c FROM ${T}.events
|
||||
WHERE event_type='$page-view' AND branch_id = {branchId:String} AND ${customerEventScope} GROUP BY project_id` },
|
||||
];
|
||||
|
||||
const INTERNAL = "internal";
|
||||
type PgQ = { name: string, what: string, sql: string };
|
||||
const PG_QUERIES: PgQ[] = [
|
||||
{ name: "revenueDaily", what: "Daily revenue over 30d (SubscriptionInvoice JOIN Tenancy)", sql: `
|
||||
SELECT TO_CHAR(si."createdAt"::date,'YYYY-MM-DD') AS day, COALESCE(SUM(si."amountTotal"),0)::bigint AS cents
|
||||
FROM bench_pa."SubscriptionInvoice" si JOIN bench_pa."Tenancy" t ON t.id=si."tenancyId"
|
||||
WHERE si."amountTotal" IS NOT NULL AND si.status = ANY(ARRAY['paid','succeeded'])
|
||||
AND si."createdAt" >= $1 AND t."projectId" <> '${INTERNAL}' GROUP BY day ORDER BY day` },
|
||||
{ name: "revenueByProject", what: "Per-project revenue cur vs prior (SubscriptionInvoice JOIN Tenancy)", sql: `
|
||||
SELECT t."projectId" AS "projectId",
|
||||
COALESCE(SUM("amountTotal") FILTER (WHERE si."createdAt" >= $1),0)::bigint AS cur,
|
||||
COALESCE(SUM("amountTotal") FILTER (WHERE si."createdAt" < $1),0)::bigint AS prev
|
||||
FROM bench_pa."SubscriptionInvoice" si JOIN bench_pa."Tenancy" t ON t.id=si."tenancyId"
|
||||
WHERE si."amountTotal" IS NOT NULL AND si.status = ANY(ARRAY['paid','succeeded'])
|
||||
AND si."createdAt" >= $2 AND t."projectId" <> '${INTERNAL}' GROUP BY t."projectId"` },
|
||||
{ name: "subscriptions", what: "All active/trialing subs for MRR (Subscription JOIN Tenancy)", sql: `
|
||||
SELECT t."projectId" AS "projectId", s.product AS product, s."priceId" AS "priceId", s.quantity AS quantity
|
||||
FROM bench_pa."Subscription" s JOIN bench_pa."Tenancy" t ON t.id=s."tenancyId"
|
||||
WHERE s.status::text = ANY(ARRAY['active','trialing']) AND t."projectId" <> '${INTERNAL}'` },
|
||||
{ name: "authMethods", what: "Auth-method split (AuthMethod + 4 LEFT JOINs to subtype tables)", sql: `
|
||||
SELECT method, COUNT(*)::int AS count FROM (
|
||||
SELECT COALESCE(oaam."configOAuthProviderId"::text,
|
||||
CASE WHEN pam."authMethodId" IS NOT NULL THEN 'password' END,
|
||||
CASE WHEN pkm."authMethodId" IS NOT NULL THEN 'passkey' END,
|
||||
CASE WHEN oam."authMethodId" IS NOT NULL THEN 'otp' END, 'other') AS method
|
||||
FROM bench_pa."AuthMethod" am JOIN bench_pa."Tenancy" t ON t.id=am."tenancyId"
|
||||
LEFT JOIN bench_pa."OAuthAuthMethod" oaam ON oaam."tenancyId"=am."tenancyId" AND oaam."authMethodId"=am.id
|
||||
LEFT JOIN bench_pa."PasswordAuthMethod" pam ON pam."tenancyId"=am."tenancyId" AND pam."authMethodId"=am.id
|
||||
LEFT JOIN bench_pa."PasskeyAuthMethod" pkm ON pkm."tenancyId"=am."tenancyId" AND pkm."authMethodId"=am.id
|
||||
LEFT JOIN bench_pa."OtpAuthMethod" oam ON oam."tenancyId"=am."tenancyId" AND oam."authMethodId"=am.id
|
||||
WHERE t."projectId" <> '${INTERNAL}') sub GROUP BY method ORDER BY count DESC` },
|
||||
{ name: "email", what: "Email deliverability counters (EmailOutbox JOIN Tenancy)", sql: `
|
||||
SELECT COUNT(*) FILTER (WHERE eo."finishedSendingAt" IS NOT NULL)::int AS sent,
|
||||
COUNT(*) FILTER (WHERE eo."deliveredAt" IS NOT NULL)::int AS delivered,
|
||||
COUNT(*) FILTER (WHERE eo."bouncedAt" IS NOT NULL)::int AS bounced,
|
||||
COUNT(*) FILTER (WHERE eo."simpleStatus"::text='ERROR')::int AS error,
|
||||
COUNT(*) FILTER (WHERE eo."simpleStatus"::text='IN_PROGRESS')::int AS in_progress,
|
||||
COUNT(*) FILTER (WHERE eo."deliveredAt" IS NOT NULL AND eo."createdAt" >= $1)::int AS "deliveredCur",
|
||||
COUNT(*) FILTER (WHERE eo."finishedSendingAt" IS NOT NULL AND eo."createdAt" >= $1)::int AS "finishedCur",
|
||||
COUNT(*) FILTER (WHERE eo."deliveredAt" IS NOT NULL AND eo."createdAt" >= $2 AND eo."createdAt" < $1)::int AS "deliveredPrev",
|
||||
COUNT(*) FILTER (WHERE eo."finishedSendingAt" IS NOT NULL AND eo."createdAt" >= $2 AND eo."createdAt" < $1)::int AS "finishedPrev"
|
||||
FROM bench_pa."EmailOutbox" eo JOIN bench_pa."Tenancy" t ON t.id=eo."tenancyId" WHERE t."projectId" <> '${INTERNAL}'` },
|
||||
{ name: "paymentsRows", what: "DISTINCT projects with a sub (Subscription JOIN Tenancy)", sql: `
|
||||
SELECT DISTINCT t."projectId" AS "projectId" FROM bench_pa."Subscription" s JOIN bench_pa."Tenancy" t ON t.id=s."tenancyId"
|
||||
WHERE s.status IN ('active','trialing','paused') AND t."projectId" <> '${INTERNAL}'` },
|
||||
{ name: "replayRows", what: "DISTINCT projects with session replay (SessionReplay JOIN Tenancy)", sql: `
|
||||
SELECT DISTINCT t."projectId" AS "projectId" FROM bench_pa."SessionReplay" sr JOIN bench_pa."Tenancy" t ON t.id=sr."tenancyId"
|
||||
WHERE t."projectId" <> '${INTERNAL}'` },
|
||||
];
|
||||
|
||||
const PG_PARAMS: Record<string, unknown[]> = {
|
||||
revenueDaily: [windowStart],
|
||||
revenueByProject: [windowStart, priorStart],
|
||||
subscriptions: [],
|
||||
authMethods: [],
|
||||
email: [windowStart, priorStart],
|
||||
paymentsRows: [],
|
||||
replayRows: [],
|
||||
};
|
||||
|
||||
// =====================================================================
|
||||
// RUN + MEASURE
|
||||
// =====================================================================
|
||||
type ChResult = { name: string, what: string, durationMs: number, memMiB: number, readRows: number, readMiB: number, resultRows: number, error?: string };
|
||||
async function runChQuery(q: ChQ): Promise<ChResult> {
|
||||
const reps = 3;
|
||||
let best: ChResult | null = null;
|
||||
for (let i = 0; i < reps; i++) {
|
||||
const queryId = `pa-${q.name}-${randomUUID()}`;
|
||||
try {
|
||||
const r = await chMetrics.query({ query: q.sql, query_params: q.params, query_id: queryId, format: "JSONEachRow" });
|
||||
const rows = await r.json();
|
||||
await chMetrics.command({ query: "SYSTEM FLUSH LOGS" });
|
||||
const stat = await (await chMetrics.query({
|
||||
query: `SELECT query_duration_ms, memory_usage, read_rows, read_bytes, result_rows
|
||||
FROM system.query_log WHERE query_id={qid:String} AND type='QueryFinish' ORDER BY event_time DESC LIMIT 1`,
|
||||
query_params: { qid: queryId }, format: "JSONEachRow",
|
||||
})).json<{ query_duration_ms: string, memory_usage: string, read_rows: string, read_bytes: string, result_rows: string }>();
|
||||
const s = stat[0];
|
||||
const res: ChResult = {
|
||||
name: q.name, what: q.what,
|
||||
durationMs: Number(s?.query_duration_ms ?? 0), memMiB: Number(s?.memory_usage ?? 0) / 1048576,
|
||||
readRows: Number(s?.read_rows ?? 0), readMiB: Number(s?.read_bytes ?? 0) / 1048576,
|
||||
resultRows: Number(s?.result_rows ?? rows.length),
|
||||
};
|
||||
if (!best || res.durationMs < best.durationMs) best = res;
|
||||
} catch (e) {
|
||||
return { name: q.name, what: q.what, durationMs: -1, memMiB: 0, readRows: 0, readMiB: 0, resultRows: 0, error: (e as Error).message.slice(0, 200) };
|
||||
}
|
||||
}
|
||||
return best!;
|
||||
}
|
||||
|
||||
type PgResult = { name: string, what: string, durationMs: number, planMs: number, sharedHitMiB: number, sharedReadMiB: number, rows: number, plan: string, error?: string };
|
||||
async function runPgQuery(q: PgQ): Promise<PgResult> {
|
||||
const params = PG_PARAMS[q.name] ?? [];
|
||||
const reps = 3;
|
||||
let best: PgResult | null = null;
|
||||
for (let i = 0; i < reps; i++) {
|
||||
try {
|
||||
const explainSql = `EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) ${q.sql}`;
|
||||
const rows = await globalPrismaClient.$queryRawUnsafe<Array<{ "QUERY PLAN": unknown }>>(explainSql, ...params);
|
||||
const planArr = rows[0]["QUERY PLAN"] as Array<{ Plan: Record<string, unknown>, "Execution Time": number, "Planning Time": number }>;
|
||||
const p = planArr[0];
|
||||
const top = p.Plan;
|
||||
const sharedHit = Number(top["Shared Hit Blocks"] ?? 0) + sumChildren(top, "Shared Hit Blocks");
|
||||
const sharedRead = Number(top["Shared Read Blocks"] ?? 0) + sumChildren(top, "Shared Read Blocks");
|
||||
const res: PgResult = {
|
||||
name: q.name, what: q.what,
|
||||
durationMs: Number(p["Execution Time"]), planMs: Number(p["Planning Time"]),
|
||||
sharedHitMiB: (sharedHit * 8192) / 1048576, sharedReadMiB: (sharedRead * 8192) / 1048576,
|
||||
rows: Number(top["Actual Rows"] ?? 0), plan: topNodes(top),
|
||||
};
|
||||
if (!best || res.durationMs < best.durationMs) best = res;
|
||||
} catch (e) {
|
||||
return { name: q.name, what: q.what, durationMs: -1, planMs: 0, sharedHitMiB: 0, sharedReadMiB: 0, rows: 0, plan: "", error: (e as Error).message.slice(0, 200) };
|
||||
}
|
||||
}
|
||||
return best!;
|
||||
}
|
||||
function sumChildren(node: Record<string, unknown>, key: string): number {
|
||||
const plans = (node.Plans as Array<Record<string, unknown>>) ?? [];
|
||||
let s = 0;
|
||||
for (const c of plans) s += Number(c[key] ?? 0) + sumChildren(c, key);
|
||||
return s;
|
||||
}
|
||||
function topNodes(node: Record<string, unknown>, depth = 0): string {
|
||||
const t = String(node["Node Type"] ?? "");
|
||||
const rel = node["Relation Name"] ? ` ${node["Relation Name"]}` : "";
|
||||
let s = `${" ".repeat(depth)}${t}${rel} (rows=${node["Actual Rows"]})`;
|
||||
const plans = (node.Plans as Array<Record<string, unknown>>) ?? [];
|
||||
for (const c of plans.slice(0, 3)) s += "\n" + topNodes(c, depth + 1);
|
||||
return s;
|
||||
}
|
||||
|
||||
async function main() {
|
||||
log(`scale: ${NUM_PROJECTS.toLocaleString()} projects, ${NUM_USERS.toLocaleString()} users, ${NUM_EVENTS.toLocaleString()} CH events`);
|
||||
if (!envBool("PA_SKIP_SEED")) {
|
||||
const s0 = Date.now();
|
||||
await seedClickhouse();
|
||||
await seedPostgres();
|
||||
log(`seeding done in ${((Date.now() - s0) / 1000).toFixed(0)}s`);
|
||||
} else {
|
||||
log("PA_SKIP_SEED=1 -> reusing existing bench_pa");
|
||||
}
|
||||
|
||||
log("running ClickHouse queries...");
|
||||
const chResults: ChResult[] = [];
|
||||
for (const q of CH_QUERIES) { const r = await runChQuery(q); chResults.push(r); log(` CH ${q.name}: ${r.error ? "ERR " + r.error : `${r.durationMs}ms, ${r.memMiB.toFixed(0)}MiB, read ${r.readRows.toLocaleString()} rows`}`); }
|
||||
|
||||
log("running Postgres queries...");
|
||||
const pgResults: PgResult[] = [];
|
||||
for (const q of PG_QUERIES) { const r = await runPgQuery(q); pgResults.push(r); log(` PG ${q.name}: ${r.error ? "ERR " + r.error : `${r.durationMs.toFixed(0)}ms, ${(r.sharedHitMiB + r.sharedReadMiB).toFixed(0)}MiB buffers`}`); }
|
||||
|
||||
const out = {
|
||||
generatedAt: new Date().toISOString(),
|
||||
scale: { projects: NUM_PROJECTS, users: NUM_USERS, events: NUM_EVENTS, zipfK: ZIPF_K },
|
||||
window: { since: sinceParam, prior: priorSinceParam, until: untilParam },
|
||||
clickhouse: chResults, postgres: pgResults,
|
||||
chTotal: { sumMemMiB: chResults.reduce((s, r) => s + r.memMiB, 0), maxMs: Math.max(...chResults.map(r => r.durationMs)), sumMs: chResults.reduce((s, r) => s + Math.max(0, r.durationMs), 0) },
|
||||
pgTotal: { sumMs: pgResults.reduce((s, r) => s + Math.max(0, r.durationMs), 0), maxMs: Math.max(...pgResults.map(r => r.durationMs)) },
|
||||
};
|
||||
writeFileSync(OUT, JSON.stringify(out, null, 2));
|
||||
log(`wrote ${OUT}`);
|
||||
|
||||
if (!envBool("PA_KEEP")) {
|
||||
log("cleanup: dropping bench_pa (CH + PG)");
|
||||
await chAdmin.command({ query: "DROP DATABASE IF EXISTS bench_pa" });
|
||||
await globalPrismaClient.$executeRawUnsafe("DROP SCHEMA IF EXISTS bench_pa CASCADE");
|
||||
}
|
||||
process.exit(0);
|
||||
}
|
||||
main().catch((e) => { console.error("BENCH FAILED:", e); process.exit(1); });
|
||||
247
apps/backend/scripts/optimize-platform-analytics.ts
Normal file
247
apps/backend/scripts/optimize-platform-analytics.ts
Normal file
@ -0,0 +1,247 @@
|
||||
/**
|
||||
* Optimization harness: rewrite the heaviest platform-analytics queries and
|
||||
* prove each variant returns BYTE-IDENTICAL results to the original while
|
||||
* lowering peak memory. Isolated scratch DBs (bench_pa), seeded server-side.
|
||||
*
|
||||
* This is what justified the sipHash64 key changes in route.tsx (dauSeries,
|
||||
* sparkByProject, mauProjects, activeByProject) — exact, and 30-60% less peak
|
||||
* memory. It also documents the negative results: hashing the country group key
|
||||
* doesn't help (argMax payload dominates), and the PG authMethods rewrites are
|
||||
* far worse than the original 5-join plan once the production composite PK
|
||||
* indexes exist.
|
||||
*
|
||||
* Run: pnpm --filter @hexclave/backend run with-env:dev tsx scripts/optimize-platform-analytics.ts
|
||||
* Env: PA_SKIP_SEED=1 reuse data; PA_KEEP=1 keep data; PA_EVENTS / PA_USERS / PA_PROJECTS.
|
||||
*/
|
||||
import { getClickhouseAdminClient, getClickhouseAdminClientForMetrics } from "@/lib/clickhouse";
|
||||
import { globalPrismaClient } from "@/prisma-client";
|
||||
import { getEnvVariable } from "@hexclave/shared/dist/utils/env";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { writeFileSync } from "node:fs";
|
||||
|
||||
const envInt = (n: string, f: number) => { const v = getEnvVariable(n, ""); return v === "" ? f : Number(v); };
|
||||
const envBool = (n: string) => ["1", "true"].includes(getEnvVariable(n, ""));
|
||||
const NUM_PROJECTS = envInt("PA_PROJECTS", 10_000);
|
||||
const NUM_USERS = envInt("PA_USERS", 1_000_000);
|
||||
const NUM_EVENTS = envInt("PA_EVENTS", 50_000_000);
|
||||
const ZIPF_K = 4, BRANCH = "main", INTERNAL = "internal";
|
||||
const chAdmin = getClickhouseAdminClient();
|
||||
const chMetrics = getClickhouseAdminClientForMetrics();
|
||||
const log = (...a: unknown[]) => console.log(`[${new Date().toISOString().slice(11, 19)}]`, ...a);
|
||||
|
||||
const ONE_DAY_MS = 86400000, WINDOW_DAYS = 30;
|
||||
const now = new Date(); const todayUtc = new Date(now); todayUtc.setUTCHours(0, 0, 0, 0);
|
||||
const windowStart = new Date(todayUtc.getTime() - (WINDOW_DAYS - 1) * ONE_DAY_MS);
|
||||
const priorStart = new Date(todayUtc.getTime() - (2 * WINDOW_DAYS - 1) * ONE_DAY_MS);
|
||||
const untilExclusive = new Date(todayUtc.getTime() + ONE_DAY_MS);
|
||||
const chDT = (d: Date) => d.toISOString().slice(0, 19);
|
||||
const sinceParam = chDT(windowStart), midParam = chDT(windowStart), priorSinceParam = chDT(priorStart), untilParam = chDT(untilExclusive);
|
||||
|
||||
async function seed() {
|
||||
log("CH: (re)create bench_pa.events");
|
||||
await chAdmin.command({ query: "DROP DATABASE IF EXISTS bench_pa" });
|
||||
await chAdmin.command({ query: "CREATE DATABASE bench_pa" });
|
||||
await chAdmin.command({ query: "CREATE TABLE bench_pa.events AS analytics_internal.events" });
|
||||
const projExpr = (k: string) => `concat('bench-proj-', toString(toUInt32(floor(${NUM_PROJECTS} * pow((cityHash64(${k}) % 1000000)/1000000.0, ${ZIPF_K})))))`;
|
||||
const cc = `['US','DE','IN','BR','GB','FR','JP','CA','AU','NL'][(cityHash64(number,'cc') % 10)+1]`;
|
||||
const CHUNK = 5_000_000;
|
||||
for (let off = 0; off < NUM_EVENTS; off += CHUNK) {
|
||||
const n = Math.min(CHUNK, NUM_EVENTS - off);
|
||||
await chAdmin.command({ query: `
|
||||
INSERT INTO bench_pa.events SELECT
|
||||
['$token-refresh','$token-refresh','$token-refresh','$token-refresh','$token-refresh','$token-refresh','$token-refresh','$page-view','$page-view','$click'][((number+${off}) % 10)+1],
|
||||
now64(3,'UTC') - toIntervalSecond(cityHash64(number+${off},'t') % (90*86400)),
|
||||
CAST(concat('{"is_anonymous":', toString(toUInt8(cityHash64((number+${off}) % ${NUM_USERS},'a') % 10 = 0)), ',"ip_info":{"country_code":"', ${cc}, '"},"referrer":""}'), 'JSON'),
|
||||
${projExpr(`(number+${off}) % ${NUM_USERS}`)}, '${BRANCH}', toString((number+${off}) % ${NUM_USERS}),
|
||||
NULL, NULL, NULL, NULL, now64(3,'UTC')
|
||||
FROM numbers(${n})` });
|
||||
log(` events ${(off + n).toLocaleString()}/${NUM_EVENTS.toLocaleString()}`);
|
||||
}
|
||||
|
||||
log("PG: (re)create bench_pa schema + auth/email tables WITH production composite indexes");
|
||||
const ex = (s: string) => globalPrismaClient.$executeRawUnsafe(s);
|
||||
await ex("DROP SCHEMA IF EXISTS bench_pa CASCADE");
|
||||
await ex("CREATE SCHEMA bench_pa");
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."Tenancy"(id uuid PRIMARY KEY,"projectId" text,"branchId" text)`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."AuthMethod"("tenancyId" uuid,id uuid, PRIMARY KEY("tenancyId",id))`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."OAuthAuthMethod"("tenancyId" uuid,"authMethodId" uuid,"configOAuthProviderId" text, PRIMARY KEY("tenancyId","authMethodId"))`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."PasswordAuthMethod"("tenancyId" uuid,"authMethodId" uuid, PRIMARY KEY("tenancyId","authMethodId"))`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."PasskeyAuthMethod"("tenancyId" uuid,"authMethodId" uuid, PRIMARY KEY("tenancyId","authMethodId"))`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."OtpAuthMethod"("tenancyId" uuid,"authMethodId" uuid, PRIMARY KEY("tenancyId","authMethodId"))`);
|
||||
await ex(`CREATE UNLOGGED TABLE bench_pa."EmailOutbox"("tenancyId" uuid,"finishedSendingAt" timestamptz,"deliveredAt" timestamptz,"bouncedAt" timestamptz,"simpleStatus" text,"createdAt" timestamptz)`);
|
||||
await ex(`INSERT INTO bench_pa."Tenancy" SELECT md5('ten'||g)::uuid,'bench-proj-'||g,'${BRANCH}' FROM generate_series(0,${NUM_PROJECTS - 1}) g`);
|
||||
const zipfTen = `md5('ten'|| floor(${NUM_PROJECTS}*power(random(),${ZIPF_K}))::int )::uuid`;
|
||||
await ex(`INSERT INTO bench_pa."AuthMethod" SELECT ${zipfTen}, md5('am'||g)::uuid FROM generate_series(0,${NUM_USERS - 1}) g`);
|
||||
const h = `(('x'||substr(md5(id::text),1,8))::bit(32)::int)`;
|
||||
await ex(`INSERT INTO bench_pa."OAuthAuthMethod" SELECT "tenancyId",id,(ARRAY['google','github','microsoft','gitlab'])[1+floor(random()*4)::int] FROM bench_pa."AuthMethod" WHERE ${h}%2=0`);
|
||||
await ex(`INSERT INTO bench_pa."PasswordAuthMethod" SELECT "tenancyId",id FROM bench_pa."AuthMethod" WHERE ${h}%10 IN (1,2,3)`);
|
||||
await ex(`INSERT INTO bench_pa."OtpAuthMethod" SELECT "tenancyId",id FROM bench_pa."AuthMethod" WHERE ${h}%10 IN (4,5)`);
|
||||
await ex(`INSERT INTO bench_pa."PasskeyAuthMethod" SELECT "tenancyId",id FROM bench_pa."AuthMethod" WHERE ${h}%20=7`);
|
||||
await ex(`INSERT INTO bench_pa."EmailOutbox" SELECT ${zipfTen},
|
||||
CASE WHEN random()<0.95 THEN now() END, CASE WHEN random()<0.88 THEN now() END, CASE WHEN random()<0.02 THEN now() END,
|
||||
(ARRAY['OK','OK','OK','OK','IN_PROGRESS','ERROR'])[1+floor(random()*6)::int], now()-(random()*90)*interval '1 day'
|
||||
FROM generate_series(1,1000000)`);
|
||||
await ex(`ANALYZE bench_pa."Tenancy",bench_pa."AuthMethod",bench_pa."OAuthAuthMethod",bench_pa."PasswordAuthMethod",bench_pa."PasskeyAuthMethod",bench_pa."OtpAuthMethod",bench_pa."EmailOutbox"`);
|
||||
log("seed done");
|
||||
}
|
||||
|
||||
const T = "bench_pa";
|
||||
const evScope = `project_id != {internalProjectId:String}`;
|
||||
const P = {
|
||||
base: { branchId: BRANCH, internalProjectId: INTERNAL },
|
||||
win: { branchId: BRANCH, internalProjectId: INTERNAL, since: sinceParam, until: untilParam },
|
||||
two: { branchId: BRANCH, internalProjectId: INTERNAL, priorSince: priorSinceParam, mid: midParam, until: untilParam },
|
||||
};
|
||||
type Run = { ms: number, memMiB: number, readRows: number, canon: string };
|
||||
const canonRows = (rows: Array<Record<string, unknown>>) =>
|
||||
rows.map((r) => JSON.stringify(Object.keys(r).sort().map((k) => [k, String(r[k])]))).sort().join("|");
|
||||
|
||||
async function runCh(sql: string, params: Record<string, unknown>): Promise<Run> {
|
||||
let best: Run | null = null;
|
||||
for (let i = 0; i < 3; i++) {
|
||||
const qid = `opt-${randomUUID()}`;
|
||||
const rows = await (await chMetrics.query({ query: sql, query_params: params, query_id: qid, format: "JSONEachRow" })).json<Record<string, unknown>>();
|
||||
await chMetrics.command({ query: "SYSTEM FLUSH LOGS" });
|
||||
const s = (await (await chMetrics.query({ query: `SELECT query_duration_ms d, memory_usage m, read_rows r FROM system.query_log WHERE query_id={q:String} AND type='QueryFinish' ORDER BY event_time DESC LIMIT 1`, query_params: { q: qid }, format: "JSONEachRow" })).json<{ d: string, m: string, r: string }>())[0];
|
||||
const run: Run = { ms: Number(s?.d ?? 0), memMiB: Number(s?.m ?? 0) / 1048576, readRows: Number(s?.r ?? 0), canon: canonRows(rows) };
|
||||
if (!best || run.memMiB < best.memMiB) best = run;
|
||||
}
|
||||
return best!;
|
||||
}
|
||||
|
||||
type PgRun = { ms: number, bufMiB: number, rows: number, canon: string };
|
||||
async function runPg(sql: string, params: unknown[]): Promise<PgRun> {
|
||||
const result = await globalPrismaClient.$queryRawUnsafe<Array<Record<string, unknown>>>(sql, ...params);
|
||||
const canon = canonRows(result.map((r) => Object.fromEntries(Object.entries(r).map(([k, v]) => [k, typeof v === "bigint" ? v.toString() : v]))));
|
||||
let best: { ms: number, bufMiB: number, rows: number } | null = null;
|
||||
for (let i = 0; i < 3; i++) {
|
||||
const plan = (await globalPrismaClient.$queryRawUnsafe<Array<{ "QUERY PLAN": Array<{ Plan: Record<string, unknown>, "Execution Time": number }> }>>(`EXPLAIN (ANALYZE,BUFFERS,FORMAT JSON) ${sql}`, ...params))[0]["QUERY PLAN"][0];
|
||||
const sumBuf = (node: Record<string, unknown>): number => {
|
||||
let s = Number(node["Shared Hit Blocks"] ?? 0) + Number(node["Shared Read Blocks"] ?? 0) + Number(node["Temp Written Blocks"] ?? 0) + Number(node["Temp Read Blocks"] ?? 0);
|
||||
for (const c of (node.Plans as Array<Record<string, unknown>> ?? [])) s += sumBuf(c);
|
||||
return s;
|
||||
};
|
||||
const run = { ms: Number(plan["Execution Time"]), bufMiB: sumBuf(plan.Plan) * 8192 / 1048576, rows: Number(plan.Plan["Actual Rows"] ?? 0) };
|
||||
if (!best || run.bufMiB < best.bufMiB) best = run;
|
||||
}
|
||||
return { ...best!, canon };
|
||||
}
|
||||
|
||||
type ChCase = { query: string, variants: Array<{ name: string, sql: string, params: Record<string, unknown> }> };
|
||||
const CH_CASES: ChCase[] = [
|
||||
{ query: "dauSeries", variants: [
|
||||
{ name: "original", params: P.win, sql: `SELECT toDate(event_at) AS day, uniqExact(assumeNotNull(user_id)) AS c FROM ${T}.events WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${evScope} AND event_at>={since:DateTime} AND event_at<{until:DateTime} GROUP BY day ORDER BY day ASC` },
|
||||
{ name: "uniqExact(sipHash64)", params: P.win, sql: `SELECT toDate(event_at) AS day, uniqExact(sipHash64(assumeNotNull(user_id))) AS c FROM ${T}.events WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${evScope} AND event_at>={since:DateTime} AND event_at<{until:DateTime} GROUP BY day ORDER BY day ASC` },
|
||||
] },
|
||||
{ query: "sparkByProject", variants: [
|
||||
{ name: "original", params: P.win, sql: `SELECT project_id AS projectId, toDate(event_at) AS day, uniqExact(assumeNotNull(user_id)) AS c FROM ${T}.events WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${evScope} AND event_at>={since:DateTime} AND event_at<{until:DateTime} GROUP BY project_id, day` },
|
||||
{ name: "uniqExact(sipHash64)", params: P.win, sql: `SELECT project_id AS projectId, toDate(event_at) AS day, uniqExact(sipHash64(assumeNotNull(user_id))) AS c FROM ${T}.events WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${evScope} AND event_at>={since:DateTime} AND event_at<{until:DateTime} GROUP BY project_id, day` },
|
||||
] },
|
||||
{ query: "mauProjects", variants: [
|
||||
{ name: "original", params: P.two, sql: `SELECT uniqExactIf(assumeNotNull(user_id),event_at>={mid:DateTime}) AS mauCur, uniqExactIf(assumeNotNull(user_id),event_at<{mid:DateTime}) AS mauPrev, uniqExactIf(project_id,event_at>={mid:DateTime}) AS projCur, uniqExactIf(project_id,event_at<{mid:DateTime}) AS projPrev FROM ${T}.events WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${evScope} AND event_at>={priorSince:DateTime} AND event_at<{until:DateTime}` },
|
||||
{ name: "uniqExactIf(sipHash64)", params: P.two, sql: `SELECT uniqExactIf(sipHash64(assumeNotNull(user_id)),event_at>={mid:DateTime}) AS mauCur, uniqExactIf(sipHash64(assumeNotNull(user_id)),event_at<{mid:DateTime}) AS mauPrev, uniqExactIf(project_id,event_at>={mid:DateTime}) AS projCur, uniqExactIf(project_id,event_at<{mid:DateTime}) AS projPrev FROM ${T}.events WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${evScope} AND event_at>={priorSince:DateTime} AND event_at<{until:DateTime}` },
|
||||
] },
|
||||
{ query: "activeByProject", variants: [
|
||||
{ name: "original", params: P.two, sql: `SELECT project_id AS projectId, uniqExactIf(assumeNotNull(user_id),event_at>={mid:DateTime}) AS cur, uniqExactIf(assumeNotNull(user_id),event_at<{mid:DateTime}) AS prev FROM ${T}.events WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${evScope} AND event_at>={priorSince:DateTime} AND event_at<{until:DateTime} GROUP BY project_id` },
|
||||
{ name: "uniqExactIf(sipHash64)", params: P.two, sql: `SELECT project_id AS projectId, uniqExactIf(sipHash64(assumeNotNull(user_id)),event_at>={mid:DateTime}) AS cur, uniqExactIf(sipHash64(assumeNotNull(user_id)),event_at<{mid:DateTime}) AS prev FROM ${T}.events WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${evScope} AND event_at>={priorSince:DateTime} AND event_at<{until:DateTime} GROUP BY project_id` },
|
||||
] },
|
||||
{ query: "country", variants: [
|
||||
{ name: "original", params: P.win, sql: `SELECT country_code, count() AS c FROM (SELECT user_id, argMax(cc,event_at) AS country_code FROM (SELECT user_id,event_at,CAST(data.ip_info.country_code,'Nullable(String)') AS cc FROM ${T}.events WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${evScope} AND event_at>={since:DateTime} AND event_at<{until:DateTime}) WHERE cc IS NOT NULL GROUP BY user_id) WHERE country_code IS NOT NULL GROUP BY country_code ORDER BY c DESC` },
|
||||
{ name: "GROUP BY sipHash64(user_id)", params: P.win, sql: `SELECT country_code, count() AS c FROM (SELECT argMax(cc,event_at) AS country_code FROM (SELECT sipHash64(assumeNotNull(user_id)) AS uid,event_at,CAST(data.ip_info.country_code,'Nullable(String)') AS cc FROM ${T}.events WHERE event_type='$token-refresh' AND user_id IS NOT NULL AND ${evScope} AND event_at>={since:DateTime} AND event_at<{until:DateTime}) WHERE cc IS NOT NULL GROUP BY uid) WHERE country_code IS NOT NULL GROUP BY country_code ORDER BY c DESC` },
|
||||
] },
|
||||
];
|
||||
|
||||
const am = (j: string) => `bench_pa."${j}"`;
|
||||
type PgCase = { query: string, params: unknown[], variants: Array<{ name: string, sql: string }> };
|
||||
const PG_CASES: PgCase[] = [
|
||||
{ query: "authMethods", params: [], variants: [
|
||||
{ name: "original (5 LEFT JOINs)", sql: `
|
||||
SELECT method, COUNT(*)::int AS count FROM (
|
||||
SELECT COALESCE(oaam."configOAuthProviderId"::text,
|
||||
CASE WHEN pam."authMethodId" IS NOT NULL THEN 'password' END,
|
||||
CASE WHEN pkm."authMethodId" IS NOT NULL THEN 'passkey' END,
|
||||
CASE WHEN oam."authMethodId" IS NOT NULL THEN 'otp' END, 'other') AS method
|
||||
FROM ${am("AuthMethod")} am JOIN ${am("Tenancy")} t ON t.id=am."tenancyId"
|
||||
LEFT JOIN ${am("OAuthAuthMethod")} oaam ON oaam."tenancyId"=am."tenancyId" AND oaam."authMethodId"=am.id
|
||||
LEFT JOIN ${am("PasswordAuthMethod")} pam ON pam."tenancyId"=am."tenancyId" AND pam."authMethodId"=am.id
|
||||
LEFT JOIN ${am("PasskeyAuthMethod")} pkm ON pkm."tenancyId"=am."tenancyId" AND pkm."authMethodId"=am.id
|
||||
LEFT JOIN ${am("OtpAuthMethod")} oam ON oam."tenancyId"=am."tenancyId" AND oam."authMethodId"=am.id
|
||||
WHERE t."projectId" <> '${INTERNAL}') sub GROUP BY method ORDER BY count DESC` },
|
||||
{ name: "correlated subqueries (precedence preserved)", sql: `
|
||||
SELECT method, COUNT(*)::int AS count FROM (
|
||||
SELECT COALESCE(
|
||||
(SELECT oaam."configOAuthProviderId"::text FROM ${am("OAuthAuthMethod")} oaam WHERE oaam."tenancyId"=am."tenancyId" AND oaam."authMethodId"=am.id),
|
||||
CASE WHEN EXISTS(SELECT 1 FROM ${am("PasswordAuthMethod")} pam WHERE pam."tenancyId"=am."tenancyId" AND pam."authMethodId"=am.id) THEN 'password' END,
|
||||
CASE WHEN EXISTS(SELECT 1 FROM ${am("PasskeyAuthMethod")} pkm WHERE pkm."tenancyId"=am."tenancyId" AND pkm."authMethodId"=am.id) THEN 'passkey' END,
|
||||
CASE WHEN EXISTS(SELECT 1 FROM ${am("OtpAuthMethod")} oam WHERE oam."tenancyId"=am."tenancyId" AND oam."authMethodId"=am.id) THEN 'otp' END,
|
||||
'other') AS method
|
||||
FROM ${am("AuthMethod")} am JOIN ${am("Tenancy")} t ON t.id=am."tenancyId" WHERE t."projectId" <> '${INTERNAL}') sub GROUP BY method ORDER BY count DESC` },
|
||||
] },
|
||||
{ query: "email", params: [windowStart, priorStart], variants: [
|
||||
{ name: "original (1M-row scan, 9 FILTERs)", sql: `
|
||||
SELECT COUNT(*) FILTER (WHERE eo."finishedSendingAt" IS NOT NULL)::int AS sent,
|
||||
COUNT(*) FILTER (WHERE eo."deliveredAt" IS NOT NULL)::int AS delivered,
|
||||
COUNT(*) FILTER (WHERE eo."bouncedAt" IS NOT NULL)::int AS bounced,
|
||||
COUNT(*) FILTER (WHERE eo."simpleStatus"::text='ERROR')::int AS error,
|
||||
COUNT(*) FILTER (WHERE eo."simpleStatus"::text='IN_PROGRESS')::int AS in_progress,
|
||||
COUNT(*) FILTER (WHERE eo."deliveredAt" IS NOT NULL AND eo."createdAt">=$1)::int AS "deliveredCur",
|
||||
COUNT(*) FILTER (WHERE eo."finishedSendingAt" IS NOT NULL AND eo."createdAt">=$1)::int AS "finishedCur",
|
||||
COUNT(*) FILTER (WHERE eo."deliveredAt" IS NOT NULL AND eo."createdAt">=$2 AND eo."createdAt"<$1)::int AS "deliveredPrev",
|
||||
COUNT(*) FILTER (WHERE eo."finishedSendingAt" IS NOT NULL AND eo."createdAt">=$2 AND eo."createdAt"<$1)::int AS "finishedPrev"
|
||||
FROM ${am("EmailOutbox")} eo JOIN ${am("Tenancy")} t ON t.id=eo."tenancyId" WHERE t."projectId" <> '${INTERNAL}'` },
|
||||
{ name: "drop Tenancy join (EXISTS anti-filter)", sql: `
|
||||
SELECT COUNT(*) FILTER (WHERE eo."finishedSendingAt" IS NOT NULL)::int AS sent,
|
||||
COUNT(*) FILTER (WHERE eo."deliveredAt" IS NOT NULL)::int AS delivered,
|
||||
COUNT(*) FILTER (WHERE eo."bouncedAt" IS NOT NULL)::int AS bounced,
|
||||
COUNT(*) FILTER (WHERE eo."simpleStatus"::text='ERROR')::int AS error,
|
||||
COUNT(*) FILTER (WHERE eo."simpleStatus"::text='IN_PROGRESS')::int AS in_progress,
|
||||
COUNT(*) FILTER (WHERE eo."deliveredAt" IS NOT NULL AND eo."createdAt">=$1)::int AS "deliveredCur",
|
||||
COUNT(*) FILTER (WHERE eo."finishedSendingAt" IS NOT NULL AND eo."createdAt">=$1)::int AS "finishedCur",
|
||||
COUNT(*) FILTER (WHERE eo."deliveredAt" IS NOT NULL AND eo."createdAt">=$2 AND eo."createdAt"<$1)::int AS "deliveredPrev",
|
||||
COUNT(*) FILTER (WHERE eo."finishedSendingAt" IS NOT NULL AND eo."createdAt">=$2 AND eo."createdAt"<$1)::int AS "finishedPrev"
|
||||
FROM ${am("EmailOutbox")} eo WHERE EXISTS(SELECT 1 FROM ${am("Tenancy")} t WHERE t.id=eo."tenancyId" AND t."projectId" <> '${INTERNAL}')` },
|
||||
] },
|
||||
];
|
||||
|
||||
async function main() {
|
||||
if (!envBool("PA_SKIP_SEED")) await seed(); else log("reusing bench_pa");
|
||||
const report: { ch: unknown[], pg: unknown[] } = { ch: [], pg: [] };
|
||||
|
||||
log("=== ClickHouse ===");
|
||||
for (const c of CH_CASES) {
|
||||
const runs: Array<{ name: string, run: Run, ok: boolean }> = [];
|
||||
let baseCanon = "";
|
||||
for (const v of c.variants) {
|
||||
const run = await runCh(v.sql, v.params);
|
||||
if (v.name === "original") baseCanon = run.canon;
|
||||
const ok = run.canon === baseCanon;
|
||||
runs.push({ name: v.name, run, ok });
|
||||
log(` ${c.query} / ${v.name}: ${run.memMiB.toFixed(0)}MiB ${run.ms}ms read=${run.readRows.toLocaleString()} ${ok ? "EQ" : "DIFF"}`);
|
||||
}
|
||||
const orig = runs[0].run;
|
||||
const best = runs.filter((r) => r.ok).reduce((a, b) => b.run.memMiB < a.run.memMiB ? b : a);
|
||||
report.ch.push({ query: c.query, origMemMiB: orig.memMiB, origMs: orig.ms, variants: runs.map((r) => ({ name: r.name, memMiB: r.run.memMiB, ms: r.run.ms, readRows: r.run.readRows, equal: r.ok })), best: best.name, bestMemMiB: best.run.memMiB, memReductionPct: (1 - best.run.memMiB / orig.memMiB) * 100 });
|
||||
}
|
||||
|
||||
log("=== Postgres ===");
|
||||
for (const c of PG_CASES) {
|
||||
const runs: Array<{ name: string, run: PgRun, ok: boolean }> = [];
|
||||
let baseCanon = "";
|
||||
for (const v of c.variants) {
|
||||
const run = await runPg(v.sql, c.params);
|
||||
if (v.name.startsWith("original")) baseCanon = run.canon;
|
||||
const ok = run.canon === baseCanon;
|
||||
runs.push({ name: v.name, run, ok });
|
||||
log(` ${c.query} / ${v.name}: ${run.bufMiB.toFixed(0)}MiB buf ${run.ms.toFixed(0)}ms ${ok ? "EQ" : "DIFF"}`);
|
||||
}
|
||||
const orig = runs[0].run;
|
||||
const best = runs.filter((r) => r.ok).reduce((a, b) => b.run.bufMiB < a.run.bufMiB ? b : a);
|
||||
report.pg.push({ query: c.query, origBufMiB: orig.bufMiB, origMs: orig.ms, variants: runs.map((r) => ({ name: r.name, bufMiB: r.run.bufMiB, ms: r.run.ms, equal: r.ok })), best: best.name, bestBufMiB: best.run.bufMiB, bufReductionPct: (1 - best.run.bufMiB / orig.bufMiB) * 100 });
|
||||
}
|
||||
|
||||
writeFileSync("/tmp/platform-analytics-optimize.json", JSON.stringify({ generatedAt: new Date().toISOString(), scale: { NUM_PROJECTS, NUM_USERS, NUM_EVENTS }, ...report }, null, 2));
|
||||
log("wrote /tmp/platform-analytics-optimize.json");
|
||||
if (!envBool("PA_KEEP")) { await chAdmin.command({ query: "DROP DATABASE IF EXISTS bench_pa" }); await globalPrismaClient.$executeRawUnsafe("DROP SCHEMA IF EXISTS bench_pa CASCADE"); }
|
||||
process.exit(0);
|
||||
}
|
||||
main().catch((e) => { console.error("FAILED:", e); process.exit(1); });
|
||||
163
apps/backend/scripts/optimize-split.ts
Normal file
163
apps/backend/scripts/optimize-split.ts
Normal file
@ -0,0 +1,163 @@
|
||||
/**
|
||||
* Focused optimization of the platform-analytics "activity split"
|
||||
* (new/retained/reactivated) ClickHouse query. Tests exact rewrites AND
|
||||
* approximate (user-sampled) variants, measuring peak memory + accuracy.
|
||||
*
|
||||
* This is what justified shipping ACTIVITY_SPLIT_SAMPLE in route.tsx: at 1M
|
||||
* users / 50M events, 1-in-4 consistent user sampling cut the split's peak
|
||||
* memory ~78% (1.3 GiB -> ~0.3 GiB) for a ~0.4% mean error.
|
||||
*
|
||||
* Run: pnpm --filter @hexclave/backend run with-env:dev tsx scripts/optimize-split.ts
|
||||
* Env: PA_SKIP_SEED=1 PA_KEEP=1 PA_EVENTS PA_USERS PA_PROJECTS
|
||||
*/
|
||||
import { getClickhouseAdminClient, getClickhouseAdminClientForMetrics, METRICS_CLICKHOUSE_SETTINGS } from "@/lib/clickhouse";
|
||||
import { getEnvVariable } from "@hexclave/shared/dist/utils/env";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { writeFileSync } from "node:fs";
|
||||
|
||||
const envInt = (n: string, f: number) => { const v = getEnvVariable(n, ""); return v === "" ? f : Number(v); };
|
||||
const envBool = (n: string) => ["1", "true"].includes(getEnvVariable(n, ""));
|
||||
const NUM_PROJECTS = envInt("PA_PROJECTS", 10_000), NUM_USERS = envInt("PA_USERS", 1_000_000), NUM_EVENTS = envInt("PA_EVENTS", 50_000_000);
|
||||
const ZIPF_K = 4, BRANCH = "main", INTERNAL = "internal";
|
||||
const chAdmin = getClickhouseAdminClient();
|
||||
const chMetrics = getClickhouseAdminClientForMetrics();
|
||||
const log = (...a: unknown[]) => console.log(`[${new Date().toISOString().slice(11, 19)}]`, ...a);
|
||||
|
||||
const ONE_DAY_MS = 86400000, WINDOW_DAYS = 30;
|
||||
const now = new Date(); const todayUtc = new Date(now); todayUtc.setUTCHours(0, 0, 0, 0);
|
||||
const windowStart = new Date(todayUtc.getTime() - (WINDOW_DAYS - 1) * ONE_DAY_MS);
|
||||
const untilExclusive = new Date(todayUtc.getTime() + ONE_DAY_MS);
|
||||
const chDT = (d: Date) => d.toISOString().slice(0, 19);
|
||||
const sinceParam = chDT(windowStart), startParam = chDT(windowStart), untilParam = chDT(untilExclusive);
|
||||
const T = "bench_pa";
|
||||
const params = { branchId: BRANCH, internalProjectId: INTERNAL, since: sinceParam, start: startParam, until: untilParam };
|
||||
|
||||
async function seed() {
|
||||
log("CH: (re)create bench_pa.events");
|
||||
await chAdmin.command({ query: "DROP DATABASE IF EXISTS bench_pa" });
|
||||
await chAdmin.command({ query: "CREATE DATABASE bench_pa" });
|
||||
await chAdmin.command({ query: "CREATE TABLE bench_pa.events AS analytics_internal.events" });
|
||||
const projExpr = (k: string) => `concat('bench-proj-', toString(toUInt32(floor(${NUM_PROJECTS} * pow((cityHash64(${k}) % 1000000)/1000000.0, ${ZIPF_K})))))`;
|
||||
const cc = `['US','DE','IN','BR','GB','FR','JP','CA','AU','NL'][(cityHash64(number,'cc') % 10)+1]`;
|
||||
const CHUNK = 5_000_000;
|
||||
for (let off = 0; off < NUM_EVENTS; off += CHUNK) {
|
||||
const n = Math.min(CHUNK, NUM_EVENTS - off);
|
||||
await chAdmin.command({ query: `INSERT INTO bench_pa.events SELECT
|
||||
['$token-refresh','$token-refresh','$token-refresh','$token-refresh','$token-refresh','$token-refresh','$token-refresh','$page-view','$page-view','$click'][((number+${off}) % 10)+1],
|
||||
now64(3,'UTC') - toIntervalSecond(cityHash64(number+${off},'t') % (90*86400)),
|
||||
CAST(concat('{"is_anonymous":', toString(toUInt8(cityHash64((number+${off}) % ${NUM_USERS},'a') % 10 = 0)), ',"ip_info":{"country_code":"', ${cc}, '"},"referrer":""}'), 'JSON'),
|
||||
${projExpr(`(number+${off}) % ${NUM_USERS}`)}, '${BRANCH}', toString((number+${off}) % ${NUM_USERS}), NULL, NULL, NULL, NULL, now64(3,'UTC')
|
||||
FROM numbers(${n})` });
|
||||
log(` events ${(off + n).toLocaleString()}/${NUM_EVENTS.toLocaleString()}`);
|
||||
}
|
||||
log("seed done");
|
||||
}
|
||||
|
||||
// classification expressions shared by exact variants
|
||||
const NEW = "f.first_date=w.day", RET = "f.first_date<w.day AND w.prev_day=addDays(w.day,-1)", REA = "f.first_date<w.day AND (isNull(w.prev_day) OR w.prev_day<addDays(w.day,-1))";
|
||||
const anon = "coalesce(CAST(data.is_anonymous,'Nullable(UInt8)'),0)=0";
|
||||
const evScope = `event_type='$token-refresh' AND user_id IS NOT NULL AND project_id != {internalProjectId:String} AND ${anon}`;
|
||||
|
||||
function exactSql(entity: string) {
|
||||
return `SELECT toString(w.day) AS day, count() AS total_count, countIf(${NEW}) AS new_count, countIf(${RET}) AS retained_count, countIf(${REA}) AS reactivated_count
|
||||
FROM (SELECT day, entity_id, lagInFrame(day,1) OVER (PARTITION BY entity_id ORDER BY day) AS prev_day FROM (
|
||||
SELECT DISTINCT toDate(event_at) AS day, ${entity} AS entity_id FROM ${T}.events
|
||||
WHERE ${evScope} AND event_at>={since:DateTime} AND event_at<{until:DateTime})) AS w
|
||||
LEFT JOIN (SELECT ${entity} AS entity_id, toDate(min(event_at)) AS first_date FROM ${T}.events
|
||||
WHERE ${evScope} AND event_at<{until:DateTime} GROUP BY entity_id) AS f USING (entity_id)
|
||||
GROUP BY w.day ORDER BY w.day ASC`;
|
||||
}
|
||||
|
||||
function boundedSql(entity: string) {
|
||||
return `WITH seen AS (SELECT DISTINCT ${entity} AS entity_id FROM ${T}.events WHERE ${evScope} AND event_at<{start:DateTime})
|
||||
SELECT toString(w.day) AS day, count() AS total_count,
|
||||
countIf(isNull(w.prev_day) AND w.seen_before=0) AS new_count,
|
||||
countIf(w.prev_day=addDays(w.day,-1)) AS retained_count,
|
||||
countIf((isNull(w.prev_day) AND w.seen_before=1) OR (isNotNull(w.prev_day) AND w.prev_day<addDays(w.day,-1))) AS reactivated_count
|
||||
FROM (SELECT day, entity_id, seen_before, lagInFrame(day,1) OVER (PARTITION BY entity_id ORDER BY day) AS prev_day FROM (
|
||||
SELECT d.day AS day, d.entity_id AS entity_id, (s.entity_id IS NOT NULL) AS seen_before FROM (
|
||||
SELECT DISTINCT toDate(event_at) AS day, ${entity} AS entity_id FROM ${T}.events
|
||||
WHERE ${evScope} AND event_at>={since:DateTime} AND event_at<{until:DateTime}) d
|
||||
LEFT JOIN seen s USING (entity_id))) AS w
|
||||
GROUP BY w.day ORDER BY w.day ASC`;
|
||||
}
|
||||
|
||||
// APPROXIMATE: consistent user-level sampling 1/K (cityHash bucket), counts scaled by K
|
||||
function sampledSql(K: number) {
|
||||
const samp = `AND cityHash64(assumeNotNull(user_id)) % ${K} = 0`;
|
||||
const entity = "sipHash64(assumeNotNull(user_id))";
|
||||
return `SELECT toString(w.day) AS day, count()*${K} AS total_count, countIf(${NEW})*${K} AS new_count, countIf(${RET})*${K} AS retained_count, countIf(${REA})*${K} AS reactivated_count
|
||||
FROM (SELECT day, entity_id, lagInFrame(day,1) OVER (PARTITION BY entity_id ORDER BY day) AS prev_day FROM (
|
||||
SELECT DISTINCT toDate(event_at) AS day, ${entity} AS entity_id FROM ${T}.events
|
||||
WHERE ${evScope} ${samp} AND event_at>={since:DateTime} AND event_at<{until:DateTime})) AS w
|
||||
LEFT JOIN (SELECT ${entity} AS entity_id, toDate(min(event_at)) AS first_date FROM ${T}.events
|
||||
WHERE ${evScope} ${samp} AND event_at<{until:DateTime} GROUP BY entity_id) AS f USING (entity_id)
|
||||
GROUP BY w.day ORDER BY w.day ASC`;
|
||||
}
|
||||
|
||||
type Day = { day: string, total_count: number, new_count: number, retained_count: number, reactivated_count: number };
|
||||
type Run = { mem: number, ms: number, readRows: number, rows: Day[] };
|
||||
async function run(sql: string, settings?: Record<string, string>): Promise<Run> {
|
||||
let best: Run | null = null;
|
||||
for (let i = 0; i < 3; i++) {
|
||||
const qid = `split-${randomUUID()}`;
|
||||
const r = await chMetrics.query({ query: sql, query_params: params, query_id: qid, format: "JSONEachRow", clickhouse_settings: { ...METRICS_CLICKHOUSE_SETTINGS, ...settings } });
|
||||
const rows = (await r.json<Record<string, string>>()).map((x) => ({ day: x.day, total_count: Number(x.total_count), new_count: Number(x.new_count), retained_count: Number(x.retained_count), reactivated_count: Number(x.reactivated_count) }));
|
||||
await chMetrics.command({ query: "SYSTEM FLUSH LOGS" });
|
||||
const s = (await (await chMetrics.query({ query: `SELECT query_duration_ms d, memory_usage m, read_rows rr FROM system.query_log WHERE query_id={q:String} AND type='QueryFinish' ORDER BY event_time DESC LIMIT 1`, query_params: { q: qid }, format: "JSONEachRow" })).json<{ d: string, m: string, rr: string }>())[0];
|
||||
const run = { mem: Number(s?.m ?? 0) / 1048576, ms: Number(s?.d ?? 0), readRows: Number(s?.rr ?? 0), rows };
|
||||
if (!best || run.mem < best.mem) best = run;
|
||||
}
|
||||
return best!;
|
||||
}
|
||||
|
||||
// accuracy vs exact ground truth: mean & max abs % error over days, per metric
|
||||
function accuracy(truth: Day[], approx: Day[]) {
|
||||
const tm = new Map(truth.map((d) => [d.day, d]));
|
||||
const metrics = ["total_count", "new_count", "retained_count", "reactivated_count"] as const;
|
||||
const errsAll: number[] = [];
|
||||
const per: Record<string, { mean: number, max: number }> = {};
|
||||
for (const m of metrics) {
|
||||
const errs: number[] = [];
|
||||
for (const a of approx) {
|
||||
const t = tm.get(a.day); if (!t) continue;
|
||||
const tv = t[m], av = a[m];
|
||||
if (tv === 0) { if (av !== 0) errs.push(100); continue; }
|
||||
const e = Math.abs(av - tv) / tv * 100; errs.push(e); errsAll.push(e);
|
||||
}
|
||||
per[m] = { mean: errs.reduce((s, x) => s + x, 0) / Math.max(1, errs.length), max: Math.max(0, ...errs) };
|
||||
}
|
||||
return { per, overallMean: errsAll.reduce((s, x) => s + x, 0) / Math.max(1, errsAll.length), overallMax: Math.max(0, ...errsAll) };
|
||||
}
|
||||
|
||||
async function main() {
|
||||
if (!envBool("PA_SKIP_SEED")) await seed(); else log("reusing bench_pa.events");
|
||||
|
||||
const cases: Array<{ name: string, kind: "exact" | "approx", sql: string, settings?: Record<string, string> }> = [
|
||||
{ name: "original (string entity)", kind: "exact", sql: exactSql("assumeNotNull(user_id)") },
|
||||
{ name: "sipHash entity", kind: "exact", sql: exactSql("sipHash64(assumeNotNull(user_id))") },
|
||||
{ name: "exact bounded first_date (sipHash)", kind: "exact", sql: boundedSql("sipHash64(assumeNotNull(user_id))") },
|
||||
{ name: "sipHash + max_threads=4", kind: "exact", sql: exactSql("sipHash64(assumeNotNull(user_id))"), settings: { max_threads: "4" } },
|
||||
{ name: "sipHash + max_threads=2", kind: "exact", sql: exactSql("sipHash64(assumeNotNull(user_id))"), settings: { max_threads: "2" } },
|
||||
{ name: "APPROX sample 1/4 (x4)", kind: "approx", sql: sampledSql(4) },
|
||||
{ name: "APPROX sample 1/10 (x10)", kind: "approx", sql: sampledSql(10) },
|
||||
{ name: "APPROX sample 1/20 (x20)", kind: "approx", sql: sampledSql(20) },
|
||||
{ name: "APPROX sample 1/10 + max_threads=4", kind: "approx", sql: sampledSql(10), settings: { max_threads: "4" } },
|
||||
];
|
||||
|
||||
let truth: Day[] = [];
|
||||
const out: unknown[] = [];
|
||||
for (const c of cases) {
|
||||
const r = await run(c.sql, c.settings);
|
||||
if (c.name.startsWith("original")) truth = r.rows;
|
||||
const acc = c.kind === "approx" ? accuracy(truth, r.rows) : (c.name.startsWith("original") ? null : accuracy(truth, r.rows));
|
||||
const accStr = acc ? `err mean ${acc.overallMean.toFixed(1)}% max ${acc.overallMax.toFixed(1)}%` : (c.name.startsWith("original") ? "(ground truth)" : "exact");
|
||||
log(` ${c.name.padEnd(38)} mem ${r.mem.toFixed(0).padStart(5)}MiB ${r.ms.toFixed(0).padStart(5)}ms read ${r.readRows.toLocaleString().padStart(13)} ${accStr}`);
|
||||
out.push({ name: c.name, kind: c.kind, memMiB: r.mem, ms: r.ms, readRows: r.readRows, accuracy: acc, rows: r.rows });
|
||||
}
|
||||
writeFileSync("/tmp/split-optimize.json", JSON.stringify({ generatedAt: new Date().toISOString(), scale: { NUM_PROJECTS, NUM_USERS, NUM_EVENTS }, cases: out }, null, 2));
|
||||
log("wrote /tmp/split-optimize.json");
|
||||
if (!envBool("PA_KEEP")) await chAdmin.command({ query: "DROP DATABASE IF EXISTS bench_pa" });
|
||||
process.exit(0);
|
||||
}
|
||||
main().catch((e) => { console.error("FAILED:", e); process.exit(1); });
|
||||
@ -16,6 +16,14 @@ import { HexclaveAssertionError } from "@hexclave/shared/dist/utils/errors";
|
||||
const WINDOW_DAYS = 30;
|
||||
const ONE_DAY_MS = 24 * 60 * 60 * 1000;
|
||||
const LEADERBOARD_LIMIT = 500;
|
||||
// 1-in-N consistent user-level sampling for the new/retained/reactivated activity
|
||||
// split, with counts scaled back up by N. The split's window function + all-history
|
||||
// scan made it the heaviest query in this route (~1.3 GiB peak at 1M users / 50M
|
||||
// events); sampling 1/4 cuts that ~78% for a ~0.4% mean error. The same cityHash
|
||||
// bucket is applied to both subqueries so each sampled user's full activity
|
||||
// sequence is preserved (retention/reactivation stay unbiased). See
|
||||
// scripts/benchmark-platform-analytics.ts.
|
||||
const ACTIVITY_SPLIT_SAMPLE = 4;
|
||||
const INTERNAL_PROJECT_ID = "internal";
|
||||
const AVG_DAYS_PER_MONTH = 365.25 / 12;
|
||||
const MRR_SUBSCRIPTION_STATUSES = ["active", "trialing"];
|
||||
@ -260,7 +268,7 @@ export const GET = createSmartRouteHandler({
|
||||
] = await Promise.all([
|
||||
// Platform daily DAU (active users) over the visible window.
|
||||
chQuery<{ day: string, c: string | number }>(`
|
||||
SELECT toDate(event_at) AS day, uniqExact(assumeNotNull(user_id)) AS c
|
||||
SELECT toDate(event_at) AS day, uniqExact(sipHash64(assumeNotNull(user_id))) AS c
|
||||
FROM analytics_internal.events
|
||||
WHERE event_type = '$token-refresh' AND user_id IS NOT NULL
|
||||
AND ${customerEventScope}
|
||||
@ -289,8 +297,8 @@ export const GET = createSmartRouteHandler({
|
||||
// MAU + active projects, current vs prior 30d window (single pass over 60d).
|
||||
chQuery<{ mauCur: string | number, mauPrev: string | number, projCur: string | number, projPrev: string | number }>(`
|
||||
SELECT
|
||||
uniqExactIf(assumeNotNull(user_id), event_at >= {mid:DateTime}) AS mauCur,
|
||||
uniqExactIf(assumeNotNull(user_id), event_at < {mid:DateTime}) AS mauPrev,
|
||||
uniqExactIf(sipHash64(assumeNotNull(user_id)), event_at >= {mid:DateTime}) AS mauCur,
|
||||
uniqExactIf(sipHash64(assumeNotNull(user_id)), event_at < {mid:DateTime}) AS mauPrev,
|
||||
uniqExactIf(project_id, event_at >= {mid:DateTime}) AS projCur,
|
||||
uniqExactIf(project_id, event_at < {mid:DateTime}) AS projPrev
|
||||
FROM analytics_internal.events
|
||||
@ -332,26 +340,28 @@ export const GET = createSmartRouteHandler({
|
||||
chQuery<{ day: string, total_count: string, new_count: string, retained_count: string, reactivated_count: string }>(`
|
||||
SELECT
|
||||
toString(w.day) AS day,
|
||||
count() AS total_count,
|
||||
countIf(f.first_date = w.day) AS new_count,
|
||||
countIf(f.first_date < w.day AND w.prev_day = addDays(w.day, -1)) AS retained_count,
|
||||
countIf(f.first_date < w.day AND (isNull(w.prev_day) OR w.prev_day < addDays(w.day, -1))) AS reactivated_count
|
||||
count() * ${ACTIVITY_SPLIT_SAMPLE} AS total_count,
|
||||
countIf(f.first_date = w.day) * ${ACTIVITY_SPLIT_SAMPLE} AS new_count,
|
||||
countIf(f.first_date < w.day AND w.prev_day = addDays(w.day, -1)) * ${ACTIVITY_SPLIT_SAMPLE} AS retained_count,
|
||||
countIf(f.first_date < w.day AND (isNull(w.prev_day) OR w.prev_day < addDays(w.day, -1))) * ${ACTIVITY_SPLIT_SAMPLE} AS reactivated_count
|
||||
FROM (
|
||||
SELECT day, entity_id, lagInFrame(day, 1) OVER (PARTITION BY entity_id ORDER BY day) AS prev_day
|
||||
FROM (
|
||||
SELECT DISTINCT toDate(event_at) AS day, assumeNotNull(user_id) AS entity_id
|
||||
SELECT DISTINCT toDate(event_at) AS day, sipHash64(assumeNotNull(user_id)) AS entity_id
|
||||
FROM analytics_internal.events
|
||||
WHERE event_type = '$token-refresh' AND user_id IS NOT NULL
|
||||
AND ${customerEventScope}
|
||||
AND cityHash64(assumeNotNull(user_id)) % ${ACTIVITY_SPLIT_SAMPLE} = 0
|
||||
AND event_at >= {since:DateTime} AND event_at < {until:DateTime}
|
||||
AND coalesce(CAST(data.is_anonymous, 'Nullable(UInt8)'), 0) = 0
|
||||
)
|
||||
) AS w
|
||||
LEFT JOIN (
|
||||
SELECT assumeNotNull(user_id) AS entity_id, toDate(min(event_at)) AS first_date
|
||||
SELECT sipHash64(assumeNotNull(user_id)) AS entity_id, toDate(min(event_at)) AS first_date
|
||||
FROM analytics_internal.events
|
||||
WHERE event_type = '$token-refresh' AND user_id IS NOT NULL
|
||||
AND ${customerEventScope}
|
||||
AND cityHash64(assumeNotNull(user_id)) % ${ACTIVITY_SPLIT_SAMPLE} = 0
|
||||
AND event_at < {until:DateTime}
|
||||
AND coalesce(CAST(data.is_anonymous, 'Nullable(UInt8)'), 0) = 0
|
||||
GROUP BY entity_id
|
||||
@ -383,8 +393,8 @@ export const GET = createSmartRouteHandler({
|
||||
// Per-project active users, current vs prior window.
|
||||
chQuery<{ projectId: string, cur: string | number, prev: string | number }>(`
|
||||
SELECT project_id AS projectId,
|
||||
uniqExactIf(assumeNotNull(user_id), event_at >= {mid:DateTime}) AS cur,
|
||||
uniqExactIf(assumeNotNull(user_id), event_at < {mid:DateTime}) AS prev
|
||||
uniqExactIf(sipHash64(assumeNotNull(user_id)), event_at >= {mid:DateTime}) AS cur,
|
||||
uniqExactIf(sipHash64(assumeNotNull(user_id)), event_at < {mid:DateTime}) AS prev
|
||||
FROM analytics_internal.events
|
||||
WHERE event_type = '$token-refresh' AND user_id IS NOT NULL
|
||||
AND ${customerEventScope}
|
||||
@ -393,7 +403,7 @@ export const GET = createSmartRouteHandler({
|
||||
`, twoWindowParams),
|
||||
// Per-project daily active sparkline (visible window).
|
||||
chQuery<{ projectId: string, day: string, c: string | number }>(`
|
||||
SELECT project_id AS projectId, toDate(event_at) AS day, uniqExact(assumeNotNull(user_id)) AS c
|
||||
SELECT project_id AS projectId, toDate(event_at) AS day, uniqExact(sipHash64(assumeNotNull(user_id))) AS c
|
||||
FROM analytics_internal.events
|
||||
WHERE event_type = '$token-refresh' AND user_id IS NOT NULL
|
||||
AND ${customerEventScope}
|
||||
|
||||
@ -294,7 +294,8 @@ function Dashboard({
|
||||
? <EmptyChart />
|
||||
: <ComposedAnalyticsChart datapoints={composed} showVisitors showPageViews showRevenue height={300} />}
|
||||
</ChartCard>
|
||||
<ChartCard title="Growth quality" subtitle="New / retained / reactivated users" gradient="green">
|
||||
{/* Counts here are a sampled estimate (1-in-4 active users, scaled back up) to keep the backend query cheap; expect a ~0.4% margin. */}
|
||||
<ChartCard title="Growth quality" subtitle="New / retained / reactivated users · sampled estimate (~0.4%)" gradient="green">
|
||||
{stacked.length === 0
|
||||
? <EmptyChart />
|
||||
: <StackedBarChartDisplay datapoints={stacked} height={300} />}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user