mirror of
https://github.com/stack-auth/stack.git
synced 2026-06-13 21:01:21 +08:00
clickhouse sync project api keys
This commit is contained in:
parent
10b2f19de1
commit
63524e261a
@ -0,0 +1,12 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE "ProjectApiKey" ADD COLUMN "sequenceId" BIGINT,
|
||||
ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true;
|
||||
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "ProjectApiKey_sequenceId_key" ON "ProjectApiKey"("sequenceId");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "ProjectApiKey_tenancyId_sequenceId_idx" ON "ProjectApiKey"("tenancyId", "sequenceId");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "ProjectApiKey_shouldUpdateSequenceId_idx" ON "ProjectApiKey"("shouldUpdateSequenceId", "tenancyId");
|
||||
@ -711,7 +711,12 @@ model ProjectApiKey {
|
||||
projectUser ProjectUser? @relation(fields: [tenancyId, projectUserId], references: [tenancyId, projectUserId], onDelete: Cascade)
|
||||
team Team? @relation(fields: [tenancyId, teamId], references: [tenancyId, teamId], onDelete: Cascade)
|
||||
|
||||
sequenceId BigInt? @unique
|
||||
shouldUpdateSequenceId Boolean @default(true)
|
||||
|
||||
@@id([tenancyId, id])
|
||||
@@index([tenancyId, sequenceId], name: "ProjectApiKey_tenancyId_sequenceId_idx")
|
||||
@@index([shouldUpdateSequenceId, tenancyId], name: "ProjectApiKey_shouldUpdateSequenceId_idx")
|
||||
}
|
||||
|
||||
enum EmailTemplateType {
|
||||
|
||||
@ -26,6 +26,8 @@ export async function runClickhouseMigrations() {
|
||||
await client.exec({ query: EMAIL_OUTBOXES_VIEW_SQL });
|
||||
await client.exec({ query: SESSION_REPLAYS_TABLE_BASE_SQL });
|
||||
await client.exec({ query: SESSION_REPLAYS_VIEW_SQL });
|
||||
await client.exec({ query: PROJECT_API_KEYS_TABLE_BASE_SQL });
|
||||
await client.exec({ query: PROJECT_API_KEYS_VIEW_SQL });
|
||||
await client.exec({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL });
|
||||
await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL });
|
||||
await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL });
|
||||
@ -42,6 +44,7 @@ export async function runClickhouseMigrations() {
|
||||
"GRANT SELECT ON default.team_members TO limited_user;",
|
||||
"GRANT SELECT ON default.email_outboxes TO limited_user;",
|
||||
"GRANT SELECT ON default.session_replays TO limited_user;",
|
||||
"GRANT SELECT ON default.project_api_keys TO limited_user;",
|
||||
];
|
||||
await client.exec({
|
||||
query: "CREATE ROW POLICY IF NOT EXISTS events_project_isolation ON default.events FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
|
||||
@ -64,6 +67,9 @@ export async function runClickhouseMigrations() {
|
||||
await client.exec({
|
||||
query: "CREATE ROW POLICY IF NOT EXISTS session_replays_project_isolation ON default.session_replays FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
|
||||
});
|
||||
await client.exec({
|
||||
query: "CREATE ROW POLICY IF NOT EXISTS project_api_keys_project_isolation ON default.project_api_keys FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
|
||||
});
|
||||
for (const query of queries) {
|
||||
await client.exec({ query });
|
||||
}
|
||||
@ -447,6 +453,47 @@ FINAL
|
||||
WHERE sync_is_deleted = 0;
|
||||
`;
|
||||
|
||||
const PROJECT_API_KEYS_TABLE_BASE_SQL = `
|
||||
CREATE TABLE IF NOT EXISTS analytics_internal.project_api_keys (
|
||||
project_id String,
|
||||
branch_id String,
|
||||
id UUID,
|
||||
description String,
|
||||
is_public UInt8,
|
||||
expires_at Nullable(DateTime64(3, 'UTC')),
|
||||
manually_revoked_at Nullable(DateTime64(3, 'UTC')),
|
||||
created_at DateTime64(3, 'UTC'),
|
||||
team_id Nullable(UUID),
|
||||
user_id Nullable(UUID),
|
||||
sync_sequence_id Int64,
|
||||
sync_is_deleted UInt8,
|
||||
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
|
||||
)
|
||||
ENGINE ReplacingMergeTree(sync_sequence_id)
|
||||
PARTITION BY toYYYYMM(created_at)
|
||||
ORDER BY (project_id, branch_id, id);
|
||||
`;
|
||||
|
||||
const PROJECT_API_KEYS_VIEW_SQL = `
|
||||
CREATE OR REPLACE VIEW default.project_api_keys
|
||||
SQL SECURITY DEFINER
|
||||
AS
|
||||
SELECT
|
||||
project_id,
|
||||
branch_id,
|
||||
id,
|
||||
description,
|
||||
is_public,
|
||||
expires_at,
|
||||
manually_revoked_at,
|
||||
created_at,
|
||||
team_id,
|
||||
user_id
|
||||
FROM analytics_internal.project_api_keys
|
||||
FINAL
|
||||
WHERE sync_is_deleted = 0;
|
||||
`;
|
||||
|
||||
const EXTERNAL_ANALYTICS_DB_SQL = `
|
||||
CREATE DATABASE IF NOT EXISTS analytics_internal;
|
||||
`;
|
||||
|
||||
@ -222,6 +222,34 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
|
||||
didUpdate = true;
|
||||
}
|
||||
|
||||
const projectApiKeyTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
|
||||
WITH rows_to_update AS (
|
||||
SELECT "tenancyId", "id"
|
||||
FROM "ProjectApiKey"
|
||||
WHERE "shouldUpdateSequenceId" = TRUE
|
||||
ORDER BY "tenancyId"
|
||||
LIMIT ${batchSize}
|
||||
FOR UPDATE SKIP LOCKED
|
||||
),
|
||||
updated_rows AS (
|
||||
UPDATE "ProjectApiKey" pak
|
||||
SET "sequenceId" = nextval('global_seq_id'),
|
||||
"shouldUpdateSequenceId" = FALSE
|
||||
FROM rows_to_update r
|
||||
WHERE pak."tenancyId" = r."tenancyId"
|
||||
AND pak."id" = r."id"
|
||||
RETURNING pak."tenancyId"
|
||||
)
|
||||
SELECT DISTINCT "tenancyId" FROM updated_rows
|
||||
`;
|
||||
|
||||
span.setAttribute("stack.external-db-sync.project-api-key-tenants", projectApiKeyTenants.length);
|
||||
|
||||
if (projectApiKeyTenants.length > 0) {
|
||||
await enqueueExternalDbSyncBatch(projectApiKeyTenants.map(t => t.tenancyId));
|
||||
didUpdate = true;
|
||||
}
|
||||
|
||||
const deletedRowTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
|
||||
WITH rows_to_update AS (
|
||||
SELECT "id", "tenancyId"
|
||||
@ -251,7 +279,7 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
|
||||
|
||||
span.setAttribute("stack.external-db-sync.did-update", didUpdate);
|
||||
if (didUpdate) {
|
||||
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, EO=${emailOutboxTenants.length}, SR=${sessionReplayTenants.length}, DR=${deletedRowTenants.length}`);
|
||||
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, EO=${emailOutboxTenants.length}, SR=${sessionReplayTenants.length}, PAK=${projectApiKeyTenants.length}, DR=${deletedRowTenants.length}`);
|
||||
}
|
||||
|
||||
return didUpdate;
|
||||
|
||||
@ -572,6 +572,10 @@ const CLICKHOUSE_COLUMN_NORMALIZERS: Record<string, Record<string, 'json' | 'boo
|
||||
session_replays: {
|
||||
sync_is_deleted: 'boolean',
|
||||
},
|
||||
project_api_keys: {
|
||||
is_public: 'boolean',
|
||||
sync_is_deleted: 'boolean',
|
||||
},
|
||||
};
|
||||
|
||||
async function pushRowsToClickhouse(
|
||||
|
||||
@ -17,6 +17,7 @@ import {
|
||||
waitForSyncedDeletion,
|
||||
waitForSyncedEmailOutbox,
|
||||
waitForSyncedEmailOutboxByStatus,
|
||||
waitForSyncedProjectApiKey,
|
||||
waitForSyncedSessionReplay,
|
||||
waitForSyncedTeam,
|
||||
waitForSyncedTeamDeletion,
|
||||
@ -1186,6 +1187,112 @@ describe.sequential('External DB Sync - Basic Tests', () => {
|
||||
expect(row.last_event_at).toBeDefined();
|
||||
}, TEST_TIMEOUT);
|
||||
|
||||
/**
|
||||
* What it does:
|
||||
* - Creates a project with analytics, signs in a user, creates a user API key,
|
||||
* and verifies the project_api_keys row is synced to ClickHouse.
|
||||
*/
|
||||
test('ProjectApiKey sync (ClickHouse)', async ({ expect }) => {
|
||||
await Project.createAndSwitch({
|
||||
config: {
|
||||
magic_link_enabled: true,
|
||||
allow_user_api_keys: true,
|
||||
},
|
||||
});
|
||||
await Auth.Otp.signIn();
|
||||
|
||||
// Create a user API key
|
||||
const createRes = await niceBackendFetch("/api/v1/user-api-keys", {
|
||||
method: "POST",
|
||||
accessType: "client",
|
||||
body: {
|
||||
user_id: "me",
|
||||
description: "CH sync test key",
|
||||
expires_at_millis: null,
|
||||
},
|
||||
});
|
||||
expect(createRes.status).toBe(200);
|
||||
const apiKeyId = createRes.body.id;
|
||||
|
||||
await InternalApiKey.createAndSetProjectKeys();
|
||||
|
||||
// Poll ClickHouse until the project_api_keys row appears
|
||||
const timeoutMs = 180_000;
|
||||
const intervalMs = 2_000;
|
||||
const start = performance.now();
|
||||
|
||||
let response;
|
||||
while (performance.now() - start < timeoutMs) {
|
||||
response = await runQueryForCurrentProject({
|
||||
query: "SELECT id, description, is_public, created_at, user_id FROM project_api_keys WHERE id = {id:String}",
|
||||
params: { id: apiKeyId },
|
||||
});
|
||||
expect(response.status).toBe(200);
|
||||
if (response.body.result.length >= 1) {
|
||||
break;
|
||||
}
|
||||
await wait(intervalMs);
|
||||
}
|
||||
|
||||
expect(response!.body.result.length).toBeGreaterThanOrEqual(1);
|
||||
const row = response!.body.result[0];
|
||||
expect(row.description).toBe("CH sync test key");
|
||||
expect(row.user_id).toBeDefined();
|
||||
}, TEST_TIMEOUT);
|
||||
|
||||
/**
|
||||
* What it does:
|
||||
* - Creates a project with an external Postgres DB, signs in a user,
|
||||
* creates a user API key, verifies it syncs, then revokes it and
|
||||
* verifies the update is synced.
|
||||
*/
|
||||
test('ProjectApiKey CRUD sync (Postgres)', async () => {
|
||||
const dbName = 'project_api_key_pg_test';
|
||||
const connectionString = await dbManager.createDatabase(dbName);
|
||||
|
||||
await createProjectWithExternalDb({
|
||||
main: {
|
||||
type: 'postgres',
|
||||
connectionString,
|
||||
}
|
||||
}, {
|
||||
display_name: 'API Key Sync Test',
|
||||
config: {
|
||||
magic_link_enabled: true,
|
||||
allow_user_api_keys: true,
|
||||
},
|
||||
});
|
||||
|
||||
await Auth.Otp.signIn();
|
||||
|
||||
// Create a user API key
|
||||
const createRes = await niceBackendFetch("/api/v1/user-api-keys", {
|
||||
method: "POST",
|
||||
accessType: "client",
|
||||
body: {
|
||||
user_id: "me",
|
||||
description: "PG sync test key",
|
||||
expires_at_millis: null,
|
||||
},
|
||||
});
|
||||
expect(createRes.status).toBe(200);
|
||||
const apiKeyId = createRes.body.id;
|
||||
|
||||
const client = dbManager.getClient(dbName);
|
||||
|
||||
// Wait for the API key row to appear in external DB
|
||||
await waitForSyncedProjectApiKey(client, apiKeyId);
|
||||
|
||||
// Verify the synced row has expected columns
|
||||
const res = await client.query(`SELECT * FROM "project_api_keys" WHERE "id" = $1`, [apiKeyId]);
|
||||
expect(res.rows.length).toBe(1);
|
||||
const row = res.rows[0];
|
||||
expect(row.description).toBe("PG sync test key");
|
||||
expect(row.is_public).toBe(false);
|
||||
expect(row.user_id).toBeDefined();
|
||||
expect(row.created_at).toBeDefined();
|
||||
}, TEST_TIMEOUT);
|
||||
|
||||
/**
|
||||
* What it does:
|
||||
* - Reads the external DB sync fusebox settings.
|
||||
|
||||
@ -339,6 +339,30 @@ export async function waitForSyncedSessionReplay(client: Client, replayId: strin
|
||||
);
|
||||
}
|
||||
|
||||
export async function waitForSyncedProjectApiKey(client: Client, apiKeyId: string) {
|
||||
await waitForExternalDbRow(
|
||||
client,
|
||||
`SELECT * FROM "project_api_keys" WHERE "id" = $1`,
|
||||
[apiKeyId],
|
||||
{
|
||||
shouldExist: true,
|
||||
description: `project api key "${apiKeyId}" to appear in external DB`,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
export async function waitForSyncedProjectApiKeyDeletion(client: Client, apiKeyId: string) {
|
||||
await waitForExternalDbRow(
|
||||
client,
|
||||
`SELECT * FROM "project_api_keys" WHERE "id" = $1`,
|
||||
[apiKeyId],
|
||||
{
|
||||
shouldExist: false,
|
||||
description: `project api key "${apiKeyId}" to be removed from external DB`,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
export async function waitForSyncedEmailOutboxByStatus(client: Client, status: string) {
|
||||
await waitForExternalDbRow(
|
||||
client,
|
||||
|
||||
@ -1256,4 +1256,156 @@ export const DEFAULT_DB_SYNC_MAPPINGS = {
|
||||
`.trim(),
|
||||
},
|
||||
},
|
||||
"project_api_keys": {
|
||||
sourceTables: { "ProjectApiKey": "ProjectApiKey" },
|
||||
targetTable: "project_api_keys",
|
||||
targetTableSchemas: {
|
||||
postgres: `
|
||||
CREATE TABLE IF NOT EXISTS "project_api_keys" (
|
||||
"id" uuid PRIMARY KEY NOT NULL,
|
||||
"description" text NOT NULL,
|
||||
"is_public" boolean NOT NULL,
|
||||
"expires_at" timestamp without time zone,
|
||||
"manually_revoked_at" timestamp without time zone,
|
||||
"created_at" timestamp without time zone NOT NULL,
|
||||
"team_id" uuid,
|
||||
"user_id" uuid
|
||||
);
|
||||
REVOKE ALL ON "project_api_keys" FROM PUBLIC;
|
||||
GRANT SELECT ON "project_api_keys" TO PUBLIC;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" (
|
||||
"mapping_name" text PRIMARY KEY NOT NULL,
|
||||
"last_synced_sequence_id" bigint NOT NULL DEFAULT -1,
|
||||
"updated_at" timestamp without time zone NOT NULL DEFAULT now()
|
||||
);
|
||||
`.trim(),
|
||||
clickhouse: `
|
||||
CREATE TABLE IF NOT EXISTS analytics_internal.project_api_keys (
|
||||
project_id String,
|
||||
branch_id String,
|
||||
id UUID,
|
||||
description String,
|
||||
is_public UInt8,
|
||||
expires_at Nullable(DateTime64(3, 'UTC')),
|
||||
manually_revoked_at Nullable(DateTime64(3, 'UTC')),
|
||||
created_at DateTime64(3, 'UTC'),
|
||||
team_id Nullable(UUID),
|
||||
user_id Nullable(UUID),
|
||||
sync_sequence_id Int64,
|
||||
sync_is_deleted UInt8,
|
||||
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
|
||||
)
|
||||
ENGINE ReplacingMergeTree(sync_sequence_id)
|
||||
PARTITION BY toYYYYMM(created_at)
|
||||
ORDER BY (project_id, branch_id, id);
|
||||
`.trim(),
|
||||
},
|
||||
internalDbFetchQueries: {
|
||||
clickhouse: `
|
||||
SELECT
|
||||
"Tenancy"."projectId" AS "project_id",
|
||||
"Tenancy"."branchId" AS "branch_id",
|
||||
"ProjectApiKey"."id" AS "id",
|
||||
"ProjectApiKey"."description" AS "description",
|
||||
"ProjectApiKey"."isPublic" AS "is_public",
|
||||
"ProjectApiKey"."expiresAt" AS "expires_at",
|
||||
"ProjectApiKey"."manuallyRevokedAt" AS "manually_revoked_at",
|
||||
"ProjectApiKey"."createdAt" AS "created_at",
|
||||
"ProjectApiKey"."teamId" AS "team_id",
|
||||
"ProjectApiKey"."projectUserId" AS "user_id",
|
||||
"ProjectApiKey"."sequenceId" AS "sync_sequence_id",
|
||||
"ProjectApiKey"."tenancyId" AS "tenancyId",
|
||||
false AS "sync_is_deleted"
|
||||
FROM "ProjectApiKey"
|
||||
JOIN "Tenancy" ON "Tenancy"."id" = "ProjectApiKey"."tenancyId"
|
||||
WHERE "ProjectApiKey"."tenancyId" = $1::uuid
|
||||
AND "ProjectApiKey"."sequenceId" IS NOT NULL
|
||||
AND "ProjectApiKey"."sequenceId" > $2::bigint
|
||||
ORDER BY "ProjectApiKey"."sequenceId" ASC
|
||||
LIMIT 1000
|
||||
`.trim(),
|
||||
},
|
||||
internalDbFetchQuery: `
|
||||
SELECT
|
||||
"ProjectApiKey"."id" AS "id",
|
||||
"ProjectApiKey"."description" AS "description",
|
||||
"ProjectApiKey"."isPublic" AS "is_public",
|
||||
"ProjectApiKey"."expiresAt" AS "expires_at",
|
||||
"ProjectApiKey"."manuallyRevokedAt" AS "manually_revoked_at",
|
||||
"ProjectApiKey"."createdAt" AS "created_at",
|
||||
"ProjectApiKey"."teamId" AS "team_id",
|
||||
"ProjectApiKey"."projectUserId" AS "user_id",
|
||||
"ProjectApiKey"."sequenceId" AS "sequence_id",
|
||||
"ProjectApiKey"."tenancyId",
|
||||
false AS "is_deleted"
|
||||
FROM "ProjectApiKey"
|
||||
WHERE "ProjectApiKey"."tenancyId" = $1::uuid
|
||||
AND "ProjectApiKey"."sequenceId" IS NOT NULL
|
||||
AND "ProjectApiKey"."sequenceId" > $2::bigint
|
||||
ORDER BY "ProjectApiKey"."sequenceId" ASC
|
||||
LIMIT 1000
|
||||
`.trim(),
|
||||
externalDbUpdateQueries: {
|
||||
postgres: `
|
||||
WITH params AS (
|
||||
SELECT
|
||||
$1::uuid AS "id",
|
||||
$2::text AS "description",
|
||||
$3::boolean AS "is_public",
|
||||
$4::timestamp without time zone AS "expires_at",
|
||||
$5::timestamp without time zone AS "manually_revoked_at",
|
||||
$6::timestamp without time zone AS "created_at",
|
||||
$7::uuid AS "team_id",
|
||||
$8::uuid AS "user_id",
|
||||
$9::bigint AS "sequence_id",
|
||||
$10::boolean AS "is_deleted",
|
||||
$11::text AS "mapping_name"
|
||||
),
|
||||
deleted AS (
|
||||
DELETE FROM "project_api_keys" pak
|
||||
USING params p
|
||||
WHERE p."is_deleted" = true AND pak."id" = p."id"
|
||||
RETURNING 1
|
||||
),
|
||||
upserted AS (
|
||||
INSERT INTO "project_api_keys" (
|
||||
"id",
|
||||
"description",
|
||||
"is_public",
|
||||
"expires_at",
|
||||
"manually_revoked_at",
|
||||
"created_at",
|
||||
"team_id",
|
||||
"user_id"
|
||||
)
|
||||
SELECT
|
||||
p."id",
|
||||
p."description",
|
||||
p."is_public",
|
||||
p."expires_at",
|
||||
p."manually_revoked_at",
|
||||
p."created_at",
|
||||
p."team_id",
|
||||
p."user_id"
|
||||
FROM params p
|
||||
WHERE p."is_deleted" = false
|
||||
ON CONFLICT ("id") DO UPDATE SET
|
||||
"description" = EXCLUDED."description",
|
||||
"is_public" = EXCLUDED."is_public",
|
||||
"expires_at" = EXCLUDED."expires_at",
|
||||
"manually_revoked_at" = EXCLUDED."manually_revoked_at",
|
||||
"created_at" = EXCLUDED."created_at",
|
||||
"team_id" = EXCLUDED."team_id",
|
||||
"user_id" = EXCLUDED."user_id"
|
||||
RETURNING 1
|
||||
)
|
||||
INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at")
|
||||
SELECT p."mapping_name", p."sequence_id", now() FROM params p
|
||||
ON CONFLICT ("mapping_name") DO UPDATE SET
|
||||
"last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"),
|
||||
"updated_at" = now();
|
||||
`.trim(),
|
||||
},
|
||||
},
|
||||
} as const;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user