diff --git a/apps/backend/src/app/api/latest/ai/query/[mode]/route.ts b/apps/backend/src/app/api/latest/ai/query/[mode]/route.ts index af7a45a94..03ddee09e 100644 --- a/apps/backend/src/app/api/latest/ai/query/[mode]/route.ts +++ b/apps/backend/src/app/api/latest/ai/query/[mode]/route.ts @@ -1,47 +1,20 @@ -import { logAiQuery } from "@/lib/ai/ai-query-logger"; -import { logMcpCall } from "@/lib/ai/mcp-logger"; +import { + assertProjectAccess, + handleGenerateMode, + handleStreamMode, + type CommonLogFields, + type ModeContext, +} from "@/lib/ai/ai-query-handlers"; import { selectModel } from "@/lib/ai/models"; import { getFullSystemPrompt } from "@/lib/ai/prompts"; -import { reviewMcpCall } from "@/lib/ai/qa-reviewer"; import { requestBodySchema } from "@/lib/ai/schema"; import { getTools, validateToolNames } from "@/lib/ai/tools"; import { getVerifiedQaContext } from "@/lib/ai/verified-qa"; -import { listManagedProjectIds } from "@/lib/projects"; import { SmartResponse } from "@/route-handlers/smart-response"; import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler"; -import { runAsynchronouslyAndWaitUntil } from "@/utils/background-tasks"; import { yupMixed, yupObject, yupString } from "@stackframe/stack-shared/dist/schema-fields"; -import { captureError, StatusError } from "@stackframe/stack-shared/dist/utils/errors"; -import { Json } from "@stackframe/stack-shared/dist/utils/json"; -import type { OpenRouterUsageAccounting } from "@openrouter/ai-sdk-provider"; -import { generateText, ModelMessage, stepCountIs, streamText, type StepResult, type ToolSet } from "ai"; - -type ProviderMetadata = { openrouter?: { usage?: OpenRouterUsageAccounting } }; - -function extractOpenRouterCost(meta: unknown): number | undefined { - return (meta as ProviderMetadata | null | undefined)?.openrouter?.usage?.cost; -} - -function extractCachedTokens(meta: unknown): number | undefined { - return (meta as ProviderMetadata | null | undefined)?.openrouter?.usage?.promptTokensDetails?.cachedTokens; -} - -function buildStepsJson(steps: ReadonlyArray>): string { - return JSON.stringify(steps.map((step, i) => ({ - step: i, - text: step.text || undefined, - toolCalls: step.toolCalls.map(tc => ({ - toolName: tc.toolName, - toolCallId: tc.toolCallId, - args: tc.input, - })), - toolResults: step.toolResults.map(tr => ({ - toolName: tr.toolName, - toolCallId: tr.toolCallId, - result: tr.output, - })), - }))); -} +import { StatusError } from "@stackframe/stack-shared/dist/utils/errors"; +import { ModelMessage } from "ai"; export const POST = createSmartRouteHandler({ metadata: { @@ -65,17 +38,7 @@ export const POST = createSmartRouteHandler({ const { quality, speed, systemPrompt: systemPromptId, tools: toolNames, messages, projectId } = body; if (projectId != null) { - if (fullReq.auth?.project.id !== "internal") { - throw new StatusError(StatusError.Forbidden, "You do not have access to this project"); - } - const user = fullReq.auth.user; - if (user == null) { - throw new StatusError(StatusError.Forbidden, "You do not have access to this project"); - } - const managedProjectIds = await listManagedProjectIds(user); - if (!managedProjectIds.includes(projectId)) { - throw new StatusError(StatusError.Forbidden, "You do not have access to this project"); - } + await assertProjectAccess(projectId, fullReq.auth); } const model = selectModel(quality, speed, isAuthenticated); @@ -92,7 +55,7 @@ export const POST = createSmartRouteHandler({ const conversationIdForLog = body.mcpCallMetadata ? body.mcpCallMetadata.conversationId ?? crypto.randomUUID() : undefined; - const commonLogFields = { + const common: CommonLogFields = { correlationId, mode, systemPromptId, @@ -107,27 +70,8 @@ export const POST = createSmartRouteHandler({ mcpCorrelationId: body.mcpCallMetadata ? correlationId : undefined, conversationId: conversationIdForLog, }; - const startedAt = Date.now(); - const USER_FACING_ERROR_MESSAGE = "The AI service is temporarily unavailable. Please try again later."; - - function logError(err: unknown) { - captureError("ai-query-upstream", err); - runAsynchronouslyAndWaitUntil(logAiQuery({ - ...commonLogFields, - stepsJson: "[]", - finalText: "", - inputTokens: undefined, - outputTokens: undefined, - cachedInputTokens: undefined, - costUsd: undefined, - stepCount: 0, - durationMs: BigInt(Date.now() - startedAt), - errorMessage: err instanceof Error ? err.message : String(err), - })); - } - const isAnthropic = model.modelId.startsWith("anthropic/"); const systemMessage: ModelMessage = { role: "system", @@ -137,160 +81,18 @@ export const POST = createSmartRouteHandler({ }), }; const cachedMessages: ModelMessage[] = [systemMessage, ...(messages as ModelMessage[])]; - const openrouterProviderOptions = { - usage: { include: true }, - extraBody: { - stream_options: { include_usage: true }, - }, - } as const; + + const ctx: ModeContext = { model, cachedMessages, toolsArg, stepLimit, common, startedAt }; if (mode === "stream") { - const result = streamText({ - model, - messages: cachedMessages, - tools: toolsArg, - stopWhen: stepCountIs(stepLimit), - providerOptions: { - openrouter: openrouterProviderOptions, - }, - onFinish: ({ text, steps, usage, providerMetadata }) => { - runAsynchronouslyAndWaitUntil(logAiQuery({ - ...commonLogFields, - stepsJson: buildStepsJson(steps), - finalText: text, - inputTokens: usage.inputTokens ?? undefined, - outputTokens: usage.outputTokens ?? undefined, - cachedInputTokens: extractCachedTokens(providerMetadata), - costUsd: extractOpenRouterCost(providerMetadata), - stepCount: steps.length, - durationMs: BigInt(Date.now() - startedAt), - errorMessage: undefined, - })); - }, - onError: ({ error }) => logError(error), - }); - return { - statusCode: 200, - bodyType: "response" as const, - body: result.toUIMessageStreamResponse({ - onError: () => USER_FACING_ERROR_MESSAGE, - }), - }; - } else { - const controller = new AbortController(); - const timeoutId = setTimeout(() => controller.abort(), 120_000); - let result: Awaited>; - try { - result = await generateText({ - model, - messages: cachedMessages, - tools: toolsArg, - abortSignal: controller.signal, - stopWhen: stepCountIs(stepLimit), - providerOptions: { - openrouter: openrouterProviderOptions, - }, - }).finally(() => clearTimeout(timeoutId)); - } catch (err) { - logError(err); - throw new StatusError(StatusError.BadGateway, USER_FACING_ERROR_MESSAGE); - } - - const contentBlocks: Array< - | { type: "text", text: string } - | { - type: "tool-call", - toolName: string, - toolCallId: string, - args: Json, - argsText: string, - result: Json, - } - > = []; - - result.steps.forEach((step) => { - if (step.text) { - contentBlocks.push({ - type: "text", - text: step.text, - }); - } - - const toolResultsByCallId = new Map( - step.toolResults.map((r) => [r.toolCallId, r]) - ); - - step.toolCalls.forEach((toolCall) => { - const toolResult = toolResultsByCallId.get(toolCall.toolCallId); - contentBlocks.push({ - type: "tool-call", - toolName: toolCall.toolName, - toolCallId: toolCall.toolCallId, - args: toolCall.input, - argsText: JSON.stringify(toolCall.input), - result: (toolResult?.output ?? null) as Json, - }); - }); - }); - - runAsynchronouslyAndWaitUntil(logAiQuery({ - ...commonLogFields, - stepsJson: buildStepsJson(result.steps), - finalText: result.text, - inputTokens: result.usage.inputTokens ?? undefined, - outputTokens: result.usage.outputTokens ?? undefined, - cachedInputTokens: extractCachedTokens(result.providerMetadata), - costUsd: extractOpenRouterCost(result.providerMetadata), - stepCount: result.steps.length, - durationMs: BigInt(Date.now() - startedAt), - errorMessage: undefined, - })); - - let responseConversationId: string | undefined; - if (body.mcpCallMetadata != null && conversationIdForLog != null) { - const conversationId = conversationIdForLog; - responseConversationId = conversationId; - const firstUserMessage = messages.find(m => m.role === "user"); - const question = typeof firstUserMessage?.content === "string" - ? firstUserMessage.content - : JSON.stringify(firstUserMessage?.content ?? ""); - - const innerToolCallsJson = JSON.stringify(contentBlocks.filter(b => b.type === "tool-call")); - - const logPromise = logMcpCall({ - correlationId, - toolName: body.mcpCallMetadata.toolName, - reason: body.mcpCallMetadata.reason, - userPrompt: body.mcpCallMetadata.userPrompt, - conversationId, - question, - response: result.text, - stepCount: result.steps.length, - innerToolCallsJson, - durationMs: BigInt(Date.now() - startedAt), - modelId: String(model.modelId), - errorMessage: undefined, - }); - runAsynchronouslyAndWaitUntil(logPromise); - - runAsynchronouslyAndWaitUntil(reviewMcpCall({ - logPromise, - correlationId, - question, - reason: body.mcpCallMetadata.reason, - response: result.text, - })); - } - - return { - statusCode: 200, - bodyType: "json" as const, - body: { - content: contentBlocks, - finalText: result.text, - conversationId: responseConversationId ?? null, - }, - }; + return handleStreamMode(ctx); } + return await handleGenerateMode({ + ...ctx, + messages, + mcpCallMetadata: body.mcpCallMetadata ?? undefined, + correlationId, + conversationIdForLog, + }); }, }); diff --git a/apps/backend/src/app/api/latest/integrations/ai-proxy/[[...path]]/route.ts b/apps/backend/src/app/api/latest/integrations/ai-proxy/[[...path]]/route.ts index 9a8205dd9..14884a083 100644 --- a/apps/backend/src/app/api/latest/integrations/ai-proxy/[[...path]]/route.ts +++ b/apps/backend/src/app/api/latest/integrations/ai-proxy/[[...path]]/route.ts @@ -1,37 +1,11 @@ -import { ALLOWED_MODEL_IDS } from "@/lib/ai/models"; +import { observeAndLog, sanitizeBody } from "@/lib/ai/ai-proxy-handlers"; import { handleApiRequest } from "@/route-handlers/smart-route-handler"; import { getEnvVariable } from "@stackframe/stack-shared/dist/utils/env"; -import { StatusError } from "@stackframe/stack-shared/dist/utils/errors"; +import { captureError } from "@stackframe/stack-shared/dist/utils/errors"; import { NextRequest } from "next/server"; const OPENROUTER_BASE_URL = "https://openrouter.ai/api"; const PRODUCTION_PROXY_BASE_URL = "https://api.stack-auth.com/api/latest/integrations/ai-proxy"; -const OPENROUTER_DEFAULT_MODEL = "anthropic/claude-sonnet-4.6"; - -function sanitizeBody(raw: ArrayBuffer): Uint8Array { - const text = new TextDecoder().decode(raw); - let parsed; - try { - parsed = JSON.parse(text); - } catch { - throw new StatusError(400, "Request body must be valid JSON"); - } - - if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) { - throw new StatusError(400, "Request body must be a JSON object"); - } - - if (!parsed.model || !ALLOWED_MODEL_IDS.has(parsed.model)) { - parsed.model = OPENROUTER_DEFAULT_MODEL; - } - - // OpenRouter limits metadata.user_id to 128 characters - if (parsed.metadata?.user_id && parsed.metadata.user_id.length > 128) { - parsed.metadata.user_id = parsed.metadata.user_id.slice(0, 128); - } - - return new TextEncoder().encode(JSON.stringify(parsed)); -} async function proxyToOpenRouter(req: NextRequest, options: { params: Promise<{ path?: string[] }> }) { const apiKey = getEnvVariable("STACK_OPENROUTER_API_KEY"); @@ -39,54 +13,49 @@ async function proxyToOpenRouter(req: NextRequest, options: { params: Promise<{ const subpath = params.path?.join("/") ?? ""; const contentType = req.headers.get("Content-Type"); - const body = req.method !== "GET" && req.method !== "HEAD" - ? Buffer.from(sanitizeBody(await req.arrayBuffer())) + const sanitized = req.method !== "GET" && req.method !== "HEAD" + ? sanitizeBody(await req.arrayBuffer()) : undefined; + const body = sanitized ? Buffer.from(sanitized.bytes) : undefined; + const callerApiKey = req.headers.get("x-api-key"); + const shouldLog = sanitized != null && callerApiKey != null && callerApiKey.startsWith("stack-auth-"); + const correlationId = crypto.randomUUID(); + const startedAt = Date.now(); - if (apiKey === "FORWARD_TO_PRODUCTION") { - const targetUrl = `${PRODUCTION_PROXY_BASE_URL}/${subpath}${req.nextUrl.search}`; - const headers: Record = {}; - if (contentType) { - headers["Content-Type"] = contentType; - } + const targetUrl = apiKey === "FORWARD_TO_PRODUCTION" + ? `${PRODUCTION_PROXY_BASE_URL}/${subpath}${req.nextUrl.search}` + : `${OPENROUTER_BASE_URL}/${subpath}${req.nextUrl.search}`; + const forwardHeaders: Record = apiKey === "FORWARD_TO_PRODUCTION" + ? {} + : { + "Authorization": `Bearer ${apiKey}`, + "anthropic-version": "2023-06-01", + }; + if (contentType) forwardHeaders["Content-Type"] = contentType; - const response = await fetch(targetUrl, { - method: req.method, - headers, - body, - }); + const response = await fetch(targetUrl, { method: req.method, headers: forwardHeaders, body }); - return new Response(response.body, { - status: response.status, - headers: { - "Content-Type": response.headers.get("Content-Type") ?? "application/json", - "Cache-Control": "no-cache", - }, - }); - } - - const targetUrl = `${OPENROUTER_BASE_URL}/${subpath}${req.nextUrl.search}`; - const headers: Record = { - "Authorization": `Bearer ${apiKey}`, - "anthropic-version": "2023-06-01", + const responseHeaders = { + "Content-Type": response.headers.get("Content-Type") ?? "application/json", + "Cache-Control": "no-cache", }; - if (contentType) { - headers["Content-Type"] = contentType; + + const passthrough = () => new Response(response.body, { status: response.status, headers: responseHeaders }); + + if (!shouldLog) return passthrough(); + try { + return await observeAndLog({ + response, + sanitizedBody: sanitized!, + callerApiKey, + correlationId, + startedAt, + responseHeaders, + }); + } catch (e) { + captureError("ai-proxy-log-pipeline", e); + return passthrough(); } - - const response = await fetch(targetUrl, { - method: req.method, - headers, - body, - }); - - return new Response(response.body, { - status: response.status, - headers: { - "Content-Type": response.headers.get("Content-Type") ?? "application/json", - "Cache-Control": "no-cache", - }, - }); } export const GET = handleApiRequest(proxyToOpenRouter); diff --git a/apps/backend/src/lib/ai/ai-proxy-handlers.ts b/apps/backend/src/lib/ai/ai-proxy-handlers.ts new file mode 100644 index 000000000..22a658bc5 --- /dev/null +++ b/apps/backend/src/lib/ai/ai-proxy-handlers.ts @@ -0,0 +1,140 @@ +import { logAiQuery } from "@/lib/ai/ai-query-logger"; +import { ALLOWED_MODEL_IDS } from "@/lib/ai/models"; +import { extractOpenRouterUsage, scanSseForUsage, type UsageFields } from "@/lib/ai/openrouter-usage"; +import { runAsynchronouslyAndWaitUntil } from "@/utils/background-tasks"; +import { captureError, StatusError } from "@stackframe/stack-shared/dist/utils/errors"; + +export const OPENROUTER_DEFAULT_MODEL = "anthropic/claude-sonnet-4.6"; + +export type SanitizedBody = { + parsed: Record, + bytes: Uint8Array, +}; + +export function sanitizeBody(raw: ArrayBuffer): SanitizedBody { + const text = new TextDecoder().decode(raw); + let parsed; + try { + parsed = JSON.parse(text); + } catch { + throw new StatusError(400, "Request body must be valid JSON"); + } + + if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) { + throw new StatusError(400, "Request body must be a JSON object"); + } + + if (!parsed.model || !ALLOWED_MODEL_IDS.has(parsed.model)) { + parsed.model = OPENROUTER_DEFAULT_MODEL; + } + + if (parsed.metadata?.user_id && parsed.metadata.user_id.length > 128) { + parsed.metadata.user_id = parsed.metadata.user_id.slice(0, 128); + } + + return { parsed: parsed as Record, bytes: new TextEncoder().encode(JSON.stringify(parsed)) }; +} + +function buildMessagesWithSystem(parsed: Record): unknown[] { + const messages = Array.isArray(parsed.messages) ? parsed.messages : []; + const system = parsed.system; + if (typeof system === "string" && system.length > 0) { + return [{ role: "system", content: system }, ...messages]; + } + return messages; +} + +type ProxyLogFields = { + correlationId: string, + parsed: Record, + apiKey: string, + durationMs: bigint, + responseStatus: number, + usage?: UsageFields, +}; + +function buildProxyLogRow(fields: ProxyLogFields) { + const { parsed, apiKey, durationMs, responseStatus, usage, correlationId } = fields; + const tools = Array.isArray(parsed.tools) ? parsed.tools : []; + const toolNames = tools + .map(t => (t && typeof t === "object" && "name" in t) ? (t as { name: unknown }).name : null) + .filter((n): n is string => typeof n === "string"); + return { + correlationId, + mode: parsed.stream === true ? "stream" : "generate", + systemPromptId: apiKey === "stack-auth-proxy" ? "stack-cli" : apiKey, + quality: "unknown", + speed: "unknown", + modelId: String(parsed.model ?? OPENROUTER_DEFAULT_MODEL), + isAuthenticated: false, + projectId: undefined, + userId: undefined, + requestedToolsJson: JSON.stringify(toolNames), + messagesJson: JSON.stringify(buildMessagesWithSystem(parsed)), + stepsJson: "[]", + finalText: "", + inputTokens: usage?.inputTokens, + outputTokens: usage?.outputTokens, + cachedInputTokens: usage?.cachedInputTokens, + costUsd: usage?.costUsd, + stepCount: 0, + durationMs, + errorMessage: responseStatus >= 400 ? `upstream ${responseStatus}` : undefined, + mcpCorrelationId: undefined, + conversationId: undefined, + }; +} + +function scheduleLog(row: ReturnType) { + try { + const safe = logAiQuery(row).catch(e => captureError("ai-proxy-log-async", e)); + runAsynchronouslyAndWaitUntil(safe); + } catch (e) { + captureError("ai-proxy-log-sync", e); + } +} + +export async function observeAndLog(args: { + response: Response, + sanitizedBody: SanitizedBody, + callerApiKey: string, + correlationId: string, + startedAt: number, + responseHeaders: Record, +}): Promise { + const { response, sanitizedBody, callerApiKey, correlationId, startedAt, responseHeaders } = args; + const isStreaming = sanitizedBody.parsed.stream === true; + + if (isStreaming && response.body) { + const [clientStream, observerStream] = response.body.tee(); + runAsynchronouslyAndWaitUntil((async () => { + const usage = await scanSseForUsage(observerStream).catch(() => undefined); + scheduleLog(buildProxyLogRow({ + correlationId, + parsed: sanitizedBody.parsed, + apiKey: callerApiKey, + durationMs: BigInt(Date.now() - startedAt), + responseStatus: response.status, + usage, + })); + })()); + return new Response(clientStream, { status: response.status, headers: responseHeaders }); + } + + const bodyBytes = await response.arrayBuffer(); + let parsedBody: unknown; + try { + parsedBody = JSON.parse(new TextDecoder().decode(bodyBytes)); + } catch { + parsedBody = undefined; + } + scheduleLog(buildProxyLogRow({ + correlationId, + parsed: sanitizedBody.parsed, + apiKey: callerApiKey, + durationMs: BigInt(Date.now() - startedAt), + responseStatus: response.status, + usage: extractOpenRouterUsage(parsedBody), + })); + return new Response(bodyBytes, { status: response.status, headers: responseHeaders }); +} diff --git a/apps/backend/src/lib/ai/ai-query-handlers.ts b/apps/backend/src/lib/ai/ai-query-handlers.ts new file mode 100644 index 000000000..b88e7ffef --- /dev/null +++ b/apps/backend/src/lib/ai/ai-query-handlers.ts @@ -0,0 +1,282 @@ +import { logAiQuery } from "@/lib/ai/ai-query-logger"; +import { logMcpCall } from "@/lib/ai/mcp-logger"; +import { selectModel } from "@/lib/ai/models"; +import { extractCachedTokens, extractOpenRouterCost } from "@/lib/ai/openrouter-usage"; +import { reviewMcpCall } from "@/lib/ai/qa-reviewer"; +import { listManagedProjectIds } from "@/lib/projects"; +import type { SmartRequestAuth } from "@/route-handlers/smart-request"; +import { runAsynchronouslyAndWaitUntil } from "@/utils/background-tasks"; +import { captureError, StatusError } from "@stackframe/stack-shared/dist/utils/errors"; +import { Json } from "@stackframe/stack-shared/dist/utils/json"; +import { generateText, ModelMessage, stepCountIs, streamText, type LanguageModelUsage, type StepResult, type ToolSet } from "ai"; + +export const USER_FACING_ERROR_MESSAGE = "The AI service is temporarily unavailable. Please try again later."; + +export const OPENROUTER_PROVIDER_OPTIONS = { + usage: { include: true }, + extraBody: { stream_options: { include_usage: true } }, +} as const; + +export type ContentBlock = + | { type: "text", text: string } + | { + type: "tool-call", + toolName: string, + toolCallId: string, + args: Json, + argsText: string, + result: Json, + }; + +export type McpCallMetadata = { + toolName: string, + reason: string, + userPrompt: string, + conversationId?: string | null, +}; + +type MessageLike = { role: string, content: unknown }; + +export type CommonLogFields = { + correlationId: string, + mode: "stream" | "generate", + systemPromptId: string, + quality: string, + speed: string, + modelId: string, + isAuthenticated: boolean, + projectId: string | undefined, + userId: string | undefined, + requestedToolsJson: string, + messagesJson: string, + mcpCorrelationId: string | undefined, + conversationId: string | undefined, +}; + +export type ModeContext = { + model: ReturnType, + cachedMessages: ModelMessage[], + toolsArg: ToolSet | undefined, + stepLimit: number, + common: CommonLogFields, + startedAt: number, +}; + +function buildStepsJson(steps: ReadonlyArray>): string { + return JSON.stringify(steps.map((step, i) => ({ + step: i, + text: step.text || undefined, + toolCalls: step.toolCalls.map(tc => ({ + toolName: tc.toolName, + toolCallId: tc.toolCallId, + args: tc.input, + })), + toolResults: step.toolResults.map(tr => ({ + toolName: tr.toolName, + toolCallId: tr.toolCallId, + result: tr.output, + })), + }))); +} + +function buildContentBlocks(steps: ReadonlyArray>): ContentBlock[] { + const blocks: ContentBlock[] = []; + for (const step of steps) { + if (step.text) { + blocks.push({ type: "text", text: step.text }); + } + const resultsByCallId = new Map(step.toolResults.map(r => [r.toolCallId, r])); + for (const toolCall of step.toolCalls) { + const toolResult = resultsByCallId.get(toolCall.toolCallId); + blocks.push({ + type: "tool-call", + toolName: toolCall.toolName, + toolCallId: toolCall.toolCallId, + args: toolCall.input, + argsText: JSON.stringify(toolCall.input), + result: (toolResult?.output ?? null) as Json, + }); + } + } + return blocks; +} + +function logSuccess(args: { + common: CommonLogFields, + startedAt: number, + steps: ReadonlyArray>, + text: string, + usage: LanguageModelUsage, + providerMetadata: unknown, +}): void { + const { common, startedAt, steps, text, usage, providerMetadata } = args; + runAsynchronouslyAndWaitUntil(logAiQuery({ + ...common, + stepsJson: buildStepsJson(steps), + finalText: text, + inputTokens: usage.inputTokens ?? undefined, + outputTokens: usage.outputTokens ?? undefined, + cachedInputTokens: extractCachedTokens(providerMetadata), + costUsd: extractOpenRouterCost(providerMetadata), + stepCount: steps.length, + durationMs: BigInt(Date.now() - startedAt), + errorMessage: undefined, + })); +} + +function logFailure(args: { + common: CommonLogFields, + startedAt: number, + err: unknown, +}): void { + const { common, startedAt, err } = args; + captureError("ai-query-upstream", err); + runAsynchronouslyAndWaitUntil(logAiQuery({ + ...common, + stepsJson: "[]", + finalText: "", + inputTokens: undefined, + outputTokens: undefined, + cachedInputTokens: undefined, + costUsd: undefined, + stepCount: 0, + durationMs: BigInt(Date.now() - startedAt), + errorMessage: err instanceof Error ? err.message : String(err), + })); +} + +export async function assertProjectAccess(projectId: string, auth: SmartRequestAuth | null): Promise { + if (auth?.project.id !== "internal" || auth.user == null) { + throw new StatusError(StatusError.Forbidden, "You do not have access to this project"); + } + const managedProjectIds = await listManagedProjectIds(auth.user); + if (!managedProjectIds.includes(projectId)) { + throw new StatusError(StatusError.Forbidden, "You do not have access to this project"); + } +} + +function logMcpCallAndReview(args: { + mcpCallMetadata: McpCallMetadata, + conversationId: string, + correlationId: string, + messages: ReadonlyArray, + contentBlocks: ContentBlock[], + finalText: string, + stepCount: number, + startedAt: number, + modelId: string, +}): void { + const { mcpCallMetadata, conversationId, correlationId, messages, contentBlocks, finalText, stepCount, startedAt, modelId } = args; + const firstUserMessage = messages.find(m => m.role === "user"); + const question = typeof firstUserMessage?.content === "string" + ? firstUserMessage.content + : JSON.stringify(firstUserMessage?.content ?? ""); + const innerToolCallsJson = JSON.stringify(contentBlocks.filter(b => b.type === "tool-call")); + + const logPromise = logMcpCall({ + correlationId, + toolName: mcpCallMetadata.toolName, + reason: mcpCallMetadata.reason, + userPrompt: mcpCallMetadata.userPrompt, + conversationId, + question, + response: finalText, + stepCount, + innerToolCallsJson, + durationMs: BigInt(Date.now() - startedAt), + modelId, + errorMessage: undefined, + }); + runAsynchronouslyAndWaitUntil(logPromise); + + runAsynchronouslyAndWaitUntil(reviewMcpCall({ + logPromise, + correlationId, + question, + reason: mcpCallMetadata.reason, + response: finalText, + })); +} + +export function handleStreamMode(ctx: ModeContext) { + const { model, cachedMessages, toolsArg, stepLimit, common, startedAt } = ctx; + const result = streamText({ + model, + messages: cachedMessages, + tools: toolsArg, + stopWhen: stepCountIs(stepLimit), + providerOptions: { openrouter: OPENROUTER_PROVIDER_OPTIONS }, + onFinish: ({ text, steps, usage, providerMetadata }) => { + logSuccess({ common, startedAt, steps, text, usage, providerMetadata }); + }, + onError: ({ error }) => logFailure({ common, startedAt, err: error }), + }); + return { + statusCode: 200, + bodyType: "response" as const, + body: result.toUIMessageStreamResponse({ + onError: () => USER_FACING_ERROR_MESSAGE, + }), + }; +} + +export async function handleGenerateMode(ctx: ModeContext & { + messages: ReadonlyArray, + mcpCallMetadata: McpCallMetadata | undefined, + correlationId: string, + conversationIdForLog: string | undefined, +}) { + const { model, cachedMessages, toolsArg, stepLimit, common, startedAt, messages, mcpCallMetadata, correlationId, conversationIdForLog } = ctx; + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), 120_000); + let result: Awaited>; + try { + result = await generateText({ + model, + messages: cachedMessages, + tools: toolsArg, + abortSignal: controller.signal, + stopWhen: stepCountIs(stepLimit), + providerOptions: { openrouter: OPENROUTER_PROVIDER_OPTIONS }, + }).finally(() => clearTimeout(timeoutId)); + } catch (err) { + logFailure({ common, startedAt, err }); + throw new StatusError(StatusError.BadGateway, USER_FACING_ERROR_MESSAGE); + } + + const contentBlocks = buildContentBlocks(result.steps); + logSuccess({ + common, + startedAt, + steps: result.steps, + text: result.text, + usage: result.usage, + providerMetadata: result.providerMetadata, + }); + + let responseConversationId: string | undefined; + if (mcpCallMetadata != null && conversationIdForLog != null) { + responseConversationId = conversationIdForLog; + logMcpCallAndReview({ + mcpCallMetadata, + conversationId: conversationIdForLog, + correlationId, + messages, + contentBlocks, + finalText: result.text, + stepCount: result.steps.length, + startedAt, + modelId: String(model.modelId), + }); + } + + return { + statusCode: 200, + bodyType: "json" as const, + body: { + content: contentBlocks, + finalText: result.text, + conversationId: responseConversationId ?? null, + }, + }; +} diff --git a/apps/backend/src/lib/ai/mcp-logger.ts b/apps/backend/src/lib/ai/mcp-logger.ts index b6bcc9533..d79f4917e 100644 --- a/apps/backend/src/lib/ai/mcp-logger.ts +++ b/apps/backend/src/lib/ai/mcp-logger.ts @@ -23,7 +23,7 @@ export async function getConnection(): Promise { .onApplied(() => { resolve(connInstance); }) - .subscribe(["SELECT * FROM mcp_call_log", "SELECT * FROM ai_query_log"]); + .subscribe(["SELECT * FROM mcp_call_log"]); }) .onConnectError((_: unknown, err: Error) => { captureError("mcp-logger", err); diff --git a/apps/backend/src/lib/ai/openrouter-usage.ts b/apps/backend/src/lib/ai/openrouter-usage.ts new file mode 100644 index 000000000..1ca8fb150 --- /dev/null +++ b/apps/backend/src/lib/ai/openrouter-usage.ts @@ -0,0 +1,95 @@ +import type { OpenRouterUsageAccounting } from "@openrouter/ai-sdk-provider"; + +export type UsageFields = { + inputTokens?: number, + outputTokens?: number, + cachedInputTokens?: number, + costUsd?: number, +}; + +type ProviderMetadata = { openrouter?: { usage?: OpenRouterUsageAccounting } }; + +export function extractOpenRouterCost(meta: unknown): number | undefined { + return (meta as ProviderMetadata | null | undefined)?.openrouter?.usage?.cost; +} + +export function extractCachedTokens(meta: unknown): number | undefined { + return (meta as ProviderMetadata | null | undefined)?.openrouter?.usage?.promptTokensDetails?.cachedTokens; +} + +type RawUsage = { + input_tokens?: number, + output_tokens?: number, + cache_read_input_tokens?: number, + cache_creation_input_tokens?: number, + prompt_tokens?: number, + completion_tokens?: number, + prompt_tokens_details?: { cached_tokens?: number }, + cost?: number, +}; + +type SseEvent = { + usage?: RawUsage, + message?: { usage?: RawUsage }, + delta?: { usage?: RawUsage }, +}; + +const emptyUsage = (): UsageFields => ({}); +const isUsageEmpty = (u: UsageFields): boolean => + u.inputTokens == null && u.outputTokens == null && u.cachedInputTokens == null && u.costUsd == null; + +function readUsageBlock(usage: RawUsage, into: UsageFields): void { + // Anthropic splits prompt tokens across three buckets; sum for parity with OpenAI's `prompt_tokens`. + if (usage.input_tokens != null) { + into.inputTokens = usage.input_tokens + (usage.cache_read_input_tokens ?? 0) + (usage.cache_creation_input_tokens ?? 0); + } else { + into.inputTokens = usage.prompt_tokens ?? into.inputTokens; + } + into.outputTokens = usage.output_tokens ?? usage.completion_tokens ?? into.outputTokens; + into.cachedInputTokens = usage.cache_read_input_tokens ?? usage.prompt_tokens_details?.cached_tokens ?? into.cachedInputTokens; + into.costUsd = usage.cost ?? into.costUsd; +} + +function mergeUsageFromEvent(event: unknown, into: UsageFields): void { + if (event == null || typeof event !== "object") return; + const e = event as SseEvent; + if (e.usage) readUsageBlock(e.usage, into); + if (e.message?.usage) readUsageBlock(e.message.usage, into); + if (e.delta?.usage) readUsageBlock(e.delta.usage, into); +} + +export function extractOpenRouterUsage(obj: unknown): UsageFields | undefined { + const acc = emptyUsage(); + mergeUsageFromEvent(obj, acc); + return isUsageEmpty(acc) ? undefined : acc; +} + +export async function scanSseForUsage(stream: ReadableStream): Promise { + const reader = stream.getReader(); + const decoder = new TextDecoder(); + const acc = emptyUsage(); + let buffer = ""; + try { + while (true) { + const { value, done } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + let boundary: number; + while ((boundary = buffer.indexOf("\n\n")) !== -1) { + const block = buffer.slice(0, boundary); + buffer = buffer.slice(boundary + 2); + for (const line of block.split("\n")) { + if (!line.startsWith("data:")) continue; + const dataStr = line.slice(5).trim(); + if (!dataStr || dataStr === "[DONE]") continue; + try { + mergeUsageFromEvent(JSON.parse(dataStr), acc); + } catch { /* malformed event — skip */ } + } + } + } + } finally { + reader.releaseLock(); + } + return isUsageEmpty(acc) ? undefined : acc; +} diff --git a/apps/internal-tool/src/app/app-client.tsx b/apps/internal-tool/src/app/app-client.tsx index 1e841b509..26ec52c17 100644 --- a/apps/internal-tool/src/app/app-client.tsx +++ b/apps/internal-tool/src/app/app-client.tsx @@ -1,6 +1,6 @@ import { useUser } from "@stackframe/stack"; import { clsx } from "clsx"; -import { useState } from "react"; +import { useEffect, useState } from "react"; import { AddManualQa } from "../components/AddManualQa"; import { Analytics } from "../components/Analytics"; import { CallLogDetail } from "../components/CallLogDetail"; @@ -13,13 +13,31 @@ import { makeMcpReviewApi } from "../lib/mcp-review-api"; import type { AiQueryLogRow, McpCallLogRow } from "../types"; type Tab = "calls" | "knowledge" | "analytics" | "usage"; +const TAB_STORAGE_KEY = "internal-tool-active-tab"; +const VALID_TABS: readonly Tab[] = ["calls", "knowledge", "analytics", "usage"]; + +function readInitialTab(): Tab { + // sessionStorage is per-tab: reload preserves the active tab, but a brand-new + // browser tab gets the default ("calls"). + if (typeof window === "undefined") return "calls"; + const saved = window.sessionStorage.getItem(TAB_STORAGE_KEY); + if (saved != null && (VALID_TABS as readonly string[]).includes(saved)) { + return saved as Tab; + } + return "calls"; +} export default function App() { const user = useUser({ or: process.env.NODE_ENV === "development" ? "redirect" : "return-null" }); const [selectedRow, setSelectedRow] = useState(null); const [selectedUsageRow, setSelectedUsageRow] = useState(null); const [showAddQa, setShowAddQa] = useState(false); - const [tab, setTab] = useState("calls"); + const [tab, setTab] = useState(readInitialTab); + + useEffect(() => { + if (typeof window === "undefined") return; + window.sessionStorage.setItem(TAB_STORAGE_KEY, tab); + }, [tab]); const { rows, connectionState } = useMcpCallLogs(); const { rows: usageRows, connectionState: usageConnectionState } = useAiQueryLogs(); @@ -77,8 +95,8 @@ export default function App() { } return ( -
-
+
+

