Repeated migrations

This commit is contained in:
Konstantin Wohlwend 2025-10-10 16:00:12 -07:00
parent c6b00e1a8b
commit 60a06069e4
3 changed files with 191 additions and 144 deletions

View File

@ -1,85 +1,85 @@
-- Migration to enable and pin apps based on usage conditions
-- This migration updates both EnvironmentConfigOverride and Project tables
-- Create temporary index to speed up the migration
CREATE INDEX IF NOT EXISTS "temp_eco_config_apps_idx" ON "EnvironmentConfigOverride" USING GIN ("config");
-- Update EnvironmentConfigOverride to enable apps
-- Authentication: Always enabled
UPDATE "EnvironmentConfigOverride"
SET "config" = jsonb_set(
COALESCE("config", '{}'::jsonb),
'{apps.installed.authentication.enabled}',
'true'::jsonb,
true
);
-- Emails: Always enabled
UPDATE "EnvironmentConfigOverride"
SET "config" = jsonb_set(
COALESCE("config", '{}'::jsonb),
'{apps.installed.emails.enabled}',
'true'::jsonb,
true
);
-- Teams: Always enabled
UPDATE "EnvironmentConfigOverride"
SET "config" = jsonb_set(
COALESCE("config", '{}'::jsonb),
'{apps.installed.teams.enabled}',
'true'::jsonb,
true
);
-- Webhooks: Always enabled
UPDATE "EnvironmentConfigOverride"
SET "config" = jsonb_set(
COALESCE("config", '{}'::jsonb),
'{apps.installed.webhooks.enabled}',
'true'::jsonb,
true
);
-- Launch Checklist: Always enabled
UPDATE "EnvironmentConfigOverride"
SET "config" = jsonb_set(
COALESCE("config", '{}'::jsonb),
'{apps.installed.launch-checklist.enabled}',
'true'::jsonb,
true
);
-- RBAC: Enable if at least one custom permission exists in the config
UPDATE "EnvironmentConfigOverride" eco
SET "config" = jsonb_set(
COALESCE(eco."config", '{}'::jsonb),
'{apps.installed.rbac.enabled}',
'true'::jsonb,
true
);
-- API Keys: Enable if at least one API key exists for the project
UPDATE "EnvironmentConfigOverride" eco
SET "config" = jsonb_set(
COALESCE(eco."config", '{}'::jsonb),
'{apps.installed.api-keys.enabled}',
'true'::jsonb,
true
-- SPLIT_STATEMENT_SENTINEL
-- SINGLE_STATEMENT_SENTINEL
-- CONDITIONALLY_REPEAT_MIGRATION_SENTINEL
WITH to_update AS (
SELECT "projectId", "branchId", "config"
FROM "EnvironmentConfigOverride"
WHERE NOT "config" ? 'apps.installed.authentication.enabled'
OR NOT "config" ? 'apps.installed.emails.enabled'
OR NOT "config" ? 'apps.installed.teams.enabled'
OR NOT "config" ? 'apps.installed.webhooks.enabled'
OR NOT "config" ? 'apps.installed.launch-checklist.enabled'
OR NOT "config" ? 'apps.installed.rbac.enabled'
OR NOT "config" ? 'apps.installed.api-keys.enabled'
OR NOT "config" ? 'apps.installed.payments.enabled'
LIMIT 10000
)
FROM "Tenancy" t
WHERE eco."projectId" = t."projectId"
AND eco."branchId" = t."branchId"
AND EXISTS (
SELECT 1 FROM "ProjectApiKey" pak
WHERE pak."tenancyId" = t."id"
);
-- Payments: Enable if Stripe account ID is available on the project
UPDATE "EnvironmentConfigOverride" eco
SET "config" = jsonb_set(
COALESCE(eco."config", '{}'::jsonb),
'{apps.installed.payments.enabled}',
'true'::jsonb,
true
)
FROM "Project" p
WHERE eco."projectId" = p."id"
AND p."stripeAccountId" IS NOT NULL;
SET "config" =
jsonb_set(
jsonb_set(
jsonb_set(
jsonb_set(
jsonb_set(
jsonb_set(
jsonb_set(
jsonb_set(
COALESCE(eco."config", '{}'::jsonb),
'{apps.installed.authentication.enabled}',
'true'::jsonb,
true
),
'{apps.installed.emails.enabled}',
'true'::jsonb,
true
),
'{apps.installed.teams.enabled}',
'true'::jsonb,
true
),
'{apps.installed.webhooks.enabled}',
'true'::jsonb,
true
),
'{apps.installed.launch-checklist.enabled}',
'true'::jsonb,
true
),
'{apps.installed.rbac.enabled}',
'true'::jsonb,
true
),
'{apps.installed.api-keys.enabled}',
CASE
WHEN EXISTS (
SELECT 1 FROM "Tenancy" t
JOIN "ProjectApiKey" pak ON pak."tenancyId" = t."id"
WHERE t."projectId" = eco."projectId"
AND t."branchId" = eco."branchId"
) THEN 'true'::jsonb
ELSE 'false'::jsonb
END,
true
),
'{apps.installed.payments.enabled}',
CASE
WHEN EXISTS (
SELECT 1 FROM "Project" p
WHERE p."id" = eco."projectId"
AND p."stripeAccountId" IS NOT NULL
) THEN 'true'::jsonb
ELSE 'false'::jsonb
END,
true
)
FROM to_update
WHERE eco."projectId" = to_update."projectId"
AND eco."branchId" = to_update."branchId"
RETURNING true AS should_repeat_migration;
-- SPLIT_STATEMENT_SENTINEL
-- Clean up temporary index
DROP INDEX IF EXISTS "temp_eco_config_apps_idx";

View File

@ -391,3 +391,36 @@ import.meta.vitest?.test("a migration that fails for whatever reasons rolls back
await expect(prismaClient.$queryRaw`SELECT * FROM should_exist_after_the_third_migration`).resolves.toBeDefined();
}));
import.meta.vitest?.test("repeats migrations when a REPEAT_MIGRATION error is thrown", runTest(async ({ expect, prismaClient, dbURL }) => {
const exampleMigration3 = {
migrationName: '003-repeat-ten-times',
// increment a value; if the value is < 10, raise a REPEAT_MIGRATION error
sql: `
CREATE TABLE IF NOT EXISTS repeat_counter (value INTEGER DEFAULT 0);
CREATE TABLE IF NOT EXISTS has_finished_counter (value INTEGER DEFAULT 0);
INSERT INTO repeat_counter (value)
SELECT 0
WHERE NOT EXISTS (SELECT 1 FROM repeat_counter);
INSERT INTO has_finished_counter (value)
SELECT 0
WHERE NOT EXISTS (SELECT 1 FROM has_finished_counter);
-- SPLIT_STATEMENT_SENTINEL
-- SINGLE_STATEMENT_SENTINEL
-- CONDITIONALLY_REPEAT_MIGRATION_SENTINEL
UPDATE repeat_counter SET value = value + 1 RETURNING
CASE WHEN value >= 10 THEN false ELSE true END AS should_repeat_migration;
-- SPLIT_STATEMENT_SENTINEL
UPDATE has_finished_counter SET value = value + 1;
`,
};
const result = await applyMigrations({ prismaClient, migrationFiles: [...exampleMigrationFiles1, exampleMigration3], schema: 'public', logging: true });
expect(result.newlyAppliedMigrationNames).toEqual(['001-create-table', '002-update-table', '003-repeat-ten-times']);
expect(await prismaClient.$queryRaw`SELECT value FROM repeat_counter`).toEqual([{ value: 10 }]);
expect(await prismaClient.$queryRaw`SELECT value FROM has_finished_counter`).toEqual([{ value: 1 }]);
}));

