From e9add70f8a22ddbf9fc5ec55b96cdde44519c695 Mon Sep 17 00:00:00 2001 From: Enrico Ros Date: Thu, 17 Oct 2024 22:09:12 -0700 Subject: [PATCH] Storage: new delayed serialization backend. --- src/common/stores/chat/store-chats.ts | 6 +- src/common/util/idbUtils.ts | 379 ++++++++++++++++---------- 2 files changed, 240 insertions(+), 145 deletions(-) diff --git a/src/common/stores/chat/store-chats.ts b/src/common/stores/chat/store-chats.ts index b49c070e8..de2afa266 100644 --- a/src/common/stores/chat/store-chats.ts +++ b/src/common/stores/chat/store-chats.ts @@ -1,5 +1,5 @@ import { create } from 'zustand'; -import { createJSONStorage, persist } from 'zustand/middleware'; +import { persist } from 'zustand/middleware'; import { useShallow } from 'zustand/react/shallow'; import type { SystemPurposeId } from '../../../data'; @@ -8,7 +8,7 @@ import type { DLLMId } from '~/common/stores/llms/llms.types'; import { findLLMOrThrow, getChatLLMId } from '~/common/stores/llms/store-llms'; import { agiUuid } from '~/common/util/idUtils'; -import { backupIdbV3, idbStateStorage } from '~/common/util/idbUtils'; +import { backupIdbV3, createIDBPersistStorage } from '~/common/util/idbUtils'; import { workspaceActions } from '~/common/stores/workspace/store-client-workspace'; import { workspaceForConversationIdentity } from '~/common/stores/workspace/workspace.types'; @@ -400,7 +400,7 @@ export const useChatStore = create()(/*devtools(*/ * - 4: [2024-05-14] Convert messages to multi-part, removed the IDB migration */ version: 4, - storage: createJSONStorage(() => idbStateStorage), + storage: createIDBPersistStorage(), // Migrations migrate: async (state: any, fromVersion: number) => { diff --git a/src/common/util/idbUtils.ts b/src/common/util/idbUtils.ts index 756f3e02d..9f2825e8e 100644 --- a/src/common/util/idbUtils.ts +++ b/src/common/util/idbUtils.ts @@ -1,158 +1,253 @@ -import type { StateStorage } from 'zustand/middleware'; -import { del as idbDel, get as idbGet, set as idbSet } from 'idb-keyval'; +/** + * Copyright (c) 2024 Enrico Ros + * + * Faster Zustand storage backend serializing objects just before write. + * Moreover uses a deadline-based scheduler to batch writes, with an aggregation window. + */ + +import type { PersistStorage, StorageValue } from 'zustand/middleware'; +import { get as idbGet, set as idbSet } from 'idb-keyval'; -// set to true to enable debugging +// [DEV] configuration const DEBUG_SCHEDULER = false; -const USER_LOG_ISSUES = true; +const IDB_MERGE_WINDOW = 321; // not a magic number, just a random value +const IDB_DEADLINE = 1234; // breaks the pace -interface PendingWrite { - timeoutId: ReturnType | null; - firstAttemptTime: number; - pendingValue: string | null; -} +type SetKey = string; -/** - * The write scheduler is a simple utility that batches write operations to IndexedDB. - * Should be reasonably efficient and won't block the main thread. Schreuled writes shall happen within - * the deadline, which will help the UI survive intense load. - */ -class WriteScheduler { - private writeOperations: Record = {}; - - constructor(readonly maxDeadline: number = 600, readonly minInterval: number = 250) { - } - - scheduleWrite(key: string, value: string): void { - const now = Date.now(); - const operation = this.writeOperations[key] || { timeoutId: null, firstAttemptTime: now, pendingValue: null }; - - if (operation.timeoutId !== null) { - // if (DEBUG_SCHEDULER) - // console.warn(' - idb_WS: clr_write', key); - clearTimeout(operation.timeoutId); - operation.timeoutId = null; - } - - if (!operation.firstAttemptTime) - operation.firstAttemptTime = now; - operation.pendingValue = value; - this.writeOperations[key] = operation; - - const timeSinceFirstAttempt = now - operation.firstAttemptTime; - let writeDelay = this.minInterval; - - if (timeSinceFirstAttempt + this.minInterval > this.maxDeadline) - writeDelay = this.maxDeadline - timeSinceFirstAttempt; - - if (writeDelay < 10) { - if (DEBUG_SCHEDULER) - console.warn(' - idb_WS: deadline write', key, '(delay:', writeDelay, ')'); - this.performWrite(key).catch(error => { - if (USER_LOG_ISSUES) - console.warn('idbUtils: E1: writing', key, error); - }); - } else { - if (DEBUG_SCHEDULER) - console.warn(' - idb_WS: schedule', key, 'at', writeDelay, 'ms'); - operation.timeoutId = setTimeout(() => { - this.performWrite(key).catch(error => { - if (USER_LOG_ISSUES) - console.warn('idbUtils: E2: writing', key, error); - }); - }, writeDelay); - } - } - - async performWrite(key: string): Promise { - const operation = this.writeOperations[key]; - if (!operation) { - if (USER_LOG_ISSUES) - console.warn('idbUtils: write operation not found for', key); - return; - } - const walueToWrite = operation.pendingValue; - operation.timeoutId = null; - operation.firstAttemptTime = 0; - operation.pendingValue = null; - if (walueToWrite === null) { - if (USER_LOG_ISSUES) - console.warn('idbUtils: write operation has no pending value for', key); - } else { - const start = Date.now(); - if (DEBUG_SCHEDULER) - console.log(' - idb: [SET]', key); - await idbSet(key, walueToWrite); - if (DEBUG_SCHEDULER) - console.warn(' (write time:', Date.now() - start, 'ms, bytes:', walueToWrite.length.toLocaleString(), ')'); - } - } - - async retrievePendingWrite(key: string): Promise { - // If there's a pending value, return it immediately - const operation = this.writeOperations[key]; - if (operation && operation.pendingValue !== null) { - if (DEBUG_SCHEDULER) - console.warn(' - idb_WS: read_pending', key, 'deadline:', operation.firstAttemptTime + this.maxDeadline - Date.now(), 'ms'); - return operation.pendingValue; - } - - // If there's no operation or pending value, return null indicating no data is available - return null; - } -} - -const writeScheduler = new WriteScheduler(1200, 400); - - -/** - * A Zustand state storage implementation that uses IndexedDB as a simple key-value store - */ -export const idbStateStorage: StateStorage = { - getItem: async (name: string): Promise => { - // if (DEBUG_SCHEDULER) - // console.warn(' - idb: get', name); - - // If there's a pending value, return it directly - const pendingValue = await writeScheduler.retrievePendingWrite(name); - if (pendingValue !== null) - return pendingValue; - - // If there's no pending value, proceed to fetch from IndexedDB - if (DEBUG_SCHEDULER) - console.warn(' - idb: [GET]', name); - const value: string | undefined = await idbGet(name); - if (DEBUG_SCHEDULER) - console.warn(' (read bytes:', value?.length?.toLocaleString(), ')'); - - return value || null; - }, - setItem: (name: string, value: string): void => { - // if (DEBUG_SCHEDULER) - // console.warn(' - idb: set', name); - - writeScheduler.scheduleWrite(name, value); - }, - removeItem: async (name: string): Promise => { - if (DEBUG_SCHEDULER) - console.warn(' - idb: del', name); - await idbDel(name); - }, +type SetOperation = { + queueDeadline: number | null; + scheduledTimerId: ReturnType | null; + lastState: null | StorageValue; + needsWrite: boolean; + isWriting: boolean; // [r: all, w: performWrite] }; -export async function backupIdbV3(keyFrom: string, keyTo: string): Promise { - const existingItem = await idbStateStorage.getItem(keyFrom); - if (existingItem === null) { - console.warn('idbUtils: E3: backupIdbV3: item not found:', keyFrom); - return false; +const _warn = (...args: any[]) => console.warn('IndexedDB:', ...args); +const _devWarn = (...args: any[]) => console.warn('[DEV] IndexedDB:', ...args); + + +class IndexedDBWriteScheduler { + + private writeOperations: Record = {}; + + constructor(readonly mergeWindow: number, readonly deadline: number) { } - await idbStateStorage.setItem(keyTo, existingItem); - return true; + + + async getItem(key: SetKey): Promise | null> { + // in-mem recycle: unexpected, but implemented + const operation = this.writeOperations[key]; + if (operation && operation.lastState !== null) { + _devWarn(`unexpected in-mem recycle of '${key}'`); + return operation.lastState; + } + + // fetch from IDB + const jsonState = await this.#idbReadString(key); + if (jsonState === null) return null; // first time is null (not found in storage) + + // deserialize + try { + return JSON.parse(jsonState) as StorageValue; + } catch (error: any) { + _warn(`GET: reading error for '${key}':`, error); + return null; + } + } + + setItem(key: SetKey, newValue: StorageValue): void { + + // do not serialize now, just store the object in the work order + const operation = this.writeOperations[key]; + if (!operation) { + if (DEBUG_SCHEDULER) _devWarn(`SET.${key} new operation`); + this.writeOperations[key] = { + queueDeadline: Date.now() + this.deadline, + scheduledTimerId: null, + lastState: newValue, + needsWrite: true, + isWriting: false, + }; + } else { + if (DEBUG_SCHEDULER) _devWarn(`SET.${key} updating operation`); + if (!operation.queueDeadline) + operation.queueDeadline = Date.now() + this.deadline; + operation.lastState = newValue; + operation.needsWrite = true; + } + + // schedule the write + this.#scheduleWrite(key); + } + + async setItemDirect(key: SetKey, newValue: StorageValue): Promise { + return this.#idbWriteString(key, JSON.stringify(newValue)); + } + + + // scheduling + + #scheduleWrite(key: SetKey): void { + const operation = this.writeOperations[key]; + if (!operation) return; + + if (!operation.needsWrite) return; + if (operation.isWriting) return; + + const now = Date.now(); + + const timeUntilMerge = this.mergeWindow; + const timeUntilDeadline = operation.queueDeadline ? operation.queueDeadline - now : 0; // 0 should not be an option, as state is set correctly + const delay = Math.max(Math.min(timeUntilMerge, timeUntilDeadline), 0); + + if (delay > 0) { + + // schedule/reshedule the write + + if (DEBUG_SCHEDULER) _devWarn(` - schedule ${key}: ${operation.scheduledTimerId ? 'reschedule' : 'schedule'} in ${delay} ms`); + if (operation.scheduledTimerId) + clearTimeout(operation.scheduledTimerId); + operation.scheduledTimerId = setTimeout(() => { + void this.#performWrite(key); + }, delay); + + } else { + + // we are past the deadline + + if (DEBUG_SCHEDULER) _devWarn(` - schedule ${key}: ${operation.scheduledTimerId ? 'just wait (already pending)' : 'schedule 0-delay'}`); + if (operation.scheduledTimerId) { + // there's already a timer scheduled, so we don't need to do anything + return; + } + + operation.scheduledTimerId = setTimeout(() => { + void this.#performWrite(key); + }, 0); + } + } + + async #performWrite(key: SetKey): Promise { + const operation = this.writeOperations[key]; + if (!operation) return; + + // set the state for the scheduling operations to come + const state = operation.lastState; + operation.queueDeadline = null; + if (operation.scheduledTimerId) { + clearTimeout(operation.scheduledTimerId); + operation.scheduledTimerId = null; + } + operation.lastState = null; + operation.needsWrite = false; + operation.isWriting = true; + + try { + + // serialize + const dateStart = Date.now(); + const serialized = JSON.stringify(state); + if (DEBUG_SCHEDULER) _devWarn(`SET '${key}': serialized ${serialized?.length?.toLocaleString()} bytes in ${Date.now() - dateStart} ms`); + + // Optimization - ? unsure, needs testing + // (globalThis as any)?.scheduler?.yield?.(); + + // write to IDB + await this.#idbWriteString(key, serialized); + + } catch (error: any) { + _warn(`SET '${key}': serialization error:`, error); + } + + // done + operation.isWriting = false; + + // schedule the next write + this.#scheduleWrite(key); + + } + + + // with strings + + async #idbReadString(key: SetKey): Promise { + const now = Date.now(); + const counter = ++this.#readOpCounter; + try { + if (DEBUG_SCHEDULER) _devWarn(`GET ${key}(${counter})`); + const jsonState = await idbGet(key) ?? null; + if (DEBUG_SCHEDULER) _devWarn(jsonState === null ? `GET ${key}(${counter}) -> missing` : `GET ${key}(${counter}) -> read ${jsonState?.length?.toLocaleString()} bytes in ${Date.now() - now} ms`); + return jsonState; + } catch (error: any) { + _warn(`GET '${key}(${counter})': read error:`, error); + return null; + } + } + + async #idbWriteString(key: SetKey, jsonState: string): Promise { + const now = Date.now(); + const counter = ++this.#writeOpCounter; + try { + if (DEBUG_SCHEDULER) _devWarn(`SET ${key}(${counter})`); + await idbSet(key, jsonState); + if (DEBUG_SCHEDULER) _devWarn(`SET ${key}(${counter}) -> wrote ${jsonState?.length?.toLocaleString()} bytes in ${Date.now() - now} ms`); + } catch (error: any) { + _warn(`SET '${key}(${counter})': write error:`, error); + } + } + + + // private fields + #readOpCounter = 0; + #writeOpCounter = 0; + } +const _idbScheduler = new IndexedDBWriteScheduler(IDB_MERGE_WINDOW, IDB_DEADLINE); + +/** + * Thin adapter to use the new scheduler with Zustand. + */ +export function createIDBPersistStorage(): PersistStorage | undefined { + + // server-side or no IDB support + if (typeof window === 'undefined') + return undefined; + if (!('indexedDB' in window)) { + _warn('[FATAL] IndexedDB is not supported in this browser.'); + return undefined; + } + + return { + getItem: async (name: string): Promise | null> => _idbScheduler.getItem(name), + setItem: (name: string, newValue: StorageValue): void => _idbScheduler.setItem(name, newValue), + removeItem: async (_name: string): Promise => { + // We do NOT remove! We don't intend to implement this, on purpose + }, + }; +} + + +export async function backupIdbV3(keyFrom: string, keyTo: string): Promise { + try { + const existingItem = await _idbScheduler.getItem(keyFrom); + if (existingItem === null) { + _warn(`idbUtils: backupIdbV3: item not found: '${keyFrom}'`); + return false; + } + await _idbScheduler.setItemDirect(keyTo, existingItem); + return true; + } catch (error) { + _warn(`idbUtils: backupIdbV3: Error backing up from '${keyFrom}' to '${keyTo}':`, error); + return false; + } +} + /// Maintenance /* Sets a single key-value in a given IndexedDB key-value store.