stack/apps/e2e/tests/backend/performance/mock-external-db-sync-projects.sql
aadesh18 2055d98dea
External db sync (#1036)
<img width="1920" height="969" alt="Screenshot 2026-02-04 at 9 47 16 AM"
src="https://github.com/user-attachments/assets/d7d0cd04-0051-4fc4-b857-e6f87ee97a59"
/>

**This PR revolves around the following components**
1. Sequencer - sequences the updates in the internal DB
2. Poller - polls for the latest updates to sync with the external DB
3. Outgoing Request Handler - essentially a trigger that makes HTTP
requests based on changes in the internal DB
4. Sync Engine - syncs the latest changes from the internal DB to
the external DB

**What has been done**
- Added a global sequence id for ProjectUser, ContactChannel and
DeletedRow.
- Added the DeletedRow table to keep track of the rows that were deleted
across ProjectUser and ContactChannel.
- Added the OutgoingRequest table to keep track of the outgoing requests
- Added a function that the sequencer calls to sequence updates
- Added a sequencer that sequences all the changes in the internal DB
every 50 ms
- Added a poller that polls for the latest changes in the internal DB
every 50 ms, and adds them to a queue
- Added a Vercel cron that calls the sequencer and poller every minute
- Added a queue that fulfills the outgoing requests by making HTTP calls
(for external DB sync, it calls the sync engine endpoint)
- Added a sync engine that uses the SQL mapping query defined in the
user's schema to pull in the changes for the user and sync them with
the external DB
- Added tests for each piece of functionality
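
The sequencer and poller steps listed above can be pictured with a small in-memory model. This is only a sketch under stated assumptions, not the code from this PR: the `Row` shape, `GlobalSequence`, `runSequencer`, `runPoller` and the per-tenancy cursor map are hypothetical names, and only `sequenceId`, `shouldUpdateSequenceId` and the idea of a global sequence come from the description.

```typescript
// Hypothetical in-memory model; the real implementation works on database rows.
type Row = {
  tenancyId: string;
  id: string;
  sequenceId: number | null;
  shouldUpdateSequenceId: boolean;
};

// Stand-in for a global database sequence (assumption: strictly increasing).
class GlobalSequence {
  private last = 0;
  next(): number {
    this.last += 1;
    return this.last;
  }
}

// Sequencer: assign a fresh global sequence id to every row flagged as changed.
function runSequencer(rows: Row[], sequence: GlobalSequence): number {
  let sequenced = 0;
  for (const row of rows) {
    if (row.shouldUpdateSequenceId) {
      row.sequenceId = sequence.next();
      row.shouldUpdateSequenceId = false;
      sequenced += 1;
    }
  }
  return sequenced;
}

// Poller: enqueue at most one sync request per tenancy that has rows newer
// than the last sequence id already synced for that tenancy.
function runPoller(
  rows: Row[],
  lastSynced: Map<string, number>,
  syncQueue: Set<string>,
): void {
  for (const row of rows) {
    if (row.sequenceId === null) continue; // not sequenced yet
    const cursor = lastSynced.get(row.tenancyId) ?? 0;
    if (row.sequenceId > cursor) syncQueue.add(row.tenancyId);
  }
}

// Usage: three changed rows across two tenancies; t2 is already synced up to 3.
const rows: Row[] = [
  { tenancyId: "t1", id: "u1", sequenceId: null, shouldUpdateSequenceId: true },
  { tenancyId: "t1", id: "u2", sequenceId: null, shouldUpdateSequenceId: true },
  { tenancyId: "t2", id: "u3", sequenceId: null, shouldUpdateSequenceId: true },
];
const sequence = new GlobalSequence();
const syncQueue = new Set<string>();
const sequencedCount = runSequencer(rows, sequence);
runPoller(rows, new Map([["t2", 3]]), syncQueue);
```

In this model only `t1` ends up in the queue, because the single `t2` row receives sequence id 3, which is not newer than the cursor assumed for `t2`. Using a set for the queue reflects the assumption that one pending request per tenancy is enough.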


**How to review this PR:**
1. Review the migrations (sequence id, deletedRow, triggers, backlog
sync) (all files created under the migrations folder)
2. Review sequencer
3. Review poller
4. Review the changes in schema
5. Review sync-engine (the function and its helper file)
6. Review the schema changes, and query mappings
7. Review the tests (basic, advanced and race, along with the helper
file)
8. Review the changes made in the Dockerfile to support local testing
using the Postgres Docker container

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> Introduces a cron-driven external DB sync pipeline with global
sequencing, internal poller and webhook sync engine, new DB
tables/functions, config schema/mappings, and comprehensive e2e tests.
> 
> - **Database (Prisma/Migrations)**:
>   - Add global sequence (`global_seq_id`) and `sequenceId`/`shouldUpdateSequenceId` to `ProjectUser`, `ContactChannel`, `DeletedRow` with partial indexes.
>   - Create `DeletedRow` (capture deletes) and `OutgoingRequest` (queue) tables; add unique/indexes.
>   - Add triggers/functions: `log_deleted_row`, `reset_sequence_id_on_update`, `backfill_null_sequence_ids`, `enqueue_tenant_sync`.
> - **Backend/API**:
>   - New internal routes: `GET /api/latest/internal/external-db-sync/sequencer`, `GET /poller`, `POST /sync-engine` (Upstash-verified) for sync orchestration.
>   - Add cron wiring: `vercel.json` schedules and local `scripts/run-cron-jobs.ts`; start in dev via `dev` script.
>   - Tweak route handler (remove noisy logging) without behavior change.
> - **Sync Engine**:
>   - Implement `src/lib/external-db-sync.ts` to read tenant mappings and upsert to external Postgres (schema bootstrap, param checks, sequencing).
>   - Add default mappings `DEFAULT_DB_SYNC_MAPPINGS` and config schema `dbSync.externalDatabases` in shared config.
> - **Testing/Infra**:
>   - Add extensive e2e tests (basics, advanced, race conditions) for sequencing, idempotency, deletes, pagination, multi-mapping, and permissions.
>   - Docker compose: add `external-db-test` Postgres for tests; e2e deps for `pg` types.
> 
> <sup>Written by [Cursor
Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit
3f2a8efcfb. This will update automatically
on new commits. Configure
[here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
  * External PostgreSQL sync: automatic, batched replication with mappings, resume/idempotency, and on-demand enqueueing.

* **Admin UI**
  * Real-time External DB Sync dashboard and status API showing per-mapping backlog, sequencer/poller/sync-engine telemetry, and fusebox controls.

* **Tests**
  * Large e2e suite: basic, advanced, race, high-volume tests and test utilities for external DB sync.

* **Chores**
  * DB migrations, CI/workflow updates, background cron runner and local/dev test support.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
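
The resume/idempotency behavior mentioned in the summaries can be illustrated with a cursor-based batch apply. This is a rough sketch, not the sync engine from this PR: the `Change` type, `applyBatch`, and the use of a single numeric cursor per target are assumptions made for illustration, and an in-memory `Map` stands in for the external Postgres table.

```typescript
// Hypothetical change record: either an upsert carrying data or a delete.
type Change =
  | { kind: "upsert"; id: string; sequenceId: number; data: string }
  | { kind: "delete"; id: string; sequenceId: number };

// Apply a batch in sequence order and return the new cursor. Changes at or
// below the cursor are skipped, so replaying a batch has no further effect.
function applyBatch(
  external: Map<string, string>,
  changes: Change[],
  cursor: number,
): number {
  const ordered = [...changes].sort((a, b) => a.sequenceId - b.sequenceId);
  let next = cursor;
  for (const change of ordered) {
    if (change.sequenceId <= cursor) continue; // already applied earlier
    if (change.kind === "upsert") {
      external.set(change.id, change.data);
    } else {
      external.delete(change.id);
    }
    next = change.sequenceId; // ordered ascending, so this is the max so far
  }
  return next;
}

// Usage: apply a batch, then replay it with the returned cursor.
const external = new Map<string, string>();
const changes: Change[] = [
  { kind: "upsert", id: "u2", sequenceId: 2, data: "bob" },
  { kind: "upsert", id: "u1", sequenceId: 1, data: "alice" },
  { kind: "delete", id: "u1", sequenceId: 3 },
];
const firstCursor = applyBatch(external, changes, 0);
const replayCursor = applyBatch(external, changes, firstCursor);
```

After the first call the model holds only `u2`, and the replay leaves both the data and the cursor unchanged. Persisting the cursor alongside the data is what would let a real sync resume after an interruption, under the assumptions above.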

---------

Co-authored-by: Konsti Wohlwend <n2d4xc@gmail.com>
Co-authored-by: Bilal Godil <bg2002@gmail.com>
2026-02-05 12:04:31 -08:00

263 lines
7.0 KiB
PL/PgSQL

-- Usage (paths are relative to the repository root):
--   set -a; source apps/backend/.env.development; set +a; psql "$STACK_DATABASE_CONNECTION_STRING" -v ON_ERROR_STOP=1 -f apps/e2e/tests/backend/performance/mock-external-db-sync-projects.sql
BEGIN;
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
-- NOTE:
-- - This script is intentionally heavy: section 1 creates 1,000,000 projects with one user each,
--   and section 2 creates 3 projects with 1,000,000 users each (4,000,000 users in total).
-- - Update BOTH settings blocks if you need a different external DB connection string.
-- - The external DB should be reachable from the backend (default uses docker postgres on port 8128).
-- =====================================================================================
-- 1) One million projects, one user each
-- =====================================================================================
WITH settings AS (
  SELECT
    'postgresql://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@localhost:8128/loadtest'::text AS external_connection_string,
    1000000::int AS project_count
),
config AS (
  -- Environment config override that points each project at the external DB.
  SELECT jsonb_build_object(
    'dbSync',
    jsonb_build_object(
      'externalDatabases',
      jsonb_build_object(
        'main',
        jsonb_build_object(
          'type', 'postgres',
          'connectionString', external_connection_string
        )
      )
    )
  ) AS config_json
  FROM settings
),
small_projects AS (
  -- One row per project with all ids pre-generated. This CTE is referenced several
  -- times below, so PostgreSQL materializes it and every INSERT sees the same ids.
  SELECT
    gen_random_uuid() AS project_id,
    gen_random_uuid() AS tenancy_id,
    gen_random_uuid() AS project_user_id,
    gen_random_uuid() AS auth_method_id,
    gen_random_uuid() AS contact_id,
    gs AS idx,
    lpad(gs::text, 7, '0') AS padded_idx,
    now() AS ts
  FROM settings
  CROSS JOIN generate_series(1, settings.project_count) AS gs
),
-- The data-modifying CTEs below run as part of this single statement,
-- even though the final INSERT never reads their output.
insert_projects AS (
  INSERT INTO "Project" ("id", "displayName", "description", "isProductionMode", "ownerTeamId", "createdAt", "updatedAt")
  SELECT
    project_id,
    'External DB Sync Project ' || padded_idx,
    'External DB sync load test project',
    FALSE,
    NULL,
    ts,
    ts
  FROM small_projects
  RETURNING "id"
),
insert_tenancies AS (
  INSERT INTO "Tenancy" ("id", "projectId", "branchId", "organizationId", "hasNoOrganization", "createdAt", "updatedAt")
  SELECT
    tenancy_id,
    project_id,
    'main',
    NULL,
    'TRUE'::"BooleanTrue",
    ts,
    ts
  FROM small_projects
  RETURNING "id"
),
insert_env_config AS (
  INSERT INTO "EnvironmentConfigOverride" ("projectId", "branchId", "config", "createdAt", "updatedAt")
  SELECT
    project_id,
    'main',
    (SELECT config_json FROM config),
    ts,
    ts
  FROM small_projects
  ON CONFLICT ("projectId", "branchId") DO UPDATE SET
    "config" = EXCLUDED."config",
    "updatedAt" = EXCLUDED."updatedAt"
  RETURNING "projectId"
),
insert_users AS (
  INSERT INTO "ProjectUser"
    ("tenancyId", "projectUserId", "mirroredProjectId", "mirroredBranchId", "displayName", "projectId", "createdAt", "updatedAt")
  SELECT
    tenancy_id,
    project_user_id,
    project_id,
    'main',
    'External Sync User ' || padded_idx,
    project_id,
    ts,
    ts
  FROM small_projects
  RETURNING "tenancyId", "projectUserId"
),
insert_contacts AS (
  -- One primary, unverified email contact channel per user.
  INSERT INTO "ContactChannel"
    ("tenancyId", "projectUserId", "id", "type", "isPrimary", "usedForAuth", "isVerified", "value", "createdAt", "updatedAt")
  SELECT
    tenancy_id,
    project_user_id,
    contact_id,
    'EMAIL',
    'TRUE'::"BooleanTrue",
    'TRUE'::"BooleanTrue",
    FALSE,
    'external-sync-user-' || padded_idx || '@load.local',
    ts,
    ts
  FROM small_projects
  RETURNING "tenancyId", "projectUserId"
),
insert_auth_methods AS (
  INSERT INTO "AuthMethod"
    ("tenancyId", "id", "projectUserId", "createdAt", "updatedAt")
  SELECT
    tenancy_id,
    auth_method_id,
    project_user_id,
    ts,
    ts
  FROM small_projects
  RETURNING "tenancyId", "id", "projectUserId"
)
-- Every user gets the same pre-computed password hash.
INSERT INTO "PasswordAuthMethod"
  ("tenancyId", "authMethodId", "projectUserId", "passwordHash", "createdAt", "updatedAt")
SELECT
  tenancy_id,
  auth_method_id,
  project_user_id,
  '$2a$13$TVyY/gpw9Db/w1fBeJkCgeNg2Rae2JfNqrPnSACtj.ufAO5cVF13.',
  ts,
  ts
FROM small_projects;
COMMIT;
BEGIN;
-- =====================================================================================
-- 2) Three projects, one million users each
-- =====================================================================================
SET LOCAL synchronous_commit = off;
-- Ids for the three large projects, reused by the statements below.
CREATE TEMP TABLE tmp_large_projects AS
SELECT
  gen_random_uuid() AS project_id,
  gen_random_uuid() AS tenancy_id,
  gs AS project_idx,
  lpad(gs::text, 2, '0') AS padded_project_idx,
  now() AS ts
FROM generate_series(1, 3) AS gs;

INSERT INTO "Project" ("id", "displayName", "description", "isProductionMode", "ownerTeamId", "createdAt", "updatedAt")
SELECT
  project_id,
  'External DB Sync Mega Project ' || padded_project_idx,
  'External DB sync load test project (mega)',
  FALSE,
  NULL,
  ts,
  ts
FROM tmp_large_projects;

INSERT INTO "Tenancy" ("id", "projectId", "branchId", "organizationId", "hasNoOrganization", "createdAt", "updatedAt")
SELECT
  tenancy_id,
  project_id,
  'main',
  NULL,
  'TRUE'::"BooleanTrue",
  ts,
  ts
FROM tmp_large_projects;

WITH settings AS (
  SELECT
    'postgresql://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@localhost:8128/loadtest'::text AS external_connection_string
),
config AS (
  SELECT jsonb_build_object(
    'dbSync',
    jsonb_build_object(
      'externalDatabases',
      jsonb_build_object(
        'main',
        jsonb_build_object(
          'type', 'postgres',
          'connectionString', external_connection_string
        )
      )
    )
  ) AS config_json
  FROM settings
)
INSERT INTO "EnvironmentConfigOverride" ("projectId", "branchId", "config", "createdAt", "updatedAt")
SELECT
  project_id,
  'main',
  (SELECT config_json FROM config),
  ts,
  ts
FROM tmp_large_projects
ON CONFLICT ("projectId", "branchId") DO UPDATE SET
  "config" = EXCLUDED."config",
  "updatedAt" = EXCLUDED."updatedAt";
-- ALTER TABLE "ProjectUser" DISABLE TRIGGER project_user_insert_trigger;
DO $$
DECLARE
  users_per_project int := 1000000;
  batch_size int := 10000;
  batch_start int := 1;
  batch_end int;
BEGIN
  -- Each iteration inserts batch_size users for every large project
  -- (3 * batch_size rows). Batching bounds the size of each INSERT and reports
  -- progress; all batches still commit together at the final COMMIT.
  WHILE batch_start <= users_per_project LOOP
    batch_end := LEAST(batch_start + batch_size - 1, users_per_project);

    WITH mega_users AS (
      SELECT
        lp.project_id,
        lp.tenancy_id,
        lp.project_idx,
        lp.padded_project_idx,
        gs AS user_idx,
        lpad(gs::text, 7, '0') AS padded_user_idx,
        gen_random_uuid() AS project_user_id,
        lp.ts AS ts
      FROM tmp_large_projects lp
      CROSS JOIN generate_series(batch_start, batch_end) AS gs
    )
    INSERT INTO "ProjectUser"
      ("tenancyId", "projectUserId", "mirroredProjectId", "mirroredBranchId", "displayName", "projectId", "createdAt", "updatedAt")
    SELECT
      tenancy_id,
      project_user_id,
      project_id,
      'main',
      'Mega User ' || padded_project_idx || '-' || padded_user_idx,
      project_id,
      ts,
      ts
    FROM mega_users;

    RAISE NOTICE 'Inserted users %-% of % per project', batch_start, batch_end, users_per_project;
    batch_start := batch_end + 1;
  END LOOP;
END $$;
-- ALTER TABLE "ProjectUser" ENABLE TRIGGER project_user_insert_trigger;
COMMIT;