mirror of
https://github.com/stack-auth/stack.git
synced 2026-06-13 21:01:21 +08:00
update reviewer authentication and API calls
This commit is contained in:
parent
1389d373a0
commit
185d245c67
@ -1,4 +1,4 @@
|
||||
import { callReducerStrict } from "@/lib/ai/spacetimedb-client";
|
||||
import { callReducerStrict, opt } from "@/lib/ai/spacetimedb-client";
|
||||
import { assertIsAiChatReviewer } from "@/lib/ai/qa/reviewer-auth";
|
||||
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
|
||||
import { adaptSchema, yupBoolean, yupNumber, yupObject, yupString } from "@stackframe/stack-shared/dist/schema-fields";
|
||||
@ -16,6 +16,7 @@ export const POST = createSmartRouteHandler({
|
||||
question: yupString().defined(),
|
||||
answer: yupString().defined(),
|
||||
publish: yupBoolean().defined(),
|
||||
requestId: yupString(),
|
||||
}).defined(),
|
||||
method: yupString().oneOf(["POST"]).defined(),
|
||||
}),
|
||||
@ -27,8 +28,8 @@ export const POST = createSmartRouteHandler({
|
||||
}).defined(),
|
||||
}),
|
||||
handler: async ({ auth, body }) => {
|
||||
assertIsAiChatReviewer(auth);
|
||||
const user = auth.user;
|
||||
assertIsAiChatReviewer(user);
|
||||
|
||||
const token = getEnvVariable("STACK_MCP_LOG_TOKEN");
|
||||
await callReducerStrict("add_manual_qa", [
|
||||
@ -37,6 +38,7 @@ export const POST = createSmartRouteHandler({
|
||||
body.answer,
|
||||
body.publish,
|
||||
user.display_name ?? user.primary_email ?? user.id,
|
||||
body.requestId
|
||||
]);
|
||||
|
||||
return {
|
||||
|
||||
@ -25,7 +25,7 @@ export const POST = createSmartRouteHandler({
|
||||
}).defined(),
|
||||
}),
|
||||
handler: async ({ auth, body }) => {
|
||||
assertIsAiChatReviewer(auth.user);
|
||||
assertIsAiChatReviewer(auth);
|
||||
|
||||
const token = getEnvVariable("STACK_MCP_LOG_TOKEN");
|
||||
await callReducerStrict("delete_qa_entry", [
|
||||
|
||||
@ -25,8 +25,8 @@ export const POST = createSmartRouteHandler({
|
||||
}).defined(),
|
||||
}),
|
||||
handler: async ({ auth, body }) => {
|
||||
assertIsAiChatReviewer(auth);
|
||||
const user = auth.user;
|
||||
assertIsAiChatReviewer(user);
|
||||
|
||||
const token = getEnvVariable("STACK_MCP_LOG_TOKEN");
|
||||
await callReducerStrict("mark_human_reviewed", [
|
||||
|
||||
@ -25,7 +25,7 @@ export const POST = createSmartRouteHandler({
|
||||
}).defined(),
|
||||
}),
|
||||
handler: async ({ auth, body }) => {
|
||||
assertIsAiChatReviewer(auth.user);
|
||||
assertIsAiChatReviewer(auth);
|
||||
|
||||
const token = getEnvVariable("STACK_MCP_LOG_TOKEN");
|
||||
await callReducerStrict("unmark_human_reviewed", [
|
||||
|
||||
@ -28,8 +28,8 @@ export const POST = createSmartRouteHandler({
|
||||
}).defined(),
|
||||
}),
|
||||
handler: async ({ auth, body }) => {
|
||||
assertIsAiChatReviewer(auth);
|
||||
const user = auth.user;
|
||||
assertIsAiChatReviewer(user);
|
||||
|
||||
const token = getEnvVariable("STACK_MCP_LOG_TOKEN");
|
||||
const reviewer = user.display_name ?? user.primary_email ?? user.id;
|
||||
|
||||
@ -28,8 +28,8 @@ export const POST = createSmartRouteHandler({
|
||||
}).defined(),
|
||||
}),
|
||||
handler: async ({ auth, body }) => {
|
||||
assertIsAiChatReviewer(auth);
|
||||
const user = auth.user;
|
||||
assertIsAiChatReviewer(user);
|
||||
|
||||
const token = getEnvVariable("STACK_MCP_LOG_TOKEN");
|
||||
const editor = user.display_name ?? user.primary_email ?? user.id;
|
||||
|
||||
@ -26,8 +26,8 @@ export const POST = createSmartRouteHandler({
|
||||
}).defined(),
|
||||
}),
|
||||
handler: async ({ auth, body }) => {
|
||||
assertIsAiChatReviewer(auth);
|
||||
const user = auth.user;
|
||||
assertIsAiChatReviewer(user);
|
||||
if (!/^[0-9a-fA-F]{64}$/.test(body.identity)) {
|
||||
throw new StatusError(StatusError.BadRequest, "Invalid identity.");
|
||||
}
|
||||
|
||||
@ -1,6 +1,19 @@
|
||||
import { KnownErrors } from "@stackframe/stack-shared";
|
||||
import { StatusError } from "@stackframe/stack-shared/dist/utils/errors";
|
||||
|
||||
export function assertIsAiChatReviewer(user: { client_read_only_metadata?: unknown }): void {
|
||||
export function assertIsAiChatReviewer(auth: {
|
||||
project: { id: string },
|
||||
user?: { client_read_only_metadata?: unknown } | null,
|
||||
}): void {
|
||||
if (auth.project.id !== "internal") {
|
||||
throw new KnownErrors.ExpectedInternalProject();
|
||||
}
|
||||
|
||||
const user = auth.user;
|
||||
if (!user) {
|
||||
throw new StatusError(StatusError.Unauthorized, "You must be signed in to perform MCP review operations.");
|
||||
}
|
||||
|
||||
const metadata = user.client_read_only_metadata;
|
||||
if (!(metadata && typeof metadata === "object" && "isAiChatReviewer" in metadata && metadata.isAiChatReviewer === true)) {
|
||||
throw new StatusError(StatusError.Forbidden, "You are not approved to perform MCP review operations.");
|
||||
|
||||
@ -67,27 +67,34 @@ function spacetimeDbError(label: string, status: number, preview: string): Error
|
||||
return new StackAssertionError(detail);
|
||||
}
|
||||
|
||||
export async function callReducer(reducer: string, args: unknown[]): Promise<void> {
|
||||
async function callWithEnrollmentRetry(reducer: string, args: unknown[]): Promise<boolean> {
|
||||
const token = await getServiceToken();
|
||||
if (!token) return;
|
||||
await rawCallReducer(token, reducer, args);
|
||||
if (!token) return false;
|
||||
try {
|
||||
await rawCallReducer(token, reducer, args);
|
||||
return true;
|
||||
} catch (err) {
|
||||
if (!(err instanceof StatusError) || err.statusCode !== 401) throw err;
|
||||
enrollmentPromise = null;
|
||||
const fresh = await getServiceToken();
|
||||
if (!fresh) throw err;
|
||||
await rawCallReducer(fresh, reducer, args);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
export async function callReducer(reducer: string, args: unknown[]): Promise<void> {
|
||||
await callWithEnrollmentRetry(reducer, args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Like {@link callReducer} but throws when SpacetimeDB isn't configured, rather
|
||||
* than no-opping. Use for endpoints where the client treats a 200 as proof the
|
||||
* mutation actually ran (reviewer enrollment, human QA edits, deletions).
|
||||
* Fire-and-forget logging paths should keep using the best-effort variant.
|
||||
*/
|
||||
export async function callReducerStrict(reducer: string, args: unknown[]): Promise<void> {
|
||||
const token = await getServiceToken();
|
||||
if (!token) {
|
||||
const ran = await callWithEnrollmentRetry(reducer, args);
|
||||
if (!ran) {
|
||||
throw new StackAssertionError(
|
||||
`SpacetimeDB is not configured. Reducer ${reducer} cannot run. ` +
|
||||
`Check STACK_SPACETIMEDB_URL and STACK_SPACETIMEDB_SERVICE_TOKEN.`
|
||||
);
|
||||
}
|
||||
await rawCallReducer(token, reducer, args);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -130,4 +137,3 @@ export async function callSql<T = Record<string, unknown>>(sql: string): Promise
|
||||
return obj as T;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -158,6 +158,27 @@ function coerceBigInt(raw: unknown): bigint | undefined {
|
||||
if (typeof raw === "bigint") return raw;
|
||||
return undefined;
|
||||
}
|
||||
async function collectStackUserIdsForIdentities(
|
||||
callerToken: string,
|
||||
identities: ReadonlySet<string>,
|
||||
): Promise<Set<string>> {
|
||||
const out = new Set<string>();
|
||||
if (identities.size === 0) return out;
|
||||
const { rows } = await sqlQuery(callerToken, "SELECT * FROM operators");
|
||||
for (const row of rows) {
|
||||
const stackUserIdRaw = row.stack_user_id ?? row.stackUserId;
|
||||
if (typeof stackUserIdRaw !== "string") continue;
|
||||
if (stackUserIdRaw === "__service__") continue;
|
||||
const rowJson = JSON.stringify(row).toLowerCase();
|
||||
for (const identity of identities) {
|
||||
if (rowJson.includes(identity.toLowerCase())) {
|
||||
out.add(stackUserIdRaw);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Per-test collector for anything these tests drop into SpacetimeDB so
|
||||
@ -198,11 +219,12 @@ export function createCleanupScope(): CleanupScope {
|
||||
const caller = await mintIdentity().catch(() => null);
|
||||
if (caller == null) return;
|
||||
|
||||
const callerStackUserId = `cleanup-${caller.identity}`;
|
||||
try {
|
||||
await callReducer(caller.token, "add_operator", [
|
||||
logToken,
|
||||
[`0x${caller.identity}`],
|
||||
`__cleanup__-${caller.identity}`,
|
||||
callerStackUserId,
|
||||
"Cleanup Scope",
|
||||
]).catch(() => undefined);
|
||||
|
||||
@ -225,11 +247,15 @@ export function createCleanupScope(): CleanupScope {
|
||||
await callReducer(caller.token, "delete_ai_query_log", [logToken, correlationId]).catch(() => undefined);
|
||||
}
|
||||
|
||||
for (const identity of identities) {
|
||||
await callReducer(caller.token, "remove_operator", [logToken, [`0x${identity}`]]).catch(() => undefined);
|
||||
if (identities.size > 0) {
|
||||
const stackUserIdsToRemove = await collectStackUserIdsForIdentities(caller.token, identities)
|
||||
.catch(() => new Set<string>());
|
||||
for (const stackUserId of stackUserIdsToRemove) {
|
||||
await callReducer(caller.token, "remove_operators_for_user", [logToken, stackUserId]).catch(() => undefined);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
await callReducer(caller.token, "remove_operator", [logToken, [`0x${caller.identity}`]]).catch(() => undefined);
|
||||
await callReducer(caller.token, "remove_operators_for_user", [logToken, callerStackUserId]).catch(() => undefined);
|
||||
identities.clear();
|
||||
questions.clear();
|
||||
aiQueryCorrelationIds.clear();
|
||||
|
||||
@ -62,10 +62,7 @@ describe.skipIf(!canRun)("operators table RLS", () => {
|
||||
expect(rows.length).toBe(0);
|
||||
});
|
||||
|
||||
it("enrolling a second identity as the same reviewer sweeps the first", async ({ expect }) => {
|
||||
// The add_operator reducer's sweep logic deletes prior rows with the same
|
||||
// stackUserId before inserting a new identity — a reviewer switching browsers
|
||||
// should not accumulate stale operator rows.
|
||||
it("enrolling a second identity as the same reviewer keeps both active", async ({ expect }) => {
|
||||
const x = await mintIdentity();
|
||||
scope.trackIdentity(x.identity);
|
||||
await AiChatReviewer.createReviewer();
|
||||
@ -76,7 +73,6 @@ describe.skipIf(!canRun)("operators table RLS", () => {
|
||||
});
|
||||
expect(enrollX.status).toBe(200);
|
||||
|
||||
// Same reviewer (backendContext.userAuth unchanged) enrolls a second identity.
|
||||
const y = await mintIdentity();
|
||||
scope.trackIdentity(y.identity);
|
||||
const enrollY = await niceBackendFetch("/api/latest/internal/spacetimedb-enroll-reviewer", {
|
||||
@ -85,43 +81,64 @@ describe.skipIf(!canRun)("operators table RLS", () => {
|
||||
body: { identity: y.identity },
|
||||
});
|
||||
expect(enrollY.status).toBe(200);
|
||||
|
||||
// X should no longer be in operators — sweep removed its row.
|
||||
const asX = await sqlQuery(x.token, "SELECT * FROM operators");
|
||||
expect(asX.rows.length).toBe(0);
|
||||
// Y should still be the active operator.
|
||||
expect(asX.rows.length).toBe(1);
|
||||
const asY = await sqlQuery(y.token, "SELECT * FROM operators");
|
||||
expect(asY.rows.length).toBe(1);
|
||||
});
|
||||
|
||||
it.skipIf(!logToken)(
|
||||
"remove_operator reducer revokes an operator's view access",
|
||||
"remove_operators_for_user revokes every device a user has enrolled",
|
||||
async ({ expect }) => {
|
||||
const target = await mintIdentity();
|
||||
scope.trackIdentity(target.identity);
|
||||
const targetA = await mintIdentity();
|
||||
scope.trackIdentity(targetA.identity);
|
||||
await AiChatReviewer.createReviewer();
|
||||
const enroll = await niceBackendFetch("/api/latest/internal/spacetimedb-enroll-reviewer", {
|
||||
const enrollA = await niceBackendFetch("/api/latest/internal/spacetimedb-enroll-reviewer", {
|
||||
method: "POST",
|
||||
accessType: "client",
|
||||
body: { identity: target.identity },
|
||||
body: { identity: targetA.identity },
|
||||
});
|
||||
expect(enroll.status).toBe(200);
|
||||
expect(enrollA.status).toBe(200);
|
||||
|
||||
// Confirm enrolled.
|
||||
const before = await sqlQuery(target.token, "SELECT * FROM operators");
|
||||
expect(before.rows.length).toBe(1);
|
||||
const targetB = await mintIdentity();
|
||||
scope.trackIdentity(targetB.identity);
|
||||
const enrollB = await niceBackendFetch("/api/latest/internal/spacetimedb-enroll-reviewer", {
|
||||
method: "POST",
|
||||
accessType: "client",
|
||||
body: { identity: targetB.identity },
|
||||
});
|
||||
expect(enrollB.status).toBe(200);
|
||||
|
||||
// Directly call remove_operator with the log token.
|
||||
const caller = await mintIdentity();
|
||||
const removed = await callReducer(caller.token, "remove_operator", [
|
||||
expect((await sqlQuery(targetA.token, "SELECT * FROM operators")).rows.length).toBe(1);
|
||||
expect((await sqlQuery(targetB.token, "SELECT * FROM operators")).rows.length).toBe(1);
|
||||
|
||||
const adminEnroll = await mintIdentity();
|
||||
await callReducer(adminEnroll.token, "add_operator", [
|
||||
logToken!,
|
||||
[`0x${target.identity}`],
|
||||
[`0x${adminEnroll.identity}`],
|
||||
`e2e-admin-${adminEnroll.identity}`,
|
||||
"E2E Admin",
|
||||
]);
|
||||
scope.trackIdentity(adminEnroll.identity);
|
||||
const allOps = await sqlQuery(adminEnroll.token, "SELECT * FROM operators");
|
||||
const targetRowJson = JSON.stringify(allOps.rows.find(r =>
|
||||
JSON.stringify(r).toLowerCase().includes(targetA.identity.toLowerCase())
|
||||
) ?? {});
|
||||
expect(targetRowJson).toContain("0x" + targetA.identity.toLowerCase().slice(0, 8));
|
||||
const stackUserId = (allOps.rows.find(r =>
|
||||
JSON.stringify(r).toLowerCase().includes(targetA.identity.toLowerCase())
|
||||
) as { stack_user_id?: string, stackUserId?: string } | undefined);
|
||||
const sUid = stackUserId?.stack_user_id ?? stackUserId?.stackUserId;
|
||||
expect(typeof sUid).toBe("string");
|
||||
|
||||
const removed = await callReducer(adminEnroll.token, "remove_operators_for_user", [
|
||||
logToken!,
|
||||
sUid!,
|
||||
]);
|
||||
expect(removed.ok).toBe(true);
|
||||
|
||||
// Target is no longer an operator.
|
||||
const after = await sqlQuery(target.token, "SELECT * FROM operators");
|
||||
expect(after.rows.length).toBe(0);
|
||||
expect((await sqlQuery(targetA.token, "SELECT * FROM operators")).rows.length).toBe(0);
|
||||
expect((await sqlQuery(targetB.token, "SELECT * FROM operators")).rows.length).toBe(0);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
@ -57,16 +57,17 @@ describe.skipIf(!canRun)("SpacetimeDB reducer auth", () => {
|
||||
|
||||
const cases = [
|
||||
{ name: "add_operator", args: [wrong, [hexId], "some-user", "Some Name"] },
|
||||
{ name: "remove_operator", args: [wrong, [hexId]] },
|
||||
{ name: "remove_operators_for_user", args: [wrong, "some-user"] },
|
||||
{ name: "enroll_service", args: [wrong, "Some Service"] },
|
||||
{ name: "mark_human_reviewed", args: [wrong, "corr", "reviewer"] },
|
||||
{ name: "unmark_human_reviewed", args: [wrong, "corr"] },
|
||||
{
|
||||
name: "update_human_correction",
|
||||
name: "upsert_qa_from_call",
|
||||
args: [wrong, "corr", "q", "a", false, "reviewer"],
|
||||
},
|
||||
{ name: "add_manual_qa", args: [wrong, "q", "a", false, "reviewer"] },
|
||||
{ name: "delete_qa_entry", args: [wrong, "corr"] },
|
||||
{ name: "add_manual_qa", args: [wrong, "q", "a", false, "reviewer", opt(null)] },
|
||||
{ name: "delete_qa_entry", args: [wrong, 0n] },
|
||||
{ name: "update_qa_entry_with_publish", args: [wrong, 0n, "q", "a", false, "reviewer"] },
|
||||
{
|
||||
name: "log_mcp_call",
|
||||
args: [wrong, "corr", opt(null), "tool", "reason", "prompt", "q", "r", 0, "[]", 0n, "model", opt(null)],
|
||||
|
||||
@ -2,7 +2,6 @@
|
||||
NEXT_PUBLIC_STACK_API_URL=REPLACE_ME
|
||||
NEXT_PUBLIC_STACK_PROJECT_ID=REPLACE_ME
|
||||
NEXT_PUBLIC_STACK_PUBLISHABLE_CLIENT_KEY=REPLACE_ME
|
||||
STACK_SECRET_SERVER_KEY=REPLACE_ME
|
||||
NEXT_PUBLIC_STACK_DASHBOARD_URL=REPLACE_ME
|
||||
# SpacetimeDB
|
||||
NEXT_PUBLIC_SPACETIMEDB_HOST=REPLACE_ME
|
||||
|
||||
@ -29,24 +29,6 @@ if (publish.status !== 0) {
|
||||
}
|
||||
|
||||
await provisionServiceToken();
|
||||
await runQaEntriesBackfill();
|
||||
|
||||
async function runQaEntriesBackfill() {
|
||||
const dbName = process.env.STACK_SPACETIMEDB_DB_NAME ?? "stack-auth-llm";
|
||||
const logToken = process.env.STACK_MCP_LOG_TOKEN ?? "";
|
||||
if (!logToken) {
|
||||
console.warn("[internal-tool] STACK_MCP_LOG_TOKEN not set; skipping qa_entries backfill.");
|
||||
return;
|
||||
}
|
||||
const result = spawnSync(
|
||||
"spacetime",
|
||||
["call", dbName, "backfill_qa_entries", JSON.stringify(logToken)],
|
||||
{ stdio: "inherit" },
|
||||
);
|
||||
if (result.status !== 0) {
|
||||
console.warn(`[internal-tool] backfill_qa_entries returned ${result.status}; ignoring (may already be migrated).`);
|
||||
}
|
||||
}
|
||||
|
||||
async function provisionServiceToken() {
|
||||
const portPrefix = process.env.NEXT_PUBLIC_STACK_PORT_PREFIX ?? "81";
|
||||
|
||||
@ -1,5 +1,8 @@
|
||||
import { schema, t, table, SenderError } from 'spacetimedb/server';
|
||||
import type { Timestamp } from 'spacetimedb';
|
||||
import { Timestamp } from 'spacetimedb';
|
||||
|
||||
const OPERATOR_TTL_MILLIS = 60n * 60n * 1000n;
|
||||
const OPERATOR_TTL_MICROS = OPERATOR_TTL_MILLIS * 1000n;
|
||||
|
||||
// Injected at publish time by the spacetime:inject-token pnpm script from STACK_MCP_LOG_TOKEN env var.
|
||||
// Must match STACK_MCP_LOG_TOKEN in the backend .env.
|
||||
@ -82,6 +85,7 @@ const operators = table(
|
||||
addedAt: t.timestamp(),
|
||||
stackUserId: t.string(),
|
||||
displayName: t.string(),
|
||||
expiresAt: t.timestamp().optional(),
|
||||
}
|
||||
);
|
||||
|
||||
@ -91,6 +95,7 @@ const qaEntries = table(
|
||||
id: t.u64().primaryKey().autoInc(),
|
||||
shard: t.u8().index('btree'),
|
||||
sourceMcpCorrelationId: t.string().optional(),
|
||||
requestId: t.string().optional(),
|
||||
question: t.string(),
|
||||
answer: t.string(),
|
||||
createdBy: t.string(),
|
||||
@ -181,6 +186,18 @@ export const add_operator = spacetimedb.reducer(
|
||||
if (/^__.*__$/.test(args.stackUserId)) {
|
||||
throw new SenderError('stackUserId pattern __*__ is reserved');
|
||||
}
|
||||
const nowMicros = ctx.timestamp.microsSinceUnixEpoch;
|
||||
const expired = [];
|
||||
for (const row of ctx.db.operators.iter()) {
|
||||
if (row.expiresAt != null && row.expiresAt.microsSinceUnixEpoch <= nowMicros) {
|
||||
expired.push(row);
|
||||
}
|
||||
}
|
||||
for (const row of expired) {
|
||||
ctx.db.operators.identity.delete(row.identity);
|
||||
}
|
||||
|
||||
const expiresAt = new Timestamp(nowMicros + OPERATOR_TTL_MICROS);
|
||||
const existing = ctx.db.operators.identity.find(args.identity);
|
||||
if (existing != null) {
|
||||
if (existing.stackUserId !== args.stackUserId) {
|
||||
@ -191,37 +208,39 @@ export const add_operator = spacetimedb.reducer(
|
||||
addedAt: existing.addedAt,
|
||||
stackUserId: args.stackUserId,
|
||||
displayName: args.displayName,
|
||||
expiresAt,
|
||||
});
|
||||
return;
|
||||
}
|
||||
const stale = [];
|
||||
for (const row of ctx.db.operators.iter()) {
|
||||
if (row.stackUserId === args.stackUserId) {
|
||||
stale.push(row);
|
||||
}
|
||||
}
|
||||
for (const row of stale) {
|
||||
ctx.db.operators.identity.delete(row.identity);
|
||||
}
|
||||
ctx.db.operators.insert({
|
||||
identity: args.identity,
|
||||
addedAt: ctx.timestamp,
|
||||
stackUserId: args.stackUserId,
|
||||
displayName: args.displayName,
|
||||
expiresAt,
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
export const remove_operator = spacetimedb.reducer(
|
||||
export const remove_operators_for_user = spacetimedb.reducer(
|
||||
{
|
||||
token: t.string(),
|
||||
identity: t.identity(),
|
||||
stackUserId: t.string(),
|
||||
},
|
||||
(ctx, args) => {
|
||||
if (args.token !== EXPECTED_LOG_TOKEN) {
|
||||
throw new SenderError('Invalid log token');
|
||||
}
|
||||
ctx.db.operators.identity.delete(args.identity);
|
||||
if (/^__.*__$/.test(args.stackUserId)) {
|
||||
throw new SenderError('Refusing to remove reserved __*__ identities');
|
||||
}
|
||||
const matches = [];
|
||||
for (const row of ctx.db.operators.iter()) {
|
||||
if (row.stackUserId === args.stackUserId) matches.push(row);
|
||||
}
|
||||
for (const row of matches) {
|
||||
ctx.db.operators.identity.delete(row.identity);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
@ -241,6 +260,7 @@ export const enroll_service = spacetimedb.reducer(
|
||||
addedAt: ctx.timestamp,
|
||||
stackUserId: '__service__',
|
||||
displayName: args.displayName,
|
||||
expiresAt: undefined,
|
||||
});
|
||||
}
|
||||
);
|
||||
@ -407,6 +427,7 @@ export const upsert_qa_from_call = spacetimedb.reducer(
|
||||
id: 0n,
|
||||
shard: 0,
|
||||
sourceMcpCorrelationId: args.correlationId,
|
||||
requestId: undefined,
|
||||
question: args.question,
|
||||
answer: args.answer,
|
||||
createdBy: args.editedBy,
|
||||
@ -427,15 +448,22 @@ export const add_manual_qa = spacetimedb.reducer(
|
||||
answer: t.string(),
|
||||
publish: t.bool(),
|
||||
createdBy: t.string(),
|
||||
requestId: t.string().optional(),
|
||||
},
|
||||
(ctx, args) => {
|
||||
if (args.token !== EXPECTED_LOG_TOKEN) {
|
||||
throw new SenderError('Invalid log token');
|
||||
}
|
||||
if (args.requestId != null && args.requestId !== '') {
|
||||
for (const existing of ctx.db.qaEntries.iter()) {
|
||||
if (existing.requestId === args.requestId) return;
|
||||
}
|
||||
}
|
||||
ctx.db.qaEntries.insert({
|
||||
id: 0n,
|
||||
shard: 0,
|
||||
sourceMcpCorrelationId: undefined,
|
||||
requestId: args.requestId,
|
||||
question: args.question,
|
||||
answer: args.answer,
|
||||
createdBy: args.createdBy,
|
||||
@ -466,31 +494,6 @@ export const delete_qa_entry = spacetimedb.reducer(
|
||||
}
|
||||
);
|
||||
|
||||
// Pure publish state transition. Same firstPublishedAt / lastPublishedAt
|
||||
// semantics as upsert_qa_from_call.
|
||||
export const set_qa_published = spacetimedb.reducer(
|
||||
{
|
||||
token: t.string(),
|
||||
qaId: t.u64(),
|
||||
publish: t.bool(),
|
||||
},
|
||||
(ctx, args) => {
|
||||
if (args.token !== EXPECTED_LOG_TOKEN) {
|
||||
throw new SenderError('Invalid log token');
|
||||
}
|
||||
const row = ctx.db.qaEntries.id.find(args.qaId);
|
||||
if (row == null) {
|
||||
throw new SenderError('QA entry not found for qaId: ' + args.qaId.toString());
|
||||
}
|
||||
ctx.db.qaEntries.id.update({
|
||||
...row,
|
||||
published: args.publish,
|
||||
firstPublishedAt: args.publish ? (row.firstPublishedAt ?? ctx.timestamp) : row.firstPublishedAt,
|
||||
lastPublishedAt: args.publish ? ctx.timestamp : row.lastPublishedAt,
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
export const update_qa_entry_with_publish = spacetimedb.reducer(
|
||||
{
|
||||
token: t.string(),
|
||||
@ -521,77 +524,6 @@ export const update_qa_entry_with_publish = spacetimedb.reducer(
|
||||
}
|
||||
);
|
||||
|
||||
// Text-only edit. Doesn't touch publish state.
|
||||
export const update_qa_entry = spacetimedb.reducer(
|
||||
{
|
||||
token: t.string(),
|
||||
qaId: t.u64(),
|
||||
question: t.string(),
|
||||
answer: t.string(),
|
||||
editedBy: t.string(),
|
||||
},
|
||||
(ctx, args) => {
|
||||
if (args.token !== EXPECTED_LOG_TOKEN) {
|
||||
throw new SenderError('Invalid log token');
|
||||
}
|
||||
const row = ctx.db.qaEntries.id.find(args.qaId);
|
||||
if (row == null) {
|
||||
throw new SenderError('QA entry not found for qaId: ' + args.qaId.toString());
|
||||
}
|
||||
ctx.db.qaEntries.id.update({
|
||||
...row,
|
||||
question: args.question,
|
||||
answer: args.answer,
|
||||
lastEditedBy: args.editedBy,
|
||||
lastEditedAt: ctx.timestamp,
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
export const backfill_qa_entries = spacetimedb.reducer(
|
||||
{
|
||||
token: t.string(),
|
||||
},
|
||||
(ctx, args) => {
|
||||
if (args.token !== EXPECTED_LOG_TOKEN) {
|
||||
throw new SenderError('Invalid log token');
|
||||
}
|
||||
for (const call of ctx.db.mcpCallLog.shard.filter(0)) {
|
||||
const hasEditorialContent =
|
||||
call.humanCorrectedQuestion != null ||
|
||||
call.humanCorrectedAnswer != null ||
|
||||
call.publishedToQa === true ||
|
||||
call.toolName === 'manual';
|
||||
if (!hasEditorialContent) continue;
|
||||
let alreadyMigrated = false;
|
||||
for (const existing of ctx.db.qaEntries.shard.filter(0)) {
|
||||
if (existing.sourceMcpCorrelationId === call.correlationId) {
|
||||
alreadyMigrated = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (alreadyMigrated) continue;
|
||||
const reviewerId = call.humanReviewedBy ?? '__backfill__';
|
||||
const reviewedAt = call.humanReviewedAt ?? call.createdAt;
|
||||
ctx.db.qaEntries.insert({
|
||||
id: 0n,
|
||||
shard: 0,
|
||||
sourceMcpCorrelationId: call.toolName === 'manual' ? undefined : call.correlationId,
|
||||
question: call.humanCorrectedQuestion ?? call.question,
|
||||
answer: call.humanCorrectedAnswer ?? call.response,
|
||||
createdBy: reviewerId,
|
||||
createdAt: reviewedAt,
|
||||
lastEditedBy: reviewerId,
|
||||
lastEditedAt: reviewedAt,
|
||||
published: call.publishedToQa,
|
||||
firstPublishedAt: call.publishedToQa ? (call.publishedAt ?? reviewedAt) : undefined,
|
||||
lastPublishedAt: call.publishedToQa ? (call.publishedAt ?? reviewedAt) : undefined,
|
||||
} as Parameters<typeof ctx.db.qaEntries.insert>[0]);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
|
||||
export const log_ai_query = spacetimedb.reducer(
|
||||
{
|
||||
token: t.string(),
|
||||
|
||||
@ -39,12 +39,16 @@ export default function App() {
|
||||
if (typeof window === "undefined") return;
|
||||
window.sessionStorage.setItem(TAB_STORAGE_KEY, tab);
|
||||
}, [tab]);
|
||||
const enrolledRef = useRef<Map<string, Promise<void>>>(new Map());
|
||||
const ENROLL_REFRESH_MS = 15 * 60 * 1000;
|
||||
const enrolledRef = useRef<Map<string, { promise: Promise<void>, enrolledAt: number }>>(new Map());
|
||||
const ensureEnrolled = useCallback(async (identity: Identity) => {
|
||||
if (!user) throw new Error("Not authenticated");
|
||||
const key = identity.toHexString();
|
||||
const existing = enrolledRef.current.get(key);
|
||||
if (existing) return await existing;
|
||||
if (existing && Date.now() - existing.enrolledAt < ENROLL_REFRESH_MS) {
|
||||
return await existing.promise;
|
||||
}
|
||||
const startedAt = Date.now();
|
||||
const promise = (async () => {
|
||||
const { accessToken, refreshToken } = await user.getAuthJson();
|
||||
const authHeaders: Record<string, string> = {};
|
||||
@ -53,13 +57,16 @@ export default function App() {
|
||||
try {
|
||||
await enrollSpacetimeReviewer({ identity: key }, authHeaders);
|
||||
} catch (err) {
|
||||
enrolledRef.current.delete(key);
|
||||
const cached = enrolledRef.current.get(key);
|
||||
if (cached && cached.enrolledAt === startedAt) {
|
||||
enrolledRef.current.delete(key);
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
})();
|
||||
enrolledRef.current.set(key, promise);
|
||||
enrolledRef.current.set(key, { promise, enrolledAt: startedAt });
|
||||
return await promise;
|
||||
}, [user]);
|
||||
}, [ENROLL_REFRESH_MS, user]);
|
||||
const isAiChatReviewer = Boolean(
|
||||
(user?.clientReadOnlyMetadata as Record<string, unknown> | null)?.isAiChatReviewer,
|
||||
);
|
||||
@ -185,9 +192,9 @@ export default function App() {
|
||||
{showAddQa && (
|
||||
<AddManualQa
|
||||
onClose={() => setShowAddQa(false)}
|
||||
onSave={async (question, answer, publish) => {
|
||||
onSave={async (question, answer, publish, requestId) => {
|
||||
const api = await getApi();
|
||||
await api.addManual({ question, answer, publish });
|
||||
await api.addManual({ question, answer, publish, requestId });
|
||||
}}
|
||||
/>
|
||||
)}
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
import { useState } from "react";
|
||||
import { useRef, useState } from "react";
|
||||
import { clsx } from "clsx";
|
||||
|
||||
export function AddManualQa({ onClose, onSave }: {
|
||||
onClose: () => void;
|
||||
onSave: (question: string, answer: string, publish: boolean) => Promise<void>;
|
||||
onSave: (question: string, answer: string, publish: boolean, requestId: string) => Promise<void>;
|
||||
}) {
|
||||
const [question, setQuestion] = useState("");
|
||||
const [answer, setAnswer] = useState("");
|
||||
@ -11,14 +11,21 @@ export function AddManualQa({ onClose, onSave }: {
|
||||
const [isSaving, setIsSaving] = useState(false);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
|
||||
const pendingRequestIdRef = useRef<string | null>(null);
|
||||
|
||||
const canSave = question.trim().length > 0 && answer.trim().length > 0 && !isSaving;
|
||||
|
||||
const handleSave = async (publish: boolean) => {
|
||||
if (!canSave) return;
|
||||
setIsSaving(true);
|
||||
setError(null);
|
||||
if (pendingRequestIdRef.current == null) {
|
||||
pendingRequestIdRef.current = crypto.randomUUID();
|
||||
}
|
||||
const requestId = pendingRequestIdRef.current;
|
||||
try {
|
||||
await onSave(question.trim(), answer.trim(), publish);
|
||||
await onSave(question.trim(), answer.trim(), publish, requestId);
|
||||
pendingRequestIdRef.current = null;
|
||||
setSaved(true);
|
||||
setTimeout(() => {
|
||||
setSaved(false);
|
||||
|
||||
@ -21,6 +21,7 @@ function getConfig() {
|
||||
|
||||
const MAX_RETRIES = 5;
|
||||
const RETRY_DELAY_MS = 2000;
|
||||
const ENROLL_REFRESH_INTERVAL_MS = 15 * 60 * 1000;
|
||||
|
||||
type ConnectionState = "connecting" | "connected" | "error";
|
||||
|
||||
@ -48,6 +49,8 @@ function useTableSubscription<Row extends { id: bigint }>(
|
||||
let cancelled = false;
|
||||
let retryCount = 0;
|
||||
let retryTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let refreshTimer: ReturnType<typeof setInterval> | null = null;
|
||||
let currentIdentity: Identity | null = null;
|
||||
const query = `SELECT * FROM ${binding.tableName}`;
|
||||
|
||||
function retry() {
|
||||
@ -101,6 +104,7 @@ function useTableSubscription<Row extends { id: bigint }>(
|
||||
|
||||
const enrollFn = ensureEnrolledRef.current;
|
||||
if (enrollFn) {
|
||||
currentIdentity = identity;
|
||||
enrollFn(identity).then(
|
||||
() => startSubscription(),
|
||||
(err) => {
|
||||
@ -108,6 +112,17 @@ function useTableSubscription<Row extends { id: bigint }>(
|
||||
setConnectionState("error");
|
||||
},
|
||||
);
|
||||
if (refreshTimer == null) {
|
||||
refreshTimer = setInterval(() => {
|
||||
if (cancelled) return;
|
||||
const fn = ensureEnrolledRef.current;
|
||||
const id = currentIdentity;
|
||||
if (!fn || !id) return;
|
||||
fn(id).catch((err) => {
|
||||
captureError("spacetimedb-enroll-refresh", err);
|
||||
});
|
||||
}, ENROLL_REFRESH_INTERVAL_MS);
|
||||
}
|
||||
} else {
|
||||
startSubscription();
|
||||
}
|
||||
@ -163,6 +178,11 @@ function useTableSubscription<Row extends { id: bigint }>(
|
||||
clearTimeout(retryTimer);
|
||||
retryTimer = null;
|
||||
}
|
||||
if (refreshTimer !== null) {
|
||||
clearInterval(refreshTimer);
|
||||
refreshTimer = null;
|
||||
}
|
||||
currentIdentity = null;
|
||||
if (connRef.current) {
|
||||
connRef.current.disconnect();
|
||||
connRef.current = null;
|
||||
|
||||
@ -46,6 +46,7 @@ export function makeMcpReviewApi(authHeaders: Record<string, string>) {
|
||||
question: string;
|
||||
answer: string;
|
||||
publish: boolean;
|
||||
requestId: string;
|
||||
}) => post("add-manual", body, authHeaders),
|
||||
|
||||
updateQaEntry: (body: {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user