fix: chunk SessionRecorder flush to stay under server 1MB body limit (#1647)

This commit is contained in:
Konsti Wohlwend 2026-06-23 12:04:46 -07:00 committed by GitHub
parent cb7ea302c7
commit a2889fc044
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 185 additions and 32 deletions

View File

@ -70,8 +70,11 @@ describe("SessionRecorder flush", () => {
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
try {
const event1 = { type: 2, timestamp: Date.now(), data: {} };
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
(recorder as any)._events = [{ type: 2, timestamp: Date.now(), data: {} }];
(recorder as any)._events = [event1];
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
(recorder as any)._eventSizes = [JSON.stringify(event1).length];
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
(recorder as any)._tick();
@ -82,8 +85,11 @@ describe("SessionRecorder flush", () => {
// Unlike ANALYTICS_NOT_ENABLED, ad blocker errors do NOT disable the
// recorder — subsequent flushes continue attempting delivery.
const event2 = { type: 3, timestamp: Date.now(), data: {} };
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
(recorder as any)._events = [{ type: 3, timestamp: Date.now(), data: {} }];
(recorder as any)._events = [event2];
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
(recorder as any)._eventSizes = [JSON.stringify(event2).length];
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
(recorder as any)._tick();
await vi.advanceTimersByTimeAsync(0);
@ -97,6 +103,113 @@ describe("SessionRecorder flush", () => {
}
});
it("splits large batches into multiple requests to stay under server 1MB limit", async () => {
  vi.useFakeTimers();

  // Seed localStorage with a session record for the test project.
  const storageKey = `hexclave:session-replay:v1:test-project`;
  const seededSession = {
    session_id: "test-session",
    created_at_ms: Date.now(),
    last_activity_ms: Date.now(),
  };
  localStorage.setItem(storageKey, JSON.stringify(seededSession));

  // Every body handed to sendBatch is recorded here for inspection.
  const sentBodies: string[] = [];
  const recorder = new SessionRecorder(
    {
      projectId: "test-project",
      sendBatch: async (body) => {
        sentBodies.push(body);
        return Result.ok(new Response("ok", { status: 200 }));
      },
    },
    {},
  );

  try {
    // Two ~500KB events: together (~1MB) they exceed the 900KB per-request
    // cap, so the flush has to emit them as two separate batches.
    const filler = "x".repeat(500_000);
    const bufferedEvents = [
      { type: 2, timestamp: Date.now(), data: filler },
      { type: 3, timestamp: Date.now(), data: filler },
    ];
    const bufferedSizes = bufferedEvents.map((event) => JSON.stringify(event).length);

    // Inject the buffer state directly, keeping sizes and byte total in sync.
    // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
    (recorder as any)._events = bufferedEvents;
    // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
    (recorder as any)._eventSizes = bufferedSizes;
    // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
    (recorder as any)._approxBytes = bufferedSizes.reduce((total, size) => total + size, 0);

    // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
    (recorder as any)._tick();
    await vi.advanceTimersByTimeAsync(0);

    // Two requests went out...
    expect(sentBodies).toHaveLength(2);

    // ...each carrying exactly one event...
    const firstBatch = JSON.parse(sentBodies[0]);
    const secondBatch = JSON.parse(sentBodies[1]);
    expect(firstBatch.events).toHaveLength(1);
    expect(secondBatch.events).toHaveLength(1);

    // ...and each tagged with its own batch ID.
    expect(firstBatch.batch_id).not.toBe(secondBatch.batch_id);
  } finally {
    recorder.stop();
    localStorage.removeItem(storageKey);
    vi.useRealTimers();
  }
});
it("sends a single oversized event alone without dropping it", async () => {
  vi.useFakeTimers();

  // Seed localStorage with a session record for the test project.
  const storageKey = `hexclave:session-replay:v1:test-project`;
  const seededSession = {
    session_id: "test-session",
    created_at_ms: Date.now(),
    last_activity_ms: Date.now(),
  };
  localStorage.setItem(storageKey, JSON.stringify(seededSession));

  // Every body handed to sendBatch is recorded here for inspection.
  const sentBodies: string[] = [];
  const recorder = new SessionRecorder(
    {
      projectId: "test-project",
      sendBatch: async (body) => {
        sentBodies.push(body);
        return Result.ok(new Response("ok", { status: 200 }));
      },
    },
    {},
  );

  try {
    // One event that is by itself larger than the 900KB per-request cap.
    // It must still go out in a request of its own rather than be discarded.
    const oversizedEvent = { type: 2, timestamp: Date.now(), data: "y".repeat(1_000_000) };
    const oversizedLength = JSON.stringify(oversizedEvent).length;

    // Inject the buffer state directly, keeping size and byte total in sync.
    // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
    (recorder as any)._events = [oversizedEvent];
    // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
    (recorder as any)._eventSizes = [oversizedLength];
    // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
    (recorder as any)._approxBytes = oversizedLength;

    // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
    (recorder as any)._tick();
    await vi.advanceTimersByTimeAsync(0);

    // The event is sent anyway: the server may reject it, but the client
    // does not drop it on its own.
    expect(sentBodies).toHaveLength(1);
    const onlyBatch = JSON.parse(sentBodies[0]);
    expect(onlyBatch.events).toHaveLength(1);
  } finally {
    recorder.stop();
    localStorage.removeItem(storageKey);
    vi.useRealTimers();
  }
});
it("silently disables when client interface returns ANALYTICS_NOT_ENABLED as an error", async () => {
vi.useFakeTimers();
@ -122,8 +235,11 @@ describe("SessionRecorder flush", () => {
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
try {
const event1 = { type: 2, timestamp: Date.now(), data: {} };
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
(recorder as any)._events = [{ type: 2, timestamp: Date.now(), data: {} }];
(recorder as any)._events = [event1];
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
(recorder as any)._eventSizes = [JSON.stringify(event1).length];
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
(recorder as any)._tick();
@ -132,8 +248,11 @@ describe("SessionRecorder flush", () => {
expect(sentBodies).toHaveLength(1);
expect(warnSpy).not.toHaveBeenCalled();
const event2 = { type: 3, timestamp: Date.now(), data: {} };
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
(recorder as any)._events = [{ type: 3, timestamp: Date.now(), data: {} }];
(recorder as any)._events = [event2];
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
(recorder as any)._eventSizes = [JSON.stringify(event2).length];
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
(recorder as any)._tick();
await vi.advanceTimersByTimeAsync(0);

View File

@ -1,6 +1,6 @@
import { KnownErrors } from "@hexclave/shared/dist/known-errors";
import { isBrowserLike } from "@hexclave/shared/dist/utils/env";
import { captureWarning } from "@hexclave/shared/dist/utils/errors";
import { captureWarning, throwErr } from "@hexclave/shared/dist/utils/errors";
import { runAsynchronously } from "@hexclave/shared/dist/utils/promises";
import { Result } from "@hexclave/shared/dist/utils/results";
@ -106,6 +106,9 @@ const IDLE_TTL_MS = 3 * 60 * 1000;
// NOTE(review): presumably the period of the periodic flush timer — its use is
// not visible in this chunk; confirm against the timer setup.
const FLUSH_INTERVAL_MS = 5_000;
// Buffered-event count at which recording an event kicks off a flush right away.
const MAX_EVENTS_PER_BATCH = 200;
// Approximate buffered size (sum of JSON.stringify(event).length) at which
// recording an event kicks off a flush right away. This is only a trigger: the
// buffer can grow past it while an earlier flush is still in progress.
const MAX_APPROX_BYTES_PER_BATCH = 512_000;
// The server rejects payloads > 1MB. Stay well under to account for JSON
// envelope overhead (browser_session_id, timestamps, wrapper keys, etc.).
// A flush splits the buffered events into consecutive requests whose summed
// event sizes stay at or below this cap; a single event that is larger than
// the cap on its own is still sent, alone in its request.
// NOTE(review): event sizes are string lengths (UTF-16 code units), not encoded
// bytes, so payloads heavy in non-ASCII text may be larger on the wire than
// this figure suggests — confirm the headroom is sufficient.
const MAX_FLUSH_PAYLOAD_BYTES = 900_000;
export type StoredSession = {
session_id: string,
@ -189,6 +192,7 @@ export class SessionRecorder {
private _detachListeners: (() => void) | null = null;
private _flushTimer: ReturnType<typeof setInterval> | null = null;
private _events: unknown[] = [];
private _eventSizes: number[] = [];
private _approxBytes = 0;
private _lastPersistActivity = 0;
private _recording = false;
@ -239,6 +243,7 @@ export class SessionRecorder {
/**
 * Discards everything currently buffered: the pending events, their
 * per-event sizes, and the running approximate byte total. The three are
 * reset together so they stay in step with one another.
 */
clearBuffer() {
  this._approxBytes = 0;
  this._eventSizes = [];
  this._events = [];
}
@ -264,42 +269,68 @@ export class SessionRecorder {
const nowMs = Date.now();
const stored = getOrRotateSession({ key: this._storageKey, legacyKey: this._legacyStorageKey, nowMs });
const batchId = generateUuid();
const payload = {
browser_session_id: stored.session_id,
session_replay_segment_id: this._sessionReplaySegmentId,
batch_id: batchId,
started_at_ms: stored.created_at_ms,
sent_at_ms: nowMs,
events: this._events,
};
// Capture all buffered events upfront (before any await) so that
// stop() / _stopCurrentRecording() clearing this._events cannot race
// with the async send loop below and silently discard overflow batches.
const allEvents = this._events;
const allSizes = this._eventSizes;
this._events = [];
this._eventSizes = [];
this._approxBytes = 0;
this._flushInProgress = true;
try {
const res = await this._deps.sendBatch(
JSON.stringify(payload),
{ keepalive: options.keepalive },
);
let offset = 0;
while (offset < allEvents.length) {
// Build a batch that fits under the server's payload limit.
// When _flushInProgress blocked earlier flushes, events can accumulate
// well past MAX_APPROX_BYTES_PER_BATCH; sending them all at once would
// exceed the server's 1MB body limit (413).
let batchBytes = 0;
let batchEnd = offset;
for (let i = offset; i < allEvents.length; i++) {
const nextSize = allSizes[i] ?? throwErr("_eventSizes out of sync with _events — this should never happen");
if (batchBytes + nextSize > MAX_FLUSH_PAYLOAD_BYTES && batchEnd > offset) break;
batchBytes += nextSize;
batchEnd = i + 1;
}
if (res.status === "error") {
if (isAnalyticsNotEnabledError(res.error)) {
this._disable();
const batchEvents = allEvents.slice(offset, batchEnd);
offset = batchEnd;
const batchId = generateUuid();
const payload = {
browser_session_id: stored.session_id,
session_replay_segment_id: this._sessionReplaySegmentId,
batch_id: batchId,
started_at_ms: stored.created_at_ms,
sent_at_ms: nowMs,
events: batchEvents,
};
const res = await this._deps.sendBatch(
JSON.stringify(payload),
{ keepalive: options.keepalive },
);
if (res.status === "error") {
if (isAnalyticsNotEnabledError(res.error)) {
this._disable();
return;
}
// Ad blockers commonly block analytics endpoints, causing network
// errors. These are expected and should not pollute the console.
if (isAdBlockerNetworkError(res.error)) {
return;
}
captureWarning("SessionRecorder.flush", res.error);
return;
}
// Ad blockers commonly block analytics endpoints, causing network
// errors. These are expected and should not pollute the console.
if (isAdBlockerNetworkError(res.error)) {
if (!res.data.ok) {
captureWarning("SessionRecorder.flush", new Error(`SessionRecorder flush failed: ${res.data.status} ${await res.data.text()}`));
return;
}
captureWarning("SessionRecorder.flush", res.error);
return;
}
if (!res.data.ok) {
captureWarning("SessionRecorder.flush", new Error(`SessionRecorder flush failed: ${res.data.status} ${await res.data.text()}`));
}
} finally {
this._flushInProgress = false;
@ -353,8 +384,10 @@ export class SessionRecorder {
}
}
const eventSize = JSON.stringify(event).length;
this._events.push(event);
this._approxBytes += JSON.stringify(event).length;
this._eventSizes.push(eventSize);
this._approxBytes += eventSize;
if (this._events.length >= MAX_EVENTS_PER_BATCH || this._approxBytes >= MAX_APPROX_BYTES_PER_BATCH) {
runAsynchronously(() => this._flush({ keepalive: false }));
}
@ -387,6 +420,7 @@ export class SessionRecorder {
this._stopRecording = null;
}
this._events = [];
this._eventSizes = [];
this._approxBytes = 0;
this._recording = false;
}