diff --git a/app/api/llms/stream/route.ts b/app/api/llms/stream/route.ts
deleted file mode 100644
index fed3b5d47..000000000
--- a/app/api/llms/stream/route.ts
+++ /dev/null
@@ -1,2 +0,0 @@
-export const runtime = 'edge';
-export { llmStreamingRelayHandler as POST } from '~/modules/llms/server/llm.server.streaming';
\ No newline at end of file
diff --git a/src/apps/chat/AppChat.tsx b/src/apps/chat/AppChat.tsx
index f2b17363b..edb39e88d 100644
--- a/src/apps/chat/AppChat.tsx
+++ b/src/apps/chat/AppChat.tsx
@@ -258,7 +258,7 @@ export function AppChat() {
   }, [paneUniqueConversationIds, handleExecuteAndOutcome, willMulticast]);
 
   const handleConversationExecuteHistory = React.useCallback(async (conversationId: DConversationId) => {
-    await handleExecuteAndOutcome('generate-content', conversationId, 'chat-execute-history'); // replace with 'history', then 'generate-text'
+    await handleExecuteAndOutcome('generate-content', conversationId, 'chat-execute-history'); // replace with 'history', then 'generate-content'
   }, [handleExecuteAndOutcome]);
 
   const handleMessageRegenerateLastInFocusedPane = React.useCallback(async () => {
diff --git a/src/apps/chat/components/composer/Composer.tsx b/src/apps/chat/components/composer/Composer.tsx
index 37947f160..94d88d3c2 100644
--- a/src/apps/chat/components/composer/Composer.tsx
+++ b/src/apps/chat/components/composer/Composer.tsx
@@ -142,7 +142,7 @@ export function Composer(props: {
     : null;
 
   // composer-overlay: for the in-reference-to state, comes from the conversation overlay
-  const allowInReferenceTo = chatExecuteMode === 'generate-content' || chatExecuteMode === 'generate-text-v1';
+  const allowInReferenceTo = chatExecuteMode === 'generate-content';
   const inReferenceTo = useChatComposerOverlayStore(conversationOverlayStore, store => allowInReferenceTo ? store.inReferenceTo : null);
 
   // don't load URLs if the user is typing a command or there's no capability
@@ -518,7 +518,7 @@ export function Composer(props: {
 
   // ...
 
-  const isText = chatExecuteMode === 'generate-content' || chatExecuteMode === 'generate-text-v1';
+  const isText = chatExecuteMode === 'generate-content';
   const isTextBeam = chatExecuteMode === 'beam-content';
   const isAppend = chatExecuteMode === 'append-user';
   const isReAct = chatExecuteMode === 'react-content';
diff --git a/src/apps/chat/components/message/fragments-content/TextFragmentEditor.tsx b/src/apps/chat/components/message/fragments-content/TextFragmentEditor.tsx
index 92922418d..9330ccb91 100644
--- a/src/apps/chat/components/message/fragments-content/TextFragmentEditor.tsx
+++ b/src/apps/chat/components/message/fragments-content/TextFragmentEditor.tsx
@@ -81,7 +81,7 @@ export function TextFragmentEditor(props: {
 
   // shortcuts
   const isEdited = props.editedText !== undefined;
-  useGlobalShortcuts('ContentPartTextEditor', React.useMemo(() => !isFocused ? [] : [
+  useGlobalShortcuts('TextFragmentEditor', React.useMemo(() => !isFocused ? [] : [
     { key: ShortcutKey.Enter, shift: true, description: 'Save', disabled: !isEdited && props.enableRestart !== true, level: 1, action: () => onSubmit(false) },
     ...props.enableRestart ? [{ key: ShortcutKey.Enter, ctrl: true, shift: true, description: 'Save & Retry', disabled: !isEdited, level: 1, action: () => onSubmit(true) }] : [],
     { key: ShortcutKey.Esc, description: 'Cancel', level: 1, action: onEscapePressed },
diff --git a/src/apps/chat/editors/_handleExecute.ts b/src/apps/chat/editors/_handleExecute.ts
index 48fb04519..7c609f482 100644
--- a/src/apps/chat/editors/_handleExecute.ts
+++ b/src/apps/chat/editors/_handleExecute.ts
@@ -6,14 +6,11 @@ import { ConversationHandler } from '~/common/chat-overlay/ConversationHandler';
 import { ConversationsManager } from '~/common/chat-overlay/ConversationsManager';
 import { createTextContentFragment, isContentFragment, isTextPart } from '~/common/stores/chat/chat.fragments';
 import { getConversationSystemPurposeId } from '~/common/stores/chat/store-chats';
-import { getUXLabsHighPerformance } from '~/common/state/store-ux-labs';
 
 import type { ChatExecuteMode } from '../execute-mode/execute-mode.types';
 
-import { getInstantAppChatPanesCount } from '../components/panes/usePanesManager';
 import { textToDrawCommand } from '../commands/CommandsDraw';
 import { _handleExecuteCommand, RET_NO_CMD } from './_handleExecuteCommand';
-import { runAssistantUpdatingStateV1 } from './chat-stream-v1';
 import { runImageGenerationUpdatingState } from './image-generate';
 import { runPersonaOnConversationHead } from './chat-persona';
 import { runReActUpdatingState } from './react-tangent';
@@ -74,9 +71,6 @@ export async function _handleExecute(chatExecuteMode: ChatExecuteMode, conversat
     case 'generate-content':
       return await runPersonaOnConversationHead(chatLLMId, conversationId);
 
-    case 'generate-text-v1':
-      return await runAssistantUpdatingStateV1(conversationId, cHandler.historyViewHead('generate-text-v1'), chatLLMId, getUXLabsHighPerformance() ? 0 : getInstantAppChatPanesCount());
-
     case 'beam-content':
       cHandler.beamInvoke(cHandler.historyViewHead('beam-content'), [], null);
       return true;
diff --git a/src/apps/chat/editors/chat-persona.ts b/src/apps/chat/editors/chat-persona.ts
index 3c20a3e2d..9421dab8b 100644
--- a/src/apps/chat/editors/chat-persona.ts
+++ b/src/apps/chat/editors/chat-persona.ts
@@ -88,5 +88,6 @@ export async function runPersonaOnConversationHead(
   if (!hasBeenAborted && (autoSuggestDiagrams || autoSuggestHTMLUI || autoSuggestQuestions))
     autoSuggestions(null, conversationId, assistantMessageId, autoSuggestDiagrams, autoSuggestHTMLUI, autoSuggestQuestions);
 
+  // return true if this succeeded
   return messageStatus.outcome === 'success';
 }
diff --git a/src/apps/chat/editors/chat-stream-v1.ts b/src/apps/chat/editors/chat-stream-v1.ts
deleted file mode 100644
index c78e8d55b..000000000
--- a/src/apps/chat/editors/chat-stream-v1.ts
+++ /dev/null
@@ -1,167 +0,0 @@
-import type { StreamingClientUpdate } from '~/modules/llms/vendors/unifiedStreamingClient';
-import { autoSuggestions } from '~/modules/aifn/autosuggestions/autoSuggestions';
-import { autoConversationTitle } from '~/modules/aifn/autotitle/autoTitle';
-import { llmStreamingChatGenerate, VChatContextRef, VChatMessageIn, VChatStreamContextName } from '~/modules/llms/llm.client';
-import { speakText } from '~/modules/elevenlabs/elevenlabs.client';
-
-import type { DLLMId } from '~/common/stores/llms/llms.types';
-import { ConversationsManager } from '~/common/chat-overlay/ConversationsManager';
-import { DMessage, messageFragmentsReduceText, messageFragmentsReplaceLastContentText, messageSingleTextOrThrow } from '~/common/stores/chat/chat.message';
-
-import { ChatAutoSpeakType, getChatAutoAI } from '../store-app-chat';
-
-
-/**
- * The main "chat" function. TODO: this is here so we can soon move it to the data model.
- */
-export async function runAssistantUpdatingStateV1(
-  conversationId: string,
-  history: Readonly,
-  assistantLlmId: DLLMId,
-  parallelViewCount: number,
-) {
-  const cHandler = ConversationsManager.getHandler(conversationId);
-
-  // ai follow-up operations (fire/forget)
-  const { autoSpeak, autoSuggestDiagrams, autoSuggestHTMLUI, autoSuggestQuestions, autoTitleChat } = getChatAutoAI();
-
-  // assistant placeholder
-  const { assistantMessageId } = cHandler.messageAppendAssistantPlaceholder(
-    '...',
-    { generator: { mgt: 'named', name: assistantLlmId }, purposeId: history[0].purposeId },
-  );
-
-  // when an abort controller is set, the UI switches to the "stop" mode
-  const abortController = new AbortController();
-  cHandler.setAbortController(abortController);
-
-  // stream the assistant's messages directly to the state store
-  const overwriteMessageParts = (incrementalMessage: Partial, messageComplete: boolean) => {
-    cHandler.messageEdit(assistantMessageId, incrementalMessage, messageComplete, false);
-  };
-  let instructions: VChatMessageIn[];
-  try {
-    instructions = history.map((m): VChatMessageIn => ({ role: m.role, content: messageSingleTextOrThrow(m) /* BIG FIXME */ }));
-  } catch (error) {
-    console.error('runAssistantUpdatingState: error:', error, history);
-    throw error;
-  }
-  const messageStatus = await streamAssistantMessageV1(
-    assistantLlmId,
-    instructions,
-    'conversation',
-    conversationId,
-    parallelViewCount,
-    autoSpeak,
-    overwriteMessageParts,
-    abortController.signal,
-  );
-
-  // clear to send, again
-  // FIXME: race condition? (for sure!)
-  cHandler.setAbortController(null);
-
-  if (autoTitleChat) {
-    // fire/forget, this will only set the title if it's not already set
-    void autoConversationTitle(conversationId, false);
-  }
-
-  if (autoSuggestDiagrams || autoSuggestHTMLUI || autoSuggestQuestions)
-    autoSuggestions(null, conversationId, assistantMessageId, autoSuggestDiagrams, autoSuggestHTMLUI, autoSuggestQuestions);
-
-  return messageStatus.outcome === 'success';
-}
-
-
-type StreamMessageOutcome = 'success' | 'aborted' | 'errored';
-type StreamMessageStatus = { outcome: StreamMessageOutcome, errorMessage?: string };
-type StreamMessageUpdate = Pick;
-
-export async function streamAssistantMessageV1(
-  llmId: DLLMId,
-  messagesHistory: VChatMessageIn[],
-  contextName: VChatStreamContextName,
-  contextRef: VChatContextRef,
-  throttleUnits: number, // 0: disable, 1: default throttle (12Hz), 2+ reduce the message frequency with the square root
-  autoSpeak: ChatAutoSpeakType,
-  onMessageUpdated: (incrementalMessage: Partial, messageComplete: boolean) => void,
-  abortSignal: AbortSignal,
-): Promise {
-
-  const returnStatus: StreamMessageStatus = {
-    outcome: 'success',
-    errorMessage: undefined,
-  };
-
-  // speak once
-  let spokenLine = false;
-
-  // Throttling setup
-  let lastCallTime = 0;
-  let throttleDelay = 1000 / 12; // 12 messages per second works well for 60Hz displays (single chat, and 24 in 4 chats, see the square root below)
-  if (throttleUnits > 1)
-    throttleDelay = Math.round(throttleDelay * Math.sqrt(throttleUnits));
-
-  function throttledEditMessage(updatedMessage: Partial) {
-    const now = Date.now();
-    if (throttleUnits === 0 || now - lastCallTime >= throttleDelay) {
-      onMessageUpdated(updatedMessage, false);
-      lastCallTime = now;
-    }
-  }
-
-  // TODO: should clean this up once we have multi-fragment streaming/recombination
-  const incrementalAnswer: StreamMessageUpdate = {
-    fragments: [],
-  };
-
-  try {
-    await llmStreamingChatGenerate(llmId, messagesHistory, contextName, contextRef, null, null, abortSignal, (update: StreamingClientUpdate) => {
-      const textSoFar = update.textSoFar;
-
-      // grow the incremental message
-      if (textSoFar) incrementalAnswer.fragments = messageFragmentsReplaceLastContentText(incrementalAnswer.fragments, textSoFar);
-      if (update.originLLM) incrementalAnswer.generator = { mgt: 'named', name: update.originLLM };
-      if (update.typing !== undefined)
-        incrementalAnswer.pendingIncomplete = update.typing ? true : undefined;
-
-      // Update the data store, with optional max-frequency throttling (e.g. OpenAI is downsamped 50 -> 12Hz)
-      // This can be toggled from the settings
-      throttledEditMessage(incrementalAnswer);
-
-      // 📢 TTS: first-line
-      if (textSoFar && autoSpeak === 'firstLine' && !spokenLine) {
-        let cutPoint = textSoFar.lastIndexOf('\n');
-        if (cutPoint < 0)
-          cutPoint = textSoFar.lastIndexOf('. ');
-        if (cutPoint > 100 && cutPoint < 400) {
-          spokenLine = true;
-          const firstParagraph = textSoFar.substring(0, cutPoint);
-          // fire/forget: we don't want to stall this loop
-          void speakText(firstParagraph);
-        }
-      }
-    });
-  } catch (error: any) {
-    if (error?.name !== 'AbortError') {
-      console.error('Fetch request error:', error);
-      const errorText = ` [Issue: ${error.message || (typeof error === 'string' ? error : 'Chat stopped.')}]`;
-      incrementalAnswer.fragments = messageFragmentsReplaceLastContentText(incrementalAnswer.fragments, errorText, true);
-      returnStatus.outcome = 'errored';
-      returnStatus.errorMessage = error.message;
-    } else
-      returnStatus.outcome = 'aborted';
-  }
-
-  // Ensure the last content is flushed out, and mark as complete
-  onMessageUpdated({ ...incrementalAnswer, pendingIncomplete: undefined }, true);
-
-  // 📢 TTS: all
-  if ((autoSpeak === 'all' || autoSpeak === 'firstLine') && !spokenLine && !abortSignal.aborted) {
-    const incrementalText = messageFragmentsReduceText(incrementalAnswer.fragments);
-    if (incrementalText.length > 0)
-      void speakText(incrementalText);
-  }
-
-  return returnStatus;
-}
\ No newline at end of file
diff --git a/src/apps/chat/execute-mode/execute-mode.items.ts b/src/apps/chat/execute-mode/execute-mode.items.ts
index a776e2c8f..2f8f993ce 100644
--- a/src/apps/chat/execute-mode/execute-mode.items.ts
+++ b/src/apps/chat/execute-mode/execute-mode.items.ts
@@ -28,13 +28,6 @@ export const ExecuteModeItems: { [key in ChatExecuteMode]: ModeDescription } = {
     sendColor: 'primary',
     sendText: 'Chat · DEV',
   },
-  'generate-text-v1': {
-    label: 'Chat (Stable)',
-    description: 'Model replies (stable)',
-    canAttach: true,
-    sendColor: 'primary',
-    sendText: 'Chat · Stable',
-  },
   'beam-content': {
     label: 'Beam', // Best of, Auto-Prime, Top Pick, Select Best
     description: 'Combine multiple models', // Smarter: combine...
diff --git a/src/apps/chat/execute-mode/execute-mode.types.ts b/src/apps/chat/execute-mode/execute-mode.types.ts
index 13c25e67c..f7db62660 100644
--- a/src/apps/chat/execute-mode/execute-mode.types.ts
+++ b/src/apps/chat/execute-mode/execute-mode.types.ts
@@ -7,6 +7,5 @@ export type ChatExecuteMode =
   | 'beam-content'
   | 'generate-content'
   | 'generate-image'
-  | 'generate-text-v1'
   | 'react-content'
   ;
diff --git a/src/modules/aifn/agiattachmentprompts/agiAttachmentPrompts.ts b/src/modules/aifn/agiattachmentprompts/agiAttachmentPrompts.ts
index b104696ac..7861f2a96 100644
--- a/src/modules/aifn/agiattachmentprompts/agiAttachmentPrompts.ts
+++ b/src/modules/aifn/agiattachmentprompts/agiAttachmentPrompts.ts
@@ -5,7 +5,7 @@ import { getChatLLMId } from '~/common/stores/llms/store-llms';
 
 import type { AixAPIChatGenerate_Request } from '~/modules/aix/server/api/aix.wiretypes';
 import { aixChatGenerateRequestFromDMessages } from '~/modules/aix/client/aix.client.fromDMessages.api';
 import { aixFunctionCallTool } from '~/modules/aix/client/aix.client.fromSimpleFunction';
-import { aixCreateContext, aixLLMChatGenerateContent } from '~/modules/aix/client/aix.client';
+import { aixCreateChatGenerateStreamContext, aixLLMChatGenerateContent } from '~/modules/aix/client/aix.client';
 
 import { createTextContentFragment, DMessageAttachmentFragment, DMessageToolInvocationPart, isContentFragment } from '~/common/stores/chat/chat.fragments';
@@ -71,7 +71,7 @@ Analyze the provided content to determine its nature, identify any relationships
     toolsPolicy: { type: 'any' },
   } as const;
 
-  const { fragments } = await aixLLMChatGenerateContent(llmId, aixChatGenerate, aixCreateContext('DEV', 'DEV'), false, abortSignal, undefined);
+  const { fragments } = await aixLLMChatGenerateContent(llmId, aixChatGenerate, aixCreateChatGenerateStreamContext('DEV', 'DEV'), false, abortSignal, undefined);
 
   // validate
   if (!Array.isArray(fragments) || fragments.length !== 1)
diff --git a/src/modules/aifn/useLLMChain.ts b/src/modules/aifn/useLLMChain.ts
index 11f84deed..8c678777e 100644
---
a/src/modules/aifn/useLLMChain.ts +++ b/src/modules/aifn/useLLMChain.ts @@ -1,6 +1,6 @@ import * as React from 'react'; -import { llmStreamingChatGenerate, VChatContextRef, VChatMessageIn, VChatStreamContextName } from '~/modules/llms/llm.client'; +import { llmStreamingChatGenerate, VChatMessageIn } from '~/modules/llms/llm.client'; import type { DLLMId } from '~/common/stores/llms/llms.types'; import { findLLMOrThrow } from '~/common/stores/llms/store-llms'; @@ -22,7 +22,7 @@ export interface LLMChainStep { /** * React hook to manage a chain of LLM transformations. */ -export function useLLMChain(steps: LLMChainStep[], llmId: DLLMId | undefined, chainInput: string | undefined, onSuccess: (output: string, input: string) => void, contextName: VChatStreamContextName, contextRef: VChatContextRef) { +export function useLLMChain(steps: LLMChainStep[], llmId: DLLMId | undefined, chainInput: string | undefined, onSuccess: (output: string, input: string) => void, contextName: string, contextRef: string) { // state const [chain, setChain] = React.useState(null); diff --git a/src/modules/aifn/useStreamChatText.ts b/src/modules/aifn/useStreamChatText.ts index fec2fcc1c..174b05802 100644 --- a/src/modules/aifn/useStreamChatText.ts +++ b/src/modules/aifn/useStreamChatText.ts @@ -1,6 +1,6 @@ import * as React from 'react'; -import { llmStreamingChatGenerate, VChatContextRef, VChatMessageIn, VChatStreamContextName } from '~/modules/llms/llm.client'; +import { llmStreamingChatGenerate, VChatMessageIn } from '~/modules/llms/llm.client'; import type { DLLMId } from '~/common/stores/llms/llms.types'; @@ -14,7 +14,7 @@ export function useStreamChatText() { const abortControllerRef = React.useRef(null); - const startStreaming = React.useCallback(async (llmId: DLLMId, prompt: VChatMessageIn[], contextName: VChatStreamContextName, contextRef: VChatContextRef) => { + const startStreaming = React.useCallback(async (llmId: DLLMId, prompt: VChatMessageIn[], contextName: string, contextRef: string) => { setStreamError(null); setPartialText(null); setText(null); diff --git a/src/modules/aix/client/aix.client.ts b/src/modules/aix/client/aix.client.ts index 76b2e119e..d83865d00 100644 --- a/src/modules/aix/client/aix.client.ts +++ b/src/modules/aix/client/aix.client.ts @@ -1,4 +1,3 @@ -import type { ChatStreamingInputSchema } from '~/modules/llms/server/llm.server.streaming'; import { findServiceAccessOrThrow } from '~/modules/llms/vendors/vendor.helpers'; import type { DLLMId } from '~/common/stores/llms/llms.types'; @@ -12,7 +11,7 @@ import { metricsStoreAddChatGenerate } from '~/common/stores/metrics/store-metri import { presentErrorToHumans } from '~/common/util/errorUtils'; // NOTE: pay particular attention to the "import type", as this is importing from the server-side Zod definitions -import type { AixAPI_Access, AixAPI_ContextChatStream, AixAPI_Model, AixAPIChatGenerate_Request } from '../server/api/aix.wiretypes'; +import type { AixAPI_Access, AixAPI_Context, AixAPI_Context_ChatGenerateNS, AixAPI_Context_ChatGenerateStream, AixAPI_Model, AixAPIChatGenerate_Request } from '../server/api/aix.wiretypes'; import { ContentReassembler } from './ContentReassembler'; import { ThrottleFunctionCall } from './ThrottleFunctionCall'; @@ -23,7 +22,11 @@ import { aixChatGenerateRequestFromDMessages } from './aix.client.fromDMessages. 
export const DEBUG_PARTICLES = false; -export function aixCreateContext(name: AixAPI_ContextChatStream['name'], ref: AixAPI_ContextChatStream['ref']): AixAPI_ContextChatStream { +export function aixCreateChatGenerateNSContext(name: AixAPI_Context_ChatGenerateNS['name'], ref: string): AixAPI_Context_ChatGenerateNS { + return { method: 'chat-generate', name, ref }; +} + +export function aixCreateChatGenerateStreamContext(name: AixAPI_Context_ChatGenerateStream['name'], ref: string): AixAPI_Context_ChatGenerateStream { return { method: 'chat-stream', name, ref }; } @@ -59,8 +62,8 @@ export async function aixChatGenerateContentStreaming( llmId: DLLMId, chatHistory: Readonly, // aix inputs - aixContextName: AixAPI_ContextChatStream['name'], - aixContextRef: AixAPI_ContextChatStream['ref'], + aixContextName: AixAPI_Context_ChatGenerateStream['name'], + aixContextRef: AixAPI_Context['ref'], // others throttleParallelThreads: number, // 0: disable, 1: default throttle (12Hz), 2+ reduce frequency with the square root abortSignal: AbortSignal, @@ -81,7 +84,7 @@ export async function aixChatGenerateContentStreaming( try { - await aixLLMChatGenerateContent(llmId, aixChatContentGenerateRequest, aixCreateContext(aixContextName, aixContextRef), true, abortSignal, + await aixLLMChatGenerateContent(llmId, aixChatContentGenerateRequest, aixCreateChatGenerateStreamContext(aixContextName, aixContextRef), true, abortSignal, (update: AixLLMGenerateContentAccumulator, isDone: boolean) => { // typesafe overwrite on all fields (Object.assign, but typesafe) @@ -110,6 +113,9 @@ export async function aixChatGenerateContentStreaming( chatDMessageUpdate.pendingIncomplete = false; throttler.finalize(() => onStreamingUpdate(chatDMessageUpdate, true)); + // TODO: check something beyond this return status (as exceptions almost never happen here) + // - e.g. the generator.aix may have error/token stop codes + return returnStatus; } @@ -129,12 +135,12 @@ export interface AixLLMGenerateContentAccumulator extends Pick( +export async function aixLLMChatGenerateContent( // llm Id input -> access & model llmId: DLLMId, // aix inputs aixChatGenerate: AixAPIChatGenerate_Request, - aixContext: AixAPI_ContextChatStream, + aixContext: AixAPI_Context, aixStreaming: boolean, // others abortSignal: AbortSignal, @@ -152,12 +158,6 @@ export async function aixLLMChatGenerateContent { @@ -233,7 +243,7 @@ async function _aix_LL_ChatGenerateContent( aixAccess: AixAPI_Access, aixModel: AixAPI_Model, aixChatGenerate: AixAPIChatGenerate_Request, - aixContext: AixAPI_ContextChatStream, + aixContext: AixAPI_Context, aixStreaming: boolean, // others abortSignal: AbortSignal, @@ -334,4 +344,45 @@ async function _aix_LL_ChatGenerateContent( // variant: 'gemini_auto_inline', // }, // ]; -// } \ No newline at end of file +// } + +/** + * OpenAI-specific moderation check. This is a separate function, as it's not part of the + * streaming chat generation, but it's a pre-check before we even start the streaming. 
+ * + * @returns null if the message is safe, or a string with the user message if it's not safe + */ +/* NOTE: NOT PORTED TO AIX YET, this was the former "LLMS" implementation +async function _openAIModerationCheck(access: OpenAIAccessSchema, lastMessage: VChatMessageIn | null): Promise { + if (!lastMessage || lastMessage.role !== 'user') + return null; + + try { + const moderationResult = await apiAsync.llmOpenAI.moderation.mutate({ + access, text: lastMessage.content, + }); + const issues = moderationResult.results.reduce((acc, result) => { + if (result.flagged) { + Object + .entries(result.categories) + .filter(([_, value]) => value) + .forEach(([key, _]) => acc.add(key)); + } + return acc; + }, new Set()); + + // if there's any perceived violation, we stop here + if (issues.size) { + const categoriesText = [...issues].map(c => `\`${c}\``).join(', '); + // do not proceed with the streaming request + return `[Moderation] I an unable to provide a response to your query as it violated the following categories of the OpenAI usage policies: ${categoriesText}.\nFor further explanation please visit https://platform.openai.com/docs/guides/moderation/moderation`; + } + } catch (error: any) { + // as the moderation check was requested, we cannot proceed in case of error + return '[Issue] There was an error while checking for harmful content. ' + error?.toString(); + } + + // moderation check was successful + return null; +} +*/ \ No newline at end of file diff --git a/src/modules/aix/server/api/aix.wiretypes.ts b/src/modules/aix/server/api/aix.wiretypes.ts index 65aa437fc..7eaa241c5 100644 --- a/src/modules/aix/server/api/aix.wiretypes.ts +++ b/src/modules/aix/server/api/aix.wiretypes.ts @@ -32,7 +32,9 @@ export type AixTools_FunctionCallDefinition = Extract; export type AixAPI_Access = z.infer; -export type AixAPI_ContextChatStream = z.infer; +export type AixAPI_Context = z.infer; +export type AixAPI_Context_ChatGenerateNS = z.infer; +export type AixAPI_Context_ChatGenerateStream = z.infer; export type AixAPI_Model = z.infer; export type AixAPIChatGenerate_Request = z.infer; @@ -374,14 +376,21 @@ export namespace AixWire_API { /// Context - export const ContextChatStream_schema = z.object({ + export const ContextChatGenerateNS_schema = z.object({ + method: z.literal('chat-generate'), + name: z.enum(['chat-ai-title', 'chat-ai-summarize', 'chat-followup-diagram', 'chat-followup-htmlui', 'chat-react-turn', 'draw-expand-prompt']), + ref: z.string(), + }); + + export const ContextChatGenerateStream_schema = z.object({ method: z.literal('chat-stream'), name: z.enum(['DEV', 'conversation', 'ai-diagram', 'ai-flattener', 'call', 'beam-scatter', 'beam-gather', 'persona-extract']), ref: z.string(), }); export const Context_schema = z.discriminatedUnion('method', [ - ContextChatStream_schema, + ContextChatGenerateNS_schema, + ContextChatGenerateStream_schema, ]); /// Connection options diff --git a/src/modules/llms/llm.client.ts b/src/modules/llms/llm.client.ts index 0bda7eccf..99dd07c5a 100644 --- a/src/modules/llms/llm.client.ts +++ b/src/modules/llms/llm.client.ts @@ -5,13 +5,11 @@ import { hasGoogleAnalytics } from '~/common/components/GoogleAnalytics'; import type { OpenAIWire_Tools } from '~/modules/aix/server/dispatch/wiretypes/openai.wiretypes'; import type { DModelsService, DModelsServiceId } from '~/common/stores/llms/modelsservice.types'; -import { DLLM, DLLMId, LLM_IF_OAI_Chat, LLM_IF_OAI_Fn } from '~/common/stores/llms/llms.types'; -import { findLLMOrThrow, llmsStoreActions } from 
'~/common/stores/llms/store-llms'; +import { DLLM, DLLMId, LLM_IF_OAI_Chat } from '~/common/stores/llms/llms.types'; +import { llmsStoreActions } from '~/common/stores/llms/store-llms'; import { isModelPriceFree } from '~/common/stores/llms/llms.pricing'; -import type { ChatStreamingInputSchema } from './server/llm.server.streaming'; -import type { GenerateContextNameSchema, ModelDescriptionSchema, StreamingContextNameSchema } from './server/llm.server.types'; -import type { StreamingClientUpdate } from './vendors/unifiedStreamingClient'; +import type { ModelDescriptionSchema } from './server/llm.server.types'; import { DOpenAILLMOptions, FALLBACK_LLM_TEMPERATURE } from './vendors/openai/openai.vendor'; import { findServiceAccessOrThrow } from './vendors/vendor.helpers'; @@ -27,10 +25,6 @@ export interface VChatMessageIn { export type VChatFunctionIn = OpenAIWire_Tools.FunctionDefinition; -export type VChatStreamContextName = StreamingContextNameSchema; -export type VChatGenerateContextName = GenerateContextNameSchema; -export type VChatContextRef = string; - export interface VChatMessageOut { role: 'assistant' | 'system' | 'user'; content: string; @@ -134,58 +128,28 @@ function _createDLLMFromModelDescription(d: ModelDescriptionSchema, service: DMo export async function llmChatGenerateOrThrow( llmId: DLLMId, messages: VChatMessageIn[], - contextName: VChatGenerateContextName, - contextRef: VChatContextRef | null, + contextName: string, + contextRef: string | null, functions: VChatFunctionIn[] | null, forceFunctionName: string | null, maxTokens?: number, ): Promise { - - // id to DLLM and vendor - const llm = findLLMOrThrow(llmId); - const llmOptions = llm.options; - - // if the model does not support function calling and we're trying to force a function, throw - if (forceFunctionName && !llm.interfaces.includes(LLM_IF_OAI_Fn)) - throw new Error(`Model ${llmId} does not support function calling`); - - // get the access - const { serviceSettings, transportAccess, vendor } = findServiceAccessOrThrow(llm.sId); - - // apply any vendor-specific rate limit - await vendor.rateLimitChatGenerate?.(llm, serviceSettings); - - // execute via the vendor - return await vendor.rpcChatGenerateOrThrow(transportAccess, llmOptions, messages, contextName, contextRef, functions, forceFunctionName, maxTokens); + throw new Error('llmStreamingChatGenerate: Unsupported - migrated to AIX'); } - export async function llmStreamingChatGenerate< TServiceSettings extends object = {}, - TAccess extends ChatStreamingInputSchema['access'] = ChatStreamingInputSchema['access'], + TAccess = undefined, TLLMOptions = unknown >( llmId: DLLMId, messages: VChatMessageIn[], - contextName: VChatStreamContextName, - contextRef: VChatContextRef, + contextName: string, + contextRef: string | null, functions: VChatFunctionIn[] | null, forceFunctionName: string | null, abortSignal: AbortSignal, - onUpdate: (update: StreamingClientUpdate, done: boolean) => void, + onUpdate: (update: any, done: boolean) => void, ): Promise { - - // id to DLLM and vendor - const llm = findLLMOrThrow(llmId); - const llmOptions = llm.options; - - // get the access - const { serviceSettings, transportAccess, vendor } = findServiceAccessOrThrow(llm.sId); - - // apply any vendor-specific rate limit - await vendor.rateLimitChatGenerate?.(llm, serviceSettings); - - // execute via the vendor - // return await unifiedStreamingClient(access, llmId, llmOptions, messages, contextName, contextRef, functions, forceFunctionName, abortSignal, onUpdate); - return await 
vendor.streamingChatGenerateOrThrow(transportAccess, llmId, llmOptions, messages, contextName, contextRef, functions, forceFunctionName, abortSignal, onUpdate); + throw new Error('llmStreamingChatGenerate: Unsupported - migrated to AIX'); } diff --git a/src/modules/llms/server/anthropic/anthropic.router.ts b/src/modules/llms/server/anthropic/anthropic.router.ts index f4f0b17a0..48d9e9a8c 100644 --- a/src/modules/llms/server/anthropic/anthropic.router.ts +++ b/src/modules/llms/server/anthropic/anthropic.router.ts @@ -1,5 +1,4 @@ import { z } from 'zod'; -import { TRPCError } from '@trpc/server'; import { createTRPCRouter, publicProcedure } from '~/server/api/trpc.server'; import { env } from '~/server/env.mjs'; @@ -7,10 +6,7 @@ import { fetchJsonOrTRPCThrow } from '~/server/api/trpc.router.fetchers'; import { fixupHost } from '~/common/util/urlUtils'; -import { AnthropicWire_API_Message_Create } from '~/modules/aix/server/dispatch/wiretypes/anthropic.wiretypes'; - -import { ListModelsResponse_schema, llmsChatGenerateOutputSchema, llmsGenerateContextSchema } from '../llm.server.types'; -import { OpenAIHistorySchema, openAIHistorySchema, OpenAIModelSchema, openAIModelSchema } from '../openai/openai.router'; +import { ListModelsResponse_schema } from '../llm.server.types'; import { hardcodedAnthropicModels } from './anthropic.models'; @@ -71,86 +67,6 @@ export function anthropicAccess(access: AnthropicAccessSchema, apiPath: string): }; } -export function anthropicMessagesPayloadOrThrow(model: OpenAIModelSchema, history: OpenAIHistorySchema, stream: boolean): AnthropicWire_API_Message_Create.Request { - - // Take the System prompt, if it's the first message - // But if it's the only message, treat it as a user message - history = [...history]; - let systemPrompt: string | undefined = undefined; - if (history[0]?.role === 'system' && history.length > 1) - systemPrompt = history.shift()?.content; - - // Transform the OpenAIHistorySchema into the target messages format, ensuring that roles alternate between 'user' and 'assistant' - const messages = history.reduce( - (acc, historyItem, index) => { - - // skip empty messages - if (!historyItem.content.trim()) return acc; - - const lastMessage: AnthropicWire_API_Message_Create.Request['messages'][number] | undefined = acc[acc.length - 1]; - const anthropicRole = historyItem.role === 'assistant' ? 'assistant' : 'user'; - - if (index === 0 || anthropicRole !== lastMessage?.role) { - - // Hack/Hotfix: if the first role is 'assistant', then prepend a user message otherwise the API call will break; - // but what should we really do here? 
- if (index === 0 && anthropicRole === 'assistant') { - if (systemPrompt) { - // This stinks, as it will duplicate the system prompt; it's the best we can do for now for a better UX - acc.push({ role: 'user', content: [{ type: 'text', text: systemPrompt }] }); - } else - throw new Error('The first message in the chat history must be a user message and not an assistant message.'); - } - - // Add a new message object if the role is different from the previous message - acc.push({ - role: anthropicRole, - content: [ - { type: 'text', text: historyItem.content }, - ], - }); - } else { - // Merge consecutive messages with the same role - (lastMessage.content as AnthropicWire_API_Message_Create.Request['messages'][number]['content']).push( - { type: 'text', text: historyItem.content }, - ); - } - return acc; - }, - [] as AnthropicWire_API_Message_Create.Request['messages'], - ); - - // NOTE: if the last message is 'assistant', then the API will perform a continuation - shall we add a user message? TBD - - // NOTE: the following code has been disabled because Anthropic will reject empty text blocks - // If the messages array is empty, add a default user message - // if (messages.length === 0) - // messages.push({ role: 'user', content: [{ type: 'text', text: '' }] }); - - // Construct the request payload - const payload: AnthropicWire_API_Message_Create.Request = { - model: model.id, - ...(systemPrompt !== undefined && { system: [{ type: 'text', text: systemPrompt }] }), - messages: messages, - max_tokens: model.maxTokens || DEFAULT_MAX_TOKENS, - stream: stream, - ...(model.temperature !== undefined && { temperature: model.temperature }), - // ...(tools && { tools: tools }), - // ...(forceToolChoice && { tool_choice: forceToolChoice }), - // metadata: not useful to us - // stop_sequences: not useful to us - // top_p: not useful to us - // top_k: not useful to us - }; - - // Validate the payload against the schema to ensure correctness - const validated = AnthropicWire_API_Message_Create.Request_schema.safeParse(payload); - if (!validated.success) - throw new Error(`Invalid message sequence for Anthropic models: ${validated.error.errors?.[0]?.message || validated.error}`); - - return validated.data; -} - // Input Schemas @@ -166,14 +82,6 @@ const listModelsInputSchema = z.object({ access: anthropicAccessSchema, }); -const chatGenerateInputSchema = z.object({ - access: anthropicAccessSchema, - model: openAIModelSchema, - history: openAIHistorySchema, - // tools: llmsToolsSchema.optional(), - context: llmsGenerateContextSchema.optional(), -}); - // Router @@ -185,31 +93,4 @@ export const llmAnthropicRouter = createTRPCRouter({ .output(ListModelsResponse_schema) .query(() => ({ models: hardcodedAnthropicModels })), - /* [Anthropic] Message generation (non-streaming) */ - chatGenerateMessage: publicProcedure - .input(chatGenerateInputSchema) - .output(llmsChatGenerateOutputSchema) - .mutation(async ({ input: { access, model, history } }) => { - - // NOTES: doesn't support functions yet, supports multi-modal inputs (but they're not in our history, yet) - - // throw if the message sequence is not okay - const payload = anthropicMessagesPayloadOrThrow(model, history, false); - const response = await anthropicPOST(access, payload, '/v1/messages'); - const completion = AnthropicWire_API_Message_Create.Response_schema.parse(response); - - // validate output - if (!completion || completion.type !== 'message' || completion.role !== 'assistant' || completion.stop_reason === undefined) - throw new TRPCError({ 
code: 'INTERNAL_SERVER_ERROR', message: `[Anthropic Issue] Invalid Message` }); - if (completion.content.length !== 1 || completion.content[0].type !== 'text') - throw new TRPCError({ code: 'INTERNAL_SERVER_ERROR', message: `[Anthropic Issue] No Single Text Message (${completion.content.length})` }); - - // got the completion (non-streaming) - return { - role: completion.role, - content: completion.content[0].text, - finish_reason: completion.stop_reason === 'max_tokens' ? 'length' : 'stop', - }; - }), - }); diff --git a/src/modules/llms/server/gemini/gemini.router.ts b/src/modules/llms/server/gemini/gemini.router.ts index 01d76ba75..95137d074 100644 --- a/src/modules/llms/server/gemini/gemini.router.ts +++ b/src/modules/llms/server/gemini/gemini.router.ts @@ -1,5 +1,4 @@ import { z } from 'zod'; -import { TRPCError } from '@trpc/server'; import { env } from '~/server/env.mjs'; import packageJson from '../../../../../package.json'; @@ -7,12 +6,11 @@ import packageJson from '../../../../../package.json'; import { createTRPCRouter, publicProcedure } from '~/server/api/trpc.server'; import { fetchJsonOrTRPCThrow } from '~/server/api/trpc.router.fetchers'; -import { GeminiWire_API_Generate_Content, GeminiWire_API_Models_List, GeminiWire_Safety } from '~/modules/aix/server/dispatch/wiretypes/gemini.wiretypes'; +import { GeminiWire_API_Models_List, GeminiWire_Safety } from '~/modules/aix/server/dispatch/wiretypes/gemini.wiretypes'; import { fixupHost } from '~/common/util/urlUtils'; -import { ListModelsResponse_schema, llmsChatGenerateOutputSchema, llmsGenerateContextSchema } from '../llm.server.types'; -import { OpenAIHistorySchema, openAIHistorySchema, OpenAIModelSchema, openAIModelSchema } from '../openai/openai.router'; +import { ListModelsResponse_schema } from '../llm.server.types'; import { geminiFilterModels, geminiModelToModelDescription, geminiSortModels } from './gemini.models'; @@ -44,57 +42,6 @@ export function geminiAccess(access: GeminiAccessSchema, modelRefId: string | nu }; } -type TRequest = GeminiWire_API_Generate_Content.Request; - -/** - * We specially encode the history to match the Gemini API requirements. - * Gemini does not want 2 consecutive messages from the same role, so we alternate. - * - System messages = [User, Model'Ok'] - * - User and Assistant messages are coalesced into a single message (e.g. [User, User, Assistant, Assistant, User] -> [User[2], Assistant[2], User[1]]) - */ -export const geminiGenerateContentTextPayload = (model: OpenAIModelSchema, history: OpenAIHistorySchema, safety: GeminiWire_Safety.HarmBlockThreshold, n: number): TRequest => { - - // convert the history to a Gemini format - const contents: TRequest['contents'] = []; - for (const _historyElement of history) { - - const { role: msgRole, content: msgContent } = _historyElement; - - // System message - we treat it as per the example in https://ai.google.dev/tutorials/ai-studio_quickstart#chat_example - // TODO: sypport the system instruction - if (msgRole === 'system') { - contents.push({ role: 'user', parts: [{ text: msgContent }] }); - contents.push({ role: 'model', parts: [{ text: 'Ok' }] }); - continue; - } - - // User or Assistant message - const nextRole: TRequest['contents'][number]['role'] = msgRole === 'assistant' ? 
'model' : 'user'; - if (contents.length && contents[contents.length - 1].role === nextRole) { - // coalesce with the previous message - contents[contents.length - 1].parts.push({ text: msgContent }); - } else { - // create a new message - contents.push({ role: nextRole, parts: [{ text: msgContent }] }); - } - } - - return { - contents, - generationConfig: { - ...(n >= 2 && { candidateCount: n }), - ...(model.maxTokens && { maxOutputTokens: model.maxTokens }), - temperature: model.temperature, - }, - safetySettings: safety !== 'HARM_BLOCK_THRESHOLD_UNSPECIFIED' ? [ - { category: 'HARM_CATEGORY_SEXUALLY_EXPLICIT', threshold: safety }, - { category: 'HARM_CATEGORY_HATE_SPEECH', threshold: safety }, - { category: 'HARM_CATEGORY_HARASSMENT', threshold: safety }, - { category: 'HARM_CATEGORY_DANGEROUS_CONTENT', threshold: safety }, - ] : undefined, - }; -}; - async function geminiGET(access: GeminiAccessSchema, modelRefId: string | null, apiPath: string /*, signal?: AbortSignal*/): Promise { const { headers, url } = geminiAccess(access, modelRefId, apiPath); @@ -121,15 +68,6 @@ const accessOnlySchema = z.object({ access: geminiAccessSchema, }); -const chatGenerateInputSchema = z.object({ - access: geminiAccessSchema, - model: openAIModelSchema, - history: openAIHistorySchema, - // functions: openAIFunctionsSchema.optional(), - // forceFunctionName: z.string().optional(), - context: llmsGenerateContextSchema.optional(), -}); - /** * See https://github.com/google/generative-ai-js/tree/main/packages/main/src for @@ -161,36 +99,4 @@ export const llmGeminiRouter = createTRPCRouter({ }; }), - - /* [Gemini] models.generateContent = /v1/{model=models/*}:generateContent */ - chatGenerate: publicProcedure - .input(chatGenerateInputSchema) - .output(llmsChatGenerateOutputSchema) - .mutation(async ({ input: { access, history, model } }) => { - - // generate the content - const wireGeneration = await geminiPOST(access, model.id, geminiGenerateContentTextPayload(model, history, access.minSafetyLevel, 1), GeminiWire_API_Generate_Content.postPath); - const generation = GeminiWire_API_Generate_Content.Response_schema.parse(wireGeneration); - 0; - // only use the first result (and there should be only one) - const singleCandidate = generation.candidates?.[0] ?? null; - if (!singleCandidate || !singleCandidate.content?.parts.length) - throw new TRPCError({ - code: 'INTERNAL_SERVER_ERROR', - message: `Gemini chat-generation API issue: ${JSON.stringify(wireGeneration)}`, - }); - - if (!('text' in singleCandidate.content.parts[0])) - throw new TRPCError({ - code: 'INTERNAL_SERVER_ERROR', - message: `Gemini non-text chat-generation API issue: ${JSON.stringify(wireGeneration)}`, - }); - - return { - role: 'assistant', - content: singleCandidate.content.parts[0].text || '', - finish_reason: singleCandidate.finishReason === 'STOP' ? 
'stop' : null, - }; - }), - }); diff --git a/src/modules/llms/server/llm.server.streaming.ts b/src/modules/llms/server/llm.server.streaming.ts deleted file mode 100644 index 780ab8ace..000000000 --- a/src/modules/llms/server/llm.server.streaming.ts +++ /dev/null @@ -1,546 +0,0 @@ -import { z } from 'zod'; -import { NextRequest, NextResponse } from 'next/server'; -import { createParser as createEventsourceParser, EventSourceParseCallback, EventSourceParser, ParsedEvent, ReconnectInterval } from 'eventsource-parser'; - -import { createEmptyReadableStream, debugGenerateCurlCommand, nonTrpcServerFetchOrThrow, safeErrorString, SERVER_DEBUG_WIRE, serverCapitalizeFirstLetter, ServerFetchError } from '~/server/wire'; - - -// Anthropic server imports -import { AnthropicWire_API_Message_Create } from '~/modules/aix/server/dispatch/wiretypes/anthropic.wiretypes'; -import { anthropicAccess, anthropicAccessSchema, anthropicMessagesPayloadOrThrow } from './anthropic/anthropic.router'; - -// Gemini server imports -import { GeminiWire_API_Generate_Content } from '~/modules/aix/server/dispatch/wiretypes/gemini.wiretypes'; -import { geminiAccess, geminiAccessSchema, geminiGenerateContentTextPayload } from './gemini/gemini.router'; - -// Ollama server imports -import { OLLAMA_PATH_CHAT, ollamaAccess, ollamaAccessSchema, ollamaChatCompletionPayload } from './ollama/ollama.router'; -import { wireOllamaChunkedOutputSchema } from '~/modules/llms/server/ollama/ollama.wiretypes'; - -// OpenAI server imports -import { OpenAIWire_API_Chat_Completions } from '~/modules/aix/server/dispatch/wiretypes/openai.wiretypes'; -import { openAIAccess, openAIAccessSchema, openAIChatCompletionPayload, openAIHistorySchema, openAIModelSchema } from './openai/openai.router'; - - -import { llmsStreamingContextSchema } from './llm.server.types'; - - -// configuration -const USER_SYMBOL_MAX_TOKENS = '🧱'; -const USER_SYMBOL_PROMPT_BLOCKED = '🚫'; -// const USER_SYMBOL_NO_DATA_RECEIVED_BROKEN = '🔌'; - - -/** - * Event stream formats - * - 'sse' is the default format, and is used by all vendors except Ollama - * - 'json-nl' is used by Ollama - */ -type MuxingFormat = 'sse' | 'json-nl'; - - -/** - * Vendor stream parsers - * - The vendor can decide to terminate the connection (close: true), transmitting anything in 'text' before doing so - * - The vendor can also throw from this function, which will error and terminate the connection - * - * The peculiarity of our parser is the injection of a JSON structure at the beginning of the stream, to - * communicate parameters before the text starts flowing to the client. 
- */ -type AIStreamParser = (data: string, eventType?: string) => { text: string, close: boolean }; - - -const chatStreamingInputSchema = z.object({ - access: z.discriminatedUnion('dialect', [anthropicAccessSchema, geminiAccessSchema, ollamaAccessSchema, openAIAccessSchema]), - model: openAIModelSchema, - history: openAIHistorySchema, - // NOTE: made it optional for now as we have some old requests without it - // 2024-07-07: remove .optional() - context: llmsStreamingContextSchema.optional(), -}); -export type ChatStreamingInputSchema = z.infer; - -// the purpose is to send something out even before the upstream stream starts, so that we keep the connection up -const chatStreamingStartOutputPacketSchema = z.object({ - type: z.enum(['start']), -}); -export type ChatStreamingPreambleStartSchema = z.infer; - -// the purpose is to have a first packet that contains the model name, so that the client can display it -// this is a hack until we have a better streaming format -const chatStreamingFirstOutputPacketSchema = z.object({ - model: z.string(), -}); -export type ChatStreamingPreambleModelSchema = z.infer; - - -export async function llmStreamingRelayHandler(req: NextRequest): Promise { - - // Parse the request - const body = await req.json(); - const _chatStreamingInput: ChatStreamingInputSchema = chatStreamingInputSchema.parse(body); - const { dialect: accessDialect } = _chatStreamingInput.access; - const prettyDialect = serverCapitalizeFirstLetter(accessDialect); - - - // Prepare the upstream API request and demuxer/parser - let requestData: ReturnType; - try { - requestData = _prepareRequestData(_chatStreamingInput); - } catch (error: any) { - console.error(`[POST] /api/llms/stream: ${prettyDialect}: prepareRequestData issue:`, safeErrorString(error)); - return new NextResponse(`**[Service Issue] ${prettyDialect}**: ${safeErrorString(error) || 'Unknown streaming error'}`, { - status: 422, - }); - } - - // Connect to the upstream (blocking) - let upstreamResponse: Response; - try { - - if (SERVER_DEBUG_WIRE) - console.log('-> streaming:', debugGenerateCurlCommand('POST', requestData.url, requestData.headers, requestData.body)); - - // POST to our API route - // [MAY TIMEOUT] on Vercel Edge calls; this times out on long requests to Anthropic, on 2024-04-23. - // The solution would be to return a new response with a 200 status code, and then stream the data - // in a new request, but we'll lose back-pressure and complicates logic. - upstreamResponse = await nonTrpcServerFetchOrThrow(requestData.url, 'POST', requestData.headers, requestData.body); - - } catch (error: any) { - - // server-side admins message - const capDialect = serverCapitalizeFirstLetter(accessDialect); - const fetchOrVendorError = safeErrorString(error) + (error?.cause ? ' · ' + JSON.stringify(error.cause) : ''); - console.error(`[POST] /api/llms/stream: ${capDialect}: fetch issue:`, fetchOrVendorError, requestData?.url); - - // client-side users visible message - const statusCode = ((error instanceof ServerFetchError) && (error.statusCode >= 400)) ? error.statusCode : 422; - const devMessage = process.env.NODE_ENV === 'development' ? ` [DEV_URL: ${requestData?.url}]` : ''; - return new NextResponse(`**[Service Issue] ${capDialect}**: ${fetchOrVendorError}${devMessage}`, { - status: statusCode, - }); - } - - /* The following code is heavily inspired by the Vercel AI SDK, but simplified to our needs and in full control. 
- * This replaces the former (custom) implementation that used to return a ReadableStream directly, and upon start, - * it was blindly fetching the upstream response and piping it to the client. - * - * We now use backpressure, as explained on: https://sdk.vercel.ai/docs/concepts/backpressure-and-cancellation - * - * NOTE: we have not benchmarked to see if there is performance impact by using this approach - we do want to have - * a 'healthy' level of inventory (i.e., pre-buffering) on the pipe to the client. - */ - const transformUpstreamToBigAgiClient = createUpstreamTransformer( - requestData.vendorMuxingFormat, requestData.vendorStreamParser, accessDialect, - ); - - const chatResponseStream = - (upstreamResponse.body || createEmptyReadableStream()) - .pipeThrough(transformUpstreamToBigAgiClient); - - return new NextResponse(chatResponseStream, { - status: 200, - headers: { - 'Content-Type': 'text/event-stream; charset=utf-8', - }, - }); -} - - -// Event Stream Transformers - -/** - * The default demuxer for EventSource upstreams. - */ -const _createDemuxerEventSource: (onParse: EventSourceParseCallback) => EventSourceParser = createEventsourceParser; - -/** - * Creates a parser for a 'JSON\n' non-event stream, to be swapped with an EventSource parser. - * Ollama is the only vendor that uses this format. - */ -function _createDemuxerJsonNewline(onParse: EventSourceParseCallback): EventSourceParser { - let accumulator: string = ''; - return { - // feeds a new chunk to the parser - we accumulate in case of partial data, and only execute on full lines - feed: (chunk: string): void => { - accumulator += chunk; - if (accumulator.endsWith('\n')) { - for (const jsonString of accumulator.split('\n').filter(line => !!line)) { - const mimicEvent: ParsedEvent = { - type: 'event', - id: undefined, - event: undefined, - data: jsonString, - }; - onParse(mimicEvent); - } - accumulator = ''; - } - }, - - // resets the parser state - not useful with our driving of the parser - reset: (): void => { - console.error('createDemuxerJsonNewline.reset() not implemented'); - }, - }; -} - -/** - * Creates a TransformStream that parses events from an EventSource stream using a custom parser. - * @returns {TransformStream} TransformStream parsing events. - */ -function createUpstreamTransformer(muxingFormat: MuxingFormat, vendorTextParser: AIStreamParser, dialectLabel: string): TransformStream { - const textDecoder = new TextDecoder(); - const textEncoder = new TextEncoder(); - let eventSourceParser: EventSourceParser; - let hasReceivedData = false; - - return new TransformStream({ - start: async (controller): Promise => { - - // Send initial packet indicating the start of the stream - const startPacket: ChatStreamingPreambleStartSchema = { type: 'start' }; - controller.enqueue(textEncoder.encode(JSON.stringify(startPacket))); - - // only used for debugging - let debugLastMs: number | null = null; - - const onNewEvent = (event: ParsedEvent | ReconnectInterval) => { - if (SERVER_DEBUG_WIRE) { - const nowMs = Date.now(); - const elapsedMs = debugLastMs ? 
nowMs - debugLastMs : 0; - debugLastMs = nowMs; - console.log(`<- SSE (${elapsedMs} ms):`, event); - } - - // ignore 'reconnect-interval' and events with no data - if (event.type !== 'event' || !('data' in event)) - return; - - // event stream termination, close our transformed stream - if (event.data === '[DONE]') { - controller.terminate(); - return; - } - - try { - const { text, close } = vendorTextParser(event.data, event.event); - if (text) - controller.enqueue(textEncoder.encode(text)); - if (close) - controller.terminate(); - } catch (error: any) { - if (SERVER_DEBUG_WIRE) - console.log(' - E: parse issue:', event.data, error?.message || error); - controller.enqueue(textEncoder.encode(` **[Stream Issue] ${serverCapitalizeFirstLetter(dialectLabel)}**: ${safeErrorString(error) || 'Unknown stream parsing error'}`)); - controller.terminate(); - } - }; - - if (muxingFormat === 'sse') - eventSourceParser = _createDemuxerEventSource(onNewEvent); - else if (muxingFormat === 'json-nl') - eventSourceParser = _createDemuxerJsonNewline(onNewEvent); - }, - - // stream=true is set because the data is not guaranteed to be final and un-chunked - transform: (chunk: Uint8Array) => { - hasReceivedData = true; - eventSourceParser.feed(textDecoder.decode(chunk, { stream: true })); - }, - - flush: (controller): void => { - // if we get a flush() without having received any data, we should terminate the stream - // NOTE: happens with Gemini on 2024-03-14 - if (!hasReceivedData) { - controller.enqueue(textEncoder.encode(` **[Service Issue] ${serverCapitalizeFirstLetter(dialectLabel)}**: No data was sent by the server.`)); - controller.terminate(); - } - }, - }); -} - - -/// Stream Parsers - -function createStreamParserAnthropicMessages(): AIStreamParser { - let responseMessage: AnthropicWire_API_Message_Create.Response | null = null; - let hasErrored = false; - - // Note: at this stage, the parser only returns the text content as text, which is streamed as text - // to the client. It is however building in parallel the responseMessage object, which is not - // yet used, but contains token counts, for instance. 
- return (data: string, eventName?: string) => { - let text = ''; - - // if we've errored, we should not be receiving more data - if (hasErrored) - console.log('Anthropic stream has errored already, but received more data:', data); - - switch (eventName) { - // Ignore pings - case 'ping': - break; - - // Initialize the message content for a new message - case 'message_start': - const firstMessage = !responseMessage; - const { message } = JSON.parse(data); - responseMessage = AnthropicWire_API_Message_Create.Response_schema.parse(message); - // hack: prepend the model name to the first packet - if (firstMessage) { - const firstPacket: ChatStreamingPreambleModelSchema = { model: responseMessage.model }; - text = JSON.stringify(firstPacket); - } - break; - - // Initialize content block if needed - case 'content_block_start': - if (responseMessage) { - const { index, content_block } = JSON.parse(data); - if (responseMessage.content[index] === undefined) - responseMessage.content[index] = content_block; - text = (responseMessage.content[index] as any).text; - } else - throw new Error('Unexpected content block start'); - break; - - // Append delta text to the current message content - case 'content_block_delta': - if (responseMessage) { - const { index, delta } = JSON.parse(data); - if (delta.type !== 'text_delta') - throw new Error(`Unexpected content block non-text delta (${delta.type})`); - if (responseMessage.content[index] === undefined) - throw new Error(`Unexpected content block delta location (${index})`); - (responseMessage.content[index] as any).text += delta.text; - text = delta.text; - } else - throw new Error('Unexpected content block delta'); - break; - - // Finalize content block if needed. - case 'content_block_stop': - if (responseMessage) { - const { index } = JSON.parse(data); - if (responseMessage.content[index] === undefined) - throw new Error(`Unexpected content block end location (${index})`); - } else - throw new Error('Unexpected content block stop'); - break; - - // Optionally handle top-level message changes. Example: updating stop_reason - case 'message_delta': - if (responseMessage) { - const { delta } = JSON.parse(data); - Object.assign(responseMessage, delta); - } else - throw new Error('Unexpected message delta'); - break; - - // We can now close the message - case 'message_stop': - return { text: '', close: true }; - - // Occasionaly, the server will send errors, such as {"type": "error", "error": {"type": "overloaded_error", "message": "Overloaded"}} - case 'error': - hasErrored = true; - const { error } = JSON.parse(data); - const errorText = (error.type && error.message) ? 
`${error.type}: ${error.message}` : safeErrorString(error); - return { text: `[Anthropic Server Error] ${errorText}`, close: true }; - - default: - throw new Error(`Unexpected event name: ${eventName}`); - } - - return { text, close: false }; - }; -} - -function createStreamParserGemini(modelName: string): AIStreamParser { - let hasBegun = false; - - // this can throw, it's catched upstream - return (data: string) => { - - // parse the JSON chunk - const wireGenerationChunk = JSON.parse(data); - let generationChunk: GeminiWire_API_Generate_Content.Response; - try { - generationChunk = GeminiWire_API_Generate_Content.Response_schema.parse(wireGenerationChunk); - } catch (error: any) { - // log the malformed data to the console, and rethrow to transmit as 'error' - console.log(`/api/llms/stream: Gemini parsing issue: ${error?.message || error}`, wireGenerationChunk); - throw error; - } - - // Prompt Safety Errors: pass through errors from Gemini - if (generationChunk.promptFeedback?.blockReason) { - const { blockReason, safetyRatings } = generationChunk.promptFeedback; - return { text: `${USER_SYMBOL_PROMPT_BLOCKED} [Gemini Prompt Blocked] ${blockReason}: ${JSON.stringify(safetyRatings || 'Unknown Safety Ratings', null, 2)}`, close: true }; - } - - // expect a single completion - const singleCandidate = generationChunk.candidates?.[0] ?? null; - if (!singleCandidate) - throw new Error(`expected 1 completion, got ${generationChunk.candidates?.length}`); - - // no contents: could be an expected or unexpected condition - if (!singleCandidate.content) { - if (singleCandidate.finishReason === 'MAX_TOKENS') - return { text: ` ${USER_SYMBOL_MAX_TOKENS}`, close: true }; - if (singleCandidate.finishReason === 'RECITATION') - throw new Error('generation stopped due to RECITATION'); - throw new Error(`server response missing content (finishReason: ${singleCandidate?.finishReason})`); - } - - // expect a single part - if (singleCandidate.content.parts?.length !== 1 || !('text' in singleCandidate.content.parts[0])) - throw new Error(`expected 1 text part, got ${singleCandidate.content.parts?.length}`); - - // expect a single text in the part - let text = singleCandidate.content.parts[0].text || ''; - - // hack: prepend the model name to the first packet - if (!hasBegun) { - hasBegun = true; - const firstPacket: ChatStreamingPreambleModelSchema = { model: modelName }; - text = JSON.stringify(firstPacket) + text; - } - - return { text, close: false }; - }; -} - -function createStreamParserOllama(): AIStreamParser { - let hasBegun = false; - - return (data: string) => { - - // parse the JSON chunk - let wireJsonChunk: any; - try { - wireJsonChunk = JSON.parse(data); - } catch (error: any) { - // log the malformed data to the console, and rethrow to transmit as 'error' - console.log(`/api/llms/stream: Ollama parsing issue: ${error?.message || error}`, data); - throw error; - } - - // validate chunk - const chunk = wireOllamaChunkedOutputSchema.parse(wireJsonChunk); - - // pass through errors from Ollama - if ('error' in chunk) - throw new Error(chunk.error); - - // process output - let text = chunk.message?.content || /*chunk.response ||*/ ''; - - // hack: prepend the model name to the first packet - if (!hasBegun && chunk.model) { - hasBegun = true; - const firstPacket: ChatStreamingPreambleModelSchema = { model: chunk.model }; - text = JSON.stringify(firstPacket) + text; - } - - return { text, close: chunk.done }; - }; -} - -function createStreamParserOpenAI(): AIStreamParser { - let hasBegun = false; - 
let hasWarned = false; - - return (data: string) => { - - const json = OpenAIWire_API_Chat_Completions.ChunkResponse_schema.parse(JSON.parse(data)); - - // [OpenAI] an upstream error will be handled gracefully and transmitted as text (throw to transmit as 'error') - if (json.error) - return { text: `[OpenAI Issue] ${safeErrorString(json.error)}`, close: true }; - - // [OpenAI] if there's a warning, log it once - if (json.warning && !hasWarned) { - hasWarned = true; - console.log('/api/llms/stream: OpenAI upstream warning:', json.warning); - } - - if (json.choices.length !== 1) { - // [Azure] we seem to 'prompt_annotations' or 'prompt_filter_results' objects - which we will ignore to suppress the error - if (json.id === '' && json.object === '' && json.model === '') - return { text: '', close: false }; - throw new Error(`Expected 1 completion, got ${json.choices.length}`); - } - - const index = json.choices[0].index; - if (index !== 0 && index !== undefined /* LocalAI hack/workaround until https://github.com/go-skynet/LocalAI/issues/788 */) - throw new Error(`Expected completion index 0, got ${index}`); - let text = json.choices[0].delta?.content /*|| json.choices[0]?.text*/ || ''; - - // hack: prepend the model name to the first packet - if (!hasBegun) { - hasBegun = true; - const firstPacket: ChatStreamingPreambleModelSchema = { model: json.model }; - text = JSON.stringify(firstPacket) + text; - } - - // [LocalAI] workaround: LocalAI doesn't send the [DONE] event, but similarly to OpenAI, it sends a "finish_reason" delta update - const close = !!json.choices[0].finish_reason; - return { text, close }; - }; -} - - -function _prepareRequestData({ access, model, history, context: _context }: ChatStreamingInputSchema): { - headers: HeadersInit; - url: string; - body: object; - vendorMuxingFormat: MuxingFormat; - vendorStreamParser: AIStreamParser; -} { - switch (access.dialect) { - case 'anthropic': - return { - ...anthropicAccess(access, '/v1/messages'), - body: anthropicMessagesPayloadOrThrow(model, history, true), - vendorMuxingFormat: 'sse', - vendorStreamParser: createStreamParserAnthropicMessages(), - }; - - case 'gemini': - return { - ...geminiAccess(access, model.id, GeminiWire_API_Generate_Content.streamingPostPath), - body: geminiGenerateContentTextPayload(model, history, access.minSafetyLevel, 1), - vendorMuxingFormat: 'sse', - vendorStreamParser: createStreamParserGemini(model.id.replace('models/', '')), - }; - - case 'ollama': - return { - ...ollamaAccess(access, OLLAMA_PATH_CHAT), - body: ollamaChatCompletionPayload(model, history, access.ollamaJson, true), - vendorMuxingFormat: 'json-nl', - vendorStreamParser: createStreamParserOllama(), - }; - - case 'azure': - case 'deepseek': - case 'groq': - case 'lmstudio': - case 'localai': - case 'mistral': - case 'openai': - case 'openpipe': - case 'openrouter': - case 'perplexity': - case 'togetherai': - return { - ...openAIAccess(access, model.id, '/v1/chat/completions'), - body: openAIChatCompletionPayload(access.dialect, model, history, null, null, 1, true), - vendorMuxingFormat: 'sse', - vendorStreamParser: createStreamParserOpenAI(), - }; - } -} \ No newline at end of file diff --git a/src/modules/llms/server/llm.server.types.ts b/src/modules/llms/server/llm.server.types.ts index 20007e0d2..4e5c7fced 100644 --- a/src/modules/llms/server/llm.server.types.ts +++ b/src/modules/llms/server/llm.server.types.ts @@ -88,39 +88,3 @@ export const ModelDescription_schema = z.object({ export const ListModelsResponse_schema = z.object({ 
models: z.array(ModelDescription_schema), }); - -// } - - -// chat context - -export const llmsGenerateContextSchema = z.object({ - method: z.literal('chat-generate'), - name: z.enum(['chat-ai-title', 'chat-ai-summarize', 'chat-followup-diagram', 'chat-followup-htmlui', 'chat-react-turn', 'draw-expand-prompt']), - ref: z.string(), -}); -export type GenerateContextNameSchema = z.infer<typeof llmsGenerateContextSchema>['name']; - -export const llmsStreamingContextSchema = z.object({ - method: z.literal('chat-stream'), - name: z.enum(['conversation', 'ai-diagram', 'ai-flattener', 'call', 'beam-scatter', 'beam-gather', 'persona-extract']), - ref: z.string(), -}); -export type StreamingContextNameSchema = z.infer<typeof llmsStreamingContextSchema>['name']; - - -// (non-streaming) Chat Generation Output - -export const llmsChatGenerateOutputSchema = z.object({ - role: z.enum(['assistant', 'system', 'user']), - content: z.string().nullable(), - finish_reason: z.enum(['stop', 'length']).nullable(), -}); - -export const llmsChatGenerateWithFunctionsOutputSchema = z.union([ - llmsChatGenerateOutputSchema, - z.object({ - function_name: z.string(), - function_arguments: z.record(z.any()), - }), -]); \ No newline at end of file diff --git a/src/modules/llms/server/ollama/ollama.router.ts b/src/modules/llms/server/ollama/ollama.router.ts index 77c767321..f87d6c5b9 100644 --- a/src/modules/llms/server/ollama/ollama.router.ts +++ b/src/modules/llms/server/ollama/ollama.router.ts @@ -1,5 +1,4 @@ import { z } from 'zod'; -import { TRPCError } from '@trpc/server'; import { createTRPCRouter, publicProcedure } from '~/server/api/trpc.server'; import { env } from '~/server/env.mjs'; @@ -9,11 +8,11 @@ import { LLM_IF_OAI_Chat } from '~/common/stores/llms/llms.types'; import { capitalizeFirstLetter } from '~/common/util/textUtils'; import { fixupHost } from '~/common/util/urlUtils'; -import { ListModelsResponse_schema, llmsChatGenerateOutputSchema, llmsGenerateContextSchema } from '../llm.server.types'; -import { OpenAIHistorySchema, openAIHistorySchema, OpenAIModelSchema, openAIModelSchema } from '../openai/openai.router'; +import { ListModelsResponse_schema } from '../llm.server.types'; +import { OpenAIHistorySchema, OpenAIModelSchema } from '../openai/openai.router'; import { OLLAMA_BASE_MODELS, OLLAMA_PREV_UPDATE } from './ollama.models'; -import { WireOllamaChatCompletionInput, wireOllamaChunkedOutputSchema, wireOllamaListModelsSchema, wireOllamaModelInfoSchema } from './ollama.wiretypes'; +import { WireOllamaChatCompletionInput, wireOllamaListModelsSchema, wireOllamaModelInfoSchema } from './ollama.wiretypes'; // Default hosts @@ -114,15 +113,6 @@ const adminPullModelSchema = z.object({ name: z.string(), }); -const chatGenerateInputSchema = z.object({ - access: ollamaAccessSchema, - model: openAIModelSchema, - history: openAIHistorySchema, - // functions: openAIFunctionsSchema.optional(), - // forceFunctionName: z.string().optional(), - context: llmsGenerateContextSchema.optional(), -}); - const listPullableOutputSchema = z.object({ pullable: z.array(z.object({ id: z.string(), @@ -248,32 +238,4 @@ export const llmOllamaRouter = createTRPCRouter({ }; }), - /* Ollama: Chat generation */ - chatGenerate: publicProcedure - .input(chatGenerateInputSchema) - .output(llmsChatGenerateOutputSchema) - .mutation(async ({ input: { access, history, model } }) => { - - const wireGeneration = await ollamaPOST(access, ollamaChatCompletionPayload(model, history, access.ollamaJson, false), OLLAMA_PATH_CHAT); - const generation = wireOllamaChunkedOutputSchema.parse(wireGeneration); - - if
('error' in generation) - throw new TRPCError({ - code: 'INTERNAL_SERVER_ERROR', - message: `Ollama chat-generation issue: ${generation.error}`, - }); - - if (!generation.message?.content) - throw new TRPCError({ - code: 'INTERNAL_SERVER_ERROR', - message: `Ollama chat-generation API issue: ${JSON.stringify(wireGeneration)}`, - }); - - return { - role: 'assistant', - content: generation.message.content, - finish_reason: generation.done ? 'stop' : null, - }; - }), - }); diff --git a/src/modules/llms/server/openai/openai.router.ts b/src/modules/llms/server/openai/openai.router.ts index 8060a7166..c33de6fe2 100644 --- a/src/modules/llms/server/openai/openai.router.ts +++ b/src/modules/llms/server/openai/openai.router.ts @@ -10,9 +10,9 @@ import { T2iCreateImageOutput, t2iCreateImagesOutputSchema } from '~/modules/t2i import { Brand } from '~/common/app.config'; import { fixupHost } from '~/common/util/urlUtils'; -import { OpenAIWire_API_Chat_Completions, OpenAIWire_API_Images_Generations, OpenAIWire_API_Models_List, OpenAIWire_API_Moderations_Create, OpenAIWire_Tools } from '~/modules/aix/server/dispatch/wiretypes/openai.wiretypes'; +import { OpenAIWire_API_Images_Generations, OpenAIWire_API_Models_List, OpenAIWire_API_Moderations_Create } from '~/modules/aix/server/dispatch/wiretypes/openai.wiretypes'; -import { ListModelsResponse_schema, llmsChatGenerateWithFunctionsOutputSchema, llmsGenerateContextSchema, ModelDescriptionSchema } from '../llm.server.types'; +import { ListModelsResponse_schema, ModelDescriptionSchema } from '../llm.server.types'; import { azureModelToModelDescription, deepseekModelToModelDescription, groqModelSortFn, groqModelToModelDescription, lmStudioModelToModelDescription, localAIModelToModelDescription, mistralModelsSort, mistralModelToModelDescription, openAIModelFilter, openAIModelToModelDescription, openPipeModelDescriptions, openPipeModelSort, openPipeModelToModelDescriptions, openRouterModelFamilySortFn, openRouterModelToModelDescription, perplexityAIModelDescriptions, perplexityAIModelSort, togetherAIModelsToModelDescriptions } from './models.data'; import { wilreLocalAIModelsApplyOutputSchema, wireLocalAIModelsAvailableOutputSchema, wireLocalAIModelsListOutputSchema } from './localai.wiretypes'; @@ -56,15 +56,6 @@ const listModelsInputSchema = z.object({ access: openAIAccessSchema, }); -const chatGenerateWithFunctionsInputSchema = z.object({ - access: openAIAccessSchema, - model: openAIModelSchema, - history: openAIHistorySchema, - functions: z.array(OpenAIWire_Tools.FunctionDefinition_schema).optional(), - forceFunctionName: z.string().optional(), - context: llmsGenerateContextSchema.optional(), -}); - const createImagesInputSchema = z.object({ access: openAIAccessSchema, // for this object sync with <> wireOpenAICreateImageRequestSchema @@ -256,40 +247,6 @@ export const llmOpenAIRouter = createTRPCRouter({ return { models }; }), - /* [OpenAI] (non streaming) chat generation */ - chatGenerateWithFunctions: publicProcedure - .input(chatGenerateWithFunctionsInputSchema) - .output(llmsChatGenerateWithFunctionsOutputSchema) - .mutation(async ({ input }) => { - - const { access, model, history, functions, forceFunctionName, context } = input; - const isFunctionsCall = !!functions && functions.length > 0; - - const completionsBody = openAIChatCompletionPayload(access.dialect, model, history, isFunctionsCall ? functions : null, forceFunctionName ?? 
null, 1, false); - const wireCompletions = await openaiPOSTOrThrow( - access, model.id, completionsBody, '/v1/chat/completions', - ); - - // expect a single output - if (wireCompletions?.choices?.length !== 1) { - console.error(`[POST] llmOpenAI.chatGenerateWithFunctions: ${access.dialect}: ${context?.name || 'no context'}: unexpected output${forceFunctionName ? ` (fn: ${forceFunctionName})` : ''}:`, model.id, wireCompletions?.choices); - throw new TRPCError({ - code: 'UNPROCESSABLE_CONTENT', - message: `[OpenAI Issue] Expected 1 completion, got ${wireCompletions?.choices?.length}`, - }); - } - let { message, finish_reason } = wireCompletions.choices[0]; - - // LocalAI hack/workaround, until https://github.com/go-skynet/LocalAI/issues/788 is fixed - if (finish_reason === undefined) - finish_reason = 'stop'; - - // check for a function output - // NOTE: this includes a workaround for when we requested a function but the model could not deliver - return (finish_reason === 'tool_calls' || 'tool_calls' in message) - ? parseChatGenerateSingleToolFunctionOutput(isFunctionsCall, message) - : parseChatGenerateOutput(message, finish_reason); - }), /* [OpenAI/LocalAI] images/generations */ createImages: publicProcedure @@ -625,69 +582,6 @@ export function openAIAccess(access: OpenAIAccessSchema, modelRefId: string | nu } -export function openAIChatCompletionPayload(dialect: OpenAIDialects, model: OpenAIModelSchema, history: OpenAIHistorySchema, functions: OpenAIWire_Tools.FunctionDefinition[] | null, forceFunctionName: string | null, n: number, stream: boolean): OpenAIWire_API_Chat_Completions.Request { - - // Hotfixes to comply with API restrictions - const hotfixAlternateUARoles = dialect === 'perplexity'; - const hotfixSkipEmptyMessages = dialect === 'perplexity'; - const performFixes = hotfixAlternateUARoles || hotfixSkipEmptyMessages; - - // recreate history for hotfixes - // NOTE: we do not like that we have to introduce aberrations by altering history, but it's a necessary evil - if (performFixes) { - history = history.reduce((acc, historyItem) => { - - // skip empty messages - if (hotfixSkipEmptyMessages && !historyItem.content.trim()) return acc; - - // if the current item has the same role as the last item, concatenate their content - if (hotfixAlternateUARoles && acc.length > 0) { - const lastItem = acc[acc.length - 1]; - if (lastItem.role === historyItem.role) { - // replace the last item with the new concatenatedItem - acc[acc.length - 1] = { - ...lastItem, - content: lastItem.content + ABERRATION_FIXUP_SQUASH + historyItem.content, - }; - return acc; - } - } - - // if it's not a case for concatenation, just push the current item to the accumulator - acc.push(historyItem); - return acc; - }, [] as OpenAIHistorySchema); - } - - const chatCompletionRequest: OpenAIWire_API_Chat_Completions.Request = { - model: model.id, - messages: history, - }; - if (stream) { - chatCompletionRequest.stream = true; - chatCompletionRequest.stream_options = { include_usage: true }; - } - if (model.temperature !== undefined) - chatCompletionRequest.temperature = model.temperature; - if (model.maxTokens) - chatCompletionRequest.max_tokens = model.maxTokens; - if (functions?.length) - chatCompletionRequest.tools = functions.map(fun => ({ - type: 'function', - function: fun, - })); - if (forceFunctionName) - chatCompletionRequest.tool_choice = { - type: 'function', - function: { name: forceFunctionName }, - }; - if (n > 1) { - chatCompletionRequest.n = n; - throw new Error('OpenAI-derived API do not support n > 
1 for chat completions, so we will not do it either'); - } - return chatCompletionRequest; -} - async function openaiGETOrThrow(access: OpenAIAccessSchema, apiPath: string /*, signal?: AbortSignal*/): Promise { const { headers, url } = openAIAccess(access, null, apiPath); return await fetchJsonOrTRPCThrow({ url, headers, name: `OpenAI/${access.dialect}` }); @@ -697,59 +591,3 @@ async function openaiPOSTOrThrow( const { headers, url } = openAIAccess(access, modelRefId, apiPath); return await fetchJsonOrTRPCThrow({ url, method: 'POST', headers, body, name: `OpenAI/${access.dialect}` }); } - -function parseChatGenerateSingleToolFunctionOutput(isFunctionsCall: boolean, message: OpenAIWire_API_Chat_Completions.Response['choices'][number]['message']) { - // NOTE: Defensive: we run extensive validation because the API is not well tested and documented at the moment - if (!isFunctionsCall) - throw new TRPCError({ - code: 'INTERNAL_SERVER_ERROR', - message: `[OpenAI Issue] Received a function call without a function call request`, - }); - - // validate a single function call - if (!message.tool_calls || message.tool_calls.length !== 1 || message.tool_calls[0].type !== 'function' || message.content) - throw new TRPCError({ - code: 'INTERNAL_SERVER_ERROR', - message: `[OpenAI Issue] Expected a function call, got a message`, - }); - - // got a function call, so parse it - const fc = message.tool_calls[0].function; - if (!fc.name || !fc.arguments) - throw new TRPCError({ - code: 'INTERNAL_SERVER_ERROR', - message: `[OpenAI Issue] Issue with the function call, missing name or arguments`, - }); - - // decode the function call - const fcName = fc.name; - let fcArgs: object; - try { - fcArgs = JSON.parse(fc.arguments); - } catch (error: any) { - throw new TRPCError({ - code: 'INTERNAL_SERVER_ERROR', - message: `[OpenAI Issue] Issue with the function call, arguments are not valid JSON`, - }); - } - - return { - function_name: fcName, - function_arguments: fcArgs, - }; -} - -function parseChatGenerateOutput(message: OpenAIWire_API_Chat_Completions.Response['choices'][number]['message'], finish_reason: OpenAIWire_API_Chat_Completions.Response['choices'][number]['finish_reason']) { - // validate the message - if (message.content === undefined) - throw new TRPCError({ - code: 'INTERNAL_SERVER_ERROR', - message: `[OpenAI Issue] Expected a message, got a null message`, - }); - - return { - role: message.role, - content: message.content, - finish_reason: finish_reason === 'stop' ? 'stop' as const : finish_reason === 'length' ? 
'length' as const : null, - }; -} \ No newline at end of file diff --git a/src/modules/llms/vendors/IModelVendor.ts b/src/modules/llms/vendors/IModelVendor.ts index aea858f77..309e54e00 100644 --- a/src/modules/llms/vendors/IModelVendor.ts +++ b/src/modules/llms/vendors/IModelVendor.ts @@ -4,13 +4,11 @@ import type { SvgIconProps } from '@mui/joy'; import type { BackendCapabilities } from '~/modules/backend/store-backend-capabilities'; -import type { DLLM, DLLMId } from '~/common/stores/llms/llms.types'; +import type { DLLM } from '~/common/stores/llms/llms.types'; import type { DModelsServiceId } from '~/common/stores/llms/modelsservice.types'; import type { ModelDescriptionSchema } from '../server/llm.server.types'; import type { ModelVendorId } from './vendors.registry'; -import type { StreamingClientUpdate } from './unifiedStreamingClient'; -import type { VChatContextRef, VChatFunctionIn, VChatGenerateContextName, VChatMessageIn, VChatMessageOrFunctionCallOut, VChatMessageOut, VChatStreamContextName } from '../llm.client'; export interface IModelVendor> { @@ -43,24 +41,4 @@ export interface IModelVendor; - rpcChatGenerateOrThrow( - access: TAccess, - llmOptions: TLLMOptions, - messages: VChatMessageIn[], - contextName: VChatGenerateContextName, contextRef: VChatContextRef | null, - functions: VChatFunctionIn[] | null, forceFunctionName: string | null, - maxTokens?: number, - ): Promise; - - streamingChatGenerateOrThrow( - access: TAccess, - llmId: DLLMId, - llmOptions: TLLMOptions, - messages: VChatMessageIn[], - contextName: VChatStreamContextName, contextRef: VChatContextRef, - functions: VChatFunctionIn[] | null, forceFunctionName: string | null, - abortSignal: AbortSignal, - onUpdate: (update: StreamingClientUpdate, done: boolean) => void, - ): Promise; - } diff --git a/src/modules/llms/vendors/anthropic/anthropic.vendor.ts b/src/modules/llms/vendors/anthropic/anthropic.vendor.ts index c2fd66b19..f2873a879 100644 --- a/src/modules/llms/vendors/anthropic/anthropic.vendor.ts +++ b/src/modules/llms/vendors/anthropic/anthropic.vendor.ts @@ -3,10 +3,8 @@ import { apiAsync } from '~/common/util/trpc.client'; import type { AnthropicAccessSchema } from '../../server/anthropic/anthropic.router'; import type { IModelVendor } from '../IModelVendor'; -import type { VChatContextRef, VChatGenerateContextName, VChatMessageOut } from '../../llm.client'; -import { unifiedStreamingClient } from '../unifiedStreamingClient'; -import { DOpenAILLMOptions, FALLBACK_LLM_RESPONSE_TOKENS, FALLBACK_LLM_TEMPERATURE } from '../openai/openai.vendor'; +import type { DOpenAILLMOptions } from '../openai/openai.vendor'; import { OpenAILLMOptions } from '../openai/OpenAILLMOptions'; import { AnthropicServiceSetup } from './AnthropicServiceSetup'; @@ -47,35 +45,4 @@ export const ModelVendorAnthropic: IModelVendor await apiAsync.llmAnthropic.listModels.query({ access }), - // Chat Generate (non-streaming) with Functions - rpcChatGenerateOrThrow: async (access, llmOptions, messages, contextName: VChatGenerateContextName, contextRef: VChatContextRef | null, functions, forceFunctionName, maxTokens) => { - if (functions?.length || forceFunctionName) - throw new Error('Anthropic does not support functions'); - - const { llmRef, llmTemperature, llmResponseTokens } = llmOptions; - try { - return await apiAsync.llmAnthropic.chatGenerateMessage.mutate({ - access, - model: { - id: llmRef, - temperature: llmTemperature ?? 
FALLBACK_LLM_TEMPERATURE, - maxTokens: maxTokens || llmResponseTokens || FALLBACK_LLM_RESPONSE_TOKENS, - }, - history: messages, - context: contextRef ? { - method: 'chat-generate', - name: contextName, - ref: contextRef, - } : undefined, - }) as VChatMessageOut; - } catch (error: any) { - const errorMessage = error?.message || error?.toString() || 'Anthropic Chat Generate Error'; - console.error(`anthropic.rpcChatGenerateOrThrow: ${errorMessage}`); - throw new Error(errorMessage); - } - }, - - // Chat Generate (streaming) with Functions - streamingChatGenerateOrThrow: unifiedStreamingClient, - }; diff --git a/src/modules/llms/vendors/azure/azure.vendor.ts b/src/modules/llms/vendors/azure/azure.vendor.ts index 1977f7cf8..6d3b6a867 100644 --- a/src/modules/llms/vendors/azure/azure.vendor.ts +++ b/src/modules/llms/vendors/azure/azure.vendor.ts @@ -58,6 +58,5 @@ export const ModelVendorAzure: IModelVendor await apiAsync.llmGemini.listModels.query({ access }), - // Chat Generate (non-streaming) with Functions - rpcChatGenerateOrThrow: async (access, llmOptions, messages, contextName: VChatGenerateContextName, contextRef: VChatContextRef | null, functions, forceFunctionName, maxTokens) => { - if (functions?.length || forceFunctionName) - throw new Error('Gemini does not support functions'); - - const { llmRef, temperature, maxOutputTokens } = llmOptions; - try { - return await apiAsync.llmGemini.chatGenerate.mutate({ - access, - model: { - id: llmRef, - temperature: temperature ?? FALLBACK_LLM_TEMPERATURE, - maxTokens: maxTokens || maxOutputTokens || FALLBACK_LLM_RESPONSE_TOKENS, - }, - history: messages, - context: contextRef ? { - method: 'chat-generate', - name: contextName, - ref: contextRef, - } : undefined, - }) as VChatMessageOut; - } catch (error: any) { - const errorMessage = error?.message || error?.toString() || 'Gemini Chat Generate Error'; - console.error(`gemini.rpcChatGenerateOrThrow: ${errorMessage}`); - throw new Error(errorMessage); - } - }, - - // Chat Generate (streaming) with Functions - streamingChatGenerateOrThrow: unifiedStreamingClient, - }; diff --git a/src/modules/llms/vendors/groq/groq.vendor.ts b/src/modules/llms/vendors/groq/groq.vendor.ts index 58a51e976..a4ecd10d2 100644 --- a/src/modules/llms/vendors/groq/groq.vendor.ts +++ b/src/modules/llms/vendors/groq/groq.vendor.ts @@ -44,6 +44,5 @@ export const ModelVendorGroq: IModelVendor await apiAsync.llmOllama.listModels.query({ access }), - // Chat Generate (non-streaming) with Functions - rpcChatGenerateOrThrow: async (access, llmOptions, messages, contextName: VChatGenerateContextName, contextRef: VChatContextRef | null, functions, forceFunctionName, maxTokens) => { - if (functions?.length || forceFunctionName) - throw new Error('Ollama does not support functions'); - - const { llmRef, llmTemperature, llmResponseTokens } = llmOptions; - try { - return await apiAsync.llmOllama.chatGenerate.mutate({ - access, - model: { - id: llmRef, - temperature: llmTemperature ?? FALLBACK_LLM_TEMPERATURE, - maxTokens: maxTokens || llmResponseTokens || FALLBACK_LLM_RESPONSE_TOKENS, - }, - history: messages, - context: contextRef ? 
{ - method: 'chat-generate', - name: contextName, - ref: contextRef, - } : undefined, - }) as VChatMessageOut; - } catch (error: any) { - const errorMessage = error?.message || error?.toString() || 'Ollama Chat Generate Error'; - console.error(`ollama.rpcChatGenerateOrThrow: ${errorMessage}`); - throw new Error(errorMessage); - } - }, - - // Chat Generate (streaming) with Functions - streamingChatGenerateOrThrow: unifiedStreamingClient, - }; diff --git a/src/modules/llms/vendors/openai/openai.vendor.ts b/src/modules/llms/vendors/openai/openai.vendor.ts index 770232c59..2c73ef0ef 100644 --- a/src/modules/llms/vendors/openai/openai.vendor.ts +++ b/src/modules/llms/vendors/openai/openai.vendor.ts @@ -3,8 +3,6 @@ import { apiAsync } from '~/common/util/trpc.client'; import type { IModelVendor } from '../IModelVendor'; import type { OpenAIAccessSchema } from '../../server/openai/openai.router'; -import type { VChatContextRef, VChatGenerateContextName, VChatMessageOrFunctionCallOut } from '../../llm.client'; -import { unifiedStreamingClient } from '../unifiedStreamingClient'; import { OpenAILLMOptions } from './OpenAILLMOptions'; import { OpenAIServiceSetup } from './OpenAIServiceSetup'; @@ -59,34 +57,4 @@ export const ModelVendorOpenAI: IModelVendor await apiAsync.llmOpenAI.listModels.query({ access }), - // Chat Generate (non-streaming) with Functions - rpcChatGenerateOrThrow: async (access, llmOptions, messages, contextName: VChatGenerateContextName, contextRef: VChatContextRef | null, functions, forceFunctionName, maxTokens) => { - const { llmRef, llmTemperature, llmResponseTokens } = llmOptions; - try { - return await apiAsync.llmOpenAI.chatGenerateWithFunctions.mutate({ - access, - model: { - id: llmRef, - temperature: llmTemperature ?? FALLBACK_LLM_TEMPERATURE, - maxTokens: maxTokens || llmResponseTokens || FALLBACK_LLM_RESPONSE_TOKENS, - }, - functions: functions ?? undefined, - forceFunctionName: forceFunctionName ?? undefined, - history: messages, - context: contextRef ? { - method: 'chat-generate', - name: contextName, - ref: contextRef, - } : undefined, - }) as VChatMessageOrFunctionCallOut; - } catch (error: any) { - const errorMessage = error?.message || error?.toString() || 'OpenAI Chat Generate Error'; - console.error(`openai.rpcChatGenerateOrThrow: ${errorMessage}`); - throw new Error(errorMessage); - } - }, - - // Chat Generate (streaming) with Functions - streamingChatGenerateOrThrow: unifiedStreamingClient, - }; diff --git a/src/modules/llms/vendors/openpipe/openpipe.vendor.ts b/src/modules/llms/vendors/openpipe/openpipe.vendor.ts index 71b58a3f7..847e1ada0 100644 --- a/src/modules/llms/vendors/openpipe/openpipe.vendor.ts +++ b/src/modules/llms/vendors/openpipe/openpipe.vendor.ts @@ -46,6 +46,5 @@ export const ModelVendorOpenPipe: IModelVendor; - -/** - * Client side chat generation, with streaming. This decodes the (text) streaming response from - * our server streaming endpoint (plain text, not EventSource), and signals updates via a callback. - * - * Vendor-specific implementation is on our server backend (API) code. This function tries to be - * as generic as possible. - * - * NOTE: onUpdate is callback when a piece of a message (text, model name, typing..) 
is received - */ -export async function unifiedStreamingClient( - access: ChatStreamingInputSchema['access'], - llmId: DLLMId, - llmOptions: TLLMOptions, - messages: VChatMessageIn[], - contextName: VChatStreamContextName, contextRef: VChatContextRef, - functions: VChatFunctionIn[] | null, forceFunctionName: string | null, - abortSignal: AbortSignal, - onUpdate: (update: StreamingClientUpdate, done: boolean) => void, -): Promise { - - // model params (llm) - const { llmRef, llmTemperature, llmResponseTokens } = (llmOptions as any) || {}; - if (!llmRef || llmTemperature === undefined) - throw new Error(`Error in configuration for model ${llmId}: ${JSON.stringify(llmOptions)}`); - - // [OpenAI-only] check for harmful content with the free 'moderation' API, if the user requests so - if (access.dialect === 'openai' && access.moderationCheck) { - const moderationUpdate = await _openAIModerationCheck(access, messages.at(-1) ?? null); - if (moderationUpdate) - return onUpdate({ textSoFar: moderationUpdate, typing: false }, true); - } - - // prepare the input, similarly to the tRPC openAI.chatGenerate - const input: ChatStreamingInputSchema = { - access, - model: { - id: llmRef, - temperature: llmTemperature, - ...(llmResponseTokens ? { maxTokens: llmResponseTokens } : {}), - }, - history: messages, - context: { - method: 'chat-stream', - name: contextName, // this errors if the client VChatContextName mismatches the server z.enum - ref: contextRef, - }, - }; - - // connect to the server-side streaming endpoint - const timeFetch = performance.now(); - const response = await frontendSideFetch('/api/llms/stream', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(input), - signal: abortSignal, - }); - - if (!response.ok || !response.body) { - const errorMessage = response.body ? 
await response.text() : 'No response from server'; - return onUpdate({ textSoFar: errorMessage, typing: false }, true); - } - - const responseReader = response.body.getReader(); - const textDecoder = new TextDecoder('utf-8'); - - // loop forever until the read is done, or the abort controller is triggered - let incrementalText = ''; - let parsedPreambleStart = false; - let parsedPreableModel = false; - while (true) { - const { value, done } = await responseReader.read(); - - // normal exit condition - if (done) { - if (value?.length) - console.log('unifiedStreamingClient: unexpected value in the last packet:', value?.length); - break; - } - - incrementalText += textDecoder.decode(value, { stream: true }); - - // we have two packets with a serialized flat json object at the start; this is side data, before the text flow starts - while ((!parsedPreambleStart || !parsedPreableModel) && incrementalText.startsWith('{')) { - - // extract a complete JSON object, if present - const endOfJson = incrementalText.indexOf('}'); - if (endOfJson === -1) break; - const jsonString = incrementalText.substring(0, endOfJson + 1); - incrementalText = incrementalText.substring(endOfJson + 1); - - // first packet: preamble to let the Vercel edge function go over time - if (!parsedPreambleStart) { - parsedPreambleStart = true; - try { - const parsed: ChatStreamingPreambleStartSchema = JSON.parse(jsonString); - if (parsed.type !== 'start') - console.log('unifiedStreamingClient: unexpected preamble type:', parsed?.type, 'time:', performance.now() - timeFetch); - } catch (e) { - // error parsing JSON, ignore - console.log('unifiedStreamingClient: error parsing start JSON:', e); - } - continue; - } - - // second packet: the model name - if (!parsedPreableModel) { - parsedPreableModel = true; - try { - const parsed: ChatStreamingPreambleModelSchema = JSON.parse(jsonString); - onUpdate({ originLLM: parsed.model }, false); - } catch (e) { - // error parsing JSON, ignore - console.log('unifiedStreamingClient: error parsing model JSON:', e); - } - } - } - - if (incrementalText) - onUpdate({ textSoFar: incrementalText }, false); - } -} - - -/** - * OpenAI-specific moderation check. This is a separate function, as it's not part of the - * streaming chat generation, but it's a pre-check before we even start the streaming. 
- * - * @returns null if the message is safe, or a string with the user message if it's not safe - */ -async function _openAIModerationCheck(access: OpenAIAccessSchema, lastMessage: VChatMessageIn | null): Promise<string | null> { - if (!lastMessage || lastMessage.role !== 'user') - return null; - - try { - const moderationResult = await apiAsync.llmOpenAI.moderation.mutate({ - access, text: lastMessage.content, - }); - const issues = moderationResult.results.reduce((acc, result) => { - if (result.flagged) { - Object - .entries(result.categories) - .filter(([_, value]) => value) - .forEach(([key, _]) => acc.add(key)); - } - return acc; - }, new Set()); - - // if there's any perceived violation, we stop here - if (issues.size) { - const categoriesText = [...issues].map(c => `\`${c}\``).join(', '); - // do not proceed with the streaming request - return `[Moderation] I am unable to provide a response to your query as it violated the following categories of the OpenAI usage policies: ${categoriesText}.\nFor further explanation please visit https://platform.openai.com/docs/guides/moderation/moderation`; - } - } catch (error: any) { - // as the moderation check was requested, we cannot proceed in case of error - return '[Issue] There was an error while checking for harmful content. ' + error?.toString(); - } - - // moderation check was successful - return null; -}
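For reference, the removed unifiedStreamingClient above consumed a plain-text response from the (also removed) /api/llms/stream endpoint: two small flat JSON objects arrive first (a 'start' marker, then a { model } packet), and everything after them is the streamed assistant text. The TypeScript sketch below only illustrates that wire format as it appears in the deleted code; the names readRemovedStreamFormat and StreamUpdate are invented for illustration and are not part of the codebase or of this change.

// Illustrative sketch of the removed /api/llms/stream response decoding
type StreamUpdate = { model?: string; textSoFar?: string };

async function readRemovedStreamFormat(response: Response, onUpdate: (update: StreamUpdate) => void): Promise<void> {
  if (!response.ok || !response.body)
    throw new Error('No response body to stream');

  const reader = response.body.getReader();
  const decoder = new TextDecoder('utf-8');
  let buffer = '';
  let preamblesSeen = 0; // the deleted client expects 2 preambles: 'start', then { model }

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // peel off the leading flat JSON preamble objects, once fully received
    while (preamblesSeen < 2 && buffer.startsWith('{')) {
      const end = buffer.indexOf('}');
      if (end === -1) break; // incomplete preamble: wait for the next chunk
      const jsonString = buffer.slice(0, end + 1);
      buffer = buffer.slice(end + 1);
      preamblesSeen++;
      try {
        const packet = JSON.parse(jsonString);
        if (typeof packet.model === 'string')
          onUpdate({ model: packet.model });
      } catch {
        // malformed preamble: ignored, as in the deleted client
      }
    }

    // the remainder is the (cumulative) assistant text
    if (buffer)
      onUpdate({ textSoFar: buffer });
  }
}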