AIX: Reattach: preserve the creation time and don't overwrite handle on reattach

This commit is contained in:
Enrico Ros
2026-04-22 13:15:07 -07:00
parent 013dab185c
commit 04916b700e
4 changed files with 21 additions and 5 deletions
+4 -2
View File
@@ -132,8 +132,10 @@ export type DMessageGenerator = ({
},
upstreamHandle?:
// unified `runId` across variants - vendor-specific id lives behind it; `uht` is consulted only for dispatch routing
| { uht: 'vnd.oai.responses', runId: string /* OpenAI `response.id` */, expiresAt: number | null /* null = never expires */ }
| { uht: 'vnd.gem.interactions', runId: string /* Gemini `interaction.id` */, expiresAt: number | null },
// createdAt/expiresAt: server-clock (ms) at the FIRST observation for this runId - preserved across reattaches
// (reassembler ignores re-emissions for the same runId so retention is measured from creation, not last reattach)
| { uht: 'vnd.oai.responses', runId: string /* OpenAI `response.id` */, createdAt: number | null, expiresAt: number | null /* null = never expires */ }
| { uht: 'vnd.gem.interactions', runId: string /* Gemini `interaction.id` */, createdAt: number | null, expiresAt: number | null },
tokenStopReason?:
| 'client-abort' // if the generator stopped due to a client abort signal
| 'filter' // (inline filter message injected) if the generator stopped due to a filter
+8 -1
View File
@@ -1052,10 +1052,17 @@ export class ContentReassembler {
private onResponseHandle({ handle }: Extract<AixWire_Particles.ChatGenerateOp, { cg: 'set-upstream-handle' }>): void {
// validate the handle
const knownUht = handle?.uht === 'vnd.oai.responses' || handle?.uht === 'vnd.gem.interactions';
if (!knownUht || !handle?.runId || handle.expiresAt === undefined) {
if (!knownUht || !handle?.runId || handle.createdAt === undefined || handle.expiresAt === undefined) {
this._appendReassemblyDevError(`Invalid response handle received: ${JSON.stringify(handle)}`);
return;
}
// Preserve earliest-observed timestamps for a given runId: on reattach the server emits fresh server-clock
// values but the original createdAt is the truth we want to keep (so retention is measured from creation, not reattach).
const existing = this.S.generator.upstreamHandle;
if (existing && existing.runId === handle.runId && existing.createdAt !== null && handle.createdAt !== null && existing.createdAt <= handle.createdAt)
return; // no-op: existing handle already carries the earliest createdAt for this runId
// type check point for AixWire_Particles.ChatControlOp('set-upstream-handle') -> DUpstreamResponseHandle
this.S.generator = { ...this.S.generator, upstreamHandle: handle };
}
+4 -1
View File
@@ -540,6 +540,9 @@ export namespace AixWire_API {
* - vnd.oai.responses: OpenAI Responses API - GET /v1/responses/{id}
* - vnd.gem.interactions: Gemini Interactions API for background agents - GET-poll /v1beta/interactions/{id}
*/
// Wire input for reattach: server only consumes `runId` (+ `startingAfter` for OpenAI). Timestamps live
// on the persisted `DMessageGenerator.upstreamHandle` (client concerns) and on the `set-upstream-handle`
// particle (server-to-client transport), but don't need to travel back in the reattach request.
export const ResumeHandle_schema = z.discriminatedUnion('uht', [
z.object({
uht: z.literal('vnd.oai.responses'),
@@ -683,7 +686,7 @@ export namespace AixWire_Particles {
| { cg: 'set-metrics', metrics: CGSelectMetrics }
| { cg: 'set-model', name: string }
| { cg: 'set-provider-infra', label: string }
| { cg: 'set-upstream-handle', handle: { uht: 'vnd.oai.responses' | 'vnd.gem.interactions', runId: string, expiresAt: number | null } }
| { cg: 'set-upstream-handle', handle: { uht: 'vnd.oai.responses' | 'vnd.gem.interactions', runId: string, createdAt: number | null, expiresAt: number | null } }
| { cg: '_debugDispatchRequest', security: 'dev-env', dispatchRequest: { url: string, headers: string, body: string, bodySize: number } } // may generalize this in the future
| { cg: '_debugProfiler', measurements: Record<string, number | string>[] };
@@ -522,6 +522,9 @@ export class ChatGenerateTransmitter implements IParticleTransmitter {
if (SERVER_DEBUG_WIRE)
console.log('|response-handle|', type, handle);
// NOTE: if needed, we could store the handle locally for server-side resumability, but we just implement client-side (correction, manual) for now
// createdAt/expiresAt are server-clock at emit time; on reattach the server has no knowledge of the original create,
// so it emits fresh values. The client reassembler preserves the earliest values for a given runId.
const now = Date.now();
const expireDays = type === 'vnd.gem.interactions'
? 1 // Gemini Interactions: 1d free / 55d paid - use the conservative lower bound
: 30; // OpenAI Responses: default 30 days
@@ -530,7 +533,8 @@ export class ChatGenerateTransmitter implements IParticleTransmitter {
handle: {
uht: type,
runId: handle,
expiresAt: Date.now() + expireDays * 24 * 3600 * 1000,
createdAt: now,
expiresAt: now + expireDays * 24 * 3600 * 1000,
},
});
// send it right away, in case the connection closes soon