added external-db-sync

This commit is contained in:
aadesh18 2025-11-30 13:30:25 -08:00
parent eb5e1cb28d
commit 9e41309dc1
28 changed files with 4033 additions and 847 deletions

View File

@ -7,7 +7,7 @@
"typecheck": "tsc --noEmit",
"with-env": "dotenv -c development --",
"with-env:prod": "dotenv -c --",
"dev": "concurrently -n \"dev,codegen,prisma-studio\" -k \"next dev --turbopack --port ${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}02\" \"pnpm run codegen:watch\" \"pnpm run prisma-studio\"",
"dev": "concurrently -n \"dev,codegen,prisma-studio,cron-jobs\" -k \"next dev --turbopack --port ${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}02\" \"pnpm run codegen:watch\" \"pnpm run prisma-studio\" \"pnpm run run-cron-jobs\"",
"build": "pnpm run codegen && next build",
"docker-build": "pnpm run codegen && next build --experimental-build-mode compile",
"build-self-host-migration-script": "tsup --config scripts/db-migrations.tsup.config.ts",
@ -35,7 +35,8 @@
"generate-openapi-fumadocs": "pnpm run with-env tsx scripts/generate-openapi-fumadocs.ts",
"generate-keys": "pnpm run with-env tsx scripts/generate-keys.ts",
"db-seed-script": "pnpm run db:seed",
"verify-data-integrity": "pnpm run with-env tsx scripts/verify-data-integrity.ts"
"verify-data-integrity": "pnpm run with-env tsx scripts/verify-data-integrity.ts",
"run-cron-jobs": "pnpm run with-env tsx scripts/run-cron-jobs.ts"
},
"prisma": {
"seed": "pnpm run db-seed-script"

View File

@ -0,0 +1,18 @@
CREATE SEQUENCE global_seq_id
AS BIGINT
START 1
INCREMENT BY 11
NO MINVALUE
NO MAXVALUE;
-- SPLIT_STATEMENT_SENTINEL
ALTER TABLE "ContactChannel" ADD COLUMN "sequenceId" BIGINT;
-- SPLIT_STATEMENT_SENTINEL
ALTER TABLE "ProjectUser" ADD COLUMN "sequenceId" BIGINT;
-- SPLIT_STATEMENT_SENTINEL
CREATE UNIQUE INDEX "ContactChannel_sequenceId_key" ON "ContactChannel"("sequenceId");
-- SPLIT_STATEMENT_SENTINEL
CREATE UNIQUE INDEX "ProjectUser_sequenceId_key" ON "ProjectUser"("sequenceId");

View File

@ -0,0 +1,13 @@
CREATE TABLE "OutgoingRequest" (
"id" UUID NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"qstashOptions" JSONB NOT NULL,
"fulfilledAt" TIMESTAMP(3),
CONSTRAINT "OutgoingRequest_pkey" PRIMARY KEY ("id")
);
CREATE INDEX "OutgoingRequest_fulfilledAt_idx" ON "OutgoingRequest"("fulfilledAt");

View File

@ -0,0 +1,21 @@
CREATE TABLE "DeletedRow" (
"id" UUID NOT NULL,
"tenancyId" UUID NOT NULL,
"tableName" TEXT NOT NULL,
"sequenceId" BIGINT,
"primaryKey" JSONB NOT NULL,
"data" JSONB,
"deletedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"fulfilledAt" TIMESTAMP(3),
CONSTRAINT "DeletedRow_pkey" PRIMARY KEY ("id")
);
CREATE UNIQUE INDEX "DeletedRow_sequenceId_key" ON "DeletedRow"("sequenceId");
CREATE INDEX "DeletedRow_tableName_idx" ON "DeletedRow"("tableName");
CREATE INDEX "DeletedRow_tenancyId_idx" ON "DeletedRow"("tenancyId");

View File

@ -0,0 +1,8 @@
ALTER TABLE "ProjectUser" ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT TRUE;
-- SPLIT_STATEMENT_SENTINEL
ALTER TABLE "ContactChannel" ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT TRUE;
-- SPLIT_STATEMENT_SENTINEL
ALTER TABLE "DeletedRow" ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT TRUE;

View File

@ -0,0 +1,8 @@
CREATE INDEX "ProjectUser_shouldUpdateSequenceId_idx" ON "ProjectUser"("shouldUpdateSequenceId") WHERE "shouldUpdateSequenceId" = TRUE;
-- SPLIT_STATEMENT_SENTINEL
CREATE INDEX "ContactChannel_shouldUpdateSequenceId_idx" ON "ContactChannel"("shouldUpdateSequenceId") WHERE "shouldUpdateSequenceId" = TRUE;
-- SPLIT_STATEMENT_SENTINEL
CREATE INDEX "DeletedRow_shouldUpdateSequenceId_idx" ON "DeletedRow"("shouldUpdateSequenceId") WHERE "shouldUpdateSequenceId" = TRUE;

View File

@ -0,0 +1,30 @@
-- SINGLE_STATEMENT_SENTINEL
CREATE FUNCTION reset_sequence_id_on_update()
RETURNS TRIGGER AS $$
BEGIN
NEW."shouldUpdateSequenceId" := TRUE;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- SPLIT_STATEMENT_SENTINEL
CREATE TRIGGER mark_should_update_sequence_id_project_user
BEFORE UPDATE ON "ProjectUser"
FOR EACH ROW
WHEN (OLD."shouldUpdateSequenceId" = FALSE)
EXECUTE FUNCTION reset_sequence_id_on_update();
-- SPLIT_STATEMENT_SENTINEL
CREATE TRIGGER mark_should_update_sequence_id_contact_channel
BEFORE UPDATE ON "ContactChannel"
FOR EACH ROW
WHEN (OLD."shouldUpdateSequenceId" = FALSE)
EXECUTE FUNCTION reset_sequence_id_on_update();
-- SPLIT_STATEMENT_SENTINEL
CREATE TRIGGER mark_should_update_sequence_id_deleted_row
BEFORE UPDATE ON "DeletedRow"
FOR EACH ROW
WHEN (OLD."shouldUpdateSequenceId" = FALSE)
EXECUTE FUNCTION reset_sequence_id_on_update();

View File

@ -0,0 +1,55 @@
-- SINGLE_STATEMENT_SENTINEL
CREATE FUNCTION log_deleted_row()
RETURNS TRIGGER AS $function$
DECLARE
row_data jsonb;
pk jsonb := '{}'::jsonb;
col record;
BEGIN
row_data := to_jsonb(OLD);
FOR col IN
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = TG_RELID
AND i.indisprimary
LOOP
pk := pk || jsonb_build_object(col.attname, row_data -> col.attname);
END LOOP;
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
VALUES (
gen_random_uuid(),
OLD."tenancyId",
TG_TABLE_NAME,
pk,
row_data,
NOW(),
TRUE
);
RETURN OLD;
END;
$function$ LANGUAGE plpgsql;
-- SPLIT_STATEMENT_SENTINEL
CREATE TRIGGER log_deleted_row_project_user
BEFORE DELETE ON "ProjectUser"
FOR EACH ROW
EXECUTE FUNCTION log_deleted_row();
-- SPLIT_STATEMENT_SENTINEL
CREATE TRIGGER log_deleted_row_contact_channel
BEFORE DELETE ON "ContactChannel"
FOR EACH ROW
EXECUTE FUNCTION log_deleted_row();

View File

@ -0,0 +1,100 @@
-- SINGLE_STATEMENT_SENTINEL
CREATE FUNCTION enqueue_tenant_sync(p_tenant_id uuid)
RETURNS void AS $$
BEGIN
INSERT INTO "OutgoingRequest" ("id", "createdAt", "qstashOptions", "fulfilledAt")
SELECT
gen_random_uuid(),
NOW(),
json_build_object(
'url', '/api/latest/internal/external-db-sync/sync-engine',
'body', json_build_object('tenantId', p_tenant_id)
),
NULL
WHERE NOT EXISTS (
SELECT 1
FROM "OutgoingRequest"
WHERE "fulfilledAt" IS NULL
AND ("qstashOptions"->'body'->>'tenantId')::uuid = p_tenant_id
);
END;
$$ LANGUAGE plpgsql;
-- SPLIT_STATEMENT_SENTINEL
-- SINGLE_STATEMENT_SENTINEL
CREATE FUNCTION backfill_null_sequence_ids()
RETURNS void AS $$
DECLARE
v_tenancy_id uuid;
BEGIN
FOR v_tenancy_id IN
WITH rows_to_update AS (
SELECT "tenancyId", "projectUserId"
FROM "ProjectUser"
WHERE "shouldUpdateSequenceId" = TRUE
OR "sequenceId" IS NULL
LIMIT 1000
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "ProjectUser" pu
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE pu."tenancyId" = r."tenancyId"
AND pu."projectUserId" = r."projectUserId"
RETURNING pu."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
LOOP
PERFORM enqueue_tenant_sync(v_tenancy_id);
END LOOP;
FOR v_tenancy_id IN
WITH rows_to_update AS (
SELECT "tenancyId", "projectUserId", "id"
FROM "ContactChannel"
WHERE "shouldUpdateSequenceId" = TRUE
OR "sequenceId" IS NULL
LIMIT 1000
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "ContactChannel" cc
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE cc."tenancyId" = r."tenancyId"
AND cc."projectUserId" = r."projectUserId"
AND cc."id" = r."id"
RETURNING cc."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
LOOP
PERFORM enqueue_tenant_sync(v_tenancy_id);
END LOOP;
FOR v_tenancy_id IN
WITH rows_to_update AS (
SELECT "id", "tenancyId"
FROM "DeletedRow"
WHERE "shouldUpdateSequenceId" = TRUE
OR "sequenceId" IS NULL
LIMIT 1000
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "DeletedRow" dr
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE dr."id" = r."id"
RETURNING dr."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
LOOP
PERFORM enqueue_tenant_sync(v_tenancy_id);
END LOOP;
END;
$$ LANGUAGE plpgsql;

View File

@ -173,6 +173,9 @@ model ProjectUser {
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
sequenceId BigInt? @unique
shouldUpdateSequenceId Boolean @default(true)
displayName String?
serverMetadata Json?
clientReadOnlyMetadata Json?
@ -252,6 +255,9 @@ model ContactChannel {
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
sequenceId BigInt? @unique
shouldUpdateSequenceId Boolean @default(true)
type ContactChannelType
isPrimary BooleanTrue?
usedForAuth BooleanTrue?
@ -861,7 +867,7 @@ model CacheEntry {
model SubscriptionInvoice {
id String @default(uuid()) @db.Uuid
tenancyId String @db.Uuid
tenancyId String @db.Uuid
stripeSubscriptionId String
stripeInvoiceId String
isSubscriptionCreationInvoice Boolean
@ -874,3 +880,32 @@ model SubscriptionInvoice {
@@id([tenancyId, id])
@@unique([tenancyId, stripeInvoiceId])
}
model OutgoingRequest {
id String @id @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
qstashOptions Json
fulfilledAt DateTime?
@@index([fulfilledAt])
}
model DeletedRow {
id String @id @default(uuid()) @db.Uuid
tenancyId String @db.Uuid
tableName String
sequenceId BigInt? @unique
shouldUpdateSequenceId Boolean @default(true)
primaryKey Json
data Json?
deletedAt DateTime @default(now())
fulfilledAt DateTime?
@@index([tableName])
@@index([tenancyId])
}

View File

@ -0,0 +1,36 @@
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors";
import { runAsynchronously } from "@stackframe/stack-shared/dist/utils/promises";
const endpoints = [
"/api/latest/internal/external-db-sync/sequencer",
"/api/latest/internal/external-db-sync/poller",
];
async function main() {
console.log("Starting cron jobs...");
const cronSecret = getEnvVariable('CRON_SECRET');
const baseUrl = `http://localhost:${getEnvVariable('NEXT_PUBLIC_STACK_PORT_PREFIX', '81')}02`;
const run = (endpoint: string) => runAsynchronously(async () => {
console.log(`Running ${endpoint}...`);
const res = await fetch(`${baseUrl}${endpoint}`, {
headers: { 'Authorization': `Bearer ${cronSecret}` },
});
if (!res.ok) throw new StackAssertionError(`Failed to call ${endpoint}: ${res.status} ${res.statusText}\n${await res.text()}`, { res });
console.log(`${endpoint} completed.`);
});
for (const endpoint of endpoints) {
setInterval(() => {
run(endpoint);
}, 60000);
}
}
// eslint-disable-next-line no-restricted-syntax
main().catch((err) => {
console.error(err);
process.exit(1);
});

View File

@ -0,0 +1,129 @@
import { upstash } from "@/lib/upstash";
import { globalPrismaClient, retryTransaction } from "@/prisma-client";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { OutgoingRequest } from "@prisma/client";
import {
yupBoolean,
yupNumber,
yupObject,
yupString,
yupTuple,
} from "@stackframe/stack-shared/dist/schema-fields";
import { getEnvVariable, getNodeEnvironment } from "@stackframe/stack-shared/dist/utils/env";
import { StatusError } from "@stackframe/stack-shared/dist/utils/errors";
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
export const GET = createSmartRouteHandler({
metadata: {
summary: "Poll outgoing requests and push to QStash",
description:
"Internal endpoint invoked by Vercel Cron to process pending outgoing requests.",
tags: ["External DB Sync"],
hidden: true,
},
request: yupObject({
auth: yupObject({}).nullable().optional(),
method: yupString().oneOf(["GET"]).defined(),
headers: yupObject({
authorization: yupTuple([yupString()]).defined(),
}).defined(),
query: yupObject({}).defined(),
}),
response: yupObject({
statusCode: yupNumber().oneOf([200]).defined(),
bodyType: yupString().oneOf(["json"]).defined(),
body: yupObject({
ok: yupBoolean().defined(),
requests_processed: yupNumber().defined(),
}).defined(),
}),
handler: async ({ headers }) => {
const authHeader = headers.authorization[0];
if (authHeader !== `Bearer ${getEnvVariable("CRON_SECRET")}`) {
throw new StatusError(401, "Unauthorized");
}
const startTime = performance.now();
const maxDurationMs = 2 * 60 * 1000;
const busySleepMs = 50;
let totalRequestsProcessed = 0;
async function claimPendingRequests(): Promise<OutgoingRequest[]> {
return await retryTransaction(globalPrismaClient, async (tx) => {
const rows = await tx.$queryRaw<OutgoingRequest[]>`
UPDATE "OutgoingRequest"
SET "fulfilledAt" = NOW()
WHERE "id" IN (
SELECT id
FROM "OutgoingRequest"
WHERE "fulfilledAt" IS NULL
ORDER BY "createdAt"
LIMIT 100
FOR UPDATE SKIP LOCKED
)
RETURNING *;
`;
return rows;
});
}
async function processRequests(requests: OutgoingRequest[]): Promise<number> {
let processed = 0;
for (const request of requests) {
try {
const options = request.qstashOptions as any;
const baseUrl = getEnvVariable("NEXT_PUBLIC_STACK_API_URL");
let fullUrl = options.url.startsWith("http")
? options.url
: new URL(options.url, baseUrl).toString();
if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) {
const url = new URL(fullUrl);
if (url.hostname === "localhost" || url.hostname === "127.0.0.1") {
url.hostname = "host.docker.internal";
fullUrl = url.toString();
}
}
await upstash.publishJSON({
url: fullUrl,
body: options.body,
});
processed++;
} catch (error) {
console.error(
`Failed to process outgoing request ${request.id}:`,
error,
);
}
}
return processed;
}
while (performance.now() - startTime < maxDurationMs) {
const pendingRequests = await claimPendingRequests();
totalRequestsProcessed += await processRequests(pendingRequests);
const elapsed = performance.now() - startTime;
if (elapsed >= maxDurationMs) {
break;
}
await wait(busySleepMs);
}
return {
statusCode: 200,
bodyType: "json" as const,
body: {
ok: true,
requests_processed: totalRequestsProcessed,
},
};
},
});

View File

@ -0,0 +1,77 @@
import { globalPrismaClient } from "@/prisma-client";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import {
yupBoolean,
yupNumber,
yupObject,
yupString,
yupTuple,
} from "@stackframe/stack-shared/dist/schema-fields";
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
import { StatusError } from "@stackframe/stack-shared/dist/utils/errors";
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
export const GET = createSmartRouteHandler({
metadata: {
summary: "Run sequence ID backfill",
description:
"Internal endpoint invoked by Vercel Cron to backfill null sequence IDs.",
tags: ["External DB Sync"],
hidden: true,
},
request: yupObject({
auth: yupObject({}).nullable().optional(),
method: yupString().oneOf(["GET"]).defined(),
headers: yupObject({
authorization: yupTuple([yupString()]).defined(),
}).defined(),
query: yupObject({}).defined(),
}),
response: yupObject({
statusCode: yupNumber().oneOf([200]).defined(),
bodyType: yupString().oneOf(["json"]).defined(),
body: yupObject({
ok: yupBoolean().defined(),
iterations: yupNumber().defined(),
}).defined(),
}),
handler: async ({ headers }) => {
const authHeader = headers.authorization[0];
if (authHeader !== `Bearer ${getEnvVariable("CRON_SECRET")}`) {
throw new StatusError(401, "Unauthorized");
}
const startTime = performance.now();
const maxDurationMs = 2 * 60 * 1000;
const sleepMs = 50;
let iterations = 0;
while (performance.now() - startTime < maxDurationMs) {
try {
await globalPrismaClient.$executeRaw`SELECT backfill_null_sequence_ids()`;
} catch (error) {
console.warn('[sequencer] Failed to run backfill_null_sequence_ids:', error);
}
iterations++;
const elapsed = performance.now() - startTime;
if (elapsed >= maxDurationMs) {
break;
}
await wait(sleepMs);
}
return {
statusCode: 200,
bodyType: "json" as const,
body: {
ok: true,
iterations,
},
};
},
});

View File

@ -0,0 +1,68 @@
import { syncExternalDatabases } from "@/lib/external-db-sync";
import { getTenancy } from "@/lib/tenancies";
import { ensureUpstashSignature } from "@/lib/upstash";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { yupBoolean, yupNumber, yupObject, yupString, yupTuple } from "@stackframe/stack-shared/dist/schema-fields";
export const POST = createSmartRouteHandler({
metadata: {
summary: "Sync engine webhook endpoint",
description: "Receives webhook from QStash to trigger external database sync for a tenant",
tags: ["External DB Sync"],
hidden: true,
},
request: yupObject({
headers: yupObject({
"upstash-signature": yupTuple([yupString()]).defined(),
}).defined(),
body: yupObject({
tenantId: yupString().defined(),
}).defined(),
method: yupString().oneOf(["POST"]).defined(),
}),
response: yupObject({
statusCode: yupNumber().oneOf([200]).defined(),
bodyType: yupString().oneOf(["json"]).defined(),
body: yupObject({
success: yupBoolean().defined(),
tenantId: yupString().defined(),
timestamp: yupString().defined(),
}).defined(),
}),
handler: async ({ body }, fullReq) => {
await ensureUpstashSignature(fullReq);
const { tenantId } = body;
const timestamp = new Date().toISOString();
const tenancy = await getTenancy(tenantId);
if (!tenancy) {
console.error(`Tenant not found: ${tenantId}`);
return {
statusCode: 200,
bodyType: "json",
body: {
success: false,
tenantId,
timestamp,
},
};
}
try {
await syncExternalDatabases(tenancy);
} catch (error: any) {
console.error(` Error syncing external databases for tenant ${tenantId}:`, error);
}
return {
statusCode: 200,
bodyType: "json",
body: {
success: true,
tenantId,
timestamp,
},
};
},
});

View File

@ -0,0 +1,203 @@
import { Tenancy } from "@/lib/tenancies";
import { getPrismaClientForTenancy, PrismaClientTransaction } from "@/prisma-client";
import type { CompleteConfig } from "@stackframe/stack-shared/dist/config/schema";
import { Client } from 'pg';
export function getExternalDatabases(config: CompleteConfig) {
return config.dbSync.externalDatabases;
}
async function pushRowsToExternalDb(
externalClient: Client,
tableName: string,
newRows: any[],
upsertQuery?: string,
) {
if (!upsertQuery) {
throw new Error(
`Cannot sync table "${tableName}": No upsertQuery configured.`
);
}
if (newRows.length === 0) return;
const placeholderMatches = upsertQuery.match(/\$\d+/g) ?? [];
const expectedParamCount =
placeholderMatches.length === 0
? 0
: Math.max(...placeholderMatches.map((m) => Number(m.slice(1))));
if (expectedParamCount === 0) {
throw new Error(
`upsertQuery for table "${tableName}" contains no positional parameters ($1, $2, ...).` +
` Your mapping must use parameterized SQL.`
);
}
const sampleRow = newRows[0];
const { tenancyId: _ignore, ...restSample } = sampleRow;
const orderedKeys = Object.keys(restSample);
if (orderedKeys.length !== expectedParamCount) {
throw new Error(
` Column count mismatch for table "${tableName}".\n` +
`→ upsertQuery expects ${expectedParamCount} parameters.\n` +
`→ internalDbFetchQuery returned ${orderedKeys.length} columns (excluding tenancyId).\n` +
`Fix your SELECT column order or your SQL parameter order.`
);
}
for (const row of newRows) {
const { tenancyId, ...rest } = row;
const rowKeys = Object.keys(rest);
const validShape =
rowKeys.length === orderedKeys.length &&
rowKeys.every((k, i) => k === orderedKeys[i]);
if (!validShape) {
throw new Error(
` Row shape mismatch for table "${tableName}".\n` +
`Expected column order: [${orderedKeys.join(", ")}]\n` +
`Received column order: [${rowKeys.join(", ")}]\n` +
`Your SELECT must be explicit, ordered, and NEVER use SELECT *.\n` +
`Fix the SELECT in internalDbFetchQuery immediately.`
);
}
}
for (const row of newRows) {
const { tenancyId, ...rest } = row;
await externalClient.query(upsertQuery, Object.values(rest));
}
}
async function syncMapping(
externalClient: Client,
mappingId: string,
mapping: CompleteConfig["dbSync"]["externalDatabases"][string]["mappings"][string],
internalPrisma: PrismaClientTransaction,
dbId: string,
tenancyId: string,
) {
const rawSourceTables: any = (mapping as any).sourceTables;
const sourceTables: string[] = rawSourceTables
? Object.values(rawSourceTables)
: [];
const rawTargetPk: any = (mapping as any).targetTablePrimaryKey;
const targetTablePrimaryKey: string[] = rawTargetPk
? Object.values(rawTargetPk)
: [];
if (sourceTables.length === 0) {
console.error(
` Invalid configuration for mapping #${mappingId}: 'sourceTables' resolved to an empty list.`,
);
return;
}
if (targetTablePrimaryKey.length === 0) {
console.error(
` Invalid configuration for mapping #${mappingId}: 'targetTablePrimaryKey' resolved to an empty list.`,
);
return;
}
const fetchQuery = mapping.internalDbFetchQuery;
if (!fetchQuery || !mapping.targetTable) {
return;
}
const tableName = mapping.targetTable;
if (mapping.targetTableSchema) {
const checkTableQuery = `
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = $1
);
`;
const res = await externalClient.query(checkTableQuery, [tableName]);
if (!res.rows[0].exists) {
try {
await externalClient.query(mapping.targetTableSchema);
} catch (err: any) {
if (err.code !== '23505' || err.constraint !== 'pg_type_typname_nsp_index') {
throw err;
}
}
}
}
const rows = await internalPrisma.$queryRawUnsafe<any[]>(fetchQuery, tenancyId);
await pushRowsToExternalDb(
externalClient,
tableName,
rows,
mapping.externalDbUpdateQuery,
);
}
async function syncDatabase(
dbId: string,
dbConfig: CompleteConfig["dbSync"]["externalDatabases"][string],
internalPrisma: PrismaClientTransaction,
tenancyId: string,
) {
const mappings = dbConfig.mappings;
const isArray = Array.isArray(mappings);
const mappingCount = mappings
? (isArray ? (mappings as any[]).length : Object.keys(mappings as Record<string, unknown>).length)
: 0;
if (!dbConfig.connectionString) {
return;
}
const externalClient = new Client({
connectionString: dbConfig.connectionString,
});
try {
await externalClient.connect();
if (!mappings || mappingCount === 0) {
return;
}
for (const [mappingId, mapping] of Object.entries(mappings)) {
await syncMapping(
externalClient,
mappingId,
mapping as any,
internalPrisma,
dbId,
tenancyId,
);
}
} catch (error: any) {
console.error(`Error syncing external DB ${dbId}:`, error);
} finally {
await externalClient.end();
}
}
export async function syncExternalDatabases(tenancy: Tenancy) {
const externalDatabases = getExternalDatabases(tenancy.config);
if (Object.keys(externalDatabases).length === 0) {
return;
}
const internalPrisma = await getPrismaClientForTenancy(tenancy);
for (const [dbId, dbConfig] of Object.entries(externalDatabases)) {
await syncDatabase(dbId, dbConfig, internalPrisma, tenancy.id);
}
}

View File

@ -87,20 +87,8 @@ export function handleApiRequest(handler: (req: NextRequest, options: any, reque
headers: Object.fromEntries(req.headers),
});
// During development, don't trash the console with logs from E2E tests
const disableExtendedLogging = getNodeEnvironment().includes('dev') && !!req.headers.get("x-stack-development-disable-extended-logging");
let hasRequestFinished = false;
try {
// censor long query parameters because they might contain sensitive data
const censoredUrl = new URL(req.url);
for (const [key, value] of censoredUrl.searchParams.entries()) {
if (value.length <= 8) {
continue;
}
censoredUrl.searchParams.set(key, value.slice(0, 4) + "--REDACTED--" + value.slice(-4));
}
// request duration warning
const warnAfterSeconds = 12;
runAsynchronously(async () => {
@ -110,32 +98,20 @@ export function handleApiRequest(handler: (req: NextRequest, options: any, reque
}
});
if (!disableExtendedLogging) console.log(`[API REQ] [${requestId}] ${req.method} ${censoredUrl}`);
const timeStart = performance.now();
const res = await handler(req, options, requestId);
const time = (performance.now() - timeStart);
if ([301, 302].includes(res.status)) {
throw new StackAssertionError("HTTP status codes 301 and 302 should not be returned by our APIs because the behavior for non-GET methods is inconsistent across implementations. Use 303 (to rewrite method to GET) or 307/308 (to preserve the original method and data) instead.", { status: res.status, url: req.nextUrl, req, res });
}
if (!disableExtendedLogging) console.log(`[ RES] [${requestId}] ${req.method} ${censoredUrl}: ${res.status} (in ${time.toFixed(0)}ms)`);
return res;
} catch (e) {
let statusError: StatusError;
try {
statusError = catchError(e, requestId);
} catch (e) {
if (!disableExtendedLogging) console.log(`[ EXC] [${requestId}] ${req.method} ${req.url}: Non-error caught (such as a redirect), will be re-thrown. Digest: ${(e as any)?.digest}`);
throw e;
}
if (!disableExtendedLogging) console.log(`[ ERR] [${requestId}] ${req.method} ${req.url}: ${statusError.message}`);
if (!isCommonError(statusError)) {
// HACK: Log a nicified version of the error instead of statusError to get around buggy Next.js pretty-printing
// https://www.reddit.com/r/nextjs/comments/1gkxdqe/comment/m19kxgn/?utm_source=share&utm_medium=web3x&utm_name=web3xcss&utm_term=1&utm_content=share_button
if (!disableExtendedLogging) console.debug(`For the error above with request ID ${requestId}, the full error is:`, errorToNiceString(statusError));
}
const res = await createResponse(req, requestId, {
statusCode: statusError.statusCode,
bodyType: "binary",

View File

@ -18,7 +18,9 @@
"dotenv": "^16.4.5"
},
"devDependencies": {
"jose": "^5.6.3"
"@types/pg": "^8.15.6",
"jose": "^5.6.3",
"pg": "^8.16.3"
},
"packageManager": "pnpm@10.23.0"
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,565 @@
import { afterAll, beforeAll, describe, expect } from 'vitest';
import { test } from '../../../../helpers';
import { User, niceBackendFetch } from '../../../backend-helpers';
import {
TEST_TIMEOUT,
TestDbManager,
createProjectWithExternalDb,
verifyInExternalDb,
verifyNotInExternalDb,
waitForCondition,
waitForSyncedData,
waitForSyncedDeletion,
waitForTable
} from './external-db-sync-utils';
// Run tests sequentially to avoid concurrency issues with shared backend state
describe.sequential('External DB Sync - Basic Tests', () => {
let dbManager: TestDbManager;
beforeAll(async () => {
dbManager = new TestDbManager();
await dbManager.init();
});
afterAll(async () => {
await dbManager.cleanup();
});
/**
* What it does:
* - Creates a user, patches the display name, and triggers the sync once.
* - Checks PartialUsers for a matching row only after the sync completes.
*
* Why it matters:
* - Ensures inserts never appear externally until the sync pipeline runs.
*/
test('Insert: New user is synced to external DB', async () => {
const dbName = 'insert_only_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
});
const client = dbManager.getClient(dbName);
const user = await User.create({ emailAddress: 'insert-only@example.com' });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Insert Only User' }
});
await waitForSyncedData(client, 'insert-only@example.com', 'Insert Only User');
await verifyInExternalDb(client, 'insert-only@example.com', 'Insert Only User');
}, TEST_TIMEOUT);
/**
* What it does:
* - Exports a baseline row, mutates the display name, runs another sync, and reads PartialUsers.
* - Compares the stored display name to guarantee it reflects the latest mutation.
*
* Why it matters:
* - Proves updates propagate to the external DB instead of leaving stale data.
*/
test('Update: Existing user changes are reflected in external DB', async () => {
const dbName = 'update_only_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
});
const client = dbManager.getClient(dbName);
const user = await User.create({ emailAddress: 'update-only@example.com' });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Before Update' }
});
await waitForSyncedData(client, 'update-only@example.com', 'Before Update');
await verifyInExternalDb(client, 'update-only@example.com', 'Before Update');
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'After Update' }
});
await waitForSyncedData(client, 'update-only@example.com', 'After Update');
await verifyInExternalDb(client, 'update-only@example.com', 'After Update');
}, TEST_TIMEOUT);
/**
* What it does:
* - Syncs a user into PartialUsers, deletes the user internally, and waits for the deletion helper.
* - Queries PartialUsers to ensure the row disappears.
*
* Why it matters:
* - Validates deletion events propagate and prevent orphaned rows in external DBs.
*/
test('Delete: Deleted user is removed from external DB', async () => {
const dbName = 'delete_only_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
}, {
display_name: '🗑️ Delete Test Project',
description: 'Testing deletion sync to external database'
});
const client = dbManager.getClient(dbName);
const user = await User.create({ emailAddress: 'delete-only@example.com' });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Delete Only User' }
});
await waitForSyncedData(client, 'delete-only@example.com', 'Delete Only User');
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'DELETE',
});
const deletedUserResponse = await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'GET',
});
expect(deletedUserResponse.status).toBe(404);
await waitForSyncedDeletion(client, 'delete-only@example.com');
await verifyNotInExternalDb(client, 'delete-only@example.com');
}, TEST_TIMEOUT);
/**
* What it does:
* - Creates a user while verifying the PartialUsers table is absent before sync.
* - Triggers sync, waits for table creation, and confirms the row appears afterward.
*
* Why it matters:
* - Demonstrates that syncs control both table provisioning and data export timing.
*/
test('Sync Mechanism Verification: Data appears ONLY after sync', async () => {
const dbName = 'sync_verification_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
}, {
display_name: '🔄 Sync Verification Test Project',
description: 'Testing that data only appears after sync is triggered'
});
const user = await User.create({ emailAddress: 'sync-verify@example.com' });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Sync Verify User' }
});
const client = dbManager.getClient(dbName);
const tableCheckBefore = await client.query(`
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'PartialUsers'
);
`);
expect(tableCheckBefore.rows[0].exists).toBe(false);
await waitForTable(client, 'PartialUsers');
await waitForCondition(
async () => {
const res = await client.query(`SELECT * FROM "PartialUsers" WHERE "value" = $1`, ['sync-verify@example.com']);
return res.rows.length > 0;
},
{ description: 'data to appear in external DB', timeoutMs: 90000 }
);
await verifyInExternalDb(client, 'sync-verify@example.com', 'Sync Verify User');
}, TEST_TIMEOUT);
/**
* What it does:
* - Runs create, update, and delete actions in order while syncing between each step.
* - Verifies PartialUsers reflects each intermediate state.
*
* Why it matters:
* - Confirms the sync handles the entire lifecycle without leaving stale records.
*/
test('Full CRUD Lifecycle: Create, Update, Delete', async () => {
const dbName = 'crud_lifecycle_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
});
const client = dbManager.getClient(dbName);
const user = await User.create({ emailAddress: 'crud-test@example.com' });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Original Name' }
});
await waitForSyncedData(client, 'crud-test@example.com', 'Original Name');
await verifyInExternalDb(client, 'crud-test@example.com', 'Original Name');
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Updated Name' }
});
await waitForSyncedData(client, 'crud-test@example.com', 'Updated Name');
await verifyInExternalDb(client, 'crud-test@example.com', 'Updated Name');
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'DELETE',
});
await waitForSyncedDeletion(client, 'crud-test@example.com');
await verifyNotInExternalDb(client, 'crud-test@example.com');
}, TEST_TIMEOUT);
/**
* What it does:
* - Syncs a user into an empty database to trigger table auto-creation.
* - Queries `information_schema` and PartialUsers to confirm the table and row exist.
*
* Why it matters:
* - Ensures mappings can provision their own schema without manual migrations.
*/
test('Automatic Table Creation', async () => {
const dbName = 'auto_table_creation_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
});
const user = await User.create({ emailAddress: 'auto-create@example.com' });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Auto Create User' }
});
const client = dbManager.getClient(dbName);
await waitForSyncedData(client, 'auto-create@example.com', 'Auto Create User');
const tableCheck = await client.query(`
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'PartialUsers'
);
`);
expect(tableCheck.rows[0].exists).toBe(true);
await verifyInExternalDb(client, 'auto-create@example.com', 'Auto Create User');
});
/**
* What it does:
* - Configures one valid and one invalid external DB mapping for the same project.
* - Runs sync and verifies the healthy DB still receives the exported row.
*
* Why it matters:
* - Shows a failing database connection does not block successful targets.
*/
test('Resilience: One bad DB should not crash the sync', async () => {
const goodDbName = 'resilience_good_db';
const goodConnectionString = await dbManager.createDatabase(goodDbName);
const badConnectionString = 'postgresql://invalid:invalid@invalid:5432/invalid';
await createProjectWithExternalDb({
good_db: {
type: 'postgres',
connectionString: goodConnectionString,
},
bad_db: {
type: 'postgres',
connectionString: badConnectionString,
}
});
const user = await User.create({ emailAddress: 'resilience@example.com' });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Resilience User' }
});
await waitForSyncedData(dbManager.getClient(goodDbName), 'resilience@example.com', 'Resilience User');
const client = dbManager.getClient(goodDbName);
const res = await client.query(`SELECT * FROM "PartialUsers" WHERE "value" = $1`, ['resilience@example.com']);
expect(res.rows.length).toBe(1);
expect(res.rows[0].displayName).toBe('Resilience User');
}, TEST_TIMEOUT);
/**
* What it does:
* - Creates a user with two contact channels and runs the sync.
* - Reads PartialUsers to assert both channel values are present with the same display name.
*
* Why it matters:
* - Confirms multi-channel users export all addresses instead of overwriting each other.
*/
test('Multi-ContactChannel: User with multiple contact channels syncs all', async () => {
const dbName = 'multi_contact_channel_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
});
const client = dbManager.getClient(dbName);
const user = await User.create({ emailAddress: 'multi-contact@example.com' });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Multi Contact User' }
});
const secondEmailResponse = await niceBackendFetch(`/api/v1/contact-channels`, {
accessType: 'admin',
method: 'POST',
body: {
user_id: user.userId,
type: 'email',
value: 'second-email@example.com',
is_verified: false,
used_for_auth: false,
}
});
expect(secondEmailResponse.status).toBe(201);
// Wait for BOTH contact channels to be synced
await waitForSyncedData(client, 'multi-contact@example.com', 'Multi Contact User');
await waitForSyncedData(client, 'second-email@example.com', 'Multi Contact User');
const allRows = await client.query(`SELECT * FROM "PartialUsers" ORDER BY "value"`);
expect(allRows.rows.length).toBe(2);
const emails = allRows.rows.map(r => r.value);
expect(emails).toContain('multi-contact@example.com');
expect(emails).toContain('second-email@example.com');
expect(allRows.rows[0].displayName).toBe('Multi Contact User');
expect(allRows.rows[1].displayName).toBe('Multi Contact User');
}, TEST_TIMEOUT);
/**
* What it does:
* - Exports a user with multiple channels, deletes the user, and waits for deletion sync.
* - Ensures PartialUsers no longer contains any row for that user.
*
* Why it matters:
* - Validates that cascading deletes remove every external row tied to the user.
*/
test('Multi-ContactChannel Deletion: Deleting user cascades all contact channels', async () => {
const dbName = 'multi_contact_deletion_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
});
const client = dbManager.getClient(dbName);
const user = await User.create({ emailAddress: 'cascade-delete@example.com' });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Cascade Delete User' }
});
await niceBackendFetch(`/api/v1/contact-channels`, {
accessType: 'admin',
method: 'POST',
body: {
user_id: user.userId,
type: 'email',
value: 'cascade-second@example.com',
is_verified: false,
used_for_auth: false,
}
});
await waitForSyncedData(client, 'cascade-delete@example.com', 'Cascade Delete User');
// Verify both are synced
const beforeDelete = await client.query(`SELECT * FROM "PartialUsers"`);
expect(beforeDelete.rows.length).toBe(2);
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'DELETE',
});
await waitForSyncedDeletion(client, 'cascade-delete@example.com');
const afterDelete = await client.query(`SELECT * FROM "PartialUsers"`);
expect(afterDelete.rows.length).toBe(0);
}, TEST_TIMEOUT);
/**
* What it does:
* - Creates two contact channels, deletes only the secondary one, and runs the deletion sync helper.
* - Confirms PartialUsers retains the primary contact while the secondary row is removed.
*
* Why it matters:
* - Ensures granular channel deletions do not wipe the entire user from external DBs.
*/
test('Single ContactChannel Deletion: Deleting one channel keeps user and other channels', async () => {
const dbName = 'single_contact_deletion_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
});
const client = dbManager.getClient(dbName);
const user = await User.create({ emailAddress: 'single-delete@example.com' });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Single Delete User' }
});
const secondEmailResponse = await niceBackendFetch(`/api/v1/contact-channels`, {
accessType: 'admin',
method: 'POST',
body: {
user_id: user.userId,
type: 'email',
value: 'single-keep@example.com',
is_verified: false,
used_for_auth: false,
}
});
const secondChannelId = secondEmailResponse.body.id;
await waitForSyncedData(client, 'single-delete@example.com', 'Single Delete User');
await waitForSyncedData(client, 'single-keep@example.com', 'Single Delete User');
const beforeDelete = await client.query(`SELECT * FROM "PartialUsers" ORDER BY "value"`);
expect(beforeDelete.rows.length).toBe(2);
const deleteResponse = await niceBackendFetch(`/api/v1/contact-channels/${user.userId}/${secondChannelId}`, {
accessType: 'admin',
method: 'DELETE',
});
expect(deleteResponse.status).toBe(200);
await waitForSyncedDeletion(client, 'single-keep@example.com');
const afterDelete = await client.query(`SELECT * FROM "PartialUsers"`);
expect(afterDelete.rows.length).toBe(1);
expect(afterDelete.rows[0].value).toBe('single-delete@example.com');
expect(afterDelete.rows[0].displayName).toBe('Single Delete User');
}, TEST_TIMEOUT);
/**
* What it does:
* - Exports a user, bumps its sequenceId with an update, and attempts to delete using the old sequenceId.
* - Verifies the row still exists with the latest data.
*
* Why it matters:
* - Demonstrates sequence guards prevent older deletes from clobbering newer updates.
*/
test('Race Condition Protection: Old delete cannot remove newer record', async () => {
const dbName = 'race_condition_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
});
const client = dbManager.getClient(dbName);
const user = await User.create({ emailAddress: 'race-test@example.com' });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Original Name' }
});
await waitForSyncedData(client, 'race-test@example.com', 'Original Name');
const firstRow = await verifyInExternalDb(client, 'race-test@example.com', 'Original Name');
const firstSequenceId = BigInt(firstRow.sequenceId);
const contactChannelId = firstRow.id;
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Updated Name' }
});
await waitForSyncedData(client, 'race-test@example.com', 'Updated Name');
const secondRow = await verifyInExternalDb(client, 'race-test@example.com', 'Updated Name');
const secondSequenceId = BigInt(secondRow.sequenceId);
const deleteAttempt = await client.query(
`DELETE FROM "PartialUsers" WHERE "id" = $1 AND "sequenceId" <= $2`,
[contactChannelId, firstSequenceId.toString()]
);
expect(deleteAttempt.rowCount).toBe(0);
const afterDelete = await verifyInExternalDb(client, 'race-test@example.com', 'Updated Name');
expect(BigInt(afterDelete.sequenceId)).toBe(secondSequenceId);
}, TEST_TIMEOUT);
});

