TokenRefreshEventType

This commit is contained in:
Konstantin Wohlwend 2026-01-28 11:18:15 -08:00
parent 68cc0258d2
commit 8fd5b13a3b
6 changed files with 114 additions and 63 deletions

View File

@ -2,10 +2,10 @@
-- SINGLE_STATEMENT_SENTINEL
-- RUN_OUTSIDE_TRANSACTION_SENTINEL
-- Add index on createdAt for efficient range queries (used by ClickHouse migration and similar count queries).
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_event_created_at ON "Event" ("createdAt");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "Event_createdAt_idx" ON /* SCHEMA_NAME_SENTINEL */."Event" ("createdAt");
-- SPLIT_STATEMENT_SENTINEL
-- SINGLE_STATEMENT_SENTINEL
-- RUN_OUTSIDE_TRANSACTION_SENTINEL
-- Add composite index (createdAt, id) for cursor-based pagination queries.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_event_created_at_id ON "Event" ("createdAt", "id");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "Event_createdAt_id_idx" ON /* SCHEMA_NAME_SENTINEL */."Event" ("createdAt", "id");

View File

@ -35,12 +35,8 @@ CREATE TABLE IF NOT EXISTS analytics_internal.events (
data JSON,
project_id String,
branch_id String,
user_id String,
team_id String,
refresh_token_id String,
is_anonymous Boolean,
session_id String,
ip_address String,
user_id Nullable(String),
team_id Nullable(String),
created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE MergeTree

View File

@ -22,7 +22,7 @@ const parseMillisOrThrow = (value: number | undefined, field: string) => {
return parsed;
};
const createClickhouseRows = (event: {
const createClickhouseRow = (event: {
id: string,
systemEventTypeIds: string[],
data: any,
@ -39,24 +39,18 @@ const createClickhouseRows = (event: {
};
const projectId = typeof dataRecord.projectId === "string" ? dataRecord.projectId : "";
const branchId = DEFAULT_BRANCH_ID;
const userId = typeof dataRecord.userId === "string" ? dataRecord.userId : "";
const teamId = typeof dataRecord.teamId === "string" ? dataRecord.teamId : "";
const sessionId = typeof dataRecord.sessionId === "string" ? dataRecord.sessionId : "";
const isAnonymous = typeof dataRecord.isAnonymous === "boolean" ? dataRecord.isAnonymous : false;
const userId = typeof dataRecord.userId === "string" && dataRecord.userId ? dataRecord.userId : null;
const eventTypes = [...new Set(event.systemEventTypeIds)];
return eventTypes.map(eventType => ({
event_type: eventType,
// Translate $session-activity to $token-refresh
return {
event_type: '$token-refresh',
event_at: event.eventEndedAt,
data: clickhouseEventData,
project_id: projectId,
branch_id: branchId,
user_id: userId,
team_id: teamId,
session_id: sessionId,
is_anonymous: isAnonymous,
}));
team_id: null,
};
};
export const POST = createSmartRouteHandler({
@ -111,6 +105,10 @@ export const POST = createSmartRouteHandler({
gte: minCreatedAt,
lt: maxCreatedAt,
},
// Only migrate $session-activity events (translated to $token-refresh in ClickHouse)
systemEventTypeIds: {
has: '$session-activity',
},
};
const cursorFilter: Prisma.EventWhereInput | undefined = (cursorCreatedAt && cursorId) ? {
@ -141,22 +139,19 @@ export const POST = createSmartRouteHandler({
throw new StatusError(StatusError.ServiceUnavailable, "ClickHouse is not configured");
}
const clickhouseClient = getClickhouseAdminClient();
const rowsByEvent = events.map(createClickhouseRows);
const rowsToInsert = rowsByEvent.flat();
migratedEvents = rowsByEvent.reduce((acc, rows) => acc + (rows.length ? 1 : 0), 0);
const rowsToInsert = events.map(createClickhouseRow);
migratedEvents = events.length;
if (rowsToInsert.length) {
await clickhouseClient.insert({
table: "analytics_internal.events",
values: rowsToInsert,
format: "JSONEachRow",
clickhouse_settings: {
date_time_input_format: "best_effort",
async_insert: 1,
},
});
insertedRows = rowsToInsert.length;
}
await clickhouseClient.insert({
table: "analytics_internal.events",
values: rowsToInsert,
format: "JSONEachRow",
clickhouse_settings: {
date_time_input_format: "best_effort",
async_insert: 1,
},
});
insertedRows = rowsToInsert.length;
}
const lastEvent = events.at(-1);

View File

@ -1,7 +1,7 @@
import withPostHog from "@/analytics";
import { globalPrismaClient } from "@/prisma-client";
import { runAsynchronouslyAndWaitUntil } from "@/utils/vercel";
import { urlSchema, yupBoolean, yupMixed, yupObject, yupString } from "@stackframe/stack-shared/dist/schema-fields";
import { urlSchema, yupBoolean, yupMixed, yupNumber, yupObject, yupString } from "@stackframe/stack-shared/dist/schema-fields";
import { getEnvVariable, getNodeEnvironment } from "@stackframe/stack-shared/dist/utils/env";
import { StackAssertionError, throwErr } from "@stackframe/stack-shared/dist/utils/errors";
import { HTTP_METHODS } from "@stackframe/stack-shared/dist/utils/http";
@ -13,6 +13,42 @@ import { getClickhouseAdminClient, isClickhouseConfigured } from "./clickhouse";
import { getEndUserInfo } from "./end-users";
import { DEFAULT_BRANCH_ID } from "./tenancies";
export const endUserIpInfoSchema = yupObject({
ip: yupString().defined(),
isTrusted: yupBoolean().defined(),
countryCode: yupString().optional(),
regionCode: yupString().optional(),
cityName: yupString().optional(),
latitude: yupNumber().optional(),
longitude: yupNumber().optional(),
tzIdentifier: yupString().optional(),
});
export type EndUserIpInfo = yup.InferType<typeof endUserIpInfoSchema>;
/**
* Extracts the end user IP info from the current request.
* Must be called before any async operations as it uses dynamic APIs.
*/
export async function getEndUserIpInfoForEvent(): Promise<EndUserIpInfo | null> {
const endUserInfo = await getEndUserInfo();
if (!endUserInfo) {
return null;
}
const info = endUserInfo.maybeSpoofed ? endUserInfo.spoofedInfo : endUserInfo.exactInfo;
return {
ip: info.ip,
isTrusted: !endUserInfo.maybeSpoofed,
countryCode: info.countryCode,
regionCode: info.regionCode,
cityName: info.cityName,
latitude: info.latitude,
longitude: info.longitude,
tzIdentifier: info.tzIdentifier,
};
}
type EventType = {
id: string,
dataSchema: yup.Schema<any>,
@ -65,6 +101,20 @@ const SessionActivityEventType = {
inherits: [UserActivityEventType],
} as const satisfies SystemEventTypeBase;
const TokenRefreshEventType = {
id: "$token-refresh",
dataSchema: yupObject({
projectId: yupString().defined(),
branchId: yupString().defined(),
organizationId: yupString().nullable().test("must-be-null", "Organization ID has not been implemented yet and must be null", (value) => value === null).defined(),
userId: yupString().uuid().defined(),
refreshTokenId: yupString().defined(),
isAnonymous: yupBoolean().defined(),
ipInfo: endUserIpInfoSchema.nullable().defined(),
}),
inherits: [],
} as const satisfies SystemEventTypeBase;
const ApiRequestEventType = {
id: "$api-request",
@ -84,6 +134,7 @@ export const SystemEventTypes = stripEventTypeSuffixFromKeys({
ProjectActivityEventType,
UserActivityEventType,
SessionActivityEventType,
TokenRefreshEventType,
ApiRequestEventType,
LegacyApiEventType,
} as const);
@ -165,9 +216,6 @@ export async function logEvent<T extends EventType[]>(
const projectId = typeof dataRecord === "object" && dataRecord && typeof dataRecord.projectId === "string" ? dataRecord.projectId : "";
const branchId = typeof dataRecord === "object" && dataRecord && typeof dataRecord.branchId === "string" ? dataRecord.branchId : DEFAULT_BRANCH_ID;
const userId = typeof dataRecord === "object" && dataRecord && typeof dataRecord.userId === "string" ? dataRecord.userId : "";
const teamId = typeof dataRecord === "object" && dataRecord && typeof dataRecord.teamId === "string" ? dataRecord.teamId : "";
const sessionId = typeof dataRecord === "object" && dataRecord && typeof dataRecord.sessionId === "string" ? dataRecord.sessionId : "";
const isAnonymous = typeof dataRecord === "object" && dataRecord && typeof dataRecord.isAnonymous === "boolean" ? dataRecord.isAnonymous : false;
// rest is no more dynamic APIs so we can run it asynchronously
@ -195,21 +243,20 @@ export async function logEvent<T extends EventType[]>(
},
});
if (isClickhouseConfigured()) {
// Only log TokenRefresh events to ClickHouse
if (isClickhouseConfigured() && eventTypesArray.some(e => e.id === '$token-refresh')) {
const clickhouseClient = getClickhouseAdminClient();
await clickhouseClient.insert({
table: "analytics_internal.events",
values: eventTypesArray.map(eventType => ({
event_type: eventType.id,
values: [{
event_type: '$token-refresh',
event_at: timeRange.end,
data: clickhouseEventData,
project_id: projectId,
branch_id: branchId,
user_id: userId,
team_id: teamId,
is_anonymous: isAnonymous,
session_id: sessionId,
})),
user_id: userId || null,
team_id: null, // Token refresh events don't have team context
}],
format: "JSONEachRow",
clickhouse_settings: {
date_time_input_format: "best_effort",

View File

@ -11,8 +11,7 @@ import { Result } from '@stackframe/stack-shared/dist/utils/results';
import { traceSpan } from '@stackframe/stack-shared/dist/utils/telemetry';
import * as jose from 'jose';
import { JOSEError, JWTExpired } from 'jose/errors';
import { getEndUserInfo } from './end-users';
import { SystemEventTypes, logEvent } from './events';
import { SystemEventTypes, getEndUserIpInfoForEvent, logEvent } from './events';
import { Tenancy } from './tenancies';
export const authorizationHeaderSchema = yupString().matches(/^StackSession [^ ]+$/);
@ -213,9 +212,8 @@ export async function generateAccessTokenFromRefreshTokenIfValid(options: {
const now = new Date();
const prisma = await getPrismaClientForTenancy(options.tenancy);
// Get end user IP info for session tracking
const endUserInfo = await getEndUserInfo();
const ipInfo = endUserInfo ? (endUserInfo.maybeSpoofed ? endUserInfo.spoofedInfo : endUserInfo.exactInfo) : undefined;
// Get end user IP info for session tracking and event logging
const ipInfo = await getEndUserIpInfoForEvent();
await Promise.all([
prisma.projectUser.update({
@ -238,7 +236,7 @@ export async function generateAccessTokenFromRefreshTokenIfValid(options: {
},
data: {
lastActiveAt: now,
lastActiveAtIpInfo: ipInfo,
lastActiveAtIpInfo: ipInfo ?? undefined,
},
}),
]);
@ -256,6 +254,20 @@ export async function generateAccessTokenFromRefreshTokenIfValid(options: {
}
);
// Log token refresh event for ClickHouse analytics
await logEvent(
[SystemEventTypes.TokenRefresh],
{
projectId: options.tenancy.project.id,
branchId: options.tenancy.branchId,
userId: options.refreshTokenObj.projectUserId,
refreshTokenId: options.refreshTokenObj.id,
organizationId: null,
isAnonymous: user.is_anonymous,
ipInfo,
}
);
const payload: Omit<AccessTokenPayload, "iss" | "aud" | "iat"> = {
sub: options.refreshTokenObj.projectUserId,
project_id: options.tenancy.project.id,

View File

@ -13,7 +13,7 @@ const queryEvents = async (params: {
SELECT event_type, project_id, branch_id, user_id, team_id
FROM events
WHERE 1
${params.userId ? "AND user_id = {user_id:String}" : ""}
${params.userId ? "AND user_id = {user_id:Nullable(String)}" : ""}
${params.eventType ? "AND event_type = {event_type:String}" : ""}
ORDER BY event_at DESC
LIMIT 10
@ -55,17 +55,18 @@ it("stores backend events in ClickHouse", async ({ expect }) => {
const queryResponse = await fetchEventsWithRetry({
userId,
eventType: "$session-activity",
eventType: "$token-refresh",
});
expect(queryResponse.status).toBe(200);
const results = Array.isArray(queryResponse.body?.result) ? queryResponse.body.result : [];
expect(results.length).toBeGreaterThan(0);
expect(results[0]).toMatchObject({
event_type: "$session-activity",
event_type: "$token-refresh",
project_id: projectId,
branch_id: "main",
user_id: userId,
team_id: null,
});
});
@ -79,7 +80,7 @@ it("cannot read events from other projects", async ({ expect }) => {
const { userId: projectBUserId } = await Auth.Otp.signIn();
const projectBResponse = await fetchEventsWithRetry({
userId: projectBUserId,
eventType: "$session-activity",
eventType: "$token-refresh",
});
expect(projectBResponse).toMatchInlineSnapshot(`
NiceResponse {
@ -88,9 +89,9 @@ it("cannot read events from other projects", async ({ expect }) => {
"result": [
{
"branch_id": "main",
"event_type": "$session-activity",
"event_type": "$token-refresh",
"project_id": "<stripped UUID>",
"team_id": "",
"team_id": null,
"user_id": "<stripped UUID>",
},
],
@ -109,7 +110,7 @@ it("cannot read events from other projects", async ({ expect }) => {
const queryResponse = await queryEvents({
userId: projectBUserId,
eventType: "$session-activity",
eventType: "$token-refresh",
});
expect(queryResponse).toMatchInlineSnapshot(`
NiceResponse {
@ -134,7 +135,7 @@ it("filters analytics events by user within a project", async ({ expect }) => {
const userAResponse = await fetchEventsWithRetry({
userId: userA,
eventType: "$session-activity",
eventType: "$token-refresh",
});
expect(userAResponse.status).toBe(200);
const userAResults = Array.isArray(userAResponse.body?.result) ? userAResponse.body.result : [];
@ -143,7 +144,7 @@ it("filters analytics events by user within a project", async ({ expect }) => {
const userBResponse = await fetchEventsWithRetry({
userId: userB,
eventType: "$session-activity",
eventType: "$token-refresh",
});
expect(userBResponse.status).toBe(200);
const userBResults = Array.isArray(userBResponse.body?.result) ? userBResponse.body.result : [];