Partial Google AI fixes; adds JSONL file backend for the prompt logger (adapted from fiz)
This commit is contained in:
+14
-1
@@ -149,7 +149,9 @@ type Config = {
|
||||
/** Whether prompts and responses should be logged to persistent storage. */
|
||||
promptLogging?: boolean;
|
||||
/** Which prompt logging backend to use. */
|
||||
promptLoggingBackend?: "google_sheets";
|
||||
promptLoggingBackend?: "google_sheets" | "file";
|
||||
/** Prefix for prompt logging files when using the file backend. */
|
||||
promptLoggingFilePrefix?: string;
|
||||
/** Base64-encoded Google Sheets API key. */
|
||||
googleSheetsKey?: string;
|
||||
/** Google Sheets spreadsheet ID. */
|
||||
@@ -329,6 +331,10 @@ export const config: Config = {
|
||||
allowAwsLogging: getEnvWithDefault("ALLOW_AWS_LOGGING", false),
|
||||
promptLogging: getEnvWithDefault("PROMPT_LOGGING", false),
|
||||
promptLoggingBackend: getEnvWithDefault("PROMPT_LOGGING_BACKEND", undefined),
|
||||
promptLoggingFilePrefix: getEnvWithDefault(
|
||||
"PROMPT_LOGGING_FILE_PREFIX",
|
||||
"prompt-logs"
|
||||
),
|
||||
googleSheetsKey: getEnvWithDefault("GOOGLE_SHEETS_KEY", undefined),
|
||||
googleSheetsSpreadsheetId: getEnvWithDefault(
|
||||
"GOOGLE_SHEETS_SPREADSHEET_ID",
|
||||
@@ -387,6 +393,12 @@ export async function assertConfigIsValid() {
|
||||
);
|
||||
}
|
||||
|
||||
if (config.promptLogging && !config.promptLoggingBackend) {
|
||||
throw new Error(
|
||||
"Prompt logging is enabled but no backend is configured. Set PROMPT_LOGGING_BACKEND to 'google_sheets' or 'file'."
|
||||
);
|
||||
}
|
||||
|
||||
if (!["none", "proxy_key", "user_token"].includes(config.gatekeeper)) {
|
||||
throw new Error(
|
||||
`Invalid gatekeeper mode: ${config.gatekeeper}. Must be one of: none, proxy_key, user_token.`
|
||||
@@ -463,6 +475,7 @@ export const OMITTED_KEYS = [
|
||||
"rejectPhrases",
|
||||
"rejectMessage",
|
||||
"showTokenCosts",
|
||||
"promptLoggingFilePrefix",
|
||||
"googleSheetsKey",
|
||||
"firebaseKey",
|
||||
"firebaseRtdbUrl",
|
||||
|
||||
@@ -11,7 +11,7 @@ import { ProxyResHandlerWithBody } from ".";
|
||||
import { assertNever } from "../../../shared/utils";
|
||||
import {
|
||||
AnthropicChatMessage,
|
||||
flattenAnthropicMessages,
|
||||
flattenAnthropicMessages, GoogleAIChatMessage,
|
||||
MistralAIChatMessage,
|
||||
OpenAIChatMessage,
|
||||
} from "../../../shared/api-schemas";
|
||||
@@ -62,6 +62,7 @@ const getPromptForRequest = (
|
||||
):
|
||||
| string
|
||||
| OpenAIChatMessage[]
|
||||
| { contents: GoogleAIChatMessage[] }
|
||||
| { system: string; messages: AnthropicChatMessage[] }
|
||||
| MistralAIChatMessage[]
|
||||
| OaiImageResult => {
|
||||
@@ -87,7 +88,7 @@ const getPromptForRequest = (
|
||||
case "anthropic-text":
|
||||
return req.body.prompt;
|
||||
case "google-ai":
|
||||
return req.body.prompt.text;
|
||||
return { contents: req.body.contents };
|
||||
default:
|
||||
assertNever(req.outboundApi);
|
||||
}
|
||||
@@ -98,6 +99,7 @@ const flattenMessages = (
|
||||
| string
|
||||
| OaiImageResult
|
||||
| OpenAIChatMessage[]
|
||||
| { contents: GoogleAIChatMessage[] }
|
||||
| { system: string; messages: AnthropicChatMessage[] }
|
||||
| MistralAIChatMessage[]
|
||||
): string => {
|
||||
@@ -108,6 +110,16 @@ const flattenMessages = (
|
||||
const { system, messages } = val;
|
||||
return `System: ${system}\n\n${flattenAnthropicMessages(messages)}`;
|
||||
}
|
||||
if (isGoogleAIChatPrompt(val)) {
|
||||
return val.contents
|
||||
.map(({ parts, role }) => {
|
||||
const text = parts
|
||||
.map((p) => p.text)
|
||||
.join("\n");
|
||||
return `${role}: ${text}`;
|
||||
})
|
||||
.join("\n");
|
||||
}
|
||||
if (Array.isArray(val)) {
|
||||
return val
|
||||
.map(({ content, role }) => {
|
||||
@@ -128,6 +140,16 @@ const flattenMessages = (
|
||||
return val.prompt.trim();
|
||||
};
|
||||
|
||||
function isGoogleAIChatPrompt(
|
||||
val: unknown
|
||||
): val is { contents: GoogleAIChatMessage[] } {
|
||||
return (
|
||||
typeof val === "object" &&
|
||||
val !== null &&
|
||||
"contents" in val
|
||||
);
|
||||
}
|
||||
|
||||
function isAnthropicChatPrompt(
|
||||
val: unknown
|
||||
): val is { system: string; messages: AnthropicChatMessage[] } {
|
||||
|
||||
@@ -116,7 +116,7 @@ export class SSEStreamAdapter extends Transform {
|
||||
try {
|
||||
const hasParts = candidates[0].content?.parts?.length > 0;
|
||||
if (hasParts) {
|
||||
return `data: ${JSON.stringify(data)}`;
|
||||
return `data: ${JSON.stringify(data.value ?? data)}\n`;
|
||||
} else {
|
||||
this.log.error({ event: data }, "Received bad Google AI event");
|
||||
return `data: ${buildSpoofedSSE({
|
||||
|
||||
@@ -0,0 +1,86 @@
|
||||
// stolen from https://gitgud.io/fiz1/oai-reverse-proxy
|
||||
|
||||
import { promises as fs } from "fs";
|
||||
import * as path from "path";
|
||||
import { USER_ASSETS_DIR, config } from "../../../config";
|
||||
import { logger } from "../../../logger";
|
||||
import { LogBackend, PromptLogEntry } from "../index";
|
||||
import { glob } from "glob";
|
||||
|
||||
const MAX_FILE_SIZE = 25 * 1024 * 1024;
|
||||
|
||||
let currentFileNumber = 0;
|
||||
let currentFilePath = "";
|
||||
let currentFileSize = 0;
|
||||
|
||||
export { currentFileNumber };
|
||||
|
||||
export const fileBackend: LogBackend = {
|
||||
init: async (_onStop: () => void) => {
|
||||
try {
|
||||
await createNewLogFile();
|
||||
} catch (error) {
|
||||
logger.error("Error initializing file backend", error);
|
||||
throw error;
|
||||
}
|
||||
|
||||
const files = glob.sync(
|
||||
path.join(USER_ASSETS_DIR, `${config.promptLoggingFilePrefix}*.jsonl`),
|
||||
{ windowsPathsNoEscape: true }
|
||||
);
|
||||
const sorted = files.sort((a, b) => {
|
||||
const aNum = parseInt(path.basename(a).replace(/[^0-9]/g, ""), 10);
|
||||
const bNum = parseInt(path.basename(b).replace(/[^0-9]/g, ""), 10);
|
||||
return aNum - bNum;
|
||||
});
|
||||
|
||||
if (sorted.length > 0) {
|
||||
const latestFile = sorted[sorted.length - 1];
|
||||
const stats = await fs.stat(latestFile);
|
||||
currentFileNumber = parseInt(
|
||||
path.basename(latestFile).replace(/[^0-9]/g, ""),
|
||||
10
|
||||
);
|
||||
currentFilePath = latestFile;
|
||||
currentFileSize = stats.size;
|
||||
}
|
||||
|
||||
logger.info(
|
||||
{ currentFileNumber, currentFilePath, currentFileSize },
|
||||
"File backend initialized"
|
||||
);
|
||||
},
|
||||
appendBatch: async (batch: PromptLogEntry[]) => {
|
||||
try {
|
||||
if (currentFileSize > MAX_FILE_SIZE) {
|
||||
await createNewLogFile();
|
||||
}
|
||||
|
||||
const batchString =
|
||||
batch.map((entry) => JSON.stringify(entry)).join("\n") + "\n";
|
||||
const batchSizeBytes = Buffer.byteLength(batchString);
|
||||
const batchLines = batch.length;
|
||||
logger.debug(
|
||||
{ batchLines, batchSizeBytes, currentFileSize, file: currentFilePath },
|
||||
"Appending batch to file"
|
||||
);
|
||||
await fs.appendFile(currentFilePath, batchString);
|
||||
currentFileSize += Buffer.byteLength(batchString);
|
||||
} catch (error) {
|
||||
logger.error("Error appending batch to file", error);
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
async function createNewLogFile() {
|
||||
currentFileNumber++;
|
||||
currentFilePath = path.join(
|
||||
USER_ASSETS_DIR,
|
||||
`${config.promptLoggingFilePrefix}${currentFileNumber}.jsonl`
|
||||
);
|
||||
currentFileSize = 0;
|
||||
|
||||
await fs.writeFile(currentFilePath, "");
|
||||
logger.info(`Created new log file: ${currentFilePath}`);
|
||||
}
|
||||
@@ -1 +1,2 @@
|
||||
export * as sheets from "./sheets";
|
||||
export { fileBackend as file } from "./file";
|
||||
|
||||
@@ -17,4 +17,9 @@ export interface PromptLogEntry {
|
||||
// TODO: temperature, top_p, top_k, etc.
|
||||
}
|
||||
|
||||
/**
 * Contract implemented by every prompt-logging backend (e.g. Google Sheets,
 * JSONL file). The log queue selects one backend at startup and forwards
 * batches of entries to it.
 */
export interface LogBackend {
  /**
   * Prepares the backend for writing. `onStop` is a callback the backend may
   * invoke to request that prompt logging be halted (callers pass the
   * queue's `stop()` — see log-queue's `start()`).
   */
  init: (onStop: () => void) => Promise<void>;
  /** Persists one batch of prompt log entries to the backend's storage. */
  appendBatch: (batch: PromptLogEntry[]) => Promise<void>;
}
|
||||
|
||||
export * as logQueue from "./log-queue";
|
||||
|
||||
@@ -2,8 +2,10 @@
|
||||
* logging backend. */
|
||||
|
||||
import { logger } from "../../logger";
|
||||
import { PromptLogEntry } from ".";
|
||||
import { sheets } from "./backends";
|
||||
import { LogBackend, PromptLogEntry } from ".";
|
||||
import { sheets, file } from "./backends";
|
||||
import { config } from "../../config";
|
||||
import { assertNever } from "../utils";
|
||||
|
||||
const FLUSH_INTERVAL = 1000 * 10;
|
||||
const MAX_BATCH_SIZE = 25;
|
||||
@@ -15,6 +17,7 @@ let started = false;
|
||||
let timeoutId: NodeJS.Timeout | null = null;
|
||||
let retrying = false;
|
||||
let consecutiveFailedBatches = 0;
|
||||
let backend: LogBackend;
|
||||
|
||||
export const enqueue = (payload: PromptLogEntry) => {
|
||||
if (!started) {
|
||||
@@ -34,7 +37,7 @@ export const flush = async () => {
|
||||
const nextBatch = queue.splice(0, batchSize);
|
||||
log.info({ size: nextBatch.length }, "Submitting new batch.");
|
||||
try {
|
||||
await sheets.appendBatch(nextBatch);
|
||||
await backend.appendBatch(nextBatch);
|
||||
retrying = false;
|
||||
consecutiveFailedBatches = 0;
|
||||
} catch (e: any) {
|
||||
@@ -64,8 +67,20 @@ export const flush = async () => {
|
||||
};
|
||||
|
||||
export const start = async () => {
|
||||
const type = config.promptLoggingBackend!;
|
||||
try {
|
||||
await sheets.init(() => stop());
|
||||
switch (type) {
|
||||
case "google_sheets":
|
||||
backend = sheets;
|
||||
await sheets.init(() => stop());
|
||||
break;
|
||||
case "file":
|
||||
backend = file;
|
||||
await file.init(() => stop());
|
||||
break;
|
||||
default:
|
||||
assertNever(type)
|
||||
}
|
||||
log.info("Logging backend initialized.");
|
||||
started = true;
|
||||
} catch (e) {
|
||||
|
||||
Reference in New Issue
Block a user