mirror of
https://github.com/stack-auth/stack.git
synced 2026-06-30 21:01:54 +08:00
<img width="1920" height="969" alt="Screenshot 2026-02-04 at 9 47 16 AM"
src="https://github.com/user-attachments/assets/d7d0cd04-0051-4fc4-b857-e6f87ee97a59"
/>
**This PR revolves around the following components**
1. Sequencer - sequences the updates in the internal db
2. Poller - polls for the latest updates to sync with the external db
3. Outgoing Request Handler - essentially a trigger that can make http
requests based on a change in the internal db
4. Sync Engine - syncs with the latest changes from the internal db to
the external db
**What has been done**
- Added a global sequence id for ProjectUser, ContactChannel and
DeletedRow.
- Added the deletedRow table to keep track of the rows that were deleted
across ProjectUser and ContactChannel.
- Added the OutgoingRequest table to keep track of the outgoing requests
- Added function for the sequencer to call to sequence updates
- Added a sequencer that sequences all the changes in the internal db
every 50 ms
- Added a poller that polls for the latest changes in the internal db
every 50 ms, and adds to a queue
- Added a Vercel cron that calls sequencer and poller every minute
- Added a queue that fulfills the outgoing requests by making http calls
(for external db sync, it calls the sync engine endpoint)
- Added a sync engine that uses the defined sql mapping query in the
user's schema to pull in the changes for the user, and sync them with
the external db
- Added tests to test out each functionality
**How to review this PR:**
1. Review the migrations (sequence id, deletedRow, triggers, backlog
sync) (all files created under the migrations folder)
2. Review sequencer
3. Review poller
4. Review the changes in schema
5. Review sync-engine (the function, and it's helper file)
6. Review the schema changes, and query mappings
7. Review the tests (basic, advanced and race, along with the helper
file)
8. Review the changes made in Dockerfile to support local testing using
the postgres docker
<!-- CURSOR_SUMMARY -->
---
> [!NOTE]
> Introduces a cron-driven external DB sync pipeline with global
sequencing, internal poller and webhook sync engine, new DB
tables/functions, config schema/mappings, and comprehensive e2e tests.
>
> - **Database (Prisma/Migrations)**:
> - Add global sequence (`global_seq_id`) and
`sequenceId`/`shouldUpdateSequenceId` to `ProjectUser`,
`ContactChannel`, `DeletedRow` with partial indexes.
> - Create `DeletedRow` (capture deletes) and `OutgoingRequest` (queue)
tables; add unique/indexes.
> - Add triggers/functions: `log_deleted_row`,
`reset_sequence_id_on_update`, `backfill_null_sequence_ids`,
`enqueue_tenant_sync`.
> - **Backend/API**:
> - New internal routes: `GET
/api/latest/internal/external-db-sync/sequencer`, `GET /poller`, `POST
/sync-engine` (Upstash-verified) for sync orchestration.
> - Add cron wiring: `vercel.json` schedules and local
`scripts/run-cron-jobs.ts`; start in dev via `dev` script.
> - Tweak route handler (remove noisy logging) without behavior change.
> - **Sync Engine**:
> - Implement `src/lib/external-db-sync.ts` to read tenant mappings and
upsert to external Postgres (schema bootstrap, param checks,
sequencing).
> - Add default mappings `DEFAULT_DB_SYNC_MAPPINGS` and config schema
`dbSync.externalDatabases` in shared config.
> - **Testing/Infra**:
> - Add extensive e2e tests (basics, advanced, race conditions) for
sequencing, idempotency, deletes, pagination, multi-mapping, and
permissions.
> - Docker compose: add `external-db-test` Postgres for tests; e2e deps
for `pg` types.
>
> <sup>Written by [Cursor
Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit
3f2a8efcfb. This will update automatically
on new commits. Configure
[here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit
* **New Features**
* External PostgreSQL sync: automatic, batched replication with
mappings, resume/idempotency, and on-demand enqueueing.
* **Admin UI**
* Real-time External DB Sync dashboard and status API showing
per-mapping backlog, sequencer/poller/sync-engine telemetry, and fusebox
controls.
* **Tests**
* Large e2e suite: basic, advanced, race, high-volume tests and test
utilities for external DB sync.
* **Chores**
* DB migrations, CI/workflow updates, background cron runner and
local/dev test support.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
---------
Co-authored-by: Konsti Wohlwend <n2d4xc@gmail.com>
Co-authored-by: Bilal Godil <bg2002@gmail.com>
166 lines
6.2 KiB
TypeScript
166 lines
6.2 KiB
TypeScript
import { getEnvVariable, getNodeEnvironment } from '@stackframe/stack-shared/dist/utils/env';
|
|
import { StackAssertionError } from '@stackframe/stack-shared/dist/utils/errors';
|
|
import { wait } from '@stackframe/stack-shared/dist/utils/promises';
|
|
import apiVersions from './generated/api-versions.json';
|
|
import routes from './generated/routes.json';
|
|
import './polyfills';
|
|
|
|
import type { NextRequest } from 'next/server';
|
|
import { NextResponse } from 'next/server';
|
|
import { SmartRouter } from './smart-router';
|
|
|
|
const DEV_RATE_LIMIT_MAX_REQUESTS = 100;
|
|
const DEV_RATE_LIMIT_WINDOW_MS = 10_000;
|
|
const devRateLimitTimestamps: number[] = [];
|
|
|
|
const corsAllowedRequestHeaders = [
|
|
// General
|
|
'content-type',
|
|
'authorization', // used for OAuth basic authentication
|
|
'x-stack-project-id',
|
|
'x-stack-branch-id',
|
|
'x-stack-override-error-status',
|
|
'x-stack-random-nonce', // used to forcefully disable some caches
|
|
'x-stack-client-version',
|
|
'x-stack-disable-artificial-development-delay',
|
|
|
|
// Project auth
|
|
'x-stack-access-type',
|
|
'x-stack-publishable-client-key',
|
|
'x-stack-secret-server-key',
|
|
'x-stack-super-secret-admin-key',
|
|
'x-stack-admin-access-token',
|
|
|
|
// User auth
|
|
'x-stack-refresh-token',
|
|
'x-stack-access-token',
|
|
'x-stack-allow-anonymous-user',
|
|
|
|
// Sentry
|
|
'baggage',
|
|
'sentry-trace',
|
|
|
|
// Vercel
|
|
'x-vercel-protection-bypass',
|
|
|
|
// ngrok
|
|
'ngrok-skip-browser-warning',
|
|
];
|
|
|
|
const corsAllowedResponseHeaders = [
|
|
'content-type',
|
|
'x-stack-actual-status',
|
|
'x-stack-known-error',
|
|
];
|
|
|
|
// This function can be marked `async` if using `await` inside
|
|
export async function proxy(request: NextRequest) {
|
|
const url = new URL(request.url);
|
|
const delay = +getEnvVariable('STACK_ARTIFICIAL_DEVELOPMENT_DELAY_MS', '0');
|
|
if (delay) {
|
|
if (getNodeEnvironment().includes('production')) {
|
|
throw new StackAssertionError('STACK_ARTIFICIAL_DEVELOPMENT_DELAY_MS environment variable is only allowed in development');
|
|
}
|
|
if (!request.headers.get('x-stack-disable-artificial-development-delay')) {
|
|
await wait(delay);
|
|
}
|
|
}
|
|
const isApiRequest = url.pathname.startsWith('/api/');
|
|
|
|
const corsHeadersInit = isApiRequest ? {
|
|
// CORS headers
|
|
"Access-Control-Allow-Origin": "*",
|
|
"Access-Control-Allow-Methods": "GET, POST, PUT, PATCH, DELETE, OPTIONS",
|
|
"Access-Control-Max-Age": "86400", // 1 day (capped to lower values, eg. 10min, by some browsers)
|
|
"Access-Control-Allow-Headers": corsAllowedRequestHeaders.join(', '),
|
|
"Access-Control-Expose-Headers": corsAllowedResponseHeaders.join(', '),
|
|
"Vary": corsAllowedRequestHeaders.join(', '),
|
|
} : undefined;
|
|
|
|
// ensure our clients can handle 429 responses
|
|
if (isApiRequest && !request.headers.get('x-stack-disable-artificial-development-delay') && getNodeEnvironment() === 'development' && request.method !== 'OPTIONS' && !request.url.includes(".well-known") && !request.url.includes("/api/latest/internal/external-db-sync/")) {
|
|
const now = Date.now();
|
|
while (devRateLimitTimestamps.length > 0 && now - devRateLimitTimestamps[0] > DEV_RATE_LIMIT_WINDOW_MS) {
|
|
devRateLimitTimestamps.shift();
|
|
}
|
|
if (devRateLimitTimestamps.length >= DEV_RATE_LIMIT_MAX_REQUESTS) {
|
|
const waitMs = Math.max(0, DEV_RATE_LIMIT_WINDOW_MS - (now - devRateLimitTimestamps[0]));
|
|
const retryAfterSeconds = Math.max(1, Math.ceil(waitMs / 1000));
|
|
|
|
const response = NextResponse.json({
|
|
message: 'Artificial development rate limit triggered. Wait before retrying.',
|
|
}, {
|
|
status: 429,
|
|
});
|
|
|
|
// since not all firewalls return CORS headers with their 429 responses, 50% chance that we don't set the CORS headers
|
|
if (Math.random() < 0.5 && corsHeadersInit) {
|
|
for (const [key, value] of Object.entries(corsHeadersInit)) {
|
|
response.headers.set(key, value);
|
|
}
|
|
}
|
|
|
|
if (Math.random() < 0.5) {
|
|
// for debugging, make sure we don't always set the Retry-After header
|
|
response.headers.set('Retry-After', retryAfterSeconds.toString());
|
|
}
|
|
|
|
return response;
|
|
} else {
|
|
devRateLimitTimestamps.push(now);
|
|
}
|
|
}
|
|
|
|
const newRequestHeaders = new Headers(request.headers);
|
|
// here we could update the request headers (currently we don't)
|
|
|
|
const responseInit = isApiRequest ? {
|
|
request: {
|
|
headers: newRequestHeaders,
|
|
},
|
|
headers: corsHeadersInit,
|
|
} as const : undefined;
|
|
|
|
// we want to allow preflight requests to pass through
|
|
// even if the API route does not implement OPTIONS
|
|
if (request.method === 'OPTIONS' && isApiRequest) {
|
|
return new Response(null, responseInit);
|
|
}
|
|
|
|
// if no route is available for the requested version, rewrite to newer version
|
|
let pathname = url.pathname;
|
|
outer: for (let i = 0; i < apiVersions.length - 1; i++) {
|
|
const version = apiVersions[i];
|
|
const nextVersion = apiVersions[i + 1];
|
|
if (!nextVersion.migrationFolder) {
|
|
throw new StackAssertionError(`No migration folder found for version ${nextVersion.name}. This is a bug because every version except the first should have a migration folder.`);
|
|
}
|
|
if ((pathname + "/").startsWith(version.servedRoute + "/")) {
|
|
const nextPathname = pathname.replace(version.servedRoute, nextVersion.servedRoute);
|
|
const migrationPathname = nextPathname.replace(nextVersion.servedRoute, nextVersion.migrationFolder);
|
|
// okay, we're in an API version of the current version. let's check if at least one route matches this URL (doesn't matter which)
|
|
for (const route of routes) {
|
|
if (nextVersion.migrationFolder && (route.normalizedPath + "/").startsWith(nextVersion.migrationFolder + "/")) {
|
|
if (SmartRouter.matchNormalizedPath(migrationPathname, route.normalizedPath)) {
|
|
// success! we found a route that matches the request
|
|
// rewrite request to the migration folder
|
|
pathname = migrationPathname;
|
|
break outer;
|
|
}
|
|
}
|
|
}
|
|
// if no route matches, rewrite to the next version
|
|
pathname = nextPathname;
|
|
}
|
|
}
|
|
|
|
const newUrl = request.nextUrl.clone();
|
|
newUrl.pathname = pathname;
|
|
return NextResponse.rewrite(newUrl, responseInit);
|
|
}
|
|
|
|
// See "Matching Paths" below to learn more
|
|
export const config = {
|
|
matcher: '/:path*',
|
|
};
|