clickhouse setup (#1032)

This commit is contained in:
BilalG1 2026-01-28 09:12:33 -08:00 committed by GitHub
parent 4fc5ddc82d
commit 484c3a6332
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 3980 additions and 5 deletions

View File

@ -22,6 +22,13 @@ jobs:
docker run -d --name db -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=password -e POSTGRES_DB=stackframe -p 8128:5432 postgres:latest
sleep 5
docker logs db
- name: Setup clickhouse
run: |
docker run -d --name clickhouse -e CLICKHOUSE_DB=analytics -e CLICKHOUSE_USER=stackframe -e CLICKHOUSE_PASSWORD=password -e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 -p 8133:8123 clickhouse/clickhouse-server:25.10
sleep 5
docker logs clickhouse
- name: Build Docker image
run: docker build -f docker/server/Dockerfile -t server .

View File

@ -81,6 +81,13 @@ STACK_QSTASH_TOKEN=
STACK_QSTASH_CURRENT_SIGNING_KEY=
STACK_QSTASH_NEXT_SIGNING_KEY=
# Clickhouse
STACK_CLICKHOUSE_URL=# URL of the Clickhouse instance
STACK_CLICKHOUSE_ADMIN_USER=# username of the admin account
STACK_CLICKHOUSE_ADMIN_PASSWORD=# password of the admin account
STACK_CLICKHOUSE_EXTERNAL_PASSWORD=# a randomly generated secure string. The user account will be created automatically
# Misc
STACK_ACCESS_TOKEN_EXPIRATION_TIME=# enter the expiration time for the access token here. Optional, don't specify it for default value
STACK_SETUP_ADMIN_GITHUB_ID=# enter the account ID of the admin user here, and after running the seed script they will be able to access the internal project in the Stack dashboard. Optional, don't specify it for default value

View File

@ -73,3 +73,9 @@ STACK_QSTASH_URL=http://localhost:${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}25
STACK_QSTASH_TOKEN=eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=
STACK_QSTASH_CURRENT_SIGNING_KEY=sig_7kYjw48mhY7kAjqNGcy6cr29RJ6r
STACK_QSTASH_NEXT_SIGNING_KEY=sig_5ZB6DVzB1wjE8S6rZ7eenA8Pdnhs
# Clickhouse
STACK_CLICKHOUSE_URL=http://localhost:${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}36
STACK_CLICKHOUSE_ADMIN_USER=stackframe
STACK_CLICKHOUSE_ADMIN_PASSWORD=PASSWORD-PLACEHOLDER--9gKyMxJeMx
STACK_CLICKHOUSE_EXTERNAL_PASSWORD=PASSWORD-PLACEHOLDER--EZeHscBMzE

View File

@ -25,6 +25,7 @@
"codegen": "pnpm run with-env pnpm run generate-migration-imports && pnpm run with-env bash -c 'if [ \"$STACK_ACCELERATE_ENABLED\" = \"true\" ]; then pnpm run prisma generate --no-engine; else pnpm run codegen-prisma; fi' && pnpm run codegen-docs && pnpm run codegen-route-info",
"codegen:watch": "concurrently -n \"prisma,docs,route-info,migration-imports\" -k \"pnpm run codegen-prisma:watch\" \"pnpm run codegen-docs:watch\" \"pnpm run codegen-route-info:watch\" \"pnpm run generate-migration-imports:watch\"",
"psql-inner": "psql $(echo $STACK_DATABASE_CONNECTION_STRING | sed 's/\\?.*$//')",
"clickhouse": "pnpm run with-env clickhouse-client --host localhost --port ${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}37 --user stackframe --password PASSWORD-PLACEHOLDER--9gKyMxJeMx",
"psql": "pnpm run with-env:dev pnpm run psql-inner",
"prisma-studio": "pnpm run with-env:dev prisma studio --port ${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}06 --browser none",
"prisma:dev": "pnpm run with-env:dev prisma",
@ -50,6 +51,7 @@
"dependencies": {
"@ai-sdk/openai": "^1.3.23",
"@aws-sdk/client-s3": "^3.855.0",
"@clickhouse/client": "^1.14.0",
"@node-oauth/oauth2-server": "^5.1.0",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/api-logs": "^0.53.0",

View File

@ -0,0 +1,61 @@
import { getClickhouseAdminClient } from "@/lib/clickhouse";
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
export async function runClickhouseMigrations() {
console.log("[Clickhouse] Running Clickhouse migrations...");
const client = getClickhouseAdminClient();
const clickhouseExternalPassword = getEnvVariable("STACK_CLICKHOUSE_EXTERNAL_PASSWORD");
await client.exec({
query: "CREATE USER IF NOT EXISTS limited_user IDENTIFIED WITH sha256_password BY {clickhouseExternalPassword:String}",
query_params: { clickhouseExternalPassword },
});
// todo: create migration files
await client.exec({ query: EXTERNAL_ANALYTICS_DB_SQL });
await client.exec({ query: EVENTS_TABLE_BASE_SQL });
await client.exec({ query: EVENTS_VIEW_SQL });
const queries = [
"REVOKE ALL PRIVILEGES ON *.* FROM limited_user;",
"REVOKE ALL FROM limited_user;",
"GRANT SELECT ON default.events TO limited_user;",
];
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS events_project_isolation ON default.events FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
for (const query of queries) {
await client.exec({ query });
}
console.log("[Clickhouse] Clickhouse migrations complete");
await client.close();
}
const EVENTS_TABLE_BASE_SQL = `
CREATE TABLE IF NOT EXISTS analytics_internal.events (
event_type LowCardinality(String),
event_at DateTime64(3, 'UTC'),
data JSON,
project_id String,
branch_id String,
user_id String,
team_id String,
refresh_token_id String,
is_anonymous Boolean,
session_id String,
ip_address String,
created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE MergeTree
PARTITION BY toYYYYMM(event_at)
ORDER BY (project_id, branch_id, event_at);
`;
const EVENTS_VIEW_SQL = `
CREATE OR REPLACE VIEW default.events
SQL SECURITY DEFINER
AS
SELECT *
FROM analytics_internal.events;
`;
const EXTERNAL_ANALYTICS_DB_SQL = `
CREATE DATABASE IF NOT EXISTS analytics_internal;
`;

View File

@ -2,18 +2,25 @@ import { applyMigrations } from "@/auto-migrations";
import { MIGRATION_FILES_DIR, getMigrationFiles } from "@/auto-migrations/utils";
import { Prisma } from "@/generated/prisma/client";
import { globalPrismaClient, globalPrismaSchema, sqlQuoteIdent } from "@/prisma-client";
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
import { spawnSync } from "child_process";
import fs from "fs";
import path from "path";
import * as readline from "readline";
import { seed } from "../prisma/seed";
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
import { runClickhouseMigrations } from "./clickhouse-migrations";
import { getClickhouseAdminClient } from "@/lib/clickhouse";
const getClickhouseClient = () => getClickhouseAdminClient();
const dropSchema = async () => {
await globalPrismaClient.$executeRaw(Prisma.sql`DROP SCHEMA ${sqlQuoteIdent(globalPrismaSchema)} CASCADE`);
await globalPrismaClient.$executeRaw(Prisma.sql`CREATE SCHEMA ${sqlQuoteIdent(globalPrismaSchema)}`);
await globalPrismaClient.$executeRaw(Prisma.sql`GRANT ALL ON SCHEMA ${sqlQuoteIdent(globalPrismaSchema)} TO postgres`);
await globalPrismaClient.$executeRaw(Prisma.sql`GRANT ALL ON SCHEMA ${sqlQuoteIdent(globalPrismaSchema)} TO public`);
const clickhouseClient = getClickhouseClient();
await clickhouseClient.command({ query: "DROP DATABASE IF EXISTS analytics_internal" });
await clickhouseClient.command({ query: "CREATE DATABASE IF NOT EXISTS analytics_internal" });
};
@ -163,6 +170,8 @@ const migrate = async (selectedMigrationFiles?: { migrationName: string, sql: st
console.log('='.repeat(60) + '\n');
await runClickhouseMigrations();
return result;
};

View File

@ -0,0 +1,120 @@
import { getClickhouseExternalClient, getQueryTimingStats, isClickhouseConfigured } from "@/lib/clickhouse";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { KnownErrors } from "@stackframe/stack-shared";
import { adaptSchema, adminAuthTypeSchema, jsonSchema, yupBoolean, yupMixed, yupNumber, yupObject, yupRecord, yupString } from "@stackframe/stack-shared/dist/schema-fields";
import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors";
import { Result } from "@stackframe/stack-shared/dist/utils/results";
import { randomUUID } from "crypto";
export const POST = createSmartRouteHandler({
metadata: { hidden: true },
request: yupObject({
auth: yupObject({
type: adminAuthTypeSchema,
tenancy: adaptSchema,
}).defined(),
body: yupObject({
include_all_branches: yupBoolean().default(false),
query: yupString().defined().nonEmpty(),
params: yupRecord(yupString().defined(), yupMixed().defined()).default({}),
timeout_ms: yupNumber().integer().min(1_000).default(10_000),
}).defined(),
}),
response: yupObject({
statusCode: yupNumber().oneOf([200]).defined(),
bodyType: yupString().oneOf(["json"]).defined(),
body: yupObject({
result: jsonSchema.defined(),
stats: yupObject({
cpu_time: yupNumber().defined(),
wall_clock_time: yupNumber().defined(),
}).defined(),
}).defined(),
}),
async handler({ body, auth }) {
if (body.include_all_branches) {
throw new StackAssertionError("include_all_branches is not supported yet");
}
if (!isClickhouseConfigured()) {
throw new StackAssertionError("ClickHouse is not configured");
}
const client = getClickhouseExternalClient();
const queryId = randomUUID();
const resultSet = await Result.fromPromise(client.query({
query: body.query,
query_id: queryId,
query_params: body.params,
clickhouse_settings: {
SQL_project_id: auth.tenancy.project.id,
SQL_branch_id: auth.tenancy.branchId,
max_execution_time: body.timeout_ms / 1000,
readonly: "1",
allow_ddl: 0,
max_result_rows: MAX_RESULT_ROWS.toString(),
max_result_bytes: MAX_RESULT_BYTES.toString(),
result_overflow_mode: "throw",
},
format: "JSONEachRow",
}));
if (resultSet.status === "error") {
const message = getSafeClickhouseErrorMessage(resultSet.error);
if (message === null) {
throw new StackAssertionError("Unknown Clickhouse error", { cause: resultSet.error });
}
throw new KnownErrors.AnalyticsQueryError(message);
}
const rows = await resultSet.data.json<Record<string, unknown>[]>();
const stats = await getQueryTimingStats(client, queryId);
return {
statusCode: 200,
bodyType: "json",
body: {
result: rows,
stats: {
cpu_time: stats.cpu_time_ms,
wall_clock_time: stats.wall_clock_time_ms,
},
},
};
},
});
const SAFE_CLICKHOUSE_ERROR_CODES = [
62, // SYNTAX_ERROR
159, // TIMEOUT_EXCEEDED
164, // READONLY
158, // TOO_MANY_ROWS
396, // TOO_MANY_ROWS_OR_BYTES
636, // CANNOT_EXTRACT_TABLE_STRUCTURE
];
const UNSAFE_CLICKHOUSE_ERROR_CODES = [
36, // BAD_ARGUMENTS
60, // UNKNOWN_TABLE
497, // ACCESS_DENIED
];
const DEFAULT_CLICKHOUSE_ERROR_MESSAGE = "Error during execution of this query.";
const MAX_RESULT_ROWS = 10_000;
const MAX_RESULT_BYTES = 10 * 1024 * 1024;
function getSafeClickhouseErrorMessage(error: unknown): string | null {
if (typeof error !== "object" || error === null || !("code" in error) || typeof error.code !== "string") {
return null;
}
const errorCode = Number(error.code);
if (isNaN(errorCode)) {
return null;
}
const message = "message" in error && typeof error.message === "string" ? error.message : null;
if (SAFE_CLICKHOUSE_ERROR_CODES.includes(errorCode)) {
return message;
}
if (UNSAFE_CLICKHOUSE_ERROR_CODES.includes(errorCode)) {
return DEFAULT_CLICKHOUSE_ERROR_MESSAGE;
}
return null;
}

View File

@ -0,0 +1,211 @@
import { getClickhouseAdminClient, isClickhouseConfigured } from "@/lib/clickhouse";
import { DEFAULT_BRANCH_ID } from "@/lib/tenancies";
import { globalPrismaClient } from "@/prisma-client";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { yupNumber, yupObject, yupString } from "@stackframe/stack-shared/dist/schema-fields";
import { StatusError } from "@stackframe/stack-shared/dist/utils/errors";
import type { Prisma } from "@/generated/prisma/client";
type Cursor = {
created_at_millis: number,
id: string,
};
const parseMillisOrThrow = (value: number | undefined, field: string) => {
if (typeof value !== "number" || !Number.isFinite(value)) {
throw new StatusError(400, `Invalid ${field}`);
}
const parsed = new Date(value);
if (Number.isNaN(parsed.getTime())) {
throw new StatusError(400, `Invalid ${field}`);
}
return parsed;
};
const createClickhouseRows = (event: {
id: string,
systemEventTypeIds: string[],
data: any,
eventEndedAt: Date,
eventStartedAt: Date,
isWide: boolean,
}) => {
const dataRecord = typeof event.data === "object" && event.data !== null ? event.data as Record<string, unknown> : {};
const clickhouseEventData = {
...dataRecord,
is_wide: event.isWide,
event_started_at: event.eventStartedAt,
event_ended_at: event.eventEndedAt,
};
const projectId = typeof dataRecord.projectId === "string" ? dataRecord.projectId : "";
const branchId = DEFAULT_BRANCH_ID;
const userId = typeof dataRecord.userId === "string" ? dataRecord.userId : "";
const teamId = typeof dataRecord.teamId === "string" ? dataRecord.teamId : "";
const sessionId = typeof dataRecord.sessionId === "string" ? dataRecord.sessionId : "";
const isAnonymous = typeof dataRecord.isAnonymous === "boolean" ? dataRecord.isAnonymous : false;
const eventTypes = [...new Set(event.systemEventTypeIds)];
return eventTypes.map(eventType => ({
event_type: eventType,
event_at: event.eventEndedAt,
data: clickhouseEventData,
project_id: projectId,
branch_id: branchId,
user_id: userId,
team_id: teamId,
session_id: sessionId,
is_anonymous: isAnonymous,
}));
};
export const POST = createSmartRouteHandler({
metadata: {
summary: "Migrate analytics events from Postgres to ClickHouse",
description: "Internal-only endpoint to backfill existing events into ClickHouse.",
hidden: true,
},
request: yupObject({
auth: yupObject({
project: yupObject({
id: yupString().oneOf(["internal"]).defined(),
}).defined(),
user: yupObject({
id: yupString().defined(),
}).optional(),
}).defined(),
body: yupObject({
min_created_at_millis: yupNumber().integer().defined(),
max_created_at_millis: yupNumber().integer().defined(),
cursor: yupObject({
created_at_millis: yupNumber().integer().defined(),
id: yupString().uuid().defined(),
}).optional(),
limit: yupNumber().integer().min(1).default(1000),
}).defined(),
}),
response: yupObject({
statusCode: yupNumber().oneOf([200]).defined(),
bodyType: yupString().oneOf(["json"]).defined(),
body: yupObject({
total_events: yupNumber().defined(),
processed_events: yupNumber().defined(),
remaining_events: yupNumber().defined(),
migrated_events: yupNumber().defined(),
skipped_existing_events: yupNumber().defined(),
inserted_rows: yupNumber().defined(),
progress: yupNumber().min(0).max(1).defined(),
next_cursor: yupObject({
created_at_millis: yupNumber().integer().defined(),
id: yupString().defined(),
}).nullable().defined(),
}).defined(),
}),
async handler({ body }) {
const minCreatedAt = parseMillisOrThrow(body.min_created_at_millis, "min_created_at_millis");
const maxCreatedAt = parseMillisOrThrow(body.max_created_at_millis, "max_created_at_millis");
if (minCreatedAt >= maxCreatedAt) {
throw new StatusError(400, "min_created_at_millis must be before max_created_at_millis");
}
const cursorCreatedAt = body.cursor ? parseMillisOrThrow(body.cursor.created_at_millis, "cursor.created_at_millis") : undefined;
const cursorId = body.cursor?.id;
const limit = body.limit;
const baseWhere: Prisma.EventWhereInput = {
createdAt: {
gte: minCreatedAt,
lt: maxCreatedAt,
},
};
const cursorFilter: Prisma.EventWhereInput | undefined = (cursorCreatedAt && cursorId) ? {
OR: [
{ createdAt: { gt: cursorCreatedAt } },
{ createdAt: cursorCreatedAt, id: { gt: cursorId } },
],
} : undefined;
const where: Prisma.EventWhereInput = cursorFilter
? { AND: [baseWhere, cursorFilter] }
: baseWhere;
const totalEvents = await globalPrismaClient.event.count({ where: baseWhere });
const events = await globalPrismaClient.event.findMany({
where,
orderBy: [
{ createdAt: "asc" },
{ id: "asc" },
],
take: limit,
});
let insertedRows = 0;
let migratedEvents = 0;
if (events.length) {
if (!isClickhouseConfigured()) {
throw new StatusError(StatusError.ServiceUnavailable, "ClickHouse is not configured");
}
const clickhouseClient = getClickhouseAdminClient();
const rowsByEvent = events.map(createClickhouseRows);
const rowsToInsert = rowsByEvent.flat();
migratedEvents = rowsByEvent.reduce((acc, rows) => acc + (rows.length ? 1 : 0), 0);
if (rowsToInsert.length) {
await clickhouseClient.insert({
table: "analytics_internal.events",
values: rowsToInsert,
format: "JSONEachRow",
clickhouse_settings: {
date_time_input_format: "best_effort",
async_insert: 1,
},
});
insertedRows = rowsToInsert.length;
}
}
const lastEvent = events.at(-1);
const nextCursor: Cursor | null = lastEvent ? {
created_at_millis: lastEvent.createdAt.getTime(),
id: lastEvent.id,
} : null;
const progressCursor: Cursor | null = nextCursor ?? (cursorCreatedAt && body.cursor ? {
created_at_millis: body.cursor.created_at_millis,
id: body.cursor.id,
} : null);
const progressCursorCreatedAt = progressCursor ? new Date(progressCursor.created_at_millis) : null;
const remainingWhere = progressCursor ? {
AND: [
baseWhere,
{
OR: [
{ createdAt: { gt: progressCursorCreatedAt! } },
{ createdAt: progressCursorCreatedAt!, id: { gt: progressCursor.id } },
],
},
],
} : baseWhere;
const remainingEvents = await globalPrismaClient.event.count({ where: remainingWhere });
const processedEvents = totalEvents - remainingEvents;
const progress = totalEvents === 0 ? 1 : processedEvents / totalEvents;
return {
statusCode: 200,
bodyType: "json",
body: {
total_events: totalEvents,
processed_events: processedEvents,
remaining_events: remainingEvents,
migrated_events: migratedEvents,
skipped_existing_events: 0,
inserted_rows: insertedRows,
progress,
next_cursor: nextCursor,
},
};
},
});

View File

@ -0,0 +1,77 @@
import { createClient, type ClickHouseClient } from "@clickhouse/client";
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors";
const clickhouseUrl = getEnvVariable("STACK_CLICKHOUSE_URL", "");
const clickhouseAdminUser = getEnvVariable("STACK_CLICKHOUSE_ADMIN_USER", "stackframe");
const clickhouseExternalUser = "limited_user";
const clickhouseAdminPassword = getEnvVariable("STACK_CLICKHOUSE_ADMIN_PASSWORD", "");
const clickhouseExternalPassword = getEnvVariable("STACK_CLICKHOUSE_EXTERNAL_PASSWORD", "");
const clickhouseDefaultDatabase = getEnvVariable("STACK_CLICKHOUSE_DATABASE", "default");
const HAS_CLICKHOUSE = !!clickhouseUrl && !!clickhouseAdminPassword && !!clickhouseExternalPassword;
if (!HAS_CLICKHOUSE) {
console.warn("ClickHouse is not configured. Analytics features will not be available.");
}
export function isClickhouseConfigured() {
return HAS_CLICKHOUSE;
}
export function createClickhouseClient(authType: "admin" | "external", database?: string) {
if (!HAS_CLICKHOUSE) {
throw new StackAssertionError("ClickHouse is not configured");
}
return createClient({
url: clickhouseUrl,
username: authType === "admin" ? clickhouseAdminUser : clickhouseExternalUser,
password: authType === "admin" ? clickhouseAdminPassword : clickhouseExternalPassword,
database,
});
}
export function getClickhouseAdminClient() {
return createClickhouseClient("admin", clickhouseDefaultDatabase);
}
export function getClickhouseExternalClient() {
return createClickhouseClient("external", clickhouseDefaultDatabase);
}
export const getQueryTimingStats = async (client: ClickHouseClient, queryId: string) => {
// Flush logs to ensure system.query_log has latest query result.
// Todo: for performance we should instead poll for this row to become available asynchronously after returning result. Flushed every 7.5 seconds by default
await client.exec({
query: "SYSTEM FLUSH LOGS",
auth: {
username: clickhouseAdminUser,
password: clickhouseAdminPassword,
},
});
const profile = await client.query({
query: `
SELECT
ProfileEvents['CPUTimeMicroseconds'] / 1000 AS cpu_time_ms,
ProfileEvents['RealTimeMicroseconds'] / 1000 AS wall_clock_time_ms
FROM system.query_log
WHERE query_id = {query_id:String} AND type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 1
`,
query_params: { query_id: queryId },
auth: {
username: clickhouseAdminUser,
password: clickhouseAdminPassword,
},
format: "JSON",
});
const stats = await profile.json<{
cpu_time_ms: number,
wall_clock_time_ms: number,
}>();
if (stats.data.length !== 1) {
throw new StackAssertionError(`Unexpected number of query log results: ${stats.data.length}`, { data: stats.data });
}
return stats.data[0];
};

View File

@ -9,6 +9,7 @@ import { filterUndefined, typedKeys } from "@stackframe/stack-shared/dist/utils/
import { UnionToIntersection } from "@stackframe/stack-shared/dist/utils/types";
import { generateUuid } from "@stackframe/stack-shared/dist/utils/uuids";
import * as yup from "yup";
import { getClickhouseAdminClient, isClickhouseConfigured } from "./clickhouse";
import { getEndUserInfo } from "./end-users";
import { DEFAULT_BRANCH_ID } from "./tenancies";
@ -51,6 +52,7 @@ const UserActivityEventType = {
userId: yupString().uuid().defined(),
// old events of this type may not have an isAnonymous field, so we default to false
isAnonymous: yupBoolean().defined().default(false),
teamId: yupString().optional().default(""),
}),
inherits: [ProjectActivityEventType],
} as const satisfies SystemEventTypeBase;
@ -152,6 +154,20 @@ export async function logEvent<T extends EventType[]>(
// get end user information
const endUserInfo = await getEndUserInfo(); // this is a dynamic API, can't run it asynchronously
const endUserInfoInner = endUserInfo?.maybeSpoofed ? endUserInfo.spoofedInfo : endUserInfo?.exactInfo;
const eventTypesArray = [...allEventTypes];
const clickhouseEventData = {
...data as Record<string, unknown>,
is_wide: isWide,
event_started_at: timeRange.start,
event_ended_at: timeRange.end,
};
const dataRecord = data as Record<string, unknown> | null | undefined;
const projectId = typeof dataRecord === "object" && dataRecord && typeof dataRecord.projectId === "string" ? dataRecord.projectId : "";
const branchId = typeof dataRecord === "object" && dataRecord && typeof dataRecord.branchId === "string" ? dataRecord.branchId : DEFAULT_BRANCH_ID;
const userId = typeof dataRecord === "object" && dataRecord && typeof dataRecord.userId === "string" ? dataRecord.userId : "";
const teamId = typeof dataRecord === "object" && dataRecord && typeof dataRecord.teamId === "string" ? dataRecord.teamId : "";
const sessionId = typeof dataRecord === "object" && dataRecord && typeof dataRecord.sessionId === "string" ? dataRecord.sessionId : "";
const isAnonymous = typeof dataRecord === "object" && dataRecord && typeof dataRecord.isAnonymous === "boolean" ? dataRecord.isAnonymous : false;
// rest is no more dynamic APIs so we can run it asynchronously
@ -159,7 +175,7 @@ export async function logEvent<T extends EventType[]>(
// log event in DB
await globalPrismaClient.event.create({
data: {
systemEventTypeIds: [...allEventTypes].map(eventType => eventType.id),
systemEventTypeIds: eventTypesArray.map(eventType => eventType.id),
data: data as any,
isEndUserIpInfoGuessTrusted: !endUserInfo?.maybeSpoofed,
endUserIpInfoGuess: endUserInfoInner ? {
@ -179,6 +195,29 @@ export async function logEvent<T extends EventType[]>(
},
});
if (isClickhouseConfigured()) {
const clickhouseClient = getClickhouseAdminClient();
await clickhouseClient.insert({
table: "analytics_internal.events",
values: eventTypesArray.map(eventType => ({
event_type: eventType.id,
event_at: timeRange.end,
data: clickhouseEventData,
project_id: projectId,
branch_id: branchId,
user_id: userId,
team_id: teamId,
is_anonymous: isAnonymous,
session_id: sessionId,
})),
format: "JSONEachRow",
clickhouse_settings: {
date_time_input_format: "best_effort",
async_insert: 1,
},
});
}
// log event in PostHog
if (getNodeEnvironment().includes("production") && !getEnvVariable("CI", "")) {
await withPostHog(async posthog => {

View File

@ -252,6 +252,7 @@ export async function generateAccessTokenFromRefreshTokenIfValid(options: {
userId: options.refreshTokenObj.projectUserId,
sessionId: options.refreshTokenObj.id,
isAnonymous: user.is_anonymous,
teamId: "",
}
);

View File

@ -0,0 +1,269 @@
"use client";
import { ClickhouseMigrationRequest, ClickhouseMigrationResponse } from "@stackframe/stack-shared/dist/interface/admin-interface";
import { Button, Card, CardContent, CardHeader, CardTitle, Input, Typography, Alert } from "@/components/ui";
import React from "react";
import { PageLayout } from "../page-layout";
import { useAdminApp } from "../use-admin-app";
import { notFound } from "next/navigation";
type MigrationCursor = {
createdAtMillis: number,
id: string,
};
type MigrationSnapshot = {
totalEvents: number,
processedEvents: number,
remainingEvents: number,
migratedEvents: number,
skippedExistingEvents: number,
insertedRows: number,
progress: number,
nextCursor: MigrationCursor | null,
};
const normalizeResponse = (response: ClickhouseMigrationResponse): MigrationSnapshot => ({
totalEvents: response.total_events,
processedEvents: response.processed_events,
remainingEvents: response.remaining_events,
migratedEvents: response.migrated_events,
skippedExistingEvents: response.skipped_existing_events,
insertedRows: response.inserted_rows,
progress: response.progress,
nextCursor: response.next_cursor ? {
createdAtMillis: response.next_cursor.created_at_millis,
id: response.next_cursor.id,
} : null,
});
export default function PageClient() {
const stackAdminApp = useAdminApp();
const adminInterface = React.useMemo(() => (stackAdminApp as any)._interface as {
migrateEventsToClickhouse: (options: ClickhouseMigrationRequest) => Promise<ClickhouseMigrationResponse>,
}, [stackAdminApp]);
const [minCreatedAt, setMinCreatedAt] = React.useState("");
const [maxCreatedAt, setMaxCreatedAt] = React.useState("");
const [limit, setLimit] = React.useState(1000);
const [stats, setStats] = React.useState<MigrationSnapshot | null>(null);
const [cursor, setCursor] = React.useState<MigrationCursor | null>(null);
const [running, setRunning] = React.useState(false);
const runningRef = React.useRef(false);
const cursorRef = React.useRef<MigrationCursor | null>(null);
const timeWindowRef = React.useRef<{ minCreatedAtMillis: number, maxCreatedAtMillis: number } | null>(null);
const [error, setError] = React.useState<string | null>(null);
const parseCreatedAtMillis = React.useCallback((value: string | undefined) => {
if (!value) return null;
const trimmed = value.trim();
if (!trimmed) return null;
if (/^-?\d+$/.test(trimmed)) {
const parsed = Number(trimmed);
return Number.isFinite(parsed) ? parsed : null;
}
const parsed = new Date(trimmed).getTime();
return Number.isNaN(parsed) ? null : parsed;
}, []);
const buildRequestBody = React.useCallback(() => {
const safeLimit = Number.isFinite(limit) && limit > 0 ? Math.min(1000, limit) : 1000;
const minCreatedAtMillis = timeWindowRef.current?.minCreatedAtMillis ?? parseCreatedAtMillis(minCreatedAt);
const maxCreatedAtMillis = timeWindowRef.current?.maxCreatedAtMillis ?? parseCreatedAtMillis(maxCreatedAt);
if (minCreatedAtMillis === null || maxCreatedAtMillis === null) {
throw new Error("Please provide valid unix millis (Date.now()) or ISO/datetime-local values for min/max created at.");
}
return {
min_created_at_millis: minCreatedAtMillis,
max_created_at_millis: maxCreatedAtMillis,
cursor: cursorRef.current ? {
created_at_millis: cursorRef.current.createdAtMillis,
id: cursorRef.current.id,
} : undefined,
limit: safeLimit,
};
}, [limit, maxCreatedAt, minCreatedAt, parseCreatedAtMillis]);
const runBatch = React.useCallback(async () => {
const response = await adminInterface.migrateEventsToClickhouse(buildRequestBody());
const snapshot = normalizeResponse(response);
setStats(snapshot);
cursorRef.current = snapshot.nextCursor;
setCursor(snapshot.nextCursor);
return snapshot;
}, [adminInterface, buildRequestBody]);
const stopMigration = React.useCallback(() => {
runningRef.current = false;
setRunning(false);
}, []);
const resetMigration = React.useCallback(() => {
stopMigration();
cursorRef.current = null;
timeWindowRef.current = null;
setCursor(null);
setStats(null);
setError(null);
}, [stopMigration]);
const startMigration = React.useCallback(async () => {
if (runningRef.current) return;
const minCreatedAtMillis = parseCreatedAtMillis(minCreatedAt);
const maxCreatedAtMillis = parseCreatedAtMillis(maxCreatedAt);
if (minCreatedAtMillis === null || maxCreatedAtMillis === null) {
setError("Please provide valid unix millis (Date.now()) or ISO/datetime-local values for min/max created at.");
return;
}
if (minCreatedAtMillis >= maxCreatedAtMillis) {
setError("Min created at must be before max created at.");
return;
}
setError(null);
timeWindowRef.current = { minCreatedAtMillis, maxCreatedAtMillis };
runningRef.current = true;
setRunning(true);
try {
while (runningRef.current) {
const snapshot = await runBatch();
if (!snapshot.nextCursor) {
stopMigration();
break;
}
}
} catch (e: any) {
setError(e?.message ?? "Migration failed");
stopMigration();
}
}, [maxCreatedAt, minCreatedAt, parseCreatedAtMillis, runBatch, stopMigration]);
const progressPercent = Math.min(100, Math.max(0, Math.round((stats?.progress ?? 0) * 100)));
if (stackAdminApp.projectId !== "internal") {
return notFound();
}
return (
<PageLayout
title="ClickHouse Event Migration"
description="Backfill historical events from Postgres into ClickHouse. Intended for internal use only."
fillWidth
>
<div className="flex flex-col gap-4">
<Card className="xl:col-span-2">
<CardHeader>
<CardTitle>Controls</CardTitle>
</CardHeader>
<CardContent className="flex flex-col gap-4">
<div className="grid grid-cols-1 md:grid-cols-2 gap-4">
<div className="space-y-2">
<Typography type="label">Min created at (unix millis or ISO/datetime-local)</Typography>
<Input
type="text"
value={minCreatedAt}
onChange={(e) => {
setMinCreatedAt(e.target.value);
resetMigration();
}}
placeholder="1735689600000 or 2024-08-01T00:00"
/>
</div>
<div className="space-y-2">
<Typography type="label">Max created at (use to exclude new dual-written events)</Typography>
<Input
type="text"
value={maxCreatedAt}
onChange={(e) => {
setMaxCreatedAt(e.target.value);
resetMigration();
}}
placeholder="1767225600000 or 2024-12-01T00:00"
/>
</div>
<div className="space-y-2">
<Typography type="label">Batch size</Typography>
<Input
type="number"
min={1}
max={1000}
value={limit}
onChange={(e) => {
setLimit(Number(e.target.value) || 0);
resetMigration();
}}
/>
</div>
<div className="space-y-2">
<Typography type="label">Cursor</Typography>
<Typography variant="secondary" className="text-sm break-all">
{cursor ? `${cursor.createdAtMillis} · ${cursor.id}` : "Not started"}
</Typography>
</div>
</div>
{error && (
<Alert variant="destructive">{error}</Alert>
)}
<div className="flex flex-wrap gap-2">
<Button onClick={startMigration} disabled={running} loading={running}>
{running ? "Running" : "Start / Resume"}
</Button>
<Button onClick={stopMigration} variant="secondary" disabled={!running}>
Stop
</Button>
<Button onClick={resetMigration} variant="ghost">
Reset cursor
</Button>
</div>
</CardContent>
</Card>
<Card>
<CardHeader>
<CardTitle>Status</CardTitle>
</CardHeader>
<CardContent className="space-y-3">
<div className="h-3 w-full rounded-full bg-muted">
<div
className="h-3 rounded-full bg-gradient-to-r from-blue-500 to-emerald-500"
style={{ width: `${progressPercent}%` }}
/>
</div>
<div className="flex items-center justify-between">
<Typography variant="secondary">Progress</Typography>
<Typography type="label">{progressPercent}%</Typography>
</div>
<div className="grid grid-cols-2 gap-3 text-sm">
<div>
<Typography variant="secondary">Processed</Typography>
<Typography type="label">{stats?.processedEvents ?? 0}</Typography>
</div>
<div>
<Typography variant="secondary">Remaining</Typography>
<Typography type="label">{stats?.remainingEvents ?? 0}</Typography>
</div>
<div>
<Typography variant="secondary">Migrated this run</Typography>
<Typography type="label">{stats?.migratedEvents ?? 0}</Typography>
</div>
<div>
<Typography variant="secondary">Inserted rows</Typography>
<Typography type="label">{stats?.insertedRows ?? 0}</Typography>
</div>
<div>
<Typography variant="secondary">Total in scope</Typography>
<Typography type="label">{stats?.totalEvents ?? 0}</Typography>
</div>
<div className="">
<Typography variant="secondary">State</Typography>
<Typography type="label">{running ? "Running" : "Idle"}</Typography>
</div>
</div>
</CardContent>
</Card>
</div>
</PageLayout>
);
}

View File

@ -0,0 +1,9 @@
import PageClient from "./page-client";
export const metadata = {
title: "ClickHouse Event Migration",
};
export default function Page() {
return <PageClient />;
}

View File

@ -0,0 +1,215 @@
"use client";
import Editor from "@monaco-editor/react";
import type { Monaco } from "@monaco-editor/react";
import React, { useEffect, useMemo, useRef } from "react";
import { Alert, Button, Textarea, Typography } from "@/components/ui";
import { PageLayout } from "../page-layout";
import { useAdminApp } from "../use-admin-app";
import { clickhouseKeywords, clickhouseTables, conf, language } from "./monaco-clickhouse";
const CLICKHOUSE_LANGUAGE_ID = "clickhouse-sql";
type Disposable = { dispose: () => void };
type CompletionItem = Parameters<Monaco["languages"]["registerCompletionItemProvider"]>[1]["provideCompletionItems"] extends (
...args: any
) => infer R
? R extends { suggestions: Array<infer U> }
? U
: never
: never;
export default function PageClient() {
const adminApp = useAdminApp();
const [query, setQuery] = React.useState("SELECT 1 AS value;");
const [resultText, setResultText] = React.useState("");
const [error, setError] = React.useState<string | null>(null);
const [loading, setLoading] = React.useState(false);
const disposables = useRef<Disposable[]>([]);
const queryRef = useRef(query);
const tableColumnSuggestions = useMemo(() => {
return Object.entries(clickhouseTables).flatMap(([table, columns]) =>
columns.map((column) => ({ table, column })),
);
}, []);
useEffect(() => {
const disposablesToDispose = disposables.current;
return () => {
disposablesToDispose.forEach((d) => d.dispose());
};
}, []);
const runQuery = () => {
const currentQuery = queryRef.current.trim();
if (!currentQuery) {
return;
}
const execute = async () => {
setLoading(true);
setError(null);
try {
const response = await adminApp.queryAnalytics({
query: currentQuery,
include_all_branches: false,
});
setResultText(JSON.stringify(response.result, null, 2));
} catch (e: any) {
setError(e?.message ?? "Failed to run analytics query.");
setResultText("");
} finally {
setLoading(false);
}
};
// eslint-disable-next-line @typescript-eslint/no-floating-promises
void execute();
};
const handleEditorMount: Parameters<typeof Editor>[0]["onMount"] = (instance, monaco: Monaco) => {
if (!monaco.languages.getLanguages().some((lang) => lang.id === CLICKHOUSE_LANGUAGE_ID)) {
monaco.languages.register({ id: CLICKHOUSE_LANGUAGE_ID });
monaco.languages.setLanguageConfiguration(CLICKHOUSE_LANGUAGE_ID, conf);
monaco.languages.setMonarchTokensProvider(CLICKHOUSE_LANGUAGE_ID, language);
}
disposables.current.push(
monaco.languages.registerCompletionItemProvider(CLICKHOUSE_LANGUAGE_ID, {
triggerCharacters: [".", " "],
provideCompletionItems: (model, position) => {
const word = model.getWordUntilPosition(position);
const range = {
startLineNumber: position.lineNumber,
endLineNumber: position.lineNumber,
startColumn: word.startColumn,
endColumn: word.endColumn,
};
const linePrefix = model.getValueInRange({
startLineNumber: position.lineNumber,
startColumn: 1,
endLineNumber: position.lineNumber,
endColumn: position.column,
});
const tableMatch = /([a-zA-Z_][\w]*)\.\s*$/.exec(linePrefix);
const suggestions: CompletionItem[] = [];
if (tableMatch) {
const tableName = tableMatch[1].toLowerCase();
const columns = (clickhouseTables as Record<string, readonly string[] | undefined>)[tableName];
if (columns) {
columns.forEach((column) => {
suggestions.push({
label: column,
kind: monaco.languages.CompletionItemKind.Field,
insertText: column,
range,
detail: `${tableName}.${column}`,
});
});
}
} else {
Object.keys(clickhouseTables).forEach((table) => {
suggestions.push({
label: table,
kind: monaco.languages.CompletionItemKind.Class,
insertText: table,
range,
detail: "Table",
});
});
tableColumnSuggestions.forEach(({ table, column }) => {
suggestions.push({
label: `${table}.${column}`,
kind: monaco.languages.CompletionItemKind.Field,
insertText: `${table}.${column}`,
range,
});
});
clickhouseKeywords.forEach((keyword) => {
suggestions.push({
label: keyword,
kind: monaco.languages.CompletionItemKind.Keyword,
insertText: keyword,
range,
});
});
}
return { suggestions };
},
}),
);
const model = instance.getModel();
if (model) {
monaco.editor.setModelLanguage(model, CLICKHOUSE_LANGUAGE_ID);
}
instance.addCommand(monaco.KeyMod.CtrlCmd | monaco.KeyCode.Enter, () => {
void runQuery();
});
};
return (
<PageLayout
title="Analytics Query"
description="Run read-only analytics queries against your project's ClickHouse dataset."
fillWidth
>
<div className="flex justify-center">
<Button
onClick={runQuery}
loading={loading}
disabled={loading || !queryRef.current.trim()}
>
Run query ( + enter)
</Button>
</div>
{error && <Alert variant="destructive">{error}</Alert>}
<div className="grid gap-4 md:grid-cols-2">
<div className="flex flex-col gap-2">
<div className="border border-border/50 rounded-md overflow-hidden h-[460px]">
<Editor
height="100%"
defaultLanguage={CLICKHOUSE_LANGUAGE_ID}
language={CLICKHOUSE_LANGUAGE_ID}
value={query}
onChange={(value) => {
const next = value ?? "";
setQuery(next);
queryRef.current = next;
}}
onMount={handleEditorMount}
options={{
minimap: { enabled: false },
fontSize: 14,
scrollBeyondLastLine: false,
wordWrap: "on",
fixedOverflowWidgets: true,
padding: { top: 10, bottom: 10 },
}}
theme="vs-dark"
/>
</div>
</div>
<div className="flex flex-col gap-2">
<Textarea
className="font-mono h-[460px]"
readOnly
spellCheck={false}
value={resultText}
placeholder="Query results will appear here."
/>
</div>
</div>
</PageLayout>
);
}

View File

@ -0,0 +1,5 @@
import PageClient from "./page-client";
export default function Page() {
return <PageClient />;
}

View File

@ -130,6 +130,7 @@
{ suffix: "22", label: "Freestyle mock" },
{ suffix: "24", label: "LocalStack Gateway (AWS mock)" },
{ suffix: "25", label: "QStash mock" },
{ suffix: "33", label: "ClickHouse native interface" },
{ range: ["50", "99"], label: "Reserved for LocalStack (external services)" },
];
@ -306,7 +307,7 @@
},
{
name: "WAL Info",
portSuffix: "36",
portSuffix: "38",
description: [
"Replication & WAL monitoring",
"Tracks primary/replica LSN positions",
@ -329,6 +330,15 @@
"React example",
],
},
{
name: "ClickHouse HTTP",
portSuffix: "36",
description: [
"ClickHouse",
],
importance: 1,
img: "https://thumbs.bfldr.com/at/qkjfv3nvsv4rbwn94zmtb4t/v/1197417003?expiry=1764357242&fit=bounds&height=800&sig=NjEwNzA0OThjZmJiZDQzZmUwNjIyY2UxYzZiNGYxNmQ3NjJiYjc0OA%3D%3D&width=1100",
},
{
name: "Convex example",
portSuffix: "27",

View File

@ -94,7 +94,7 @@ function expectSnakeCase(obj: unknown, path: string): void {
}
} else {
for (const [key, value] of Object.entries(obj)) {
if (key.match(/[a-z0-9][A-Z][a-z0-9]+/) && !key.includes("_") && !["newUser", "afterCallbackRedirectUrl"].includes(key)) {
if (key.match(/^[a-z0-9][A-Z][a-z0-9]+$/) && !key.includes("_") && !["newUser", "afterCallbackRedirectUrl"].includes(key)) {
throw new StackAssertionError(`Object has camelCase key (expected snake_case): ${path}.${key}`);
}
if (["client_metadata", "server_metadata", "options_json", "credential", "authentication_response", "metadata", "variables", "skipped_details"].includes(key)) continue;

View File

@ -0,0 +1,152 @@
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
import { it } from "../../../../helpers";
import { Auth, Project, backendContext, bumpEmailAddress, niceBackendFetch } from "../../../backend-helpers";
const queryEvents = async (params: {
userId?: string,
eventType?: string,
}) => await niceBackendFetch("/api/v1/internal/analytics/query", {
method: "POST",
accessType: "admin",
body: {
query: `
SELECT event_type, project_id, branch_id, user_id, team_id
FROM events
WHERE 1
${params.userId ? "AND user_id = {user_id:String}" : ""}
${params.eventType ? "AND event_type = {event_type:String}" : ""}
ORDER BY event_at DESC
LIMIT 10
`,
params: {
...(params.userId ? { user_id: params.userId } : {}),
...(params.eventType ? { event_type: params.eventType } : {}),
},
},
});
const fetchEventsWithRetry = async (
params: { userId?: string, eventType?: string },
options: { attempts?: number, delayMs?: number } = {}
) => {
const attempts = options.attempts ?? 5;
const delayMs = options.delayMs ?? 250;
let response = await queryEvents(params);
for (let attempt = 0; attempt < attempts; attempt++) {
if (response.status !== 200) {
break;
}
const results = Array.isArray(response.body?.result) ? response.body.result : [];
if (results.length > 0) {
break;
}
await wait(delayMs);
response = await queryEvents(params);
}
return response;
};
it("stores backend events in ClickHouse", async ({ expect }) => {
const { projectId } = await Project.createAndSwitch({ config: { magic_link_enabled: true } });
const { userId } = await Auth.Otp.signIn();
const queryResponse = await fetchEventsWithRetry({
userId,
eventType: "$session-activity",
});
expect(queryResponse.status).toBe(200);
const results = Array.isArray(queryResponse.body?.result) ? queryResponse.body.result : [];
expect(results.length).toBeGreaterThan(0);
expect(results[0]).toMatchObject({
event_type: "$session-activity",
project_id: projectId,
branch_id: "main",
user_id: userId,
});
});
it("cannot read events from other projects", async ({ expect }) => {
await Project.createAndSwitch({ config: { magic_link_enabled: true } });
const projectAKeys = backendContext.value.projectKeys;
await Auth.Otp.signIn();
// Switch to another project and generate its own event
await Project.createAndSwitch({ config: { magic_link_enabled: true } });
const { userId: projectBUserId } = await Auth.Otp.signIn();
const projectBResponse = await fetchEventsWithRetry({
userId: projectBUserId,
eventType: "$session-activity",
});
expect(projectBResponse).toMatchInlineSnapshot(`
NiceResponse {
"status": 200,
"body": {
"result": [
{
"branch_id": "main",
"event_type": "$session-activity",
"project_id": "<stripped UUID>",
"team_id": "",
"user_id": "<stripped UUID>",
},
],
"stats": {
"cpu_time": <stripped field 'cpu_time'>,
"wall_clock_time": <stripped field 'wall_clock_time'>,
},
},
"headers": Headers { <some fields may have been hidden> },
}
`);
// Switch back to project A context
backendContext.set({ projectKeys: projectAKeys, userAuth: null });
const queryResponse = await queryEvents({
userId: projectBUserId,
eventType: "$session-activity",
});
expect(queryResponse).toMatchInlineSnapshot(`
NiceResponse {
"status": 200,
"body": {
"result": [],
"stats": {
"cpu_time": <stripped field 'cpu_time'>,
"wall_clock_time": <stripped field 'wall_clock_time'>,
},
},
"headers": Headers { <some fields may have been hidden> },
}
`);
});
it("filters analytics events by user within a project", async ({ expect }) => {
await Project.createAndSwitch({ config: { magic_link_enabled: true } });
const { userId: userA } = await Auth.Otp.signIn();
await bumpEmailAddress();
const { userId: userB } = await Auth.Otp.signIn();
const userAResponse = await fetchEventsWithRetry({
userId: userA,
eventType: "$session-activity",
});
expect(userAResponse.status).toBe(200);
const userAResults = Array.isArray(userAResponse.body?.result) ? userAResponse.body.result : [];
expect(userAResults.length).toBeGreaterThan(0);
expect(userAResults.every((row: any) => row.user_id === userA)).toBe(true);
const userBResponse = await fetchEventsWithRetry({
userId: userB,
eventType: "$session-activity",
});
expect(userBResponse.status).toBe(200);
const userBResults = Array.isArray(userBResponse.body?.result) ? userBResponse.body.result : [];
expect(userBResults.length).toBeGreaterThan(0);
expect(userBResults.every((row: any) => row.user_id === userB)).toBe(true);
});

File diff suppressed because it is too large Load Diff

View File

@ -82,6 +82,8 @@ const stripFields = [
"nextAttempt",
"lastFour",
"port",
"wall_clock_time",
"cpu_time",
] as const;
const stripFieldsIfString = [
@ -119,6 +121,7 @@ const stringRegexReplacements = [
[/(\/integrations\/(neon|custom)\/oauth\/idp\/(interaction|auth)\/)[a-zA-Z0-9_-]+/gi, "$1<stripped $3 UID>"],
[new RegExp(`localhost\:${getPortPrefix()}`, "gi"), "localhost:<$$NEXT_PUBLIC_STACK_PORT_PREFIX>"],
[new RegExp(`localhost\%3A${getPortPrefix()}`, "gi"), "localhost%3A%3C%24NEXT_PUBLIC_STACK_PORT_PREFIX%3E"],
[/(Timeout exceeded: elapsed )[0-9.]+( ms)/gi, "$1<stripped time>$2"],
] as const;

View File

@ -59,7 +59,7 @@ services:
POSTGRES_PASSWORD: PASSWORD-PLACEHOLDER--uqfEC1hmmv
POSTGRES_DB: stackframe
ports:
- "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}36:8080"
- "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}38:8080"
depends_on:
- db
- db-replica
@ -254,6 +254,25 @@ services:
environment:
HOST_ON_HOST: host.docker.internal
# ================= ClickHouse =================
clickhouse:
image: clickhouse/clickhouse-server:25.10
ports:
- "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}36:8123" # HTTP interface
- "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}37:9000" # Native interface
environment:
CLICKHOUSE_DB: analytics
CLICKHOUSE_USER: stackframe
CLICKHOUSE_PASSWORD: PASSWORD-PLACEHOLDER--9gKyMxJeMx
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
volumes:
- clickhouse-data:/var/lib/clickhouse
ulimits:
nofile:
soft: 262144
hard: 262144
# ================= Drizzle Gateway =================
drizzle-gateway:
@ -278,6 +297,7 @@ volumes:
s3mock-data:
deno-cache:
localstack-data:
clickhouse-data:
drizzle-gateway:
# ================= configs =================

View File

@ -14,3 +14,7 @@ STACK_RUN_MIGRATIONS=true
STACK_RUN_SEED_SCRIPT=true
STACK_FREESTYLE_API_KEY=
STACK_CLICKHOUSE_URL=http://host.docker.internal:8133
STACK_CLICKHOUSE_ADMIN_PASSWORD=password
STACK_CLICKHOUSE_EXTERNAL_PASSWORD=password

View File

@ -31,6 +31,7 @@
"restart-deps": "pnpm pre && pnpm run stop-deps && pnpm run start-deps",
"restart-deps:no-delay": "pnpm pre && pnpm run stop-deps && pnpm run start-deps:no-delay",
"psql": "pnpm pre && pnpm run --filter=@stackframe/stack-backend psql",
"clickhouse": "pnpm pre && pnpm run --filter=@stackframe/stack-backend clickhouse",
"explain-query": "pnpm pre && echo 'Paste your query (end with Ctrl-D):' && query=$(cat) && echo 'Connecting to Postgres...' && printf \"EXPLAIN (ANALYZE, COSTS, VERBOSE, BUFFERS, FORMAT JSON)\n$query\" | pnpm run --silent psql -qAt | sed -n '/\\[/,$p' > explained-query.untracked.json && echo 'Explained query saved to explained-query.untracked.json. To analyze it, open it in the query analyzer at https://tatiyants.com/pev/#/plans/new'",
"db:migration-gen": "pnpm pre && pnpm run --filter=@stackframe/stack-backend db:migration-gen",
"db:reset": "pnpm pre && pnpm run --filter=@stackframe/stack-backend db:reset",

View File

@ -9,6 +9,7 @@ import { InternalApiKeysCrud } from "./crud/internal-api-keys";
import { ProjectPermissionDefinitionsCrud } from "./crud/project-permissions";
import { ProjectsCrud } from "./crud/projects";
import { SvixTokenCrud } from "./crud/svix-token";
import type { AnalyticsQueryOptions, AnalyticsQueryResponse } from "./crud/analytics";
import { TeamPermissionDefinitionsCrud } from "./crud/team-permissions";
import type { Transaction, TransactionType } from "./crud/transactions";
import { ServerAuthApplicationOptions, StackServerInterface } from "./server-interface";
@ -44,6 +45,30 @@ export type InternalApiKeyCreateCrudResponse = InternalApiKeysCrud["Admin"]["Rea
super_secret_admin_key?: string,
};
export type ClickhouseMigrationRequest = {
min_created_at_millis: number,
max_created_at_millis: number,
cursor?: {
created_at_millis: number,
id: string,
},
limit?: number,
};
export type ClickhouseMigrationResponse = {
total_events: number,
processed_events: number,
remaining_events: number,
migrated_events: number,
skipped_existing_events: number,
inserted_rows: number,
progress: number,
next_cursor: {
created_at_millis: number,
id: string,
} | null,
};
export class StackAdminInterface extends StackServerInterface {
constructor(public readonly options: AdminAuthApplicationOptions) {
super(options);
@ -701,6 +726,21 @@ export class StackAdminInterface extends StackServerInterface {
return await response.json();
}
async migrateEventsToClickhouse(options: ClickhouseMigrationRequest): Promise<ClickhouseMigrationResponse> {
const response = await this.sendAdminRequest(
"/internal/clickhouse/migrate-events",
{
method: "POST",
headers: {
"content-type": "application/json",
},
body: JSON.stringify(options),
},
null,
);
return await response.json();
}
async previewAffectedUsersByOnboardingChange(
onboarding: { require_email_verification?: boolean },
limit?: number,
@ -727,6 +767,27 @@ export class StackAdminInterface extends StackServerInterface {
return await response.json();
}
async queryAnalytics(options: AnalyticsQueryOptions): Promise<AnalyticsQueryResponse> {
const response = await this.sendAdminRequest(
"/internal/analytics/query",
{
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
query: options.query,
params: options.params ?? {},
timeout_ms: options.timeout_ms ?? 1000,
include_all_branches: options.include_all_branches ?? false,
}),
},
null,
);
const data = await response.json();
return {
result: data.result,
};
}
async listOutboxEmails(options?: { status?: string, simple_status?: string, limit?: number, cursor?: string }): Promise<EmailOutboxCrud["Server"]["List"]> {
const qs = new URLSearchParams();

View File

@ -0,0 +1,10 @@
export type AnalyticsQueryOptions = {
query: string,
params?: Record<string, unknown>,
timeout_ms?: number,
include_all_branches?: boolean,
};
export type AnalyticsQueryResponse = {
result: Record<string, unknown>[],
};

View File

@ -1692,6 +1692,28 @@ const StripeAccountInfoNotFound = createKnownErrorConstructor(
() => [] as const,
);
const AnalyticsQueryTimeout = createKnownErrorConstructor(
KnownError,
"ANALYTICS_QUERY_TIMEOUT",
(timeoutMs: number) => [
400,
`The query timed out. Please try again with a shorter query or increase the timeout. Timeout was ${timeoutMs}ms.`,
{ timeout_ms: timeoutMs },
] as const,
(json) => [json.timeout_ms] as const,
);
const AnalyticsQueryError = createKnownErrorConstructor(
KnownError,
"ANALYTICS_QUERY_ERROR",
(error: string) => [
400,
`The query failed to execute: ${error}`,
{ error },
] as const,
(json) => [json.error] as const,
);
const DefaultPaymentMethodRequired = createKnownErrorConstructor(
KnownError,
"DEFAULT_PAYMENT_METHOD_REQUIRED",
@ -1853,6 +1875,8 @@ export const KnownErrors = {
NewPurchasesBlocked,
DataVaultStoreDoesNotExist,
DataVaultStoreHashedKeyDoesNotExist,
AnalyticsQueryTimeout,
AnalyticsQueryError,
} satisfies Record<string, KnownErrorConstructor<any, any>>;

View File

@ -1,6 +1,7 @@
import { StackAdminInterface } from "@stackframe/stack-shared";
import { getProductionModeErrors } from "@stackframe/stack-shared/dist/helpers/production-mode";
import { InternalApiKeyCreateCrudResponse } from "@stackframe/stack-shared/dist/interface/admin-interface";
import { AnalyticsQueryOptions, AnalyticsQueryResponse } from "@stackframe/stack-shared/dist/interface/crud/analytics";
import { EmailTemplateCrud } from "@stackframe/stack-shared/dist/interface/crud/email-templates";
import { InternalApiKeysCrud } from "@stackframe/stack-shared/dist/interface/crud/internal-api-keys";
import { ProjectsCrud } from "@stackframe/stack-shared/dist/interface/crud/projects";
@ -940,6 +941,10 @@ export class _StackAdminAppImplIncomplete<HasTokenStore extends boolean, Project
}
// END_PLATFORM
async queryAnalytics(options: AnalyticsQueryOptions): Promise<AnalyticsQueryResponse> {
return await this._interface.queryAnalytics(options);
}
async previewAffectedUsersByOnboardingChange(
onboarding: { requireEmailVerification?: boolean },
limit?: number,

View File

@ -1,4 +1,5 @@
import { ChatContent } from "@stackframe/stack-shared/dist/interface/admin-interface";
import { AnalyticsQueryOptions, AnalyticsQueryResponse } from "@stackframe/stack-shared/dist/interface/crud/analytics";
import type { Transaction, TransactionType } from "@stackframe/stack-shared/dist/interface/crud/transactions";
import { InternalSession } from "@stackframe/stack-shared/dist/sessions";
import { Result } from "@stackframe/stack-shared/dist/utils/results";
@ -110,6 +111,7 @@ export type StackAdminApp<HasTokenStore extends boolean = boolean, ProjectId ext
{ customCustomerId: string, itemId: string, quantity: number, expiresAt?: string, description?: string }
)): Promise<void>,
refundTransaction(options: { type: "subscription" | "one-time-purchase", id: string }): Promise<void>,
queryAnalytics(options: AnalyticsQueryOptions): Promise<AnalyticsQueryResponse>,
// Email Outbox methods
listOutboxEmails(options?: EmailOutboxListOptions): Promise<EmailOutboxListResult>,

View File

@ -114,6 +114,9 @@ importers:
'@aws-sdk/client-s3':
specifier: ^3.855.0
version: 3.864.0
'@clickhouse/client':
specifier: ^1.14.0
version: 1.16.0
'@node-oauth/oauth2-server':
specifier: ^5.1.0
version: 5.1.0
@ -2941,6 +2944,13 @@ packages:
'@chevrotain/utils@11.0.3':
resolution: {integrity: sha512-YslZMgtJUyuMbZ+aKvfF3x1f5liK4mWNxghFRv7jqRR9C3R3fAOGTTKvxXDa2Y1s9zSbcpuO0cAxDYsc9SrXoQ==}
'@clickhouse/client-common@1.16.0':
resolution: {integrity: sha512-qMzkI1NmV29ZjFkNpVSvGNfA0c7sCExlufAQMv+V+5xtNeYXnRfdqzmBLIQoq6Pf1ij0kw/wGLD3HQrl7pTFLA==}
'@clickhouse/client@1.16.0':
resolution: {integrity: sha512-ThPhoRMsKsf/hmBEgWlUsGxFecsr3i+k3JI8JV0Od7UpH2BSmk9VKMGJoyPCrTL0vPUs5rJH+7o4iCqBF09Xvg==}
engines: {node: '>=16'}
'@date-fns/tz@1.2.0':
resolution: {integrity: sha512-LBrd7MiJZ9McsOgxqWX7AaxrDjcFVjWH/tIKJd7pnR7McaslGYOP1QmmiBXdJH/H/yLCT+rcQ7FaPBUxRGUtrg==}
@ -17764,6 +17774,12 @@ snapshots:
'@chevrotain/utils@11.0.3': {}
'@clickhouse/client-common@1.16.0': {}
'@clickhouse/client@1.16.0':
dependencies:
'@clickhouse/client-common': 1.16.0
'@date-fns/tz@1.2.0': {}
'@dnd-kit/accessibility@3.1.1(react@19.2.3)':