More tracing for replication-related functions

This commit is contained in:
Konstantin Wohlwend 2026-04-17 17:57:33 -07:00
parent 665870a144
commit f4ca6cb4c7
2 changed files with 39 additions and 12 deletions

View File

@ -19,8 +19,8 @@ import { isPromise } from "util/types";
import { runMigrationNeeded } from "./auto-migrations";
import { registerPgPool } from "./lib/dev-perf-stats";
import { Tenancy } from "./lib/tenancies";
import { drainInFlightPromises } from "./utils/background-tasks";
import { ensurePolyfilled } from "./polyfills";
import { drainInFlightPromises } from "./utils/background-tasks";
// just ensure we're polyfilled because this file relies on envvars being expanded
ensurePolyfilled();
@ -215,10 +215,21 @@ async function waitForReplication(replicas: PrismaClient[], target: string, time
throw new StackAssertionError(`Invalid pg_lsn format: ${target}`);
}
checkCaughtUp = async (replica) => {
const [{ caught_up }] = await (replica as any).$queryRaw<[{ caught_up: boolean }]>`
SELECT pg_last_wal_replay_lsn() >= ${target}::pg_lsn AS caught_up
`;
return caught_up;
return await traceSpan({
description: 'checking replication status from replica',
attributes: {
'stack.db-replication.strategy': strategy,
'stack.db-replication.target': target,
'stack.db-replication.replica-count': replicas.length,
'stack.db-replication.timeout-ms': timeoutMs,
},
}, async (span) => {
const [{ caught_up }] = await (replica as any).$queryRaw<[{ caught_up: boolean }]>`
SELECT pg_last_wal_replay_lsn() >= ${target}::pg_lsn AS caught_up
`;
span.setAttribute('stack.db-replication.caught-up', caught_up);
return caught_up;
});
};
} else if (strategy === "aurora") {
if (!/^\d+$/.test(target)) {
@ -226,12 +237,25 @@ async function waitForReplication(replicas: PrismaClient[], target: string, time
}
const targetBigInt = BigInt(target);
checkCaughtUp = async (replica) => {
const [{ current_lsn }] = await (replica as any).$queryRaw<[{ current_lsn: bigint | null }]>`
SELECT current_read_lsn AS current_lsn
FROM aurora_replica_status()
WHERE server_id = aurora_db_instance_identifier()
`;
return current_lsn === null || current_lsn >= targetBigInt;
return await traceSpan({
description: 'checking replication status from replica',
attributes: {
'stack.db-replication.strategy': strategy,
'stack.db-replication.target': target,
'stack.db-replication.replica-count': replicas.length,
'stack.db-replication.timeout-ms': timeoutMs,
},
}, async (span) => {
const replicaStatus = await (replica as any).$queryRaw<[{ current_lsn: bigint | null }]>`
SELECT *
FROM aurora_replica_status()
WHERE server_id = aurora_db_instance_identifier()
`;
span.setAttribute('stack.db-replication.replica-status', JSON.stringify(replicaStatus));
const currentLsn = replicaStatus[0].current_read_lsn;
span.setAttribute('stack.db-replication.current-lsn', currentLsn.toString());
return currentLsn === null || currentLsn >= targetBigInt;
});
};
} else {
throw new StackAssertionError(`Unknown replication wait strategy: ${strategy}`);

View File

@ -2,6 +2,7 @@ import { KnownError } from "..";
import { StackAssertionError, captureError, concatStacktraces, errorToNiceString } from "./errors";
import { DependenciesMap } from "./maps";
import { Result } from "./results";
import { traceSpan } from "./telemetry";
import { generateUuid } from "./uuids";
export type ReactPromise<T> = Promise<T> & (
@ -264,7 +265,9 @@ export async function wait(ms: number) {
if (ms >= 2**31) {
throw new StackAssertionError("The maximum timeout for wait() is 2147483647ms (2**31 - 1). (found: ${ms}ms)");
}
return await new Promise<void>(resolve => setTimeout(resolve, ms));
return await traceSpan({ description: 'wait(...)', attributes: { 'stack.wait.ms': ms } }, async (span) => {
return await new Promise<void>(resolve => setTimeout(resolve, ms));
});
}
import.meta.vitest?.test("wait", async ({ expect }) => {
// Test with valid input