Support async replicas (#1097)

This commit is contained in:
Konsti Wohlwend 2026-01-13 00:07:08 +01:00 committed by GitHub
parent b3c998cfcd
commit fbcf66f479
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 271 additions and 14 deletions

View File

@ -27,7 +27,8 @@ STACK_SPOTIFY_CLIENT_SECRET=MOCK
STACK_ALLOW_SHARED_OAUTH_ACCESS_TOKENS=true
STACK_DATABASE_CONNECTION_STRING=postgres://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@localhost:${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}28/stackframe
STACK_DATABASE_REPLICA_CONNECTION_STRING=postgres://readonly:PASSWORD-PLACEHOLDER--readonlyuqfEC1hmmv@localhost:${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}28/stackframe
STACK_DATABASE_REPLICA_CONNECTION_STRING=postgres://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@localhost:${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}34/stackframe
STACK_DATABASE_REPLICATION_WAIT_STRATEGY=pg-stat-replication
STACK_EMAIL_HOST=127.0.0.1
STACK_EMAIL_PORT=${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}29

View File

@ -1,3 +1,4 @@
import { getNodeEnvironment } from "@stackframe/stack-shared/dist/utils/env";
import Link from "next/link";
export default function Home() {
@ -10,6 +11,12 @@ export default function Home() {
You can also return to <Link href="https://stack-auth.com">https://stack-auth.com</Link>.<br />
<br />
<Link href="/api/v1">API v1</Link><br />
{getNodeEnvironment() === "development" && (
<>
<br />
<Link href="/dev-stats">Dev Stats</Link><br />
</>
)}
</div>
);
}

View File

@ -4,11 +4,12 @@ import { PrismaNeon } from "@prisma/adapter-neon";
import { PrismaPg } from '@prisma/adapter-pg';
import { readReplicas } from '@prisma/extension-read-replicas';
import { CompleteConfig } from "@stackframe/stack-shared/dist/config/schema";
import { yupObject, yupValidate } from "@stackframe/stack-shared/dist/schema-fields";
import { getEnvVariable, getNodeEnvironment } from '@stackframe/stack-shared/dist/utils/env';
import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors";
import { captureError, StackAssertionError, throwErr } from "@stackframe/stack-shared/dist/utils/errors";
import { globalVar } from "@stackframe/stack-shared/dist/utils/globals";
import { deepPlainEquals, filterUndefined, typedFromEntries, typedKeys } from "@stackframe/stack-shared/dist/utils/objects";
import { concatStacktracesIfRejected, ignoreUnhandledRejection } from "@stackframe/stack-shared/dist/utils/promises";
import { concatStacktracesIfRejected, ignoreUnhandledRejection, wait } from "@stackframe/stack-shared/dist/utils/promises";
import { throwingProxy } from "@stackframe/stack-shared/dist/utils/proxies";
import { Result } from "@stackframe/stack-shared/dist/utils/results";
import { traceSpan } from "@stackframe/stack-shared/dist/utils/telemetry";
@ -144,10 +145,130 @@ export type PrismaClientWithReplica<T extends PrismaClient = PrismaClient> = Omi
$replica: () => Omit<T, "$on">,
};
/**
* Waits until ALL replicas have caught up to the specified WAL LSN.
* This ensures read-after-write consistency when using read replicas.
*
* Strategy types (STACK_DATABASE_REPLICATION_WAIT_STRATEGY):
* - "none": Don't wait for replication (default)
* - "pg-stat-replication": Use pg_stat_replication (for local dev with streaming replication)
* - "aurora": Use aurora_replica_status() (for AWS Aurora)
*/
async function waitForReplication(primary: PrismaClient, lsn: string): Promise<void> {
const strategy = getEnvVariable("STACK_DATABASE_REPLICATION_WAIT_STRATEGY", "none");
return await traceSpan({
description: 'waiting for replication',
attributes: {
'stack.db-replication.strategy': strategy,
'stack.db-replication.lsn': lsn,
},
}, async () => {
if (strategy === "none") {
return;
}
const minLsnSubquery = {
"pg-stat-replication": `(SELECT MIN(replay_lsn) FROM pg_stat_replication)`,
"aurora": `(SELECT MIN(current_read_lsn::pg_lsn) FROM aurora_replica_status())`,
}[strategy] ?? throwErr(`Unknown replication wait strategy: ${strategy}`);
// Validate LSN format (format: hex/hex, e.g., "0/1234ABC"). We do this just to be extra safe as we're using $queryRawUnsafe later
if (!/^[0-9A-Fa-f]+\/[0-9A-Fa-f]+$/.test(lsn)) {
throw new StackAssertionError(`Invalid LSN format: ${lsn}`);
}
// Poll until all replicas have caught up to the target LSN
// Using $queryRawUnsafe because DO blocks don't support bind parameters
await (primary as any).$queryRawUnsafe(`
DO $$
DECLARE
min_replica_lsn pg_lsn;
BEGIN
LOOP
SELECT ${minLsnSubquery} INTO min_replica_lsn;
-- Exit if no replicas connected or all replicas have caught up
IF min_replica_lsn IS NULL OR min_replica_lsn >= '${lsn}'::pg_lsn THEN
EXIT;
END IF;
-- Wait 10ms and check again
PERFORM pg_sleep(0.01);
END LOOP;
END $$;
`);
});
}
/**
* Extends a Prisma client to wait for replication after all operations.
* This ensures read-after-write consistency when using a read replica.
*/
function extendWithReplicationWait<T extends PrismaClient>(client: T): T {
const strategy = getEnvVariable("STACK_DATABASE_REPLICATION_WAIT_STRATEGY", "none");
if (strategy === "none") {
return client;
}
const readLsnAndWaitForReplication = async (client: PrismaClient) => {
await traceSpan({
description: 'getting current LSN and waiting for replication',
attributes: {
'stack.db-replication.strategy': strategy,
},
}, async (span) => {
try {
const [{ lsn }] = await (client as any).$queryRaw<[{ lsn: string }]>`SELECT pg_current_wal_lsn()::text AS lsn`;
await waitForReplication(client, lsn);
} catch (e) {
span.setAttribute('stack.db-replication.error', `${e}`);
captureError("prisma-client-replication-error", new StackAssertionError("Error getting current LSN and waiting for replication. We'll just wait 50ms instead, but please fix this as the replication may not be working.", { cause: e }));
await wait(50);
}
});
};
return client.$extends({
client: {
async $transaction(...args: Parameters<PrismaClient['$transaction']>) {
// eslint-disable-next-line no-restricted-syntax
const result = await client.$transaction(...args);
await readLsnAndWaitForReplication(client);
return result;
},
},
query: {
async $allOperations(params: { args: any, query: (args: any) => Promise<any>, operation: string, model?: string, __internalParams?: unknown }) {
const { args, query, operation, model } = params;
// __internalParams is an undocumented property, so let's validate that it fits our schema with yup first
const internalParamsSchema = yupObject({
transaction: yupObject().nullable(),
}).defined();
const internalParams = await yupValidate(internalParamsSchema, params.__internalParams);
if (internalParams.transaction) {
// we're inside a transaction, so we don't need to wait for replication
return await query(args);
}
const result = await query(args);
await readLsnAndWaitForReplication(client);
return result;
},
},
}) as T;
}
function extendWithReadReplicas<T extends PrismaClient>(client: T, replicaConnectionString: string): PrismaClientWithReplica<T> {
// Create a separate PrismaClient for the read replica
const replicaClient = getPostgresPrismaClient(replicaConnectionString).client;
return client.$extends(readReplicas({
// First extend with replication wait, then with read replicas
const clientWithReplicationWait = extendWithReplicationWait(client);
return clientWithReplicationWait.$extends(readReplicas({
replicas: [replicaClient],
})) as PrismaClientWithReplica<T>;
}
@ -441,7 +562,6 @@ async function rawQueryArray<Q extends RawQuery<any>[]>(tx: PrismaClientTransact
const queryClient = allReadOnly && '$replica' in tx
? (tx as any).$replica()
: tx;
// eslint-disable-next-line no-restricted-syntax -- $queryRaw is allowed here
const rawResult = await queryClient.$queryRaw(sqlQuery);
const postProcessed = combinedQuery.postProcess(rawResult as any);

View File

@ -122,6 +122,7 @@
const backgroundServices = [
{ suffix: "28", label: "PostgreSQL" },
{ suffix: "34", label: "PostgreSQL Replica (15ms lag)" },
{ suffix: "29", label: "Inbucket SMTP" },
{ suffix: "30", label: "Inbucket POP3" },
{ suffix: "31", label: "OTel collector" },
@ -267,6 +268,15 @@
importance: 1,
img: "https://pghero.dokkuapp.com/assets/pghero-88a0d052.png",
},
{
name: "PgHero (Replica)",
portSuffix: "35",
description: [
"For replica database performance analysis",
],
importance: 1,
img: "https://pghero.dokkuapp.com/assets/pghero-88a0d052.png",
},
{
name: "PgAdmin",
portSuffix: "17",

View File

@ -1,5 +1,5 @@
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
import { deindent } from "@stackframe/stack-shared/dist/utils/strings";
import { deindent, nicify } from "@stackframe/stack-shared/dist/utils/strings";
import beautify from "js-beautify";
import { describe } from "vitest";
import { it, logIfTestFails } from "../../../../../helpers";
@ -1689,9 +1689,10 @@ describe("email outbox pagination", () => {
const draftId = createDraftResponse.body.id;
// Create 5 users
const mailboxes = await Promise.all(Array.from({ length: 5 }, async () => await bumpEmailAddress()));
const userIds: string[] = [];
for (let i = 0; i < 5; i++) {
const email = `pagination-test-${i}@example.com`;
const email = mailboxes[i].emailAddress;
const createUserResponse = await niceBackendFetch("/api/v1/users", {
method: "POST",
accessType: "server",
@ -1715,12 +1716,18 @@ describe("email outbox pagination", () => {
});
expect(sendResponse.status).toBe(200);
// Wait until all emails are sent
for (const mailbox of mailboxes) {
await mailbox.waitForMessagesWithSubject("Pagination Test Email");
}
// Ensure there are 5 emails in the outbox
const allResponse = await niceBackendFetch("/api/v1/emails/outbox", {
method: "GET",
accessType: "server",
});
logIfTestFails({ allResponse });
logIfTestFails("allResponse", nicify(allResponse));
expect(allResponse.status).toBe(200);
expect(allResponse.body.items.length).toBe(5);
@ -1729,7 +1736,7 @@ describe("email outbox pagination", () => {
method: "GET",
accessType: "server",
});
logIfTestFails({ page1Response });
logIfTestFails("page1Response", nicify(page1Response));
expect(page1Response.status).toBe(200);
expect(page1Response.body.items.length).toBe(2);
expect(page1Response.body.is_paginated).toBe(true);
@ -1741,7 +1748,7 @@ describe("email outbox pagination", () => {
method: "GET",
accessType: "server",
});
logIfTestFails({ page2Response });
logIfTestFails("page2Response", nicify(page2Response));
expect(page2Response.status).toBe(200);
expect(page2Response.body.items.length).toBe(2);
@ -1758,7 +1765,7 @@ describe("email outbox pagination", () => {
method: "GET",
accessType: "server",
});
logIfTestFails({ page3Response });
logIfTestFails("page3Response", nicify(page3Response));
expect(page3Response.status).toBe(200);
expect(page3Response.body.items.length).toBe(1); // Only 1 remaining
expect(page3Response.body.pagination.next_cursor).toBeNull(); // No more pages

View File

@ -19,6 +19,24 @@ services:
cap_add:
- NET_ADMIN # required for the fake latency during dev
# ================= PostgreSQL Replica (with replication lag) =================
db-replica:
build: ../dev-postgres-replica
environment:
PGDATA: /var/lib/postgresql/data
PRIMARY_HOST: db
PRIMARY_PORT: 5432
REPLICATOR_USER: replicator
REPLICATOR_PASSWORD: PASSWORD-PLACEHOLDER--replicatorpass
RECOVERY_MIN_APPLY_DELAY: ${STACK_DATABASE_REPLICA_LAG_MS:-15}ms
ports:
- "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}34:5432"
volumes:
- postgres-replica-data:/var/lib/postgresql/data
depends_on:
- db
# ================= PgHero =================
pghero:
@ -28,6 +46,17 @@ services:
ports:
- "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}16:8080"
# ================= PgHero (Replica) =================
pghero-replica:
image: ankane/pghero:latest
environment:
DATABASE_URL: postgres://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@db-replica:5432/stackframe
ports:
- "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}35:8080"
depends_on:
- db-replica
# ================= PgAdmin =================
pgadmin:
@ -224,6 +253,7 @@ services:
volumes:
postgres-data:
postgres-replica-data:
inbucket-data:
svix-redis-data:
svix-postgres-data:

View File

@ -0,0 +1,7 @@
FROM postgres:15
# Copy the entrypoint script
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]

View File

@ -0,0 +1,60 @@
#!/bin/bash
set -e
# Configuration from environment variables
PRIMARY_HOST="${PRIMARY_HOST:-db}"
PRIMARY_PORT="${PRIMARY_PORT:-5432}"
REPLICATOR_USER="${REPLICATOR_USER:-replicator}"
REPLICATOR_PASSWORD="${REPLICATOR_PASSWORD:-PASSWORD-PLACEHOLDER--replicatorpass}"
RECOVERY_MIN_APPLY_DELAY="${RECOVERY_MIN_APPLY_DELAY:-100ms}"
echo "Starting PostgreSQL replica with ${RECOVERY_MIN_APPLY_DELAY} apply delay..."
# Wait for primary to be ready
echo "Waiting for primary at ${PRIMARY_HOST}:${PRIMARY_PORT}..."
until PGPASSWORD="${REPLICATOR_PASSWORD}" pg_isready -h "${PRIMARY_HOST}" -p "${PRIMARY_PORT}" -U "${REPLICATOR_USER}" 2>/dev/null; do
echo "Primary not ready yet, waiting..."
sleep 2
done
echo "Primary is ready!"
# If PGDATA is empty, do a base backup from primary
if [ -z "$(ls -A ${PGDATA} 2>/dev/null)" ]; then
echo "PGDATA is empty, performing base backup from primary..."
# Perform base backup
PGPASSWORD="${REPLICATOR_PASSWORD}" pg_basebackup \
-h "${PRIMARY_HOST}" \
-p "${PRIMARY_PORT}" \
-U "${REPLICATOR_USER}" \
-D "${PGDATA}" \
-Fp \
-Xs \
-P \
-R
echo "Base backup completed!"
# Configure recovery settings with apply delay
cat >> "${PGDATA}/postgresql.auto.conf" <<EOF
# Replica configuration
primary_conninfo = 'host=${PRIMARY_HOST} port=${PRIMARY_PORT} user=${REPLICATOR_USER} password=${REPLICATOR_PASSWORD}'
recovery_min_apply_delay = ${RECOVERY_MIN_APPLY_DELAY}
hot_standby = on
EOF
# Create standby.signal to indicate this is a standby
touch "${PGDATA}/standby.signal"
# Set proper permissions
chmod 700 "${PGDATA}"
chown -R postgres:postgres "${PGDATA}"
echo "Replica configured with ${RECOVERY_MIN_APPLY_DELAY} apply delay"
else
echo "PGDATA already initialized, starting replica..."
fi
# Start PostgreSQL
exec gosu postgres postgres

View File

@ -28,6 +28,15 @@ RUN echo "GRANT USAGE ON SCHEMA public TO readonly;" >> /docker-entrypoint-initd
RUN echo "GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly;" >> /docker-entrypoint-initdb.d/init.sql
RUN echo "ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO readonly;" >> /docker-entrypoint-initdb.d/init.sql
# Create a replication user for streaming replication to the replica
RUN echo "CREATE USER replicator WITH REPLICATION PASSWORD 'PASSWORD-PLACEHOLDER--replicatorpass';" >> /docker-entrypoint-initdb.d/init.sql
# Create a script to add replication permissions to pg_hba.conf after init
# This script runs after the database is initialized but before it starts accepting connections
RUN echo '#!/bin/bash' > /docker-entrypoint-initdb.d/00-setup-replication.sh && \
echo 'echo "host replication replicator all scram-sha-256" >> "$PGDATA/pg_hba.conf"' >> /docker-entrypoint-initdb.d/00-setup-replication.sh && \
chmod +x /docker-entrypoint-initdb.d/00-setup-replication.sh
# Add args to Postgres entrypoint
ENTRYPOINT ["sh", "-c", "\
# Add delay if POSTGRES_DELAY_MS is set \
@ -35,6 +44,12 @@ ENTRYPOINT ["sh", "-c", "\
apt-get update && apt-get install -y iproute2 && tc qdisc add dev eth0 root netem delay ${POSTGRES_DELAY_MS}ms; \
fi; \
\
# Start Postgres with extensions enabled \
exec docker-entrypoint.sh postgres -c shared_preload_libraries='pg_stat_statements' -c pg_stat_statements.track=all \
# Start Postgres with replication enabled and extensions \
exec docker-entrypoint.sh postgres \
-c shared_preload_libraries='pg_stat_statements' \
-c pg_stat_statements.track=all \
-c wal_level=replica \
-c max_wal_senders=3 \
-c wal_keep_size=64MB \
-c hot_standby=on \
"]

View File

@ -24,7 +24,7 @@
"codegen:backend": "pnpm pre && turbo run codegen --filter=@stackframe/stack-backend...",
"deps-compose": "docker compose -p stack-dependencies-${NEXT_PUBLIC_STACK_PORT_PREFIX:-81} -f docker/dependencies/docker.compose.yaml",
"stop-deps": "POSTGRES_DELAY_MS=0 pnpm run deps-compose kill && POSTGRES_DELAY_MS=0 pnpm run deps-compose down -v",
"wait-until-postgres-is-ready:pg_isready": "until pg_isready -h localhost -p ${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}28; do sleep 1; done",
"wait-until-postgres-is-ready:pg_isready": "until pg_isready -h localhost -p ${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}28 && pg_isready -h localhost -p ${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}34; do sleep 1; done",
"wait-until-postgres-is-ready": "command -v pg_isready >/dev/null 2>&1 && pnpm run wait-until-postgres-is-ready:pg_isready || sleep 10 # not everyone has pg_isready installed, so we fallback to sleeping",
"start-deps:no-delay": "pnpm pre && pnpm run deps-compose up --detach --build && pnpm run wait-until-postgres-is-ready && pnpm run db:init && echo \"\\nDependencies started in the background as Docker containers. 'pnpm run stop-deps' to stop them\"n",
"start-deps": "POSTGRES_DELAY_MS=${POSTGRES_DELAY_MS:-0} pnpm run start-deps:no-delay",