mirror of
https://github.com/stack-auth/stack.git
synced 2026-06-16 21:08:38 +08:00
feat: session recording database schema, S3 utilities, and batch upload endpoint
- Add SessionRecording and SessionRecordingChunk Prisma models - Add migration for session_recordings_mvp - Add seed data for session recordings - Add S3 uploadBytes, downloadBytes, and getS3PublicUrl utilities - Add POST /session-recordings/batch endpoint for client uploads
This commit is contained in:
parent
d914d7f3ec
commit
2dcb935f0a
@ -0,0 +1,56 @@
|
||||
-- Session recording MVP: store session metadata in Postgres and rrweb events in S3.
|
||||
|
||||
CREATE TABLE "SessionRecording" (
|
||||
"id" UUID NOT NULL,
|
||||
"tenancyId" UUID NOT NULL,
|
||||
"projectUserId" UUID NOT NULL,
|
||||
"refreshTokenId" UUID NOT NULL,
|
||||
"startedAt" TIMESTAMP(3) NOT NULL,
|
||||
"lastEventAt" TIMESTAMP(3) NOT NULL,
|
||||
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"updatedAt" TIMESTAMP(3) NOT NULL,
|
||||
CONSTRAINT "SessionRecording_pkey" PRIMARY KEY ("tenancyId","id")
|
||||
);
|
||||
|
||||
CREATE TABLE "SessionRecordingChunk" (
|
||||
"id" UUID NOT NULL DEFAULT gen_random_uuid(),
|
||||
"tenancyId" UUID NOT NULL,
|
||||
"sessionRecordingId" UUID NOT NULL,
|
||||
"batchId" UUID NOT NULL,
|
||||
"tabId" TEXT NOT NULL,
|
||||
"s3Key" TEXT NOT NULL,
|
||||
"eventCount" INTEGER NOT NULL,
|
||||
"byteLength" INTEGER NOT NULL,
|
||||
"firstEventAt" TIMESTAMP(3) NOT NULL,
|
||||
"lastEventAt" TIMESTAMP(3) NOT NULL,
|
||||
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
CONSTRAINT "SessionRecordingChunk_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
ALTER TABLE "SessionRecording"
|
||||
ADD CONSTRAINT "SessionRecording_tenancyId_fkey"
|
||||
FOREIGN KEY ("tenancyId") REFERENCES "Tenancy"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
||||
|
||||
ALTER TABLE "SessionRecording"
|
||||
ADD CONSTRAINT "SessionRecording_tenancyId_projectUserId_fkey"
|
||||
FOREIGN KEY ("tenancyId", "projectUserId") REFERENCES "ProjectUser"("tenancyId", "projectUserId") ON DELETE CASCADE ON UPDATE CASCADE;
|
||||
|
||||
ALTER TABLE "SessionRecordingChunk"
|
||||
ADD CONSTRAINT "SessionRecordingChunk_tenancyId_fkey"
|
||||
FOREIGN KEY ("tenancyId") REFERENCES "Tenancy"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
||||
|
||||
ALTER TABLE "SessionRecordingChunk"
|
||||
ADD CONSTRAINT "SessionRecordingChunk_sessionRecordingId_fkey"
|
||||
FOREIGN KEY ("tenancyId","sessionRecordingId") REFERENCES "SessionRecording"("tenancyId","id") ON DELETE CASCADE ON UPDATE CASCADE;
|
||||
|
||||
CREATE INDEX "SessionRecording_tenancyId_projectUserId_startedAt_idx"
|
||||
ON "SessionRecording"("tenancyId", "projectUserId", "startedAt");
|
||||
|
||||
CREATE INDEX "SessionRecording_tenancyId_lastEventAt_idx"
|
||||
ON "SessionRecording"("tenancyId", "lastEventAt");
|
||||
|
||||
CREATE UNIQUE INDEX "SessionRecordingChunk_sessionRecordingId_batchId_key"
|
||||
ON "SessionRecordingChunk"("tenancyId", "sessionRecordingId", "batchId");
|
||||
|
||||
CREATE INDEX "SessionRecordingChunk_tenancyId_sessionRecordingId_createdAt_idx"
|
||||
ON "SessionRecordingChunk"("tenancyId", "sessionRecordingId", "createdAt");
|
||||
@ -60,6 +60,8 @@ model Tenancy {
|
||||
organizationId String? @db.Uuid
|
||||
hasNoOrganization BooleanTrue?
|
||||
emailOutboxes EmailOutbox[]
|
||||
sessionRecordings SessionRecording[]
|
||||
sessionRecordingChunks SessionRecordingChunk[]
|
||||
|
||||
@@unique([projectId, branchId, organizationId])
|
||||
@@unique([projectId, branchId, hasNoOrganization])
|
||||
@ -234,6 +236,7 @@ model ProjectUser {
|
||||
Project Project? @relation(fields: [projectId], references: [id])
|
||||
projectId String?
|
||||
userNotificationPreference UserNotificationPreference[]
|
||||
sessionRecordings SessionRecording[]
|
||||
|
||||
@@id([tenancyId, projectUserId])
|
||||
@@unique([mirroredProjectId, mirroredBranchId, projectUserId])
|
||||
@ -277,6 +280,58 @@ model ProjectUserOAuthAccount {
|
||||
@@index([tenancyId, projectUserId])
|
||||
}
|
||||
|
||||
model SessionRecording {
|
||||
// Cross-tab session id generated by the SDK and stored in localStorage.
|
||||
id String @db.Uuid
|
||||
|
||||
tenancyId String @db.Uuid
|
||||
projectUserId String @db.Uuid
|
||||
refreshTokenId String @db.Uuid
|
||||
|
||||
startedAt DateTime
|
||||
lastEventAt DateTime
|
||||
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
|
||||
projectUser ProjectUser @relation(fields: [tenancyId, projectUserId], references: [tenancyId, projectUserId], onDelete: Cascade)
|
||||
tenancy Tenancy @relation(fields: [tenancyId], references: [id], onDelete: Cascade)
|
||||
|
||||
chunks SessionRecordingChunk[]
|
||||
|
||||
@@id([tenancyId, id])
|
||||
@@index([tenancyId, projectUserId, startedAt])
|
||||
@@index([tenancyId, lastEventAt])
|
||||
}
|
||||
|
||||
model SessionRecordingChunk {
|
||||
id String @id @default(uuid()) @db.Uuid
|
||||
|
||||
tenancyId String @db.Uuid
|
||||
sessionRecordingId String @db.Uuid
|
||||
|
||||
// Unique per uploaded batch for a given session id.
|
||||
batchId String @db.Uuid
|
||||
|
||||
// Ephemeral in-memory id generated by the client. Stored for future tab separation if needed.
|
||||
tabId String
|
||||
|
||||
s3Key String
|
||||
eventCount Int
|
||||
byteLength Int
|
||||
|
||||
firstEventAt DateTime
|
||||
lastEventAt DateTime
|
||||
|
||||
createdAt DateTime @default(now())
|
||||
|
||||
sessionRecording SessionRecording @relation(fields: [tenancyId, sessionRecordingId], references: [tenancyId, id], onDelete: Cascade)
|
||||
tenancy Tenancy @relation(fields: [tenancyId], references: [id], onDelete: Cascade)
|
||||
|
||||
@@unique([tenancyId, sessionRecordingId, batchId])
|
||||
@@index([tenancyId, sessionRecordingId, createdAt])
|
||||
}
|
||||
|
||||
enum ContactChannelType {
|
||||
EMAIL
|
||||
// PHONE
|
||||
|
||||
@ -1118,6 +1118,13 @@ async function seedDummyProject(options: DummyProjectSeedOptions) {
|
||||
userEmailToId,
|
||||
});
|
||||
|
||||
await seedDummySessionRecordings({
|
||||
prisma: dummyPrisma,
|
||||
tenancyId: dummyTenancy.id,
|
||||
userEmailToId,
|
||||
targetSessionRecordingCount: 75
|
||||
});
|
||||
|
||||
console.log('Seeded dummy project data');
|
||||
}
|
||||
|
||||
@ -1765,3 +1772,65 @@ async function seedDummySessionActivityEvents(options: SessionActivityEventSeedO
|
||||
|
||||
console.log('Finished seeding session activity events');
|
||||
}
|
||||
|
||||
type SessionRecordingSeedOptions = {
|
||||
prisma: PrismaClientTransaction,
|
||||
tenancyId: string,
|
||||
userEmailToId: Map<string, string>,
|
||||
targetSessionRecordingCount?: number,
|
||||
};
|
||||
|
||||
async function seedDummySessionRecordings(options: SessionRecordingSeedOptions) {
|
||||
const {
|
||||
prisma,
|
||||
tenancyId,
|
||||
userEmailToId,
|
||||
targetSessionRecordingCount = 250,
|
||||
} = options;
|
||||
|
||||
const existingCount = await prisma.sessionRecording.count({
|
||||
where: {
|
||||
tenancyId,
|
||||
},
|
||||
});
|
||||
|
||||
if (existingCount >= targetSessionRecordingCount) {
|
||||
console.log(`Dummy project already has ${existingCount} session recordings, skipping seeding`);
|
||||
return;
|
||||
}
|
||||
|
||||
const toCreate = targetSessionRecordingCount - existingCount;
|
||||
const userIds = Array.from(userEmailToId.values());
|
||||
if (userIds.length === 0) {
|
||||
throw new Error('Cannot seed session recordings: no dummy project users exist');
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
const twoWeeksAgo = new Date(now);
|
||||
twoWeeksAgo.setDate(twoWeeksAgo.getDate() - 14);
|
||||
|
||||
const seeds: Prisma.SessionRecordingCreateManyInput[] = [];
|
||||
for (let i = 0; i < toCreate; i++) {
|
||||
const startedAt = new Date(
|
||||
twoWeeksAgo.getTime() + Math.random() * (now.getTime() - twoWeeksAgo.getTime()),
|
||||
);
|
||||
const durationMs = 10_000 + Math.floor(Math.random() * (20 * 60 * 1000)); // 10s..20m
|
||||
const lastEventAt = new Date(startedAt.getTime() + durationMs);
|
||||
const projectUserId = userIds[Math.floor(Math.random() * userIds.length)]!;
|
||||
|
||||
seeds.push({
|
||||
tenancyId,
|
||||
refreshTokenId: generateUuid(),
|
||||
projectUserId,
|
||||
id: generateUuid(),
|
||||
startedAt,
|
||||
lastEventAt,
|
||||
});
|
||||
}
|
||||
|
||||
await prisma.sessionRecording.createMany({
|
||||
data: seeds,
|
||||
});
|
||||
|
||||
console.log(`Seeded ${toCreate} session recordings`);
|
||||
}
|
||||
|
||||
@ -0,0 +1,207 @@
|
||||
import { getPrismaClientForTenancy } from "@/prisma-client";
|
||||
import { uploadBytes } from "@/s3";
|
||||
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
|
||||
import { Prisma } from "@/generated/prisma/client";
|
||||
import { KnownErrors } from "@stackframe/stack-shared";
|
||||
import { adaptSchema, clientOrHigherAuthTypeSchema, yupArray, yupMixed, yupNumber, yupObject, yupString } from "@stackframe/stack-shared/dist/schema-fields";
|
||||
import { StatusError } from "@stackframe/stack-shared/dist/utils/errors";
|
||||
import { promisify } from "node:util";
|
||||
import { gzip as gzipCb } from "node:zlib";
|
||||
|
||||
const gzip = promisify(gzipCb);
|
||||
|
||||
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[089ab][0-9a-f]{3}-[0-9a-f]{12}$/i;
|
||||
|
||||
const MAX_BODY_BYTES = 5_000_000;
|
||||
const MAX_EVENTS = 5_000;
|
||||
|
||||
function extractEventTimesMs(events: unknown[], fallbackMs: number) {
|
||||
let minTs = Infinity;
|
||||
let maxTs = -Infinity;
|
||||
|
||||
for (const e of events) {
|
||||
if (typeof e !== "object" || e === null) continue;
|
||||
if (!("timestamp" in e)) continue;
|
||||
const ts = (e as any).timestamp;
|
||||
if (typeof ts !== "number" || !Number.isFinite(ts)) continue;
|
||||
minTs = Math.min(minTs, ts);
|
||||
maxTs = Math.max(maxTs, ts);
|
||||
}
|
||||
|
||||
if (!Number.isFinite(minTs) || !Number.isFinite(maxTs) || minTs > maxTs) {
|
||||
return { firstMs: fallbackMs, lastMs: fallbackMs };
|
||||
}
|
||||
return { firstMs: minTs, lastMs: maxTs };
|
||||
}
|
||||
|
||||
export const POST = createSmartRouteHandler({
|
||||
metadata: {
|
||||
summary: "Upload rrweb session recording batch",
|
||||
description: "Uploads a batch of rrweb events for a cross-tab session recording.",
|
||||
tags: ["Session Recordings"],
|
||||
},
|
||||
request: yupObject({
|
||||
auth: yupObject({
|
||||
type: clientOrHigherAuthTypeSchema,
|
||||
tenancy: adaptSchema,
|
||||
user: adaptSchema,
|
||||
refreshTokenId: adaptSchema
|
||||
}).defined(),
|
||||
body: yupObject({
|
||||
session_id: yupString().defined().matches(UUID_RE, "Invalid session_id"),
|
||||
tab_id: yupString().defined().matches(UUID_RE, "Invalid tab_id"),
|
||||
batch_id: yupString().defined().matches(UUID_RE, "Invalid batch_id"),
|
||||
started_at_ms: yupNumber().defined().integer().min(0),
|
||||
sent_at_ms: yupNumber().defined().integer().min(0),
|
||||
events: yupArray(yupMixed().defined()).defined(),
|
||||
}).defined(),
|
||||
}),
|
||||
response: yupObject({
|
||||
statusCode: yupNumber().oneOf([200]).defined(),
|
||||
bodyType: yupString().oneOf(["json"]).defined(),
|
||||
body: yupObject({
|
||||
session_id: yupString().defined(),
|
||||
batch_id: yupString().defined(),
|
||||
s3_key: yupString().defined(),
|
||||
deduped: yupMixed().defined(),
|
||||
}).defined(),
|
||||
}),
|
||||
async handler({ auth, body }, fullReq) {
|
||||
if (!auth.user) {
|
||||
throw new KnownErrors.UserAuthenticationRequired();
|
||||
}
|
||||
if (!auth.refreshTokenId) {
|
||||
throw new StatusError(StatusError.BadRequest, "A refresh token is required for session recordings");
|
||||
}
|
||||
const projectUserId = auth.user.id;
|
||||
const refreshTokenId = auth.refreshTokenId;
|
||||
|
||||
if (fullReq.bodyBuffer.byteLength > MAX_BODY_BYTES) {
|
||||
throw new StatusError(StatusError.PayloadTooLarge, `Request body too large (max ${MAX_BODY_BYTES} bytes)`);
|
||||
}
|
||||
|
||||
if (body.events.length === 0) {
|
||||
throw new StatusError(StatusError.BadRequest, "events must not be empty");
|
||||
}
|
||||
if (body.events.length > MAX_EVENTS) {
|
||||
throw new StatusError(StatusError.BadRequest, `Too many events (max ${MAX_EVENTS})`);
|
||||
}
|
||||
|
||||
const sessionId = body.session_id;
|
||||
const batchId = body.batch_id;
|
||||
const tabId = body.tab_id;
|
||||
const tenancyId = auth.tenancy.id;
|
||||
|
||||
const projectId = auth.tenancy.project.id;
|
||||
const branchId = auth.tenancy.branchId;
|
||||
const s3Key = `session-recordings/${projectId}/${branchId}/${sessionId}/${batchId}.json.gz`;
|
||||
|
||||
const { firstMs, lastMs } = extractEventTimesMs(body.events, body.sent_at_ms);
|
||||
|
||||
const prisma = await getPrismaClientForTenancy(auth.tenancy);
|
||||
|
||||
// Ensure the session row exists and is up-to-date.
|
||||
const existingSession = await prisma.sessionRecording.findUnique({
|
||||
where: { tenancyId_id: { tenancyId, id: sessionId } },
|
||||
select: { startedAt: true, lastEventAt: true },
|
||||
});
|
||||
const newStartedAtMs = Math.min(existingSession?.startedAt.getTime() ?? Number.POSITIVE_INFINITY, firstMs);
|
||||
const newLastEventAtMs = Math.max(existingSession?.lastEventAt.getTime() ?? 0, lastMs);
|
||||
await prisma.sessionRecording.upsert({
|
||||
where: { tenancyId_id: { tenancyId, id: sessionId } },
|
||||
create: {
|
||||
id: sessionId,
|
||||
tenancyId,
|
||||
projectUserId: projectUserId,
|
||||
refreshTokenId,
|
||||
// Use the first event timestamp instead of "session started" timestamps,
|
||||
// since session_id can be reused across tabs/idle windows.
|
||||
startedAt: new Date(firstMs),
|
||||
lastEventAt: new Date(newLastEventAtMs),
|
||||
},
|
||||
update: {
|
||||
refreshTokenId,
|
||||
startedAt: new Date(newStartedAtMs),
|
||||
lastEventAt: new Date(newLastEventAtMs),
|
||||
},
|
||||
});
|
||||
|
||||
// If we already have this batch for this session, return deduped without touching S3.
|
||||
const existingChunk = await prisma.sessionRecordingChunk.findUnique({
|
||||
where: { tenancyId_sessionRecordingId_batchId: { tenancyId, sessionRecordingId: sessionId, batchId } },
|
||||
select: { s3Key: true },
|
||||
});
|
||||
if (existingChunk) {
|
||||
return {
|
||||
statusCode: 200,
|
||||
bodyType: "json",
|
||||
body: {
|
||||
session_id: sessionId,
|
||||
batch_id: batchId,
|
||||
s3_key: existingChunk.s3Key,
|
||||
deduped: true,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const payload = {
|
||||
v: 1,
|
||||
session_id: sessionId,
|
||||
tab_id: tabId,
|
||||
batch_id: batchId,
|
||||
started_at_ms: body.started_at_ms,
|
||||
sent_at_ms: body.sent_at_ms,
|
||||
events: body.events,
|
||||
};
|
||||
const payloadBytes = new TextEncoder().encode(JSON.stringify(payload));
|
||||
const gzipped = new Uint8Array(await gzip(payloadBytes));
|
||||
|
||||
await uploadBytes({
|
||||
key: s3Key,
|
||||
body: gzipped,
|
||||
contentType: "application/json",
|
||||
contentEncoding: "gzip",
|
||||
});
|
||||
|
||||
try {
|
||||
await prisma.sessionRecordingChunk.create({
|
||||
data: {
|
||||
tenancyId,
|
||||
sessionRecordingId: sessionId,
|
||||
batchId,
|
||||
tabId,
|
||||
s3Key,
|
||||
eventCount: body.events.length,
|
||||
byteLength: gzipped.byteLength,
|
||||
firstEventAt: new Date(firstMs),
|
||||
lastEventAt: new Date(lastMs),
|
||||
},
|
||||
});
|
||||
} catch (e) {
|
||||
if (e instanceof Prisma.PrismaClientKnownRequestError && e.code === "P2002") {
|
||||
return {
|
||||
statusCode: 200,
|
||||
bodyType: "json",
|
||||
body: {
|
||||
session_id: sessionId,
|
||||
batch_id: batchId,
|
||||
s3_key: s3Key,
|
||||
deduped: true,
|
||||
},
|
||||
};
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
return {
|
||||
statusCode: 200,
|
||||
bodyType: "json",
|
||||
body: {
|
||||
session_id: sessionId,
|
||||
batch_id: batchId,
|
||||
s3_key: s3Key,
|
||||
deduped: false,
|
||||
},
|
||||
};
|
||||
},
|
||||
});
|
||||
@ -1,4 +1,4 @@
|
||||
import { PutObjectCommand, S3Client } from "@aws-sdk/client-s3";
|
||||
import { GetObjectCommand, PutObjectCommand, S3Client } from "@aws-sdk/client-s3";
|
||||
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
|
||||
import { StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors";
|
||||
import { ImageProcessingError, parseBase64Image } from "./lib/images";
|
||||
@ -34,6 +34,77 @@ export function getS3PublicUrl(key: string): string {
|
||||
}
|
||||
}
|
||||
|
||||
export async function uploadBytes(options: {
|
||||
key: string,
|
||||
body: Uint8Array,
|
||||
contentType?: string,
|
||||
contentEncoding?: string,
|
||||
}) {
|
||||
if (!s3Client) {
|
||||
throw new StackAssertionError("S3 is not configured");
|
||||
}
|
||||
|
||||
const command = new PutObjectCommand({
|
||||
Bucket: S3_BUCKET,
|
||||
Key: options.key,
|
||||
Body: options.body,
|
||||
...(options.contentType ? { ContentType: options.contentType } : {}),
|
||||
...(options.contentEncoding ? { ContentEncoding: options.contentEncoding } : {}),
|
||||
});
|
||||
|
||||
await s3Client.send(command);
|
||||
|
||||
return {
|
||||
key: options.key,
|
||||
url: getS3PublicUrl(options.key),
|
||||
};
|
||||
}
|
||||
|
||||
async function readBodyToBytes(body: unknown): Promise<Uint8Array> {
|
||||
if (body instanceof Uint8Array) return body;
|
||||
if (Buffer.isBuffer(body)) return new Uint8Array(body);
|
||||
|
||||
// Web ReadableStream (some runtimes)
|
||||
if (typeof body === "object" && body !== null && "transformToByteArray" in body && typeof (body as any).transformToByteArray === "function") {
|
||||
return (body as any).transformToByteArray();
|
||||
}
|
||||
|
||||
// Node.js Readable or any AsyncIterable<Uint8Array>
|
||||
if (typeof body === "object" && body !== null && Symbol.asyncIterator in (body as any)) {
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of body as any) {
|
||||
if (chunk instanceof Uint8Array) {
|
||||
chunks.push(Buffer.from(chunk));
|
||||
} else if (Buffer.isBuffer(chunk)) {
|
||||
chunks.push(chunk);
|
||||
} else {
|
||||
throw new StackAssertionError("Unexpected S3 body chunk type");
|
||||
}
|
||||
}
|
||||
return new Uint8Array(Buffer.concat(chunks));
|
||||
}
|
||||
|
||||
throw new StackAssertionError("Unexpected S3 body type");
|
||||
}
|
||||
|
||||
export async function downloadBytes(options: { key: string }): Promise<Uint8Array> {
|
||||
if (!s3Client) {
|
||||
throw new StackAssertionError("S3 is not configured");
|
||||
}
|
||||
|
||||
const command = new GetObjectCommand({
|
||||
Bucket: S3_BUCKET,
|
||||
Key: options.key,
|
||||
});
|
||||
|
||||
const res = await s3Client.send(command);
|
||||
if (!res.Body) {
|
||||
throw new StackAssertionError("S3 getObject returned empty body");
|
||||
}
|
||||
|
||||
return await readBodyToBytes(res.Body);
|
||||
}
|
||||
|
||||
async function uploadBase64Image({
|
||||
input,
|
||||
maxBytes = 1_000_000, // 1MB
|
||||
|
||||
Loading…
Reference in New Issue
Block a user