diff --git a/apps/backend/prisma/migrations/20260316000003_add_project_api_key_sequence_columns/migration.sql b/apps/backend/prisma/migrations/20260316000003_add_project_api_key_sequence_columns/migration.sql new file mode 100644 index 000000000..d79e6fe0c --- /dev/null +++ b/apps/backend/prisma/migrations/20260316000003_add_project_api_key_sequence_columns/migration.sql @@ -0,0 +1,12 @@ +-- AlterTable +ALTER TABLE "ProjectApiKey" ADD COLUMN "sequenceId" BIGINT, +ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true; + +-- CreateIndex +CREATE UNIQUE INDEX "ProjectApiKey_sequenceId_key" ON "ProjectApiKey"("sequenceId"); + +-- CreateIndex +CREATE INDEX "ProjectApiKey_tenancyId_sequenceId_idx" ON "ProjectApiKey"("tenancyId", "sequenceId"); + +-- CreateIndex +CREATE INDEX "ProjectApiKey_shouldUpdateSequenceId_idx" ON "ProjectApiKey"("shouldUpdateSequenceId", "tenancyId"); diff --git a/apps/backend/prisma/schema.prisma b/apps/backend/prisma/schema.prisma index b79570597..baebf0bad 100644 --- a/apps/backend/prisma/schema.prisma +++ b/apps/backend/prisma/schema.prisma @@ -711,7 +711,12 @@ model ProjectApiKey { projectUser ProjectUser? @relation(fields: [tenancyId, projectUserId], references: [tenancyId, projectUserId], onDelete: Cascade) team Team? @relation(fields: [tenancyId, teamId], references: [tenancyId, teamId], onDelete: Cascade) + sequenceId BigInt? @unique + shouldUpdateSequenceId Boolean @default(true) + @@id([tenancyId, id]) + @@index([tenancyId, sequenceId], name: "ProjectApiKey_tenancyId_sequenceId_idx") + @@index([shouldUpdateSequenceId, tenancyId], name: "ProjectApiKey_shouldUpdateSequenceId_idx") } enum EmailTemplateType { diff --git a/apps/backend/scripts/clickhouse-migrations.ts b/apps/backend/scripts/clickhouse-migrations.ts index f5cd50d1d..f69446cb6 100644 --- a/apps/backend/scripts/clickhouse-migrations.ts +++ b/apps/backend/scripts/clickhouse-migrations.ts @@ -26,6 +26,8 @@ export async function runClickhouseMigrations() { await client.exec({ query: EMAIL_OUTBOXES_VIEW_SQL }); await client.exec({ query: SESSION_REPLAYS_TABLE_BASE_SQL }); await client.exec({ query: SESSION_REPLAYS_VIEW_SQL }); + await client.exec({ query: PROJECT_API_KEYS_TABLE_BASE_SQL }); + await client.exec({ query: PROJECT_API_KEYS_VIEW_SQL }); await client.exec({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL }); await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL }); await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL }); @@ -42,6 +44,7 @@ export async function runClickhouseMigrations() { "GRANT SELECT ON default.team_members TO limited_user;", "GRANT SELECT ON default.email_outboxes TO limited_user;", "GRANT SELECT ON default.session_replays TO limited_user;", + "GRANT SELECT ON default.project_api_keys TO limited_user;", ]; await client.exec({ query: "CREATE ROW POLICY IF NOT EXISTS events_project_isolation ON default.events FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", @@ -64,6 +67,9 @@ export async function runClickhouseMigrations() { await client.exec({ query: "CREATE ROW POLICY IF NOT EXISTS session_replays_project_isolation ON default.session_replays FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", }); + await client.exec({ + query: "CREATE ROW POLICY IF NOT EXISTS project_api_keys_project_isolation ON default.project_api_keys FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user", + }); for (const query of queries) { await client.exec({ query }); } @@ -447,6 +453,47 @@ FINAL WHERE sync_is_deleted = 0; `; +const PROJECT_API_KEYS_TABLE_BASE_SQL = ` +CREATE TABLE IF NOT EXISTS analytics_internal.project_api_keys ( + project_id String, + branch_id String, + id UUID, + description String, + is_public UInt8, + expires_at Nullable(DateTime64(3, 'UTC')), + manually_revoked_at Nullable(DateTime64(3, 'UTC')), + created_at DateTime64(3, 'UTC'), + team_id Nullable(UUID), + user_id Nullable(UUID), + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) +) +ENGINE ReplacingMergeTree(sync_sequence_id) +PARTITION BY toYYYYMM(created_at) +ORDER BY (project_id, branch_id, id); +`; + +const PROJECT_API_KEYS_VIEW_SQL = ` +CREATE OR REPLACE VIEW default.project_api_keys +SQL SECURITY DEFINER +AS +SELECT + project_id, + branch_id, + id, + description, + is_public, + expires_at, + manually_revoked_at, + created_at, + team_id, + user_id +FROM analytics_internal.project_api_keys +FINAL +WHERE sync_is_deleted = 0; +`; + const EXTERNAL_ANALYTICS_DB_SQL = ` CREATE DATABASE IF NOT EXISTS analytics_internal; `; diff --git a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts index e9d60ce88..c4e87d7ab 100644 --- a/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts +++ b/apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts @@ -222,6 +222,34 @@ async function backfillSequenceIds(batchSize: number): Promise { didUpdate = true; } + const projectApiKeyTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` + WITH rows_to_update AS ( + SELECT "tenancyId", "id" + FROM "ProjectApiKey" + WHERE "shouldUpdateSequenceId" = TRUE + ORDER BY "tenancyId" + LIMIT ${batchSize} + FOR UPDATE SKIP LOCKED + ), + updated_rows AS ( + UPDATE "ProjectApiKey" pak + SET "sequenceId" = nextval('global_seq_id'), + "shouldUpdateSequenceId" = FALSE + FROM rows_to_update r + WHERE pak."tenancyId" = r."tenancyId" + AND pak."id" = r."id" + RETURNING pak."tenancyId" + ) + SELECT DISTINCT "tenancyId" FROM updated_rows + `; + + span.setAttribute("stack.external-db-sync.project-api-key-tenants", projectApiKeyTenants.length); + + if (projectApiKeyTenants.length > 0) { + await enqueueExternalDbSyncBatch(projectApiKeyTenants.map(t => t.tenancyId)); + didUpdate = true; + } + const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>` WITH rows_to_update AS ( SELECT "id", "tenancyId" @@ -251,7 +279,7 @@ async function backfillSequenceIds(batchSize: number): Promise { span.setAttribute("stack.external-db-sync.did-update", didUpdate); if (didUpdate) { - console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, EO=${emailOutboxTenants.length}, SR=${sessionReplayTenants.length}, DR=${deletedRowTenants.length}`); + console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, EO=${emailOutboxTenants.length}, SR=${sessionReplayTenants.length}, PAK=${projectApiKeyTenants.length}, DR=${deletedRowTenants.length}`); } return didUpdate; diff --git a/apps/backend/src/lib/external-db-sync.ts b/apps/backend/src/lib/external-db-sync.ts index ffd24b687..10e58fe6d 100644 --- a/apps/backend/src/lib/external-db-sync.ts +++ b/apps/backend/src/lib/external-db-sync.ts @@ -572,6 +572,10 @@ const CLICKHOUSE_COLUMN_NORMALIZERS: Record { expect(row.last_event_at).toBeDefined(); }, TEST_TIMEOUT); + /** + * What it does: + * - Creates a project with analytics, signs in a user, creates a user API key, + * and verifies the project_api_keys row is synced to ClickHouse. + */ + test('ProjectApiKey sync (ClickHouse)', async ({ expect }) => { + await Project.createAndSwitch({ + config: { + magic_link_enabled: true, + allow_user_api_keys: true, + }, + }); + await Auth.Otp.signIn(); + + // Create a user API key + const createRes = await niceBackendFetch("/api/v1/user-api-keys", { + method: "POST", + accessType: "client", + body: { + user_id: "me", + description: "CH sync test key", + expires_at_millis: null, + }, + }); + expect(createRes.status).toBe(200); + const apiKeyId = createRes.body.id; + + await InternalApiKey.createAndSetProjectKeys(); + + // Poll ClickHouse until the project_api_keys row appears + const timeoutMs = 180_000; + const intervalMs = 2_000; + const start = performance.now(); + + let response; + while (performance.now() - start < timeoutMs) { + response = await runQueryForCurrentProject({ + query: "SELECT id, description, is_public, created_at, user_id FROM project_api_keys WHERE id = {id:String}", + params: { id: apiKeyId }, + }); + expect(response.status).toBe(200); + if (response.body.result.length >= 1) { + break; + } + await wait(intervalMs); + } + + expect(response!.body.result.length).toBeGreaterThanOrEqual(1); + const row = response!.body.result[0]; + expect(row.description).toBe("CH sync test key"); + expect(row.user_id).toBeDefined(); + }, TEST_TIMEOUT); + + /** + * What it does: + * - Creates a project with an external Postgres DB, signs in a user, + * creates a user API key, verifies it syncs, then revokes it and + * verifies the update is synced. + */ + test('ProjectApiKey CRUD sync (Postgres)', async () => { + const dbName = 'project_api_key_pg_test'; + const connectionString = await dbManager.createDatabase(dbName); + + await createProjectWithExternalDb({ + main: { + type: 'postgres', + connectionString, + } + }, { + display_name: 'API Key Sync Test', + config: { + magic_link_enabled: true, + allow_user_api_keys: true, + }, + }); + + await Auth.Otp.signIn(); + + // Create a user API key + const createRes = await niceBackendFetch("/api/v1/user-api-keys", { + method: "POST", + accessType: "client", + body: { + user_id: "me", + description: "PG sync test key", + expires_at_millis: null, + }, + }); + expect(createRes.status).toBe(200); + const apiKeyId = createRes.body.id; + + const client = dbManager.getClient(dbName); + + // Wait for the API key row to appear in external DB + await waitForSyncedProjectApiKey(client, apiKeyId); + + // Verify the synced row has expected columns + const res = await client.query(`SELECT * FROM "project_api_keys" WHERE "id" = $1`, [apiKeyId]); + expect(res.rows.length).toBe(1); + const row = res.rows[0]; + expect(row.description).toBe("PG sync test key"); + expect(row.is_public).toBe(false); + expect(row.user_id).toBeDefined(); + expect(row.created_at).toBeDefined(); + }, TEST_TIMEOUT); + /** * What it does: * - Reads the external DB sync fusebox settings. diff --git a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts index f594691f1..198f23424 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts @@ -339,6 +339,30 @@ export async function waitForSyncedSessionReplay(client: Client, replayId: strin ); } +export async function waitForSyncedProjectApiKey(client: Client, apiKeyId: string) { + await waitForExternalDbRow( + client, + `SELECT * FROM "project_api_keys" WHERE "id" = $1`, + [apiKeyId], + { + shouldExist: true, + description: `project api key "${apiKeyId}" to appear in external DB`, + }, + ); +} + +export async function waitForSyncedProjectApiKeyDeletion(client: Client, apiKeyId: string) { + await waitForExternalDbRow( + client, + `SELECT * FROM "project_api_keys" WHERE "id" = $1`, + [apiKeyId], + { + shouldExist: false, + description: `project api key "${apiKeyId}" to be removed from external DB`, + }, + ); +} + export async function waitForSyncedEmailOutboxByStatus(client: Client, status: string) { await waitForExternalDbRow( client, diff --git a/packages/stack-shared/src/config/db-sync-mappings.ts b/packages/stack-shared/src/config/db-sync-mappings.ts index b9b206f7e..89da142f2 100644 --- a/packages/stack-shared/src/config/db-sync-mappings.ts +++ b/packages/stack-shared/src/config/db-sync-mappings.ts @@ -1256,4 +1256,156 @@ export const DEFAULT_DB_SYNC_MAPPINGS = { `.trim(), }, }, + "project_api_keys": { + sourceTables: { "ProjectApiKey": "ProjectApiKey" }, + targetTable: "project_api_keys", + targetTableSchemas: { + postgres: ` + CREATE TABLE IF NOT EXISTS "project_api_keys" ( + "id" uuid PRIMARY KEY NOT NULL, + "description" text NOT NULL, + "is_public" boolean NOT NULL, + "expires_at" timestamp without time zone, + "manually_revoked_at" timestamp without time zone, + "created_at" timestamp without time zone NOT NULL, + "team_id" uuid, + "user_id" uuid + ); + REVOKE ALL ON "project_api_keys" FROM PUBLIC; + GRANT SELECT ON "project_api_keys" TO PUBLIC; + + CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" ( + "mapping_name" text PRIMARY KEY NOT NULL, + "last_synced_sequence_id" bigint NOT NULL DEFAULT -1, + "updated_at" timestamp without time zone NOT NULL DEFAULT now() + ); + `.trim(), + clickhouse: ` + CREATE TABLE IF NOT EXISTS analytics_internal.project_api_keys ( + project_id String, + branch_id String, + id UUID, + description String, + is_public UInt8, + expires_at Nullable(DateTime64(3, 'UTC')), + manually_revoked_at Nullable(DateTime64(3, 'UTC')), + created_at DateTime64(3, 'UTC'), + team_id Nullable(UUID), + user_id Nullable(UUID), + sync_sequence_id Int64, + sync_is_deleted UInt8, + sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3) + ) + ENGINE ReplacingMergeTree(sync_sequence_id) + PARTITION BY toYYYYMM(created_at) + ORDER BY (project_id, branch_id, id); + `.trim(), + }, + internalDbFetchQueries: { + clickhouse: ` + SELECT + "Tenancy"."projectId" AS "project_id", + "Tenancy"."branchId" AS "branch_id", + "ProjectApiKey"."id" AS "id", + "ProjectApiKey"."description" AS "description", + "ProjectApiKey"."isPublic" AS "is_public", + "ProjectApiKey"."expiresAt" AS "expires_at", + "ProjectApiKey"."manuallyRevokedAt" AS "manually_revoked_at", + "ProjectApiKey"."createdAt" AS "created_at", + "ProjectApiKey"."teamId" AS "team_id", + "ProjectApiKey"."projectUserId" AS "user_id", + "ProjectApiKey"."sequenceId" AS "sync_sequence_id", + "ProjectApiKey"."tenancyId" AS "tenancyId", + false AS "sync_is_deleted" + FROM "ProjectApiKey" + JOIN "Tenancy" ON "Tenancy"."id" = "ProjectApiKey"."tenancyId" + WHERE "ProjectApiKey"."tenancyId" = $1::uuid + AND "ProjectApiKey"."sequenceId" IS NOT NULL + AND "ProjectApiKey"."sequenceId" > $2::bigint + ORDER BY "ProjectApiKey"."sequenceId" ASC + LIMIT 1000 + `.trim(), + }, + internalDbFetchQuery: ` + SELECT + "ProjectApiKey"."id" AS "id", + "ProjectApiKey"."description" AS "description", + "ProjectApiKey"."isPublic" AS "is_public", + "ProjectApiKey"."expiresAt" AS "expires_at", + "ProjectApiKey"."manuallyRevokedAt" AS "manually_revoked_at", + "ProjectApiKey"."createdAt" AS "created_at", + "ProjectApiKey"."teamId" AS "team_id", + "ProjectApiKey"."projectUserId" AS "user_id", + "ProjectApiKey"."sequenceId" AS "sequence_id", + "ProjectApiKey"."tenancyId", + false AS "is_deleted" + FROM "ProjectApiKey" + WHERE "ProjectApiKey"."tenancyId" = $1::uuid + AND "ProjectApiKey"."sequenceId" IS NOT NULL + AND "ProjectApiKey"."sequenceId" > $2::bigint + ORDER BY "ProjectApiKey"."sequenceId" ASC + LIMIT 1000 + `.trim(), + externalDbUpdateQueries: { + postgres: ` + WITH params AS ( + SELECT + $1::uuid AS "id", + $2::text AS "description", + $3::boolean AS "is_public", + $4::timestamp without time zone AS "expires_at", + $5::timestamp without time zone AS "manually_revoked_at", + $6::timestamp without time zone AS "created_at", + $7::uuid AS "team_id", + $8::uuid AS "user_id", + $9::bigint AS "sequence_id", + $10::boolean AS "is_deleted", + $11::text AS "mapping_name" + ), + deleted AS ( + DELETE FROM "project_api_keys" pak + USING params p + WHERE p."is_deleted" = true AND pak."id" = p."id" + RETURNING 1 + ), + upserted AS ( + INSERT INTO "project_api_keys" ( + "id", + "description", + "is_public", + "expires_at", + "manually_revoked_at", + "created_at", + "team_id", + "user_id" + ) + SELECT + p."id", + p."description", + p."is_public", + p."expires_at", + p."manually_revoked_at", + p."created_at", + p."team_id", + p."user_id" + FROM params p + WHERE p."is_deleted" = false + ON CONFLICT ("id") DO UPDATE SET + "description" = EXCLUDED."description", + "is_public" = EXCLUDED."is_public", + "expires_at" = EXCLUDED."expires_at", + "manually_revoked_at" = EXCLUDED."manually_revoked_at", + "created_at" = EXCLUDED."created_at", + "team_id" = EXCLUDED."team_id", + "user_id" = EXCLUDED."user_id" + RETURNING 1 + ) + INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at") + SELECT p."mapping_name", p."sequence_id", now() FROM params p + ON CONFLICT ("mapping_name") DO UPDATE SET + "last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"), + "updated_at" = now(); + `.trim(), + }, + }, } as const;