Merge remote-tracking branch 'origin/clickhouse-sync-team-member-and-invites' into clickhouse-sync-email-outbox-table

# Conflicts:
#	apps/backend/scripts/clickhouse-migrations.ts
#	apps/backend/src/app/api/latest/internal/external-db-sync/sequencer/route.ts
#	apps/backend/src/lib/external-db-sync.ts
#	apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-basics.test.ts
#	apps/e2e/tests/backend/endpoints/api/v1/external-db-sync-utils.ts
This commit is contained in:
Bilal Godil 2026-03-17 13:51:27 -07:00
commit a1417f64c6
14 changed files with 1309 additions and 43 deletions

View File

@ -0,0 +1,22 @@
-- AlterTable
ALTER TABLE "TeamMemberDirectPermission" ADD COLUMN "sequenceId" BIGINT,
ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true;
-- CreateIndex
CREATE UNIQUE INDEX "TeamMemberDirectPermission_sequenceId_key" ON "TeamMemberDirectPermission"("sequenceId");
-- CreateIndex
CREATE INDEX "TeamMemberDirectPermission_shouldUpdateSequenceId_idx" ON "TeamMemberDirectPermission"("shouldUpdateSequenceId", "tenancyId");
-- CreateIndex
CREATE INDEX "TeamMemberDirectPermission_tenancyId_sequenceId_idx" ON "TeamMemberDirectPermission"("tenancyId", "sequenceId");
-- AlterTable
ALTER TABLE "VerificationCode" ADD COLUMN "sequenceId" BIGINT,
ADD COLUMN "shouldUpdateSequenceId" BOOLEAN NOT NULL DEFAULT true;
-- CreateIndex
CREATE UNIQUE INDEX "VerificationCode_sequenceId_key" ON "VerificationCode"("sequenceId");
-- CreateIndex
CREATE INDEX "VerificationCode_shouldUpdateSequenceId_type_idx" ON "VerificationCode"("shouldUpdateSequenceId", "type");

View File