View File

@ -1,5 +1,6 @@
import { sqlQuoteIdent } from '@/prisma-client';
import { Prisma, PrismaClient } from '@prisma/client';
import { StackAssertionError } from '@stackframe/stack-shared/dist/utils/errors';
import { MIGRATION_FILES } from './../generated/migration-files';
// The bigint key for the pg advisory lock
@ -90,74 +91,87 @@ export async function applyMigrations(options: {
const appliedMigrationNames = await getAppliedMigrations({ prismaClient: options.prismaClient, schema: options.schema });
const newMigrationFiles = migrationFiles.filter(x => !appliedMigrationNames.includes(x.migrationName));
const newlyAppliedMigrationNames = [];
const newlyAppliedMigrationNames: string[] = [];
for (const migration of newMigrationFiles) {
if (options.logging) {
console.log(`Applying migration ${migration.migrationName}`);
}
const transaction = [];
transaction.push(options.prismaClient.$executeRaw`
SELECT pg_advisory_xact_lock(${MIGRATION_LOCK_ID});
`);
transaction.push(options.prismaClient.$executeRaw(Prisma.sql`
SET search_path TO ${sqlQuoteIdent(options.schema)};
`));
transaction.push(options.prismaClient.$executeRaw`
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM "SchemaMigration"
WHERE "migrationName" = '${Prisma.raw(migration.migrationName)}'
) THEN
RAISE EXCEPTION 'MIGRATION_ALREADY_APPLIED';
END IF;
END
$$;
`);
for (const statement of migration.sql.split('SPLIT_STATEMENT_SENTINEL')) {
if (statement.includes('SINGLE_STATEMENT_SENTINEL')) {
transaction.push(options.prismaClient.$queryRaw`${Prisma.raw(statement)}`);
} else {
transaction.push(options.prismaClient.$executeRaw`
DO $$
BEGIN
${Prisma.raw(statement)}
END
$$;
`);
let shouldRepeat = true;
for (let repeat = 0; shouldRepeat; repeat++) {
if (options.logging) {
console.log(`Applying migration ${migration.migrationName}${repeat > 0 ? ` (repeat ${repeat})` : ''}`);
}
}
if (options.artificialDelayInSeconds) {
transaction.push(options.prismaClient.$executeRaw`
SELECT pg_sleep(${options.artificialDelayInSeconds});
`);
}
transaction.push(options.prismaClient.$executeRaw`
INSERT INTO "SchemaMigration" ("migrationName", "finishedAt")
VALUES (${migration.migrationName}, clock_timestamp())
`);
try {
// eslint-disable-next-line no-restricted-syntax
await options.prismaClient.$transaction(transaction);
} catch (e) {
const error = getMigrationError(e);
if (error === 'MIGRATION_ALREADY_APPLIED') {
if (options.logging) {
console.log(`Migration ${migration.migrationName} already applied, skipping`);
}
continue;
}
throw e;
}
await options.prismaClient.$transaction(async (tx) => {
await tx.$executeRaw`
SELECT pg_advisory_xact_lock(${MIGRATION_LOCK_ID});
`;
newlyAppliedMigrationNames.push(migration.migrationName);
await tx.$executeRaw(Prisma.sql`
SET search_path TO ${sqlQuoteIdent(options.schema)};
`);
const existingMigration = await tx.$queryRaw`
SELECT 1 FROM "SchemaMigration"
WHERE "migrationName" = ${migration.migrationName}
` as { "?column?": number }[];
if (existingMigration.length > 0) {
if (options.logging) {
console.log(`Migration ${migration.migrationName} already applied, skipping`);
}
shouldRepeat = false;
return;
}
for (const statement of migration.sql.split('SPLIT_STATEMENT_SENTINEL')) {
if (statement.includes('SINGLE_STATEMENT_SENTINEL')) {
const res = await tx.$queryRaw`${Prisma.raw(statement)}`;
if (statement.includes('CONDITIONALLY_REPEAT_MIGRATION_SENTINEL')) {
if (!Array.isArray(res)) {
throw new StackAssertionError("Expected an array as a return value of repeat condition", { res });
}
if (res.length > 0) {
if (!("should_repeat_migration" in res[0])) {
throw new StackAssertionError("Expected should_repeat_migration column in return value of repeat condition", { res });
}
if (typeof res[0].should_repeat_migration !== 'boolean') {
throw new StackAssertionError("Expected should_repeat_migration column in return value of repeat condition to be a boolean (found: " + typeof res[0].should_repeat_migration + ")", { res });
}
if (res[0].should_repeat_migration) {
if (options.logging) {
console.log(`Migration ${migration.migrationName} should be repeated`);
}
// Commit the transaction and continue re-running the migration
return;
}
}
}
} else {
await tx.$executeRaw`
DO $$
BEGIN
${Prisma.raw(statement)}
END
$$;
`;
}
}
if (options.artificialDelayInSeconds) {
await tx.$executeRaw`
SELECT pg_sleep(${options.artificialDelayInSeconds});
`;
}
await tx.$executeRaw`
INSERT INTO "SchemaMigration" ("migrationName", "finishedAt")
VALUES (${migration.migrationName}, clock_timestamp())
`;
newlyAppliedMigrationNames.push(migration.migrationName);
shouldRepeat = false;
}, {
timeout: 30_000,
});
}
}
return { newlyAppliedMigrationNames };