mirror of
https://github.com/stack-auth/stack.git
synced 2026-06-19 21:00:40 +08:00
fix routes, external-db dashboard
This commit is contained in:
parent
61f2b79f46
commit
8e9220505d
@ -37,12 +37,17 @@ CREATE INDEX "ContactChannel_tenancyId_sequenceId_idx" ON "ContactChannel"("tena
|
||||
CREATE TABLE "OutgoingRequest" (
|
||||
"id" UUID NOT NULL,
|
||||
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"deduplicationKey" TEXT,
|
||||
"qstashOptions" JSONB NOT NULL,
|
||||
"startedFulfillingAt" TIMESTAMP(3),
|
||||
|
||||
CONSTRAINT "OutgoingRequest_pkey" PRIMARY KEY ("id")
|
||||
CONSTRAINT "OutgoingRequest_pkey" PRIMARY KEY ("id"),
|
||||
CONSTRAINT "OutgoingRequest_deduplicationKey_key" UNIQUE ("deduplicationKey")
|
||||
);
|
||||
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
CREATE INDEX "OutgoingRequest_startedFulfillingAt_deduplicationKey_idx" ON "OutgoingRequest"("startedFulfillingAt", "deduplicationKey");
|
||||
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
-- Creates composite index on startedFulfillingAt and createdAt for efficient querying of pending requests in order.
|
||||
-- This allows fast lookups of pending requests (WHERE startedFulfillingAt IS NULL) ordered by createdAt.
|
||||
@ -91,13 +96,13 @@ ALTER TABLE "DeletedRow" ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DE
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
-- Creates partial indexes on (shouldUpdateSequenceId, tenancyId) to quickly find rows that need updates
|
||||
-- and support ORDER BY tenancyId for less fragmented updates.
|
||||
CREATE INDEX "ProjectUser_shouldUpdateSequenceId_idx" ON "ProjectUser"("shouldUpdateSequenceId", "tenancyId") WHERE "shouldUpdateSequenceId" = TRUE;
|
||||
CREATE INDEX "ProjectUser_shouldUpdateSequenceId_idx" ON "ProjectUser"("shouldUpdateSequenceId", "tenancyId");
|
||||
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
CREATE INDEX "ContactChannel_shouldUpdateSequenceId_idx" ON "ContactChannel"("shouldUpdateSequenceId", "tenancyId") WHERE "shouldUpdateSequenceId" = TRUE;
|
||||
CREATE INDEX "ContactChannel_shouldUpdateSequenceId_idx" ON "ContactChannel"("shouldUpdateSequenceId", "tenancyId");
|
||||
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
CREATE INDEX "DeletedRow_shouldUpdateSequenceId_idx" ON "DeletedRow"("shouldUpdateSequenceId", "tenancyId") WHERE "shouldUpdateSequenceId" = TRUE;
|
||||
CREATE INDEX "DeletedRow_shouldUpdateSequenceId_idx" ON "DeletedRow"("shouldUpdateSequenceId", "tenancyId");
|
||||
|
||||
-- SPLIT_STATEMENT_SENTINEL
|
||||
-- SINGLE_STATEMENT_SENTINEL
|
||||
@ -240,4 +245,3 @@ CREATE TRIGGER log_deleted_row_contact_channel
|
||||
BEFORE DELETE ON "ContactChannel"
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION log_deleted_row();
|
||||
|
||||
|
||||
@ -1071,8 +1071,11 @@ model OutgoingRequest {
|
||||
|
||||
qstashOptions Json
|
||||
startedFulfillingAt DateTime?
|
||||
deduplicationKey String?
|
||||
|
||||
@@unique([deduplicationKey])
|
||||
@@index([startedFulfillingAt, createdAt])
|
||||
@@index([startedFulfillingAt, deduplicationKey])
|
||||
}
|
||||
|
||||
model DeletedRow {
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import { upstash } from "@/lib/upstash";
|
||||
import type { PublishBatchRequest } from "@upstash/qstash";
|
||||
import { globalPrismaClient, retryTransaction } from "@/prisma-client";
|
||||
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
|
||||
import type { OutgoingRequest } from "@/generated/prisma/client";
|
||||
@ -10,11 +11,13 @@ import {
|
||||
yupTuple,
|
||||
} from "@stackframe/stack-shared/dist/schema-fields";
|
||||
import { getEnvVariable, getNodeEnvironment } from "@stackframe/stack-shared/dist/utils/env";
|
||||
import { captureError, StatusError } from "@stackframe/stack-shared/dist/utils/errors";
|
||||
import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors";
|
||||
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
|
||||
|
||||
const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000;
|
||||
const DIRECT_SYNC_ENV = "STACK_EXTERNAL_DB_SYNC_DIRECT";
|
||||
const POLLER_CLAIM_LIMIT_ENV = "STACK_EXTERNAL_DB_SYNC_POLL_CLAIM_LIMIT";
|
||||
const DEFAULT_POLL_CLAIM_LIMIT = 100;
|
||||
|
||||
function parseMaxDurationMs(value: string | undefined): number {
|
||||
if (!value) return DEFAULT_MAX_DURATION_MS;
|
||||
@ -36,6 +39,18 @@ function directSyncEnabled(): boolean {
|
||||
return getEnvVariable(DIRECT_SYNC_ENV, "") === "true";
|
||||
}
|
||||
|
||||
function getPollerClaimLimit(): number {
|
||||
const rawValue = getEnvVariable(POLLER_CLAIM_LIMIT_ENV, "");
|
||||
if (!rawValue) return DEFAULT_POLL_CLAIM_LIMIT;
|
||||
const parsed = Number.parseInt(rawValue, 10);
|
||||
if (!Number.isFinite(parsed) || parsed <= 0) {
|
||||
throw new StackAssertionError(
|
||||
`${POLLER_CLAIM_LIMIT_ENV} must be a positive integer. Received: ${JSON.stringify(rawValue)}`
|
||||
);
|
||||
}
|
||||
return parsed;
|
||||
}
|
||||
|
||||
function getLocalApiBaseUrl(): string {
|
||||
const prefix = getEnvVariable("NEXT_PUBLIC_STACK_PORT_PREFIX", "81");
|
||||
return `http://localhost:${prefix}02`;
|
||||
@ -79,32 +94,37 @@ export const GET = createSmartRouteHandler({
|
||||
const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle);
|
||||
const pollIntervalMs = 50;
|
||||
const staleClaimIntervalMinutes = 5;
|
||||
const pollerClaimLimit = getPollerClaimLimit();
|
||||
|
||||
let totalRequestsProcessed = 0;
|
||||
async function claimPendingRequests(): Promise<OutgoingRequest[]> {
|
||||
return await retryTransaction(globalPrismaClient, async (tx) => {
|
||||
const rows = await tx.$queryRaw<OutgoingRequest[]>`
|
||||
return await globalPrismaClient.$queryRaw<OutgoingRequest[]>`
|
||||
UPDATE "OutgoingRequest"
|
||||
SET "startedFulfillingAt" = NOW()
|
||||
WHERE "id" IN (
|
||||
SELECT id
|
||||
FROM "OutgoingRequest"
|
||||
WHERE "startedFulfillingAt" IS NULL
|
||||
OR "startedFulfillingAt" < NOW() - (${staleClaimIntervalMinutes} * INTERVAL '1 minute')
|
||||
ORDER BY "createdAt"
|
||||
LIMIT 100
|
||||
LIMIT ${pollerClaimLimit}
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING *;
|
||||
`;
|
||||
return rows;
|
||||
});
|
||||
}
|
||||
|
||||
async function deleteOutgoingRequest(id: string): Promise<void> {
|
||||
await retryTransaction(globalPrismaClient, async (tx) => {
|
||||
await tx.outgoingRequest.delete({ where: { id } });
|
||||
});
|
||||
}
|
||||
|
||||
async function deleteOutgoingRequests(ids: string[]): Promise<void> {
|
||||
if (ids.length === 0) return;
|
||||
await retryTransaction(globalPrismaClient, async (tx) => {
|
||||
await tx.outgoingRequest.deleteMany({ where: { id: { in: ids } } });
|
||||
});
|
||||
}
|
||||
async function processRequest(request: OutgoingRequest): Promise<void> {
|
||||
// Prisma JsonValue doesn't carry a precise shape for this JSON blob.
|
||||
const options = request.qstashOptions as any;
|
||||
@ -112,6 +132,33 @@ export const GET = createSmartRouteHandler({
|
||||
|
||||
let fullUrl = new URL(options.url, baseUrl).toString();
|
||||
|
||||
// In dev/test, QStash runs in Docker so "localhost" won't work.
|
||||
// Replace with "host.docker.internal" to reach the host machine.
|
||||
// if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) {
|
||||
// const url = new URL(fullUrl);
|
||||
// if (url.hostname === "localhost" || url.hostname === "127.0.0.1") {
|
||||
// url.hostname = "host.docker.internal";
|
||||
// fullUrl = url.toString();
|
||||
// }
|
||||
// }
|
||||
|
||||
await upstash.publishJSON({
|
||||
url: fullUrl,
|
||||
body: options.body,
|
||||
flowControl: options.flowControl,
|
||||
});
|
||||
await deleteOutgoingRequest(request.id);
|
||||
}
|
||||
|
||||
type UpstashRequest = PublishBatchRequest<unknown>;
|
||||
|
||||
function buildUpstashRequest(request: OutgoingRequest): UpstashRequest {
|
||||
// Prisma JsonValue doesn't carry a precise shape for this JSON blob.
|
||||
const options = request.qstashOptions as any;
|
||||
const baseUrl = getEnvVariable("NEXT_PUBLIC_STACK_API_URL");
|
||||
|
||||
let fullUrl = new URL(options.url, baseUrl).toString();
|
||||
|
||||
// In dev/test, QStash runs in Docker so "localhost" won't work.
|
||||
// Replace with "host.docker.internal" to reach the host machine.
|
||||
if (getNodeEnvironment().includes("development") || getNodeEnvironment().includes("test")) {
|
||||
@ -122,27 +169,13 @@ export const GET = createSmartRouteHandler({
|
||||
}
|
||||
}
|
||||
|
||||
if (directSyncEnabled()) {
|
||||
const directUrl = new URL(options.url, getLocalApiBaseUrl()).toString();
|
||||
const res = await fetch(directUrl, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
"upstash-signature": "test-bypass",
|
||||
},
|
||||
body: JSON.stringify(options.body),
|
||||
});
|
||||
if (!res.ok) {
|
||||
throw new StatusError(res.status, `Direct sync failed: ${res.status} ${res.statusText}`);
|
||||
}
|
||||
} else {
|
||||
await upstash.publishJSON({
|
||||
url: fullUrl,
|
||||
body: options.body,
|
||||
});
|
||||
}
|
||||
const flowControl = options.flowControl as UpstashRequest["flowControl"];
|
||||
|
||||
await deleteOutgoingRequest(request.id);
|
||||
return {
|
||||
url: fullUrl,
|
||||
body: options.body,
|
||||
...(flowControl ? { flowControl } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
async function processRequests(requests: OutgoingRequest[]): Promise<number> {
|
||||
@ -160,19 +193,22 @@ export const GET = createSmartRouteHandler({
|
||||
return processed;
|
||||
}
|
||||
|
||||
const results = await Promise.allSettled(requests.map(processRequest));
|
||||
for (const result of results) {
|
||||
if (result.status === "fulfilled") {
|
||||
processed++;
|
||||
continue;
|
||||
}
|
||||
captureError("poller-iteration-error", result.reason);
|
||||
}
|
||||
if (requests.length === 0) return 0;
|
||||
|
||||
return processed;
|
||||
try {
|
||||
const batchPayload = requests.map(buildUpstashRequest);
|
||||
console.log("publishing to QStash batch", { count: batchPayload.length });
|
||||
await upstash.batchJSON(batchPayload);
|
||||
await deleteOutgoingRequests(requests.map((request) => request.id));
|
||||
return requests.length;
|
||||
} catch (error) {
|
||||
captureError("poller-iteration-error", error);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
while (performance.now() - startTime < maxDurationMs) {
|
||||
console.log("poller-iteration", performance.now() - startTime);
|
||||
const pendingRequests = await claimPendingRequests();
|
||||
|
||||
if (stopWhenIdle && pendingRequests.length === 0) {
|
||||
|
||||
@ -8,11 +8,13 @@ import {
|
||||
yupTuple,
|
||||
} from "@stackframe/stack-shared/dist/schema-fields";
|
||||
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
|
||||
import { captureError, StatusError } from "@stackframe/stack-shared/dist/utils/errors";
|
||||
import { captureError, StackAssertionError, StatusError } from "@stackframe/stack-shared/dist/utils/errors";
|
||||
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
|
||||
import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue";
|
||||
|
||||
const DEFAULT_MAX_DURATION_MS = 3 * 60 * 1000;
|
||||
const SEQUENCER_BATCH_SIZE_ENV = "STACK_EXTERNAL_DB_SYNC_SEQUENCER_BATCH_SIZE";
|
||||
const DEFAULT_BATCH_SIZE = 1000;
|
||||
|
||||
function parseMaxDurationMs(value: string | undefined): number {
|
||||
if (!value) return DEFAULT_MAX_DURATION_MS;
|
||||
@ -30,19 +32,30 @@ function parseStopWhenIdle(value: string | undefined): boolean {
|
||||
throw new StatusError(400, "stopWhenIdle must be 'true' or 'false'");
|
||||
}
|
||||
|
||||
function getSequencerBatchSize(): number {
|
||||
const rawValue = getEnvVariable(SEQUENCER_BATCH_SIZE_ENV, "");
|
||||
if (!rawValue) return DEFAULT_BATCH_SIZE;
|
||||
const parsed = Number.parseInt(rawValue, 10);
|
||||
if (!Number.isFinite(parsed) || parsed <= 0) {
|
||||
throw new StackAssertionError(
|
||||
`${SEQUENCER_BATCH_SIZE_ENV} must be a positive integer. Received: ${JSON.stringify(rawValue)}`
|
||||
);
|
||||
}
|
||||
return parsed;
|
||||
}
|
||||
|
||||
|
||||
// Assigns sequence IDs to rows that need them and queues sync requests for affected tenants.
|
||||
// Processes up to 1000 rows at a time from each table.
|
||||
async function backfillSequenceIds(): Promise<boolean> {
|
||||
// Processes up to batchSize rows at a time from each table.
|
||||
async function backfillSequenceIds(batchSize: number): Promise<boolean> {
|
||||
let didUpdate = false;
|
||||
const projectUserTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
|
||||
WITH rows_to_update AS (
|
||||
SELECT "tenancyId", "projectUserId"
|
||||
FROM "ProjectUser"
|
||||
WHERE "shouldUpdateSequenceId" = TRUE
|
||||
OR "sequenceId" IS NULL
|
||||
ORDER BY "tenancyId"
|
||||
LIMIT 1000
|
||||
LIMIT ${batchSize}
|
||||
FOR UPDATE SKIP LOCKED
|
||||
),
|
||||
updated_rows AS (
|
||||
@ -68,9 +81,8 @@ async function backfillSequenceIds(): Promise<boolean> {
|
||||
SELECT "tenancyId", "projectUserId", "id"
|
||||
FROM "ContactChannel"
|
||||
WHERE "shouldUpdateSequenceId" = TRUE
|
||||
OR "sequenceId" IS NULL
|
||||
ORDER BY "tenancyId"
|
||||
LIMIT 1000
|
||||
LIMIT ${batchSize}
|
||||
FOR UPDATE SKIP LOCKED
|
||||
),
|
||||
updated_rows AS (
|
||||
@ -96,9 +108,8 @@ async function backfillSequenceIds(): Promise<boolean> {
|
||||
SELECT "id", "tenancyId"
|
||||
FROM "DeletedRow"
|
||||
WHERE "shouldUpdateSequenceId" = TRUE
|
||||
OR "sequenceId" IS NULL
|
||||
ORDER BY "tenancyId"
|
||||
LIMIT 1000
|
||||
LIMIT ${batchSize}
|
||||
FOR UPDATE SKIP LOCKED
|
||||
),
|
||||
updated_rows AS (
|
||||
@ -161,12 +172,13 @@ export const GET = createSmartRouteHandler({
|
||||
const maxDurationMs = parseMaxDurationMs(query.maxDurationMs);
|
||||
const stopWhenIdle = parseStopWhenIdle(query.stopWhenIdle);
|
||||
const pollIntervalMs = 50;
|
||||
const batchSize = getSequencerBatchSize();
|
||||
|
||||
let iterations = 0;
|
||||
|
||||
while (performance.now() - startTime < maxDurationMs) {
|
||||
try {
|
||||
const didUpdate = await backfillSequenceIds();
|
||||
const didUpdate = await backfillSequenceIds(batchSize);
|
||||
if (stopWhenIdle && !didUpdate) {
|
||||
break;
|
||||
}
|
||||
|
||||
@ -0,0 +1,796 @@
|
||||
import { globalPrismaClient } from "@/prisma-client";
|
||||
import { Prisma } from "@/generated/prisma/client";
|
||||
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
|
||||
import type { CompleteConfig } from "@stackframe/stack-shared/dist/config/schema";
|
||||
import {
|
||||
adaptSchema,
|
||||
adminAuthTypeSchema,
|
||||
yupArray,
|
||||
yupBoolean,
|
||||
yupNumber,
|
||||
yupObject,
|
||||
yupString,
|
||||
} from "@stackframe/stack-shared/dist/schema-fields";
|
||||
import { errorToNiceString, StackAssertionError, throwErr } from "@stackframe/stack-shared/dist/utils/errors";
|
||||
import { Result } from "@stackframe/stack-shared/dist/utils/results";
|
||||
import { Client } from "pg";
|
||||
import { KnownErrors } from "@stackframe/stack-shared";
|
||||
|
||||
const STALE_CLAIM_INTERVAL_MINUTES = 5;
|
||||
|
||||
const sequenceStatsSchema = yupObject({
|
||||
total: yupString().defined(),
|
||||
pending: yupString().defined(),
|
||||
null_sequence_id: yupString().defined(),
|
||||
min_sequence_id: yupString().nullable().defined(),
|
||||
max_sequence_id: yupString().nullable().defined(),
|
||||
});
|
||||
|
||||
const deletedRowByTableSchema = yupObject({
|
||||
table_name: yupString().defined(),
|
||||
total: yupString().defined(),
|
||||
pending: yupString().defined(),
|
||||
null_sequence_id: yupString().defined(),
|
||||
min_sequence_id: yupString().nullable().defined(),
|
||||
max_sequence_id: yupString().nullable().defined(),
|
||||
});
|
||||
|
||||
const externalDbMetadataSchema = yupObject({
|
||||
mapping_name: yupString().defined(),
|
||||
last_synced_sequence_id: yupString().defined(),
|
||||
updated_at_millis: yupNumber().nullable().defined(),
|
||||
});
|
||||
|
||||
const externalDbMappingStatusSchema = yupObject({
|
||||
mapping_id: yupString().defined(),
|
||||
internal_max_sequence_id: yupString().nullable().defined(),
|
||||
last_synced_sequence_id: yupString().nullable().defined(),
|
||||
updated_at_millis: yupNumber().nullable().defined(),
|
||||
backlog: yupString().nullable().defined(),
|
||||
});
|
||||
|
||||
const externalDbSchema = yupObject({
|
||||
id: yupString().defined(),
|
||||
type: yupString().defined(),
|
||||
connection: yupObject({
|
||||
redacted: yupString().nullable().defined(),
|
||||
host: yupString().nullable().defined(),
|
||||
port: yupNumber().nullable().defined(),
|
||||
database: yupString().nullable().defined(),
|
||||
user: yupString().nullable().defined(),
|
||||
}).defined(),
|
||||
status: yupString().oneOf(["ok", "error"]).defined(),
|
||||
error: yupString().nullable().defined(),
|
||||
metadata: yupArray(externalDbMetadataSchema).defined(),
|
||||
users_table: yupObject({
|
||||
exists: yupBoolean().defined(),
|
||||
total_rows: yupString().nullable().defined(),
|
||||
min_signed_up_at_millis: yupNumber().nullable().defined(),
|
||||
max_signed_up_at_millis: yupNumber().nullable().defined(),
|
||||
}).defined(),
|
||||
mapping_status: yupArray(externalDbMappingStatusSchema).defined(),
|
||||
});
|
||||
|
||||
const mappingSchema = yupObject({
|
||||
mapping_id: yupString().defined(),
|
||||
internal_min_sequence_id: yupString().nullable().defined(),
|
||||
internal_max_sequence_id: yupString().nullable().defined(),
|
||||
internal_pending_count: yupString().defined(),
|
||||
});
|
||||
|
||||
const globalSchema = yupObject({
|
||||
tenancies_total: yupString().defined(),
|
||||
tenancies_with_db_sync: yupString().defined(),
|
||||
sequencer: yupObject({
|
||||
project_users: sequenceStatsSchema.defined(),
|
||||
contact_channels: sequenceStatsSchema.defined(),
|
||||
deleted_rows: sequenceStatsSchema.shape({
|
||||
by_table: yupArray(deletedRowByTableSchema).defined(),
|
||||
}).defined(),
|
||||
}).defined(),
|
||||
poller: yupObject({
|
||||
total: yupString().defined(),
|
||||
pending: yupString().defined(),
|
||||
in_flight: yupString().defined(),
|
||||
stale: yupString().defined(),
|
||||
oldest_created_at_millis: yupNumber().nullable().defined(),
|
||||
newest_created_at_millis: yupNumber().nullable().defined(),
|
||||
}).defined(),
|
||||
sync_engine: yupObject({
|
||||
mappings: yupArray(mappingSchema).defined(),
|
||||
}).defined(),
|
||||
});
|
||||
|
||||
const responseSchema = yupObject({
|
||||
statusCode: yupNumber().oneOf([200]).defined(),
|
||||
bodyType: yupString().oneOf(["json"]).defined(),
|
||||
body: yupObject({
|
||||
ok: yupBoolean().defined(),
|
||||
generated_at_millis: yupNumber().defined(),
|
||||
global: globalSchema.nullable().defined(),
|
||||
tenancy: yupObject({
|
||||
id: yupString().defined(),
|
||||
project_id: yupString().defined(),
|
||||
branch_id: yupString().defined(),
|
||||
}).defined(),
|
||||
sequencer: yupObject({
|
||||
project_users: sequenceStatsSchema.defined(),
|
||||
contact_channels: sequenceStatsSchema.defined(),
|
||||
deleted_rows: sequenceStatsSchema.shape({
|
||||
by_table: yupArray(deletedRowByTableSchema).defined(),
|
||||
}).defined(),
|
||||
}).defined(),
|
||||
poller: yupObject({
|
||||
total: yupString().defined(),
|
||||
pending: yupString().defined(),
|
||||
in_flight: yupString().defined(),
|
||||
stale: yupString().defined(),
|
||||
oldest_created_at_millis: yupNumber().nullable().defined(),
|
||||
newest_created_at_millis: yupNumber().nullable().defined(),
|
||||
}).defined(),
|
||||
sync_engine: yupObject({
|
||||
mappings: yupArray(mappingSchema).defined(),
|
||||
external_databases: yupArray(externalDbSchema).defined(),
|
||||
}).defined(),
|
||||
}).defined(),
|
||||
});
|
||||
|
||||
type SequenceStatsRow = {
|
||||
total: unknown,
|
||||
pending: unknown,
|
||||
null_sequence_id: unknown,
|
||||
min_sequence_id: unknown,
|
||||
max_sequence_id: unknown,
|
||||
};
|
||||
|
||||
type DeletedRowStatsRow = SequenceStatsRow & {
|
||||
table_name: string,
|
||||
};
|
||||
|
||||
type OutgoingStatsRow = {
|
||||
total: unknown,
|
||||
pending: unknown,
|
||||
in_flight: unknown,
|
||||
stale: unknown,
|
||||
oldest_created_at: unknown,
|
||||
newest_created_at: unknown,
|
||||
};
|
||||
|
||||
type ExternalDbMetadataRow = {
|
||||
mapping_name: string,
|
||||
last_synced_sequence_id: unknown,
|
||||
updated_at: unknown,
|
||||
};
|
||||
|
||||
type UsersTableStatsRow = {
|
||||
total_rows: unknown,
|
||||
min_signed_up_at: unknown,
|
||||
max_signed_up_at: unknown,
|
||||
};
|
||||
|
||||
type CountRow = {
|
||||
total: unknown,
|
||||
};
|
||||
|
||||
type SequenceStats = ReturnType<typeof formatSequenceStats>;
|
||||
type DeletedRowSummary = SequenceStats & { table_name: string };
|
||||
|
||||
function toBigIntString(value: unknown): string | null {
|
||||
if (value === null || value === undefined) return null;
|
||||
if (typeof value === "bigint") return value.toString();
|
||||
if (typeof value === "number" && Number.isFinite(value)) return Math.trunc(value).toString();
|
||||
if (typeof value === "string") return value;
|
||||
return null;
|
||||
}
|
||||
|
||||
function toBigIntStringOrThrow(value: unknown, label: string): string {
|
||||
return toBigIntString(value) ?? throwErr(`Expected ${label} to be a bigint-compatible value.`, { value });
|
||||
}
|
||||
|
||||
function toMillis(value: unknown): number | null {
|
||||
if (value === null || value === undefined) return null;
|
||||
if (value instanceof Date) return value.getTime();
|
||||
if (typeof value === "number" && Number.isFinite(value)) return value;
|
||||
if (typeof value === "string") {
|
||||
const parsed = new Date(value);
|
||||
return Number.isNaN(parsed.getTime()) ? null : parsed.getTime();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function addBigIntStrings(a: string | null | undefined, b: string | null | undefined): string {
|
||||
const first = a ? BigInt(a) : 0n;
|
||||
const second = b ? BigInt(b) : 0n;
|
||||
return (first + second).toString();
|
||||
}
|
||||
|
||||
function minBigIntString(values: Array<string | null | undefined>): string | null {
|
||||
let minValue: bigint | null = null;
|
||||
for (const value of values) {
|
||||
if (!value) continue;
|
||||
const parsed = BigInt(value);
|
||||
if (minValue === null || parsed < minValue) {
|
||||
minValue = parsed;
|
||||
}
|
||||
}
|
||||
return minValue === null ? null : minValue.toString();
|
||||
}
|
||||
|
||||
function maxBigIntString(values: Array<string | null | undefined>): string | null {
|
||||
let maxValue: bigint | null = null;
|
||||
for (const value of values) {
|
||||
if (!value) continue;
|
||||
const parsed = BigInt(value);
|
||||
if (maxValue === null || parsed > maxValue) {
|
||||
maxValue = parsed;
|
||||
}
|
||||
}
|
||||
return maxValue === null ? null : maxValue.toString();
|
||||
}
|
||||
|
||||
function buildMappingInternalStats(
|
||||
projectUsersStats: SequenceStats,
|
||||
deletedRowsByTable: DeletedRowSummary[],
|
||||
) {
|
||||
const deletedProjectUserStats = deletedRowsByTable.find((row) => row.table_name === "ProjectUser") ?? null;
|
||||
|
||||
const mappingInternalStats = new Map<string, {
|
||||
mapping_id: string,
|
||||
internal_min_sequence_id: string | null,
|
||||
internal_max_sequence_id: string | null,
|
||||
internal_pending_count: string,
|
||||
}>();
|
||||
|
||||
const usersMappingMin = minBigIntString([
|
||||
projectUsersStats.min_sequence_id,
|
||||
deletedProjectUserStats?.min_sequence_id,
|
||||
]);
|
||||
const usersMappingMax = maxBigIntString([
|
||||
projectUsersStats.max_sequence_id,
|
||||
deletedProjectUserStats?.max_sequence_id,
|
||||
]);
|
||||
const usersMappingPending = addBigIntStrings(
|
||||
projectUsersStats.pending,
|
||||
deletedProjectUserStats?.pending,
|
||||
);
|
||||
|
||||
mappingInternalStats.set("users", {
|
||||
mapping_id: "users",
|
||||
internal_min_sequence_id: usersMappingMin,
|
||||
internal_max_sequence_id: usersMappingMax,
|
||||
internal_pending_count: usersMappingPending,
|
||||
});
|
||||
|
||||
const mappings = Array.from(mappingInternalStats.values());
|
||||
const mappingStatuses = mappings.map((mapping) => ({
|
||||
mapping_id: mapping.mapping_id,
|
||||
internal_max_sequence_id: mapping.internal_max_sequence_id,
|
||||
}));
|
||||
|
||||
return { mappings, mappingStatuses };
|
||||
}
|
||||
|
||||
async function fetchInternalStats(tenancyId: string | null) {
|
||||
const tenancyWhere = tenancyId
|
||||
? Prisma.sql`WHERE "tenancyId" = ${tenancyId}::uuid`
|
||||
: Prisma.sql``;
|
||||
|
||||
const projectUserStatsRow = (await globalPrismaClient.$queryRaw<SequenceStatsRow[]>`
|
||||
SELECT
|
||||
COUNT(*)::bigint AS "total",
|
||||
COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending",
|
||||
COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id",
|
||||
MIN("sequenceId") AS "min_sequence_id",
|
||||
MAX("sequenceId") AS "max_sequence_id"
|
||||
FROM "ProjectUser"
|
||||
${tenancyWhere}
|
||||
`).at(0) ?? throwErr("Project user stats query returned no rows.");
|
||||
|
||||
const contactChannelStatsRow = (await globalPrismaClient.$queryRaw<SequenceStatsRow[]>`
|
||||
SELECT
|
||||
COUNT(*)::bigint AS "total",
|
||||
COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending",
|
||||
COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id",
|
||||
MIN("sequenceId") AS "min_sequence_id",
|
||||
MAX("sequenceId") AS "max_sequence_id"
|
||||
FROM "ContactChannel"
|
||||
${tenancyWhere}
|
||||
`).at(0) ?? throwErr("Contact channel stats query returned no rows.");
|
||||
|
||||
const deletedRowStatsRow = (await globalPrismaClient.$queryRaw<SequenceStatsRow[]>`
|
||||
SELECT
|
||||
COUNT(*)::bigint AS "total",
|
||||
COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending",
|
||||
COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id",
|
||||
MIN("sequenceId") AS "min_sequence_id",
|
||||
MAX("sequenceId") AS "max_sequence_id"
|
||||
FROM "DeletedRow"
|
||||
${tenancyWhere}
|
||||
`).at(0) ?? throwErr("Deleted row stats query returned no rows.");
|
||||
|
||||
const deletedRowsByTableRows = await globalPrismaClient.$queryRaw<DeletedRowStatsRow[]>`
|
||||
SELECT
|
||||
"tableName" AS "table_name",
|
||||
COUNT(*)::bigint AS "total",
|
||||
COUNT(*) FILTER (WHERE "shouldUpdateSequenceId" = TRUE OR "sequenceId" IS NULL)::bigint AS "pending",
|
||||
COUNT(*) FILTER (WHERE "sequenceId" IS NULL)::bigint AS "null_sequence_id",
|
||||
MIN("sequenceId") AS "min_sequence_id",
|
||||
MAX("sequenceId") AS "max_sequence_id"
|
||||
FROM "DeletedRow"
|
||||
${tenancyWhere}
|
||||
GROUP BY "tableName"
|
||||
ORDER BY "tableName" ASC
|
||||
`;
|
||||
|
||||
const outgoingTenancyFilter = tenancyId
|
||||
? Prisma.sql`AND ("qstashOptions"->'body'->>'tenancyId') = ${tenancyId}`
|
||||
: Prisma.sql``;
|
||||
|
||||
const outgoingStatsRow = (await globalPrismaClient.$queryRaw<OutgoingStatsRow[]>`
|
||||
SELECT
|
||||
COUNT(*)::bigint AS "total",
|
||||
COUNT(*) FILTER (WHERE "startedFulfillingAt" IS NULL)::bigint AS "pending",
|
||||
COUNT(*) FILTER (WHERE "startedFulfillingAt" IS NOT NULL)::bigint AS "in_flight",
|
||||
COUNT(*) FILTER (
|
||||
WHERE "startedFulfillingAt" < NOW() - (${STALE_CLAIM_INTERVAL_MINUTES} * INTERVAL '1 minute')
|
||||
)::bigint AS "stale",
|
||||
MIN("createdAt") AS "oldest_created_at",
|
||||
MAX("createdAt") AS "newest_created_at"
|
||||
FROM "OutgoingRequest"
|
||||
WHERE ("qstashOptions"->>'url') = '/api/latest/internal/external-db-sync/sync-engine'
|
||||
${outgoingTenancyFilter}
|
||||
`).at(0) ?? throwErr("Outgoing request stats query returned no rows.");
|
||||
|
||||
const projectUsersStats = formatSequenceStats(projectUserStatsRow);
|
||||
const contactChannelStats = formatSequenceStats(contactChannelStatsRow);
|
||||
const deletedRowStats = formatSequenceStats(deletedRowStatsRow);
|
||||
|
||||
const deletedRowsByTable = deletedRowsByTableRows.map((row) => ({
|
||||
table_name: row.table_name,
|
||||
...formatSequenceStats(row),
|
||||
}));
|
||||
|
||||
const { mappings, mappingStatuses } = buildMappingInternalStats(projectUsersStats, deletedRowsByTable);
|
||||
|
||||
return {
|
||||
projectUsersStats,
|
||||
contactChannelStats,
|
||||
deletedRowStats,
|
||||
deletedRowsByTable,
|
||||
outgoingStatsRow,
|
||||
mappings,
|
||||
mappingStatuses,
|
||||
};
|
||||
}
|
||||
|
||||
function formatPollerStats(outgoingStats: OutgoingStatsRow) {
|
||||
return {
|
||||
total: toBigIntStringOrThrow(outgoingStats.total, "outgoing total"),
|
||||
pending: toBigIntStringOrThrow(outgoingStats.pending, "outgoing pending"),
|
||||
in_flight: toBigIntStringOrThrow(outgoingStats.in_flight, "outgoing in_flight"),
|
||||
stale: toBigIntStringOrThrow(outgoingStats.stale, "outgoing stale"),
|
||||
oldest_created_at_millis: toMillis(outgoingStats.oldest_created_at),
|
||||
newest_created_at_millis: toMillis(outgoingStats.newest_created_at),
|
||||
};
|
||||
}
|
||||
|
||||
function formatSequenceStats(row: SequenceStatsRow) {
|
||||
return {
|
||||
total: toBigIntStringOrThrow(row.total, "sequence stats total"),
|
||||
pending: toBigIntStringOrThrow(row.pending, "sequence stats pending"),
|
||||
null_sequence_id: toBigIntStringOrThrow(row.null_sequence_id, "sequence stats null_sequence_id"),
|
||||
min_sequence_id: toBigIntString(row.min_sequence_id),
|
||||
max_sequence_id: toBigIntString(row.max_sequence_id),
|
||||
};
|
||||
}
|
||||
|
||||
function formatError(error: unknown): string {
|
||||
return errorToNiceString(error);
|
||||
}
|
||||
|
||||
function parseConnectionString(connectionString: string | null | undefined) {
|
||||
if (!connectionString) {
|
||||
return {
|
||||
redacted: null,
|
||||
host: null,
|
||||
port: null,
|
||||
database: null,
|
||||
user: null,
|
||||
};
|
||||
}
|
||||
|
||||
const parsed = Result.fromThrowing(() => new URL(connectionString));
|
||||
if (parsed.status === "error") {
|
||||
return {
|
||||
redacted: null,
|
||||
host: null,
|
||||
port: null,
|
||||
database: null,
|
||||
user: null,
|
||||
};
|
||||
}
|
||||
|
||||
const url = parsed.data;
|
||||
const user = url.username ? decodeURIComponent(url.username) : null;
|
||||
const host = url.hostname || null;
|
||||
const port = url.port ? Number.parseInt(url.port, 10) : null;
|
||||
const database = url.pathname ? url.pathname.replace(/^\//, "") : null;
|
||||
const redacted = `${url.protocol}//${url.username ? encodeURIComponent(url.username) : ""}${url.username ? ":" : ""}${url.password ? "***" : ""}${url.username ? "@" : ""}${url.hostname}${url.port ? ":" + url.port : ""}${url.pathname}${url.search}`;
|
||||
|
||||
return {
|
||||
redacted,
|
||||
host,
|
||||
port: Number.isFinite(port ?? NaN) ? port : null,
|
||||
database,
|
||||
user,
|
||||
};
|
||||
}
|
||||
|
||||
async function fetchExternalDatabaseStatus(
|
||||
dbId: string,
|
||||
dbConfig: CompleteConfig["dbSync"]["externalDatabases"][string],
|
||||
mappingStatuses: Array<{
|
||||
mapping_id: string,
|
||||
internal_max_sequence_id: string | null,
|
||||
}>,
|
||||
) {
|
||||
const connection = parseConnectionString(dbConfig.connectionString ?? null);
|
||||
|
||||
if (dbConfig.type !== "postgres") {
|
||||
return {
|
||||
id: dbId,
|
||||
type: String(dbConfig.type),
|
||||
connection,
|
||||
status: "error" as const,
|
||||
error: `Unsupported database type: ${String(dbConfig.type)}`,
|
||||
metadata: [],
|
||||
users_table: {
|
||||
exists: false,
|
||||
total_rows: null,
|
||||
min_signed_up_at_millis: null,
|
||||
max_signed_up_at_millis: null,
|
||||
},
|
||||
mapping_status: mappingStatuses.map((mapping) => ({
|
||||
mapping_id: mapping.mapping_id,
|
||||
internal_max_sequence_id: mapping.internal_max_sequence_id,
|
||||
last_synced_sequence_id: null,
|
||||
updated_at_millis: null,
|
||||
backlog: null,
|
||||
})),
|
||||
};
|
||||
}
|
||||
|
||||
if (!dbConfig.connectionString) {
|
||||
return {
|
||||
id: dbId,
|
||||
type: dbConfig.type,
|
||||
connection,
|
||||
status: "error" as const,
|
||||
error: "Missing connection string",
|
||||
metadata: [],
|
||||
users_table: {
|
||||
exists: false,
|
||||
total_rows: null,
|
||||
min_signed_up_at_millis: null,
|
||||
max_signed_up_at_millis: null,
|
||||
},
|
||||
mapping_status: mappingStatuses.map((mapping) => ({
|
||||
mapping_id: mapping.mapping_id,
|
||||
internal_max_sequence_id: mapping.internal_max_sequence_id,
|
||||
last_synced_sequence_id: null,
|
||||
updated_at_millis: null,
|
||||
backlog: null,
|
||||
})),
|
||||
};
|
||||
}
|
||||
|
||||
const client = new Client({ connectionString: dbConfig.connectionString });
|
||||
const connectResult = await Result.fromPromise(client.connect());
|
||||
if (connectResult.status === "error") {
|
||||
return {
|
||||
id: dbId,
|
||||
type: dbConfig.type,
|
||||
connection,
|
||||
status: "error" as const,
|
||||
error: formatError(connectResult.error),
|
||||
metadata: [],
|
||||
users_table: {
|
||||
exists: false,
|
||||
total_rows: null,
|
||||
min_signed_up_at_millis: null,
|
||||
max_signed_up_at_millis: null,
|
||||
},
|
||||
mapping_status: mappingStatuses.map((mapping) => ({
|
||||
mapping_id: mapping.mapping_id,
|
||||
internal_max_sequence_id: mapping.internal_max_sequence_id,
|
||||
last_synced_sequence_id: null,
|
||||
updated_at_millis: null,
|
||||
backlog: null,
|
||||
})),
|
||||
};
|
||||
}
|
||||
|
||||
let metadata: ExternalDbMetadataRow[] = [];
|
||||
let metadataExists = false;
|
||||
let usersExists = false;
|
||||
let usersStats: UsersTableStatsRow | null = null;
|
||||
|
||||
try {
|
||||
const metadataExistsResult = await Result.fromPromise(client.query<{ exists: boolean }>(`
|
||||
SELECT EXISTS (
|
||||
SELECT FROM information_schema.tables
|
||||
WHERE table_schema = 'public'
|
||||
AND table_name = '_stack_sync_metadata'
|
||||
) AS "exists";
|
||||
`));
|
||||
if (metadataExistsResult.status === "error") {
|
||||
return {
|
||||
id: dbId,
|
||||
type: dbConfig.type,
|
||||
connection,
|
||||
status: "error" as const,
|
||||
error: formatError(metadataExistsResult.error),
|
||||
metadata: [],
|
||||
users_table: {
|
||||
exists: false,
|
||||
total_rows: null,
|
||||
min_signed_up_at_millis: null,
|
||||
max_signed_up_at_millis: null,
|
||||
},
|
||||
mapping_status: mappingStatuses.map((mapping) => ({
|
||||
mapping_id: mapping.mapping_id,
|
||||
internal_max_sequence_id: mapping.internal_max_sequence_id,
|
||||
last_synced_sequence_id: null,
|
||||
updated_at_millis: null,
|
||||
backlog: null,
|
||||
})),
|
||||
};
|
||||
}
|
||||
metadataExists = metadataExistsResult.data.rows[0]?.exists === true;
|
||||
|
||||
const usersExistsResult = await Result.fromPromise(client.query<{ exists: boolean }>(`
|
||||
SELECT EXISTS (
|
||||
SELECT FROM information_schema.tables
|
||||
WHERE table_schema = 'public'
|
||||
AND table_name = 'users'
|
||||
) AS "exists";
|
||||
`));
|
||||
if (usersExistsResult.status === "error") {
|
||||
return {
|
||||
id: dbId,
|
||||
type: dbConfig.type,
|
||||
connection,
|
||||
status: "error" as const,
|
||||
error: formatError(usersExistsResult.error),
|
||||
metadata: [],
|
||||
users_table: {
|
||||
exists: false,
|
||||
total_rows: null,
|
||||
min_signed_up_at_millis: null,
|
||||
max_signed_up_at_millis: null,
|
||||
},
|
||||
mapping_status: mappingStatuses.map((mapping) => ({
|
||||
mapping_id: mapping.mapping_id,
|
||||
internal_max_sequence_id: mapping.internal_max_sequence_id,
|
||||
last_synced_sequence_id: null,
|
||||
updated_at_millis: null,
|
||||
backlog: null,
|
||||
})),
|
||||
};
|
||||
}
|
||||
usersExists = usersExistsResult.data.rows[0]?.exists === true;
|
||||
|
||||
if (metadataExists) {
|
||||
const metadataResult = await Result.fromPromise(client.query<ExternalDbMetadataRow>(`
|
||||
SELECT "mapping_name", "last_synced_sequence_id", "updated_at"
|
||||
FROM "_stack_sync_metadata"
|
||||
ORDER BY "mapping_name" ASC;
|
||||
`));
|
||||
if (metadataResult.status === "error") {
|
||||
return {
|
||||
id: dbId,
|
||||
type: dbConfig.type,
|
||||
connection,
|
||||
status: "error" as const,
|
||||
error: formatError(metadataResult.error),
|
||||
metadata: [],
|
||||
users_table: {
|
||||
exists: usersExists,
|
||||
total_rows: null,
|
||||
min_signed_up_at_millis: null,
|
||||
max_signed_up_at_millis: null,
|
||||
},
|
||||
mapping_status: mappingStatuses.map((mapping) => ({
|
||||
mapping_id: mapping.mapping_id,
|
||||
internal_max_sequence_id: mapping.internal_max_sequence_id,
|
||||
last_synced_sequence_id: null,
|
||||
updated_at_millis: null,
|
||||
backlog: null,
|
||||
})),
|
||||
};
|
||||
}
|
||||
metadata = metadataResult.data.rows;
|
||||
}
|
||||
|
||||
if (usersExists) {
|
||||
const usersStatsResult = await Result.fromPromise(client.query<UsersTableStatsRow>(`
|
||||
SELECT
|
||||
COUNT(*)::bigint AS "total_rows",
|
||||
MIN("signed_up_at") AS "min_signed_up_at",
|
||||
MAX("signed_up_at") AS "max_signed_up_at"
|
||||
FROM "users";
|
||||
`));
|
||||
if (usersStatsResult.status === "error") {
|
||||
return {
|
||||
id: dbId,
|
||||
type: dbConfig.type,
|
||||
connection,
|
||||
status: "error" as const,
|
||||
error: formatError(usersStatsResult.error),
|
||||
metadata: metadata.map((row) => ({
|
||||
mapping_name: row.mapping_name,
|
||||
last_synced_sequence_id: toBigIntString(row.last_synced_sequence_id) ?? "-1",
|
||||
updated_at_millis: toMillis(row.updated_at),
|
||||
})),
|
||||
users_table: {
|
||||
exists: usersExists,
|
||||
total_rows: null,
|
||||
min_signed_up_at_millis: null,
|
||||
max_signed_up_at_millis: null,
|
||||
},
|
||||
mapping_status: mappingStatuses.map((mapping) => ({
|
||||
mapping_id: mapping.mapping_id,
|
||||
internal_max_sequence_id: mapping.internal_max_sequence_id,
|
||||
last_synced_sequence_id: null,
|
||||
updated_at_millis: null,
|
||||
backlog: null,
|
||||
})),
|
||||
};
|
||||
}
|
||||
usersStats = usersStatsResult.data.rows[0] ?? null;
|
||||
}
|
||||
} finally {
|
||||
await Result.fromPromise(client.end());
|
||||
}
|
||||
|
||||
const metadataMap = new Map<string, { last_synced_sequence_id: string | null, updated_at_millis: number | null }>();
|
||||
const formattedMetadata = metadata.map((row) => {
|
||||
const lastSynced = toBigIntString(row.last_synced_sequence_id) ?? "-1";
|
||||
const updatedAt = toMillis(row.updated_at);
|
||||
metadataMap.set(row.mapping_name, { last_synced_sequence_id: lastSynced, updated_at_millis: updatedAt });
|
||||
return {
|
||||
mapping_name: row.mapping_name,
|
||||
last_synced_sequence_id: lastSynced,
|
||||
updated_at_millis: updatedAt,
|
||||
};
|
||||
});
|
||||
|
||||
const mappingStatus = mappingStatuses.map((mapping) => {
|
||||
const external = metadataMap.get(mapping.mapping_id);
|
||||
const lastSynced = external?.last_synced_sequence_id ?? null;
|
||||
const updatedAt = external?.updated_at_millis ?? null;
|
||||
let backlog: string | null = null;
|
||||
if (mapping.internal_max_sequence_id && lastSynced) {
|
||||
backlog = (BigInt(mapping.internal_max_sequence_id) - BigInt(lastSynced)).toString();
|
||||
}
|
||||
return {
|
||||
mapping_id: mapping.mapping_id,
|
||||
internal_max_sequence_id: mapping.internal_max_sequence_id,
|
||||
last_synced_sequence_id: lastSynced,
|
||||
updated_at_millis: updatedAt,
|
||||
backlog,
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
id: dbId,
|
||||
type: dbConfig.type,
|
||||
connection,
|
||||
status: "ok" as const,
|
||||
error: null,
|
||||
metadata: formattedMetadata,
|
||||
users_table: {
|
||||
exists: usersExists,
|
||||
total_rows: toBigIntString(usersStats?.total_rows ?? null),
|
||||
min_signed_up_at_millis: toMillis(usersStats?.min_signed_up_at ?? null),
|
||||
max_signed_up_at_millis: toMillis(usersStats?.max_signed_up_at ?? null),
|
||||
},
|
||||
mapping_status: mappingStatus,
|
||||
};
|
||||
}
|
||||
|
||||
export const GET = createSmartRouteHandler({
|
||||
metadata: {
|
||||
summary: "External DB sync status",
|
||||
description: "Returns sequencing, queue, and external sync progress for the current tenancy. Optional global aggregate when scope=all.",
|
||||
tags: ["External DB Sync"],
|
||||
hidden: true,
|
||||
},
|
||||
request: yupObject({
|
||||
auth: yupObject({
|
||||
type: adminAuthTypeSchema,
|
||||
tenancy: adaptSchema,
|
||||
}).defined(),
|
||||
query: yupObject({
|
||||
scope: yupString().oneOf(["tenancy", "all"]).default("tenancy"),
|
||||
}).defined(),
|
||||
method: yupString().oneOf(["GET"]).defined(),
|
||||
}),
|
||||
response: responseSchema,
|
||||
handler: async ({ auth, query }) => {
|
||||
if (auth.tenancy.project.id !== "internal") {
|
||||
throw new KnownErrors.ExpectedInternalProject();
|
||||
}
|
||||
const tenancyId = auth.tenancy.id;
|
||||
|
||||
const shouldIncludeGlobal = query.scope === "all";
|
||||
const currentStats = shouldIncludeGlobal ? await fetchInternalStats(null) : await fetchInternalStats(tenancyId);
|
||||
const globalStats = shouldIncludeGlobal ? currentStats : null;
|
||||
const globalTenanciesCount = shouldIncludeGlobal
|
||||
? (await globalPrismaClient.$queryRaw<CountRow[]>`
|
||||
SELECT COUNT(*)::bigint AS "total"
|
||||
FROM "Tenancy"
|
||||
`).at(0) ?? throwErr("Tenancy count query returned no rows.")
|
||||
: null;
|
||||
const globalDbSyncCount = shouldIncludeGlobal
|
||||
? (await globalPrismaClient.$queryRaw<CountRow[]>`
|
||||
SELECT COUNT(*)::bigint AS "total"
|
||||
FROM "EnvironmentConfigOverride"
|
||||
WHERE ("config"->'dbSync'->'externalDatabases') IS NOT NULL
|
||||
`).at(0) ?? throwErr("DB sync config count query returned no rows.")
|
||||
: null;
|
||||
|
||||
const externalDbStatuses = shouldIncludeGlobal
|
||||
? []
|
||||
: await Promise.all(
|
||||
Object.entries(
|
||||
auth.tenancy.config.dbSync.externalDatabases as CompleteConfig["dbSync"]["externalDatabases"],
|
||||
).map(([dbId, dbConfig]) => fetchExternalDatabaseStatus(dbId, dbConfig, currentStats.mappingStatuses)),
|
||||
);
|
||||
|
||||
const outgoingStats = currentStats.outgoingStatsRow;
|
||||
|
||||
return {
|
||||
statusCode: 200 as const,
|
||||
bodyType: "json" as const,
|
||||
body: {
|
||||
ok: true,
|
||||
generated_at_millis: Date.now(),
|
||||
global: shouldIncludeGlobal && globalStats && globalTenanciesCount && globalDbSyncCount ? {
|
||||
tenancies_total: toBigIntStringOrThrow(globalTenanciesCount.total, "tenancies total"),
|
||||
tenancies_with_db_sync: toBigIntStringOrThrow(globalDbSyncCount.total, "tenancies with db sync"),
|
||||
sequencer: {
|
||||
project_users: globalStats.projectUsersStats,
|
||||
contact_channels: globalStats.contactChannelStats,
|
||||
deleted_rows: {
|
||||
...globalStats.deletedRowStats,
|
||||
by_table: globalStats.deletedRowsByTable,
|
||||
},
|
||||
},
|
||||
poller: formatPollerStats(globalStats.outgoingStatsRow),
|
||||
sync_engine: {
|
||||
mappings: globalStats.mappings,
|
||||
},
|
||||
} : null,
|
||||
tenancy: {
|
||||
id: tenancyId,
|
||||
project_id: auth.tenancy.project.id,
|
||||
branch_id: auth.tenancy.branchId,
|
||||
},
|
||||
sequencer: {
|
||||
project_users: currentStats.projectUsersStats,
|
||||
contact_channels: currentStats.contactChannelStats,
|
||||
deleted_rows: {
|
||||
...currentStats.deletedRowStats,
|
||||
by_table: currentStats.deletedRowsByTable,
|
||||
},
|
||||
},
|
||||
poller: formatPollerStats(outgoingStats),
|
||||
sync_engine: {
|
||||
mappings: currentStats.mappings,
|
||||
external_databases: externalDbStatuses,
|
||||
},
|
||||
},
|
||||
};
|
||||
},
|
||||
});
|
||||
@ -1,4 +1,5 @@
|
||||
import { syncExternalDatabases } from "@/lib/external-db-sync";
|
||||
import { enqueueExternalDbSync } from "@/lib/external-db-sync-queue";
|
||||
import { getTenancy } from "@/lib/tenancies";
|
||||
import { ensureUpstashSignature } from "@/lib/upstash";
|
||||
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
|
||||
@ -36,7 +37,10 @@ export const POST = createSmartRouteHandler({
|
||||
throw new StatusError(400, `Tenancy ${tenancyId} not found.`);
|
||||
}
|
||||
|
||||
await syncExternalDatabases(tenancy);
|
||||
const needsResync = await syncExternalDatabases(tenancy);
|
||||
if (needsResync) {
|
||||
await enqueueExternalDbSync(tenancy.id);
|
||||
}
|
||||
|
||||
return {
|
||||
statusCode: 200,
|
||||
|
||||
@ -26,21 +26,18 @@ export async function enqueueExternalDbSyncBatch(tenancyIds: string[]): Promise<
|
||||
|
||||
// Use unnest to pass array of UUIDs and insert all in one query
|
||||
await globalPrismaClient.$executeRaw`
|
||||
INSERT INTO "OutgoingRequest" ("id", "createdAt", "qstashOptions", "startedFulfillingAt")
|
||||
INSERT INTO "OutgoingRequest" ("id", "createdAt", "qstashOptions", "startedFulfillingAt", "deduplicationKey")
|
||||
SELECT
|
||||
gen_random_uuid(),
|
||||
NOW(),
|
||||
json_build_object(
|
||||
'url', '/api/latest/internal/external-db-sync/sync-engine',
|
||||
'body', json_build_object('tenancyId', t.tenancy_id)
|
||||
'body', json_build_object('tenancyId', t.tenancy_id),
|
||||
'flowControl', json_build_object('key', 'sentinel-sync-key', 'parallelism', 20)
|
||||
),
|
||||
NULL
|
||||
NULL,
|
||||
'sentinel-sync-key-' || t.tenancy_id
|
||||
FROM unnest(${tenancyIds}::uuid[]) AS t(tenancy_id)
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM "OutgoingRequest"
|
||||
WHERE "startedFulfillingAt" IS NULL
|
||||
AND ("qstashOptions"->'body'->>'tenancyId')::uuid = t.tenancy_id
|
||||
)
|
||||
ON CONFLICT ("deduplicationKey") DO NOTHING
|
||||
`;
|
||||
}
|
||||
|
||||
@ -1,13 +1,15 @@
|
||||
import { Tenancy } from "@/lib/tenancies";
|
||||
import { getPrismaClientForTenancy, PrismaClientTransaction } from "@/prisma-client";
|
||||
import { getPrismaClientForTenancy, PrismaClientWithReplica } from "@/prisma-client";
|
||||
import { DEFAULT_DB_SYNC_MAPPINGS } from "@stackframe/stack-shared/dist/config/db-sync-mappings";
|
||||
import type { CompleteConfig } from "@stackframe/stack-shared/dist/config/schema";
|
||||
import { captureError, StackAssertionError, throwErr } from "@stackframe/stack-shared/dist/utils/errors";
|
||||
import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env";
|
||||
import { omit } from "@stackframe/stack-shared/dist/utils/objects";
|
||||
import { Result } from "@stackframe/stack-shared/dist/utils/results";
|
||||
import { Client } from 'pg';
|
||||
|
||||
const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
|
||||
const MAX_BATCHES_PER_MAPPING_ENV = "STACK_EXTERNAL_DB_SYNC_MAX_BATCHES_PER_MAPPING";
|
||||
|
||||
function assertNonEmptyString(value: unknown, label: string): asserts value is string {
|
||||
if (typeof value !== "string" || value.trim().length === 0) {
|
||||
@ -42,6 +44,18 @@ function isConcurrentUpdateError(error: unknown): error is PgErrorLike {
|
||||
return typeof pgError.message === "string" && pgError.message.includes("tuple concurrently updated");
|
||||
}
|
||||
|
||||
function getMaxBatchesPerMapping(): number | null {
|
||||
const rawValue = getEnvVariable(MAX_BATCHES_PER_MAPPING_ENV, "");
|
||||
if (!rawValue) return null;
|
||||
const parsed = Number.parseInt(rawValue, 10);
|
||||
if (!Number.isFinite(parsed) || parsed <= 0) {
|
||||
throw new StackAssertionError(
|
||||
`${MAX_BATCHES_PER_MAPPING_ENV} must be a positive integer. Received: ${JSON.stringify(rawValue)}`
|
||||
);
|
||||
}
|
||||
return parsed;
|
||||
}
|
||||
|
||||
async function ensureExternalSchema(
|
||||
externalClient: Client,
|
||||
tableSchemaSql: string,
|
||||
@ -143,11 +157,11 @@ async function syncMapping(
|
||||
externalClient: Client,
|
||||
mappingId: string,
|
||||
mapping: typeof DEFAULT_DB_SYNC_MAPPINGS[keyof typeof DEFAULT_DB_SYNC_MAPPINGS],
|
||||
internalPrisma: PrismaClientTransaction,
|
||||
internalPrisma: PrismaClientWithReplica,
|
||||
dbId: string,
|
||||
tenancyId: string,
|
||||
dbType: 'postgres',
|
||||
) {
|
||||
): Promise<boolean> {
|
||||
assertNonEmptyString(mappingId, "mappingId");
|
||||
assertNonEmptyString(mapping.targetTable, "mapping.targetTable");
|
||||
assertUuid(tenancyId, "tenancyId");
|
||||
@ -180,13 +194,16 @@ async function syncMapping(
|
||||
}
|
||||
|
||||
const BATCH_LIMIT = 1000;
|
||||
const maxBatchesPerMapping = getMaxBatchesPerMapping();
|
||||
let batchesProcessed = 0;
|
||||
let throttled = false;
|
||||
|
||||
while (true) {
|
||||
assertUuid(tenancyId, "tenancyId");
|
||||
if (!Number.isFinite(lastSequenceId)) {
|
||||
throw new StackAssertionError(`lastSequenceId must be a finite number for mapping ${mappingId}.`);
|
||||
}
|
||||
const rows = await internalPrisma.$queryRawUnsafe<any[]>(fetchQuery, tenancyId, lastSequenceId);
|
||||
const rows = await internalPrisma.$replica().$queryRawUnsafe<any[]>(fetchQuery, tenancyId, lastSequenceId);
|
||||
|
||||
if (rows.length === 0) {
|
||||
break;
|
||||
@ -216,16 +233,24 @@ async function syncMapping(
|
||||
if (rows.length < BATCH_LIMIT) {
|
||||
break;
|
||||
}
|
||||
|
||||
batchesProcessed++;
|
||||
if (maxBatchesPerMapping !== null && batchesProcessed >= maxBatchesPerMapping) {
|
||||
throttled = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return throttled;
|
||||
}
|
||||
|
||||
|
||||
async function syncDatabase(
|
||||
dbId: string,
|
||||
dbConfig: CompleteConfig["dbSync"]["externalDatabases"][string],
|
||||
internalPrisma: PrismaClientTransaction,
|
||||
internalPrisma: PrismaClientWithReplica,
|
||||
tenancyId: string,
|
||||
) {
|
||||
): Promise<boolean> {
|
||||
assertNonEmptyString(dbId, "dbId");
|
||||
assertUuid(tenancyId, "tenancyId");
|
||||
const dbType = dbConfig.type;
|
||||
@ -246,13 +271,14 @@ async function syncDatabase(
|
||||
connectionString: dbConfig.connectionString,
|
||||
});
|
||||
|
||||
let needsResync = false;
|
||||
const syncResult = await Result.fromPromise((async () => {
|
||||
await externalClient.connect();
|
||||
|
||||
// Always use DEFAULT_DB_SYNC_MAPPINGS - users cannot customize mappings
|
||||
// because internalDbFetchQuery runs against Stack Auth's internal DB
|
||||
for (const [mappingId, mapping] of Object.entries(DEFAULT_DB_SYNC_MAPPINGS)) {
|
||||
await syncMapping(
|
||||
const mappingThrottled = await syncMapping(
|
||||
externalClient,
|
||||
mappingId,
|
||||
mapping,
|
||||
@ -261,6 +287,9 @@ async function syncDatabase(
|
||||
tenancyId,
|
||||
dbType,
|
||||
);
|
||||
if (mappingThrottled) {
|
||||
needsResync = true;
|
||||
}
|
||||
}
|
||||
})());
|
||||
|
||||
@ -271,23 +300,31 @@ async function syncDatabase(
|
||||
|
||||
if (syncResult.status === "error") {
|
||||
captureError(`external-db-sync-${dbId}`, syncResult.error);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
return needsResync;
|
||||
}
|
||||
|
||||
|
||||
export async function syncExternalDatabases(tenancy: Tenancy) {
|
||||
export async function syncExternalDatabases(tenancy: Tenancy): Promise<boolean> {
|
||||
assertUuid(tenancy.id, "tenancy.id");
|
||||
const externalDatabases = tenancy.config.dbSync.externalDatabases;
|
||||
const internalPrisma = await getPrismaClientForTenancy(tenancy);
|
||||
let needsResync = false;
|
||||
|
||||
for (const [dbId, dbConfig] of Object.entries(externalDatabases)) {
|
||||
try {
|
||||
await syncDatabase(dbId, dbConfig, internalPrisma, tenancy.id);
|
||||
const databaseThrottled = await syncDatabase(dbId, dbConfig, internalPrisma, tenancy.id);
|
||||
if (databaseThrottled) {
|
||||
needsResync = true;
|
||||
}
|
||||
} catch (error) {
|
||||
// Log the error but continue syncing other databases
|
||||
// This ensures one bad database config doesn't block successful syncs to other databases
|
||||
captureError(`external-db-sync-${dbId}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
return needsResync;
|
||||
}
|
||||
|
||||
@ -0,0 +1,592 @@
|
||||
"use client";
|
||||
|
||||
import { PageLayout } from "../page-layout";
|
||||
import { useAdminApp } from "../use-admin-app";
|
||||
import {
|
||||
Alert,
|
||||
Button,
|
||||
Card,
|
||||
CardContent,
|
||||
CardDescription,
|
||||
CardHeader,
|
||||
CardTitle,
|
||||
Skeleton,
|
||||
Switch,
|
||||
Table,
|
||||
TableBody,
|
||||
TableCell,
|
||||
TableHead,
|
||||
TableHeader,
|
||||
TableRow,
|
||||
Typography,
|
||||
} from "@/components/ui";
|
||||
import { Result } from "@stackframe/stack-shared/dist/utils/results";
|
||||
import { runAsynchronously, runAsynchronouslyWithAlert } from "@stackframe/stack-shared/dist/utils/promises";
|
||||
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
|
||||
import { notFound } from "next/navigation";
|
||||
|
||||
const stackAppInternalsSymbol = Symbol.for("StackAuth--DO-NOT-USE-OR-YOU-WILL-BE-FIRED--StackAppInternals");
|
||||
const AUTO_REFRESH_INTERVAL_MS = 5000;
|
||||
|
||||
type SequenceStats = {
|
||||
total: string,
|
||||
pending: string,
|
||||
null_sequence_id: string,
|
||||
min_sequence_id: string | null,
|
||||
max_sequence_id: string | null,
|
||||
};
|
||||
|
||||
type DeletedRowStats = SequenceStats & {
|
||||
by_table: Array<SequenceStats & { table_name: string }>,
|
||||
};
|
||||
|
||||
type PollerStats = {
|
||||
total: string,
|
||||
pending: string,
|
||||
in_flight: string,
|
||||
stale: string,
|
||||
oldest_created_at_millis: number | null,
|
||||
newest_created_at_millis: number | null,
|
||||
};
|
||||
|
||||
type MappingStats = {
|
||||
mapping_id: string,
|
||||
internal_min_sequence_id: string | null,
|
||||
internal_max_sequence_id: string | null,
|
||||
internal_pending_count: string,
|
||||
};
|
||||
|
||||
type ExternalDbMetadata = {
|
||||
mapping_name: string,
|
||||
last_synced_sequence_id: string,
|
||||
updated_at_millis: number | null,
|
||||
};
|
||||
|
||||
type ExternalDbMappingStatus = {
|
||||
mapping_id: string,
|
||||
internal_max_sequence_id: string | null,
|
||||
last_synced_sequence_id: string | null,
|
||||
updated_at_millis: number | null,
|
||||
backlog: string | null,
|
||||
};
|
||||
|
||||
type ExternalDbSyncStatus = {
|
||||
ok: true,
|
||||
generated_at_millis: number,
|
||||
global: {
|
||||
tenancies_total: string,
|
||||
tenancies_with_db_sync: string,
|
||||
sequencer: {
|
||||
project_users: SequenceStats,
|
||||
contact_channels: SequenceStats,
|
||||
deleted_rows: DeletedRowStats,
|
||||
},
|
||||
poller: PollerStats,
|
||||
sync_engine: {
|
||||
mappings: MappingStats[],
|
||||
},
|
||||
} | null,
|
||||
tenancy: {
|
||||
id: string,
|
||||
project_id: string,
|
||||
branch_id: string,
|
||||
},
|
||||
sequencer: {
|
||||
project_users: SequenceStats,
|
||||
contact_channels: SequenceStats,
|
||||
deleted_rows: DeletedRowStats,
|
||||
},
|
||||
poller: PollerStats,
|
||||
sync_engine: {
|
||||
mappings: MappingStats[],
|
||||
external_databases: Array<{
|
||||
id: string,
|
||||
type: string,
|
||||
connection: {
|
||||
redacted: string | null,
|
||||
host: string | null,
|
||||
port: number | null,
|
||||
database: string | null,
|
||||
user: string | null,
|
||||
},
|
||||
status: "ok" | "error",
|
||||
error: string | null,
|
||||
metadata: ExternalDbMetadata[],
|
||||
users_table: {
|
||||
exists: boolean,
|
||||
total_rows: string | null,
|
||||
min_signed_up_at_millis: number | null,
|
||||
max_signed_up_at_millis: number | null,
|
||||
},
|
||||
mapping_status: ExternalDbMappingStatus[],
|
||||
}>,
|
||||
},
|
||||
};
|
||||
|
||||
type AdminAppInternals = {
|
||||
sendRequest: (path: string, requestOptions: RequestInit, requestType?: "client" | "server" | "admin") => Promise<Response>,
|
||||
};
|
||||
|
||||
type AdminAppWithInternals = ReturnType<typeof useAdminApp> & {
|
||||
[stackAppInternalsSymbol]: AdminAppInternals,
|
||||
};
|
||||
|
||||
function formatBigInt(value: string | null) {
|
||||
if (value === null) return "—";
|
||||
if (value.length > 15) return value;
|
||||
const asNumber = Number(value);
|
||||
return Number.isFinite(asNumber) ? new Intl.NumberFormat().format(asNumber) : value;
|
||||
}
|
||||
|
||||
function formatMillis(value: number | null) {
|
||||
if (!value) return "—";
|
||||
return new Date(value).toLocaleString();
|
||||
}
|
||||
|
||||
function sumBigIntStrings(values: Array<string | null | undefined>) {
|
||||
let total = BigInt(0);
|
||||
for (const value of values) {
|
||||
if (!value) continue;
|
||||
if (!/^[-]?\d+$/.test(value)) continue;
|
||||
total += BigInt(value);
|
||||
}
|
||||
return total.toString();
|
||||
}
|
||||
|
||||
function parseBigIntString(value: string | null | undefined) {
|
||||
if (value === null || value === undefined) return null;
|
||||
if (!/^[-]?\d+$/.test(value)) return null;
|
||||
return BigInt(value);
|
||||
}
|
||||
|
||||
const BIGINT_ZERO = BigInt(0);
|
||||
const BIGINT_HUNDRED = BigInt(100);
|
||||
const BIGINT_THOUSAND = BigInt(1000);
|
||||
|
||||
function formatThroughput(value: bigint | number | null) {
|
||||
if (value === null) return "—";
|
||||
if (typeof value === "number") {
|
||||
if (!Number.isFinite(value)) return "—";
|
||||
if (value === 0) return "0/s";
|
||||
const sign = value > 0 ? "+" : "";
|
||||
const abs = Math.abs(value);
|
||||
const display = abs >= 100 ? abs.toFixed(0) : abs >= 10 ? abs.toFixed(1) : abs.toFixed(2);
|
||||
return `${sign}${display}/s`;
|
||||
}
|
||||
if (value === BIGINT_ZERO) return "0/s";
|
||||
const sign = value > BIGINT_ZERO ? "+" : "";
|
||||
const abs = value > BIGINT_ZERO ? value : -value;
|
||||
const intPart = abs / BIGINT_HUNDRED;
|
||||
const fracPart = abs % BIGINT_HUNDRED;
|
||||
const intDisplay = new Intl.NumberFormat().format(intPart);
|
||||
const fracDisplay = fracPart.toString().padStart(2, "0");
|
||||
const display = intPart === BIGINT_ZERO ? `0.${fracDisplay}` : `${intDisplay}.${fracDisplay}`;
|
||||
return `${sign}${display}/s`;
|
||||
}
|
||||
|
||||
function calculateThroughputScaled(prev: bigint | null, current: bigint | null, deltaMillis: number) {
|
||||
if (prev === null || current === null) return null;
|
||||
if (deltaMillis <= 0) return null;
|
||||
const deltaMillisBigInt = BigInt(deltaMillis);
|
||||
return (prev - current) * BIGINT_THOUSAND * BIGINT_HUNDRED / deltaMillisBigInt;
|
||||
}
|
||||
|
||||
function DataValue(props: { value: string | null | undefined, loading: boolean }) {
|
||||
if (props.loading) {
|
||||
return <Skeleton className="h-5 w-20" />;
|
||||
}
|
||||
return <span>{formatBigInt(props.value ?? null)}</span>;
|
||||
}
|
||||
|
||||
function DataDate(props: { value: number | null | undefined, loading: boolean }) {
|
||||
if (props.loading) {
|
||||
return <Skeleton className="h-5 w-32" />;
|
||||
}
|
||||
return <span>{formatMillis(props.value ?? null)}</span>;
|
||||
}
|
||||
|
||||
export default function PageClient() {
|
||||
const adminApp = useAdminApp() as AdminAppWithInternals;
|
||||
const [status, setStatus] = useState<ExternalDbSyncStatus | null>(null);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
const [loading, setLoading] = useState(false);
|
||||
const [autoRefresh, setAutoRefresh] = useState(true);
|
||||
const inFlightRef = useRef(false);
|
||||
const summarySamplesRef = useRef<Array<{
|
||||
timestampMillis: number,
|
||||
sequencerPending: string,
|
||||
pollerPending: string,
|
||||
mappingPending: string,
|
||||
}>>([]);
|
||||
|
||||
const loadStatus = useCallback(async () => {
|
||||
if (inFlightRef.current) return;
|
||||
inFlightRef.current = true;
|
||||
setLoading(true);
|
||||
|
||||
const result = await Result.fromPromise((async () => {
|
||||
const response = await adminApp[stackAppInternalsSymbol].sendRequest(
|
||||
"/internal/external-db-sync/status?scope=all",
|
||||
{ method: "GET" },
|
||||
"admin",
|
||||
);
|
||||
const body = await response.json();
|
||||
if (!response.ok) {
|
||||
const message = typeof body?.error === "string" ? body.error : "Failed to load external DB sync status.";
|
||||
throw new Error(message);
|
||||
}
|
||||
return body as ExternalDbSyncStatus;
|
||||
})());
|
||||
|
||||
if (result.status === "error") {
|
||||
const message = result.error instanceof Error ? result.error.message : String(result.error);
|
||||
setError(message);
|
||||
setLoading(false);
|
||||
inFlightRef.current = false;
|
||||
return;
|
||||
}
|
||||
|
||||
setStatus(result.data);
|
||||
setError(null);
|
||||
setLoading(false);
|
||||
inFlightRef.current = false;
|
||||
}, [adminApp]);
|
||||
|
||||
const refreshWithAlert = useCallback(() => {
|
||||
runAsynchronouslyWithAlert(loadStatus);
|
||||
}, [loadStatus]);
|
||||
|
||||
useEffect(() => {
|
||||
runAsynchronously(loadStatus);
|
||||
}, [loadStatus]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!autoRefresh) return undefined;
|
||||
const interval = setInterval(() => {
|
||||
runAsynchronously(loadStatus);
|
||||
}, AUTO_REFRESH_INTERVAL_MS);
|
||||
return () => clearInterval(interval);
|
||||
}, [autoRefresh, loadStatus]);
|
||||
|
||||
const summaryStats = useMemo(() => {
|
||||
if (!status) return null;
|
||||
const summarySource = status.global ?? status;
|
||||
const sequencerPending = sumBigIntStrings([
|
||||
summarySource.sequencer.project_users.pending,
|
||||
summarySource.sequencer.contact_channels.pending,
|
||||
summarySource.sequencer.deleted_rows.pending,
|
||||
]);
|
||||
const mappingPending = sumBigIntStrings(
|
||||
summarySource.sync_engine.mappings.map((mapping) => mapping.internal_pending_count),
|
||||
);
|
||||
|
||||
return {
|
||||
sequencerPending,
|
||||
pollerPending: summarySource.poller.pending,
|
||||
mappingPending,
|
||||
isGlobal: Boolean(status.global),
|
||||
};
|
||||
}, [status]);
|
||||
|
||||
const throughputStats = useMemo(() => {
|
||||
if (!status || !summaryStats) return null;
|
||||
const currentSample = {
|
||||
timestampMillis: status.generated_at_millis,
|
||||
sequencerPending: summaryStats.sequencerPending,
|
||||
pollerPending: summaryStats.pollerPending,
|
||||
mappingPending: summaryStats.mappingPending,
|
||||
};
|
||||
const samples = summarySamplesRef.current;
|
||||
const samplesWithCurrent = samples.length === 0 || samples[samples.length - 1].timestampMillis !== currentSample.timestampMillis
|
||||
? [...samples, currentSample]
|
||||
: samples;
|
||||
const windowStart = status.generated_at_millis - 20000;
|
||||
const windowedSamples = samplesWithCurrent.filter((sample) => sample.timestampMillis >= windowStart);
|
||||
if (windowedSamples.length < 2) return null;
|
||||
const oldest = windowedSamples[0];
|
||||
const deltaMillis = status.generated_at_millis - oldest.timestampMillis;
|
||||
if (deltaMillis <= 0) return null;
|
||||
|
||||
return {
|
||||
sequencer: calculateThroughputScaled(
|
||||
parseBigIntString(oldest.sequencerPending),
|
||||
parseBigIntString(summaryStats.sequencerPending),
|
||||
deltaMillis,
|
||||
),
|
||||
poller: calculateThroughputScaled(
|
||||
parseBigIntString(oldest.pollerPending),
|
||||
parseBigIntString(summaryStats.pollerPending),
|
||||
deltaMillis,
|
||||
),
|
||||
mapping: calculateThroughputScaled(
|
||||
parseBigIntString(oldest.mappingPending),
|
||||
parseBigIntString(summaryStats.mappingPending),
|
||||
deltaMillis,
|
||||
),
|
||||
};
|
||||
}, [status, summaryStats]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!status || !summaryStats) return;
|
||||
const nextSamples = [...summarySamplesRef.current, {
|
||||
timestampMillis: status.generated_at_millis,
|
||||
sequencerPending: summaryStats.sequencerPending,
|
||||
pollerPending: summaryStats.pollerPending,
|
||||
mappingPending: summaryStats.mappingPending,
|
||||
}];
|
||||
const windowStart = status.generated_at_millis - 20000;
|
||||
summarySamplesRef.current = nextSamples.filter((sample) => sample.timestampMillis >= windowStart);
|
||||
}, [status, summaryStats]);
|
||||
|
||||
const loadingState = loading && !status;
|
||||
const globalStatus = status?.global ?? null;
|
||||
const deletedRowsByTable = status?.sequencer.deleted_rows.by_table ?? [];
|
||||
const mappingRows = status?.sync_engine.mappings ?? [];
|
||||
|
||||
if (adminApp.projectId !== "internal") {
|
||||
return notFound();
|
||||
}
|
||||
|
||||
return (
|
||||
<PageLayout
|
||||
title="External DB Sync"
|
||||
description="Real-time sequencing, queue, and sync visibility across all tenancies."
|
||||
actions={
|
||||
<div className="flex items-center gap-4">
|
||||
<div className="flex items-center gap-2 text-xs text-muted-foreground">
|
||||
<Switch checked={autoRefresh} onCheckedChange={setAutoRefresh} />
|
||||
<span>Auto refresh</span>
|
||||
</div>
|
||||
<Button onClick={refreshWithAlert} loading={loading} variant="secondary">
|
||||
Refresh
|
||||
</Button>
|
||||
</div>
|
||||
}
|
||||
fillWidth
|
||||
>
|
||||
{error && <Alert variant="destructive">{error}</Alert>}
|
||||
|
||||
<div className="flex flex-wrap items-center gap-3 text-xs text-muted-foreground">
|
||||
<span>Scope: {globalStatus ? "All tenancies" : "Current tenancy"}</span>
|
||||
{globalStatus && (
|
||||
<>
|
||||
<span>Tenancies: {formatBigInt(globalStatus.tenancies_total)}</span>
|
||||
<span>DB sync configs: {formatBigInt(globalStatus.tenancies_with_db_sync)}</span>
|
||||
</>
|
||||
)}
|
||||
<span>Last updated: {status ? formatMillis(status.generated_at_millis) : "—"}</span>
|
||||
</div>
|
||||
|
||||
<div className="grid gap-4 md:grid-cols-3">
|
||||
<Card>
|
||||
<CardHeader>
|
||||
<CardDescription>Sequencer pending rows</CardDescription>
|
||||
<CardTitle className="text-2xl">
|
||||
{loadingState ? <Skeleton className="h-8 w-28" /> : formatBigInt(summaryStats?.sequencerPending ?? null)}
|
||||
</CardTitle>
|
||||
</CardHeader>
|
||||
<CardContent className="space-y-2 text-xs text-muted-foreground">
|
||||
<div>ProjectUser + ContactChannel + DeletedRow rows waiting for sequence IDs.</div>
|
||||
<div className="flex items-center justify-between">
|
||||
<span>Throughput</span>
|
||||
<span>{loadingState ? "—" : formatThroughput(throughputStats?.sequencer ?? null)}</span>
|
||||
</div>
|
||||
</CardContent>
|
||||
</Card>
|
||||
|
||||
<Card>
|
||||
<CardHeader>
|
||||
<CardDescription>Outgoing sync requests</CardDescription>
|
||||
<CardTitle className="text-2xl">
|
||||
{loadingState ? <Skeleton className="h-8 w-24" /> : formatBigInt(summaryStats?.pollerPending ?? null)}
|
||||
</CardTitle>
|
||||
</CardHeader>
|
||||
<CardContent className="space-y-2 text-xs text-muted-foreground">
|
||||
<div>Requests still queued for the poller to dispatch.</div>
|
||||
<div className="flex items-center justify-between">
|
||||
<span>Throughput</span>
|
||||
<span>{loadingState ? "—" : formatThroughput(throughputStats?.poller ?? null)}</span>
|
||||
</div>
|
||||
</CardContent>
|
||||
</Card>
|
||||
|
||||
<Card>
|
||||
<CardHeader>
|
||||
<CardDescription>Mapping pending rows</CardDescription>
|
||||
<CardTitle className="text-2xl">
|
||||
{loadingState ? <Skeleton className="h-8 w-24" /> : formatBigInt(summaryStats?.mappingPending ?? null)}
|
||||
</CardTitle>
|
||||
</CardHeader>
|
||||
<CardContent className="space-y-2 text-xs text-muted-foreground">
|
||||
<div>Pending internal rows waiting for sync across mappings.</div>
|
||||
<div className="flex items-center justify-between">
|
||||
<span>Throughput</span>
|
||||
<span>{loadingState ? "—" : formatThroughput(throughputStats?.mapping ?? null)}</span>
|
||||
</div>
|
||||
</CardContent>
|
||||
</Card>
|
||||
</div>
|
||||
|
||||
<div className="grid gap-4 lg:grid-cols-2">
|
||||
<Card>
|
||||
<CardHeader>
|
||||
<CardTitle>Sequencer</CardTitle>
|
||||
<CardDescription>Rows awaiting sequence ID backfill per table.</CardDescription>
|
||||
</CardHeader>
|
||||
<CardContent>
|
||||
<Table>
|
||||
<TableHeader>
|
||||
<TableRow>
|
||||
<TableHead>Table</TableHead>
|
||||
<TableHead>Total</TableHead>
|
||||
<TableHead>Pending</TableHead>
|
||||
<TableHead>Null Seq</TableHead>
|
||||
<TableHead>Min Seq</TableHead>
|
||||
<TableHead>Max Seq</TableHead>
|
||||
</TableRow>
|
||||
</TableHeader>
|
||||
<TableBody>
|
||||
<TableRow>
|
||||
<TableCell className="font-medium">ProjectUser</TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.project_users.total} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.project_users.pending} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.project_users.null_sequence_id} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.project_users.min_sequence_id} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.project_users.max_sequence_id} loading={loadingState} /></TableCell>
|
||||
</TableRow>
|
||||
<TableRow>
|
||||
<TableCell className="font-medium">ContactChannel</TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.contact_channels.total} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.contact_channels.pending} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.contact_channels.null_sequence_id} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.contact_channels.min_sequence_id} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.contact_channels.max_sequence_id} loading={loadingState} /></TableCell>
|
||||
</TableRow>
|
||||
<TableRow>
|
||||
<TableCell className="font-medium">DeletedRow</TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.deleted_rows.total} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.deleted_rows.pending} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.deleted_rows.null_sequence_id} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.deleted_rows.min_sequence_id} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.sequencer.deleted_rows.max_sequence_id} loading={loadingState} /></TableCell>
|
||||
</TableRow>
|
||||
</TableBody>
|
||||
</Table>
|
||||
|
||||
<div className="mt-4">
|
||||
<Typography type="p" className="text-xs font-semibold uppercase text-muted-foreground">
|
||||
Deleted rows by table
|
||||
</Typography>
|
||||
<Table>
|
||||
<TableHeader>
|
||||
<TableRow>
|
||||
<TableHead>Table</TableHead>
|
||||
<TableHead>Total</TableHead>
|
||||
<TableHead>Pending</TableHead>
|
||||
<TableHead>Null Seq</TableHead>
|
||||
<TableHead>Min Seq</TableHead>
|
||||
<TableHead>Max Seq</TableHead>
|
||||
</TableRow>
|
||||
</TableHeader>
|
||||
<TableBody>
|
||||
{deletedRowsByTable.map((row) => (
|
||||
<TableRow key={row.table_name}>
|
||||
<TableCell className="font-medium">{row.table_name}</TableCell>
|
||||
<TableCell><DataValue value={row.total} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={row.pending} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={row.null_sequence_id} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={row.min_sequence_id} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={row.max_sequence_id} loading={loadingState} /></TableCell>
|
||||
</TableRow>
|
||||
))}
|
||||
{!loadingState && deletedRowsByTable.length === 0 && (
|
||||
<TableRow>
|
||||
<TableCell colSpan={6} className="text-center text-muted-foreground">
|
||||
No deleted rows recorded yet.
|
||||
</TableCell>
|
||||
</TableRow>
|
||||
)}
|
||||
</TableBody>
|
||||
</Table>
|
||||
</div>
|
||||
</CardContent>
|
||||
</Card>
|
||||
|
||||
<Card>
|
||||
<CardHeader>
|
||||
<CardTitle>Poller</CardTitle>
|
||||
<CardDescription>OutgoingRequest queue and processing overview.</CardDescription>
|
||||
</CardHeader>
|
||||
<CardContent>
|
||||
<Table>
|
||||
<TableHeader>
|
||||
<TableRow>
|
||||
<TableHead>Total</TableHead>
|
||||
<TableHead>Pending</TableHead>
|
||||
<TableHead>In Flight</TableHead>
|
||||
<TableHead>Stale</TableHead>
|
||||
</TableRow>
|
||||
</TableHeader>
|
||||
<TableBody>
|
||||
<TableRow>
|
||||
<TableCell><DataValue value={status?.poller.total} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.poller.pending} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.poller.in_flight} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={status?.poller.stale} loading={loadingState} /></TableCell>
|
||||
</TableRow>
|
||||
</TableBody>
|
||||
</Table>
|
||||
|
||||
<div className="mt-4 grid gap-2 text-sm">
|
||||
<div className="flex items-center justify-between">
|
||||
<span className="text-muted-foreground">Oldest request</span>
|
||||
<DataDate value={status?.poller.oldest_created_at_millis} loading={loadingState} />
|
||||
</div>
|
||||
<div className="flex items-center justify-between">
|
||||
<span className="text-muted-foreground">Newest request</span>
|
||||
<DataDate value={status?.poller.newest_created_at_millis} loading={loadingState} />
|
||||
</div>
|
||||
</div>
|
||||
</CardContent>
|
||||
</Card>
|
||||
</div>
|
||||
|
||||
<Card>
|
||||
<CardHeader>
|
||||
<CardTitle>Sync Engine</CardTitle>
|
||||
<CardDescription>Internal mapping checkpoints before external sync.</CardDescription>
|
||||
</CardHeader>
|
||||
<CardContent>
|
||||
<Table>
|
||||
<TableHeader>
|
||||
<TableRow>
|
||||
<TableHead>Mapping</TableHead>
|
||||
<TableHead>Min Seq</TableHead>
|
||||
<TableHead>Max Seq</TableHead>
|
||||
<TableHead>Pending Rows</TableHead>
|
||||
</TableRow>
|
||||
</TableHeader>
|
||||
<TableBody>
|
||||
{mappingRows.map((mapping) => (
|
||||
<TableRow key={mapping.mapping_id}>
|
||||
<TableCell className="font-medium">{mapping.mapping_id}</TableCell>
|
||||
<TableCell><DataValue value={mapping.internal_min_sequence_id} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={mapping.internal_max_sequence_id} loading={loadingState} /></TableCell>
|
||||
<TableCell><DataValue value={mapping.internal_pending_count} loading={loadingState} /></TableCell>
|
||||
</TableRow>
|
||||
))}
|
||||
{!loadingState && mappingRows.length === 0 && (
|
||||
<TableRow>
|
||||
<TableCell colSpan={4} className="text-center text-muted-foreground">
|
||||
No mappings configured.
|
||||
</TableCell>
|
||||
</TableRow>
|
||||
)}
|
||||
</TableBody>
|
||||
</Table>
|
||||
</CardContent>
|
||||
</Card>
|
||||
|
||||
</PageLayout>
|
||||
);
|
||||
}
|
||||
@ -0,0 +1,9 @@
|
||||
import PageClient from "./page-client";
|
||||
|
||||
export const metadata = {
|
||||
title: "External DB Sync",
|
||||
};
|
||||
|
||||
export default function Page() {
|
||||
return <PageClient />;
|
||||
}
|
||||
@ -448,4 +448,56 @@ describe.sequential('External DB Sync - Basic Tests', () => {
|
||||
const seq2 = Number(metadata2.rows[0].last_synced_sequence_id);
|
||||
expect(seq2).toBeGreaterThan(seq1);
|
||||
}, TEST_TIMEOUT);
|
||||
|
||||
/**
|
||||
* What it does:
|
||||
* - Creates a project with external DB sync enabled.
|
||||
* - Calls the internal status endpoint and validates the response shape.
|
||||
*
|
||||
* Why it matters:
|
||||
* - Confirms the dashboard status API exposes sequencer, poller, and sync-engine metrics.
|
||||
*/
|
||||
test('Status endpoint exposes sync pipeline metrics', async () => {
|
||||
const dbName = 'status_endpoint_test';
|
||||
const connectionString = await dbManager.createDatabase(dbName);
|
||||
|
||||
await createProjectWithExternalDb({
|
||||
main: {
|
||||
type: 'postgres',
|
||||
connectionString,
|
||||
}
|
||||
}, {
|
||||
display_name: '📈 External DB Sync Status',
|
||||
description: 'Validating sync status endpoint shape',
|
||||
});
|
||||
|
||||
const response = await niceBackendFetch('/api/latest/internal/external-db-sync/status', {
|
||||
accessType: 'admin',
|
||||
});
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
expect(response.body).toMatchObject({
|
||||
ok: true,
|
||||
tenancy: {
|
||||
id: expect.any(String),
|
||||
project_id: expect.any(String),
|
||||
branch_id: expect.any(String),
|
||||
},
|
||||
sequencer: {
|
||||
project_users: expect.any(Object),
|
||||
contact_channels: expect.any(Object),
|
||||
deleted_rows: expect.any(Object),
|
||||
},
|
||||
poller: {
|
||||
total: expect.any(String),
|
||||
pending: expect.any(String),
|
||||
in_flight: expect.any(String),
|
||||
stale: expect.any(String),
|
||||
},
|
||||
sync_engine: {
|
||||
mappings: expect.any(Array),
|
||||
external_databases: expect.any(Array),
|
||||
},
|
||||
});
|
||||
}, TEST_TIMEOUT);
|
||||
});
|
||||
|
||||
@ -0,0 +1,262 @@
|
||||
--set -a; source apps/backend/.env.development; set +a; psql "$STACK_DATABASE_CONNECTION_STRING" -v ON_ERROR_STOP=1 -f apps/e2e/tests/backend/performance/mock-external-db-sync-projects.sql
|
||||
|
||||
BEGIN;
|
||||
|
||||
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
|
||||
|
||||
-- NOTE:
|
||||
-- - This script is intentionally heavy (1,000,000 projects + 3,000,000 users).
|
||||
-- - Update BOTH settings blocks if you need a different external DB connection string.
|
||||
-- - The external DB should be reachable from the backend (default uses docker postgres on port 8128).
|
||||
|
||||
-- =====================================================================================
|
||||
-- 1) One million projects, one user each
|
||||
-- =====================================================================================
|
||||
WITH settings AS (
|
||||
SELECT
|
||||
'postgresql://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@localhost:8128/loadtest'::text AS external_connection_string,
|
||||
1000000::int AS project_count
|
||||
),
|
||||
config AS (
|
||||
SELECT jsonb_build_object(
|
||||
'dbSync',
|
||||
jsonb_build_object(
|
||||
'externalDatabases',
|
||||
jsonb_build_object(
|
||||
'main',
|
||||
jsonb_build_object(
|
||||
'type', 'postgres',
|
||||
'connectionString', external_connection_string
|
||||
)
|
||||
)
|
||||
)
|
||||
) AS config_json
|
||||
FROM settings
|
||||
),
|
||||
small_projects AS (
|
||||
SELECT
|
||||
gen_random_uuid() AS project_id,
|
||||
gen_random_uuid() AS tenancy_id,
|
||||
gen_random_uuid() AS project_user_id,
|
||||
gen_random_uuid() AS auth_method_id,
|
||||
gen_random_uuid() AS contact_id,
|
||||
gs AS idx,
|
||||
lpad(gs::text, 7, '0') AS padded_idx,
|
||||
now() AS ts
|
||||
FROM settings
|
||||
CROSS JOIN generate_series(1, settings.project_count) AS gs
|
||||
),
|
||||
insert_projects AS (
|
||||
INSERT INTO "Project" ("id", "displayName", "description", "isProductionMode", "ownerTeamId", "createdAt", "updatedAt")
|
||||
SELECT
|
||||
project_id,
|
||||
'External DB Sync Project ' || padded_idx,
|
||||
'External DB sync load test project',
|
||||
FALSE,
|
||||
NULL,
|
||||
ts,
|
||||
ts
|
||||
FROM small_projects
|
||||
RETURNING "id"
|
||||
),
|
||||
insert_tenancies AS (
|
||||
INSERT INTO "Tenancy" ("id", "projectId", "branchId", "organizationId", "hasNoOrganization", "createdAt", "updatedAt")
|
||||
SELECT
|
||||
tenancy_id,
|
||||
project_id,
|
||||
'main',
|
||||
NULL,
|
||||
'TRUE'::"BooleanTrue",
|
||||
ts,
|
||||
ts
|
||||
FROM small_projects
|
||||
RETURNING "id"
|
||||
),
|
||||
insert_env_config AS (
|
||||
INSERT INTO "EnvironmentConfigOverride" ("projectId", "branchId", "config", "createdAt", "updatedAt")
|
||||
SELECT
|
||||
project_id,
|
||||
'main',
|
||||
(SELECT config_json FROM config),
|
||||
ts,
|
||||
ts
|
||||
FROM small_projects
|
||||
ON CONFLICT ("projectId", "branchId") DO UPDATE SET
|
||||
"config" = EXCLUDED."config",
|
||||
"updatedAt" = EXCLUDED."updatedAt"
|
||||
RETURNING "projectId"
|
||||
),
|
||||
insert_users AS (
|
||||
INSERT INTO "ProjectUser"
|
||||
("tenancyId", "projectUserId", "mirroredProjectId", "mirroredBranchId", "displayName", "projectId", "createdAt", "updatedAt")
|
||||
SELECT
|
||||
tenancy_id,
|
||||
project_user_id,
|
||||
project_id,
|
||||
'main',
|
||||
'External Sync User ' || padded_idx,
|
||||
project_id,
|
||||
ts,
|
||||
ts
|
||||
FROM small_projects
|
||||
RETURNING "tenancyId", "projectUserId"
|
||||
),
|
||||
insert_contacts AS (
|
||||
INSERT INTO "ContactChannel"
|
||||
("tenancyId", "projectUserId", "id", "type", "isPrimary", "usedForAuth", "isVerified", "value", "createdAt", "updatedAt")
|
||||
SELECT
|
||||
tenancy_id,
|
||||
project_user_id,
|
||||
contact_id,
|
||||
'EMAIL',
|
||||
'TRUE'::"BooleanTrue",
|
||||
'TRUE'::"BooleanTrue",
|
||||
false,
|
||||
'external-sync-user-' || padded_idx || '@load.local',
|
||||
ts,
|
||||
ts
|
||||
FROM small_projects
|
||||
RETURNING "tenancyId", "projectUserId"
|
||||
),
|
||||
insert_auth_methods AS (
|
||||
INSERT INTO "AuthMethod"
|
||||
("tenancyId", "id", "projectUserId", "createdAt", "updatedAt")
|
||||
SELECT
|
||||
tenancy_id,
|
||||
auth_method_id,
|
||||
project_user_id,
|
||||
ts,
|
||||
ts
|
||||
FROM small_projects
|
||||
RETURNING "tenancyId", "id", "projectUserId"
|
||||
)
|
||||
INSERT INTO "PasswordAuthMethod"
|
||||
("tenancyId", "authMethodId", "projectUserId", "passwordHash", "createdAt", "updatedAt")
|
||||
SELECT
|
||||
tenancy_id,
|
||||
auth_method_id,
|
||||
project_user_id,
|
||||
'$2a$13$TVyY/gpw9Db/w1fBeJkCgeNg2Rae2JfNqrPnSACtj.ufAO5cVF13.',
|
||||
ts,
|
||||
ts
|
||||
FROM small_projects;
|
||||
|
||||
COMMIT;
|
||||
|
||||
BEGIN;
|
||||
|
||||
-- =====================================================================================
|
||||
-- 2) Three projects, one million users each
|
||||
-- =====================================================================================
|
||||
SET LOCAL synchronous_commit = off;
|
||||
|
||||
CREATE TEMP TABLE tmp_large_projects AS
|
||||
SELECT
|
||||
gen_random_uuid() AS project_id,
|
||||
gen_random_uuid() AS tenancy_id,
|
||||
gs AS project_idx,
|
||||
lpad(gs::text, 2, '0') AS padded_project_idx,
|
||||
now() AS ts
|
||||
FROM generate_series(1, 3) AS gs;
|
||||
|
||||
INSERT INTO "Project" ("id", "displayName", "description", "isProductionMode", "ownerTeamId", "createdAt", "updatedAt")
|
||||
SELECT
|
||||
project_id,
|
||||
'External DB Sync Mega Project ' || padded_project_idx,
|
||||
'External DB sync load test project (mega)',
|
||||
FALSE,
|
||||
NULL,
|
||||
ts,
|
||||
ts
|
||||
FROM tmp_large_projects;
|
||||
|
||||
INSERT INTO "Tenancy" ("id", "projectId", "branchId", "organizationId", "hasNoOrganization", "createdAt", "updatedAt")
|
||||
SELECT
|
||||
tenancy_id,
|
||||
project_id,
|
||||
'main',
|
||||
NULL,
|
||||
'TRUE'::"BooleanTrue",
|
||||
ts,
|
||||
ts
|
||||
FROM tmp_large_projects;
|
||||
|
||||
WITH settings AS (
|
||||
SELECT
|
||||
'postgresql://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@localhost:8128/loadtest'::text AS external_connection_string
|
||||
),
|
||||
config AS (
|
||||
SELECT jsonb_build_object(
|
||||
'dbSync',
|
||||
jsonb_build_object(
|
||||
'externalDatabases',
|
||||
jsonb_build_object(
|
||||
'main',
|
||||
jsonb_build_object(
|
||||
'type', 'postgres',
|
||||
'connectionString', external_connection_string
|
||||
)
|
||||
)
|
||||
)
|
||||
) AS config_json
|
||||
FROM settings
|
||||
)
|
||||
INSERT INTO "EnvironmentConfigOverride" ("projectId", "branchId", "config", "createdAt", "updatedAt")
|
||||
SELECT
|
||||
project_id,
|
||||
'main',
|
||||
(SELECT config_json FROM config),
|
||||
ts,
|
||||
ts
|
||||
FROM tmp_large_projects
|
||||
ON CONFLICT ("projectId", "branchId") DO UPDATE SET
|
||||
"config" = EXCLUDED."config",
|
||||
"updatedAt" = EXCLUDED."updatedAt";
|
||||
|
||||
-- ALTER TABLE "ProjectUser" DISABLE TRIGGER project_user_insert_trigger;
|
||||
|
||||
DO $$
|
||||
DECLARE
|
||||
users_per_project int := 1000000;
|
||||
batch_size int := 10000;
|
||||
batch_start int := 1;
|
||||
batch_end int;
|
||||
BEGIN
|
||||
WHILE batch_start <= users_per_project LOOP
|
||||
batch_end := LEAST(batch_start + batch_size - 1, users_per_project);
|
||||
|
||||
WITH mega_users AS (
|
||||
SELECT
|
||||
lp.project_id,
|
||||
lp.tenancy_id,
|
||||
lp.project_idx,
|
||||
lp.padded_project_idx,
|
||||
gs AS user_idx,
|
||||
lpad(gs::text, 7, '0') AS padded_user_idx,
|
||||
gen_random_uuid() AS project_user_id,
|
||||
lp.ts AS ts
|
||||
FROM tmp_large_projects lp
|
||||
CROSS JOIN generate_series(batch_start, batch_end) AS gs
|
||||
)
|
||||
INSERT INTO "ProjectUser"
|
||||
("tenancyId", "projectUserId", "mirroredProjectId", "mirroredBranchId", "displayName", "projectId", "createdAt", "updatedAt")
|
||||
SELECT
|
||||
tenancy_id,
|
||||
project_user_id,
|
||||
project_id,
|
||||
'main',
|
||||
'Mega User ' || padded_project_idx || '-' || padded_user_idx,
|
||||
project_id,
|
||||
ts,
|
||||
ts
|
||||
FROM mega_users;
|
||||
|
||||
RAISE NOTICE 'Inserted users %-% of % per project', batch_start, batch_end, users_per_project;
|
||||
|
||||
batch_start := batch_end + 1;
|
||||
END LOOP;
|
||||
END $$;
|
||||
|
||||
-- ALTER TABLE "ProjectUser" ENABLE TRIGGER project_user_insert_trigger;
|
||||
|
||||
COMMIT;
|
||||
@ -17,3 +17,27 @@ A: Use `poolMatchGlobs` to route the external DB sync test globs to the `forks`
|
||||
|
||||
Q: How can CI keep most tests parallel while isolating external DB sync tests?
|
||||
A: Split workflow test runs into two steps: run the full suite with `--exclude "**/external-db-sync*.test.ts"`, then run only external DB sync tests with `--min-workers=1 --max-workers=1`.
|
||||
|
||||
Q: How do I call a custom internal admin endpoint from the dashboard without adding SDK methods?
|
||||
A: Grab the Stack internals symbol (`Symbol.for("StackAuth--DO-NOT-USE-OR-YOU-WILL-BE-FIRED--StackAppInternals")`) and call `adminApp[stackAppInternalsSymbol].sendRequest(path, options, "admin")` to issue an authenticated admin request.
|
||||
|
||||
Q: How do I read project and branch IDs from admin auth in backend route handlers?
|
||||
A: Use `auth.tenancy.project.id` and `auth.tenancy.branchId` when `adminAuthTypeSchema` is in use; `auth.project` and `auth.branchId` do not exist there.
|
||||
|
||||
Q: How do I avoid BigInt literal errors in dashboard typecheck?
|
||||
A: Avoid `0n` in client code and use `BigInt(0)` instead, since BigInt literals require an ES2020 target.
|
||||
|
||||
Q: How can I get external DB sync status across all tenancies?
|
||||
A: Call `/api/latest/internal/external-db-sync/status?scope=all` with admin auth; the response includes a `global` aggregate alongside the current tenancy details.
|
||||
|
||||
Q: How can I throttle external DB sync in dev without pausing it?
|
||||
A: Use `STACK_EXTERNAL_DB_SYNC_POLL_CLAIM_LIMIT` to cap poller throughput, `STACK_EXTERNAL_DB_SYNC_SEQUENCER_BATCH_SIZE` to reduce backfill batch size, and `STACK_EXTERNAL_DB_SYNC_MAX_BATCHES_PER_MAPPING` to limit sync-engine work per request (it re-enqueues when throttled).
|
||||
|
||||
Q: How can the external DB sync dashboard show global stats only?
|
||||
A: When `/api/latest/internal/external-db-sync/status?scope=all` is used, the route can return global aggregates for the main stats and an empty `external_databases` array; the dashboard should avoid tenancy-specific fields and external DB cards in that mode.
|
||||
|
||||
Q: How can I make the mega-user load in `mock-external-db-sync-projects.sql` show progress and avoid huge single-statement inserts?
|
||||
A: Create a temp table for the three mega projects, then use a `DO $$` loop to insert users in batches (e.g., 10k per project) with a `RAISE NOTICE` after each batch; this keeps memory pressure lower and gives progress feedback.
|
||||
|
||||
Q: Why is the mega-user load still slow even with batching?
|
||||
A: `ProjectUser` inserts fire the `project_user_insert_trigger` (from `apps/backend/prisma/migrations/20250304200822_add_project_user_count/migration.sql`) which updates `Project.userCount` on every insert; with 1M users per project, that means 1M updates to the same project row, causing huge write amplification. ContactChannel inserts also trigger `mark_project_user_on_contact_channel_*` updates. For fast bulk loads, disable those triggers during the load and recompute `Project.userCount` afterward.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user