stack/apps/backend/src/lib/clickhouse.tsx
BilalG1 609579abab
Some checks failed
all-good: Did all the other checks pass? / all-good (push) Has been cancelled
Ensure Prisma migrations are in sync with the schema / check_prisma_migrations (22.x) (push) Has been cancelled
DB migration compat / Check if migrations changed (push) Has been cancelled
Docker Server Build and Push / Docker Build and Push Server (push) Has been cancelled
Docker Server Build and Run / docker (push) Has been cancelled
Runs E2E API Tests (Local Emulator) / E2E Tests (Local Emulator, Node ${{ matrix.node-version }}) (22.x) (push) Has been cancelled
Runs E2E API Tests / E2E Tests (Node ${{ matrix.node-version }}, Freestyle ${{ matrix.freestyle-mode }}) (mock, 22.x) (push) Has been cancelled
Runs E2E API Tests / E2E Tests (Node ${{ matrix.node-version }}, Freestyle ${{ matrix.freestyle-mode }}) (prod, 22.x) (push) Has been cancelled
Runs E2E API Tests with custom port prefix / build (22.x) (push) Has been cancelled
Runs E2E Fallback Tests / E2E Fallback Tests (Node ${{ matrix.node-version }}) (22.x) (push) Has been cancelled
Lint & build / lint_and_build (24) (push) Has been cancelled
TOC Generator / TOC Generator (push) Has been cancelled
DB migration compat / Back-compat — Current branch migrations with ${{ needs.check-migrations-changed.outputs.base_branch }} branch code (push) Has been cancelled
DB migration compat / Forward-compat — Current branch code with ${{ needs.check-migrations-changed.outputs.base_branch }} branch migrations (push) Has been cancelled
DB migration compat / No migration changes (skipped) (push) Has been cancelled
feat(hexclave): PR 3 — native @hexclave/* source rename + delete dual-publish wiring (#1482)
2026-05-29 15:21:59 -07:00

160 lines
5.6 KiB
TypeScript

import { createClient, type ClickHouseClient, type ClickHouseSettings } from "@clickhouse/client";
import { getEnvVariable } from "@hexclave/shared/dist/utils/env";
import { HexclaveAssertionError } from "@hexclave/shared/dist/utils/errors";
// Re-exported so other modules can hold a typed ClickHouse client (e.g. to
// thread a single warmed client through helpers) without taking a direct
// dependency on the @clickhouse/client package.
export type { ClickHouseClient } from "@clickhouse/client";
function getAdminAuth() {
return {
username: getEnvVariable("STACK_CLICKHOUSE_ADMIN_USER", "stackframe"),
password: getEnvVariable("STACK_CLICKHOUSE_ADMIN_PASSWORD"),
};
}
export function createClickhouseClient(
authType: "admin" | "external",
database?: string,
clickhouse_settings?: ClickHouseSettings,
) {
return createClient({
url: getEnvVariable("STACK_CLICKHOUSE_URL"),
...authType === "admin" ? getAdminAuth() : {
username: "limited_user",
password: getEnvVariable("STACK_CLICKHOUSE_EXTERNAL_PASSWORD"),
},
database,
request_timeout: 10 * 60 * 1000, // 10 minutes
clickhouse_settings,
});
}
export function getClickhouseAdminClient() {
return createClickhouseClient("admin", getEnvVariable("STACK_CLICKHOUSE_DATABASE", "default"));
}
export function getClickhouseExternalClient() {
return createClickhouseClient("external", getEnvVariable("STACK_CLICKHOUSE_DATABASE", "default"));
}
// Safety net for heavy analytical reads against `analytics_internal.events`:
// GROUP BY spills to disk at ~50% of the per-query cap (leaving headroom for
// the post-spill merge), grace_hash partitions large join build sides instead
// of allocating one giant hash table, and the per-user cap bounds total
// concurrent memory against the cluster's 10.8 GiB OvercommitTracker. Values
// are decimal bytes (how ClickHouse parses digit strings).
//
// Note: max_memory_usage_for_user is enforced ClickHouse-side per *connecting
// user* (the shared `stackframe` admin), so all admin queries — not just this
// client's — count toward the same 9 GB budget. With the 30-day bounds each
// metrics query peaks well under 100 MiB, so practical interference is low.
export const METRICS_CLICKHOUSE_SETTINGS: ClickHouseSettings = {
max_bytes_before_external_group_by: "4000000000",
max_memory_usage: "8000000000",
max_memory_usage_for_user: "9000000000",
// SDK type narrows to a single algorithm; the server accepts a fallback list.
join_algorithm: "grace_hash,parallel_hash,hash" as ClickHouseSettings["join_algorithm"],
};
export function getClickhouseAdminClientForMetrics() {
return createClickhouseClient(
"admin",
getEnvVariable("STACK_CLICKHOUSE_DATABASE", "default"),
METRICS_CLICKHOUSE_SETTINGS,
);
}
export const getQueryTimingStats = async (client: ClickHouseClient, queryId: string) => {
// Flush logs to ensure system.query_log has latest query result.
// Todo: for performance we should instead poll for this row to become available asynchronously after returning result. Flushed every 7.5 seconds by default
await client.exec({
query: "SYSTEM FLUSH LOGS",
auth: getAdminAuth(),
});
const queryProfile = async () => {
const profile = await client.query({
query: `
SELECT
ProfileEvents['CPUTimeMicroseconds'] / 1000 AS cpu_time_ms,
ProfileEvents['RealTimeMicroseconds'] / 1000 AS wall_clock_time_ms
FROM system.query_log
WHERE query_id = {query_id:String} AND type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 1
`,
query_params: { query_id: queryId },
auth: getAdminAuth(),
format: "JSON",
});
return await profile.json<{
cpu_time_ms: number,
wall_clock_time_ms: number,
}>();
};
const retryDelaysMs = [75, 150, 300, 600, 1200];
for (let attempt = 0; attempt <= retryDelaysMs.length; attempt += 1) {
const stats = await queryProfile();
if (stats.data.length === 1) {
return stats.data[0];
}
if (stats.data.length > 1) {
throw new HexclaveAssertionError(`Unexpected number of query log results: ${stats.data.length}`, { data: stats.data });
}
if (attempt < retryDelaysMs.length) {
await new Promise((resolve) => setTimeout(resolve, retryDelaysMs[attempt]));
}
}
throw new HexclaveAssertionError("Unexpected number of query log results: 0", { data: [] });
};
export const getQueryTimingStatsForProject = async (
client: ClickHouseClient,
queryId: string,
) => {
const queryProfile = async () => {
const profile = await client.query({
query: `
SELECT
ProfileEvents['CPUTimeMicroseconds'] / 1000 AS cpu_time_ms,
ProfileEvents['RealTimeMicroseconds'] / 1000 AS wall_clock_time_ms
FROM system.query_log
WHERE query_id = {query_id:String}
AND type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 1
`,
query_params: {
query_id: queryId,
},
auth: getAdminAuth(),
format: "JSON",
});
return await profile.json<{
cpu_time_ms: number,
wall_clock_time_ms: number,
}>();
};
const retryDelaysMs = [75, 150, 300, 600, 1200, 2400, 4800];
for (let attempt = 0; attempt <= retryDelaysMs.length; attempt += 1) {
const stats = await queryProfile();
if (stats.data.length === 1) {
return stats.data[0];
}
if (stats.data.length > 1) {
throw new HexclaveAssertionError(`Unexpected number of query log results: ${stats.data.length}`, { data: stats.data });
}
if (attempt < retryDelaysMs.length) {
await new Promise((resolve) => setTimeout(resolve, retryDelaysMs[attempt]));
}
}
return null;
};