View File

@ -0,0 +1,751 @@
import { Client } from 'pg';
import { afterAll, beforeAll, describe, expect } from 'vitest';
import { test } from '../../../../helpers';
import { InternalApiKey, User, niceBackendFetch } from '../../../backend-helpers';
import {
HIGH_VOLUME_TIMEOUT,
TEST_TIMEOUT,
TestDbManager,
createProjectWithExternalDb,
waitForCondition,
waitForSyncedDeletion,
waitForTable
} from './external-db-sync-utils';
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
describe.sequential('External DB Sync - Race Condition Tests', () => {
let dbManager: TestDbManager;
beforeAll(async () => {
dbManager = new TestDbManager();
await dbManager.init();
});
afterAll(async () => {
await dbManager.cleanup();
});
/**
* What it does:
* - Updates a user, triggers two sync cycles concurrently, and waits for PartialUsers to show the last value.
* - Confirms only a single row exists with the final display name.
*
* Why it matters:
* - Demonstrates overlapping pollers remain idempotent instead of duplicating or reverting data.
*/
test('Concurrent sync triggers produce a single consistent export', async () => {
const dbName = 'race_parallel_sync_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
},
});
const client = dbManager.getClient(dbName);
const user = await User.create({ emailAddress: 'parallel-sync@example.com' });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Initial Name' },
});
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Final Name' },
});
await waitForTable(client, 'PartialUsers');
await waitForCondition(
async () => {
const res = await client.query(
`SELECT * FROM "PartialUsers" WHERE "value" = $1`,
['parallel-sync@example.com'],
);
return res.rows.length === 1 && res.rows[0].displayName === 'Final Name';
},
{ description: 'sync to converge on final state', timeoutMs: 90000 },
);
}, TEST_TIMEOUT);
/**
* What it does:
* - Issues a final update, deletes the user immediately afterward, and runs the deletion helper.
* - Confirms PartialUsers has zero rows for that value.
*
* Why it matters:
* - Shows delete events win over closely preceding updates, preventing stale data resurrection.
*/
test('Immediate delete after update removes the contact channel', async () => {
const dbName = 'race_update_delete_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
},
});
const client = dbManager.getClient(dbName);
const user = await User.create({ emailAddress: 'update-delete@example.com' });
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Before Delete' },
});
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: 'Should Be Deleted' },
});
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'DELETE',
});
await waitForTable(client, 'PartialUsers');
await waitForSyncedDeletion(client, 'update-delete@example.com');
const res = await client.query(
`SELECT * FROM "PartialUsers" WHERE "value" = $1`,
['update-delete@example.com'],
);
expect(res.rows.length).toBe(0);
}, TEST_TIMEOUT);
/**
* What it does:
* - Exports 300 users (forcing multi-page fetches), deletes a low-sequence contact channel, and syncs again.
* - Checks the deleted row is gone and the total count drops by exactly one.
*
* Why it matters:
* - Prevents pagination LIMIT boundaries from causing delete events to be skipped.
*/
test('Deletes near pagination boundaries are honored', async () => {
const dbName = 'race_pagination_delete_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
},
});
const client = dbManager.getClient(dbName);
const totalUsers = 300;
const users = [];
await InternalApiKey.createAndSetProjectKeys();
const batchSize = 10;
for (let batchStart = 0; batchStart < totalUsers; batchStart += batchSize) {
const batchEnd = Math.min(batchStart + batchSize, totalUsers);
const batchPromises = [];
for (let i = batchStart; i < batchEnd; i++) {
const email = `page-user-${i}@example.com`;
batchPromises.push(
User.create({ emailAddress: email }).then(async (user) => {
await niceBackendFetch(`/api/v1/users/${user.userId}`, {
accessType: 'admin',
method: 'PATCH',
body: { display_name: `Paged User ${i}` },
});
return { email, userId: user.userId };
})
);
}
const batchUsers = await Promise.all(batchPromises);
users.push(...batchUsers);
if (batchEnd < totalUsers) {
await new Promise(resolve => setTimeout(resolve, 200));
}
if (batchEnd < totalUsers && batchEnd % 200 === 0) {
await InternalApiKey.createAndSetProjectKeys();
}
}
await waitForTable(client, 'PartialUsers');
await waitForCondition(
async () => {
const res = await client.query(`SELECT COUNT(*) AS count FROM "PartialUsers"`);
return parseInt(res.rows[0].count, 10) === totalUsers;
},
{ description: 'initial >300 users exported', timeoutMs: 60000 },
);
const deletedUser = users[1];
await niceBackendFetch(`/api/v1/users/${deletedUser.userId}`, {
accessType: 'admin',
method: 'DELETE',
});
await waitForCondition(
async () => {
const res = await client.query(`SELECT COUNT(*) AS count FROM "PartialUsers"`);
return parseInt(res.rows[0].count, 10) === totalUsers - 1;
},
{ description: 'pagination delete reflected', timeoutMs: 180000 },
);
const deletedRow = await client.query(
`SELECT * FROM "PartialUsers" WHERE "value" = $1`,
[deletedUser.email],
);
expect(deletedRow.rows.length).toBe(0);
}, HIGH_VOLUME_TIMEOUT);
/**
* What it does:
* - Creates overlapping database transactions that update the same row
* - Commits them at different times while sync is happening
* - Verifies that the highest sequence ID wins in the external DB
*
* Why it matters:
* - Proves true database-level race conditions are handled correctly
* - Tests that sync captures all committed changes eventually
*/
describe('Race conditions with overlapping transactions', () => {
const LOCAL_TEST_TIMEOUT = 120_000; // Must be > 70s sleep + setup time
async function setupExternalDbWithBaseline(dbName: string) {
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
},
});
const externalClient = dbManager.getClient(dbName);
const user = await User.create({ emailAddress: `${dbName}@example.com` });
// Make sure the PartialUsers row exists
await waitForTable(externalClient, 'PartialUsers');
await waitForCondition(
async () => {
const res = await externalClient.query<{
displayName: string | null,
sequenceId: string | null,
}>(
`
SELECT "displayName", "sequenceId"
FROM "PartialUsers"
WHERE "value" = $1
`,
[`${dbName}@example.com`],
);
return res.rows.length === 1;
},
{ description: `baseline row for ${dbName}`, timeoutMs: 60000 },
);
const baseline = await externalClient.query<{
displayName: string | null,
sequenceId: string | null,
}>(
`
SELECT "displayName", "sequenceId"
FROM "PartialUsers"
WHERE "value" = $1
`,
[`${dbName}@example.com`],
);
if (baseline.rows.length !== 1) {
throw new Error(`Expected baseline row for ${dbName}, got ${baseline.rows.length}`);
}
const baselineRow = baseline.rows[0];
const baselineSeq = baselineRow.sequenceId
? BigInt(baselineRow.sequenceId)
: BigInt(0);
return {
externalClient,
user,
baselineSeq,
};
}
function makeInternalDbUrl() {
const portPrefix = process.env.NEXT_PUBLIC_STACK_PORT_PREFIX || '81';
return `postgres://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@localhost:${portPrefix}28/stackframe`;
}
/**
* Scenario 1:
* Poller runs while a transaction is in-flight and uncommitted.
* Only the baseline committed value should be visible.
*
*/
test(
'Poller ignores uncommitted overlapping updates',
async () => {
const dbName = 'race_uncommitted_poll_test';
const { externalClient, user, baselineSeq } =
await setupExternalDbWithBaseline(dbName);
const internalDbUrl = makeInternalDbUrl();
const internalClient = new Client({ connectionString: internalDbUrl });
await internalClient.connect();
try {
await internalClient.query('BEGIN');
await internalClient.query(
`
UPDATE "ProjectUser"
SET "displayName" = 'Transaction 1', "updatedAt" = NOW()
WHERE "projectUserId" = $1
`,
[user.userId],
);
await sleep(70000);
const during = await externalClient.query<{
displayName: string | null,
sequenceId: string | null,
}>(
`
SELECT "displayName", "sequenceId"
FROM "PartialUsers"
WHERE "value" = $1
`,
[`${dbName}@example.com`],
);
expect(during.rows.length).toBe(1);
const row = during.rows[0];
expect(row.displayName).not.toBe('Transaction 1');
const seq = row.sequenceId ? BigInt(row.sequenceId) : BigInt(0);
expect(seq).toBe(baselineSeq);
await internalClient.query('ROLLBACK');
} finally {
await internalClient.end();
}
},
LOCAL_TEST_TIMEOUT,
);
/**
* Scenario 2:
* First transaction commits, then poller runs.
* Poller should pick up Transaction 1 and sequenceId should increase.
*/
test(
'Poller picks up first committed transaction',
async () => {
const dbName = 'race_after_first_commit_test';
const { externalClient, user, baselineSeq } =
await setupExternalDbWithBaseline(dbName);
const internalDbUrl = makeInternalDbUrl();
const internalClient = new Client({ connectionString: internalDbUrl });
await internalClient.connect();
try {
// Commit Transaction 1
await internalClient.query('BEGIN');
await internalClient.query(
`
UPDATE "ProjectUser"
SET "displayName" = 'Transaction 1', "updatedAt" = NOW()
WHERE "projectUserId" = $1
`,
[user.userId],
);
await internalClient.query('COMMIT');
await waitForCondition(
async () => {
const res = await externalClient.query<{
displayName: string | null,
sequenceId: string,
}>(
`
SELECT "displayName", "sequenceId"
FROM "PartialUsers"
WHERE "value" = $1
`,
[`${dbName}@example.com`],
);
return (
res.rows.length === 1 &&
res.rows[0].displayName === 'Transaction 1'
);
},
{ description: 'Transaction 1 exported', timeoutMs: 90000 },
);
const afterT1 = await externalClient.query<{
displayName: string | null,
sequenceId: string,
}>(
`
SELECT "displayName", "sequenceId"
FROM "PartialUsers"
WHERE "value" = $1
`,
[`${dbName}@example.com`],
);
expect(afterT1.rows.length).toBe(1);
const row = afterT1.rows[0];
expect(row.displayName).toBe('Transaction 1');
const seq1 = BigInt(row.sequenceId);
expect(seq1).toBeGreaterThan(baselineSeq);
} finally {
await internalClient.end();
}
},
LOCAL_TEST_TIMEOUT,
);
/**
* Scenario 3:
* First transaction is committed and synced.
* Second transaction has UPDATE done but is still uncommitted.
* Poller should STILL see Transaction 1 (not Transaction 2).
*/
test(
'Poller does not see second update until commit',
async () => {
const dbName = 'race_second_uncommitted_poll_test';
const { externalClient, user, baselineSeq } =
await setupExternalDbWithBaseline(dbName);
const internalDbUrl = makeInternalDbUrl();
const internalClient = new Client({ connectionString: internalDbUrl });
await internalClient.connect();
try {
await internalClient.query('BEGIN');
await internalClient.query(
`
UPDATE "ProjectUser"
SET "displayName" = 'Transaction 1', "updatedAt" = NOW()
WHERE "projectUserId" = $1
`,
[user.userId],
);
await internalClient.query('COMMIT');
await waitForTable(externalClient, 'PartialUsers');
const afterT1 = await externalClient.query<{
displayName: string | null,
sequenceId: string,
}>(
`
SELECT "displayName", "sequenceId"
FROM "PartialUsers"
WHERE "value" = $1
`,
[`${dbName}@example.com`],
);
expect(afterT1.rows.length).toBe(1);
const afterT1Row = afterT1.rows[0];
const seq1 = BigInt(afterT1Row.sequenceId);
await internalClient.query('BEGIN');
await internalClient.query(
`
UPDATE "ProjectUser"
SET "displayName" = 'Transaction 2', "updatedAt" = NOW()
WHERE "projectUserId" = $1
`,
[user.userId],
);
await sleep(7000);
const duringT2 = await externalClient.query<{
displayName: string | null,
sequenceId: string,
}>(
`
SELECT "displayName", "sequenceId"
FROM "PartialUsers"
WHERE "value" = $1
`,
[`${dbName}@example.com`],
);
expect(duringT2.rows.length).toBe(1);
const duringT2Row = duringT2.rows[0];
expect(duringT2Row.displayName).not.toBe('Transaction 2');
const seqDuring = BigInt(duringT2Row.sequenceId);
expect(seqDuring).toBeGreaterThanOrEqual(seq1);
await internalClient.query('ROLLBACK');
} finally {
await internalClient.end();
}
},
LOCAL_TEST_TIMEOUT,
);
/**
* Scenario 4:
* Two different rows, out-of-order commits:
* - T1 starts
* - T2 starts
* - T2 updates row2
* - T1 updates row1
* - T2 commits
* - Sync only T2's row visible, T1's row unchanged
* - T1 commits
* - Sync T1's row now visible
*
* Uses two different users to avoid row-level locking.
*/
test(
'Out-of-order commits on different rows: uncommitted changes invisible',
async () => {
const dbName = 'race_two_rows_out_of_order_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
},
});
const externalClient = dbManager.getClient(dbName);
const user1 = await User.create({ emailAddress: 'row1@example.com' });
const user2 = await User.create({ emailAddress: 'row2@example.com' });
await waitForTable(externalClient, 'PartialUsers');
await waitForCondition(
async () => {
const res = await externalClient.query(`SELECT COUNT(*) as count FROM "PartialUsers"`);
return parseInt(res.rows[0].count, 10) === 2;
},
{ description: 'both users synced initially', timeoutMs: 60000 },
);
const internalDbUrl = makeInternalDbUrl();
const t1Client = new Client({ connectionString: internalDbUrl });
const t2Client = new Client({ connectionString: internalDbUrl });
await t1Client.connect();
await t2Client.connect();
try {
await t1Client.query('BEGIN');
await t2Client.query('BEGIN');
await t2Client.query(
`
UPDATE "ProjectUser"
SET "displayName" = 'T2 Updated', "updatedAt" = NOW()
WHERE "projectUserId" = $1
`,
[user2.userId],
);
await t1Client.query(
`
UPDATE "ProjectUser"
SET "displayName" = 'T1 Updated', "updatedAt" = NOW()
WHERE "projectUserId" = $1
`,
[user1.userId],
);
await t2Client.query('COMMIT');
await waitForCondition(
async () => {
const res = await externalClient.query<{ displayName: string | null }>(
`SELECT "displayName" FROM "PartialUsers" WHERE "value" = $1`,
['row2@example.com'],
);
return res.rows.length === 1 && res.rows[0].displayName === 'T2 Updated';
},
{ description: 'T2 row synced after T2 commit', timeoutMs: 90000 },
);
const row1BeforeT1Commit = await externalClient.query<{ displayName: string | null }>(
`SELECT "displayName" FROM "PartialUsers" WHERE "value" = $1`,
['row1@example.com'],
);
expect(row1BeforeT1Commit.rows.length).toBe(1);
expect(row1BeforeT1Commit.rows[0].displayName).not.toBe('T1 Updated');
await t1Client.query('COMMIT');
await waitForCondition(
async () => {
const res = await externalClient.query<{ displayName: string | null }>(
`SELECT "displayName" FROM "PartialUsers" WHERE "value" = $1`,
['row1@example.com'],
);
return res.rows.length === 1 && res.rows[0].displayName === 'T1 Updated';
},
{ description: 'T1 row synced after T1 commit', timeoutMs: 90000 },
);
const finalRow1 = await externalClient.query<{ displayName: string | null }>(
`SELECT "displayName" FROM "PartialUsers" WHERE "value" = $1`,
['row1@example.com'],
);
const finalRow2 = await externalClient.query<{ displayName: string | null }>(
`SELECT "displayName" FROM "PartialUsers" WHERE "value" = $1`,
['row2@example.com'],
);
expect(finalRow1.rows[0].displayName).toBe('T1 Updated');
expect(finalRow2.rows[0].displayName).toBe('T2 Updated');
} finally {
await t1Client.end();
await t2Client.end();
}
},
LOCAL_TEST_TIMEOUT,
);
/**
* Scenario 5:
* Full lifecycle:
* - baseline
* - Transaction 1 committed & synced
* - Transaction 2 committed after a later sync
* Final state must be Transaction 2 with a higher sequenceId.
*/
test(
'Highest sequenceId wins after both transactions commit',
async () => {
const dbName = 'race_full_lifecycle_test';
const { externalClient, user, baselineSeq } =
await setupExternalDbWithBaseline(dbName);
const internalDbUrl = makeInternalDbUrl();
const internalClient = new Client({ connectionString: internalDbUrl });
await internalClient.connect();
try {
await internalClient.query('BEGIN');
await internalClient.query(
`
UPDATE "ProjectUser"
SET "displayName" = 'Transaction 1', "updatedAt" = NOW()
WHERE "projectUserId" = $1
`,
[user.userId],
);
await internalClient.query('COMMIT');
await waitForCondition(
async () => {
const res = await externalClient.query<{
displayName: string | null,
}>(
`SELECT "displayName" FROM "PartialUsers" WHERE "value" = $1`,
[`${dbName}@example.com`],
);
return res.rows.length === 1 && res.rows[0].displayName === 'Transaction 1';
},
{ description: 'T1 synced', timeoutMs: 90000 },
);
const afterT1 = await externalClient.query<{
displayName: string | null,
sequenceId: string,
}>(
`
SELECT "displayName", "sequenceId"
FROM "PartialUsers"
WHERE "value" = $1
`,
[`${dbName}@example.com`],
);
expect(afterT1.rows.length).toBe(1);
const afterT1Row = afterT1.rows[0];
expect(afterT1Row.displayName).toBe('Transaction 1');
const seq1 = BigInt(afterT1Row.sequenceId);
expect(seq1).toBeGreaterThan(baselineSeq);
await internalClient.query('BEGIN');
await internalClient.query(
`
UPDATE "ProjectUser"
SET "displayName" = 'Transaction 2', "updatedAt" = NOW()
WHERE "projectUserId" = $1
`,
[user.userId],
);
await internalClient.query('COMMIT');
await waitForCondition(
async () => {
const res = await externalClient.query<{
displayName: string | null,
}>(
`SELECT "displayName" FROM "PartialUsers" WHERE "value" = $1`,
[`${dbName}@example.com`],
);
return res.rows.length === 1 && res.rows[0].displayName === 'Transaction 2';
},
{ description: 'T2 synced', timeoutMs: 90000 },
);
const afterT2 = await externalClient.query<{
displayName: string | null,
sequenceId: string,
}>(
`
SELECT "displayName", "sequenceId"
FROM "PartialUsers"
WHERE "value" = $1
`,
[`${dbName}@example.com`],
);
expect(afterT2.rows.length).toBe(1);
const afterT2Row = afterT2.rows[0];
expect(afterT2Row.displayName).toBe('Transaction 2');
const seq2 = BigInt(afterT2Row.sequenceId);
expect(seq2).toBeGreaterThan(seq1);
} finally {
await internalClient.end();
}
},
LOCAL_TEST_TIMEOUT,
);
});
});

