Merge branch 'dev' into promptless/document-developer-tools

This commit is contained in:
promptless[bot] 2026-04-18 21:18:46 +00:00
commit 9fc7631804
18 changed files with 2538 additions and 23 deletions

View File

@ -16,6 +16,11 @@ export default defineConfig({
tables: {
external: [
"public.BulldozerStorageEngine",
// PK on JSONB[] (tableStoragePath) — not expressible via Prisma's @id
// (list types are treated as non-required). Managed entirely by
// bulldozer code via raw SQL. See schema.prisma note next to the
// BulldozerTimeFoldMetadata model.
"public.BulldozerTimeFoldDownstreamCascade",
],
},
})

View File

@ -0,0 +1,341 @@
-- Propagate TimeFold queue-drained emissions through the registered
-- downstream trigger cascade (filter/map/LFold/etc.), mirroring what the
-- inline setRow path does via collectRowChangeTriggerStatements. Before
-- this migration, `bulldozer_timefold_process_queue` updated the
-- TimeFold's own state and materialized its output rows but never touched
-- any downstream materialized table, so any event scheduled for the
-- future (subscription-end on cancel-at-period-end, monthly
-- item-grant-repeat ticks) silently stopped after updating the TimeFold.
--
-- Strategy: per-timefold cascade templates are precomputed in TypeScript
-- (declareTimeFoldTable.init() \u2192 toCascadeSqlBlock) and persisted here as
-- data. The rewritten process_queue:
-- 1. Drains due queue rows per-timefold (existing per-row logic, moved
-- inside an outer per-timefold loop).
-- 2. After each queue row's reducer emits new rows, populates
-- __bulldozer_seq with them under the timefold's cascade input name.
-- 3. Once per timefold per tick, EXECUTEs the stored cascadeTemplate,
-- which reads from __bulldozer_seq and writes through filter/map/
-- LFold/etc. to their materialized outputs.
--
-- The shipped BulldozerTimeFoldQueue & BulldozerTimeFoldMetadata tables
-- from 20260323150000_add_bulldozer_timefold_queue are unchanged. pg_cron
-- keeps calling public.bulldozer_timefold_process_queue() as before.
-- CreateTable
CREATE TABLE "BulldozerTimeFoldDownstreamCascade" (
"tableStoragePath" JSONB[] NOT NULL,
"cascadeInputName" TEXT NOT NULL,
"cascadeTemplate" TEXT,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath")
);
-- SPLIT_STATEMENT_SENTINEL
-- SINGLE_STATEMENT_SENTINEL
CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue()
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
cutoff_timestamp timestamptz;
current_timefold_path jsonb[];
current_cascade_input_name text;
current_cascade_template text;
queued_row "BulldozerTimeFoldQueue"%ROWTYPE;
group_path jsonb[];
rows_path jsonb[];
states_path jsonb[];
state_row_path jsonb[];
existing_state jsonb;
old_emitted_rows jsonb;
newly_emitted_rows jsonb;
accumulated_emitted_rows jsonb;
current_state jsonb;
current_timestamp_value timestamptz;
next_state jsonb;
next_rows_data jsonb;
normalized_next_rows_data jsonb;
next_timestamp timestamptz;
previous_emitted_row_count int;
reducer_iterations int;
new_row_record record;
BEGIN
PERFORM pg_advisory_xact_lock(7857391);
INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt")
VALUES ('singleton', now())
ON CONFLICT ("key") DO NOTHING;
cutoff_timestamp := now();
UPDATE "BulldozerTimeFoldMetadata"
SET
"lastProcessedAt" = cutoff_timestamp,
"updatedAt" = CURRENT_TIMESTAMP
WHERE "key" = 'singleton';
-- The cascade templates generated by toExecutableSqlTransaction/
-- toCascadeSqlBlock read and write via this per-transaction temp table,
-- keyed by string output names. It's created idempotently here so the
-- cascade never fails on a cold queue-drain invocation.
CREATE TEMP TABLE IF NOT EXISTS "__bulldozer_seq" (
"__output_name" text NOT NULL,
"__output_row" jsonb NOT NULL
) ON COMMIT DROP;
-- Outer loop: each distinct timefold that has queue rows due at this tick.
FOR current_timefold_path IN
SELECT DISTINCT "tableStoragePath"
FROM "BulldozerTimeFoldQueue"
WHERE "scheduledAt" <= cutoff_timestamp
LOOP
-- Look up the cascade template for this timefold.
current_cascade_input_name := NULL;
current_cascade_template := NULL;
SELECT "cascadeInputName", "cascadeTemplate"
INTO current_cascade_input_name, current_cascade_template
FROM "BulldozerTimeFoldDownstreamCascade"
WHERE "tableStoragePath" = current_timefold_path;
-- If no registry row exists, this timefold hasn't been init()'d
-- yet — most commonly the deploy-window gap between this
-- migration applying (creating an empty registry table) and the
-- backend running declareTimeFoldTable.init() (which upserts one
-- row per timefold). Skipping just the cascade EXECUTE but still
-- draining the queue rows would let the timefold's own state
-- advance while the downstream filters/maps/LFolds silently miss
-- the update — a permanent desync with no retry path, since the
-- queue row is gone. Defer the entire timefold for this tick so
-- its queue rows stay queued; the next tick (after init finishes)
-- drains them with cascade intact.
--
-- See `timefold-queue-downstream.test.ts`'s "process_queue defers
-- a timefold whose cascade registry row is missing" for the
-- regression guard.
IF NOT FOUND THEN
CONTINUE;
END IF;
-- Defensive clean slate: clear ALL rows before populating this
-- timefold's cascade input. Cascade templates emit intermediate
-- rows (row_change_diag_*, mapped_changes_*, etc.) under unique
-- random names, so cross-timefold contamination isn't possible
-- today, but keeping __bulldozer_seq empty between iterations is
-- simpler to reason about ("between iterations, __bulldozer_seq is
-- empty") and prevents memory growth across many timefolds in one
-- tick. ON COMMIT DROP handles cleanup if the function throws.
DELETE FROM "__bulldozer_seq";
-- Inner loop: per-queue-row drain for this timefold, mirroring the
-- pre-fix function body (semantics preserved so the existing
-- process-queue.ts migration test keeps passing).
LOOP
SELECT *
INTO queued_row
FROM "BulldozerTimeFoldQueue"
WHERE "tableStoragePath" = current_timefold_path
AND "scheduledAt" <= cutoff_timestamp
ORDER BY "scheduledAt" ASC, "id" ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;
EXIT WHEN NOT FOUND;
DELETE FROM "BulldozerTimeFoldQueue"
WHERE "id" = queued_row."id";
group_path := queued_row."tableStoragePath" || ARRAY[to_jsonb('groups'::text), queued_row."groupKey"]::jsonb[];
rows_path := group_path || ARRAY[to_jsonb('rows'::text)]::jsonb[];
states_path := group_path || ARRAY[to_jsonb('states'::text)]::jsonb[];
state_row_path := states_path || ARRAY[to_jsonb(queued_row."rowIdentifier")]::jsonb[];
SELECT "value"
INTO existing_state
FROM "BulldozerStorageEngine"
WHERE "keyPath" = state_row_path;
IF existing_state IS NULL THEN
CONTINUE;
END IF;
IF existing_state->'rowData' IS DISTINCT FROM queued_row."rowData" THEN
CONTINUE;
END IF;
old_emitted_rows := CASE
WHEN jsonb_typeof(existing_state->'emittedRowsData') = 'array' THEN existing_state->'emittedRowsData'
ELSE '[]'::jsonb
END;
newly_emitted_rows := '[]'::jsonb;
accumulated_emitted_rows := old_emitted_rows;
previous_emitted_row_count := jsonb_array_length(old_emitted_rows);
current_state := queued_row."stateAfter";
current_timestamp_value := queued_row."scheduledAt";
reducer_iterations := 0;
LOOP
reducer_iterations := reducer_iterations + 1;
IF reducer_iterations > 10000 THEN
RAISE EXCEPTION 'bulldozer timefold reducer exceeded 10k iterations for row %', queued_row."rowIdentifier";
END IF;
EXECUTE format(
$reducer$
SELECT
to_jsonb("reducerRows"."newState") AS "newState",
to_jsonb("reducerRows"."newRowsData") AS "newRowsData",
CASE
WHEN "reducerRows"."nextTimestamp" IS NULL THEN NULL::timestamptz
ELSE ("reducerRows"."nextTimestamp")::timestamptz
END AS "nextTimestamp"
FROM (
SELECT %s
FROM (
SELECT
$1::jsonb AS "oldState",
$2::jsonb AS "oldRowData",
$3::timestamptz AS "timestamp"
) AS "reducerInput"
) AS "reducerRows"
$reducer$,
queued_row."reducerSql"
)
INTO next_state, next_rows_data, next_timestamp
USING current_state, queued_row."rowData", current_timestamp_value;
normalized_next_rows_data := CASE
WHEN jsonb_typeof(next_rows_data) = 'array' THEN next_rows_data
ELSE '[]'::jsonb
END;
newly_emitted_rows := newly_emitted_rows || normalized_next_rows_data;
accumulated_emitted_rows := accumulated_emitted_rows || normalized_next_rows_data;
current_state := next_state;
EXIT WHEN next_timestamp IS NULL OR next_timestamp > cutoff_timestamp;
current_timestamp_value := next_timestamp;
END LOOP;
INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
VALUES
(gen_random_uuid(), group_path, 'null'::jsonb),
(gen_random_uuid(), rows_path, 'null'::jsonb),
(gen_random_uuid(), states_path, 'null'::jsonb)
ON CONFLICT ("keyPath") DO NOTHING;
INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
VALUES (
gen_random_uuid(),
state_row_path,
jsonb_build_object(
'rowData', queued_row."rowData",
'stateAfter', current_state,
'emittedRowsData', accumulated_emitted_rows,
'nextTimestamp',
CASE
WHEN next_timestamp IS NULL THEN 'null'::jsonb
ELSE to_jsonb(next_timestamp)
END
)
)
ON CONFLICT ("keyPath") DO UPDATE
SET "value" = EXCLUDED."value";
FOR new_row_record IN
SELECT
"rows"."rowData" AS "rowData",
"rows"."rowIndex" AS "rowIndex"
FROM jsonb_array_elements(newly_emitted_rows) WITH ORDINALITY AS "rows"("rowData", "rowIndex")
LOOP
INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
VALUES (
gen_random_uuid(),
rows_path || ARRAY[to_jsonb((queued_row."rowIdentifier" || ':' || (previous_emitted_row_count + new_row_record."rowIndex")::text)::text)]::jsonb[],
jsonb_build_object('rowData', new_row_record."rowData")
)
ON CONFLICT ("keyPath") DO UPDATE
SET "value" = EXCLUDED."value";
END LOOP;
-- Accumulate the newly-emitted rows into __bulldozer_seq under this
-- timefold's cascade input name, shaped like timeFoldChangesTableName
-- (oldRowData/newRowData diff). Queue-drained emissions are always
-- new rows, so oldRowData/oldRowSortKey are null.
IF current_cascade_input_name IS NOT NULL THEN
INSERT INTO "__bulldozer_seq" ("__output_name", "__output_row")
SELECT
current_cascade_input_name,
jsonb_build_object(
'groupKey', queued_row."groupKey",
'rowIdentifier', queued_row."rowIdentifier" || ':' || (previous_emitted_row_count + "emitted"."rowIndex")::text,
'oldRowSortKey', 'null'::jsonb,
'newRowSortKey', 'null'::jsonb,
'oldRowData', 'null'::jsonb,
'newRowData', "emitted"."rowData"
)
FROM jsonb_array_elements(newly_emitted_rows) WITH ORDINALITY AS "emitted"("rowData", "rowIndex");
END IF;
IF next_timestamp IS NOT NULL AND next_timestamp > cutoff_timestamp THEN
INSERT INTO "BulldozerTimeFoldQueue" (
"id",
"tableStoragePath",
"groupKey",
"rowIdentifier",
"scheduledAt",
"stateAfter",
"rowData",
"reducerSql"
)
VALUES (
gen_random_uuid(),
queued_row."tableStoragePath",
queued_row."groupKey",
queued_row."rowIdentifier",
next_timestamp,
current_state,
queued_row."rowData",
queued_row."reducerSql"
)
ON CONFLICT ("tableStoragePath", "groupKey", "rowIdentifier") DO UPDATE
SET
"scheduledAt" = EXCLUDED."scheduledAt",
"stateAfter" = EXCLUDED."stateAfter",
"rowData" = EXCLUDED."rowData",
"reducerSql" = EXCLUDED."reducerSql",
"updatedAt" = CURRENT_TIMESTAMP;
END IF;
-- Note: the sibling migration 20260323150000 carried an "orphan
-- group cleanup" IF-NOT-EXISTS block here that deleted
-- rows_path / states_path / group_path when both had no children.
-- That block was unreachable in that function body and is still
-- unreachable here: every path that reaches this point has just
-- UPSERTed `state_row_path` (a child of `states_path`) a few
-- statements up, so the NOT EXISTS on states_path is always
-- false. Dropped here rather than inherited verbatim.
END LOOP;
-- After draining all due queue rows for this timefold, run the
-- downstream trigger cascade exactly once. The template reads its input
-- from __bulldozer_seq (populated above) and writes through every
-- registered downstream filter/map/LFold/etc. to their materialized
-- outputs. Skipped when no template is registered \u2014 matches the
-- inline path's no-op behavior for timefolds without downstream
-- triggers.
IF current_cascade_template IS NOT NULL THEN
EXECUTE current_cascade_template;
END IF;
-- Clean slate before the next timefold iteration. Clears the
-- cascade input rows AND every intermediate-stage row the cascade
-- template's EXECUTE just emitted into __bulldozer_seq. See the
-- identical DELETE at the top of this loop for the rationale.
DELETE FROM "__bulldozer_seq";
END LOOP;
END;
$function$;
-- SPLIT_STATEMENT_SENTINEL

View File

@ -0,0 +1,263 @@
import type { Sql } from "postgres";
import { expect } from "vitest";
/**
* Migration-level test for `20260417000000_bulldozer_timefold_downstream_cascade`.
*
* Exercises the shape the migration is responsible for:
* - `BulldozerTimeFoldDownstreamCascade` exists with the right columns,
* - `public.bulldozer_timefold_process_queue()` consults that registry,
* - when a timefold has a registered `cascadeTemplate`, process_queue
* populates `__bulldozer_seq` with newly-emitted rows and EXECUTEs the
* template (i.e. the downstream cascade actually fires on the
* queue-drain path the regression this migration fixes),
* - re-draining with nothing due is a no-op (idempotency).
*
* The cascade template here is constructed by hand (not via
* `toCascadeSqlBlock` in TypeScript) so the test stays purely at the
* migration-SQL layer, matching the other migration tests under
* `apps/backend/prisma/migrations/.../tests/`.
*/
export const postMigration = async (sql: Sql) => {
// 1) Migration shape: the registry table exists with the expected columns.
const registryColumnRows = await sql<Array<{ column_name: string }>>`
SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'BulldozerTimeFoldDownstreamCascade'
ORDER BY ordinal_position
`;
expect(registryColumnRows.map((r) => r.column_name)).toEqual([
"tableStoragePath",
"cascadeInputName",
"cascadeTemplate",
"createdAt",
"updatedAt",
]);
// 2) Set up a minimal timefold-shaped storage hierarchy. Each
// BulldozerStorageEngine insert must have its parent keyPath already
// present (FK: keyPathParent → keyPath).
const tablePathSql = `ARRAY[to_jsonb('table'::text), to_jsonb('external:cascade-migration-test'::text)]::jsonb[]`;
const storagePathSql = `${tablePathSql} || ARRAY[to_jsonb('storage'::text)]::jsonb[]`;
const groupsPathSql = `${storagePathSql} || ARRAY[to_jsonb('groups'::text)]::jsonb[]`;
const groupPathSql = `${groupsPathSql} || ARRAY[to_jsonb('alpha'::text)]::jsonb[]`;
const rowsPathSql = `${groupPathSql} || ARRAY[to_jsonb('rows'::text)]::jsonb[]`;
const statesPathSql = `${groupPathSql} || ARRAY[to_jsonb('states'::text)]::jsonb[]`;
const stateRowPathSql = `${statesPathSql} || ARRAY[to_jsonb('u1'::text)]::jsonb[]`;
// Parallel downstream tree the cascade template writes into. Its root
// (`ARRAY['cascade-out']`) is a direct child of the already-seeded
// `ARRAY[]` root, and only the root is needed as a parent for the
// cascade's row inserts below (rows hang directly off of it).
const cascadeOutRootSql = `ARRAY[to_jsonb('cascade-out'::text)]::jsonb[]`;
await sql.unsafe(`
INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
VALUES
(gen_random_uuid(), ${tablePathSql}, 'null'::jsonb),
(gen_random_uuid(), ${storagePathSql}, 'null'::jsonb),
(gen_random_uuid(), ${groupsPathSql}, 'null'::jsonb),
(gen_random_uuid(), ${groupPathSql}, 'null'::jsonb),
(gen_random_uuid(), ${rowsPathSql}, 'null'::jsonb),
(gen_random_uuid(), ${statesPathSql}, 'null'::jsonb),
(gen_random_uuid(), ${cascadeOutRootSql}, 'null'::jsonb)
ON CONFLICT ("keyPath") DO NOTHING
`);
await sql.unsafe(`
INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
VALUES (
gen_random_uuid(),
${stateRowPathSql},
jsonb_build_object(
'rowData', '{"value": 2}'::jsonb,
'stateAfter', '{"counter": 1}'::jsonb,
'emittedRowsData', '[]'::jsonb,
'nextTimestamp', 'null'::jsonb
)
)
ON CONFLICT ("keyPath") DO UPDATE
SET "value" = EXCLUDED."value"
`);
// 3) Register this timefold's cascade. The template reads the rows
// that process_queue pushes into `__bulldozer_seq` under the input
// name and writes them under `ARRAY['cascade-out', <rowIdentifier>]`.
// EXECUTEing this DO block is exactly what process_queue does with
// the stored cascadeTemplate once per timefold per tick.
const cascadeInputName = "migration_test_cascade_input";
const cascadeTemplateSql = `
DO $tf_cascade$
BEGIN
INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
SELECT
gen_random_uuid(),
ARRAY[to_jsonb('cascade-out'::text), "__bulldozer_seq"."__output_row"->'rowIdentifier']::jsonb[],
"__bulldozer_seq"."__output_row"->'newRowData'
FROM "__bulldozer_seq"
WHERE "__output_name" = '${cascadeInputName}'
ON CONFLICT ("keyPath") DO UPDATE
SET "value" = EXCLUDED."value";
END;
$tf_cascade$ LANGUAGE plpgsql;
`;
await sql.unsafe(`
INSERT INTO "BulldozerTimeFoldDownstreamCascade"
("tableStoragePath", "cascadeInputName", "cascadeTemplate")
VALUES (
${storagePathSql},
'${cascadeInputName}',
$cascade_template$${cascadeTemplateSql}$cascade_template$
)
`);
// 4) Queue a reducer row that emits one output row (value=100). Reducer
// SQL is the same shape the real timefold table emits:
// newState / newRowsData / nextTimestamp.
await sql.unsafe(`
INSERT INTO "BulldozerTimeFoldQueue" (
"id",
"tableStoragePath",
"groupKey",
"rowIdentifier",
"scheduledAt",
"stateAfter",
"rowData",
"reducerSql"
)
VALUES (
gen_random_uuid(),
${storagePathSql},
to_jsonb('alpha'::text),
'u1',
now() - interval '1 minute',
'{"counter": 1}'::jsonb,
'{"value": 2}'::jsonb,
'jsonb_build_object(''counter'', COALESCE(("oldState"->>''counter'')::int, 0) + 1) AS "newState", jsonb_build_array(jsonb_build_object(''value'', 100)) AS "newRowsData", NULL::timestamptz AS "nextTimestamp"'
)
`);
// 5) Drain the queue. This is the real prod entry point — pg_cron calls
// exactly this function.
await sql.unsafe(`SELECT public.bulldozer_timefold_process_queue()`);
// The queue row must be consumed. Scope by tableStoragePath — the
// sibling `20260323150000_add_bulldozer_timefold_queue/tests/process-queue.ts`
// ran on this same shared DB and left a future-dated queue row behind
// under its own tableStoragePath, which we must not match here.
const remainingQueueRows = await sql.unsafe(`
SELECT 1 FROM "BulldozerTimeFoldQueue"
WHERE "tableStoragePath" = ${storagePathSql}
AND "rowIdentifier" = 'u1'
`);
expect(remainingQueueRows).toHaveLength(0);
// The timefold's own state row must be updated (baseline the prior
// migration already covered).
const stateRows = await sql.unsafe(`
SELECT "value" FROM "BulldozerStorageEngine" WHERE "keyPath" = ${stateRowPathSql}
`);
expect(stateRows).toHaveLength(1);
expect(stateRows[0].value).toMatchObject({
rowData: { value: 2 },
stateAfter: { counter: 2 },
});
// The regression guard: the cascade template must have run. Without
// the migration's rewrite, `__bulldozer_seq` is never populated and
// the template is never EXECUTEd, so `cascade-out/u1:1` would not
// exist.
const cascadeOutRows = await sql.unsafe(`
SELECT "keyPath", "value"
FROM "BulldozerStorageEngine"
WHERE "keyPathParent" = ${cascadeOutRootSql}
`);
expect(cascadeOutRows).toHaveLength(1);
expect(cascadeOutRows[0].value).toEqual({ value: 100 });
// 6) Idempotency: re-draining with nothing new in the queue must not
// re-run the cascade (no duplicate rows, no FK errors).
await sql.unsafe(`SELECT public.bulldozer_timefold_process_queue()`);
const cascadeOutAfterRedrain = await sql.unsafe(`
SELECT 1 FROM "BulldozerStorageEngine" WHERE "keyPathParent" = ${cascadeOutRootSql}
`);
expect(cascadeOutAfterRedrain).toHaveLength(1);
// 7) No-template path: a timefold with cascadeTemplate = NULL must
// drain queued rows without error. Use a second, independent
// tableStoragePath so the FK'd storage engine rows for this
// branch don't collide with the one above.
const nullTableSql = `ARRAY[to_jsonb('table'::text), to_jsonb('external:cascade-null-template'::text)]::jsonb[]`;
const nullStorageSql = `${nullTableSql} || ARRAY[to_jsonb('storage'::text)]::jsonb[]`;
const nullGroupsSql = `${nullStorageSql} || ARRAY[to_jsonb('groups'::text)]::jsonb[]`;
const nullGroupSql = `${nullGroupsSql} || ARRAY[to_jsonb('alpha'::text)]::jsonb[]`;
const nullRowsSql = `${nullGroupSql} || ARRAY[to_jsonb('rows'::text)]::jsonb[]`;
const nullStatesSql = `${nullGroupSql} || ARRAY[to_jsonb('states'::text)]::jsonb[]`;
const nullStateRowSql = `${nullStatesSql} || ARRAY[to_jsonb('u1'::text)]::jsonb[]`;
await sql.unsafe(`
INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
VALUES
(gen_random_uuid(), ${nullTableSql}, 'null'::jsonb),
(gen_random_uuid(), ${nullStorageSql}, 'null'::jsonb),
(gen_random_uuid(), ${nullGroupsSql}, 'null'::jsonb),
(gen_random_uuid(), ${nullGroupSql}, 'null'::jsonb),
(gen_random_uuid(), ${nullRowsSql}, 'null'::jsonb),
(gen_random_uuid(), ${nullStatesSql}, 'null'::jsonb)
ON CONFLICT ("keyPath") DO NOTHING
`);
await sql.unsafe(`
INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
VALUES (
gen_random_uuid(),
${nullStateRowSql},
jsonb_build_object(
'rowData', '{"value": 2}'::jsonb,
'stateAfter', '{"counter": 1}'::jsonb,
'emittedRowsData', '[]'::jsonb,
'nextTimestamp', 'null'::jsonb
)
)
`);
await sql.unsafe(`
INSERT INTO "BulldozerTimeFoldDownstreamCascade"
("tableStoragePath", "cascadeInputName", "cascadeTemplate")
VALUES (
${nullStorageSql},
'null_template_input',
NULL
)
`);
await sql.unsafe(`
INSERT INTO "BulldozerTimeFoldQueue" (
"id", "tableStoragePath", "groupKey", "rowIdentifier",
"scheduledAt", "stateAfter", "rowData", "reducerSql"
)
VALUES (
gen_random_uuid(),
${nullStorageSql},
to_jsonb('alpha'::text),
'u1',
now() - interval '1 minute',
'{"counter": 1}'::jsonb,
'{"value": 2}'::jsonb,
'jsonb_build_object(''counter'', 2) AS "newState", jsonb_build_array(jsonb_build_object(''value'', 100)) AS "newRowsData", NULL::timestamptz AS "nextTimestamp"'
)
`);
await sql.unsafe(`SELECT public.bulldozer_timefold_process_queue()`);
const nullRemainingQueue = await sql.unsafe(`
SELECT 1 FROM "BulldozerTimeFoldQueue"
WHERE "tableStoragePath" = ${nullStorageSql}
`);
expect(nullRemainingQueue).toHaveLength(0);
const nullStateAfterDrain = await sql.unsafe(`
SELECT "value" FROM "BulldozerStorageEngine" WHERE "keyPath" = ${nullStateRowSql}
`);
expect(nullStateAfterDrain).toHaveLength(1);
expect(nullStateAfterDrain[0].value).toMatchObject({ stateAfter: { counter: 2 } });
};

View File

@ -1356,6 +1356,14 @@ model BulldozerTimeFoldMetadata {
lastProcessedAt DateTime @db.Timestamptz
}
// BulldozerTimeFoldDownstreamCascade is managed externally (see
// prisma.config.ts `tables.external`). Same reason as BulldozerStorageEngine
// above: its primary key is on a JSONB[] column, which Prisma's @id
// attribute rejects ("fields that are marked as id must be required"; list
// types are treated as non-required). It's written only by bulldozer
// internals (declareTimeFoldTable.init()/.delete()) and read by
// public.bulldozer_timefold_process_queue() — never via the Prisma client.
model DeletedRow {
id String @id @default(uuid()) @db.Uuid
tenancyId String @db.Uuid

View File

@ -714,6 +714,17 @@ describe.sequential("bulldozer db fuzz composition (real postgres)", () => {
INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt")
VALUES ('singleton', now())
`;
await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldDownstreamCascade"`;
await sql`
CREATE TABLE "BulldozerTimeFoldDownstreamCascade" (
"tableStoragePath" JSONB[] NOT NULL,
"cascadeInputName" TEXT NOT NULL,
"cascadeTemplate" TEXT,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath")
)
`;
});
afterEach(async () => {

View File

@ -354,6 +354,17 @@ describe.sequential("bulldozer db performance (real postgres)", () => {
INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt")
VALUES ('singleton', now())
`;
await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldDownstreamCascade"`;
await sql`
CREATE TABLE "BulldozerTimeFoldDownstreamCascade" (
"tableStoragePath" JSONB[] NOT NULL,
"cascadeInputName" TEXT NOT NULL,
"cascadeTemplate" TEXT,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath")
)
`;
});
afterEach(async () => {

View File

@ -140,6 +140,7 @@ describe.sequential("declareStoredTable (real postgres)", () => {
beforeEach(async () => {
await sql`CREATE EXTENSION IF NOT EXISTS pgcrypto`;
await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldDownstreamCascade"`;
await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldQueue"`;
await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldMetadata"`;
await sql`DROP TABLE IF EXISTS "BulldozerMapTriggerAudit"`;
@ -233,6 +234,16 @@ describe.sequential("declareStoredTable (real postgres)", () => {
INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt")
VALUES ('singleton', now())
`;
await sql`
CREATE TABLE "BulldozerTimeFoldDownstreamCascade" (
"tableStoragePath" JSONB[] NOT NULL,
"cascadeInputName" TEXT NOT NULL,
"cascadeTemplate" TEXT,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath")
)
`;
});
// any is used here because the verifier works with heterogeneous table types

View File

@ -1,3 +1,5 @@
import { generateSecureRandomString } from "@stackframe/stack-shared/dist/utils/crypto";
import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors";
import { deindent } from "@stackframe/stack-shared/dist/utils/strings";
import { BULLDOZER_SORT_HELPERS_SQL } from "./bulldozer-sort-helpers-sql";
@ -66,10 +68,23 @@ export function toQueryableSqlQuery(query: SqlQuery): string {
return query.sql;
}
export function toExecutableSqlTransaction(statements: SqlStatement[], options: { statementTimeout?: string } = {}): string {
const requiresSortHelpers = statements.some((statement) => statement.sql.includes("pg_temp.bulldozer_sort_"));
const seqOutputs = new Map<string, string>();
const executableStatementsInDoBlock = statements.map((statement) => {
/**
* Core body-builder shared by `toExecutableSqlTransaction` and
* `toCascadeSqlBlock`. Serializes a list of `SqlStatement`s into a single
* string suitable to drop into a plpgsql DO block, rewriting references to
* named outputs into `__bulldozer_seq` subqueries.
*
* `seededSeqOutputs` lets callers pretend a given `__output_name` has
* already been produced by an upstream statement. Used by the timefold
* queue-drain cascade, which pre-populates `__bulldozer_seq` in plpgsql
* BEFORE executing the stored cascade template.
*/
function buildExecutableStatementsBlock(
statements: SqlStatement[],
seededSeqOutputs: Map<string, string>,
): string {
const seqOutputs = new Map<string, string>(seededSeqOutputs);
return statements.map((statement) => {
let sql = statement.sql;
for (const [outputName, outputColumns] of seqOutputs) {
const quotedOutputName = `"${outputName}"`;
@ -112,12 +127,22 @@ export function toExecutableSqlTransaction(statements: SqlStatement[], options:
`;
})();
// Keep the outer DO block delimiter stable even when statements define $$ functions.
const normalizedSql = executableSql.replaceAll("$$", "$__bulldozer_do_inline$").trimEnd();
const normalizedSql = executableSql.trimEnd();
return normalizedSql.endsWith(";")
? normalizedSql
: `${normalizedSql};`;
}).join("\n\n");
}
export function toExecutableSqlTransaction(statements: SqlStatement[], options: { statementTimeout?: string } = {}): string {
const requiresSortHelpers = statements.some((statement) => statement.sql.includes("pg_temp.bulldozer_sort_"));
const executableStatementsInDoBlock = buildExecutableStatementsBlock(statements, new Map());
// Randomize the outer DO-block delimiter so nested `$$` that individual
// statements legitimately use (e.g. `CREATE FUNCTION ... AS $$ ... $$`
// in `reduce-table.ts`) don't collide with it, and so user-provided SQL
// containing a literal `'$$'` string doesn't need to be rewritten. Same
// approach as `toCascadeSqlBlock` below.
const outerTag = chooseSafeDollarQuoteTag(executableStatementsInDoBlock, "bulldozer_tx");
return deindent`
BEGIN;
@ -131,12 +156,84 @@ export function toExecutableSqlTransaction(statements: SqlStatement[], options:
${BULLDOZER_SEQ_TABLE_SQL}
DO $$
DO $${outerTag}$
BEGIN
${executableStatementsInDoBlock}
END;
$$ LANGUAGE plpgsql;
$${outerTag}$ LANGUAGE plpgsql;
COMMIT;
`;
}
/**
* Picks a plpgsql dollar-quote tag that is guaranteed not to appear
* verbatim inside `bodyContents`.
*
* We need this for any `DO $tag$ ... $tag$` block whose body is
* concatenated from caller-supplied SQL: a naive fixed `$tag$` would
* close the outer block prematurely if any embedded statement happened
* to include the same literal `$tag$` (e.g. a user filter predicate
* that references the string, or a comment, or a CASE branch).
*
* A cryptographically-random 224-bit suffix makes accidental collision
* astronomically unlikely. We still assert (rather than silently
* retrying) because a collision here would almost certainly mean the
* caller is constructing the body adversarially, and we'd rather fail
* loud.
*/
function chooseSafeDollarQuoteTag(bodyContents: string, tagPrefix: string): string {
const tag = `${tagPrefix}_${generateSecureRandomString()}`;
if (bodyContents.includes(`$${tag}$`)) {
throw new StackAssertionError(
"Randomly generated dollar-quote tag collided with body contents; this is astronomically unlikely with a 224-bit suffix and almost certainly indicates adversarial input",
{ tag, tagPrefix },
);
}
return tag;
}
/**
* Compiles a downstream-trigger cascade into a plpgsql `DO` block body that
* can be stored in `BulldozerTimeFoldDownstreamCascade.cascadeTemplate` and
* EXECUTEd by `bulldozer_timefold_process_queue()` at runtime.
*
* The generated body:
* - Assumes the caller has already populated `__bulldozer_seq` under
* `cascadeInputName` with rows matching `cascadeInputColumns`.
* - Does NOT acquire the advisory lock or SET LOCAL settings that
* responsibility belongs to the function wrapping the cascade.
* - Wraps the statement sequence in a `DO $<random-tag>$ ... $<random-tag>$`
* block so `EXECUTE` in plpgsql can run it as a single dispatch. The
* tag is randomized per-call to ensure user-supplied SQL can never
* contain a literal copy of it and close the outer DO block early
* (see `chooseSafeDollarQuoteTag`).
*
* If the downstream trigger graph is empty (no filters/maps/etc. registered),
* returns `null`. Callers should skip the EXECUTE in that case.
*/
export function toCascadeSqlBlock(options: {
cascadeInputName: string,
cascadeInputColumns: string,
statements: SqlStatement[],
}): string | null {
if (options.statements.length === 0) return null;
const seeded = new Map<string, string>([[options.cascadeInputName, options.cascadeInputColumns]]);
const body = buildExecutableStatementsBlock(options.statements, seeded);
const requiresSortHelpers = options.statements.some((statement) => statement.sql.includes("pg_temp.bulldozer_sort_"));
// Sort helpers use their own $$ dollar quoting inside `CREATE OR REPLACE
// FUNCTION` bodies. They live inside the outer DO so they share the
// enclosing transaction's pg_temp scope with the cascade statements. The
// outer tag is randomized below so nested $$ (or any other fixed tag in
// user SQL) can't close it.
const prelude = requiresSortHelpers ? BULLDOZER_SORT_HELPERS_SQL : "";
const outerTag = chooseSafeDollarQuoteTag(`${prelude}\n${body}`, "tf_cascade");
return deindent`
DO $${outerTag}$
BEGIN
${prelude}
${body}
END;
$${outerTag}$ LANGUAGE plpgsql;
`;
}

View File

@ -4,7 +4,15 @@ import { stringCompare } from "@stackframe/stack-shared/dist/utils/strings";
import type { SqlExpression, SqlStatement } from "./utilities";
import { quoteSqlIdentifier, quoteSqlStringLiteral, sqlQuery } from "./utilities";
const CHANGE_OUTPUT_COLUMNS = '"groupKey" jsonb, "rowIdentifier" text, "oldRowSortKey" jsonb, "newRowSortKey" jsonb, "oldRowData" jsonb, "newRowData" jsonb';
/**
* Column shape of every row-change changes-table flowing between tables
* in the bulldozer graph. One canonical source of truth for both the
* inline trigger dispatch (here), the queue-drain cascade (whose input
* is seeded to this same shape in `declareTimeFoldTable.init()`), and
* any downstream consumer that needs to describe a changes-table's
* columns for `jsonb_to_record(...)` etc.
*/
export const CHANGE_OUTPUT_COLUMNS = '"groupKey" jsonb, "rowIdentifier" text, "oldRowSortKey" jsonb, "newRowSortKey" jsonb, "oldRowData" jsonb, "newRowData" jsonb';
const ROW_CHANGE_DIAGNOSTIC_COLUMN_NAME = "__row_change_table_id";
export type ChangesTableExpression = SqlExpression<{ __brand: "$SQL_Table" }>;
export type RowChangeTriggerDiagnostics = {

View File

@ -1,7 +1,12 @@
import { generateSecureRandomString } from "@stackframe/stack-shared/dist/utils/crypto";
import type { Table } from "..";
import type { RegisteredRowChangeTrigger } from "../row-change-trigger-dispatch";
import { attachRowChangeTriggerMetadata, normalizeRowChangeTrigger } from "../row-change-trigger-dispatch";
import { toCascadeSqlBlock, type Table } from "..";
import {
attachRowChangeTriggerMetadata,
CHANGE_OUTPUT_COLUMNS,
collectRowChangeTriggerStatements,
normalizeRowChangeTrigger,
type RegisteredRowChangeTrigger,
} from "../row-change-trigger-dispatch";
import type { Json, RowData, RowIdentifier, SqlExpression, SqlMapper, TableId, Timestamp } from "../utilities";
import {
getStorageEnginePath,
@ -410,7 +415,7 @@ export function declareTimeFoldTable<
ON "oldRows"."groupKey" IS NOT DISTINCT FROM "newRows"."groupKey"
AND "oldRows"."rowIdentifier" = "newRows"."rowIdentifier"
WHERE "oldRows"."rowData" IS DISTINCT FROM "newRows"."rowData"
`.toStatement(timeFoldChangesTableName, '"groupKey" jsonb, "rowIdentifier" text, "oldRowSortKey" jsonb, "newRowSortKey" jsonb, "oldRowData" jsonb, "newRowData" jsonb'),
`.toStatement(timeFoldChangesTableName, CHANGE_OUTPUT_COLUMNS),
];
};
const createFromTableTriggerStatements = (fromChangesTable: SqlExpression<{ __brand: "$SQL_Table" }>) => {
@ -467,6 +472,30 @@ export function declareTimeFoldTable<
const fromGroupsTableName = `from_groups_${generateSecureRandomString()}`;
const fromRowsTableName = `from_rows_${generateSecureRandomString()}`;
const initChangesTableName = `init_changes_${generateSecureRandomString()}`;
// Compile the downstream trigger cascade into a plpgsql DO block and
// persist it in BulldozerTimeFoldDownstreamCascade, keyed by this
// timefold's tableStoragePath. bulldozer_timefold_process_queue()
// reads and EXECUTEs it after each batch of queue-drained emissions.
//
// This mirrors, on the queue-drain path, what collectRowChangeTriggerStatements
// does on the inline setRow path (see row-change-trigger-dispatch.ts's
// use by the outer runStatements pipeline). Without this, pg_cron-
// drained emissions update the timefold's own rows but never propagate
// to filters/maps/LFolds above — see apps/backend/src/lib/bulldozer/db/
// timefold-queue-downstream.test.ts.
const cascadeInputName = `tf_cascade_input_${generateSecureRandomString()}`;
const cascadeCollected = collectRowChangeTriggerStatements({
sourceTableId: tableIdToDebugString(options.tableId),
sourceChangesTable: quoteSqlIdentifier(cascadeInputName),
sourceTableTriggers: triggers,
});
const cascadeTemplate = toCascadeSqlBlock({
cascadeInputName,
cascadeInputColumns: CHANGE_OUTPUT_COLUMNS,
statements: cascadeCollected.statements,
});
return [
sqlStatement`
INSERT INTO "BulldozerStorageEngine" ("id", "keyPath", "value")
@ -477,6 +506,24 @@ export function declareTimeFoldTable<
(gen_random_uuid(), ${getStorageEnginePath(options.tableId, ["metadata"])}, '{ "version": 1 }'::jsonb)
ON CONFLICT ("keyPath") DO NOTHING
`,
// Upsert the cascade registry row. A null template means "no
// downstream triggers registered" — process_queue will skip the
// EXECUTE in that case, matching the no-op semantics of the inline
// path when no triggers are attached.
sqlStatement`
INSERT INTO "BulldozerTimeFoldDownstreamCascade"
("tableStoragePath", "cascadeInputName", "cascadeTemplate")
VALUES (
${tableStoragePath}::jsonb[],
${quoteSqlStringLiteral(cascadeInputName)},
${cascadeTemplate == null ? sqlExpression`NULL::text` : quoteSqlStringLiteral(cascadeTemplate)}
)
ON CONFLICT ("tableStoragePath") DO UPDATE
SET
"cascadeInputName" = EXCLUDED."cascadeInputName",
"cascadeTemplate" = EXCLUDED."cascadeTemplate",
"updatedAt" = now()
`,
options.fromTable.listGroups({
start: "start",
end: "end",
@ -518,6 +565,10 @@ export function declareTimeFoldTable<
DELETE FROM "BulldozerTimeFoldQueue"
WHERE "tableStoragePath" = ${tableStoragePath}::jsonb[]
`,
sqlStatement`
DELETE FROM "BulldozerTimeFoldDownstreamCascade"
WHERE "tableStoragePath" = ${tableStoragePath}::jsonb[]
`,
sqlStatement`
WITH RECURSIVE "pathsToDelete" AS (
SELECT ${getTablePath(options.tableId)}::jsonb[] AS "path"

View File

@ -0,0 +1,51 @@
/**
* Test-only SQL loaders for bulldozer migration artifacts.
*
* Lives next to the bulldozer db code because several test files (both in
* `apps/backend/src/lib/bulldozer` and in `apps/backend/src/lib/payments`)
* need to install the `public.bulldozer_timefold_process_queue()` function
* body from its migration file to exercise the queue-drain path. Keeping
* one canonical loader here avoids drift between copies when the
* migration's comment sentinels or function name change.
*
* Not intended for production code paths only imported from `*.test.ts`
* files and the payments test-helpers (which is itself only used from
* tests).
*/
import { readFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
const HERE = dirname(fileURLToPath(import.meta.url));
// apps/backend/src/lib/bulldozer/db/ → apps/backend/prisma/migrations
const MIGRATIONS_DIR = join(HERE, "..", "..", "..", "..", "prisma", "migrations");
const DOWNSTREAM_CASCADE_MIGRATION = "20260417000000_bulldozer_timefold_downstream_cascade";
/**
* Extracts the `CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue`
* block from the downstream-cascade migration file. The function body is
* what `pg_cron` invokes in prod; installing it into a test database lets
* tests exercise the real drain behaviour end-to-end.
*
* The migration file is split into statements via the
* `-- SPLIT_STATEMENT_SENTINEL` markers already used by the bulldozer
* migration tooling; this loader just picks the block that starts with
* the function definition.
*/
export function loadProcessQueueFunctionSql(): string {
const migrationPath = join(MIGRATIONS_DIR, DOWNSTREAM_CASCADE_MIGRATION, "migration.sql");
const raw = readFileSync(migrationPath, "utf8");
const block = raw
.split("-- SPLIT_STATEMENT_SENTINEL")
.map((chunk) => chunk.replaceAll("-- SINGLE_STATEMENT_SENTINEL", "").trim())
.find((chunk) => chunk.startsWith("CREATE OR REPLACE FUNCTION public.bulldozer_timefold_process_queue"));
if (block == null) {
throw new Error(
`Could not locate bulldozer_timefold_process_queue function body in ${DOWNSTREAM_CASCADE_MIGRATION}/migration.sql`,
);
}
return block.replace(/;$/, "");
}

View File

@ -0,0 +1,784 @@
/**
* `bulldozer_timefold_process_queue()` must propagate emitted rows through
* the downstream trigger cascade (filter/map/LFold/...) the same way the
* inline `setRow` path does via `collectRowChangeTriggerStatements`.
*
* Each timefold's cascade template is precomputed at `init()` time and
* stored in BulldozerTimeFoldDownstreamCascade; the rewritten process_queue
* populates `__bulldozer_seq` with newly-emitted rows under the timefold's
* input name and EXECUTEs the template.
*/
import postgres from "postgres";
import { afterAll, beforeAll, beforeEach, describe, expect, test } from "vitest";
import {
declareFilterTable,
declareGroupByTable,
declareMapTable,
declareStoredTable,
declareTimeFoldTable,
toExecutableSqlTransaction,
toQueryableSqlQuery,
} from "./index";
import { loadProcessQueueFunctionSql } from "./test-sql-loaders";
type SqlExpression<T> = { type: "expression", sql: string };
type SqlStatement = { type: "statement", sql: string, outputName?: string };
type SqlQuery = { type: "query", sql: string, toStatement(outputName?: string): SqlStatement };
type SqlMapper = { type: "mapper", sql: string };
type SqlPredicate = { type: "predicate", sql: string };
function expr<T>(sql: string): SqlExpression<T> {
return { type: "expression", sql };
}
function mapper(sql: string): SqlMapper {
return { type: "mapper", sql };
}
function predicate(sql: string): SqlPredicate {
return { type: "predicate", sql };
}
const TEST_DB_PREFIX = "stack_bulldozer_queue_downstream_test";
function getTestDbUrls() {
const env = Reflect.get(import.meta, "env");
const connectionString = Reflect.get(env, "STACK_DATABASE_CONNECTION_STRING");
if (typeof connectionString !== "string" || connectionString.length === 0) {
throw new Error("Missing STACK_DATABASE_CONNECTION_STRING");
}
const base = connectionString.replace(/\/[^/]*(\?.*)?$/, "");
const query = connectionString.split("?")[1] ?? "";
const dbName = `${TEST_DB_PREFIX}_${Math.random().toString(16).slice(2, 12)}`;
return {
full: query.length === 0 ? `${base}/${dbName}` : `${base}/${dbName}?${query}`,
base,
};
}
const PROCESS_QUEUE_FN_SQL = loadProcessQueueFunctionSql();
describe.sequential("timefold queue downstream cascade (real postgres)", () => {
const dbUrls = getTestDbUrls();
const dbName = dbUrls.full.replace(/^.*\//, "").replace(/\?.*$/, "");
const adminSql = postgres(dbUrls.base, { onnotice: () => undefined });
const sql = postgres(dbUrls.full, { onnotice: () => undefined, max: 1 });
async function runStatements(statements: SqlStatement[]) {
await sql.unsafe(toExecutableSqlTransaction(statements));
}
async function readRows(query: SqlQuery) {
return await sql.unsafe(toQueryableSqlQuery(query));
}
async function setLastProcessedAt(isoOrExpression: string) {
await sql.unsafe(`
UPDATE "BulldozerTimeFoldMetadata"
SET "lastProcessedAt" = (${isoOrExpression})::timestamptz,
"updatedAt" = now()
WHERE "key" = 'singleton'
`);
}
async function processQueue() {
await sql.unsafe(`SELECT public.bulldozer_timefold_process_queue()`);
}
async function countQueueRows() {
const rows = await sql<Array<{ count: number }>>`
SELECT COUNT(*)::int AS "count" FROM "BulldozerTimeFoldQueue"
`;
return rows[0].count;
}
beforeAll(async () => {
await adminSql.unsafe(`CREATE DATABASE ${dbName}`);
}, 60_000);
// beforeEach does a lot (drop/recreate all bulldozer tables + install the
// ~250-line process_queue plpgsql function); the default 10s vitest hook
// timeout can be tight especially under parallel test files.
const HOOK_TIMEOUT_MS = 60_000;
beforeEach(async () => {
await sql`CREATE EXTENSION IF NOT EXISTS pgcrypto`;
await sql`DROP FUNCTION IF EXISTS public.bulldozer_timefold_process_queue()`;
await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldDownstreamCascade"`;
await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldQueue"`;
await sql`DROP TABLE IF EXISTS "BulldozerTimeFoldMetadata"`;
await sql`DROP TABLE IF EXISTS "BulldozerStorageEngine"`;
await sql`
CREATE TABLE "BulldozerStorageEngine" (
"id" UUID NOT NULL DEFAULT gen_random_uuid(),
"keyPath" JSONB[] NOT NULL,
"keyPathParent" JSONB[] GENERATED ALWAYS AS (
CASE
WHEN cardinality("keyPath") = 0 THEN NULL
ELSE "keyPath"[1:cardinality("keyPath") - 1]
END
) STORED,
"value" JSONB NOT NULL,
CONSTRAINT "BulldozerStorageEngine_pkey" PRIMARY KEY ("id"),
CONSTRAINT "BulldozerStorageEngine_keyPath_key" UNIQUE ("keyPath"),
CONSTRAINT "BulldozerStorageEngine_keyPathParent_fkey"
FOREIGN KEY ("keyPathParent")
REFERENCES "BulldozerStorageEngine"("keyPath")
ON DELETE CASCADE
)
`;
await sql`CREATE INDEX "BulldozerStorageEngine_keyPathParent_idx" ON "BulldozerStorageEngine"("keyPathParent")`;
await sql`
INSERT INTO "BulldozerStorageEngine" ("keyPath", "value")
VALUES
(ARRAY[]::jsonb[], 'null'::jsonb),
(ARRAY[to_jsonb('table'::text)]::jsonb[], 'null'::jsonb)
`;
await sql`
CREATE TABLE "BulldozerTimeFoldQueue" (
"id" UUID NOT NULL DEFAULT gen_random_uuid(),
"tableStoragePath" JSONB[] NOT NULL,
"groupKey" JSONB NOT NULL,
"rowIdentifier" TEXT NOT NULL,
"scheduledAt" TIMESTAMPTZ NOT NULL,
"stateAfter" JSONB NOT NULL,
"rowData" JSONB NOT NULL,
"reducerSql" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "BulldozerTimeFoldQueue_pkey" PRIMARY KEY ("id"),
CONSTRAINT "BulldozerTimeFoldQueue_table_group_row_key" UNIQUE ("tableStoragePath", "groupKey", "rowIdentifier")
)
`;
await sql`
CREATE INDEX "BulldozerTimeFoldQueue_scheduledAt_idx"
ON "BulldozerTimeFoldQueue"("scheduledAt")
`;
await sql`
CREATE TABLE "BulldozerTimeFoldMetadata" (
"key" TEXT PRIMARY KEY,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"lastProcessedAt" TIMESTAMPTZ NOT NULL
)
`;
await sql`
INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt")
VALUES ('singleton', now())
`;
await sql`
CREATE TABLE "BulldozerTimeFoldDownstreamCascade" (
"tableStoragePath" JSONB[] NOT NULL,
"cascadeInputName" TEXT NOT NULL,
"cascadeTemplate" TEXT,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath")
)
`;
// Install the rewritten process_queue function body from the cascade
// migration.
await sql.unsafe(PROCESS_QUEUE_FN_SQL);
}, HOOK_TIMEOUT_MS);
afterAll(async () => {
await sql.end();
await adminSql.unsafe(`
SELECT pg_terminate_backend(pg_stat_activity.pid)
FROM pg_stat_activity
WHERE pg_stat_activity.datname = '${dbName}'
AND pid <> pg_backend_pid()
`);
await adminSql.unsafe(`DROP DATABASE IF EXISTS ${dbName}`);
await adminSql.end();
});
// Reducer that emits {phase:'initial'} inline and schedules a past-due tick
// that emits {phase:'scheduled'}. The inline recursion stops because
// nextTimestamp > lastProcessedAt (we back lastProcessedAt off below).
// After process_queue runs, the scheduled-tick emission must propagate.
const splitPhaseReducerSql = `
CASE WHEN "timestamp" IS NULL THEN 1 ELSE 2 END AS "newState",
jsonb_build_array(
jsonb_build_object(
'phase', CASE WHEN "timestamp" IS NULL THEN 'initial' ELSE 'scheduled' END,
'team', "oldRowData"->'team',
'value', (("oldRowData"->>'value')::int)
)
) AS "newRowsData",
CASE
WHEN "timestamp" IS NULL THEN (now() - interval '1 second')
ELSE NULL::timestamptz
END AS "nextTimestamp"
`;
/**
* Reads the `phase` string out of a row's `rowdata` with runtime type
* checks. Used by the delete-before-drain and dollar-quote tests to
* assert which phases made it through the pipeline. Fails loud rather
* than silently returning `undefined` if the row shape is unexpected.
*
* Takes `unknown` (rather than a narrower row type) because the
* `postgres.js` driver's `Row` type doesn't statically guarantee a
* `rowdata` column.
*/
function rowPhase(row: unknown): string {
if (row == null || typeof row !== "object") {
throw new Error(`Expected row object, got ${typeof row}`);
}
const rowData = Reflect.get(row, "rowdata");
if (rowData == null || typeof rowData !== "object") {
throw new Error(`Expected object rowdata, got ${typeof rowData}`);
}
const phase = Reflect.get(rowData, "phase");
if (typeof phase !== "string") {
throw new Error(`Expected string 'phase' field in rowdata, got ${typeof phase}: ${JSON.stringify(rowData)}`);
}
return phase;
}
// ────────────────────────────────────────────────────────────────────
// Test 1: single filter downstream
// ────────────────────────────────────────────────────────────────────
test("process_queue propagates emissions to a single downstream filter", async () => {
const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-filter-u" });
const groupedTable = declareGroupByTable({
tableId: "queue-cascade-filter-u-by-team",
fromTable,
groupBy: mapper(`"rowData"->'team' AS "groupKey"`),
});
const timeFoldTable = declareTimeFoldTable({
tableId: "queue-cascade-filter-u-folded",
fromTable: groupedTable,
initialState: expr(`'0'::jsonb`),
reducer: mapper(splitPhaseReducerSql),
});
const filteredTable = declareFilterTable({
tableId: "queue-cascade-filter-u-scheduled-only",
fromTable: timeFoldTable,
filter: predicate(`"rowData"->>'phase' = 'scheduled'`),
});
await runStatements(fromTable.init());
await runStatements(groupedTable.init());
// Back the clock up before any setRow so the inline recursion bails.
// Use pre-epoch time so the reducer's scheduled tick (now()-1s) stays
// ahead of lastProcessedAt regardless of wall-clock drift.
await setLastProcessedAt(`'1969-01-01T00:00:00Z'`);
await runStatements(timeFoldTable.init());
await runStatements(filteredTable.init());
await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":7}'::jsonb`)));
expect(await countQueueRows()).toBe(1);
const filteredBeforeDrain = await readRows(filteredTable.listRowsInGroup({
groupKey: expr(`to_jsonb('alpha'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
expect(filteredBeforeDrain).toEqual([]);
await setLastProcessedAt(`now()`);
await processQueue();
const filteredAfterDrain = await readRows(filteredTable.listRowsInGroup({
groupKey: expr(`to_jsonb('alpha'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
// Filter wraps rows through an internal flat-map that appends a
// flatIndex to the rowIdentifier (":1" for the single-element array).
// That's an implementation detail, not something to assert on.
expect(filteredAfterDrain.map((row) => row.rowdata)).toEqual([
{ phase: "scheduled", team: "alpha", value: 7 },
]);
expect(await countQueueRows()).toBe(0);
});
// ────────────────────────────────────────────────────────────────────
// Test 2: multi-stage cascade (timefold → filter → map → map)
// Exercises transitive propagation across three downstream stages.
// ────────────────────────────────────────────────────────────────────
test("process_queue propagates emissions through filter → map → map", async () => {
const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-multistage-u" });
const groupedTable = declareGroupByTable({
tableId: "queue-cascade-multistage-u-by-team",
fromTable,
groupBy: mapper(`"rowData"->'team' AS "groupKey"`),
});
const timeFoldTable = declareTimeFoldTable({
tableId: "queue-cascade-multistage-u-folded",
fromTable: groupedTable,
initialState: expr(`'0'::jsonb`),
reducer: mapper(splitPhaseReducerSql),
});
const filteredTable = declareFilterTable({
tableId: "queue-cascade-multistage-u-scheduled-only",
fromTable: timeFoldTable,
filter: predicate(`"rowData"->>'phase' = 'scheduled'`),
});
const mappedTable = declareMapTable({
tableId: "queue-cascade-multistage-u-mapped",
fromTable: filteredTable,
mapper: mapper(`
("rowData"->'team') AS "team",
(("rowData"->>'value')::int * 10) AS "valueTimesTen"
`),
});
const reMappedTable = declareMapTable({
tableId: "queue-cascade-multistage-u-remapped",
fromTable: mappedTable,
mapper: mapper(`
("rowData"->'team') AS "team",
(("rowData"->>'valueTimesTen')::int + 1) AS "valueTimesTenPlusOne"
`),
});
await runStatements(fromTable.init());
await runStatements(groupedTable.init());
await setLastProcessedAt(`'1969-01-01T00:00:00Z'`);
await runStatements(timeFoldTable.init());
await runStatements(filteredTable.init());
await runStatements(mappedTable.init());
await runStatements(reMappedTable.init());
await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":3}'::jsonb`)));
await runStatements(fromTable.setRow("u2", expr(`'{"team":"alpha","value":5}'::jsonb`)));
expect(await countQueueRows()).toBe(2);
for (const table of [filteredTable, mappedTable, reMappedTable]) {
const rows = await readRows(table.listRowsInGroup({
groupKey: expr(`to_jsonb('alpha'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
expect(rows).toEqual([]);
}
await setLastProcessedAt(`now()`);
await processQueue();
const filtered = await readRows(filteredTable.listRowsInGroup({
groupKey: expr(`to_jsonb('alpha'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
expect(filtered.map((r) => r.rowdata).sort((a, b) =>
Number(Reflect.get(a as object, "value")) - Number(Reflect.get(b as object, "value"))
)).toEqual([
{ phase: "scheduled", team: "alpha", value: 3 },
{ phase: "scheduled", team: "alpha", value: 5 },
]);
const mapped = await readRows(mappedTable.listRowsInGroup({
groupKey: expr(`to_jsonb('alpha'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
expect(mapped.map((r) => r.rowdata).sort((a, b) =>
Number(Reflect.get(a as object, "valueTimesTen")) - Number(Reflect.get(b as object, "valueTimesTen"))
)).toEqual([
{ team: "alpha", valueTimesTen: 30 },
{ team: "alpha", valueTimesTen: 50 },
]);
const reMapped = await readRows(reMappedTable.listRowsInGroup({
groupKey: expr(`to_jsonb('alpha'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
expect(reMapped.map((r) => r.rowdata).sort((a, b) =>
Number(Reflect.get(a as object, "valueTimesTenPlusOne")) - Number(Reflect.get(b as object, "valueTimesTenPlusOne"))
)).toEqual([
{ team: "alpha", valueTimesTenPlusOne: 31 },
{ team: "alpha", valueTimesTenPlusOne: 51 },
]);
});
// ────────────────────────────────────────────────────────────────────
// Test 3: inline path and queue path produce identical downstream state
// ────────────────────────────────────────────────────────────────────
test("inline-drain and queue-drain produce identical downstream state", async () => {
const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-parity-u" });
const groupedTable = declareGroupByTable({
tableId: "queue-cascade-parity-u-by-team",
fromTable,
groupBy: mapper(`"rowData"->'team' AS "groupKey"`),
});
const timeFoldTable = declareTimeFoldTable({
tableId: "queue-cascade-parity-u-folded",
fromTable: groupedTable,
initialState: expr(`'0'::jsonb`),
reducer: mapper(splitPhaseReducerSql),
});
const filteredTable = declareFilterTable({
tableId: "queue-cascade-parity-u-scheduled-only",
fromTable: timeFoldTable,
filter: predicate(`"rowData"->>'phase' = 'scheduled'`),
});
await runStatements(fromTable.init());
await runStatements(groupedTable.init());
// Customer "inline": lastProcessedAt way ahead → tick fires inline.
await setLastProcessedAt(`'2099-01-01T00:00:00Z'`);
await runStatements(timeFoldTable.init());
await runStatements(filteredTable.init());
await runStatements(fromTable.setRow("inline-u1", expr(`'{"team":"inline","value":9}'::jsonb`)));
expect(await countQueueRows()).toBe(0);
// Customer "queue": lastProcessedAt behind → tick queued.
await setLastProcessedAt(`'1969-01-01T00:00:00Z'`);
await runStatements(fromTable.setRow("queue-u1", expr(`'{"team":"queue","value":9}'::jsonb`)));
expect(await countQueueRows()).toBe(1);
await setLastProcessedAt(`now()`);
await processQueue();
expect(await countQueueRows()).toBe(0);
const inlineRows = await readRows(filteredTable.listRowsInGroup({
groupKey: expr(`to_jsonb('inline'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
const queueRows = await readRows(filteredTable.listRowsInGroup({
groupKey: expr(`to_jsonb('queue'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
const normalize = (rows: ReadonlyArray<Record<string, unknown>>) =>
rows.map((row) => {
const rowIdentifier = row.rowidentifier;
if (typeof rowIdentifier !== "string") throw new Error("expected string rowidentifier");
const rowData = row.rowdata;
if (rowData == null || typeof rowData !== "object") throw new Error("expected object rowdata");
return {
rowIdentifierSuffix: rowIdentifier.replace(/^(inline|queue)-u1:/, "u1:"),
rowData: {
...(rowData as Record<string, unknown>),
team: "<normalized>",
},
};
});
expect(normalize([...queueRows])).toEqual(normalize([...inlineRows]));
expect([...inlineRows]).not.toEqual([]);
});
// ────────────────────────────────────────────────────────────────────
// Test 4: idempotency — redraining with no new rows is a no-op
// ────────────────────────────────────────────────────────────────────
test("process_queue is idempotent when there is nothing new to drain", async () => {
const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-idempotency-u" });
const groupedTable = declareGroupByTable({
tableId: "queue-cascade-idempotency-u-by-team",
fromTable,
groupBy: mapper(`"rowData"->'team' AS "groupKey"`),
});
const timeFoldTable = declareTimeFoldTable({
tableId: "queue-cascade-idempotency-u-folded",
fromTable: groupedTable,
initialState: expr(`'0'::jsonb`),
reducer: mapper(splitPhaseReducerSql),
});
const filteredTable = declareFilterTable({
tableId: "queue-cascade-idempotency-u-scheduled-only",
fromTable: timeFoldTable,
filter: predicate(`"rowData"->>'phase' = 'scheduled'`),
});
await runStatements(fromTable.init());
await runStatements(groupedTable.init());
await setLastProcessedAt(`'1969-01-01T00:00:00Z'`);
await runStatements(timeFoldTable.init());
await runStatements(filteredTable.init());
await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":4}'::jsonb`)));
await setLastProcessedAt(`now()`);
await processQueue();
const afterFirstDrain = await readRows(filteredTable.listRowsInGroup({
groupKey: expr(`to_jsonb('alpha'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
expect(afterFirstDrain).toHaveLength(1);
// Second drain: no due queue rows. No-op at every layer.
await processQueue();
const afterSecondDrain = await readRows(filteredTable.listRowsInGroup({
groupKey: expr(`to_jsonb('alpha'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
expect(afterSecondDrain).toEqual(afterFirstDrain);
expect(await countQueueRows()).toBe(0);
});
// ────────────────────────────────────────────────────────────────────
// Test 5: deleting a downstream table must NOT wedge process_queue.
//
// The cascade template is compiled at upstream-init() time and
// references the downstream's storage paths. If the downstream is
// .delete()d while the upstream still has queue rows pending, the
// drain must still succeed without a FK violation. The safety comes
// from every trigger's first statement carrying a
// `WHERE isInitializedExpression` clause that short-circuits the rest
// of the pipeline when the downstream's metadata row is absent. The
// queue-drain cascade inherits the same statements verbatim, so it
// inherits the same safety — this test pins that invariant down.
// ────────────────────────────────────────────────────────────────────
test("process_queue does not wedge when a downstream table is deleted", async () => {
const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-deleted-downstream-u" });
const groupedTable = declareGroupByTable({
tableId: "queue-cascade-deleted-downstream-u-by-team",
fromTable,
groupBy: mapper(`"rowData"->'team' AS "groupKey"`),
});
const timeFoldTable = declareTimeFoldTable({
tableId: "queue-cascade-deleted-downstream-u-folded",
fromTable: groupedTable,
initialState: expr(`'0'::jsonb`),
reducer: mapper(splitPhaseReducerSql),
});
const filteredTable = declareFilterTable({
tableId: "queue-cascade-deleted-downstream-u-scheduled-only",
fromTable: timeFoldTable,
filter: predicate(`"rowData"->>'phase' = 'scheduled'`),
});
await runStatements(fromTable.init());
await runStatements(groupedTable.init());
await setLastProcessedAt(`'1969-01-01T00:00:00Z'`);
await runStatements(timeFoldTable.init());
await runStatements(filteredTable.init());
await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":7}'::jsonb`)));
expect(await countQueueRows()).toBe(1);
// At this point the inline setRow path has already propagated the
// {phase:initial} row all the way through to the filter's storage.
// Blow the filter's storage away while the {phase:scheduled} tick
// is still queued — the upstream's cascade template was compiled
// referencing the filter's paths and is NOT updated by .delete(),
// so this is the delete-before-drain scenario.
await runStatements(filteredTable.delete());
await setLastProcessedAt(`now()`);
// If the cascade template's WHERE-gated statements didn't no-op on
// missing downstream storage, this would throw an FK violation and
// the queue would stay wedged forever.
await processQueue();
expect(await countQueueRows()).toBe(0);
// The timefold's own state must reflect both emissions (the inline
// {initial} from before the delete and the queue-drained {scheduled}
// from after). A wedged drain would roll everything back and leave
// the timefold with only the initial row.
const timefoldRows = await readRows(timeFoldTable.listRowsInGroup({
groupKey: expr(`to_jsonb('alpha'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
expect(timefoldRows.map(rowPhase).sort()).toEqual(["initial", "scheduled"]);
});
// ────────────────────────────────────────────────────────────────────
// Test 6: statements whose SQL contains `$tf_cascade$` as a literal
// substring must not prematurely close the outer cascade DO-block.
//
// The cascade template is wrapped in `DO $tf_cascade$ ... $tf_cascade$`.
// If any embedded statement happens to include that literal — e.g. in
// a SQL comment or a user-provided expression — the outer dollar quote
// closes mid-body and EXECUTE fails at parse time.
// ────────────────────────────────────────────────────────────────────
test("process_queue tolerates downstream SQL containing the cascade dollar-quote delimiter", async () => {
const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-dollar-collision-u" });
const groupedTable = declareGroupByTable({
tableId: "queue-cascade-dollar-collision-u-by-team",
fromTable,
groupBy: mapper(`"rowData"->'team' AS "groupKey"`),
});
const timeFoldTable = declareTimeFoldTable({
tableId: "queue-cascade-dollar-collision-u-folded",
fromTable: groupedTable,
initialState: expr(`'0'::jsonb`),
reducer: mapper(splitPhaseReducerSql),
});
// The filter predicate embeds the literal `$tf_cascade$` in a way
// that always evaluates true. This text flows through
// `collectRowChangeTriggerStatements` into the stored cascade
// template verbatim, so if the outer dollar-quoting is not robust,
// parse time EXECUTE inside process_queue will fail.
const filteredTable = declareFilterTable({
tableId: "queue-cascade-dollar-collision-u-all",
fromTable: timeFoldTable,
filter: predicate(`('$tf_cascade$' IS NOT NULL) OR ("rowData"->>'phase' = 'scheduled')`),
});
await runStatements(fromTable.init());
await runStatements(groupedTable.init());
await setLastProcessedAt(`'1969-01-01T00:00:00Z'`);
await runStatements(timeFoldTable.init());
await runStatements(filteredTable.init());
await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":11}'::jsonb`)));
expect(await countQueueRows()).toBe(1);
await setLastProcessedAt(`now()`);
// If the outer dollar-quote delimiter collided with the embedded
// `$tf_cascade$` string, EXECUTE raises `syntax error at or near ...`
// and the entire drain rolls back (queue stays at 1). The fix
// (`chooseSafeDollarQuoteTag`) randomizes the outer tag per call, so
// the embedded literal is just another string in the body.
await processQueue();
expect(await countQueueRows()).toBe(0);
// Predicate evaluates to always-true thanks to the first disjunct,
// so both the inline-emitted {initial} row and the queue-drained
// {scheduled} row make it through. The point of the assertion is
// that the cascade ran and wrote the scheduled row at all — if the
// delimiter collision had broken the DO block, we'd see only
// {initial} (written synchronously at setRow time, before the queue
// drain).
const filteredRows = await readRows(filteredTable.listRowsInGroup({
groupKey: expr(`to_jsonb('alpha'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
expect(filteredRows.map(rowPhase).sort()).toEqual(["initial", "scheduled"]);
});
// ────────────────────────────────────────────────────────────────────
// when process_queue() can't find a cascade
// registry row for a timefold, it must DEFER that timefold's queue
// rows (leave them queued) instead of draining them and silently
// skipping the downstream cascade.
//
// Why this matters — a concrete example of the failure mode:
//
// 1. This migration (20260417000000) creates the registry table
// BulldozerTimeFoldDownstreamCascade. It's empty at first.
//
// 2. The backend starts up. `declareTimeFoldTable.init()` runs for
// every timefold and upserts one row per timefold into the
// registry, storing the pre-compiled cascade SQL template.
//
// 3. There is a short gap between (1) and (2). During that gap,
// pg_cron keeps calling process_queue() every second.
//
// 4. If a due queue row exists for a timefold whose registry row
// hasn't been upserted yet, and process_queue() drains it
// anyway without a cascade to run, then:
//
// - the queue row is gone,
// - the timefold's own state is updated,
// - but NONE of the downstream filters/maps/LFolds hear
// about it, and there's no queue row left for a future
// tick to retry.
//
// → downstream tables are permanently desynchronized from
// the timefold.
//
// The right behavior: no registry row → do nothing for this
// timefold this tick. The queue row stays queued. Once init()
// finishes and the registry row appears, the next pg_cron tick
// drains with cascade intact.
//
// This test simulates the deploy-window gap by deleting the
// registry row after init() has run (so we know the rest of the
// pipeline is set up), then calling process_queue() and asserting
// that nothing silently advanced.
// ────────────────────────────────────────────────────────────────────
test("process_queue defers a timefold whose cascade registry row is missing", async () => {
// ---- Setup: build a small timefold pipeline ----
//
// The chain is: source data → group by team → timefold (recurses
// through time) → filter (keeps only phase=scheduled rows).
//
// Calling each table's init() wires up its storage. The
// timeFoldTable's init() is the one that inserts the registry
// row we care about — it stores the cascade template that
// process_queue() looks up at drain time.
const fromTable = declareStoredTable<{ value: number, team: string }>({ tableId: "queue-cascade-missing-registry-u" });
const groupedTable = declareGroupByTable({
tableId: "queue-cascade-missing-registry-u-by-team",
fromTable,
groupBy: mapper(`"rowData"->'team' AS "groupKey"`),
});
const timeFoldTable = declareTimeFoldTable({
tableId: "queue-cascade-missing-registry-u-folded",
fromTable: groupedTable,
initialState: expr(`'0'::jsonb`),
reducer: mapper(splitPhaseReducerSql),
});
const filteredTable = declareFilterTable({
tableId: "queue-cascade-missing-registry-u-scheduled-only",
fromTable: timeFoldTable,
filter: predicate(`"rowData"->>'phase' = 'scheduled'`),
});
await runStatements(fromTable.init());
await runStatements(groupedTable.init());
// Backdate the "last processed" clock so any future tick the
// reducer schedules looks like it's still in the future when
// setRow fires. That way the {scheduled} emission gets QUEUED
// instead of running inline.
await setLastProcessedAt(`'1969-01-01T00:00:00Z'`);
await runStatements(timeFoldTable.init());
await runStatements(filteredTable.init());
// ---- Generate one inline emission + one queued emission ----
//
// splitPhaseReducerSql is written so:
// - First call (timestamp=NULL) emits {phase:'initial'} and
// schedules a future tick.
// - The future tick (when drained) emits {phase:'scheduled'}.
//
// setRow fires the inline path: {initial} flows through the
// whole chain right now. The filter predicate
// `"rowData"->>'phase' = 'scheduled'` rejects {initial}, so the
// filter stays empty. Meanwhile the scheduled tick lands in
// BulldozerTimeFoldQueue for later.
await runStatements(fromTable.setRow("u1", expr(`'{"team":"alpha","value":7}'::jsonb`)));
expect(await countQueueRows()).toBe(1);
// ---- Simulate the deploy-window gap ----
//
// We just saw init() upsert a registry row above. To mimic the
// case where init() hasn't run yet (migration applied but
// backend hasn't reached init() for this timefold), delete
// every row in the registry table by hand. Now it looks
// identical to the fresh-after-migration state.
await sql.unsafe(`DELETE FROM "BulldozerTimeFoldDownstreamCascade"`);
// ---- Run process_queue() as pg_cron would ----
//
// Advance the clock past the queued tick's scheduledAt so it's
// due, then drain.
await setLastProcessedAt(`now()`);
await processQueue();
// ---- Assert we deferred instead of silently losing state ----
//
// (1) The queue row is still there. process_queue() saw that
// the registry had no row for this timefold and said "I
// don't know which cascade to run, so I'll leave this for
// the next tick." Nothing was drained, nothing was
// skipped.
expect(await countQueueRows()).toBe(1);
// (2) The timefold's own state wasn't advanced by the drain.
// Only the inline-emitted {initial} row is visible; the
// {scheduled} row is still sitting in the queue waiting
// for a tick with a registered cascade to process it.
//
// Contrast with the buggy (pre-fix) behavior: the timefold
// would have had BOTH "initial" and "scheduled" here, with
// the filter permanently missing "scheduled".
const timefoldRows = await readRows(timeFoldTable.listRowsInGroup({
groupKey: expr(`to_jsonb('alpha'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
expect(timefoldRows.map(rowPhase).sort()).toEqual(["initial"]);
// (3) The filter is empty, which is the correct steady state:
// the inline setRow's {initial} was filtered out, and the
// {scheduled} row hasn't been drained yet. No partial
// writes, no orphan rows. Once init() runs and the
// registry is populated, the next pg_cron tick will drain
// the queue row and propagate {scheduled} into this
// filter.
const filterRows = await readRows(filteredTable.listRowsInGroup({
groupKey: expr(`to_jsonb('alpha'::text)`),
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
expect(filterRows).toEqual([]);
});
});

View File

@ -0,0 +1,292 @@
/**
* Queue-drained variant of the phase 13 integration tests: subscription
* lifecycle events scheduled in the future (sub-end on
* `cancelAtPeriodEnd`, monthly item-grant-repeat ticks) defer to the
* BulldozerTimeFoldQueue and must propagate through the downstream cascade
* events transactions itemQuantities / ownedProducts when drained
* by `public.bulldozer_timefold_process_queue()` (the pg_cron path).
*
* The sibling `integration-1-3.test.ts` seeds lastProcessedAt = 2099 so
* every scheduled tick fires inline at setRow time and the queue is never
* exercised. These tests instead keep lastProcessedAt at the present, let
* future ticks stay queued, then advance the clock and invoke the drain
* function mirroring real pg_cron behaviour.
*/
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import { createPaymentsSchema } from "../index";
import { createTestDb, jsonbExpr } from "./test-helpers";
const DAY_MS = 86400000;
const MONTH_MS = 2592000000;
describe.sequential("payments schema integration phase 1→3, queue-drained path (real postgres)", () => {
// Clock starts at now() so that future scheduledAt values stay queued
// (vs the default `createTestDb` behaviour of lastProcessedAt = 2099,
// which would fire every tick inline). `installProcessQueueFn` installs
// the rewritten process_queue function body from the cascade migration
// so `processQueue()` exercises the real prod function.
const db = createTestDb({
lastProcessedAt: "now()",
installProcessQueueFn: true,
});
const { runStatements, readRows, setLastProcessedAt, processQueue, countQueueRows } = db;
const schema = createPaymentsSchema();
const getRowDatas = async (table: { listRowsInGroup: (opts: any) => any }) => {
const rows = await readRows(table.listRowsInGroup({
start: "start", end: "end", startInclusive: true, endInclusive: true,
}));
return rows.map((r: any) => r.rowdata);
};
beforeAll(async () => {
await db.setup();
for (const table of schema._allTables) {
await runStatements(table.init());
}
}, 120_000);
afterAll(async () => {
await db.teardown();
});
// ============================================================
// Test 6: mid-period upgrade with pg_cron-drained end event
//
// Free sub ending mid-period + team sub starting the next day.
// With lastProcessedAt in the recent past, the free sub's end event
// gets QUEUED (not fired inline). Once drained, downstream ledgers
// must reflect: only team's 500 emails, not 100 + 500 = 600.
// ============================================================
describe("mid-period upgrade with queue-drained end event", () => {
it("queues the subscription-end event instead of firing inline", async () => {
// subscription-timefold-algo derives nextTimestamp from millis fields
// like endedAtMillis / repeat intervals. These are raw epoch millis,
// so they map to ~1970. To make the inline recursion's
// `nextTimestamp > lastProcessedAt` check hold (= "defer to queue"),
// we set lastProcessedAt to pre-epoch.
await setLastProcessedAt(`'1969-01-01T00:00:00Z'`);
await runStatements(schema.subscriptions.setRow("sub-q-free", jsonbExpr({
id: "sub-q-free",
tenancyId: "t1",
customerId: "u-q-upgrade",
customerType: "user",
productId: "prod-q-free",
priceId: "p-free",
product: {
displayName: "Free (queued)",
customerType: "user",
productLineId: "line-q-upgrade",
prices: { "p-free": { USD: "0" } },
includedItems: {
emails: { quantity: 100, repeat: [1, "month"], expires: "when-repeated" },
},
},
quantity: 1,
stripeSubscriptionId: null,
status: "canceled",
currentPeriodStartMillis: 0,
currentPeriodEndMillis: MONTH_MS,
cancelAtPeriodEnd: false,
canceledAtMillis: 10 * DAY_MS,
endedAtMillis: 10 * DAY_MS,
refundedAtMillis: null,
creationSource: "TEST_MODE",
createdAtMillis: 0,
})));
// The subscription-end tick is scheduled at `endedAtMillis` which is
// > lastProcessedAt (10 days from epoch > "one hour ago"). It must
// therefore be queued, not emitted inline.
const queued = await countQueueRows();
expect(queued).toBeGreaterThan(0);
const endEventsBeforeDrain = (await getRowDatas(schema.subscriptionEndEvents))
.filter((e: any) => e.subscriptionId === "sub-q-free");
expect(endEventsBeforeDrain).toEqual([]);
});
it("after drain, end event fires AND downstream cascade runs; upgrade does not stack", async () => {
// Now bring the team sub online.
await runStatements(schema.subscriptions.setRow("sub-q-team", jsonbExpr({
id: "sub-q-team",
tenancyId: "t1",
customerId: "u-q-upgrade",
customerType: "user",
productId: "prod-q-team",
priceId: "p-team",
product: {
displayName: "Team (queued)",
customerType: "user",
productLineId: "line-q-upgrade",
prices: { "p-team": { USD: "30" } },
includedItems: {
emails: { quantity: 500, repeat: [1, "month"], expires: "when-repeated" },
},
},
quantity: 1,
stripeSubscriptionId: null,
status: "canceled",
currentPeriodStartMillis: 11 * DAY_MS,
currentPeriodEndMillis: 11 * DAY_MS + MONTH_MS,
cancelAtPeriodEnd: true,
canceledAtMillis: 20 * DAY_MS,
endedAtMillis: 20 * DAY_MS,
refundedAtMillis: null,
creationSource: "TEST_MODE",
createdAtMillis: 11 * DAY_MS,
})));
// Bump clock far enough forward to make every queued tick due, then drain.
await setLastProcessedAt(`'2099-01-01T00:00:00Z'`);
await processQueue();
expect(await countQueueRows()).toBe(0);
const endEvents = (await getRowDatas(schema.subscriptionEndEvents))
.filter((e: any) => e.subscriptionId === "sub-q-free");
expect(endEvents).toHaveLength(1);
const transactions = (await getRowDatas(schema.transactions))
.filter((t: any) => t.customerId === "u-q-upgrade");
const endTxns = transactions.filter((t: any) => t.txnId === "sub-end:sub-q-free");
expect(endTxns).toHaveLength(1);
const itemQuantityRows = (await getRowDatas(schema.itemQuantities))
.filter((r: any) => r.customerId === "u-q-upgrade")
.sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis);
const atTeamStart = itemQuantityRows.find((r: any) => r.txnId === "sub-start:sub-q-team");
expect(atTeamStart).toBeDefined();
// Stacking regression: without downstream-cascade propagation on the
// queue path, sub-free's end event never fires the cascade, so
// atTeamStart.emails accumulates free.100 + team.500 = 600 instead of
// the team-only 500.
expect(atTeamStart.itemQuantities.emails).toBe(500);
});
});
// ============================================================
// Test 7: monthly repeat reset via pg_cron
//
// Sub with a monthly-repeating quota item. Between sub-start and the
// first repeat, balance must reflect the initial grant. After pg_cron
// drains the repeat tick, balance must reflect the REFRESHED grant
// (not doubled, not zero).
// ============================================================
describe("monthly repeat reset via queue drain", () => {
it("repeat tick is queued until the clock is advanced past it", async () => {
await setLastProcessedAt(`'1969-01-01T00:00:00Z'`);
await runStatements(schema.subscriptions.setRow("sub-q-repeat", jsonbExpr({
id: "sub-q-repeat",
tenancyId: "t1",
customerId: "u-q-repeat",
customerType: "user",
productId: "prod-q-repeat",
priceId: "p1",
product: {
displayName: "Repeat (queued)",
customerType: "user",
productLineId: "line-q-repeat",
prices: { p1: { USD: "10" } },
includedItems: {
quota: { quantity: 200, repeat: [1, "month"], expires: "when-repeated" },
},
},
quantity: 1,
stripeSubscriptionId: null,
status: "active",
currentPeriodStartMillis: 0,
currentPeriodEndMillis: MONTH_MS,
cancelAtPeriodEnd: false,
canceledAtMillis: null,
endedAtMillis: 45 * DAY_MS,
refundedAtMillis: null,
creationSource: "TEST_MODE",
createdAtMillis: 0,
})));
expect(await countQueueRows()).toBeGreaterThan(0);
const initialRows = (await getRowDatas(schema.itemQuantities))
.filter((r: any) => r.customerId === "u-q-repeat")
.sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis);
const atStart = initialRows.find((r: any) => r.txnId === "sub-start:sub-q-repeat");
expect(atStart).toBeDefined();
expect(atStart.itemQuantities.quota).toBe(200);
});
it("after drain, quota is refreshed (not doubled, not stuck)", async () => {
// Advance clock past all repeats (and past endedAt).
await setLastProcessedAt(`'2099-01-01T00:00:00Z'`);
await processQueue();
expect(await countQueueRows()).toBe(0);
const rows = (await getRowDatas(schema.itemQuantities))
.filter((r: any) => r.customerId === "u-q-repeat")
.sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis);
// After every repeat and the final subscription-end, quota must drop
// to 0 — both repeat grants and the final one expire. Without the
// downstream cascade on the queue path, queued events never reach
// itemQuantities and this stays at the initial 200 (or whatever the
// inline path wrote at setRow time).
const latest = rows[rows.length - 1];
expect(latest.itemQuantities.quota).toBe(0);
// Between repeats, there should be exactly one active repeat grant at
// a time — never doubled. The queue drain is expected to emit at
// least one `igr:sub-q-repeat:*` row into itemQuantities (the
// 30-day repeat fires once between sub-start at t=0 and sub-end at
// t=45d). Asserting its presence unconditionally is the whole point
// of this test — the original regression was that the cascade
// dropped these emissions entirely, so a `find() == null` must fail
// loud, not silently skip.
const midPeriodRow = rows.find((r: any) =>
r.txnId?.startsWith("igr:sub-q-repeat:") && r.itemQuantities?.quota != null
);
expect(
midPeriodRow,
"expected at least one igr:sub-q-repeat:* row in itemQuantities; if this is null the queue-drain cascade dropped the repeat emission entirely",
).toBeDefined();
expect(midPeriodRow.itemQuantities.quota).toBe(200);
});
});
// ============================================================
// Test 8: re-drain idempotency at the payments layer
//
// Draining twice with nothing new in between must not duplicate
// subscription-end events, transactions, or item-quantity changes.
// ============================================================
describe("re-drain idempotency at the payments layer", () => {
it("second process_queue call with no new queue rows is a no-op", async () => {
// Snapshot counts after prior tests' drains.
const endEventsBefore = (await getRowDatas(schema.subscriptionEndEvents))
.filter((e: any) => e.subscriptionId?.startsWith("sub-q-"));
const endTxnsBefore = (await getRowDatas(schema.transactions))
.filter((t: any) => t.txnId?.startsWith("sub-end:sub-q-"));
const itemQuantitiesBefore = (await getRowDatas(schema.itemQuantities))
.filter((r: any) => r.customerId?.startsWith("u-q-"));
expect(await countQueueRows()).toBe(0);
await processQueue();
const endEventsAfter = (await getRowDatas(schema.subscriptionEndEvents))
.filter((e: any) => e.subscriptionId?.startsWith("sub-q-"));
const endTxnsAfter = (await getRowDatas(schema.transactions))
.filter((t: any) => t.txnId?.startsWith("sub-end:sub-q-"));
const itemQuantitiesAfter = (await getRowDatas(schema.itemQuantities))
.filter((r: any) => r.customerId?.startsWith("u-q-"));
expect(endEventsAfter).toHaveLength(endEventsBefore.length);
expect(endTxnsAfter).toHaveLength(endTxnsBefore.length);
expect(itemQuantitiesAfter).toHaveLength(itemQuantitiesBefore.length);
});
});
});

View File

@ -893,6 +893,371 @@ describe.sequential("payments schema integration phase 1→3 (real postgres)", (
});
// ============================================================
// Full when-repeated lifecycle: sub-start → item-grant-repeat → sub-end.
// item-quantity-expire entries in the sub-end transaction reference the
// preceding item-grant-repeat by txn id. Both the id text and the
// reference text must match byte-for-byte or the expire silently fails
// to resolve the grant and the `when-repeated` balance stays at the
// last-granted quantity instead of dropping to 0.
// ============================================================
describe("item-quantity-expire resolves across item-grant-repeat → sub-end", () => {
const DAY_MS = 86400000;
beforeAll(async () => {
await runStatements(schema.subscriptions.setRow("sub-repeat-to-end", jsonbExpr({
id: "sub-repeat-to-end",
tenancyId: "t1",
customerId: "u-repeat-to-end",
customerType: "user",
productId: "prod-repeat-to-end",
priceId: "p1",
product: {
displayName: "Repeat Then End Plan",
customerType: "user",
productLineId: "line-repeat-to-end",
prices: { p1: { USD: "10" } },
includedItems: {
// 7-day repeat interval is an exact whole-second epoch offset,
// which is where subtle NUMERIC-vs-bigint mismatches around
// `->>effectiveAtMillis` tend to surface.
quota: { quantity: 100, repeat: [7, "day"], expires: "when-repeated" },
},
},
quantity: 1,
stripeSubscriptionId: null,
status: "canceled",
currentPeriodStartMillis: 0,
currentPeriodEndMillis: MONTH_MS,
cancelAtPeriodEnd: true,
// Ends at 14d: fires one repeat at 7d, then sub-end at 14d.
canceledAtMillis: 14 * DAY_MS,
endedAtMillis: 14 * DAY_MS,
refundedAtMillis: null,
creationSource: "TEST_MODE",
createdAtMillis: 0,
})));
});
it("item-grant-repeat transaction id has no trailing decimals", async () => {
const txns = (await getRowDatas(schema.transactions))
.filter((t: any) => t.customerId === "u-repeat-to-end");
const igr = txns.find((t: any) =>
typeof t.txnId === "string" && t.txnId.startsWith("igr:sub-repeat-to-end:")
);
expect(igr).toBeDefined();
// transactions.ts derives this id from the event's effectiveAtMillis
// via `->>`. If that value was stored in JSONB as a NUMERIC with
// fractional scale (e.g. "604800000.000000") the id text picks up
// the trailing zeros and no longer matches references built via
// `::bigint::text` elsewhere in the reducer.
expect(igr.txnId).toMatch(/^igr:sub-repeat-to-end:\d+$/);
expect(igr.txnId).not.toContain(".");
});
it("sub-end's item-quantity-expire adjustedTransactionId matches the igr txn id", async () => {
const txns = (await getRowDatas(schema.transactions))
.filter((t: any) => t.customerId === "u-repeat-to-end");
const igr = txns.find((t: any) =>
typeof t.txnId === "string" && t.txnId.startsWith("igr:sub-repeat-to-end:")
);
const subEnd = txns.find((t: any) => t.txnId === "sub-end:sub-repeat-to-end");
expect(igr).toBeDefined();
expect(subEnd).toBeDefined();
const expireEntry = (subEnd.entries as any[]).find((e: any) =>
e.type === "item-quantity-expire" && e.itemId === "quota"
);
expect(expireEntry).toBeDefined();
// The two texts must be byte-identical for the expire to resolve
// the grant. Same value in different representations (e.g.
// "604800000" vs "604800000.000000") is the failure mode this
// guards against.
expect(expireEntry.adjustedTransactionId).toBe(igr.txnId);
});
it("quota balance drops to 0 after sub-end resolves the igr's grant", async () => {
const rows = (await getRowDatas(schema.itemQuantities))
.filter((r: any) => r.customerId === "u-repeat-to-end")
.sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis);
expect(rows.length).toBeGreaterThan(0);
const latest = rows[rows.length - 1];
// If the expire ref mismatches the igr txn id, the expire silently
// becomes a no-op and quota stays at the last igr-granted quantity
// (100). When the ids match, sub-end's expire resolves and the
// ledger drops to 0.
expect(latest.itemQuantities.quota).toBe(0);
});
});
// ============================================================
// Same bigint-vs-NUMERIC txn-id fingerprint as the subscription test
// above, but on the one-time-purchase algo (no sub-end analog). A
// one-time purchase with a repeating `when-repeated` item relies on
// each IGR expiring the previous IGR's grant by referencing its
// `txnId` inside `item-quantity-expire.adjustedTransactionId`. If the
// reducer-internal reference (decimal-free via `::text`) doesn't
// match the downstream-materialized txn id (built from
// `->>effectiveAtMillis` stored in JSONB), consecutive IGRs stack
// instead of refreshing and `quota` climbs without bound.
// ============================================================
describe("item-quantity-expire resolves across consecutive one-time-purchase item-grant-repeats", () => {
const DAY_MS = 86400000;
beforeAll(async () => {
// OTP has no terminating event (unlike subscriptions with endedAt),
// so the inline reducer would otherwise try to fire ~6700 repeats
// between createdAtMillis=0 and the test-helpers default
// lastProcessedAt=2099 and time out the hook. Cap the clock to 16d
// post-epoch for this describe so exactly 2 IGRs fire inline (at
// +7d and +14d) — the minimum needed to exercise the IGR-N expiring
// IGR-(N-1)'s grant path. Restored to the default in afterAll so
// later tests in this file still run under inline recursion.
await db.setLastProcessedAt(`'1970-01-17T00:00:00Z'`);
await runStatements(schema.oneTimePurchases.setRow("otp-repeat-bigint", jsonbExpr({
id: "otp-repeat-bigint",
tenancyId: "t1",
customerId: "u-otp-repeat-bigint",
customerType: "user",
productId: "prod-otp-repeat-bigint",
priceId: "p1",
product: {
displayName: "Repeating OTP (bigint regression)",
customerType: "user",
productLineId: "line-otp-repeat-bigint",
prices: { p1: { USD: "5" } },
includedItems: {
// 7-day repeat: same whole-second epoch offset the subscription
// test uses to surface NUMERIC-vs-bigint mismatches. Two IGRs
// fire inline under the clock-cap above — enough to exercise
// the second IGR's `previousGrantsToExpire` referencing the
// first's outstandingGrants[].txnId.
quota: { quantity: 100, repeat: [7, "day"], expires: "when-repeated" },
},
},
quantity: 1,
stripePaymentIntentId: null,
revokedAtMillis: null,
refundedAtMillis: null,
creationSource: "TEST_MODE",
createdAtMillis: 0,
})));
});
afterAll(async () => {
// Restore the default so subsequent describes in this file still
// fire their reducer recursion inline (the payments suite relies
// on this).
await db.setLastProcessedAt(`'2099-01-01T00:00:00Z'`);
});
it("every item-grant-repeat txn id is decimal-free", async () => {
const txns = (await getRowDatas(schema.transactions))
.filter((t: any) => t.customerId === "u-otp-repeat-bigint");
const igrs = txns.filter((t: any) =>
typeof t.txnId === "string" && t.txnId.startsWith("igr:otp-repeat-bigint:")
);
expect(igrs.length).toBeGreaterThan(0);
for (const igr of igrs) {
expect(igr.txnId).toMatch(/^igr:otp-repeat-bigint:\d+$/);
expect(igr.txnId).not.toContain(".");
}
});
it("every IGR after the first carries an item-quantity-expire whose adjustedTransactionId matches the prior IGR's txnId", async () => {
const txns = (await getRowDatas(schema.transactions))
.filter((t: any) => t.customerId === "u-otp-repeat-bigint")
.sort((a: any, b: any) => a.effectiveAtMillis - b.effectiveAtMillis);
const igrs = txns.filter((t: any) =>
typeof t.txnId === "string" && t.txnId.startsWith("igr:otp-repeat-bigint:")
);
// Need ≥ 2 IGRs for the expire reference path to fire. Any fewer
// means the test setup isn't exercising the bug's surface area —
// fail loud rather than silently passing.
expect(igrs.length).toBeGreaterThanOrEqual(2);
for (let i = 1; i < igrs.length; i++) {
const priorIgr = igrs[i - 1];
const currentIgr = igrs[i];
const expireEntry = (currentIgr.entries as any[]).find((e: any) =>
e.type === "item-quantity-expire" && e.itemId === "quota"
);
expect(expireEntry, `IGR ${currentIgr.txnId} missing item-quantity-expire for quota`).toBeDefined();
// Byte-identical match is the invariant: if the prior IGR's
// downstream txnId picked up a `.000000` tail and this expire
// built its adjustedTransactionId via `::text` on a bigint, the
// two strings diverge and the expire silently no-ops.
expect(expireEntry.adjustedTransactionId).toBe(priorIgr.txnId);
}
});
it("quota balance stays at exactly the per-repeat grant, never stacks", async () => {
const rows = (await getRowDatas(schema.itemQuantities))
.filter((r: any) => r.customerId === "u-otp-repeat-bigint")
.sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis);
expect(rows.length).toBeGreaterThan(0);
// Without the fix, each IGR adds 100 without expiring the previous
// grant, so quota climbs monotonically (100, 200, 300, …). With
// the fix, each IGR expires the prior grant before adding its own,
// and quota stays at exactly 100.
const latest = rows[rows.length - 1];
expect(latest.itemQuantities.quota).toBe(100);
});
});
// ============================================================
// when-repeated grants must expire at subscription-end
// (regression: they were previously left stacked in the ledger)
// ============================================================
describe("when-repeated grants expire at subscription-end", () => {
const DAY_MS = 86400000;
beforeAll(async () => {
await runStatements(schema.subscriptions.setRow("sub-repeat-end", jsonbExpr({
id: "sub-repeat-end",
tenancyId: "t1",
customerId: "u-repeat-end",
customerType: "user",
productId: "prod-repeat-end",
priceId: "p1",
product: {
displayName: "Repeat End Plan",
customerType: "user",
productLineId: "line-repeat-end",
prices: { p1: { USD: "10" } },
includedItems: {
quota: { quantity: 100, repeat: [7, "day"], expires: "when-repeated" },
permanent: { quantity: 25, expires: "never" },
},
},
quantity: 1,
stripeSubscriptionId: null,
status: "canceled",
currentPeriodStartMillis: 0,
currentPeriodEndMillis: MONTH_MS,
cancelAtPeriodEnd: true,
canceledAtMillis: 2 * DAY_MS,
endedAtMillis: 5 * DAY_MS,
refundedAtMillis: null,
creationSource: "TEST_MODE",
createdAtMillis: 0,
})));
});
it("should drop when-repeated item balance to 0 after subscription-end", async () => {
const rows = (await getRowDatas(schema.itemQuantities))
.filter((r: any) => r.customerId === "u-repeat-end")
.sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis);
const latest = rows[rows.length - 1];
expect(latest.itemQuantities.quota).toBe(0);
// Permanent grants must not be touched.
expect(latest.itemQuantities.permanent).toBe(25);
});
it("should revoke owned product at subscription-end", async () => {
const rows = (await getRowDatas(schema.ownedProducts))
.filter((r: any) => r.customerId === "u-repeat-end")
.sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis);
const afterEnd = rows.find((r: any) => r.txnId === "sub-end:sub-repeat-end");
expect(afterEnd).toBeDefined();
expect(afterEnd.ownedProducts["prod-repeat-end"].quantity).toBe(0);
});
});
// ============================================================
// Upgrade stacking regression: free → team mid-period must not
// leave the outgoing sub's monthly allowance stacked on top of
// the incoming sub's allowance.
// ============================================================
describe("mid-period upgrade does not stack when-repeated balances", () => {
const DAY_MS = 86400000;
beforeAll(async () => {
await runStatements(schema.subscriptions.setRow("sub-upgrade-free", jsonbExpr({
id: "sub-upgrade-free",
tenancyId: "t1",
customerId: "u-upgrade",
customerType: "user",
productId: "prod-upgrade-free",
priceId: "p-free",
product: {
displayName: "Free",
customerType: "user",
productLineId: "line-upgrade",
prices: { "p-free": { USD: "0" } },
includedItems: {
emails: { quantity: 100, repeat: [1, "month"], expires: "when-repeated" },
},
},
quantity: 1,
stripeSubscriptionId: null,
status: "canceled",
currentPeriodStartMillis: 0,
currentPeriodEndMillis: MONTH_MS,
cancelAtPeriodEnd: false,
canceledAtMillis: 10 * DAY_MS,
endedAtMillis: 10 * DAY_MS,
refundedAtMillis: null,
creationSource: "TEST_MODE",
createdAtMillis: 0,
})));
await runStatements(schema.subscriptions.setRow("sub-upgrade-team", jsonbExpr({
id: "sub-upgrade-team",
tenancyId: "t1",
customerId: "u-upgrade",
customerType: "user",
productId: "prod-upgrade-team",
priceId: "p-team",
product: {
displayName: "Team",
customerType: "user",
productLineId: "line-upgrade",
prices: { "p-team": { USD: "30" } },
includedItems: {
emails: { quantity: 500, repeat: [1, "month"], expires: "when-repeated" },
},
},
quantity: 1,
stripeSubscriptionId: null,
status: "canceled",
currentPeriodStartMillis: 11 * DAY_MS,
currentPeriodEndMillis: 11 * DAY_MS + MONTH_MS,
cancelAtPeriodEnd: true,
canceledAtMillis: 20 * DAY_MS,
endedAtMillis: 20 * DAY_MS,
refundedAtMillis: null,
creationSource: "TEST_MODE",
createdAtMillis: 11 * DAY_MS,
})));
});
it("should show only the incoming sub's allowance right after the upgrade", async () => {
const rows = (await getRowDatas(schema.itemQuantities))
.filter((r: any) => r.customerId === "u-upgrade")
.sort((a: any, b: any) => a.txnEffectiveAtMillis - b.txnEffectiveAtMillis);
const atUpgrade = rows.find((r: any) => r.txnId === "sub-start:sub-upgrade-team");
expect(atUpgrade).toBeDefined();
// Before the fix this was 100 (free) + 500 (team) = 600 because the
// free sub's when-repeated grant was not expired at subscription-end.
expect(atUpgrade.itemQuantities.emails).toBe(500);
});
});
// ============================================================
// Subscription map LFold
// ============================================================

View File

@ -345,6 +345,107 @@ describe.sequential("payments schema phase 1 (real postgres)", () => {
.filter((e: any) => e.subscriptionId === "sub-tf-start");
expect(endEvents).toHaveLength(0);
});
it("should expire when-repeated grants alongside when-purchase-expires when ending before any repeat", async () => {
await runStatements(schema.subscriptions.setRow("sub-tf-end-mix-pre", jsonbExpr(makeSubscription("sub-tf-end-mix-pre", {
product: {
displayName: "Mix Pre-Repeat Plan",
customerType: "user",
productLineId: "line-tf-end-mix-pre",
prices: { p1: { USD: "10" } },
includedItems: {
storage: { quantity: 100, expires: "when-purchase-expires" },
quota: { quantity: 500, repeat: [7, "day"], expires: "when-repeated" },
},
},
endedAtMillis: 3 * DAY_MS,
createdAtMillis: 0,
}))));
const endEvents = (await getRowDatas(schema.subscriptionEndEvents))
.filter((e: any) => e.subscriptionId === "sub-tf-end-mix-pre");
expect(endEvents).toHaveLength(1);
const expiredByItem = new Map<string, any>();
for (const expiry of endEvents[0].itemQuantityChangesToExpire) {
expiredByItem.set(expiry.itemId, expiry);
}
expect([...expiredByItem.keys()].sort()).toEqual(["quota", "storage"]);
expect(expiredByItem.get("storage").transactionId).toBe("sub-start:sub-tf-end-mix-pre");
expect(expiredByItem.get("quota").transactionId).toBe("sub-start:sub-tf-end-mix-pre");
expect(expiredByItem.get("quota").quantity).toBe(500);
const repeats = (await getRowDatas(schema.itemGrantRepeatEvents))
.filter((e: any) => e.sourceId === "sub-tf-end-mix-pre");
expect(repeats).toHaveLength(0);
});
it("should reference the latest igr txnId for when-repeated grants when ending after repeats", async () => {
await runStatements(schema.subscriptions.setRow("sub-tf-end-mix-post", jsonbExpr(makeSubscription("sub-tf-end-mix-post", {
product: {
displayName: "Mix Post-Repeat Plan",
customerType: "user",
productLineId: "line-tf-end-mix-post",
prices: { p1: { USD: "10" } },
includedItems: {
storage: { quantity: 100, expires: "when-purchase-expires" },
quota: { quantity: 500, repeat: [7, "day"], expires: "when-repeated" },
},
},
endedAtMillis: 17 * DAY_MS,
createdAtMillis: 0,
}))));
const endEvents = (await getRowDatas(schema.subscriptionEndEvents))
.filter((e: any) => e.subscriptionId === "sub-tf-end-mix-post");
expect(endEvents).toHaveLength(1);
const expiredByItem = new Map<string, any>();
for (const expiry of endEvents[0].itemQuantityChangesToExpire) {
expiredByItem.set(expiry.itemId, expiry);
}
expect([...expiredByItem.keys()].sort()).toEqual(["quota", "storage"]);
// storage never repeated; still points at sub-start
expect(expiredByItem.get("storage").transactionId).toBe("sub-start:sub-tf-end-mix-post");
// quota last repeated at 14d; should point at that igr txn
const latestRepeatMillis = 14 * DAY_MS;
expect(expiredByItem.get("quota").transactionId).toBe(`igr:sub-tf-end-mix-post:${latestRepeatMillis}`);
expect(expiredByItem.get("quota").quantity).toBe(500);
const repeats = (await getRowDatas(schema.itemGrantRepeatEvents))
.filter((e: any) => e.sourceId === "sub-tf-end-mix-post")
.sort((a: any, b: any) => a.effectiveAtMillis - b.effectiveAtMillis);
expect(repeats.map((r: any) => r.effectiveAtMillis)).toEqual([7 * DAY_MS, 14 * DAY_MS]);
});
it("should NOT expire permanent grants (expires=never, absent, or invalid)", async () => {
await runStatements(schema.subscriptions.setRow("sub-tf-end-permanent", jsonbExpr(makeSubscription("sub-tf-end-permanent", {
product: {
displayName: "Permanent Grants Plan",
customerType: "user",
productLineId: "line-tf-end-permanent",
prices: { p1: { USD: "10" } },
includedItems: {
expiring: { quantity: 50, expires: "when-purchase-expires" },
repeating: { quantity: 10, repeat: [1, "day"], expires: "when-repeated" },
permanent_never: { quantity: 20, expires: "never" },
permanent_absent: { quantity: 30 },
permanent_invalid: { quantity: 40, expires: "not-a-real-value" },
},
},
endedAtMillis: 2 * MONTH_MS,
createdAtMillis: 0,
}))));
const endEvents = (await getRowDatas(schema.subscriptionEndEvents))
.filter((e: any) => e.subscriptionId === "sub-tf-end-permanent");
expect(endEvents).toHaveLength(1);
const expiredItemIds = endEvents[0].itemQuantityChangesToExpire.map((e: any) => e.itemId).sort();
expect(expiredItemIds).toEqual(["expiring", "repeating"]);
});
});

View File

@ -8,6 +8,7 @@
import postgres from "postgres";
import { toExecutableSqlTransaction, toQueryableSqlQuery } from "@/lib/bulldozer/db/index";
import { loadProcessQueueFunctionSql } from "@/lib/bulldozer/db/test-sql-loaders";
type SqlStatement = { type: "statement", sql: string, outputName?: string };
type SqlQuery = { type: "query", sql: string, toStatement(outputName?: string): SqlStatement };
@ -21,13 +22,36 @@ function getConnectionString(): string {
return connectionString;
}
export type CreateTestDbOptions = {
/**
* SQL expression used to seed `BulldozerTimeFoldMetadata.lastProcessedAt`
* at setup time. The default (`'2099-01-01T00:00:00Z'::timestamptz`) puts
* the metadata clock far in the future so every timefold tick fires
* inline at `setRow` time this is what most payments tests rely on.
*
* Set this to `now()` (or a pre-epoch timestamp) to exercise the queued
* path where future ticks defer to `bulldozer_timefold_process_queue()`.
*/
lastProcessedAt?: string,
/**
* When true, also installs `public.bulldozer_timefold_process_queue()`
* from the cascade migration so callers can invoke `processQueue()`
* against a real prod-shape function body. Default: false (inline-path
* tests don't need it).
*/
installProcessQueueFn?: boolean,
};
/**
* Creates an isolated test database. Call `setup()` in beforeAll and
* `teardown()` in afterAll. Access `runStatements` / `readRows` after setup.
*
* Follows the same pattern as apps/backend/src/lib/bulldozer/db/index.test.ts.
*/
export function createTestDb() {
export function createTestDb(options: CreateTestDbOptions = {}) {
const lastProcessedAtExpression = options.lastProcessedAt ?? `'2099-01-01T00:00:00Z'::timestamptz`;
const installProcessQueueFn = options.installProcessQueueFn ?? false;
const connectionString = getConnectionString();
const base = connectionString.replace(/\/[^/]*(\?.*)?$/, "");
const queryString = connectionString.split("?")[1] ?? "";
@ -53,6 +77,38 @@ export function createTestDb() {
return await getSql().unsafe(toQueryableSqlQuery(query));
},
/**
* Overwrites `BulldozerTimeFoldMetadata.lastProcessedAt` to the given
* SQL expression. Useful for tests that need to bump the clock forward
* (to make queued ticks due) or backward (to force future ticks into
* the queue).
*/
setLastProcessedAt: async (isoOrExpression: string) => {
await getSql().unsafe(`
UPDATE "BulldozerTimeFoldMetadata"
SET "lastProcessedAt" = (${isoOrExpression})::timestamptz,
"updatedAt" = now()
WHERE "key" = 'singleton'
`);
},
/** Invokes the real pg_cron drain entry point. Requires `installProcessQueueFn`. */
processQueue: async () => {
if (!installProcessQueueFn) {
throw new Error(
"processQueue() requires createTestDb({ installProcessQueueFn: true })",
);
}
await getSql().unsafe(`SELECT public.bulldozer_timefold_process_queue()`);
},
countQueueRows: async (): Promise<number> => {
const rows = await getSql()<Array<{ count: number }>>`
SELECT COUNT(*)::int AS "count" FROM "BulldozerTimeFoldQueue"
`;
return rows[0].count;
},
setup: async () => {
await adminSql.unsafe(`CREATE DATABASE ${dbName}`);
_sql = postgres(dbUrl, { onnotice: () => undefined, max: 1 });
@ -115,8 +171,25 @@ export function createTestDb() {
`);
await _sql.unsafe(`
INSERT INTO "BulldozerTimeFoldMetadata" ("key", "lastProcessedAt")
VALUES ('singleton', '2099-01-01T00:00:00Z'::timestamptz)
VALUES ('singleton', ${lastProcessedAtExpression})
`);
// declareTimeFoldTable.init() upserts a cascade template here (see
// 20260417000000_bulldozer_timefold_downstream_cascade). The
// table must exist even when tests don't exercise the queue path,
// because init() runs the upsert unconditionally.
await _sql.unsafe(`
CREATE TABLE "BulldozerTimeFoldDownstreamCascade" (
"tableStoragePath" JSONB[] NOT NULL,
"cascadeInputName" TEXT NOT NULL,
"cascadeTemplate" TEXT,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "BulldozerTimeFoldDownstreamCascade_pkey" PRIMARY KEY ("tableStoragePath")
)
`);
if (installProcessQueueFn) {
await _sql.unsafe(loadProcessQueueFunctionSql());
}
},
teardown: async () => {

View File

@ -143,7 +143,14 @@ export function getOtpTimeFoldReducerSql(): string {
)`;
// ── item-grant-repeat event (same logic as subscription but with sourceType=one_time_purchase) ──
const currentMillis = `(EXTRACT(EPOCH FROM ${T}) * 1000)::numeric`;
// Keep currentMillis as bigint at the root (not NUMERIC) for the same
// reason as `subscription-timefold-algo.ts` — see the comment there for
// the full failure mode. Explicit ROUND before the cast is defensive:
// NUMERIC::bigint already rounds on PG 12+, but explicit rounding makes
// intent clear at the callsite and stays stable if `EXTRACT(EPOCH ...)`
// ever comes back typed as DOUBLE PRECISION (older PG, or a future
// regression) where implicit casts use half-to-even.
const currentMillis = `(ROUND(EXTRACT(EPOCH FROM ${T}) * 1000)::bigint)`;
const dueItems = `(
SELECT jsonb_agg(jsonb_build_object('itemId', "sched"."key", 'schedule', "sched"."value"))
@ -153,7 +160,8 @@ export function getOtpTimeFoldReducerSql(): string {
AND ("sched"."value"->>'nextRepeatMillis')::numeric <= ${currentMillis}
)`;
const igrTxnId = `('igr:' || (${S}->>'purchaseId') || ':' || ${currentMillis}::bigint::text)`;
// currentMillis is already ::bigint (see above) so plain ::text is enough.
const igrTxnId = `('igr:' || (${S}->>'purchaseId') || ':' || ${currentMillis}::text)`;
const previousGrantsToExpire = `(
SELECT COALESCE(jsonb_agg(

View File

@ -261,8 +261,26 @@ export function getSubscriptionTimeFoldReducerSql(): string {
)`;
// ── item-grant-repeat event ──
// Emitted when timestamp matches an item's nextRepeatMillis
const currentMillis = `(EXTRACT(EPOCH FROM ${T}) * 1000)::numeric`;
// Emitted when timestamp matches an item's nextRepeatMillis.
//
// PG 12+ returns EXTRACT(EPOCH ...) as NUMERIC with scale 6 (microsecond
// precision), so if we left currentMillis as NUMERIC it would serialize
// into JSONB with trailing ".000000". That round-trips fine for our own
// comparisons but leaks into txn IDs built by downstream tables via
// `->>effectiveAtMillis`, producing e.g. `igr:<sub>:2592000000.000000`,
// while references built inline in this algo via `::text` would produce
// the decimal-free `igr:<sub>:2592000000`. The two wouldn't match →
// `item-quantity-expire` entries would fail to resolve the grant they're
// meant to expire, leaving `when-repeated` balances stuck after a
// subscription-end that follows an item-grant-repeat.
//
// Explicit ROUND before the bigint cast: NUMERIC::bigint rounds
// half-away-from-zero on PG 12+, which happens to match what we want,
// but if `T` ever comes from a path that returns DOUBLE PRECISION (older
// PG, or a future regression) the implicit cast rounds half-to-even and
// could disagree on midpoint values. Being explicit about the rounding
// intent is both self-documenting and stable across numeric types.
const currentMillis = `(ROUND(EXTRACT(EPOCH FROM ${T}) * 1000)::bigint)`;
// Items due at current timestamp
const dueItems = `(
@ -279,8 +297,10 @@ export function getSubscriptionTimeFoldReducerSql(): string {
AND (${S}->>'endedAtMillis')::numeric <= ${currentMillis}
)`;
// item-grant-repeat: txnId uses sourceId + effectiveAtMillis
const igrTxnId = `('igr:' || (${S}->>'subscriptionId') || ':' || ${currentMillis}::bigint::text)`;
// item-grant-repeat: txnId uses sourceId + effectiveAtMillis. currentMillis
// is already ::bigint (see above) so plain ::text is enough — no decimal
// tail, no redundant double-cast.
const igrTxnId = `('igr:' || (${S}->>'subscriptionId') || ':' || ${currentMillis}::text)`;
const repeatCount = `(${S}->>'repeatCount')::int`;
// Build previousGrantsToExpire: outstanding grants with expiresWhen="when-repeated" that match due items
@ -391,7 +411,22 @@ export function getSubscriptionTimeFoldReducerSql(): string {
)`;
// ── subscription-end event ──
// Expire all outstanding grants with expiresWhen="when-purchase-expires"
// Expire all outstanding grants that are tied to the subscription's
// lifetime — both 'when-purchase-expires' and 'when-repeated'. The latter
// must be expired here too: otherwise the last-granted monthly quota
// (emails_per_month, analytics_events, …) persists in the item-quantity
// ledger after the subscription is gone and stacks on top of any
// replacement subscription for the remainder of the period.
//
// Permanent grants (item has no `expires` configured, or an unrecognized
// value) were normalized to JSONB null at subscription-start time, so
// `"g"->>'expiresWhen'` returns SQL NULL for them and the IN predicate
// correctly excludes them.
//
// outstandingGrants always carries the *current* grant ref for each item:
// initially { txnId: 'sub-start:<sub>' }, and each item-grant-repeat tick
// replaces the matching when-repeated entries with fresh ones keyed by
// the igr txnId, so iterating here works identically pre- and post-repeat.
const endItemQuantityChangesToExpire = (stateSql: string) => `(
SELECT COALESCE(jsonb_agg(
jsonb_build_object(
@ -402,7 +437,7 @@ export function getSubscriptionTimeFoldReducerSql(): string {
)
), '[]'::jsonb)
FROM jsonb_array_elements(${stateSql}->'outstandingGrants') AS "g"
WHERE "g"->>'expiresWhen' = 'when-purchase-expires'
WHERE "g"->>'expiresWhen' IN ('when-purchase-expires', 'when-repeated')
)`;
const endEventRowFromState = (stateSql: string) => `jsonb_build_object(