From 367a541c9c0c34ebdd5cd757d5b2f619eff4f43f Mon Sep 17 00:00:00 2001 From: nai-degen Date: Sat, 3 Jun 2023 22:00:44 -0500 Subject: [PATCH] downgrades zmq implementation for v5.x --- src/tokenization/claude-ipc.ts | 116 ++++++++++++++++----------- src/tokenization/claude-tokenizer.py | 4 +- tsconfig.json | 3 +- 3 files changed, 75 insertions(+), 48 deletions(-) diff --git a/src/tokenization/claude-ipc.ts b/src/tokenization/claude-ipc.ts index 0b21338..9601ffd 100644 --- a/src/tokenization/claude-ipc.ts +++ b/src/tokenization/claude-ipc.ts @@ -1,30 +1,42 @@ import { spawn, ChildProcess } from "child_process"; import { join } from "path"; -import { Dealer } from "zeromq"; import { logger } from "../logger"; +type Zmq = typeof import("zeromq"); const TOKENIZER_SOCKET = "tcp://localhost:5555"; const log = logger.child({ module: "claude-ipc" }); const pythonLog = logger.child({ module: "claude-python" }); let tokenizer: ChildProcess; -let socket: Dealer; +let socket: ReturnType; export async function init() { log.info("Initializing Claude tokenizer IPC"); try { - tokenizer = launchTokenizer(); - socket = new Dealer({ sendTimeout: 500 }); + const zmq = await import("zeromq"); + tokenizer = await launchTokenizer(); + socket = zmq.socket("dealer"); socket.connect(TOKENIZER_SOCKET); - await socket.send(["init"]); - const response = await socket.receive(); + socket.send(["init"]); + const response = await new Promise((resolve) => { + const timeout = setTimeout(() => resolve("timeout"), 1000); + socket.once("message", (msg) => { + clearTimeout(timeout); + resolve(msg); + }); + }); + if (response === "timeout") { + throw new Error("Timeout waiting for init response"); + } if (response.toString() !== "ok") { throw new Error("Unexpected init response"); } - // Start message pump - processMessages(); + socket.on("message", onMessage); + socket.on("error", (err) => { + log.error({ err }, "Claude tokenizer socket error"); + }); // Test tokenizer const result = await requestTokenCount({ @@ -35,8 +47,13 @@ export async function init() { log.error({ result }, "Unexpected test token count"); throw new Error("Unexpected test token count"); } - } catch (e) { - log.error({ e }, "Failed to initialize Claude tokenizer"); + } catch (err) { + log.error({ err: err.message }, "Failed to initialize Claude tokenizer"); + if (process.env.NODE_ENV !== "production") { + console.error( + `\nClaude tokenizer failed to initialize.\nIf you want to use the tokenizer, see the Optional Dependencies documentation.\n` + ); + } return false; } log.info("Claude tokenizer IPC ready"); @@ -60,7 +77,7 @@ export async function requestTokenCount({ } log.debug({ requestId, chars: prompt.length }, "Requesting token count"); - await socket.send(["tokenize", requestId, prompt]); + socket.send(["tokenize", requestId, prompt]); log.debug({ requestId }, "Waiting for socket response"); return new Promise(async (resolve, reject) => { @@ -79,44 +96,51 @@ export async function requestTokenCount({ log.warn({ requestId }, err); reject(new Error(err)); } - }, 250); // TODO: make this configurable, some really crappy VMs might need more time + }, 500); }); } -async function processMessages() { - if (!socket) { - throw new Error("Claude tokenizer is not initialized"); +function onMessage(requestId: Buffer, tokens: Buffer) { + const request = pendingRequests.get(requestId.toString()); + if (!request) { + log.error({ requestId }, "No pending request found for incoming message"); + return; } - log.debug("Starting message loop"); - for await (const [requestId, tokens] of socket) { - const request = pendingRequests.get(requestId.toString()); - if (!request) { - log.error({ requestId }, "No pending request found for incoming message"); - continue; + request.resolve(Number(tokens.toString())); +} + +async function launchTokenizer() { + return new Promise((resolve, reject) => { + let resolved = false; + const proc = spawn("python", [ + "-u", + join(__dirname, "tokenization", "claude-tokenizer.py"), + ]); + if (!proc) { + reject(new Error("Failed to spawn Claude tokenizer")); } - request.resolve(Number(tokens.toString())); - } -} - -function launchTokenizer() { - const proc = spawn("python", [ - "-u", - join(__dirname, "tokenization", "claude-tokenizer.py"), - ]); - if (!proc) { - throw new Error("Failed to start Claude tokenizer. Is python installed?"); - } - proc.stdout!.on("data", (data) => { - pythonLog.info(data.toString()); - }); - proc.stderr!.on("data", (data) => { - pythonLog.error(data.toString()); - }); - proc.on("close", (code) => { - pythonLog.info(`Claude tokenizer exited with code ${code}`); - socket.close(); - socket = undefined!; - tokenizer = undefined!; - }); - return proc; + proc.stdout!.on("data", (data) => { + pythonLog.info(data.toString()); + }); + proc.stderr!.on("data", (data) => { + pythonLog.error(data.toString()); + }); + proc.on("close", (code) => { + pythonLog.info(`Claude tokenizer exited with code ${code}`); + socket?.close(); + socket = undefined!; + tokenizer = undefined!; + if (code !== 0 && !resolved) { + resolved = true; + reject(new Error("Claude tokenizer exited immediately")); + } + }); + // Wait a moment to catch any immediate errors (missing imports, etc) + setTimeout(() => { + if (!resolved) { + resolved = true; + resolve(proc); + } + }, 100); + }); } diff --git a/src/tokenization/claude-tokenizer.py b/src/tokenization/claude-tokenizer.py index 7f96b7d..e09d472 100644 --- a/src/tokenization/claude-tokenizer.py +++ b/src/tokenization/claude-tokenizer.py @@ -19,12 +19,14 @@ def init(socket): while True: message = socket.recv_multipart() routing_id, command = message + print(f"claude-tokenizer.py: received message {message}") + print(f"claude-tokenizer.py: received command {command}") if command == b"init": print("claude-tokenizer.py: initialized") socket.send_multipart([routing_id, b"ok"]) break except Exception as e: - print("claude-tokenizer.py: failed to initialize") + print("claude-tokenizer.py: failed to initialize ({e})") return message_processor(socket) diff --git a/tsconfig.json b/tsconfig.json index 942475c..13a4926 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -10,7 +10,8 @@ "skipDefaultLibCheck": true, "outDir": "build", "sourceMap": true, - "resolveJsonModule": true + "resolveJsonModule": true, + "useUnknownInCatchVariables": false }, "include": ["src"], "exclude": ["node_modules"],