View File

@ -0,0 +1,225 @@
import { Client } from 'pg';
import { expect } from 'vitest';
import { Project } from '../../../backend-helpers';
const PORT_PREFIX = process.env.NEXT_PUBLIC_STACK_PORT_PREFIX || '81';
export const POSTGRES_HOST = process.env.EXTERNAL_DB_TEST_HOST || `localhost:${PORT_PREFIX}32`;
export const POSTGRES_USER = process.env.EXTERNAL_DB_TEST_USER || 'postgres';
export const POSTGRES_PASSWORD = process.env.EXTERNAL_DB_TEST_PASSWORD || 'external-db-test-password';
export const TEST_TIMEOUT = 120000;
export const HIGH_VOLUME_TIMEOUT = 240000;
/**
* Helper class to manage external test databases
*/
export class TestDbManager {
private setupClient: Client | null = null;
private databases: Map<string, Client> = new Map();
private databaseNames: Set<string> = new Set();
async init() {
this.setupClient = new Client({
connectionString: `postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}/postgres`,
});
await this.setupClient.connect();
}
async createDatabase(dbName: string): Promise<string> {
if (!this.setupClient) throw new Error('TestDbManager not initialized');
const uniqueDbName = `${dbName}_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
await this.setupClient.query(`CREATE DATABASE "${uniqueDbName}"`);
const connectionString = `postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}/${uniqueDbName}`;
const client = new Client({ connectionString });
await client.connect();
this.databases.set(dbName, client);
this.databaseNames.add(uniqueDbName);
return connectionString;
}
getClient(dbName: string): Client {
const client = this.databases.get(dbName);
if (!client) throw new Error(`Database ${dbName} not found`);
return client;
}
async cleanup() {
for (const client of this.databases.values()) {
await client.end();
}
this.databases.clear();
if (this.setupClient) {
for (const dbName of this.databaseNames) {
try {
await this.setupClient.query(`DROP DATABASE IF EXISTS "${dbName}"`);
} catch (err) {
console.warn(`Failed to drop database ${dbName}:`, err);
}
}
this.databaseNames.clear();
await this.setupClient.end();
this.setupClient = null;
}
}
}
/**
* Wait for a condition to be true by polling, with timeout
*/
export async function waitForCondition(
checkFn: () => Promise<boolean>,
options: { timeoutMs?: number, intervalMs?: number, description?: string } = {}
): Promise<void> {
const { timeoutMs = 10000, intervalMs = 100, description = 'condition' } = options;
const startTime = Date.now();
while (Date.now() - startTime < timeoutMs) {
if (await checkFn()) {
return;
}
await new Promise(r => setTimeout(r, intervalMs));
}
throw new Error(`Timeout waiting for ${description} after ${timeoutMs}ms`);
}
/**
* Wait for data to appear in external DB (relies on automatic cron job)
*/
export async function waitForSyncedData(client: Client, email: string, expectedName?: string) {
await waitForCondition(
async () => {
let res;
try {
res = await client.query(`SELECT * FROM "PartialUsers" WHERE "value" = $1`, [email]);
} catch (err: any) {
if (err && err.code === '42P01') {
return false;
}
throw err;
}
if (res.rows.length === 0) {
return false;
}
if (expectedName && res.rows[0].displayName !== expectedName) {
return false;
}
return true;
},
{
description: `data for ${email} to appear in external DB`,
timeoutMs: 90000,
intervalMs: 500,
}
);
}
/**
* Wait for data to be removed from external DB (relies on automatic cron job)
*/
export async function waitForSyncedDeletion(client: Client, email: string) {
await waitForCondition(
async () => {
let res;
try {
res = await client.query(`SELECT * FROM "PartialUsers" WHERE "value" = $1`, [email]);
} catch (err: any) {
if (err && err.code === '42P01') {
return false;
}
throw err;
}
return res.rows.length === 0;
},
{
description: `data for ${email} to be removed from external DB`,
timeoutMs: 90000,
intervalMs: 500,
}
);
}
/**
* Wait for table to be created (relies on automatic cron job)
*/
export async function waitForTable(client: Client, tableName: string) {
await waitForCondition(
async () => {
const res = await client.query(`
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = $1
);
`, [tableName]);
const exists = res.rows[0].exists;
return exists;
},
{
description: `table ${tableName} to be created`,
timeoutMs: 90000,
intervalMs: 500,
}
);
}
/**
* Helper to verify data does NOT exist in external DB
*/
export async function verifyNotInExternalDb(client: Client, email: string) {
const res = await client.query(`SELECT * FROM "PartialUsers" WHERE "value" = $1`, [email]);
expect(res.rows.length).toBe(0);
}
/**
* Helper to verify data DOES exist in external DB
*/
export async function verifyInExternalDb(client: Client, email: string, expectedName?: string) {
const res = await client.query(`SELECT * FROM "PartialUsers" WHERE "value" = $1`, [email]);
expect(res.rows.length).toBe(1);
if (expectedName) {
expect(res.rows[0].displayName).toBe(expectedName);
}
return res.rows[0];
}
/**
* Helper to count total users in external DB
*/
export async function countUsersInExternalDb(client: Client): Promise<number> {
try {
const res = await client.query(`SELECT COUNT(*) FROM "PartialUsers"`);
return parseInt(res.rows[0].count, 10);
} catch (err: any) {
if (err && err.code === '42P01') {
return 0;
}
throw err;
}
}
/**
* Helper to create a project and update its config with external DB settings
*/
export async function createProjectWithExternalDb(externalDatabases: any, projectOptions?: { display_name?: string, description?: string }) {
const project = await Project.createAndSwitch(projectOptions);
await Project.updateConfig({
"dbSync.externalDatabases": externalDatabases
});
return project;
}
/**
* Helper to remove external DB config from current project
*/
export async function cleanupProjectExternalDb() {
await Project.updateConfig({
"dbSync.externalDatabases": {}
});
}

View File

@ -204,6 +204,19 @@ services:
environment:
HOST_ON_HOST: host.docker.internal
# ================= External DB Sync Test Postgres =================
external-db-test:
image: "docker.io/postgres:16.1"
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: external-db-test-password
POSTGRES_DB: postgres
ports:
- "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}32:5432"
volumes:
- external-db-test-data:/var/lib/postgresql/data
# ================= volumes =================
@ -215,6 +228,7 @@ volumes:
s3mock-data:
deno-cache:
localstack-data:
external-db-test-data:
# ================= configs =================

View File

@ -65,6 +65,7 @@
"@changesets/cli": "^2.27.9",
"@testing-library/react": "^15.0.7",
"@types/node": "20.17.6",
"@types/pg": "^8.15.6",
"@types/supertest": "^6.0.2",
"@typescript-eslint/eslint-plugin": "^6.21.0",
"@typescript-eslint/parser": "^6.21.0",

View File

@ -0,0 +1,166 @@
export const DEFAULT_DB_SYNC_MAPPINGS = {
"PartialUsers": {
sourceTables: ["ContactChannel", "ProjectUser"],
targetTable: "PartialUsers",
targetTablePrimaryKey: ["id"],
targetTableSchema: `
CREATE TABLE IF NOT EXISTS "PartialUsers" (
"id" uuid PRIMARY KEY,
"createdAt" timestamp with time zone,
"updatedAt" timestamp with time zone,
"type" text,
"isPrimary" boolean,
"isVerified" boolean,
"value" text,
"sequenceId" bigint,
"userUpdatedAt" timestamp with time zone,
"profileImageUrl" text,
"displayName" text,
"userCreatedAt" timestamp with time zone,
"isAnonymous" boolean
);
CREATE INDEX ON "PartialUsers" ("sequenceId");
REVOKE ALL ON "PartialUsers" FROM PUBLIC;
GRANT SELECT ON "PartialUsers" TO PUBLIC;
`.trim(),
internalDbFetchQuery: `
SELECT *
FROM (
SELECT
"ContactChannel"."id",
"ContactChannel"."createdAt",
"ContactChannel"."updatedAt",
"ContactChannel"."type"::text AS "type",
CASE WHEN "ContactChannel"."isPrimary" = 'TRUE' THEN true ELSE false END AS "isPrimary",
"ContactChannel"."isVerified",
"ContactChannel"."value",
GREATEST("ContactChannel"."sequenceId", "ProjectUser"."sequenceId") AS "sequenceId",
"ProjectUser"."updatedAt" AS "userUpdatedAt",
"ProjectUser"."profileImageUrl",
"ProjectUser"."displayName",
"ProjectUser"."createdAt" AS "userCreatedAt",
"ProjectUser"."isAnonymous",
"ContactChannel"."tenancyId",
false AS "isDeleted"
FROM "ContactChannel"
JOIN "ProjectUser"
ON "ContactChannel"."projectUserId" = "ProjectUser"."projectUserId"
AND "ContactChannel"."tenancyId" = "ProjectUser"."tenancyId"
WHERE "ContactChannel"."tenancyId" = $1::uuid
UNION ALL
SELECT
("DeletedRow"."primaryKey"->>'id')::uuid AS "id",
NULL::timestamptz AS "createdAt",
"DeletedRow"."deletedAt" AS "updatedAt",
NULL::text AS "type",
NULL::boolean AS "isPrimary",
NULL::boolean AS "isVerified",
NULL::text AS "value",
"DeletedRow"."sequenceId" AS "sequenceId",
NULL::timestamptz AS "userUpdatedAt",
NULL::text AS "profileImageUrl",
NULL::text AS "displayName",
NULL::timestamptz AS "userCreatedAt",
NULL::boolean AS "isAnonymous",
"DeletedRow"."tenancyId",
true AS "isDeleted"
FROM "DeletedRow"
WHERE
"DeletedRow"."tenancyId" = $1::uuid
AND "DeletedRow"."tableName" = 'ContactChannel'
) AS "_src"
WHERE "sequenceId" IS NOT NULL
ORDER BY "sequenceId" ASC
LIMIT 1000
`.trim(),
externalDbUpdateQuery: `
WITH existing AS (
SELECT "sequenceId" AS "oldSeq"
FROM "PartialUsers"
WHERE "id" = $1::uuid
),
decision AS (
SELECT
$1::uuid AS "id",
$2::timestamptz AS "createdAt",
$3::timestamptz AS "updatedAt",
$4::text AS "type",
$5::boolean AS "isPrimary",
$6::boolean AS "isVerified",
$7::text AS "value",
$8::bigint AS "newSeq",
$9::timestamptz AS "userUpdatedAt",
$10::text AS "profileImageUrl",
$11::text AS "displayName",
$12::timestamptz AS "userCreatedAt",
$13::boolean AS "isAnonymous",
$14::boolean AS "isDeleted",
(SELECT "oldSeq" FROM existing) AS "oldSeq"
),
deleted AS (
DELETE FROM "PartialUsers" p
USING decision d
WHERE
d."isDeleted" = true
AND (
d."oldSeq" IS NULL
OR d."newSeq" >= d."oldSeq"
)
AND p."id" = d."id"
RETURNING 1
)
INSERT INTO "PartialUsers" (
"id",
"createdAt",
"updatedAt",
"type",
"isPrimary",
"isVerified",
"value",
"sequenceId",
"userUpdatedAt",
"profileImageUrl",
"displayName",
"userCreatedAt",
"isAnonymous"
)
SELECT
d."id",
d."createdAt",
d."updatedAt",
d."type",
d."isPrimary",
d."isVerified",
d."value",
d."newSeq" AS "sequenceId",
d."userUpdatedAt",
d."profileImageUrl",
d."displayName",
d."userCreatedAt",
d."isAnonymous"
FROM decision d
WHERE
d."isDeleted" = false
AND (
d."oldSeq" IS NULL
OR d."newSeq" > d."oldSeq"
)
ON CONFLICT ("id") DO UPDATE SET
"createdAt" = EXCLUDED."createdAt",
"updatedAt" = EXCLUDED."updatedAt",
"type" = EXCLUDED."type",
"isPrimary" = EXCLUDED."isPrimary",
"isVerified" = EXCLUDED."isVerified",
"value" = EXCLUDED."value",
"sequenceId" = EXCLUDED."sequenceId",
"userUpdatedAt" = EXCLUDED."userUpdatedAt",
"profileImageUrl" = EXCLUDED."profileImageUrl",
"displayName" = EXCLUDED."displayName",
"userCreatedAt" = EXCLUDED."userCreatedAt",
"isAnonymous" = EXCLUDED."isAnonymous"
WHERE
EXCLUDED."sequenceId" > "PartialUsers"."sequenceId";
`.trim(),
},
} as const;

View File

@ -49,6 +49,27 @@ const branchSchemaFuzzerConfig = [{
}],
}],
}],
dbSync: [{
externalDatabases: [{
"some-external-db-id": [{
type: ["neon", "postgres"] as const,
connectionString: [
"postgres://user:password@host:port/database",
"some-connection-string",
],
mappings: [{
"some-mapping-id": [{
sourceTables: [[["table1"], ["table2"]]],
targetTable: ["target_table"],
targetTableSchema: ["public"],
targetTablePrimaryKey: [[["id"]]] as const,
internalDbFetchQuery: ["SELECT * FROM table"],
externalDbUpdateQuery: ["UPDATE table SET ..."],
}]
}],
}],
}],
}],
dataVault: [{
stores: [{
"some-store-id": [{

View File

@ -8,13 +8,14 @@ import * as yup from "yup";
import { ALL_APPS } from "../apps/apps-config";
import { DEFAULT_EMAIL_TEMPLATES, DEFAULT_EMAIL_THEMES, DEFAULT_EMAIL_THEME_ID } from "../helpers/emails";
import * as schemaFields from "../schema-fields";
import { productSchema, userSpecifiedIdSchema, yupBoolean, yupDate, yupMixed, yupNever, yupNumber, yupObject, yupRecord, yupString, yupTuple, yupUnion } from "../schema-fields";
import { productSchema, userSpecifiedIdSchema, yupArray, yupBoolean, yupDate, yupMixed, yupNever, yupNumber, yupObject, yupRecord, yupString, yupTuple, yupUnion } from "../schema-fields";
import { SUPPORTED_CURRENCIES } from "../utils/currency-constants";
import { StackAssertionError } from "../utils/errors";
import { allProviders } from "../utils/oauth";
import { DeepFilterUndefined, DeepMerge, DeepRequiredOrUndefined, filterUndefined, get, has, isObjectLike, mapValues, set, typedAssign, typedEntries, typedFromEntries } from "../utils/objects";
import { Result } from "../utils/results";
import { CollapseObjectUnion, Expand, IntersectAll, IsUnion, typeAssert, typeAssertExtends, typeAssertIs } from "../utils/types";
import { DEFAULT_DB_SYNC_MAPPINGS } from "./db-sync-mappings";
import { Config, NormalizationError, NormalizesTo, assertNormalized, getInvalidConfigReason, normalize } from "./format";
export const configLevels = ['project', 'branch', 'environment', 'organization'] as const;
@ -200,6 +201,27 @@ export const branchConfigSchema = canNoLongerBeOverridden(projectConfigSchema, [
payments: branchPaymentsSchema,
dbSync: yupObject({
externalDatabases: yupRecord(
userSpecifiedIdSchema("externalDatabaseId"),
yupObject({
type: yupString().oneOf(['neon', 'postgres']).defined(),
connectionString: yupString().defined(),
mappings: yupRecord(
userSpecifiedIdSchema("mappingId"),
yupObject({
sourceTables: yupArray(yupString()).optional(),
targetTable: yupString().defined(),
targetTableSchema: yupString().optional(),
targetTablePrimaryKey: yupTuple([yupString().defined()]).optional(),
internalDbFetchQuery: yupString().defined(),
externalDbUpdateQuery: yupString().optional(),
})
).default(() => DEFAULT_DB_SYNC_MAPPINGS as any),
})
),
}),
dataVault: yupObject({
stores: yupRecord(
userSpecifiedIdSchema("storeId"),
@ -566,6 +588,15 @@ const organizationConfigDefaults = {
} as const)
},
dbSync: {
externalDatabases: (key: string) => ({
type: undefined,
connectionString: undefined,
mappings: DEFAULT_DB_SYNC_MAPPINGS as any,
}),
},
dataVault: {
stores: (key: string) => ({
displayName: "Unnamed Vault",
@ -858,12 +889,9 @@ export async function getConfigOverrideErrors<T extends yup.AnySchema>(schema: T
return yupMixed();
}
case "array": {
throw new StackAssertionError(`Arrays are not supported in config JSON files (besides tuples). Use a record instead.`, { schemaInfo, schema });
// This is how the implementation would look like, but we don't support arrays in config JSON files (besides tuples)
// const arraySchema = schema as yup.ArraySchema<any, any, any, any>;
// const innerType = arraySchema.innerType;
// return yupArray(innerType ? getRestrictedSchema(path + ".[]", innerType as any) : undefined);
const arraySchema = schema as yup.ArraySchema<any, any, any, any>;
const innerType = arraySchema.innerType;
return yupArray(innerType ? getRestrictedSchema(path + ".[]", innerType as any) : yupMixed());
}
case "tuple": {
return yupTuple(schemaInfo.items.map((s, index) => getRestrictedSchema(path + `[${index}]`, s)) as any);

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,14 @@
{
"crons": [
{
"path": "/api/latest/internal/external-db-sync/poller",
"schedule": "* * * * *"
},
{
"path": "/api/latest/internal/external-db-sync/sequencer",
"schedule": "* * * * *"
}
],
"functions": {
"**/*": {
"maxDuration": 300