Vector Clocks implementation

This commit is contained in:
Enrico Ros
2024-12-02 17:40:26 -08:00
parent 0a27544db3
commit b8f4ad674b
3 changed files with 216 additions and 38 deletions
+30 -38
View File
@@ -4,50 +4,30 @@ import { persist } from 'zustand/middleware';
import { agiId } from '~/common/util/idUtils';
import { isBrowser } from '~/common/util/pwaUtils';
import type { VectorClockNodeId } from './vectorclock.types';
type VectorClientDeviceId = string;
interface VectorClient {
interface SyncStore {
// The critical ID used in vector clocks
vectorId: VectorClientDeviceId;
vectorClockNode: null | {
// Basic device fingerprint stored once at creation
createdAt: number;
userAgent: string;
// unique id for this device, used to track changes, only statistically unique within the user space
nodeId: VectorClockNodeId;
// basic device fingerprint stored once at creation
createdAt: number;
userAgent: string;
};
}
interface StoreSync {
client: VectorClient | null;
getVectorDeviceId: () => string;
}
const useSyncStore = create<StoreSync>()(persist(
const useSyncStore = create<SyncStore>()(persist(
(_set, _get) => ({
client: null,
getVectorDeviceId: () => {
const exClient = _get().client;
if (exClient) return exClient.vectorId;
// this will be created once per browser
const client = {
vectorId: agiId('vector-device-id10'),
createdAt: Date.now(),
userAgent: isBrowser ? window.navigator?.userAgent || '' : '',
};
_set({ client });
return client.vectorId;
},
// initial state
vectorClockNode: null,
}),
{
@@ -56,7 +36,19 @@ const useSyncStore = create<StoreSync>()(persist(
},
));
// Quick access for vector clocks
export const getVectorDeviceId = () => {
return useSyncStore.getState().getVectorDeviceId();
};
export function getVectorClockNodeId() {
const exClient = useSyncStore.getState().vectorClockNode;
if (exClient) return exClient.nodeId;
// this will be created once per browser
const vectorClockNode = {
nodeId: agiId('vector-device-id10'),
createdAt: Date.now(),
userAgent: isBrowser ? window.navigator?.userAgent || '' : '',
};
useSyncStore.setState({ vectorClockNode });
return vectorClockNode.nodeId;
}
+148
View File
@@ -0,0 +1,148 @@
import type { VectorClock, VectorClockMergeResult, VectorClockNodeId, VectorClockState } from './vectorclock.types';
export const VectorClockOrder = {
BEFORE: -1,
CONCURRENT: 0,
AFTER: 1,
} as const;
export type VectorClockOrderType = typeof VectorClockOrder[keyof typeof VectorClockOrder];
/**
* Creates a new vector clock for a node
*/
export function vectorClockCreate(nodeId: VectorClockNodeId, state: VectorClockState = {}): VectorClock {
return {
nodeId,
state: { ...state, [nodeId]: state[nodeId] ?? 0 },
};
}
/**
* Creates a deep copy of a vector clock
*/
export function vectorClockClone(clock: VectorClock): VectorClock {
return {
nodeId: clock.nodeId,
state: { ...clock.state },
};
}
/**
* Increments the vector clock for its node
*/
export function vectorClockIncrementInPlace(clock: VectorClock): void {
clock.state[clock.nodeId] = (clock.state[clock.nodeId] ?? 0) + 1;
}
/**
* Merges source clock into target clock
*/
export function vectorClockMergeInPlace(target: VectorClock, source: VectorClock): void {
const allNodes = new Set([...Object.keys(target.state), ...Object.keys(source.state)]);
for (const nodeId of allNodes)
target.state[nodeId] = Math.max(target.state[nodeId] ?? 0, source.state[nodeId] ?? 0);
}
/**
* Compares two vector clocks
* @returns -1 if a < b, 0 if concurrent, 1 if a > b
*/
export function vectorClockCompare(a: VectorClock, b: VectorClock): VectorClockOrderType {
let isGreater = false;
let isLess = false;
const allNodes = new Set([...Object.keys(a.state), ...Object.keys(b.state)]);
for (const nodeId of allNodes) {
const aTime = a.state[nodeId] ?? 0;
const bTime = b.state[nodeId] ?? 0;
if (aTime > bTime) isGreater = true;
if (aTime < bTime) isLess = true;
}
if (isGreater && !isLess) return VectorClockOrder.AFTER;
if (isLess && !isGreater) return VectorClockOrder.BEFORE;
return VectorClockOrder.CONCURRENT;
}
/**
* Validates if an object is a valid vector clock
*/
export function vectorClockIsValid(obj: unknown): obj is VectorClock {
if (!obj || typeof obj !== 'object') return false;
const clock = obj as VectorClock;
// noinspection SuspiciousTypeOfGuard
return (
typeof clock.nodeId === 'string' && !!clock.nodeId &&
clock.state !== null && typeof clock.state === 'object' &&
Object.values(clock.state).every(v => typeof v === 'number')
);
}
// /**
// * Gets timestamp for a specific node
// */
// export function vectorClockGetTime(clock: VectorClock, nodeId: VectorClockNodeId): VectorClockTimestamp {
// return clock.state[nodeId] ?? 0;
// }
// /**
// * Creates an empty vector clock state
// */
// export function vectorClockCreateEmptyState(): VectorClockState {
// return {};
// }
// /**
// * Detects if two clocks are concurrent (potential conflict)
// */
// export function vectorClockHasConcurrentUpdates(a: VectorClock, b: VectorClock): boolean {
// return vectorClockCompare(a, b) === VectorClockOrder.CONCURRENT;
// }
/**
* Checks if one clock dominates another (happens-before relationship)
*/
export function vectorClockIsDominating(dominant: VectorClock, other: VectorClock): boolean {
return vectorClockCompare(dominant, other) === VectorClockOrder.AFTER;
}
/**
* Attempts to merge data with vector clocks, detecting conflicts
*/
export function vectorClockAttemptMerge<T>(local: { data: T; clock: VectorClock }, remote: { data: T; clock: VectorClock }): VectorClockMergeResult<T> {
const comparison = vectorClockCompare(local.clock, remote.clock);
switch (comparison) {
case VectorClockOrder.CONCURRENT:
return {
success: false,
conflicts: [{
local: local.data,
remote: remote.data,
localClock: local.clock,
remoteClock: remote.clock,
}],
};
case VectorClockOrder.BEFORE:
return {
success: true,
result: remote.data,
};
default:
return {
success: true,
result: local.data,
};
}
}
/**
* Creates a merged clock after conflict resolution
*/
export function vectorClockCreateMerged(localClock: VectorClock, remoteClock: VectorClock): VectorClock {
const merged = vectorClockClone(localClock);
vectorClockMergeInPlace(merged, remoteClock);
vectorClockIncrementInPlace(merged);
return merged;
}
+38
View File
@@ -0,0 +1,38 @@
/**
* Vector Clock implementation for distributed systems
* Supports async workflows and conflict resolution
*/
export type VectorClockNodeId = string;
type VectorClockTimestamp = number;
/**
* What's in the database
*/
export type VectorClockState = {
[nodeId: string]: VectorClockTimestamp;
};
/**
* What's in memory in this node (device)
*/
export interface VectorClock {
nodeId: VectorClockNodeId;
state: VectorClockState;
}
// Auxiliary types for comparisons, merges
export interface VectorClockConflict<T> {
local: T;
remote: T;
localClock: VectorClock;
remoteClock: VectorClock;
}
export type VectorClockMergeResult<T> = {
success: boolean;
conflicts?: VectorClockConflict<T>[];
result?: T;
};