@ -251,7 +251,12 @@ model TeamMemberDirectPermission {
teamMember TeamMember @relation(fields: [tenancyId, projectUserId, teamId], references: [tenancyId, projectUserId, teamId], onDelete: Cascade)
sequenceId BigInt? @unique
shouldUpdateSequenceId Boolean @default(true)
@@unique([tenancyId, projectUserId, teamId, permissionId])
@@index([shouldUpdateSequenceId, tenancyId], name: "TeamMemberDirectPermission_shouldUpdateSequenceId_idx")
@@index([tenancyId, sequenceId], name: "TeamMemberDirectPermission_tenancyId_sequenceId_idx")
}
model ProjectUser {
@ -647,9 +652,13 @@ model VerificationCode {
data Json
attemptCount Int @default(0)
sequenceId BigInt? @unique
shouldUpdateSequenceId Boolean @default(true)
@@id([projectId, branchId, id])
@@unique([projectId, branchId, code])
@@index([data(ops: JsonbPathOps)], type: Gin)
@@index([shouldUpdateSequenceId, type], name: "VerificationCode_shouldUpdateSequenceId_type_idx")
}
enum VerificationCodeType {

View File

@ -20,8 +20,12 @@ export async function runClickhouseMigrations() {
await client.exec({ query: CONTACT_CHANNELS_VIEW_SQL });
await client.exec({ query: TEAMS_TABLE_BASE_SQL });
await client.exec({ query: TEAMS_VIEW_SQL });
await client.exec({ query: TEAM_MEMBERS_TABLE_BASE_SQL });
await client.exec({ query: TEAM_MEMBERS_VIEW_SQL });
await client.exec({ query: TEAM_MEMBER_PROFILES_TABLE_BASE_SQL });
await client.exec({ query: TEAM_MEMBER_PROFILES_VIEW_SQL });
await client.exec({ query: TEAM_PERMISSIONS_TABLE_BASE_SQL });
await client.exec({ query: TEAM_PERMISSIONS_VIEW_SQL });
await client.exec({ query: TEAM_INVITATIONS_TABLE_BASE_SQL });
await client.exec({ query: TEAM_INVITATIONS_VIEW_SQL });
await client.exec({ query: EMAIL_OUTBOXES_TABLE_BASE_SQL });
await client.exec({ query: EMAIL_OUTBOXES_VIEW_SQL });
await client.exec({ query: EVENTS_ADD_REPLAY_COLUMNS_SQL });
@ -37,7 +41,9 @@ export async function runClickhouseMigrations() {
"GRANT SELECT ON default.users TO limited_user;",
"GRANT SELECT ON default.contact_channels TO limited_user;",
"GRANT SELECT ON default.teams TO limited_user;",
"GRANT SELECT ON default.team_members TO limited_user;",
"GRANT SELECT ON default.team_member_profiles TO limited_user;",
"GRANT SELECT ON default.team_permissions TO limited_user;",
"GRANT SELECT ON default.team_invitations TO limited_user;",
"GRANT SELECT ON default.email_outboxes TO limited_user;",
];
await client.exec({
@ -53,7 +59,13 @@ export async function runClickhouseMigrations() {
query: "CREATE ROW POLICY IF NOT EXISTS teams_project_isolation ON default.teams FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS team_members_project_isolation ON default.team_members FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
query: "CREATE ROW POLICY IF NOT EXISTS team_member_profiles_project_isolation ON default.team_member_profiles FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS team_permissions_project_isolation ON default.team_permissions FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS team_invitations_project_isolation ON default.team_invitations FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
});
await client.exec({
query: "CREATE ROW POLICY IF NOT EXISTS email_outboxes_project_isolation ON default.email_outboxes FOR SELECT USING project_id = getSetting('SQL_project_id') AND branch_id = getSetting('SQL_branch_id') TO limited_user",
@ -301,14 +313,15 @@ FINAL
WHERE sync_is_deleted = 0;
`;
const TEAM_MEMBERS_TABLE_BASE_SQL = `
CREATE TABLE IF NOT EXISTS analytics_internal.team_members (
const TEAM_MEMBER_PROFILES_TABLE_BASE_SQL = `
CREATE TABLE IF NOT EXISTS analytics_internal.team_member_profiles (
project_id String,
branch_id String,
team_id UUID,
user_id UUID,
display_name Nullable(String),
profile_image_url Nullable(String),
user JSON,
created_at DateTime64(3, 'UTC'),
sync_sequence_id Int64,
sync_is_deleted UInt8,
@ -319,8 +332,8 @@ PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, team_id, user_id);
`;
const TEAM_MEMBERS_VIEW_SQL = `
CREATE OR REPLACE VIEW default.team_members
const TEAM_MEMBER_PROFILES_VIEW_SQL = `
CREATE OR REPLACE VIEW default.team_member_profiles
SQL SECURITY DEFINER
AS
SELECT
@ -330,8 +343,79 @@ SELECT
user_id,
display_name,
profile_image_url,
user,
created_at
FROM analytics_internal.team_members
FROM analytics_internal.team_member_profiles
FINAL
WHERE sync_is_deleted = 0;
`;
const TEAM_PERMISSIONS_TABLE_BASE_SQL = `
CREATE TABLE IF NOT EXISTS analytics_internal.team_permissions (
project_id String,
branch_id String,
team_id UUID,
user_id UUID,
permission_id String,
created_at DateTime64(3, 'UTC'),
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, team_id, user_id, permission_id);
`;
const TEAM_PERMISSIONS_VIEW_SQL = `
CREATE OR REPLACE VIEW default.team_permissions
SQL SECURITY DEFINER
AS
SELECT
project_id,
branch_id,
team_id,
user_id,
permission_id,
created_at
FROM analytics_internal.team_permissions
FINAL
WHERE sync_is_deleted = 0;
`;
const TEAM_INVITATIONS_TABLE_BASE_SQL = `
CREATE TABLE IF NOT EXISTS analytics_internal.team_invitations (
project_id String,
branch_id String,
id UUID,
team_id UUID,
team_display_name String,
recipient_email String,
expires_at_millis Int64,
created_at DateTime64(3, 'UTC'),
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, id);
`;
const TEAM_INVITATIONS_VIEW_SQL = `
CREATE OR REPLACE VIEW default.team_invitations
SQL SECURITY DEFINER
AS
SELECT
project_id,
branch_id,
id,
team_id,
team_display_name,
recipient_email,
expires_at_millis,
created_at
FROM analytics_internal.team_invitations
FINAL
WHERE sync_is_deleted = 0;
`;

View File

@ -1,5 +1,6 @@
import { getExternalDbSyncFusebox } from "@/lib/external-db-sync-metadata";
import { enqueueExternalDbSyncBatch } from "@/lib/external-db-sync-queue";
import { Prisma } from "@/generated/prisma/client";
import { globalPrismaClient } from "@/prisma-client";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { traceSpan } from "@/utils/telemetry";
@ -78,6 +79,23 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
if (projectUserTenants.length > 0) {
await enqueueExternalDbSyncBatch(projectUserTenants.map(t => t.tenancyId));
didUpdate = true;
// Cascade: when a user changes, mark their TeamMember rows for re-sync
// so the embedded user JSON in team_member_profiles stays fresh
await globalPrismaClient.$executeRaw`
UPDATE "TeamMember"
SET "shouldUpdateSequenceId" = TRUE
FROM (
SELECT DISTINCT "tenancyId", "projectUserId"
FROM "ProjectUser"
WHERE "tenancyId" IN (${Prisma.join(projectUserTenants.map(t => t.tenancyId))})
AND "shouldUpdateSequenceId" = FALSE
AND "sequenceId" IS NOT NULL
) AS changed_users
WHERE "TeamMember"."tenancyId" = changed_users."tenancyId"
AND "TeamMember"."projectUserId" = changed_users."projectUserId"
AND "TeamMember"."shouldUpdateSequenceId" = FALSE
`;
}
const contactChannelTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
@ -135,6 +153,25 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
if (teamTenants.length > 0) {
await enqueueExternalDbSyncBatch(teamTenants.map(t => t.tenancyId));
didUpdate = true;
// Cascade: when a team changes, mark related TEAM_INVITATION verification codes for re-sync
// so the team_display_name in team_invitations stays fresh
await globalPrismaClient.$executeRaw`
UPDATE "VerificationCode"
SET "shouldUpdateSequenceId" = TRUE
FROM (
SELECT DISTINCT "Tenancy"."projectId", "Tenancy"."branchId"
FROM "Team"
JOIN "Tenancy" ON "Tenancy"."id" = "Team"."tenancyId"
WHERE "Team"."tenancyId" IN (${Prisma.join(teamTenants.map(t => t.tenancyId))})
AND "Team"."shouldUpdateSequenceId" = FALSE
AND "Team"."sequenceId" IS NOT NULL
) AS changed_teams
WHERE "VerificationCode"."projectId" = changed_teams."projectId"
AND "VerificationCode"."branchId" = changed_teams."branchId"
AND "VerificationCode"."type" = 'TEAM_INVITATION'
AND "VerificationCode"."shouldUpdateSequenceId" = FALSE
`;
}
const teamMemberTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
@ -166,6 +203,66 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
didUpdate = true;
}
const teamPermissionTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "id"
FROM "TeamMemberDirectPermission"
WHERE "shouldUpdateSequenceId" = TRUE
ORDER BY "tenancyId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "TeamMemberDirectPermission" tp
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE tp."id" = r."id"
RETURNING tp."tenancyId"
)
SELECT DISTINCT "tenancyId" FROM updated_rows
`;
span.setAttribute("stack.external-db-sync.team-permission-tenants", teamPermissionTenants.length);
if (teamPermissionTenants.length > 0) {
await enqueueExternalDbSyncBatch(teamPermissionTenants.map(t => t.tenancyId));
didUpdate = true;
}
const teamInvitationTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "projectId", "branchId", "id"
FROM "VerificationCode"
WHERE "shouldUpdateSequenceId" = TRUE
AND "type" = 'TEAM_INVITATION'
ORDER BY "projectId", "branchId"
LIMIT ${batchSize}
FOR UPDATE SKIP LOCKED
),
updated_rows AS (
UPDATE "VerificationCode" vc
SET "sequenceId" = nextval('global_seq_id'),
"shouldUpdateSequenceId" = FALSE
FROM rows_to_update r
WHERE vc."projectId" = r."projectId"
AND vc."branchId" = r."branchId"
AND vc."id" = r."id"
RETURNING vc."projectId", vc."branchId"
)
SELECT DISTINCT "Tenancy"."id" AS "tenancyId"
FROM updated_rows
JOIN "Tenancy" ON "Tenancy"."projectId" = updated_rows."projectId"
AND "Tenancy"."branchId" = updated_rows."branchId"
`;
span.setAttribute("stack.external-db-sync.team-invitation-tenants", teamInvitationTenants.length);
if (teamInvitationTenants.length > 0) {
await enqueueExternalDbSyncBatch(teamInvitationTenants.map(t => t.tenancyId));
didUpdate = true;
}
const emailOutboxTenants = await globalPrismaClient.$queryRaw<{ tenancyId: string }[]>`
WITH rows_to_update AS (
SELECT "tenancyId", "id"
@ -223,7 +320,7 @@ async function backfillSequenceIds(batchSize: number): Promise<boolean> {
span.setAttribute("stack.external-db-sync.did-update", didUpdate);
if (didUpdate) {
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, EO=${emailOutboxTenants.length}, DR=${deletedRowTenants.length}`);
console.log(`[Sequencer] Backfilled sequence IDs: USR=${projectUserTenants.length}, CC=${contactChannelTenants.length}, TM=${teamTenants.length}, TMB=${teamMemberTenants.length}, TP=${teamPermissionTenants.length}, TI=${teamInvitationTenants.length}, EO=${emailOutboxTenants.length}, DR=${deletedRowTenants.length}`);
}
return didUpdate;

View File

@ -1,4 +1,4 @@
import { recordExternalDbSyncDeletion, withExternalDbSyncUpdate } from "@/lib/external-db-sync";
import { recordExternalDbSyncDeletion, recordExternalDbSyncTeamPermissionDeletionsForTeamMember, withExternalDbSyncUpdate } from "@/lib/external-db-sync";
import { grantDefaultTeamPermissions } from "@/lib/permissions";
import { ensureTeamExists, ensureTeamMembershipDoesNotExist, ensureTeamMembershipExists, ensureUserExists, ensureUserTeamPermissionExists } from "@/lib/request-checks";
import { Tenancy } from "@/lib/tenancies";
@ -139,6 +139,12 @@ export const teamMembershipsCrudHandlers = createLazyProxy(() => createCrudHandl
userId: params.user_id,
});
await recordExternalDbSyncTeamPermissionDeletionsForTeamMember(tx, {
tenancyId: auth.tenancy.id,
projectUserId: params.user_id,
teamId: params.team_id,
});
await recordExternalDbSyncDeletion(tx, {
tableName: "TeamMember",
tenancyId: auth.tenancy.id,

View File

@ -1,4 +1,4 @@
import { recordExternalDbSyncDeletion, recordExternalDbSyncTeamMemberDeletionsForTeam, withExternalDbSyncUpdate } from "@/lib/external-db-sync";
import { recordExternalDbSyncDeletion, recordExternalDbSyncTeamInvitationDeletionsForTeam, recordExternalDbSyncTeamMemberDeletionsForTeam, recordExternalDbSyncTeamPermissionDeletionsForTeam, withExternalDbSyncUpdate } from "@/lib/external-db-sync";
import { ensureTeamExists, ensureTeamMembershipExists, ensureUserExists, ensureUserTeamPermissionExists } from "@/lib/request-checks";
import { sendTeamCreatedWebhook, sendTeamDeletedWebhook, sendTeamUpdatedWebhook } from "@/lib/webhooks";
import { getPrismaClientForTenancy, retryTransaction } from "@/prisma-client";
@ -195,6 +195,16 @@ export const teamsCrudHandlers = createLazyProxy(() => createCrudHandlers(teamsC
}
await ensureTeamExists(tx, { tenancyId: auth.tenancy.id, teamId: params.team_id });
await recordExternalDbSyncTeamPermissionDeletionsForTeam(tx, {
tenancyId: auth.tenancy.id,
teamId: params.team_id,
});
await recordExternalDbSyncTeamInvitationDeletionsForTeam(tx, {
tenancyId: auth.tenancy.id,
teamId: params.team_id,
});
await recordExternalDbSyncTeamMemberDeletionsForTeam(tx, {
tenancyId: auth.tenancy.id,
teamId: params.team_id,

View File

@ -2,7 +2,7 @@ import { BooleanTrue, Prisma } from "@/generated/prisma/client";
import { getRenderedOrganizationConfigQuery, getRenderedProjectConfigQuery } from "@/lib/config";
import { demoteAllContactChannelsToNonPrimary, setContactChannelAsPrimaryByValue } from "@/lib/contact-channel";
import { normalizeEmail } from "@/lib/emails";
import { recordExternalDbSyncContactChannelDeletionsForUser, recordExternalDbSyncDeletion, recordExternalDbSyncTeamMemberDeletionsForUser, withExternalDbSyncUpdate } from "@/lib/external-db-sync";
import { recordExternalDbSyncContactChannelDeletionsForUser, recordExternalDbSyncDeletion, recordExternalDbSyncTeamMemberDeletionsForUser, recordExternalDbSyncTeamPermissionDeletionsForUser, withExternalDbSyncUpdate } from "@/lib/external-db-sync";
import { grantDefaultProjectPermissions } from "@/lib/permissions";
import { ensureTeamMembershipExists, ensureUserExists } from "@/lib/request-checks";
import { Tenancy } from "@/lib/tenancies";
@ -1207,6 +1207,11 @@ export const usersCrudHandlers = createLazyProxy(() => createCrudHandlers(usersC
projectUserId: params.user_id,
});
await recordExternalDbSyncTeamPermissionDeletionsForUser(tx, {
tenancyId: auth.tenancy.id,
projectUserId: params.user_id,
});
await tx.projectUser.delete({
where: {
tenancyId_projectUserId: {

View File

@ -52,6 +52,18 @@ type ExternalDbSyncTarget =
tenancyId: string,
projectUserId: string,
teamId: string,
}
| {
tableName: "TeamMemberDirectPermission",
tenancyId: string,
permissionDbId: string,
}
| {
tableName: "VerificationCode_TEAM_INVITATION",
tenancyId: string,
verificationCodeProjectId: string,
verificationCodeBranchId: string,
verificationCodeId: string,
};
type ExternalDbType = NonNullable<NonNullable<CompleteConfig["dbSync"]["externalDatabases"][string]>["type"]>;
@ -203,8 +215,7 @@ export async function recordExternalDbSyncDeletion(
return;
}
{
const _teamMemberTarget: { tableName: "TeamMember" } = target;
if (target.tableName === "TeamMember") {
assertUuid(target.projectUserId, "projectUserId");
assertUuid(target.teamId, "teamId");
const insertedCount = await tx.$executeRaw(Prisma.sql`
@ -239,6 +250,85 @@ export async function recordExternalDbSyncDeletion(
}
return;
}
if (target.tableName === "TeamMemberDirectPermission") {
assertUuid(target.permissionDbId, "permissionDbId");
const insertedCount = await tx.$executeRaw(Prisma.sql`
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
SELECT
gen_random_uuid(),
"tenancyId",
'TeamMemberDirectPermission',
jsonb_build_object(
'tenancyId', "tenancyId",
'projectUserId', "projectUserId",
'teamId', "teamId",
'permissionId', "permissionId"
),
to_jsonb("TeamMemberDirectPermission".*),
NOW(),
TRUE
FROM "TeamMemberDirectPermission"
WHERE "id" = ${target.permissionDbId}::uuid
FOR UPDATE
`);
if (insertedCount !== 1) {
throw new StackAssertionError(
`Expected to insert 1 DeletedRow entry for TeamMemberDirectPermission, got ${insertedCount}.`
);
}
return;
}
{
const _verificationCodeTarget: { tableName: "VerificationCode_TEAM_INVITATION" } = target;
assertNonEmptyString(target.verificationCodeProjectId, "verificationCodeProjectId");
assertNonEmptyString(target.verificationCodeBranchId, "verificationCodeBranchId");
assertUuid(target.verificationCodeId, "verificationCodeId");
const insertedCount = await tx.$executeRaw(Prisma.sql`
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
SELECT
gen_random_uuid(),
"Tenancy"."id",
'VerificationCode_TEAM_INVITATION',
jsonb_build_object('id', "VerificationCode"."id"),
to_jsonb("VerificationCode".*),
NOW(),
TRUE
FROM "VerificationCode"
JOIN "Tenancy" ON "Tenancy"."projectId" = "VerificationCode"."projectId"
AND "Tenancy"."branchId" = "VerificationCode"."branchId"
WHERE "VerificationCode"."projectId" = ${target.verificationCodeProjectId}
AND "VerificationCode"."branchId" = ${target.verificationCodeBranchId}
AND "VerificationCode"."id" = ${target.verificationCodeId}::uuid
AND "VerificationCode"."type" = 'TEAM_INVITATION'
FOR UPDATE OF "VerificationCode"
`);
if (insertedCount !== 1) {
throw new StackAssertionError(
`Expected to insert 1 DeletedRow entry for VerificationCode_TEAM_INVITATION, got ${insertedCount}.`
);
}
return;
}
}
export async function recordExternalDbSyncContactChannelDeletionsForUser(
@ -318,6 +408,167 @@ export async function recordExternalDbSyncTeamMemberDeletionsForTeam(
`);
}
export async function recordExternalDbSyncTeamPermissionDeletionsForTeamMember(
tx: ExternalDbSyncClient,
options: {
tenancyId: string,
projectUserId: string,
teamId: string,
},
): Promise<void> {
assertUuid(options.tenancyId, "tenancyId");
assertUuid(options.projectUserId, "projectUserId");
assertUuid(options.teamId, "teamId");
await tx.$executeRaw(Prisma.sql`
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
SELECT
gen_random_uuid(),
"tenancyId",
'TeamMemberDirectPermission',
jsonb_build_object(
'tenancyId', "tenancyId",
'projectUserId', "projectUserId",
'teamId', "teamId",
'permissionId', "permissionId"
),
to_jsonb("TeamMemberDirectPermission".*),
NOW(),
TRUE
FROM "TeamMemberDirectPermission"
WHERE "tenancyId" = ${options.tenancyId}::uuid
AND "projectUserId" = ${options.projectUserId}::uuid
AND "teamId" = ${options.teamId}::uuid
FOR UPDATE
`);
}
export async function recordExternalDbSyncTeamPermissionDeletionsForTeam(
tx: ExternalDbSyncClient,
options: {
tenancyId: string,
teamId: string,
},
): Promise<void> {
assertUuid(options.tenancyId, "tenancyId");
assertUuid(options.teamId, "teamId");
await tx.$executeRaw(Prisma.sql`
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
SELECT
gen_random_uuid(),
"tenancyId",
'TeamMemberDirectPermission',
jsonb_build_object(
'tenancyId', "tenancyId",
'projectUserId', "projectUserId",
'teamId', "teamId",
'permissionId', "permissionId"
),
to_jsonb("TeamMemberDirectPermission".*),
NOW(),
TRUE
FROM "TeamMemberDirectPermission"
WHERE "tenancyId" = ${options.tenancyId}::uuid
AND "teamId" = ${options.teamId}::uuid
FOR UPDATE
`);
}
export async function recordExternalDbSyncTeamPermissionDeletionsForUser(
tx: ExternalDbSyncClient,
options: {
tenancyId: string,
projectUserId: string,
},
): Promise<void> {
assertUuid(options.tenancyId, "tenancyId");
assertUuid(options.projectUserId, "projectUserId");
await tx.$executeRaw(Prisma.sql`
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
SELECT
gen_random_uuid(),
"tenancyId",
'TeamMemberDirectPermission',
jsonb_build_object(
'tenancyId', "tenancyId",
'projectUserId', "projectUserId",
'teamId', "teamId",
'permissionId', "permissionId"
),
to_jsonb("TeamMemberDirectPermission".*),
NOW(),
TRUE
FROM "TeamMemberDirectPermission"
WHERE "tenancyId" = ${options.tenancyId}::uuid
AND "projectUserId" = ${options.projectUserId}::uuid
FOR UPDATE
`);
}
export async function recordExternalDbSyncTeamInvitationDeletionsForTeam(
tx: ExternalDbSyncClient,
options: {
tenancyId: string,
teamId: string,
},
): Promise<void> {
assertUuid(options.tenancyId, "tenancyId");
assertUuid(options.teamId, "teamId");
await tx.$executeRaw(Prisma.sql`
INSERT INTO "DeletedRow" (
"id",
"tenancyId",
"tableName",
"primaryKey",
"data",
"deletedAt",
"shouldUpdateSequenceId"
)
SELECT
gen_random_uuid(),
"Tenancy"."id",
'VerificationCode_TEAM_INVITATION',
jsonb_build_object('id', "VerificationCode"."id"),
to_jsonb("VerificationCode".*),
NOW(),
TRUE
FROM "VerificationCode"
JOIN "Tenancy" ON "Tenancy"."projectId" = "VerificationCode"."projectId"
AND "Tenancy"."branchId" = "VerificationCode"."branchId"
WHERE "Tenancy"."id" = ${options.tenancyId}::uuid
AND "VerificationCode"."type" = 'TEAM_INVITATION'
AND "VerificationCode"."data"->>'team_id' = ${options.teamId}
FOR UPDATE OF "VerificationCode"
`);
}
export async function recordExternalDbSyncTeamMemberDeletionsForUser(
tx: ExternalDbSyncClient,
options: {
@ -481,7 +732,7 @@ async function pushRowsToExternalDb(
}
}
function getInternalDbFetchQuery(mapping: DbSyncMapping, dbType: ExternalDbType) {
function getInternalDbFetchQuery(mapping: DbSyncMapping) {
return mapping.internalDbFetchQuery;
}
@ -510,7 +761,7 @@ function parseSequenceId(value: unknown, mappingId: string): number | null {
if (value == null) {
return null;
}
const seqNum = typeof value === "bigint" ? Number(value) : Number(value);
const seqNum = Number(value);
if (!Number.isFinite(seqNum)) {
throw new StackAssertionError(
`Invalid sequence_id for mapping ${mappingId}: ${JSON.stringify(value)}`
@ -536,8 +787,8 @@ async function ensureClickhouseSchema(
}
// Map of target table name -> column normalizers for ClickHouse
// 'json' columns get JSON.stringify, 'boolean' columns get normalizeClickhouseBoolean
const CLICKHOUSE_COLUMN_NORMALIZERS: Record<string, Record<string, 'json' | 'boolean' | 'nullable_boolean'>> = {
// 'json' columns get JSON.stringify, 'boolean' columns get normalizeClickhouseBoolean, 'bigint' columns get Number()
const CLICKHOUSE_COLUMN_NORMALIZERS: Record<string, Record<string, 'json' | 'boolean' | 'nullable_boolean' | 'bigint'>> = {
users: {
client_metadata: 'json',
client_read_only_metadata: 'json',
@ -559,7 +810,14 @@ const CLICKHOUSE_COLUMN_NORMALIZERS: Record<string, Record<string, 'json' | 'boo
server_metadata: 'json',
sync_is_deleted: 'boolean',
},
team_members: {
team_member_profiles: {
sync_is_deleted: 'boolean',
},
team_permissions: {
sync_is_deleted: 'boolean',
},
team_invitations: {
expires_at_millis: 'bigint',
sync_is_deleted: 'boolean',
},
email_outboxes: {
@ -643,6 +901,8 @@ async function pushRowsToClickhouse(
normalized[col] = JSON.stringify(normalized[col]);
} else if (type === 'nullable_boolean') {
normalized[col] = normalizeClickhouseNullableBoolean(normalized[col], col);
} else if (type === 'bigint') {
normalized[col] = Number(normalized[col]);
} else {
normalized[col] = normalizeClickhouseBoolean(normalized[col], col);
}
@ -735,7 +995,7 @@ async function syncPostgresMapping(
assertNonEmptyString(mappingId, "mappingId");
assertNonEmptyString(mapping.targetTable, "mapping.targetTable");
assertUuid(tenancyId, "tenancyId");
const fetchQuery = getInternalDbFetchQuery(mapping, "postgres");
const fetchQuery = getInternalDbFetchQuery(mapping);
const updateQuery = mapping.externalDbUpdateQueries.postgres;
const tableName = mapping.targetTable;
assertNonEmptyString(fetchQuery, "internalDbFetchQuery");

View File

@ -7,6 +7,7 @@ import { groupBy } from "@stackframe/stack-shared/dist/utils/arrays";
import { getOrUndefined, has, typedEntries, typedFromEntries } from "@stackframe/stack-shared/dist/utils/objects";
import { stringCompare } from "@stackframe/stack-shared/dist/utils/strings";
import { overrideEnvironmentConfigOverride } from "./config";
import { recordExternalDbSyncDeletion, withExternalDbSyncUpdate } from "./external-db-sync";
import { Tenancy } from "./tenancies";
import { PrismaTransaction } from "./types";
@ -122,13 +123,13 @@ export async function grantTeamPermission(
permissionId: options.permissionId,
},
},
create: {
create: withExternalDbSyncUpdate({
tenancyId: options.tenancy.id,
projectUserId: options.userId,
teamId: options.teamId,
permissionId: options.permissionId,
},
update: {},
}),
update: withExternalDbSyncUpdate({}),
});
return {
@ -147,6 +148,24 @@ export async function revokeTeamPermission(
permissionId: string,
}
) {
const permissionRecord = await tx.teamMemberDirectPermission.findUniqueOrThrow({
where: {
tenancyId_projectUserId_teamId_permissionId: {
tenancyId: options.tenancy.id,
projectUserId: options.userId,
teamId: options.teamId,
permissionId: options.permissionId,
},
},
select: { id: true },
});
await recordExternalDbSyncDeletion(tx, {
tableName: "TeamMemberDirectPermission",
tenancyId: options.tenancy.id,
permissionDbId: permissionRecord.id,
});
await tx.teamMemberDirectPermission.delete({
where: {
tenancyId_projectUserId_teamId_permissionId: {

View File

@ -1,3 +1,4 @@
import { recordExternalDbSyncDeletion } from "@/lib/external-db-sync";
import { validateRedirectUrl } from "@/lib/redirect-urls";
import { getSoleTenancyFromProjectBranch, getTenancy, Tenancy } from "@/lib/tenancies";
import { globalPrismaClient } from "@/prisma-client";
@ -272,16 +273,27 @@ export function createVerificationCodeHandler<
return codes.map(code => createCodeObjectFromPrismaCode(code));
},
async revokeCode(options) {
const { project, branchId } = parseProjectBranchCombo(options);
async revokeCode(revokeOptions) {
const { project, branchId } = parseProjectBranchCombo(revokeOptions);
const tenancy = await getSoleTenancyFromProjectBranch(project.id, branchId);
// Record deletion for external DB sync if this is a TEAM_INVITATION code
if (options.type === 'TEAM_INVITATION') {
await recordExternalDbSyncDeletion(globalPrismaClient, {
tableName: "VerificationCode_TEAM_INVITATION",
tenancyId: tenancy.id,
verificationCodeProjectId: project.id,
verificationCodeBranchId: branchId,
verificationCodeId: revokeOptions.id,
});
}
await globalPrismaClient.verificationCode.delete({
where: {
projectId_branchId_id: {
projectId: project.id,
branchId,
id: options.id,
id: revokeOptions.id,
},
},
});

View File

@ -522,7 +522,12 @@ it("has limited grants", async ({ expect }) => {
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "REVOKE TABLE ENGINE ON SQLite FROM limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "REVOKE TABLE ENGINE ON URL FROM limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW DATABASES ON default.* TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.contact_channels TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.events TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.team_invitations TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.team_member_profiles TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.team_permissions TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.teams TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SHOW TABLES, SHOW COLUMNS, SELECT ON default.users TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SELECT ON system.aggregate_function_combinators TO limited_user" },
{ "GRANTS WITH IMPLICIT FINAL FORMAT JSONEachRow": "GRANT SELECT ON system.collations TO limited_user" },
@ -561,10 +566,30 @@ it("can see only some tables", async ({ expect }) => {
"status": 200,
"body": {
"result": [
{
"database": "default",
"name": "contact_channels",
},
{
"database": "default",
"name": "events",
},
{
"database": "default",
"name": "team_invitations",
},
{
"database": "default",
"name": "team_member_profiles",
},
{
"database": "default",
"name": "team_permissions",
},
{
"database": "default",
"name": "teams",
},
{
"database": "default",
"name": "users",
@ -586,7 +611,12 @@ it("SHOW TABLES should have the correct tables", async ({ expect }) => {
"status": 200,
"body": {
"result": [
{ "name": "contact_channels" },
{ "name": "events" },
{ "name": "team_invitations" },
{ "name": "team_member_profiles" },
{ "name": "team_permissions" },
{ "name": "teams" },
{ "name": "users" },
],
},
@ -1068,7 +1098,12 @@ it("shows grants", async ({ expect }) => {
"status": 200,
"body": {
"result": [
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.contact_channels TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.events TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.team_invitations TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.team_member_profiles TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.team_permissions TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.teams TO limited_user" },
{ "GRANTS FORMAT JSONEachRow": "GRANT SELECT ON default.users TO limited_user" },
],
},

View File

@ -18,8 +18,12 @@ import {
waitForSyncedEmailOutboxByStatus,
waitForSyncedTeam,
waitForSyncedTeamDeletion,
waitForSyncedTeamInvitation,
waitForSyncedTeamInvitationDeletion,
waitForSyncedTeamMember,
waitForSyncedTeamMemberDeletion,
waitForSyncedTeamPermission,
waitForSyncedTeamPermissionDeletion,
waitForTable
} from './external-db-sync-utils';
@ -714,7 +718,7 @@ describe.sequential('External DB Sync - Basic Tests', () => {
await waitForSyncedTeamMember(client, teamId, user.userId);
const res1 = await client.query(`SELECT * FROM "team_members" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, user.userId]);
const res1 = await client.query(`SELECT * FROM "team_member_profiles" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, user.userId]);
expect(res1.rows.length).toBe(1);
// Remove member
@ -840,6 +844,259 @@ describe.sequential('External DB Sync - Basic Tests', () => {
await waitForSyncedTeamMemberDeletion(client, teamId, user.userId);
}, TEST_TIMEOUT);
/**
* What it does:
* - Creates a team, adds a member, grants a permission, verifies in external DB,
* revokes the permission, and verifies removal.
*/
test('TeamPermission CRUD sync (Postgres)', async () => {
const dbName = 'team_permission_crud_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
});
const client = dbManager.getClient(dbName);
const user = await User.create({ primary_email: 'tp-crud@example.com' });
const createTeamResponse = await niceBackendFetch('/api/v1/teams', {
accessType: 'admin',
method: 'POST',
body: { display_name: 'TP CRUD Team' },
});
expect(createTeamResponse.status).toBe(201);
const teamId = createTeamResponse.body.id;
// Add user as team member
const addMemberResponse = await niceBackendFetch(`/api/v1/team-memberships/${teamId}/${user.userId}`, {
accessType: 'admin',
method: 'POST',
body: {},
});
expect(addMemberResponse.status).toBe(201);
// Grant a permission
const grantResponse = await niceBackendFetch(`/api/v1/team-permissions/${teamId}/${user.userId}/$read_members`, {
accessType: 'admin',
method: 'POST',
body: {},
});
expect(grantResponse.status).toBe(201);
await waitForSyncedTeamPermission(client, teamId, user.userId, '$read_members');
const res1 = await client.query(`SELECT * FROM "team_permissions" WHERE "team_id" = $1 AND "user_id" = $2 AND "permission_id" = $3`, [teamId, user.userId, '$read_members']);
expect(res1.rows.length).toBe(1);
// Revoke the permission
await niceBackendFetch(`/api/v1/team-permissions/${teamId}/${user.userId}/$read_members`, {
accessType: 'admin',
method: 'DELETE',
});
await waitForSyncedTeamPermissionDeletion(client, teamId, user.userId, '$read_members');
}, TEST_TIMEOUT);
/**
* What it does:
* - Creates a team + member + permission, queries ClickHouse analytics API to verify.
*/
test('TeamPermission sync (ClickHouse)', async ({ expect }) => {
await Project.createAndSwitch({ config: { magic_link_enabled: true } });
const user = await User.create({ primary_email: 'tp-ch@example.com' });
const createTeamResponse = await niceBackendFetch('/api/v1/teams', {
accessType: 'admin',
method: 'POST',
body: { display_name: 'TP CH Team' },
});
expect(createTeamResponse.status).toBe(201);
const teamId = createTeamResponse.body.id;
await niceBackendFetch(`/api/v1/team-memberships/${teamId}/${user.userId}`, {
accessType: 'admin',
method: 'POST',
body: {},
});
await niceBackendFetch(`/api/v1/team-permissions/${teamId}/${user.userId}/$read_members`, {
accessType: 'admin',
method: 'POST',
body: {},
});
await InternalApiKey.createAndSetProjectKeys();
const timeoutMs = 180_000;
const intervalMs = 2_000;
const start = performance.now();
let response;
while (performance.now() - start < timeoutMs) {
response = await runQueryForCurrentProject({
query: "SELECT team_id, user_id, permission_id FROM team_permissions WHERE permission_id = {perm:String}",
params: { perm: '$read_members' },
});
expect(response.status).toBe(200);
if (response.body.result.length === 1) {
break;
}
await wait(intervalMs);
}
expect(response!.body.result.length).toBe(1);
expect(response!.body.result[0].permission_id).toBe('$read_members');
}, TEST_TIMEOUT);
/**
* What it does:
* - Sends a team invitation, verifies in external DB, revokes it, verifies removal.
*/
test('TeamInvitation sync (Postgres)', async () => {
const dbName = 'team_invitation_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
}, { display_name: 'Invitation Test Project' });
const client = dbManager.getClient(dbName);
const createTeamResponse = await niceBackendFetch('/api/v1/teams', {
accessType: 'admin',
method: 'POST',
body: { display_name: 'Invitation Team' },
});
expect(createTeamResponse.status).toBe(201);
const teamId = createTeamResponse.body.id;
// Send a team invitation
const inviteResponse = await niceBackendFetch('/api/v1/team-invitations/send-code', {
accessType: 'admin',
method: 'POST',
body: { team_id: teamId, email: 'invited@example.com', callback_url: 'http://localhost:12345/callback' },
});
expect(inviteResponse.status).toBe(200);
await waitForSyncedTeamInvitation(client, 'invited@example.com');
const res1 = await client.query(`SELECT * FROM "team_invitations" WHERE "recipient_email" = $1`, ['invited@example.com']);
expect(res1.rows.length).toBe(1);
expect(res1.rows[0].team_display_name).toBe('Invitation Team');
const invitationId = res1.rows[0].id;
// Revoke the invitation
await niceBackendFetch(`/api/v1/team-invitations/${invitationId}?team_id=${teamId}`, {
accessType: 'admin',
method: 'DELETE',
});
await waitForSyncedTeamInvitationDeletion(client, invitationId);
}, TEST_TIMEOUT);
/**
* What it does:
* - Sends a team invitation, queries ClickHouse analytics API to verify.
*/
test('TeamInvitation sync (ClickHouse)', async ({ expect }) => {
await Project.createAndSwitch({ config: { magic_link_enabled: true } });
const createTeamResponse = await niceBackendFetch('/api/v1/teams', {
accessType: 'admin',
method: 'POST',
body: { display_name: 'CH Invitation Team' },
});
expect(createTeamResponse.status).toBe(201);
const teamId = createTeamResponse.body.id;
await niceBackendFetch('/api/v1/team-invitations/send-code', {
accessType: 'admin',
method: 'POST',
body: { team_id: teamId, email: 'ch-invited@example.com', callback_url: 'http://localhost:12345/callback' },
});
await InternalApiKey.createAndSetProjectKeys();
const timeoutMs = 180_000;
const intervalMs = 2_000;
const start = performance.now();
let response;
while (performance.now() - start < timeoutMs) {
response = await runQueryForCurrentProject({
query: "SELECT recipient_email, team_display_name FROM team_invitations WHERE recipient_email = {email:String}",
params: { email: 'ch-invited@example.com' },
});
expect(response.status).toBe(200);
if (response.body.result.length === 1) {
break;
}
await wait(intervalMs);
}
expect(response!.body.result.length).toBe(1);
expect(response!.body.result[0].recipient_email).toBe('ch-invited@example.com');
expect(response!.body.result[0].team_display_name).toBe('CH Invitation Team');
}, TEST_TIMEOUT);
/**
* What it does:
* - Creates a team with a member and permission, deletes the team,
* verifies team, member, and permissions are all gone.
*/
test('Cascade: Team delete removes permissions and invitations from external DB', async () => {
const dbName = 'cascade_team_perm_test';
const connectionString = await dbManager.createDatabase(dbName);
await createProjectWithExternalDb({
main: {
type: 'postgres',
connectionString,
}
});
const client = dbManager.getClient(dbName);
const user = await User.create({ primary_email: 'cascade-perm@example.com' });
const createTeamResponse = await niceBackendFetch('/api/v1/teams', {
accessType: 'admin',
method: 'POST',
body: { display_name: 'Cascade Perm Team' },
});
const teamId = createTeamResponse.body.id;
await niceBackendFetch(`/api/v1/team-memberships/${teamId}/${user.userId}`, {
accessType: 'admin',
method: 'POST',
body: {},
});
await niceBackendFetch(`/api/v1/team-permissions/${teamId}/${user.userId}/$read_members`, {
accessType: 'admin',
method: 'POST',
body: {},
});
await waitForSyncedTeamPermission(client, teamId, user.userId, '$read_members');
await waitForSyncedTeam(client, 'Cascade Perm Team');
// Delete the team — should cascade-delete permissions too
await niceBackendFetch(`/api/v1/teams/${teamId}`, {
accessType: 'admin',
method: 'DELETE',
});
await waitForSyncedTeamDeletion(client, teamId);
await waitForSyncedTeamPermissionDeletion(client, teamId, user.userId, '$read_members');
}, TEST_TIMEOUT);
/**
* What it does:
* - Creates a project with email config, sends an email, and verifies

View File

@ -287,14 +287,14 @@ export async function waitForSyncedTeamDeletion(client: Client, teamId: string)
}
export async function waitForSyncedTeamMember(client: Client, teamId: string, userId: string) {
await waitForExternalDbRow(client, `SELECT * FROM "team_members" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, userId], {
await waitForExternalDbRow(client, `SELECT * FROM "team_member_profiles" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, userId], {
shouldExist: true,
description: `team member (team=${teamId}, user=${userId}) to appear in external DB`,
});
}
export async function waitForSyncedTeamMemberDeletion(client: Client, teamId: string, userId: string) {
await waitForExternalDbRow(client, `SELECT * FROM "team_members" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, userId], {
await waitForExternalDbRow(client, `SELECT * FROM "team_member_profiles" WHERE "team_id" = $1 AND "user_id" = $2`, [teamId, userId], {
shouldExist: false,
description: `team member (team=${teamId}, user=${userId}) to be removed from external DB`,
});
@ -314,6 +314,34 @@ export async function waitForSyncedContactChannelDeletion(client: Client, value:
});
}
export async function waitForSyncedTeamPermission(client: Client, teamId: string, userId: string, permissionId: string) {
await waitForExternalDbRow(client, `SELECT * FROM "team_permissions" WHERE "team_id" = $1 AND "user_id" = $2 AND "permission_id" = $3`, [teamId, userId, permissionId], {
shouldExist: true,
description: `team permission (team=${teamId}, user=${userId}, perm=${permissionId}) to appear in external DB`,
});
}
export async function waitForSyncedTeamPermissionDeletion(client: Client, teamId: string, userId: string, permissionId: string) {
await waitForExternalDbRow(client, `SELECT * FROM "team_permissions" WHERE "team_id" = $1 AND "user_id" = $2 AND "permission_id" = $3`, [teamId, userId, permissionId], {
shouldExist: false,
description: `team permission (team=${teamId}, user=${userId}, perm=${permissionId}) to be removed from external DB`,
});
}
export async function waitForSyncedTeamInvitation(client: Client, recipientEmail: string) {
await waitForExternalDbRow(client, `SELECT * FROM "team_invitations" WHERE "recipient_email" = $1`, [recipientEmail], {
shouldExist: true,
description: `team invitation for "${recipientEmail}" to appear in external DB`,
});
}
export async function waitForSyncedTeamInvitationDeletion(client: Client, invitationId: string) {
await waitForExternalDbRow(client, `SELECT * FROM "team_invitations" WHERE "id" = $1`, [invitationId], {
shouldExist: false,
description: `team invitation ${invitationId} to be removed from external DB`,
});
}
export async function waitForSyncedEmailOutbox(client: Client, emailId: string, expectedStatus?: string) {
await waitForExternalDbRow(
client,

View File

@ -663,21 +663,22 @@ export const DEFAULT_DB_SYNC_MAPPINGS = {
`.trim(),
},
},
"team_members": {
sourceTables: { "TeamMember": "TeamMember" },
targetTable: "team_members",
"team_member_profiles": {
sourceTables: { "TeamMember": "TeamMember", "ProjectUser": "ProjectUser" },
targetTable: "team_member_profiles",
targetTableSchemas: {
postgres: `
CREATE TABLE IF NOT EXISTS "team_members" (
CREATE TABLE IF NOT EXISTS "team_member_profiles" (
"team_id" uuid NOT NULL,
"user_id" uuid NOT NULL,
"display_name" text,
"profile_image_url" text,
"user" jsonb NOT NULL DEFAULT '{}'::jsonb,
"created_at" timestamp without time zone NOT NULL,
PRIMARY KEY ("team_id", "user_id")
);
REVOKE ALL ON "team_members" FROM PUBLIC;
GRANT SELECT ON "team_members" TO PUBLIC;
REVOKE ALL ON "team_member_profiles" FROM PUBLIC;
GRANT SELECT ON "team_member_profiles" TO PUBLIC;
CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" (
"mapping_name" text PRIMARY KEY NOT NULL,
@ -686,13 +687,14 @@ export const DEFAULT_DB_SYNC_MAPPINGS = {
);
`.trim(),
clickhouse: `
CREATE TABLE IF NOT EXISTS analytics_internal.team_members (
CREATE TABLE IF NOT EXISTS analytics_internal.team_member_profiles (
project_id String,
branch_id String,
team_id UUID,
user_id UUID,
display_name Nullable(String),
profile_image_url Nullable(String),
user JSON,
created_at DateTime64(3, 'UTC'),
sync_sequence_id Int64,
sync_is_deleted UInt8,
@ -714,12 +716,46 @@ export const DEFAULT_DB_SYNC_MAPPINGS = {
"TeamMember"."projectUserId" AS "user_id",
"TeamMember"."displayName" AS "display_name",
"TeamMember"."profileImageUrl" AS "profile_image_url",
jsonb_build_object(
'id', "ProjectUser"."projectUserId",
'display_name', "ProjectUser"."displayName",
'primary_email', (
SELECT "ContactChannel"."value"
FROM "ContactChannel"
WHERE "ContactChannel"."projectUserId" = "ProjectUser"."projectUserId"
AND "ContactChannel"."tenancyId" = "ProjectUser"."tenancyId"
AND "ContactChannel"."type" = 'EMAIL'
AND "ContactChannel"."isPrimary" = 'TRUE'
LIMIT 1
),
'primary_email_verified', COALESCE(
(
SELECT "ContactChannel"."isVerified"
FROM "ContactChannel"
WHERE "ContactChannel"."projectUserId" = "ProjectUser"."projectUserId"
AND "ContactChannel"."tenancyId" = "ProjectUser"."tenancyId"
AND "ContactChannel"."type" = 'EMAIL'
AND "ContactChannel"."isPrimary" = 'TRUE'
LIMIT 1
),
false
),
'profile_image_url', "ProjectUser"."profileImageUrl",
'signed_up_at_millis', EXTRACT(EPOCH FROM "ProjectUser"."createdAt") * 1000,
'client_metadata', COALESCE("ProjectUser"."clientMetadata", '{}'::jsonb),
'client_read_only_metadata', COALESCE("ProjectUser"."clientReadOnlyMetadata", '{}'::jsonb),
'server_metadata', COALESCE("ProjectUser"."serverMetadata", '{}'::jsonb),
'is_anonymous', "ProjectUser"."isAnonymous",
'last_active_at_millis', CASE WHEN "ProjectUser"."lastActiveAt" IS NOT NULL THEN EXTRACT(EPOCH FROM "ProjectUser"."lastActiveAt") * 1000 ELSE NULL END
) AS "user",
"TeamMember"."createdAt" AS "created_at",
"TeamMember"."sequenceId" AS "sync_sequence_id",
"TeamMember"."tenancyId" AS "tenancyId",
false AS "sync_is_deleted"
FROM "TeamMember"
JOIN "Tenancy" ON "Tenancy"."id" = "TeamMember"."tenancyId"
JOIN "ProjectUser" ON "ProjectUser"."projectUserId" = "TeamMember"."projectUserId"
AND "ProjectUser"."tenancyId" = "TeamMember"."tenancyId"
WHERE "TeamMember"."tenancyId" = $1::uuid
UNION ALL
@ -731,6 +767,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = {
("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id",
NULL::text AS "display_name",
NULL::text AS "profile_image_url",
'{}'::jsonb AS "user",
"DeletedRow"."deletedAt"::timestamp without time zone AS "created_at",
"DeletedRow"."sequenceId" AS "sync_sequence_id",
"DeletedRow"."tenancyId" AS "tenancyId",
@ -755,11 +792,45 @@ export const DEFAULT_DB_SYNC_MAPPINGS = {
"TeamMember"."projectUserId" AS "user_id",
"TeamMember"."displayName" AS "display_name",
"TeamMember"."profileImageUrl" AS "profile_image_url",
jsonb_build_object(
'id', "ProjectUser"."projectUserId",
'display_name', "ProjectUser"."displayName",
'primary_email', (
SELECT "ContactChannel"."value"
FROM "ContactChannel"
WHERE "ContactChannel"."projectUserId" = "ProjectUser"."projectUserId"
AND "ContactChannel"."tenancyId" = "ProjectUser"."tenancyId"
AND "ContactChannel"."type" = 'EMAIL'
AND "ContactChannel"."isPrimary" = 'TRUE'
LIMIT 1
),
'primary_email_verified', COALESCE(
(
SELECT "ContactChannel"."isVerified"
FROM "ContactChannel"
WHERE "ContactChannel"."projectUserId" = "ProjectUser"."projectUserId"
AND "ContactChannel"."tenancyId" = "ProjectUser"."tenancyId"
AND "ContactChannel"."type" = 'EMAIL'
AND "ContactChannel"."isPrimary" = 'TRUE'
LIMIT 1
),
false
),
'profile_image_url', "ProjectUser"."profileImageUrl",
'signed_up_at_millis', EXTRACT(EPOCH FROM "ProjectUser"."createdAt") * 1000,
'client_metadata', COALESCE("ProjectUser"."clientMetadata", '{}'::jsonb),
'client_read_only_metadata', COALESCE("ProjectUser"."clientReadOnlyMetadata", '{}'::jsonb),
'server_metadata', COALESCE("ProjectUser"."serverMetadata", '{}'::jsonb),
'is_anonymous', "ProjectUser"."isAnonymous",
'last_active_at_millis', CASE WHEN "ProjectUser"."lastActiveAt" IS NOT NULL THEN EXTRACT(EPOCH FROM "ProjectUser"."createdAt") * 1000 ELSE NULL END
) AS "user",
"TeamMember"."createdAt" AS "created_at",
"TeamMember"."sequenceId" AS "sequence_id",
"TeamMember"."tenancyId",
false AS "is_deleted"
FROM "TeamMember"
JOIN "ProjectUser" ON "ProjectUser"."projectUserId" = "TeamMember"."projectUserId"
AND "ProjectUser"."tenancyId" = "TeamMember"."tenancyId"
WHERE "TeamMember"."tenancyId" = $1::uuid
UNION ALL
@ -769,6 +840,7 @@ export const DEFAULT_DB_SYNC_MAPPINGS = {
("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id",
NULL::text AS "display_name",
NULL::text AS "profile_image_url",
'{}'::jsonb AS "user",
"DeletedRow"."deletedAt"::timestamp without time zone AS "created_at",
"DeletedRow"."sequenceId" AS "sequence_id",
"DeletedRow"."tenancyId",
@ -791,23 +863,25 @@ export const DEFAULT_DB_SYNC_MAPPINGS = {
$2::uuid AS "user_id",
$3::text AS "display_name",
$4::text AS "profile_image_url",
$5::timestamp without time zone AS "created_at",
$6::bigint AS "sequence_id",
$7::boolean AS "is_deleted",
$8::text AS "mapping_name"
$5::jsonb AS "user",
$6::timestamp without time zone AS "created_at",
$7::bigint AS "sequence_id",
$8::boolean AS "is_deleted",
$9::text AS "mapping_name"
),
deleted AS (
DELETE FROM "team_members" tm
DELETE FROM "team_member_profiles" tm
USING params p
WHERE p."is_deleted" = true AND tm."team_id" = p."team_id" AND tm."user_id" = p."user_id"
RETURNING 1
),
upserted AS (
INSERT INTO "team_members" (
INSERT INTO "team_member_profiles" (
"team_id",
"user_id",
"display_name",
"profile_image_url",
"user",
"created_at"
)
SELECT
@ -815,12 +889,360 @@ export const DEFAULT_DB_SYNC_MAPPINGS = {
p."user_id",
p."display_name",
p."profile_image_url",
p."user",
p."created_at"
FROM params p
WHERE p."is_deleted" = false
ON CONFLICT ("team_id", "user_id") DO UPDATE SET
"display_name" = EXCLUDED."display_name",
"profile_image_url" = EXCLUDED."profile_image_url",
"user" = EXCLUDED."user",
"created_at" = EXCLUDED."created_at"
RETURNING 1
)
INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at")
SELECT p."mapping_name", p."sequence_id", now() FROM params p
ON CONFLICT ("mapping_name") DO UPDATE SET
"last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"),
"updated_at" = now();
`.trim(),
},
},
"team_permissions": {
sourceTables: { "TeamMemberDirectPermission": "TeamMemberDirectPermission" },
targetTable: "team_permissions",
targetTableSchemas: {
postgres: `
CREATE TABLE IF NOT EXISTS "team_permissions" (
"team_id" uuid NOT NULL,
"user_id" uuid NOT NULL,
"permission_id" text NOT NULL,
"created_at" timestamp without time zone NOT NULL,
PRIMARY KEY ("team_id", "user_id", "permission_id")
);
REVOKE ALL ON "team_permissions" FROM PUBLIC;
GRANT SELECT ON "team_permissions" TO PUBLIC;
CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" (
"mapping_name" text PRIMARY KEY NOT NULL,
"last_synced_sequence_id" bigint NOT NULL DEFAULT -1,
"updated_at" timestamp without time zone NOT NULL DEFAULT now()
);
`.trim(),
clickhouse: `
CREATE TABLE IF NOT EXISTS analytics_internal.team_permissions (
project_id String,
branch_id String,
team_id UUID,
user_id UUID,
permission_id String,
created_at DateTime64(3, 'UTC'),
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, team_id, user_id, permission_id);
`.trim(),
},
internalDbFetchQueries: {
clickhouse: `
SELECT *
FROM (
SELECT
"Tenancy"."projectId" AS "project_id",
"Tenancy"."branchId" AS "branch_id",
"TeamMemberDirectPermission"."teamId" AS "team_id",
"TeamMemberDirectPermission"."projectUserId" AS "user_id",
"TeamMemberDirectPermission"."permissionId" AS "permission_id",
"TeamMemberDirectPermission"."createdAt" AS "created_at",
"TeamMemberDirectPermission"."sequenceId" AS "sync_sequence_id",
"TeamMemberDirectPermission"."tenancyId" AS "tenancyId",
false AS "sync_is_deleted"
FROM "TeamMemberDirectPermission"
JOIN "Tenancy" ON "Tenancy"."id" = "TeamMemberDirectPermission"."tenancyId"
WHERE "TeamMemberDirectPermission"."tenancyId" = $1::uuid
UNION ALL
SELECT
"Tenancy"."projectId" AS "project_id",
"Tenancy"."branchId" AS "branch_id",
("DeletedRow"."primaryKey"->>'teamId')::uuid AS "team_id",
("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id",
"DeletedRow"."primaryKey"->>'permissionId' AS "permission_id",
"DeletedRow"."deletedAt"::timestamp without time zone AS "created_at",
"DeletedRow"."sequenceId" AS "sync_sequence_id",
"DeletedRow"."tenancyId" AS "tenancyId",
true AS "sync_is_deleted"
FROM "DeletedRow"
JOIN "Tenancy" ON "Tenancy"."id" = "DeletedRow"."tenancyId"
WHERE
"DeletedRow"."tenancyId" = $1::uuid
AND "DeletedRow"."tableName" = 'TeamMemberDirectPermission'
) AS "_src"
WHERE "sync_sequence_id" IS NOT NULL
AND "sync_sequence_id" > $2::bigint
ORDER BY "sync_sequence_id" ASC
LIMIT 1000
`.trim(),
},
internalDbFetchQuery: `
SELECT *
FROM (
SELECT
"TeamMemberDirectPermission"."teamId" AS "team_id",
"TeamMemberDirectPermission"."projectUserId" AS "user_id",
"TeamMemberDirectPermission"."permissionId" AS "permission_id",
"TeamMemberDirectPermission"."createdAt" AS "created_at",
"TeamMemberDirectPermission"."sequenceId" AS "sequence_id",
"TeamMemberDirectPermission"."tenancyId",
false AS "is_deleted"
FROM "TeamMemberDirectPermission"
WHERE "TeamMemberDirectPermission"."tenancyId" = $1::uuid
UNION ALL
SELECT
("DeletedRow"."primaryKey"->>'teamId')::uuid AS "team_id",
("DeletedRow"."primaryKey"->>'projectUserId')::uuid AS "user_id",
"DeletedRow"."primaryKey"->>'permissionId' AS "permission_id",
"DeletedRow"."deletedAt"::timestamp without time zone AS "created_at",
"DeletedRow"."sequenceId" AS "sequence_id",
"DeletedRow"."tenancyId",
true AS "is_deleted"
FROM "DeletedRow"
WHERE
"DeletedRow"."tenancyId" = $1::uuid
AND "DeletedRow"."tableName" = 'TeamMemberDirectPermission'
) AS "_src"
WHERE "sequence_id" IS NOT NULL
AND "sequence_id" > $2::bigint
ORDER BY "sequence_id" ASC
LIMIT 1000
`.trim(),
externalDbUpdateQueries: {
postgres: `
WITH params AS (
SELECT
$1::uuid AS "team_id",
$2::uuid AS "user_id",
$3::text AS "permission_id",
$4::timestamp without time zone AS "created_at",
$5::bigint AS "sequence_id",
$6::boolean AS "is_deleted",
$7::text AS "mapping_name"
),
deleted AS (
DELETE FROM "team_permissions" tp
USING params p
WHERE p."is_deleted" = true AND tp."team_id" = p."team_id" AND tp."user_id" = p."user_id" AND tp."permission_id" = p."permission_id"
RETURNING 1
),
upserted AS (
INSERT INTO "team_permissions" (
"team_id",
"user_id",
"permission_id",
"created_at"
)
SELECT
p."team_id",
p."user_id",
p."permission_id",
p."created_at"
FROM params p
WHERE p."is_deleted" = false
ON CONFLICT ("team_id", "user_id", "permission_id") DO UPDATE SET
"created_at" = EXCLUDED."created_at"
RETURNING 1
)
INSERT INTO "_stack_sync_metadata" ("mapping_name", "last_synced_sequence_id", "updated_at")
SELECT p."mapping_name", p."sequence_id", now() FROM params p
ON CONFLICT ("mapping_name") DO UPDATE SET
"last_synced_sequence_id" = GREATEST("_stack_sync_metadata"."last_synced_sequence_id", EXCLUDED."last_synced_sequence_id"),
"updated_at" = now();
`.trim(),
},
},
"team_invitations": {
sourceTables: { "VerificationCode": "VerificationCode" },
targetTable: "team_invitations",
targetTableSchemas: {
postgres: `
CREATE TABLE IF NOT EXISTS "team_invitations" (
"id" uuid PRIMARY KEY NOT NULL,
"team_id" uuid NOT NULL,
"team_display_name" text NOT NULL,
"recipient_email" text NOT NULL,
"expires_at_millis" bigint NOT NULL,
"created_at" timestamp without time zone NOT NULL
);
REVOKE ALL ON "team_invitations" FROM PUBLIC;
GRANT SELECT ON "team_invitations" TO PUBLIC;
CREATE TABLE IF NOT EXISTS "_stack_sync_metadata" (
"mapping_name" text PRIMARY KEY NOT NULL,
"last_synced_sequence_id" bigint NOT NULL DEFAULT -1,
"updated_at" timestamp without time zone NOT NULL DEFAULT now()
);
`.trim(),
clickhouse: `
CREATE TABLE IF NOT EXISTS analytics_internal.team_invitations (
project_id String,
branch_id String,
id UUID,
team_id UUID,
team_display_name String,
recipient_email String,
expires_at_millis Int64,
created_at DateTime64(3, 'UTC'),
sync_sequence_id Int64,
sync_is_deleted UInt8,
sync_created_at DateTime64(3, 'UTC') DEFAULT now64(3)
)
ENGINE ReplacingMergeTree(sync_sequence_id)
PARTITION BY toYYYYMM(created_at)
ORDER BY (project_id, branch_id, id);
`.trim(),
},
internalDbFetchQueries: {
clickhouse: `
SELECT *
FROM (
SELECT
"Tenancy"."projectId" AS "project_id",
"Tenancy"."branchId" AS "branch_id",
"VerificationCode"."id"::uuid AS "id",
("VerificationCode"."data"->>'team_id')::uuid AS "team_id",
"Team"."displayName" AS "team_display_name",
"VerificationCode"."method"->>'email' AS "recipient_email",
FLOOR(EXTRACT(EPOCH FROM "VerificationCode"."expiresAt") * 1000)::bigint AS "expires_at_millis",
"VerificationCode"."createdAt" AS "created_at",
"VerificationCode"."sequenceId" AS "sync_sequence_id",
"Tenancy"."id" AS "tenancyId",
false AS "sync_is_deleted"
FROM "VerificationCode"
JOIN "Tenancy" ON "Tenancy"."projectId" = "VerificationCode"."projectId"
AND "Tenancy"."branchId" = "VerificationCode"."branchId"
LEFT JOIN "Team" ON "Team"."teamId" = ("VerificationCode"."data"->>'team_id')::uuid
AND "Team"."tenancyId" = "Tenancy"."id"
WHERE "Tenancy"."id" = $1::uuid
AND "VerificationCode"."type" = 'TEAM_INVITATION'
UNION ALL
SELECT
"Tenancy"."projectId" AS "project_id",
"Tenancy"."branchId" AS "branch_id",
("DeletedRow"."primaryKey"->>'id')::uuid AS "id",
'00000000-0000-0000-0000-000000000000'::uuid AS "team_id",
''::text AS "team_display_name",
''::text AS "recipient_email",
0::bigint AS "expires_at_millis",
"DeletedRow"."deletedAt"::timestamp without time zone AS "created_at",
"DeletedRow"."sequenceId" AS "sync_sequence_id",
"DeletedRow"."tenancyId" AS "tenancyId",
true AS "sync_is_deleted"
FROM "DeletedRow"
JOIN "Tenancy" ON "Tenancy"."id" = "DeletedRow"."tenancyId"
WHERE
"DeletedRow"."tenancyId" = $1::uuid
AND "DeletedRow"."tableName" = 'VerificationCode_TEAM_INVITATION'
) AS "_src"
WHERE "sync_sequence_id" IS NOT NULL
AND "sync_sequence_id" > $2::bigint
ORDER BY "sync_sequence_id" ASC
LIMIT 1000
`.trim(),
},
internalDbFetchQuery: `
SELECT *
FROM (
SELECT
"VerificationCode"."id"::uuid AS "id",
("VerificationCode"."data"->>'team_id')::uuid AS "team_id",
"Team"."displayName" AS "team_display_name",
"VerificationCode"."method"->>'email' AS "recipient_email",
FLOOR(EXTRACT(EPOCH FROM "VerificationCode"."expiresAt") * 1000)::bigint AS "expires_at_millis",
"VerificationCode"."createdAt" AS "created_at",
"VerificationCode"."sequenceId" AS "sequence_id",
"Tenancy"."id" AS "tenancyId",
false AS "is_deleted"
FROM "VerificationCode"
JOIN "Tenancy" ON "Tenancy"."projectId" = "VerificationCode"."projectId"
AND "Tenancy"."branchId" = "VerificationCode"."branchId"
LEFT JOIN "Team" ON "Team"."teamId" = ("VerificationCode"."data"->>'team_id')::uuid
AND "Team"."tenancyId" = "Tenancy"."id"
WHERE "Tenancy"."id" = $1::uuid
AND "VerificationCode"."type" = 'TEAM_INVITATION'
UNION ALL
SELECT
("DeletedRow"."primaryKey"->>'id')::uuid AS "id",
'00000000-0000-0000-0000-000000000000'::uuid AS "team_id",
''::text AS "team_display_name",
''::text AS "recipient_email",
0::bigint AS "expires_at_millis",
"DeletedRow"."deletedAt"::timestamp without time zone AS "created_at",
"DeletedRow"."sequenceId" AS "sequence_id",
"DeletedRow"."tenancyId" AS "tenancyId",
true AS "is_deleted"
FROM "DeletedRow"
WHERE
"DeletedRow"."tenancyId" = $1::uuid
AND "DeletedRow"."tableName" = 'VerificationCode_TEAM_INVITATION'
) AS "_src"
WHERE "sequence_id" IS NOT NULL
AND "sequence_id" > $2::bigint
ORDER BY "sequence_id" ASC
LIMIT 1000
`.trim(),
externalDbUpdateQueries: {
postgres: `
WITH params AS (
SELECT
$1::uuid AS "id",
$2::uuid AS "team_id",
$3::text AS "team_display_name",
$4::text AS "recipient_email",
$5::bigint AS "expires_at_millis",
$6::timestamp without time zone AS "created_at",
$7::bigint AS "sequence_id",
$8::boolean AS "is_deleted",
$9::text AS "mapping_name"
),
deleted AS (
DELETE FROM "team_invitations" ti
USING params p
WHERE p."is_deleted" = true AND ti."id" = p."id"
RETURNING 1
),
upserted AS (
INSERT INTO "team_invitations" (
"id",
"team_id",
"team_display_name",
"recipient_email",
"expires_at_millis",
"created_at"
)
SELECT
p."id",
p."team_id",
p."team_display_name",
p."recipient_email",
p."expires_at_millis",
p."created_at"
FROM params p
WHERE p."is_deleted" = false
ON CONFLICT ("id") DO UPDATE SET
"team_id" = EXCLUDED."team_id",
"team_display_name" = EXCLUDED."team_display_name",
"recipient_email" = EXCLUDED."recipient_email",
"expires_at_millis" = EXCLUDED."expires_at_millis",
"created_at" = EXCLUDED."created_at"
RETURNING 1
)