Implement user persistence via Firebase (khanon/oai-reverse-proxy!8)
This commit is contained in:
@@ -1,13 +1,16 @@
|
||||
/**
|
||||
* Basic user management. Handles creation and tracking of proxy users, personal
|
||||
* access tokens, and quota management. No persistence is provided, users must
|
||||
* be re-created on each proxy start via the /admin API.
|
||||
* access tokens, and quota management. Supports in-memory and Firebase Realtime
|
||||
* Database persistence stores.
|
||||
*
|
||||
* Users are identified solely by their personal access token. The token is
|
||||
* used to authenticate the user for all proxied requests.
|
||||
*/
|
||||
|
||||
import admin from "firebase-admin";
|
||||
import { v4 as uuid } from "uuid";
|
||||
import { config, getFirebaseApp } from "../../config";
|
||||
import { logger } from "../../logger";
|
||||
|
||||
export interface User {
|
||||
/** The user's personal access token. */
|
||||
@@ -41,6 +44,15 @@ export type UserType = "normal" | "special";
|
||||
type UserUpdate = Partial<User> & Pick<User, "token">;
|
||||
|
||||
const users: Map<string, User> = new Map();
|
||||
const usersToFlush = new Set<string>();
|
||||
|
||||
export async function init() {
|
||||
logger.info({ store: config.gatekeeperStore }, "Initializing user store...");
|
||||
if (config.gatekeeperStore === "firebase_rtdb") {
|
||||
await initFirebase();
|
||||
}
|
||||
logger.info("User store initialized.");
|
||||
}
|
||||
|
||||
/** Creates a new user and returns their token. */
|
||||
export function createUser() {
|
||||
@@ -84,6 +96,13 @@ export function upsertUser(user: UserUpdate) {
|
||||
...existing,
|
||||
...user,
|
||||
});
|
||||
usersToFlush.add(user.token);
|
||||
|
||||
// Immediately schedule a flush to the database if we're using Firebase.
|
||||
if (config.gatekeeperStore === "firebase_rtdb") {
|
||||
setImmediate(flushUsers);
|
||||
}
|
||||
|
||||
return users.get(user.token);
|
||||
}
|
||||
|
||||
@@ -92,6 +111,7 @@ export function incrementPromptCount(token: string) {
|
||||
const user = users.get(token);
|
||||
if (!user) return;
|
||||
user.promptCount++;
|
||||
usersToFlush.add(token);
|
||||
}
|
||||
|
||||
/** Increments the token count for the given user by the given amount. */
|
||||
@@ -99,6 +119,7 @@ export function incrementTokenCount(token: string, amount = 1) {
|
||||
const user = users.get(token);
|
||||
if (!user) return;
|
||||
user.tokenCount += amount;
|
||||
usersToFlush.add(token);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -111,6 +132,7 @@ export function authenticate(token: string, ip: string) {
|
||||
if (!user || user.disabledAt) return;
|
||||
if (!user.ip.includes(ip)) user.ip.push(ip);
|
||||
user.lastUsedAt = Date.now();
|
||||
usersToFlush.add(token);
|
||||
return user;
|
||||
}
|
||||
|
||||
@@ -120,4 +142,58 @@ export function disableUser(token: string, reason?: string) {
|
||||
if (!user) return;
|
||||
user.disabledAt = Date.now();
|
||||
user.disabledReason = reason;
|
||||
usersToFlush.add(token);
|
||||
}
|
||||
|
||||
// TODO: Firebase persistence is pretend right now and just polls the in-memory
|
||||
// store to sync it with Firebase when it changes. Will refactor to abstract
|
||||
// persistence layer later so we can support multiple stores.
|
||||
let firebaseTimeout: NodeJS.Timeout | undefined;
|
||||
|
||||
async function initFirebase() {
|
||||
logger.info("Connecting to Firebase...");
|
||||
const app = getFirebaseApp();
|
||||
const db = admin.database(app);
|
||||
const usersRef = db.ref("users");
|
||||
const snapshot = await usersRef.once("value");
|
||||
const users: Record<string, User> | null = snapshot.val();
|
||||
firebaseTimeout = setInterval(flushUsers, 20 * 1000);
|
||||
if (!users) {
|
||||
logger.info("No users found in Firebase.");
|
||||
return;
|
||||
}
|
||||
for (const token in users) {
|
||||
upsertUser(users[token]);
|
||||
}
|
||||
usersToFlush.clear();
|
||||
const numUsers = Object.keys(users).length;
|
||||
logger.info({ users: numUsers }, "Loaded users from Firebase");
|
||||
}
|
||||
|
||||
async function flushUsers() {
|
||||
const app = getFirebaseApp();
|
||||
const db = admin.database(app);
|
||||
const usersRef = db.ref("users");
|
||||
const updates: Record<string, User> = {};
|
||||
|
||||
for (const token of usersToFlush) {
|
||||
const user = users.get(token);
|
||||
if (!user) {
|
||||
continue;
|
||||
}
|
||||
updates[token] = user;
|
||||
}
|
||||
|
||||
usersToFlush.clear();
|
||||
|
||||
const numUpdates = Object.keys(updates).length;
|
||||
if (numUpdates === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
await usersRef.update(updates);
|
||||
logger.info(
|
||||
{ users: Object.keys(updates).length },
|
||||
"Flushed users to Firebase"
|
||||
);
|
||||
}
|
||||
|
||||
+1
-1
@@ -191,7 +191,7 @@ function cleanQueue() {
|
||||
(waitTime) => now - waitTime.end > 90 * 1000
|
||||
);
|
||||
const removed = waitTimes.splice(0, index + 1);
|
||||
log.info(
|
||||
log.debug(
|
||||
{ stalledRequests: oldRequests.length, prunedWaitTimes: removed.length },
|
||||
`Cleaning up request queue.`
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user