mirror of
https://github.com/stack-auth/stack.git
synced 2026-06-04 21:04:37 +08:00
Fix /internal/metrics ClickHouse OOM (#1457)
Fixes Sentry [STACK-BACKEND-16H](https://stackframe-pw.sentry.io/issues/STACK-BACKEND-16H) — the `/api/v1/internal/metrics` endpoint was triggering the cluster's 10.8 GiB OvercommitTracker kill on tenants with months of `$token-refresh` history. ## Root cause Three queries in `loadAnalyticsOverview` plus `loadUsersByCountry` did `GROUP BY user_id` over the events table with **no lower `event_at` bound**, so their hash table working set scaled with cumulative-distinct-users-ever-seen instead of the 30-day metrics window. ## Changes - Add 30-day `event_at` lower bound to `loadUsersByCountry` and to the `analyticsUserJoin` inner subquery (used by `dailyEvents`, `totalVisitors`, `topReferrers`). - New `getClickhouseAdminClientForMetrics()` factory in `lib/clickhouse.tsx` with connection-level safety net: per-query + per-user memory caps, external GROUP BY spill, and `join_algorithm: 'grace_hash,parallel_hash,hash'` (grace_hash measured to give 48% memory reduction at zero latency cost — see benchmark notes in the file). - Inline comment + concrete next steps for the long-term fix (option C: stamp `is_anonymous` at ingest on page-view/click events, then drop the join entirely). - Extend `scripts/benchmark-internal-metrics.ts` with the historical-seed knob and three new modes (`BENCH_BACKFILL_COMPARE`, `BENCH_JOIN_ALGO_COMPARE`, plus the existing `BENCH_ROUTE_QUERIES` updated) used to validate the choices above. ## Benchmark — pre-PR vs post-PR Synthetic seed: 300k users × 9 events spread over 365 days (~2.7M events). | | pre-PR | post-PR | delta | |---|---:|---:|---:| | Sum peak memory | 2.18 GiB | 515 MiB | **4.3× less** | | Max query duration | 1293 ms | 101 ms | **12.8× faster** | | Sum CPU duration | 5119 ms | 394 ms | 13× less work | | Sum bytes read | 3.87 GiB | 929 MiB | 4.3× less I/O | Per-query at 300k users: - `analyticsOverview:dailyEvents` 561 → 44 MiB (12.8× less) - `analyticsOverview:totalVisitors` 560 → 50 MiB (11.2× less) - `analyticsOverview:topReferrers` 546 → 50 MiB (10.9× less) - `loadUsersByCountry` 388 → 44 MiB (8.9× less) ## Caveats - `loadDailyActiveSplitFromClickhouse` still scans all-history on its `min(event_at)` subquery. It can't be naively bounded — `first_date` is used to classify entities as new vs reactivated, and a 30d bound would silently mislabel old-but-active entities as "new." The new SETTINGS cap+spill it; the proper fix is option C (documented inline). - A user with a page-view but no `$token-refresh` in the last 30 days now falls through to `coalesce(NULL, 0)` and is classified non-anonymous. Token-refresh fires every few minutes per active session, so this is rare but not impossible (embedded SDKs that poll less frequently, sessions straddling the 30d boundary). - `max_memory_usage_for_user: 9 GB` trades "cluster-wide OvercommitTracker kill of a random query" for "clean per-user memory error attributed to the specific query." After our 30d bounds, no query is anywhere near 9 GB. ## Test plan - [x] `pnpm typecheck` passes - [x] `pnpm lint` passes - [x] `pnpm test run apps/e2e/tests/backend/endpoints/api/v1/internal-metrics.test.ts` — 9/10 pass; the 1 failure (`risk_scores` snapshot drift) reproduces on clean `dev` and is unrelated - [x] `pnpm test run apps/e2e/tests/backend/endpoints/api/v1/analytics-{events,events-batch,query}.test.ts apps/e2e/tests/backend/endpoints/api/v1/token-refresh-events.test.ts apps/e2e/tests/backend/performance/metrics.test.ts` — all passing tests pass; 10 pre-existing `PRODUCT_DOES_NOT_EXIST` setup failures reproduce on clean `dev` - [x] Benchmark `BENCH_ROUTE_QUERIES=1` at 300k users shows the deltas above <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Chores** * Improved internal metrics collection to use metrics-specific DB settings for more reliable, safer analytical reads. * Added guardrails to metrics queries to enforce time-window bounds and avoid unbounded scans. * Expanded benchmark modes (backfill and join-algo comparisons), extended perf seeding, and improved logging/retry behavior to capture more complete stats and reduce missing log rows. <!-- review_stack_entry_start --> [](https://app.coderabbit.ai/change-stack/hexclave/stack-auth/pull/1457?utm_source=github_walkthrough&utm_medium=github&utm_campaign=change_stack) <!-- review_stack_entry_end --> <!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
parent
002692e519
commit
91b8e4caa4
@ -48,7 +48,7 @@
|
||||
* BENCH_TEAM_RATIO (default 0.3) – fraction of users with a team
|
||||
*/
|
||||
|
||||
import { getClickhouseAdminClient } from "@/lib/clickhouse";
|
||||
import { getClickhouseAdminClient, getClickhouseAdminClientForMetrics } from "@/lib/clickhouse";
|
||||
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
|
||||
import { randomUUID } from "node:crypto";
|
||||
|
||||
@ -377,7 +377,7 @@ const ANALYTICS_USER_JOIN = `
|
||||
`;
|
||||
const NON_ANON_FILTER = "({includeAnonymous:UInt8} = 1 OR coalesce(JSONExtract(toJSONString(e.data), 'is_anonymous', 'Nullable(UInt8)'), token_refresh_users.latest_is_anonymous, 0) = 0)";
|
||||
|
||||
// Same joins/filters after fix 1 (direct CAST instead of JSONExtract(toJSONString(...)))
|
||||
// Post-fix: lower `event_at >= since` bound added to the inner subquery.
|
||||
const ANALYTICS_USER_JOIN_AFTER = `
|
||||
LEFT JOIN (
|
||||
SELECT
|
||||
@ -388,6 +388,7 @@ const ANALYTICS_USER_JOIN_AFTER = `
|
||||
AND project_id = {projectId:String}
|
||||
AND branch_id = {branchId:String}
|
||||
AND user_id IS NOT NULL
|
||||
AND event_at >= {since:DateTime}
|
||||
AND event_at < {untilExclusive:DateTime}
|
||||
GROUP BY user_id
|
||||
) AS token_refresh_users
|
||||
@ -398,7 +399,7 @@ const NON_ANON_FILTER_AFTER = "({includeAnonymous:UInt8} = 1 OR coalesce(CAST(e.
|
||||
const ROUTE_QUERIES_BEFORE: RouteQuery[] = [
|
||||
{
|
||||
name: "loadUsersByCountry",
|
||||
desc: "argMax country per user over all $token-refresh events (no window)",
|
||||
desc: "argMax country per user over all $token-refresh events (NO time window)",
|
||||
sql: `
|
||||
SELECT
|
||||
country_code,
|
||||
@ -412,7 +413,7 @@ const ROUTE_QUERIES_BEFORE: RouteQuery[] = [
|
||||
user_id,
|
||||
event_at,
|
||||
CAST(data.ip_info.country_code, 'Nullable(String)') AS cc,
|
||||
CAST(data.is_anonymous, 'UInt8') AS is_anonymous
|
||||
coalesce(CAST(data.is_anonymous, 'Nullable(UInt8)'), 0) AS is_anonymous
|
||||
FROM analytics_internal.events
|
||||
WHERE event_type = '$token-refresh'
|
||||
AND project_id = {projectId:String}
|
||||
@ -676,8 +677,40 @@ function splitSqlAfter(idCol: "user_id" | "team_id", withAnonFilter: boolean): s
|
||||
}
|
||||
|
||||
const ROUTE_QUERIES_AFTER: RouteQuery[] = [
|
||||
// Unchanged by fix 1/3 (already uses CAST).
|
||||
ROUTE_QUERIES_BEFORE[0], // loadUsersByCountry
|
||||
{
|
||||
name: "loadUsersByCountry",
|
||||
desc: "30-day window added (was unbounded scan)",
|
||||
sql: `
|
||||
SELECT
|
||||
country_code,
|
||||
count() AS userCount
|
||||
FROM (
|
||||
SELECT
|
||||
user_id,
|
||||
argMax(cc, event_at) AS country_code
|
||||
FROM (
|
||||
SELECT
|
||||
user_id,
|
||||
event_at,
|
||||
CAST(data.ip_info.country_code, 'Nullable(String)') AS cc,
|
||||
coalesce(CAST(data.is_anonymous, 'Nullable(UInt8)'), 0) AS is_anonymous
|
||||
FROM analytics_internal.events
|
||||
WHERE event_type = '$token-refresh'
|
||||
AND project_id = {projectId:String}
|
||||
AND branch_id = {branchId:String}
|
||||
AND user_id IS NOT NULL
|
||||
AND event_at >= {since:DateTime}
|
||||
AND event_at < {untilExclusive:DateTime}
|
||||
)
|
||||
WHERE cc IS NOT NULL
|
||||
AND ({includeAnonymous:UInt8} = 1 OR is_anonymous = 0)
|
||||
GROUP BY user_id
|
||||
)
|
||||
WHERE country_code IS NOT NULL
|
||||
GROUP BY country_code
|
||||
ORDER BY userCount DESC
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "loadDailyActiveUsers",
|
||||
desc: "DAU per day (fix 1: CAST instead of JSONExtract)",
|
||||
@ -1037,8 +1070,368 @@ const ROUTE_QUERIES_OPTIMIZED: RouteQuery[] = [
|
||||
},
|
||||
];
|
||||
|
||||
async function runRouteQuery(rq: RouteQuery, p: QueryParams, now: Date): Promise<string> {
|
||||
// ── Backfill comparison (BENCH_BACKFILL_COMPARE=1) ──────────────────────────
|
||||
// Compares post-backfill query shapes: A (bounded join, no backfill), B/C/E
|
||||
// (drop join, classify from data.is_anonymous), D (drop join, classify from
|
||||
// a top-level column). B/C/E produce identical query SQL; they differ only in
|
||||
// how the data gets stamped.
|
||||
|
||||
// Scoped to RUN_ID so concurrent runs / SIGKILL-leaked columns don't collide.
|
||||
const BENCH_OPTION_D_COLUMN = `bench_is_anon_d_${RUN_ID.replace(/-/g, "_")}`;
|
||||
|
||||
const analyticsUserJoinBounded = `
|
||||
LEFT JOIN (
|
||||
SELECT
|
||||
user_id,
|
||||
argMax(coalesce(CAST(data.is_anonymous, 'Nullable(UInt8)'), 0), event_at) AS latest_is_anonymous
|
||||
FROM analytics_internal.events
|
||||
WHERE event_type = '$token-refresh'
|
||||
AND project_id = {projectId:String}
|
||||
AND branch_id = {branchId:String}
|
||||
AND user_id IS NOT NULL
|
||||
AND event_at >= {since:DateTime} -- ★ option A's lower bound
|
||||
AND event_at < {untilExclusive:DateTime}
|
||||
GROUP BY user_id
|
||||
) AS token_refresh_users
|
||||
ON e.user_id = token_refresh_users.user_id
|
||||
`;
|
||||
|
||||
const ROUTE_QUERIES_BACKFILL_A: RouteQuery[] = [
|
||||
{
|
||||
name: "analyticsOverview:dailyEvents",
|
||||
desc: "A: bounded LEFT JOIN (event_at >= since on inner)",
|
||||
sql: `
|
||||
SELECT
|
||||
toDate(e.event_at) AS day,
|
||||
countIf(
|
||||
e.event_type = '$page-view' AND e.user_id IS NOT NULL
|
||||
AND ${NON_ANON_FILTER_AFTER}
|
||||
) AS pv,
|
||||
countIf(
|
||||
e.event_type = '$click' AND e.user_id IS NOT NULL
|
||||
AND ${NON_ANON_FILTER_AFTER}
|
||||
) AS cl,
|
||||
uniqExactIf(
|
||||
assumeNotNull(e.user_id),
|
||||
e.event_type = '$page-view' AND e.user_id IS NOT NULL
|
||||
AND ${NON_ANON_FILTER_AFTER}
|
||||
) AS visitors
|
||||
FROM analytics_internal.events AS e
|
||||
${analyticsUserJoinBounded}
|
||||
WHERE e.event_type IN ('$page-view', '$click')
|
||||
AND e.project_id = {projectId:String}
|
||||
AND e.branch_id = {branchId:String}
|
||||
AND e.event_at >= {since:DateTime}
|
||||
AND e.event_at < {untilExclusive:DateTime}
|
||||
GROUP BY day
|
||||
ORDER BY day ASC
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "analyticsOverview:totalVisitors",
|
||||
desc: "A: bounded LEFT JOIN (event_at >= since on inner)",
|
||||
sql: `
|
||||
SELECT
|
||||
uniqExactIf(
|
||||
assumeNotNull(e.user_id),
|
||||
e.user_id IS NOT NULL AND ${NON_ANON_FILTER_AFTER}
|
||||
) AS visitors
|
||||
FROM analytics_internal.events AS e
|
||||
${analyticsUserJoinBounded}
|
||||
WHERE e.event_type = '$page-view'
|
||||
AND e.project_id = {projectId:String}
|
||||
AND e.branch_id = {branchId:String}
|
||||
AND e.user_id IS NOT NULL
|
||||
AND e.event_at >= {since:DateTime}
|
||||
AND e.event_at < {untilExclusive:DateTime}
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "analyticsOverview:topReferrers",
|
||||
desc: "A: bounded LEFT JOIN (event_at >= since on inner)",
|
||||
sql: `
|
||||
SELECT
|
||||
nullIf(CAST(e.data.referrer, 'String'), '') AS referrer,
|
||||
uniqExactIf(
|
||||
assumeNotNull(e.user_id),
|
||||
e.user_id IS NOT NULL AND ${NON_ANON_FILTER_AFTER}
|
||||
) AS visitors
|
||||
FROM analytics_internal.events AS e
|
||||
${analyticsUserJoinBounded}
|
||||
WHERE e.event_type = '$page-view'
|
||||
AND e.project_id = {projectId:String}
|
||||
AND e.branch_id = {branchId:String}
|
||||
AND e.event_at >= {since:DateTime}
|
||||
AND e.event_at < {untilExclusive:DateTime}
|
||||
GROUP BY referrer
|
||||
HAVING visitors > 0
|
||||
ORDER BY visitors DESC
|
||||
LIMIT 100
|
||||
`,
|
||||
},
|
||||
];
|
||||
|
||||
// Options B/C/E all collapse to the same post-backfill SQL shape: drop the
|
||||
// LEFT JOIN entirely and trust e.data.is_anonymous (the field that the
|
||||
// ingestion fix will populate). The OPTIMIZED array already contains exactly
|
||||
// these queries — reuse them so we have a single source of truth.
|
||||
const ROUTE_QUERIES_BACKFILL_BCE: RouteQuery[] = ROUTE_QUERIES_OPTIMIZED.filter((q) =>
|
||||
q.name === "analyticsOverview:dailyEvents"
|
||||
|| q.name === "analyticsOverview:totalVisitors"
|
||||
|| q.name === "analyticsOverview:topReferrers",
|
||||
);
|
||||
|
||||
const ROUTE_QUERIES_BACKFILL_D: RouteQuery[] = [
|
||||
{
|
||||
name: "analyticsOverview:dailyEvents",
|
||||
desc: "D: drop join, top-level UInt8 column (no JSON parse)",
|
||||
sql: `
|
||||
SELECT
|
||||
toDate(e.event_at) AS day,
|
||||
countIf(
|
||||
e.event_type = '$page-view' AND e.user_id IS NOT NULL
|
||||
AND ({includeAnonymous:UInt8} = 1 OR coalesce(e.${BENCH_OPTION_D_COLUMN}, 0) = 0)
|
||||
) AS pv,
|
||||
countIf(
|
||||
e.event_type = '$click' AND e.user_id IS NOT NULL
|
||||
AND ({includeAnonymous:UInt8} = 1 OR coalesce(e.${BENCH_OPTION_D_COLUMN}, 0) = 0)
|
||||
) AS cl,
|
||||
uniqExactIf(
|
||||
assumeNotNull(e.user_id),
|
||||
e.event_type = '$page-view' AND e.user_id IS NOT NULL
|
||||
AND ({includeAnonymous:UInt8} = 1 OR coalesce(e.${BENCH_OPTION_D_COLUMN}, 0) = 0)
|
||||
) AS visitors
|
||||
FROM analytics_internal.events AS e
|
||||
WHERE e.event_type IN ('$page-view', '$click')
|
||||
AND e.project_id = {projectId:String}
|
||||
AND e.branch_id = {branchId:String}
|
||||
AND e.event_at >= {since:DateTime}
|
||||
AND e.event_at < {untilExclusive:DateTime}
|
||||
GROUP BY day
|
||||
ORDER BY day ASC
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "analyticsOverview:totalVisitors",
|
||||
desc: "D: drop join, top-level UInt8 column (no JSON parse)",
|
||||
sql: `
|
||||
SELECT
|
||||
uniqExactIf(
|
||||
assumeNotNull(e.user_id),
|
||||
e.user_id IS NOT NULL
|
||||
AND ({includeAnonymous:UInt8} = 1 OR coalesce(e.${BENCH_OPTION_D_COLUMN}, 0) = 0)
|
||||
) AS visitors
|
||||
FROM analytics_internal.events AS e
|
||||
WHERE e.event_type = '$page-view'
|
||||
AND e.project_id = {projectId:String}
|
||||
AND e.branch_id = {branchId:String}
|
||||
AND e.user_id IS NOT NULL
|
||||
AND e.event_at >= {since:DateTime}
|
||||
AND e.event_at < {untilExclusive:DateTime}
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "analyticsOverview:topReferrers",
|
||||
desc: "D: drop join, top-level UInt8 column (no JSON parse)",
|
||||
sql: `
|
||||
SELECT
|
||||
nullIf(CAST(e.data.referrer, 'String'), '') AS referrer,
|
||||
uniqExactIf(
|
||||
assumeNotNull(e.user_id),
|
||||
e.user_id IS NOT NULL
|
||||
AND ({includeAnonymous:UInt8} = 1 OR coalesce(e.${BENCH_OPTION_D_COLUMN}, 0) = 0)
|
||||
) AS visitors
|
||||
FROM analytics_internal.events AS e
|
||||
WHERE e.event_type = '$page-view'
|
||||
AND e.project_id = {projectId:String}
|
||||
AND e.branch_id = {branchId:String}
|
||||
AND e.event_at >= {since:DateTime}
|
||||
AND e.event_at < {untilExclusive:DateTime}
|
||||
GROUP BY referrer
|
||||
HAVING visitors > 0
|
||||
ORDER BY visitors DESC
|
||||
LIMIT 100
|
||||
`,
|
||||
},
|
||||
];
|
||||
|
||||
async function ensureOptionDColumn(): Promise<void> {
|
||||
const client = getClickhouseAdminClient();
|
||||
const res = await client.query({
|
||||
query: `
|
||||
SELECT count() AS c FROM system.columns
|
||||
WHERE database = 'analytics_internal'
|
||||
AND table = 'events'
|
||||
AND name = {col:String}
|
||||
`,
|
||||
query_params: { col: BENCH_OPTION_D_COLUMN },
|
||||
format: "JSONEachRow",
|
||||
});
|
||||
const rows = (await res.json()) as { c: string | number }[];
|
||||
if (Number(rows[0]?.c ?? 0) === 0) {
|
||||
await client.command({
|
||||
query: `
|
||||
ALTER TABLE analytics_internal.events
|
||||
ADD COLUMN IF NOT EXISTS ${BENCH_OPTION_D_COLUMN} Nullable(UInt8)
|
||||
`,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function populateOptionDColumn(): Promise<void> {
|
||||
const client = getClickhouseAdminClient();
|
||||
await client.command({
|
||||
query: `
|
||||
ALTER TABLE analytics_internal.events
|
||||
UPDATE ${BENCH_OPTION_D_COLUMN} = CAST(data.is_anonymous, 'Nullable(UInt8)')
|
||||
WHERE project_id = {projectId:String}
|
||||
`,
|
||||
query_params: { projectId: BENCH_PROJECT_ID },
|
||||
clickhouse_settings: { mutations_sync: "2" },
|
||||
});
|
||||
}
|
||||
|
||||
async function dropOptionDColumn(): Promise<void> {
|
||||
const client = getClickhouseAdminClient();
|
||||
await client.command({
|
||||
query: `ALTER TABLE analytics_internal.events DROP COLUMN IF EXISTS ${BENCH_OPTION_D_COLUMN}`,
|
||||
});
|
||||
}
|
||||
|
||||
// Pulls only the 3 analyticsOverview queries that touch the join from the
|
||||
// current (post-fixes-1+3) shipped SQL, so we have a baseline to compare against.
|
||||
const ROUTE_QUERIES_BACKFILL_BASELINE: RouteQuery[] = ROUTE_QUERIES_AFTER.filter((q) =>
|
||||
q.name === "analyticsOverview:dailyEvents"
|
||||
|| q.name === "analyticsOverview:totalVisitors"
|
||||
|| q.name === "analyticsOverview:topReferrers",
|
||||
);
|
||||
|
||||
async function benchmarkBackfillCompare(now: Date): Promise<void> {
|
||||
const untilExclusive = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate()) + ONE_DAY_MS);
|
||||
const since = new Date(untilExclusive.getTime() - METRICS_WINDOW_MS);
|
||||
const params: QueryParams = {
|
||||
projectId: BENCH_PROJECT_ID,
|
||||
branchId: PERF_BRANCH_ID,
|
||||
since,
|
||||
untilExclusive,
|
||||
includeAnonymous: false,
|
||||
};
|
||||
|
||||
console.log("\n── Backfill option comparison (post-backfill query memory) ──");
|
||||
console.log(" Each option leaves the metrics queries in one of three shapes:");
|
||||
console.log(" Today / A (bounded join) / B,C,E (drop join, JSON) / D (drop join, top-level column)\n");
|
||||
|
||||
// Set up option D's column.
|
||||
console.log(" Setting up option D top-level column…");
|
||||
await ensureOptionDColumn();
|
||||
await populateOptionDColumn();
|
||||
console.log(" done.\n");
|
||||
|
||||
async function runShape(label: string, list: RouteQuery[]): Promise<Map<string, QueryStats>> {
|
||||
const out = new Map<string, QueryStats>();
|
||||
for (const rq of list) {
|
||||
const qid = await runRouteQuery(rq, params, now);
|
||||
out.set(rq.name, await readStats(qid));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
// Warm cache once with a tiny query.
|
||||
await runRouteQuery(ROUTE_QUERIES_BEFORE[1], params, now);
|
||||
|
||||
const baseline = await runShape("today", ROUTE_QUERIES_BACKFILL_BASELINE);
|
||||
const a = await runShape("A", ROUTE_QUERIES_BACKFILL_A);
|
||||
const bce = await runShape("B/C/E", ROUTE_QUERIES_BACKFILL_BCE);
|
||||
const d = await runShape("D", ROUTE_QUERIES_BACKFILL_D);
|
||||
|
||||
const padL = (s: string, n: number) => s.padEnd(n);
|
||||
const padR = (s: string, n: number) => s.padStart(n);
|
||||
const mem = (n: number | undefined) => n == null ? "—" : padR(fmtBytes(n), 12);
|
||||
const dur = (n: number | undefined) => n == null ? "—" : padR(`${n} ms`, 9);
|
||||
|
||||
const queryNames = ["analyticsOverview:dailyEvents", "analyticsOverview:totalVisitors", "analyticsOverview:topReferrers"];
|
||||
|
||||
console.log(" Peak memory per query, per backfill shape:");
|
||||
console.log([
|
||||
padL("query", 36),
|
||||
padR("Today", 12),
|
||||
padR("A: bounded", 12),
|
||||
padR("B/C/E: drop", 12),
|
||||
padR("D: column", 12),
|
||||
].join(" "));
|
||||
console.log(" " + "─".repeat(96));
|
||||
for (const name of queryNames) {
|
||||
const t = baseline.get(name);
|
||||
const av = a.get(name);
|
||||
const bv = bce.get(name);
|
||||
const dv = d.get(name);
|
||||
console.log([
|
||||
" " + padL(name, 34),
|
||||
mem(t?.memory_usage),
|
||||
mem(av?.memory_usage),
|
||||
mem(bv?.memory_usage),
|
||||
mem(dv?.memory_usage),
|
||||
].join(" "));
|
||||
}
|
||||
|
||||
// Totals
|
||||
const sum = (m: Map<string, QueryStats>) =>
|
||||
queryNames.reduce((acc, n) => acc + (m.get(n)?.memory_usage ?? 0), 0);
|
||||
const tSum = sum(baseline);
|
||||
const aSum = sum(a);
|
||||
const bSum = sum(bce);
|
||||
const dSum = sum(d);
|
||||
|
||||
console.log(" " + "─".repeat(96));
|
||||
console.log([
|
||||
" " + padL("SUM peak memory", 34),
|
||||
padR(fmtBytes(tSum), 12),
|
||||
padR(fmtBytes(aSum), 12),
|
||||
padR(fmtBytes(bSum), 12),
|
||||
padR(fmtBytes(dSum), 12),
|
||||
].join(" "));
|
||||
|
||||
const ratio = (s: number) => s === 0 ? "—" : `${(tSum / s).toFixed(2)}× less`;
|
||||
console.log([
|
||||
" " + padL("vs. Today", 34),
|
||||
padR("—", 12),
|
||||
padR(ratio(aSum), 12),
|
||||
padR(ratio(bSum), 12),
|
||||
padR(ratio(dSum), 12),
|
||||
].join(" "));
|
||||
|
||||
// Duration too
|
||||
console.log("\n Query duration per shape:");
|
||||
console.log([
|
||||
padL("query", 36),
|
||||
padR("Today", 9),
|
||||
padR("A", 9),
|
||||
padR("B/C/E", 9),
|
||||
padR("D", 9),
|
||||
].join(" "));
|
||||
console.log(" " + "─".repeat(84));
|
||||
for (const name of queryNames) {
|
||||
console.log([
|
||||
" " + padL(name, 34),
|
||||
dur(baseline.get(name)?.query_duration_ms),
|
||||
dur(a.get(name)?.query_duration_ms),
|
||||
dur(bce.get(name)?.query_duration_ms),
|
||||
dur(d.get(name)?.query_duration_ms),
|
||||
].join(" "));
|
||||
}
|
||||
|
||||
console.log("\n Notes:");
|
||||
console.log(" • B, C, E all produce the same query SQL; their column above is one number.");
|
||||
console.log(" • The semantic differences (argMax-latest vs ASOF vs partition-swap) live");
|
||||
console.log(" in the backfill operation, not in the query that runs afterwards.");
|
||||
console.log(" • Option D mutates a stored column instead of the JSON field; the runtime");
|
||||
console.log(" win comes from skipping per-row JSON access.");
|
||||
}
|
||||
|
||||
async function runRouteQuery(rq: RouteQuery, p: QueryParams, now: Date, opts: { useMetricsClient?: boolean } = {}): Promise<string> {
|
||||
// `useMetricsClient` applies the route.tsx connection-level SETTINGS so AFTER
|
||||
// measurements reflect what actually ships, not the raw SQL.
|
||||
const client = opts.useMetricsClient ? getClickhouseAdminClientForMetrics() : getClickhouseAdminClient();
|
||||
const queryId = `bench-route-${rq.name.replace(/[^a-z0-9]/gi, "-")}-${randomUUID()}`;
|
||||
const baseParams: Record<string, unknown> = {
|
||||
projectId: p.projectId,
|
||||
@ -1058,6 +1451,285 @@ async function runRouteQuery(rq: RouteQuery, p: QueryParams, now: Date): Promise
|
||||
return queryId;
|
||||
}
|
||||
|
||||
// ── Join algorithm comparison (BENCH_JOIN_ALGO_COMPARE=1) ─────────────────────
|
||||
// Runs each of ClickHouse's 6 join_algorithm values against the bounded and
|
||||
// unbounded analyticsUserJoin shapes. Algorithms that don't apply (e.g.
|
||||
// `direct` without a Dictionary right side) error and show as ERR.
|
||||
|
||||
const ANALYTICS_OVERVIEW_QUERY_NAMES = [
|
||||
"analyticsOverview:dailyEvents",
|
||||
"analyticsOverview:totalVisitors",
|
||||
"analyticsOverview:topReferrers",
|
||||
] as const;
|
||||
|
||||
const JOIN_ALGORITHMS = [
|
||||
"default",
|
||||
"direct",
|
||||
"hash",
|
||||
"parallel_hash",
|
||||
"grace_hash",
|
||||
"full_sorting_merge",
|
||||
"partial_merge",
|
||||
] as const;
|
||||
type JoinAlgorithm = typeof JOIN_ALGORITHMS[number];
|
||||
|
||||
// Build the 3 analyticsOverview queries with the given join SQL, filter SQL,
|
||||
// and a per-query SETTINGS clause (used to force `join_algorithm`).
|
||||
function buildAnalyticsOverviewVariant(opts: {
|
||||
joinSql: string,
|
||||
nonAnonFilter: string,
|
||||
joinAlgorithm: JoinAlgorithm,
|
||||
}): RouteQuery[] {
|
||||
const settings = opts.joinAlgorithm === "default"
|
||||
? ""
|
||||
: `SETTINGS join_algorithm = '${opts.joinAlgorithm}'`;
|
||||
return [
|
||||
{
|
||||
name: "analyticsOverview:dailyEvents",
|
||||
desc: `analyticsOverview daily events (join_algorithm=${opts.joinAlgorithm})`,
|
||||
sql: `
|
||||
SELECT
|
||||
toDate(e.event_at) AS day,
|
||||
countIf(
|
||||
e.event_type = '$page-view'
|
||||
AND e.user_id IS NOT NULL
|
||||
AND ${opts.nonAnonFilter}
|
||||
) AS pv,
|
||||
countIf(
|
||||
e.event_type = '$click'
|
||||
AND e.user_id IS NOT NULL
|
||||
AND ${opts.nonAnonFilter}
|
||||
) AS cl,
|
||||
uniqExactIf(
|
||||
assumeNotNull(e.user_id),
|
||||
e.event_type = '$page-view'
|
||||
AND e.user_id IS NOT NULL
|
||||
AND ${opts.nonAnonFilter}
|
||||
) AS visitors
|
||||
FROM analytics_internal.events AS e
|
||||
${opts.joinSql}
|
||||
WHERE e.event_type IN ('$page-view', '$click')
|
||||
AND e.project_id = {projectId:String}
|
||||
AND e.branch_id = {branchId:String}
|
||||
AND e.event_at >= {since:DateTime}
|
||||
AND e.event_at < {untilExclusive:DateTime}
|
||||
GROUP BY day
|
||||
ORDER BY day ASC
|
||||
${settings}
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "analyticsOverview:totalVisitors",
|
||||
desc: `analyticsOverview total visitors (join_algorithm=${opts.joinAlgorithm})`,
|
||||
sql: `
|
||||
SELECT
|
||||
uniqExactIf(
|
||||
assumeNotNull(e.user_id),
|
||||
e.user_id IS NOT NULL
|
||||
AND ${opts.nonAnonFilter}
|
||||
) AS visitors
|
||||
FROM analytics_internal.events AS e
|
||||
${opts.joinSql}
|
||||
WHERE e.event_type = '$page-view'
|
||||
AND e.project_id = {projectId:String}
|
||||
AND e.branch_id = {branchId:String}
|
||||
AND e.user_id IS NOT NULL
|
||||
AND e.event_at >= {since:DateTime}
|
||||
AND e.event_at < {untilExclusive:DateTime}
|
||||
${settings}
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "analyticsOverview:topReferrers",
|
||||
desc: `analyticsOverview top referrers (join_algorithm=${opts.joinAlgorithm})`,
|
||||
sql: `
|
||||
SELECT
|
||||
nullIf(CAST(e.data.referrer, 'String'), '') AS referrer,
|
||||
uniqExactIf(
|
||||
assumeNotNull(e.user_id),
|
||||
e.user_id IS NOT NULL
|
||||
AND ${opts.nonAnonFilter}
|
||||
) AS visitors
|
||||
FROM analytics_internal.events AS e
|
||||
${opts.joinSql}
|
||||
WHERE e.event_type = '$page-view'
|
||||
AND e.project_id = {projectId:String}
|
||||
AND e.branch_id = {branchId:String}
|
||||
AND e.event_at >= {since:DateTime}
|
||||
AND e.event_at < {untilExclusive:DateTime}
|
||||
GROUP BY referrer
|
||||
HAVING visitors > 0
|
||||
ORDER BY visitors DESC
|
||||
LIMIT 100
|
||||
${settings}
|
||||
`,
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
// Run a query; on failure log the error and return null so the join-algo
|
||||
// matrix can show ERR for shapes a given algorithm doesn't support (e.g.
|
||||
// `direct` without a Dictionary right side).
|
||||
async function tryRunAndReadStats(rq: RouteQuery, p: QueryParams, now: Date): Promise<QueryStats | null> {
|
||||
try {
|
||||
const qid = await runRouteQuery(rq, p, now);
|
||||
return await readStats(qid);
|
||||
} catch (e) {
|
||||
console.warn(` [bench] query "${rq.name}" failed: ${e instanceof Error ? e.message : String(e)}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async function benchmarkJoinAlgorithms(now: Date): Promise<void> {
|
||||
const untilExclusive = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate()) + ONE_DAY_MS);
|
||||
const since = new Date(untilExclusive.getTime() - METRICS_WINDOW_MS);
|
||||
const params: QueryParams = {
|
||||
projectId: BENCH_PROJECT_ID,
|
||||
branchId: PERF_BRANCH_ID,
|
||||
since,
|
||||
untilExclusive,
|
||||
includeAnonymous: false,
|
||||
};
|
||||
|
||||
console.log("\n── Join algorithm comparison (6 algorithms × 2 cases) ──");
|
||||
console.log(" Cases:");
|
||||
console.log(" normal = bounded analyticsUserJoin (the SQL we just shipped)");
|
||||
console.log(" heavy = UNBOUNDED analyticsUserJoin (pre-fix; what caused the Sentry OOM)");
|
||||
console.log("");
|
||||
|
||||
// Warm cache once.
|
||||
await runRouteQuery(
|
||||
buildAnalyticsOverviewVariant({
|
||||
joinSql: ANALYTICS_USER_JOIN_AFTER,
|
||||
nonAnonFilter: NON_ANON_FILTER_AFTER,
|
||||
joinAlgorithm: "default",
|
||||
})[0],
|
||||
params,
|
||||
now,
|
||||
);
|
||||
|
||||
type CaseName = "normal" | "heavy";
|
||||
const cases: Record<CaseName, { joinSql: string, nonAnonFilter: string }> = {
|
||||
normal: { joinSql: ANALYTICS_USER_JOIN_AFTER, nonAnonFilter: NON_ANON_FILTER_AFTER },
|
||||
heavy: { joinSql: ANALYTICS_USER_JOIN, nonAnonFilter: NON_ANON_FILTER },
|
||||
};
|
||||
|
||||
type StatsByCaseByAlgo = Record<CaseName, Record<JoinAlgorithm, Map<string, QueryStats | null>>>;
|
||||
const stats: StatsByCaseByAlgo = {
|
||||
normal: {} as Record<JoinAlgorithm, Map<string, QueryStats | null>>,
|
||||
heavy: {} as Record<JoinAlgorithm, Map<string, QueryStats | null>>,
|
||||
};
|
||||
|
||||
for (const caseName of ["normal", "heavy"] as const) {
|
||||
for (const algo of JOIN_ALGORITHMS) {
|
||||
const queries = buildAnalyticsOverviewVariant({
|
||||
joinSql: cases[caseName].joinSql,
|
||||
nonAnonFilter: cases[caseName].nonAnonFilter,
|
||||
joinAlgorithm: algo,
|
||||
});
|
||||
const out = new Map<string, QueryStats | null>();
|
||||
for (const rq of queries) {
|
||||
out.set(rq.name, await tryRunAndReadStats(rq, params, now));
|
||||
}
|
||||
stats[caseName][algo] = out;
|
||||
}
|
||||
}
|
||||
|
||||
const padL = (s: string, n: number) => s.padEnd(n);
|
||||
const padR = (s: string, n: number) => s.padStart(n);
|
||||
const memCell = (s: QueryStats | null | undefined) =>
|
||||
s == null ? padR("ERR", 11) : padR(fmtBytes(s.memory_usage), 11);
|
||||
const durCell = (s: QueryStats | null | undefined) =>
|
||||
s == null ? padR("ERR", 9) : padR(`${s.query_duration_ms} ms`, 9);
|
||||
|
||||
function printCaseTable(caseName: CaseName): void {
|
||||
console.log(`\n ── ${caseName.toUpperCase()} case (${caseName === "normal" ? "bounded join" : "UNBOUNDED join"}) ──`);
|
||||
// Header row
|
||||
console.log([
|
||||
padL("query", 32),
|
||||
...JOIN_ALGORITHMS.map((a) => padR(a, 11)),
|
||||
].join(" "));
|
||||
console.log(" " + "─".repeat(32 + JOIN_ALGORITHMS.length * 13));
|
||||
// Per-query memory
|
||||
for (const name of ANALYTICS_OVERVIEW_QUERY_NAMES) {
|
||||
console.log([
|
||||
" " + padL(name, 30),
|
||||
...JOIN_ALGORITHMS.map((a) => memCell(stats[caseName][a].get(name))),
|
||||
].join(" "));
|
||||
}
|
||||
// Sum memory row
|
||||
console.log(" " + "─".repeat(32 + JOIN_ALGORITHMS.length * 13));
|
||||
console.log([
|
||||
" " + padL("SUM peak memory", 30),
|
||||
...JOIN_ALGORITHMS.map((a) => {
|
||||
const sum = ANALYTICS_OVERVIEW_QUERY_NAMES.reduce((acc, n) => {
|
||||
const s = stats[caseName][a].get(n);
|
||||
return s == null ? acc : acc + s.memory_usage;
|
||||
}, 0);
|
||||
const anyErr = ANALYTICS_OVERVIEW_QUERY_NAMES.some((n) => stats[caseName][a].get(n) == null);
|
||||
return anyErr ? padR("partial", 11) : padR(fmtBytes(sum), 11);
|
||||
}),
|
||||
].join(" "));
|
||||
// Sum duration row
|
||||
console.log([
|
||||
" " + padL("SUM duration", 30),
|
||||
...JOIN_ALGORITHMS.map((a) => {
|
||||
const sum = ANALYTICS_OVERVIEW_QUERY_NAMES.reduce((acc, n) => {
|
||||
const s = stats[caseName][a].get(n);
|
||||
return s == null ? acc : acc + s.query_duration_ms;
|
||||
}, 0);
|
||||
const anyErr = ANALYTICS_OVERVIEW_QUERY_NAMES.some((n) => stats[caseName][a].get(n) == null);
|
||||
return anyErr ? padR("partial", 11) : padR(`${sum} ms`, 11);
|
||||
}),
|
||||
].join(" "));
|
||||
}
|
||||
|
||||
printCaseTable("normal");
|
||||
printCaseTable("heavy");
|
||||
|
||||
// Find the best algorithm per case by memory and by duration
|
||||
function bestByMemory(caseName: CaseName): { algo: JoinAlgorithm, mem: number } | null {
|
||||
let best: { algo: JoinAlgorithm, mem: number } | null = null;
|
||||
for (const a of JOIN_ALGORITHMS) {
|
||||
const sum = ANALYTICS_OVERVIEW_QUERY_NAMES.reduce((acc, n) => {
|
||||
const s = stats[caseName][a].get(n);
|
||||
return s == null ? acc : acc + s.memory_usage;
|
||||
}, 0);
|
||||
const anyErr = ANALYTICS_OVERVIEW_QUERY_NAMES.some((n) => stats[caseName][a].get(n) == null);
|
||||
if (anyErr) continue;
|
||||
if (best == null || sum < best.mem) best = { algo: a, mem: sum };
|
||||
}
|
||||
return best;
|
||||
}
|
||||
function bestByDuration(caseName: CaseName): { algo: JoinAlgorithm, dur: number } | null {
|
||||
let best: { algo: JoinAlgorithm, dur: number } | null = null;
|
||||
for (const a of JOIN_ALGORITHMS) {
|
||||
const sum = ANALYTICS_OVERVIEW_QUERY_NAMES.reduce((acc, n) => {
|
||||
const s = stats[caseName][a].get(n);
|
||||
return s == null ? acc : acc + s.query_duration_ms;
|
||||
}, 0);
|
||||
const anyErr = ANALYTICS_OVERVIEW_QUERY_NAMES.some((n) => stats[caseName][a].get(n) == null);
|
||||
if (anyErr) continue;
|
||||
if (best == null || sum < best.dur) best = { algo: a, dur: sum };
|
||||
}
|
||||
return best;
|
||||
}
|
||||
|
||||
console.log("\n Headlines:");
|
||||
for (const c of ["normal", "heavy"] as const) {
|
||||
const bm = bestByMemory(c);
|
||||
const bd = bestByDuration(c);
|
||||
if (bm) console.log(` ${c.padEnd(7)} | best memory: ${bm.algo.padEnd(20)} (${fmtBytes(bm.mem)})`);
|
||||
if (bd) console.log(` ${c.padEnd(7)} | best speed: ${bd.algo.padEnd(20)} (${bd.dur} ms)`);
|
||||
}
|
||||
console.log("\n Notes:");
|
||||
console.log(" • ERR / partial = the algorithm errored or didn't apply for that query shape");
|
||||
console.log(" (typical for `direct` which requires a Dictionary right-hand side).");
|
||||
console.log(" • The OOM bottleneck is the inner GROUP BY user_id aggregation, not the");
|
||||
console.log(" join's hash table — algorithms that only target the join (everything");
|
||||
console.log(" except sorting-merge variants) cap out at ~modest savings on the heavy case.");
|
||||
}
|
||||
|
||||
async function benchmarkRouteQueries(now: Date): Promise<void> {
|
||||
const untilExclusive = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate()) + ONE_DAY_MS);
|
||||
const since = new Date(untilExclusive.getTime() - METRICS_WINDOW_MS);
|
||||
@ -1083,13 +1755,13 @@ async function benchmarkRouteQueries(now: Date): Promise<void> {
|
||||
return out;
|
||||
}
|
||||
|
||||
// Also capture the actual row payload so we can check correctness for OPT
|
||||
// variants (e.g., dropping the LEFT JOIN on analyticsOverview must not change counts).
|
||||
async function runAndCollect(list: RouteQuery[]): Promise<{ stats: Map<string, QueryStats>, payloads: Map<string, unknown[]> }> {
|
||||
// Also capture row payloads to spot-check OPT variants (e.g. dropping the
|
||||
// LEFT JOIN must not change counts).
|
||||
async function runAndCollect(list: RouteQuery[], opts: { useMetricsClient?: boolean } = {}): Promise<{ stats: Map<string, QueryStats>, payloads: Map<string, unknown[]> }> {
|
||||
const stats = new Map<string, QueryStats>();
|
||||
const payloads = new Map<string, unknown[]>();
|
||||
for (const rq of list) {
|
||||
const client = getClickhouseAdminClient();
|
||||
const client = opts.useMetricsClient ? getClickhouseAdminClientForMetrics() : getClickhouseAdminClient();
|
||||
const queryId = `bench-route-${rq.name.replace(/[^a-z0-9]/gi, "-")}-${randomUUID()}`;
|
||||
const baseParams: Record<string, unknown> = {
|
||||
projectId: params.projectId,
|
||||
@ -1114,8 +1786,8 @@ async function benchmarkRouteQueries(now: Date): Promise<void> {
|
||||
}
|
||||
|
||||
const before = await runAndCollect(ROUTE_QUERIES_BEFORE);
|
||||
const after = await runAndCollect(ROUTE_QUERIES_AFTER);
|
||||
const opt = await runAndCollect(ROUTE_QUERIES_OPTIMIZED);
|
||||
const after = await runAndCollect(ROUTE_QUERIES_AFTER, { useMetricsClient: true });
|
||||
const opt = await runAndCollect(ROUTE_QUERIES_OPTIMIZED, { useMetricsClient: true });
|
||||
const beforeStats = before.stats;
|
||||
const afterStats = after.stats;
|
||||
|
||||
@ -1280,7 +1952,8 @@ type QueryStats = {
|
||||
async function readStats(queryId: string): Promise<QueryStats> {
|
||||
const client = getClickhouseAdminClient();
|
||||
await client.command({ query: "SYSTEM FLUSH LOGS" });
|
||||
const delays = [100, 200, 400, 800, 1600];
|
||||
// ~12.7s budget; the async query_log flush can lag at 300k-user scale.
|
||||
const delays = [100, 200, 400, 800, 1600, 3200, 6400];
|
||||
for (let i = 0; i <= delays.length; i++) {
|
||||
const res = await client.query({
|
||||
query: `
|
||||
@ -1337,6 +2010,13 @@ async function cleanup(): Promise<void> {
|
||||
// Block until the mutation is applied so the script exits clean.
|
||||
clickhouse_settings: { mutations_sync: "2" },
|
||||
});
|
||||
// Best-effort: drop the option-D bench column if we added it. Safe to run
|
||||
// unconditionally because of the IF EXISTS guard.
|
||||
try {
|
||||
await dropOptionDColumn();
|
||||
} catch (e) {
|
||||
console.error(" (could not drop option-D column:", e, ")");
|
||||
}
|
||||
}
|
||||
|
||||
// ── Edge-case matrix ─────────────────────────────────────────────────────────
|
||||
@ -1607,9 +2287,15 @@ async function seedPerf(now: Date): Promise<void> {
|
||||
);
|
||||
|
||||
const batchRows = envInt("BENCH_BATCH", 50_000);
|
||||
// BENCH_HISTORICAL_DAYS extends the seed beyond the 30-day metrics window so
|
||||
// unbounded-scan queries read more rows than windowed ones (default 365).
|
||||
const historicalDays = envInt("BENCH_HISTORICAL_DAYS", 365);
|
||||
const windowEnd = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate()) + ONE_DAY_MS);
|
||||
const windowStart = new Date(windowEnd.getTime() - METRICS_WINDOW_MS);
|
||||
const windowStart = new Date(windowEnd.getTime() - historicalDays * 24 * 60 * 60 * 1000);
|
||||
const spanMs = windowEnd.getTime() - windowStart.getTime();
|
||||
if (historicalDays !== METRICS_WINDOW_DAYS) {
|
||||
console.log(` (event_at spans last ${historicalDays} days; only the most recent ${METRICS_WINDOW_DAYS} fall inside the metrics window)`);
|
||||
}
|
||||
const teamIds: string[] = Array.from({ length: teamCount }, () => mkUuid());
|
||||
|
||||
const t0 = Date.now();
|
||||
@ -1815,10 +2501,14 @@ async function main(): Promise<void> {
|
||||
|
||||
const doPerf = matrixOk && !envBool("BENCH_SKIP_PERF");
|
||||
const doRouteQueries = matrixOk && envBool("BENCH_ROUTE_QUERIES");
|
||||
if (doPerf || doRouteQueries) {
|
||||
const doBackfillCompare = matrixOk && envBool("BENCH_BACKFILL_COMPARE");
|
||||
const doJoinAlgoCompare = matrixOk && (envBool("BENCH_JOIN_ALGO_COMPARE") || envBool("BENCH_GRACE_HASH_COMPARE"));
|
||||
if (doPerf || doRouteQueries || doBackfillCompare || doJoinAlgoCompare) {
|
||||
await seedPerf(now);
|
||||
if (doPerf) await runPerf(now);
|
||||
if (doRouteQueries) await benchmarkRouteQueries(now);
|
||||
if (doBackfillCompare) await benchmarkBackfillCompare(now);
|
||||
if (doJoinAlgoCompare) await benchmarkJoinAlgorithms(now);
|
||||
} else if (envBool("BENCH_SKIP_PERF")) {
|
||||
console.log("Skipping perf run (BENCH_SKIP_PERF=1)");
|
||||
}
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import { Prisma } from "@/generated/prisma/client";
|
||||
import { EmailOutboxSimpleStatus } from "@/generated/prisma/enums";
|
||||
import { getClickhouseAdminClient } from "@/lib/clickhouse";
|
||||
import { getClickhouseAdminClientForMetrics } from "@/lib/clickhouse";
|
||||
import { ClickHouseError } from "@clickhouse/client";
|
||||
import { ActivitySplit } from "@/lib/metrics-activity-split";
|
||||
import { Tenancy } from "@/lib/tenancies";
|
||||
@ -70,8 +70,11 @@ function normalizeUuidFromEvent(value: string): string | null {
|
||||
// ClickHouse `match()` uses re2; pattern matches UUID_V4_JS_RE.source.
|
||||
const MAU_UUID_V4_REGEX = UUID_V4_JS_RE.source;
|
||||
|
||||
async function loadUsersByCountry(tenancy: Tenancy, includeAnonymous: boolean = false): Promise<Record<string, number>> {
|
||||
const clickhouseClient = getClickhouseAdminClient();
|
||||
async function loadUsersByCountry(tenancy: Tenancy, now: Date, includeAnonymous: boolean = false): Promise<Record<string, number>> {
|
||||
// Without the 30-day bound the inner GROUP BY materializes one row per
|
||||
// ever-seen user for the tenant.
|
||||
const { since, untilExclusive } = getMetricsWindowBounds(now);
|
||||
const clickhouseClient = getClickhouseAdminClientForMetrics();
|
||||
const res = await clickhouseClient.query({
|
||||
query: `
|
||||
SELECT
|
||||
@ -92,6 +95,8 @@ async function loadUsersByCountry(tenancy: Tenancy, includeAnonymous: boolean =
|
||||
AND project_id = {projectId:String}
|
||||
AND branch_id = {branchId:String}
|
||||
AND user_id IS NOT NULL
|
||||
AND event_at >= {since:DateTime}
|
||||
AND event_at < {untilExclusive:DateTime}
|
||||
)
|
||||
WHERE cc IS NOT NULL
|
||||
AND ({includeAnonymous:UInt8} = 1 OR is_anonymous = 0)
|
||||
@ -105,6 +110,8 @@ async function loadUsersByCountry(tenancy: Tenancy, includeAnonymous: boolean =
|
||||
projectId: tenancy.project.id,
|
||||
branchId: tenancy.branchId,
|
||||
includeAnonymous: includeAnonymous ? 1 : 0,
|
||||
since: formatClickhouseDateTimeParam(since),
|
||||
untilExclusive: formatClickhouseDateTimeParam(untilExclusive),
|
||||
},
|
||||
format: "JSONEachRow",
|
||||
});
|
||||
@ -139,7 +146,7 @@ async function loadActiveUsersByCountry(
|
||||
): Promise<Record<string, MetricsRecentUser[]>> {
|
||||
const since = new Date(now.getTime() - ACTIVE_USERS_BY_COUNTRY_WINDOW_MS);
|
||||
|
||||
const clickhouseClient = getClickhouseAdminClient();
|
||||
const clickhouseClient = getClickhouseAdminClientForMetrics();
|
||||
const res = await clickhouseClient.query({
|
||||
query: `
|
||||
SELECT
|
||||
@ -269,7 +276,7 @@ async function loadLiveUsersCount(
|
||||
const since = new Date(now.getTime() - ACTIVE_USERS_BY_COUNTRY_WINDOW_MS);
|
||||
|
||||
try {
|
||||
const clickhouseClient = getClickhouseAdminClient();
|
||||
const clickhouseClient = getClickhouseAdminClientForMetrics();
|
||||
const res = await clickhouseClient.query({
|
||||
query: `
|
||||
SELECT uniqExact(user_id) AS live_users
|
||||
@ -347,7 +354,7 @@ async function loadDailyActiveUsers(tenancy: Tenancy, now: Date, includeAnonymou
|
||||
const since = new Date(todayUtc.getTime() - METRICS_WINDOW_MS);
|
||||
const untilExclusive = new Date(todayUtc.getTime() + ONE_DAY_MS);
|
||||
|
||||
const clickhouseClient = getClickhouseAdminClient();
|
||||
const clickhouseClient = getClickhouseAdminClientForMetrics();
|
||||
const result = await clickhouseClient.query({
|
||||
query: `
|
||||
SELECT
|
||||
@ -412,11 +419,15 @@ async function loadDailyActiveSplitFromClickhouse(options: {
|
||||
? "AND ({includeAnonymous:UInt8} = 1 OR coalesce(CAST(data.is_anonymous, 'Nullable(UInt8)'), 0) = 0)"
|
||||
: "";
|
||||
|
||||
const clickhouseClient = getClickhouseAdminClient();
|
||||
const clickhouseClient = getClickhouseAdminClientForMetrics();
|
||||
// Note: the inner `assumeNotNull(${idCol}) AS entity_id` must not reuse the
|
||||
// column name, or ClickHouse re-resolves `WHERE ${idCol} IS NOT NULL`
|
||||
// against the alias (assumeNotNull returns '' for NULLs, which passes the
|
||||
// not-null test) and phantom rows slip through.
|
||||
//
|
||||
// The LEFT JOIN's `min(event_at)` subquery below is intentionally unbounded:
|
||||
// bounding it would reclassify entities first seen >30d ago and active today
|
||||
// as "new" instead of "reactivated".
|
||||
const result = await clickhouseClient.query({
|
||||
query: `
|
||||
SELECT
|
||||
@ -552,7 +563,7 @@ async function loadAnonymousVisitorsFromTokenRefresh(
|
||||
now: Date,
|
||||
): Promise<{ dailyVisitors: DataPoints, visitors: number }> {
|
||||
const { since, untilExclusive } = getMetricsWindowBounds(now);
|
||||
const clickhouseClient = getClickhouseAdminClient();
|
||||
const clickhouseClient = getClickhouseAdminClientForMetrics();
|
||||
|
||||
const query = `
|
||||
SELECT
|
||||
@ -630,7 +641,7 @@ async function loadAnonymousVisitorsFromTokenRefresh(
|
||||
async function loadMonthlyActiveUsers(tenancy: Tenancy, now: Date, includeAnonymous: boolean = false): Promise<number> {
|
||||
const { since, untilExclusive } = getMetricsWindowBounds(now);
|
||||
|
||||
const clickhouseClient = getClickhouseAdminClient();
|
||||
const clickhouseClient = getClickhouseAdminClientForMetrics();
|
||||
try {
|
||||
const result = await clickhouseClient.query({
|
||||
query: `
|
||||
@ -1038,7 +1049,7 @@ async function loadAnalyticsOverview(tenancy: Tenancy, now: Date, includeAnonymo
|
||||
const since = new Date(todayUtc.getTime() - METRICS_WINDOW_MS);
|
||||
const untilExclusive = new Date(todayUtc.getTime() + ONE_DAY_MS);
|
||||
|
||||
const clickhouseClient = getClickhouseAdminClient();
|
||||
const clickhouseClient = getClickhouseAdminClientForMetrics();
|
||||
|
||||
// Session replay aggregates come from Postgres and have nothing to do with
|
||||
// ClickHouse availability. Run them in parallel with the ClickHouse queries
|
||||
@ -1064,6 +1075,12 @@ async function loadAnalyticsOverview(tenancy: Tenancy, now: Date, includeAnonymo
|
||||
} | null = null;
|
||||
|
||||
try {
|
||||
// The `event_at >= since` bound on the inner subquery is load-bearing:
|
||||
// without it the GROUP BY hash table holds one row per ever-seen user.
|
||||
// Edge case: anonymous page-views by users with no token-refresh in the
|
||||
// last 30 days now coalesce to non-anonymous. The proper fix is to stamp
|
||||
// `is_anonymous` on page-view/click events at ingest and drop this join
|
||||
// entirely (the coalesce below short-circuits on the first non-null arg).
|
||||
const analyticsUserJoin = `
|
||||
LEFT JOIN (
|
||||
SELECT
|
||||
@ -1074,6 +1091,7 @@ async function loadAnalyticsOverview(tenancy: Tenancy, now: Date, includeAnonymo
|
||||
AND project_id = {projectId:String}
|
||||
AND branch_id = {branchId:String}
|
||||
AND user_id IS NOT NULL
|
||||
AND event_at >= {since:DateTime}
|
||||
AND event_at < {untilExclusive:DateTime}
|
||||
GROUP BY user_id
|
||||
) AS token_refresh_users
|
||||
@ -1476,7 +1494,7 @@ export const GET = createSmartRouteHandler({
|
||||
] = await Promise.all([
|
||||
loadTotalUsers(req.auth.tenancy, now, includeAnonymous),
|
||||
loadDailyActiveUsers(req.auth.tenancy, now, includeAnonymous),
|
||||
loadUsersByCountry(req.auth.tenancy, includeAnonymous),
|
||||
loadUsersByCountry(req.auth.tenancy, now, includeAnonymous),
|
||||
loadActiveUsersByCountry(req.auth.tenancy, now, includeAnonymous),
|
||||
loadLiveUsersCount(req.auth.tenancy, now, includeAnonymous),
|
||||
usersCrudHandlers.adminList({
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { createClient, type ClickHouseClient } from "@clickhouse/client";
|
||||
import { createClient, type ClickHouseClient, type ClickHouseSettings } from "@clickhouse/client";
|
||||
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
|
||||
import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors";
|
||||
|
||||
@ -9,7 +9,11 @@ function getAdminAuth() {
|
||||
};
|
||||
}
|
||||
|
||||
export function createClickhouseClient(authType: "admin" | "external", database?: string) {
|
||||
export function createClickhouseClient(
|
||||
authType: "admin" | "external",
|
||||
database?: string,
|
||||
clickhouse_settings?: ClickHouseSettings,
|
||||
) {
|
||||
return createClient({
|
||||
url: getEnvVariable("STACK_CLICKHOUSE_URL"),
|
||||
...authType === "admin" ? getAdminAuth() : {
|
||||
@ -18,6 +22,7 @@ export function createClickhouseClient(authType: "admin" | "external", database?
|
||||
},
|
||||
database,
|
||||
request_timeout: 10 * 60 * 1000, // 10 minutes
|
||||
clickhouse_settings,
|
||||
});
|
||||
}
|
||||
|
||||
@ -29,6 +34,33 @@ export function getClickhouseExternalClient() {
|
||||
return createClickhouseClient("external", getEnvVariable("STACK_CLICKHOUSE_DATABASE", "default"));
|
||||
}
|
||||
|
||||
// Safety net for heavy analytical reads against `analytics_internal.events`:
|
||||
// GROUP BY spills to disk at ~50% of the per-query cap (leaving headroom for
|
||||
// the post-spill merge), grace_hash partitions large join build sides instead
|
||||
// of allocating one giant hash table, and the per-user cap bounds total
|
||||
// concurrent memory against the cluster's 10.8 GiB OvercommitTracker. Values
|
||||
// are decimal bytes (how ClickHouse parses digit strings).
|
||||
//
|
||||
// Note: max_memory_usage_for_user is enforced ClickHouse-side per *connecting
|
||||
// user* (the shared `stackframe` admin), so all admin queries — not just this
|
||||
// client's — count toward the same 9 GB budget. With the 30-day bounds each
|
||||
// metrics query peaks well under 100 MiB, so practical interference is low.
|
||||
export const METRICS_CLICKHOUSE_SETTINGS: ClickHouseSettings = {
|
||||
max_bytes_before_external_group_by: "4000000000",
|
||||
max_memory_usage: "8000000000",
|
||||
max_memory_usage_for_user: "9000000000",
|
||||
// SDK type narrows to a single algorithm; the server accepts a fallback list.
|
||||
join_algorithm: "grace_hash,parallel_hash,hash" as ClickHouseSettings["join_algorithm"],
|
||||
};
|
||||
|
||||
export function getClickhouseAdminClientForMetrics() {
|
||||
return createClickhouseClient(
|
||||
"admin",
|
||||
getEnvVariable("STACK_CLICKHOUSE_DATABASE", "default"),
|
||||
METRICS_CLICKHOUSE_SETTINGS,
|
||||
);
|
||||
}
|
||||
|
||||
export const getQueryTimingStats = async (client: ClickHouseClient, queryId: string) => {
|
||||
// Flush logs to ensure system.query_log has latest query result.
|
||||
// Todo: for performance we should instead poll for this row to become available asynchronously after returning result. Flushed every 7.5 seconds by default
|
||||
|
||||
Loading…
Reference in New Issue
Block a user