MCP Review Tool

{/* Tabs */} @@ -154,82 +172,90 @@ export default function App() { /> )} - {tab === "calls" && ( -
-
- -
- {currentSelectedRow && ( - - )} -
- )} - - {tab === "knowledge" && ( -
- { - getApi() - .then(api => api.updateCorrection({ correlationId, correctedQuestion: question, correctedAnswer: answer, publish })) - .catch(() => { /* errors are surfaced by UI state */ }); - }} - onDelete={(correlationId) => { - getApi() - .then(api => api.delete({ correlationId })) - .catch(() => { /* errors are surfaced by UI state */ }); - }} - /> -
- )} - - {tab === "analytics" && ( -
- -
- )} - - {tab === "usage" && ( -
-
- +
- {selectedUsageRow && ( - - )} -
- )} + )} + + {tab === "analytics" && ( +
+
+ +
+
+ )} + + {tab === "usage" && ( + <> +
+
+ +
+
+ {selectedUsageRow && ( + + )} + + )} +
); } diff --git a/apps/internal-tool/src/components/Usage.tsx b/apps/internal-tool/src/components/Usage.tsx index 88f452f26..315fdc7f2 100644 --- a/apps/internal-tool/src/components/Usage.tsx +++ b/apps/internal-tool/src/components/Usage.tsx @@ -676,9 +676,7 @@ function SortHeader({ function formatUsd(value: number): string { if (value === 0) return "$0"; - if (value < 0.01) return `$${value.toFixed(4)}`; - if (value < 1) return `$${value.toFixed(3)}`; - return `$${value.toFixed(2)}`; + return `$${value.toFixed(4)}`; } function MetricCard({ label, value, valueClass }: { label: string, value: string, valueClass?: string }) { diff --git a/apps/internal-tool/src/components/UsageDetail.tsx b/apps/internal-tool/src/components/UsageDetail.tsx index 83c03c8b2..2c4d82fe9 100644 --- a/apps/internal-tool/src/components/UsageDetail.tsx +++ b/apps/internal-tool/src/components/UsageDetail.tsx @@ -103,7 +103,7 @@ export function UsageDetail({ row, onClose }: { row: AiQueryLogRow, onClose: () <> (cached {row.cachedInputTokens.toLocaleString()}) )} {" · "}out {row.outputTokens?.toLocaleString() ?? "?"} tok - {row.costUsd != null && <>{" · "}{row.costUsd < 0.01 ? `$${row.costUsd.toFixed(4)}` : `$${row.costUsd.toFixed(3)}`}} + {row.costUsd != null && <>{" · "}${row.costUsd.toFixed(4)}}