Storage: new delayed serialization backend.

This commit is contained in:
Enrico Ros
2024-10-17 22:09:12 -07:00
parent f3bd5e4d58
commit e9add70f8a
2 changed files with 240 additions and 145 deletions
+3 -3
View File
@@ -1,5 +1,5 @@
import { create } from 'zustand';
import { createJSONStorage, persist } from 'zustand/middleware';
import { persist } from 'zustand/middleware';
import { useShallow } from 'zustand/react/shallow';
import type { SystemPurposeId } from '../../../data';
@@ -8,7 +8,7 @@ import type { DLLMId } from '~/common/stores/llms/llms.types';
import { findLLMOrThrow, getChatLLMId } from '~/common/stores/llms/store-llms';
import { agiUuid } from '~/common/util/idUtils';
import { backupIdbV3, idbStateStorage } from '~/common/util/idbUtils';
import { backupIdbV3, createIDBPersistStorage } from '~/common/util/idbUtils';
import { workspaceActions } from '~/common/stores/workspace/store-client-workspace';
import { workspaceForConversationIdentity } from '~/common/stores/workspace/workspace.types';
@@ -400,7 +400,7 @@ export const useChatStore = create<ConversationsStore>()(/*devtools(*/
* - 4: [2024-05-14] Convert messages to multi-part, removed the IDB migration
*/
version: 4,
storage: createJSONStorage(() => idbStateStorage),
storage: createIDBPersistStorage<ConversationsStore>(),
// Migrations
migrate: async (state: any, fromVersion: number) => {
+237 -142
View File
@@ -1,158 +1,253 @@
import type { StateStorage } from 'zustand/middleware';
import { del as idbDel, get as idbGet, set as idbSet } from 'idb-keyval';
/**
* Copyright (c) 2024 Enrico Ros
*
* Faster Zustand storage backend serializing objects just before write.
* Moreover uses a deadline-based scheduler to batch writes, with an aggregation window.
*/
import type { PersistStorage, StorageValue } from 'zustand/middleware';
import { get as idbGet, set as idbSet } from 'idb-keyval';
// set to true to enable debugging
// [DEV] configuration
const DEBUG_SCHEDULER = false;
const USER_LOG_ISSUES = true;
const IDB_MERGE_WINDOW = 321; // not a magic number, just a random value
const IDB_DEADLINE = 1234; // breaks the pace
interface PendingWrite {
timeoutId: ReturnType<typeof setTimeout> | null;
firstAttemptTime: number;
pendingValue: string | null;
}
type SetKey = string;
/**
* The write scheduler is a simple utility that batches write operations to IndexedDB.
* Should be reasonably efficient and won't block the main thread. Schreuled writes shall happen within
* the deadline, which will help the UI survive intense load.
*/
class WriteScheduler {
private writeOperations: Record<string, PendingWrite> = {};
constructor(readonly maxDeadline: number = 600, readonly minInterval: number = 250) {
}
scheduleWrite(key: string, value: string): void {
const now = Date.now();
const operation = this.writeOperations[key] || { timeoutId: null, firstAttemptTime: now, pendingValue: null };
if (operation.timeoutId !== null) {
// if (DEBUG_SCHEDULER)
// console.warn(' - idb_WS: clr_write', key);
clearTimeout(operation.timeoutId);
operation.timeoutId = null;
}
if (!operation.firstAttemptTime)
operation.firstAttemptTime = now;
operation.pendingValue = value;
this.writeOperations[key] = operation;
const timeSinceFirstAttempt = now - operation.firstAttemptTime;
let writeDelay = this.minInterval;
if (timeSinceFirstAttempt + this.minInterval > this.maxDeadline)
writeDelay = this.maxDeadline - timeSinceFirstAttempt;
if (writeDelay < 10) {
if (DEBUG_SCHEDULER)
console.warn(' - idb_WS: deadline write', key, '(delay:', writeDelay, ')');
this.performWrite(key).catch(error => {
if (USER_LOG_ISSUES)
console.warn('idbUtils: E1: writing', key, error);
});
} else {
if (DEBUG_SCHEDULER)
console.warn(' - idb_WS: schedule', key, 'at', writeDelay, 'ms');
operation.timeoutId = setTimeout(() => {
this.performWrite(key).catch(error => {
if (USER_LOG_ISSUES)
console.warn('idbUtils: E2: writing', key, error);
});
}, writeDelay);
}
}
async performWrite(key: string): Promise<void> {
const operation = this.writeOperations[key];
if (!operation) {
if (USER_LOG_ISSUES)
console.warn('idbUtils: write operation not found for', key);
return;
}
const walueToWrite = operation.pendingValue;
operation.timeoutId = null;
operation.firstAttemptTime = 0;
operation.pendingValue = null;
if (walueToWrite === null) {
if (USER_LOG_ISSUES)
console.warn('idbUtils: write operation has no pending value for', key);
} else {
const start = Date.now();
if (DEBUG_SCHEDULER)
console.log(' - idb: [SET]', key);
await idbSet(key, walueToWrite);
if (DEBUG_SCHEDULER)
console.warn(' (write time:', Date.now() - start, 'ms, bytes:', walueToWrite.length.toLocaleString(), ')');
}
}
async retrievePendingWrite(key: string): Promise<string | null> {
// If there's a pending value, return it immediately
const operation = this.writeOperations[key];
if (operation && operation.pendingValue !== null) {
if (DEBUG_SCHEDULER)
console.warn(' - idb_WS: read_pending', key, 'deadline:', operation.firstAttemptTime + this.maxDeadline - Date.now(), 'ms');
return operation.pendingValue;
}
// If there's no operation or pending value, return null indicating no data is available
return null;
}
}
const writeScheduler = new WriteScheduler(1200, 400);
/**
* A Zustand state storage implementation that uses IndexedDB as a simple key-value store
*/
export const idbStateStorage: StateStorage = {
getItem: async (name: string): Promise<string | null> => {
// if (DEBUG_SCHEDULER)
// console.warn(' - idb: get', name);
// If there's a pending value, return it directly
const pendingValue = await writeScheduler.retrievePendingWrite(name);
if (pendingValue !== null)
return pendingValue;
// If there's no pending value, proceed to fetch from IndexedDB
if (DEBUG_SCHEDULER)
console.warn(' - idb: [GET]', name);
const value: string | undefined = await idbGet(name);
if (DEBUG_SCHEDULER)
console.warn(' (read bytes:', value?.length?.toLocaleString(), ')');
return value || null;
},
setItem: (name: string, value: string): void => {
// if (DEBUG_SCHEDULER)
// console.warn(' - idb: set', name);
writeScheduler.scheduleWrite(name, value);
},
removeItem: async (name: string): Promise<void> => {
if (DEBUG_SCHEDULER)
console.warn(' - idb: del', name);
await idbDel(name);
},
type SetOperation = {
queueDeadline: number | null;
scheduledTimerId: ReturnType<typeof setTimeout> | null;
lastState: null | StorageValue<any>;
needsWrite: boolean;
isWriting: boolean; // [r: all, w: performWrite]
};
export async function backupIdbV3(keyFrom: string, keyTo: string): Promise<boolean> {
const existingItem = await idbStateStorage.getItem(keyFrom);
if (existingItem === null) {
console.warn('idbUtils: E3: backupIdbV3: item not found:', keyFrom);
return false;
const _warn = (...args: any[]) => console.warn('IndexedDB:', ...args);
const _devWarn = (...args: any[]) => console.warn('[DEV] IndexedDB:', ...args);
class IndexedDBWriteScheduler {
private writeOperations: Record<SetKey, SetOperation> = {};
constructor(readonly mergeWindow: number, readonly deadline: number) {
}
await idbStateStorage.setItem(keyTo, existingItem);
return true;
async getItem<S>(key: SetKey): Promise<StorageValue<S> | null> {
// in-mem recycle: unexpected, but implemented
const operation = this.writeOperations[key];
if (operation && operation.lastState !== null) {
_devWarn(`unexpected in-mem recycle of '${key}'`);
return operation.lastState;
}
// fetch from IDB
const jsonState = await this.#idbReadString(key);
if (jsonState === null) return null; // first time is null (not found in storage)
// deserialize
try {
return JSON.parse(jsonState) as StorageValue<S>;
} catch (error: any) {
_warn(`GET: reading error for '${key}':`, error);
return null;
}
}
setItem<S>(key: SetKey, newValue: StorageValue<S>): void {
// do not serialize now, just store the object in the work order
const operation = this.writeOperations[key];
if (!operation) {
if (DEBUG_SCHEDULER) _devWarn(`SET.${key} new operation`);
this.writeOperations[key] = {
queueDeadline: Date.now() + this.deadline,
scheduledTimerId: null,
lastState: newValue,
needsWrite: true,
isWriting: false,
};
} else {
if (DEBUG_SCHEDULER) _devWarn(`SET.${key} updating operation`);
if (!operation.queueDeadline)
operation.queueDeadline = Date.now() + this.deadline;
operation.lastState = newValue;
operation.needsWrite = true;
}
// schedule the write
this.#scheduleWrite(key);
}
async setItemDirect<S>(key: SetKey, newValue: StorageValue<S>): Promise<void> {
return this.#idbWriteString(key, JSON.stringify(newValue));
}
// scheduling
#scheduleWrite(key: SetKey): void {
const operation = this.writeOperations[key];
if (!operation) return;
if (!operation.needsWrite) return;
if (operation.isWriting) return;
const now = Date.now();
const timeUntilMerge = this.mergeWindow;
const timeUntilDeadline = operation.queueDeadline ? operation.queueDeadline - now : 0; // 0 should not be an option, as state is set correctly
const delay = Math.max(Math.min(timeUntilMerge, timeUntilDeadline), 0);
if (delay > 0) {
// schedule/reshedule the write
if (DEBUG_SCHEDULER) _devWarn(` - schedule ${key}: ${operation.scheduledTimerId ? 'reschedule' : 'schedule'} in ${delay} ms`);
if (operation.scheduledTimerId)
clearTimeout(operation.scheduledTimerId);
operation.scheduledTimerId = setTimeout(() => {
void this.#performWrite(key);
}, delay);
} else {
// we are past the deadline
if (DEBUG_SCHEDULER) _devWarn(` - schedule ${key}: ${operation.scheduledTimerId ? 'just wait (already pending)' : 'schedule 0-delay'}`);
if (operation.scheduledTimerId) {
// there's already a timer scheduled, so we don't need to do anything
return;
}
operation.scheduledTimerId = setTimeout(() => {
void this.#performWrite(key);
}, 0);
}
}
async #performWrite(key: SetKey): Promise<void> {
const operation = this.writeOperations[key];
if (!operation) return;
// set the state for the scheduling operations to come
const state = operation.lastState;
operation.queueDeadline = null;
if (operation.scheduledTimerId) {
clearTimeout(operation.scheduledTimerId);
operation.scheduledTimerId = null;
}
operation.lastState = null;
operation.needsWrite = false;
operation.isWriting = true;
try {
// serialize
const dateStart = Date.now();
const serialized = JSON.stringify(state);
if (DEBUG_SCHEDULER) _devWarn(`SET '${key}': serialized ${serialized?.length?.toLocaleString()} bytes in ${Date.now() - dateStart} ms`);
// Optimization - ? unsure, needs testing
// (globalThis as any)?.scheduler?.yield?.();
// write to IDB
await this.#idbWriteString(key, serialized);
} catch (error: any) {
_warn(`SET '${key}': serialization error:`, error);
}
// done
operation.isWriting = false;
// schedule the next write
this.#scheduleWrite(key);
}
// with strings
async #idbReadString(key: SetKey): Promise<string | null> {
const now = Date.now();
const counter = ++this.#readOpCounter;
try {
if (DEBUG_SCHEDULER) _devWarn(`GET ${key}(${counter})`);
const jsonState = await idbGet(key) ?? null;
if (DEBUG_SCHEDULER) _devWarn(jsonState === null ? `GET ${key}(${counter}) -> missing` : `GET ${key}(${counter}) -> read ${jsonState?.length?.toLocaleString()} bytes in ${Date.now() - now} ms`);
return jsonState;
} catch (error: any) {
_warn(`GET '${key}(${counter})': read error:`, error);
return null;
}
}
async #idbWriteString(key: SetKey, jsonState: string): Promise<void> {
const now = Date.now();
const counter = ++this.#writeOpCounter;
try {
if (DEBUG_SCHEDULER) _devWarn(`SET ${key}(${counter})`);
await idbSet(key, jsonState);
if (DEBUG_SCHEDULER) _devWarn(`SET ${key}(${counter}) -> wrote ${jsonState?.length?.toLocaleString()} bytes in ${Date.now() - now} ms`);
} catch (error: any) {
_warn(`SET '${key}(${counter})': write error:`, error);
}
}
// private fields
#readOpCounter = 0;
#writeOpCounter = 0;
}
const _idbScheduler = new IndexedDBWriteScheduler(IDB_MERGE_WINDOW, IDB_DEADLINE);
/**
* Thin adapter to use the new scheduler with Zustand.
*/
export function createIDBPersistStorage<S>(): PersistStorage<S> | undefined {
// server-side or no IDB support
if (typeof window === 'undefined')
return undefined;
if (!('indexedDB' in window)) {
_warn('[FATAL] IndexedDB is not supported in this browser.');
return undefined;
}
return {
getItem: async (name: string): Promise<StorageValue<S> | null> => _idbScheduler.getItem(name),
setItem: (name: string, newValue: StorageValue<S>): void => _idbScheduler.setItem(name, newValue),
removeItem: async (_name: string): Promise<void> => {
// We do NOT remove! We don't intend to implement this, on purpose
},
};
}
export async function backupIdbV3(keyFrom: string, keyTo: string): Promise<boolean> {
try {
const existingItem = await _idbScheduler.getItem(keyFrom);
if (existingItem === null) {
_warn(`idbUtils: backupIdbV3: item not found: '${keyFrom}'`);
return false;
}
await _idbScheduler.setItemDirect(keyTo, existingItem);
return true;
} catch (error) {
_warn(`idbUtils: backupIdbV3: Error backing up from '${keyFrom}' to '${keyTo}':`, error);
return false;
}
}
/// Maintenance
/* Sets a single key-value in a given IndexedDB key-value store.