refactors log-queue to support multiple backends
This commit is contained in:
@@ -1 +1,2 @@
|
||||
export * as sheets from "./sheets";
|
||||
export { sheets } from "./sheets";
|
||||
export { airtable } from "./airtable";
|
||||
|
||||
@@ -10,7 +10,7 @@ import type { CredentialBody } from "google-auth-library";
|
||||
import type { GaxiosResponse } from "googleapis-common";
|
||||
import { config } from "../../config";
|
||||
import { logger } from "../../logger";
|
||||
import { PromptLogEntry } from "..";
|
||||
import { PromptLogBackend, PromptLogEntry } from "..";
|
||||
|
||||
// There is always a sheet called __index__ which contains a list of all the
|
||||
// other sheets. We use this rather than iterating over all the sheets in case
|
||||
@@ -240,7 +240,7 @@ const createLogSheet = async () => {
|
||||
activeLogSheet = { sheetName, rows: [] };
|
||||
};
|
||||
|
||||
export const appendBatch = async (batch: PromptLogEntry[]) => {
|
||||
const appendBatch = async (batch: PromptLogEntry[]) => {
|
||||
if (!activeLogSheet) {
|
||||
// Create a new log sheet if we don't have one yet.
|
||||
await createLogSheet();
|
||||
@@ -310,40 +310,7 @@ const finalizeBatch = async () => {
|
||||
log.info({ sheetName, rowCount }, "Batch finalized.");
|
||||
};
|
||||
|
||||
type LoadLogSheetArgs = {
|
||||
sheetName: string;
|
||||
/** The starting row to load. If omitted, loads all rows (expensive). */
|
||||
fromRow?: number;
|
||||
};
|
||||
|
||||
/** Not currently used. */
|
||||
export const loadLogSheet = async ({
|
||||
sheetName,
|
||||
fromRow = 2, // omit header row
|
||||
}: LoadLogSheetArgs) => {
|
||||
const client = sheetsClient!;
|
||||
const spreadsheetId = config.googleSheetsSpreadsheetId!;
|
||||
|
||||
const range = `${sheetName}!A${fromRow}:E`;
|
||||
const res = await client.spreadsheets.values.get({
|
||||
spreadsheetId: spreadsheetId,
|
||||
range,
|
||||
});
|
||||
const data = assertData(res);
|
||||
const values = data.values || [];
|
||||
const rows = values.slice(1).map((row) => {
|
||||
return {
|
||||
model: row[0],
|
||||
endpoint: row[1],
|
||||
promptRaw: row[2],
|
||||
promptFlattened: row[3],
|
||||
response: row[4],
|
||||
};
|
||||
});
|
||||
activeLogSheet = { sheetName, rows };
|
||||
};
|
||||
|
||||
export const init = async (onStop: () => void) => {
|
||||
const init = async (onStop: () => void) => {
|
||||
if (sheetsClient) {
|
||||
return;
|
||||
}
|
||||
@@ -420,3 +387,5 @@ function assertData<T = sheets_v4.Schema$ValueRange>(res: GaxiosResponse<T>) {
|
||||
}
|
||||
return res.data!;
|
||||
}
|
||||
|
||||
export const sheets: PromptLogBackend = { init, appendBatch };
|
||||
|
||||
@@ -17,4 +17,9 @@ export interface PromptLogEntry {
|
||||
// TODO: temperature, top_p, top_k, etc.
|
||||
}
|
||||
|
||||
export interface PromptLogBackend {
|
||||
init(onStop: () => void): Promise<void>;
|
||||
appendBatch(entries: PromptLogEntry[]): Promise<void>;
|
||||
}
|
||||
|
||||
export * as logQueue from "./log-queue";
|
||||
|
||||
@@ -1,21 +1,36 @@
|
||||
/* Queues incoming prompts/responses and periodically flushes them to configured
|
||||
* logging backend. */
|
||||
|
||||
import { config } from "../config";
|
||||
import { logger } from "../logger";
|
||||
import { PromptLogEntry } from ".";
|
||||
import { sheets } from "./backends";
|
||||
import { PromptLogBackend, PromptLogEntry } from ".";
|
||||
import { sheets, airtable } from "./backends";
|
||||
|
||||
const FLUSH_INTERVAL = 1000 * 10;
|
||||
const MAX_BATCH_SIZE = 25;
|
||||
const BACKENDS: Record<
|
||||
NonNullable<typeof config.promptLoggingBackend>,
|
||||
PromptLogBackend
|
||||
> = {
|
||||
google_sheets: sheets,
|
||||
airtable: airtable,
|
||||
};
|
||||
|
||||
const queue: PromptLogEntry[] = [];
|
||||
const log = logger.child({ module: "log-queue" });
|
||||
|
||||
let activeBackend: PromptLogBackend | null = null;
|
||||
let started = false;
|
||||
let timeoutId: NodeJS.Timeout | null = null;
|
||||
let retrying = false;
|
||||
let consecutiveFailedBatches = 0;
|
||||
|
||||
const getBackend = () => {
|
||||
if (!activeBackend) {
|
||||
throw new Error("Log queue not initialized.");
|
||||
}
|
||||
return activeBackend;
|
||||
};
|
||||
|
||||
export const enqueue = (payload: PromptLogEntry) => {
|
||||
if (!started) {
|
||||
log.warn("Log queue not started, discarding incoming log entry.");
|
||||
@@ -34,7 +49,7 @@ export const flush = async () => {
|
||||
const nextBatch = queue.splice(0, batchSize);
|
||||
log.info({ size: nextBatch.length }, "Submitting new batch.");
|
||||
try {
|
||||
await sheets.appendBatch(nextBatch);
|
||||
await getBackend().appendBatch(nextBatch);
|
||||
retrying = false;
|
||||
consecutiveFailedBatches = 0;
|
||||
} catch (e: any) {
|
||||
@@ -65,7 +80,13 @@ export const flush = async () => {
|
||||
|
||||
export const start = async () => {
|
||||
try {
|
||||
await sheets.init(() => stop());
|
||||
const selectedBackend = config.promptLoggingBackend;
|
||||
if (!selectedBackend) {
|
||||
throw new Error("No logging backend configured.");
|
||||
}
|
||||
|
||||
activeBackend = BACKENDS[selectedBackend];
|
||||
await getBackend().init(() => stop());
|
||||
log.info("Logging backend initialized.");
|
||||
started = true;
|
||||
} catch (e) {
|
||||
|
||||
Reference in New Issue
Block a user