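How to stream
This guide assumes familiarity with the following concepts:
Streaming is critical to making applications built on large language models (LLMs) feel responsive to end users.
Important LangChain primitives such as LLMs, parsers, prompts, retrievers, and agents implement the LangChain Runnable interface.
This interface provides two general approaches to streaming content:
- .stream(): the default implementation of streaming, which streams the final output from the chain.
- streamEvents() and streamLog(): these stream both the intermediate steps and the final output from the chain.
Let's take a look at both approaches!
For a higher-level overview of streaming techniques in LangChain, see this section of the conceptual guide.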
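Using Stream
All Runnable objects implement a method called stream.
This method is designed to stream the final output in chunks, yielding each chunk as soon as it is available.
Streaming is only possible if every step in the program knows how to process an input stream, that is, to process one input chunk at a time and yield a corresponding output chunk.
The complexity of this processing varies, from straightforward tasks such as emitting the tokens produced by an LLM to more challenging ones such as streaming parts of a JSON result before the entire JSON is complete.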
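To make the chunk-by-chunk requirement concrete, here is a minimal sketch that uses plain async generators rather than LangChain itself. The names fakeModelTokens, upperCaseStep, and collectChunks are hypothetical and exist only for illustration.

```typescript
// Hypothetical stand-in for a model: yields one token at a time.
async function* fakeModelTokens(text: string): AsyncGenerator<string> {
  for (const token of text.split(" ")) {
    yield token;
  }
}

// A streaming-aware step: handles each input chunk as it arrives
// and immediately yields the corresponding output chunk.
async function* upperCaseStep(
  input: AsyncIterable<string>
): AsyncGenerator<string> {
  for await (const chunk of input) {
    yield chunk.toUpperCase();
  }
}

// Helper that drains a stream into an array (useful for inspection).
async function collectChunks<T>(stream: AsyncIterable<T>): Promise<T[]> {
  const chunks: T[] = [];
  for await (const chunk of stream) {
    chunks.push(chunk);
  }
  return chunks;
}
```

Because upperCaseStep never waits for the whole input, a consumer of upperCaseStep(fakeModelTokens("...")) would see the first output chunk as soon as the first token is produced.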
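The best place to start exploring streaming is with one of the most important components in LLM apps: the models themselves!
LLMs and Chat Models
Large language models can take several seconds to generate a complete response to a query. This is far slower than the ~200-300 ms threshold at which an application feels responsive to an end user.
The key strategy for making the application feel more responsive is to show intermediate progress, for example by streaming the model's output token by token.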
import "dotenv/config";
Pick your chat model. The setup for each provider (Groq, OpenAI, Anthropic, Google Gemini, FireworksAI, MistralAI, VertexAI) follows; use one of them.
Groq
Install dependencies (npm, yarn, or pnpm):
npm i @langchain/groq
yarn add @langchain/groq
pnpm add @langchain/groq
Add environment variables:
GROQ_API_KEY=your-api-key
Instantiate the model:
import { ChatGroq } from "@langchain/groq";
const model = new ChatGroq({
  model: "llama-3.3-70b-versatile",
  temperature: 0
});
OpenAI
Install dependencies (npm, yarn, or pnpm):
npm i @langchain/openai
yarn add @langchain/openai
pnpm add @langchain/openai
Add environment variables:
OPENAI_API_KEY=your-api-key
Instantiate the model:
import { ChatOpenAI } from "@langchain/openai";
const model = new ChatOpenAI({
  model: "gpt-4o-mini",
  temperature: 0
});
Anthropic
Install dependencies (npm, yarn, or pnpm):
npm i @langchain/anthropic
yarn add @langchain/anthropic
pnpm add @langchain/anthropic
Add environment variables:
ANTHROPIC_API_KEY=your-api-key
Instantiate the model:
import { ChatAnthropic } from "@langchain/anthropic";
const model = new ChatAnthropic({
  model: "claude-3-5-sonnet-20240620",
  temperature: 0
});
Google Gemini
Install dependencies (npm, yarn, or pnpm):
npm i @langchain/google-genai
yarn add @langchain/google-genai
pnpm add @langchain/google-genai
Add environment variables:
GOOGLE_API_KEY=your-api-key
Instantiate the model:
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
const model = new ChatGoogleGenerativeAI({
  model: "gemini-2.0-flash",
  temperature: 0
});
FireworksAI
Install dependencies (npm, yarn, or pnpm):
npm i @langchain/community
yarn add @langchain/community
pnpm add @langchain/community
Add environment variables:
FIREWORKS_API_KEY=your-api-key
Instantiate the model:
import { ChatFireworks } from "@langchain/community/chat_models/fireworks";
const model = new ChatFireworks({
  model: "accounts/fireworks/models/llama-v3p1-70b-instruct",
  temperature: 0
});
MistralAI
Install dependencies (npm, yarn, or pnpm):
npm i @langchain/mistralai
yarn add @langchain/mistralai
pnpm add @langchain/mistralai
Add environment variables:
MISTRAL_API_KEY=your-api-key
Instantiate the model:
import { ChatMistralAI } from "@langchain/mistralai";
const model = new ChatMistralAI({
  model: "mistral-large-latest",
  temperature: 0
});
VertexAI
Install dependencies (npm, yarn, or pnpm):
npm i @langchain/google-vertexai
yarn add @langchain/google-vertexai
pnpm add @langchain/google-vertexai
Add environment variables:
GOOGLE_APPLICATION_CREDENTIALS=credentials.json
Instantiate the model:
import { ChatVertexAI } from "@langchain/google-vertexai";
const model = new ChatVertexAI({
  model: "gemini-1.5-flash",
  temperature: 0
});
const stream = await model.stream("Hello! Tell me about yourself.");
const chunks = [];
for await (const chunk of stream) {
chunks.push(chunk);
console.log(`${chunk.content}|`);
}
|
Hello|
!|
I'm|
a|
large|
language|
model|
developed|
by|
Open|
AI|
called|
GPT|
-|
4|
,|
based|
on|
the|
Gener|
ative|
Pre|
-trained|
Transformer|
architecture|
.|
I'm|
designed|
to|
understand|
and|
generate|
human|
-like|
text|
based|
on|
the|
input|
I|
receive|
.|
My|
primary|
function|
is|
to|
assist|
with|
answering|
questions|
,|
providing|
information|
,|
and|
engaging|
in|
various|
types|
of|
conversations|
.|
While|
I|
don't|
have|
personal|
experiences|
or|
emotions|
,|
I'm|
trained|
on|
diverse|
datasets|
that|
enable|
me|
to|
provide|
useful|
and|
relevant|
information|
across|
a|
wide|
array|
of|
topics|
.|
How|
can|
I|
assist|
you|
today|
?|
|
|
Let's take a look at one of the raw chunks:
chunks[0];
AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: '',
tool_call_chunks: [],
additional_kwargs: {},
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
response_metadata: {}
},
lc_namespace: [ 'langchain_core', 'messages' ],
content: '',
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
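We got back something called an AIMessageChunk. This chunk represents a part of an AIMessage.
Message chunks are additive by design: you can simply add them up with the .concat() method to get the state of the response so far!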
let finalChunk = chunks[0];
for (const chunk of chunks.slice(1, 5)) {
finalChunk = finalChunk.concat(chunk);
}
finalChunk;
AIMessageChunk {
lc_serializable: true,
lc_kwargs: {
content: "Hello! I'm a",
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
tool_call_chunks: [],
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: []
},
lc_namespace: [ 'langchain_core', 'messages' ],
content: "Hello! I'm a",
name: undefined,
additional_kwargs: {},
response_metadata: { prompt: 0, completion: 0, finish_reason: null },
id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
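The accumulation shown above can also be written as a fold over the whole list of chunks. The sketch below assumes nothing about LangChain beyond the existence of a concat method: TextChunk is a hypothetical, heavily simplified stand-in for AIMessageChunk, and mergeChunks is an illustrative helper name.

```typescript
// Hypothetical minimal chunk type; the real AIMessageChunk carries more fields.
class TextChunk {
  constructor(public readonly content: string) {}

  concat(other: TextChunk): TextChunk {
    return new TextChunk(this.content + other.content);
  }
}

// Anything that can be merged with another value of its own type.
interface Concatenable<T> {
  concat(other: T): T;
}

// Folds a list of chunks into one accumulated chunk (undefined if empty).
function mergeChunks<T extends Concatenable<T>>(chunks: T[]): T | undefined {
  let merged: T | undefined;
  for (const chunk of chunks) {
    merged = merged === undefined ? chunk : merged.concat(chunk);
  }
  return merged;
}
```

Calling mergeChunks on every chunk received so far gives the current state of the response, which is typically what a UI would render after each new chunk.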
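Chains
Virtually all LLM applications involve more steps than just a call to a language model.
Let's build a simple chain using LangChain Expression Language (LCEL) that combines a prompt, a model, and a parser, and verify that streaming works.
We will use StringOutputParser to parse the output from the model. This is a simple parser that extracts the content field from an AIMessageChunk, giving us the token returned by the model.
LCEL is a declarative way to specify a "program" by chaining together different LangChain primitives. Chains created using LCEL automatically implement stream, which allows the final output to be streamed. In fact, chains created with LCEL implement the entire standard Runnable interface.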
import { StringOutputParser } from "@langchain/core/output_parsers";
import { ChatPromptTemplate } from "@langchain/core/prompts";
const prompt = ChatPromptTemplate.fromTemplate("Tell me a joke about {topic}");
const parser = new StringOutputParser();
const chain = prompt.pipe(model).pipe(parser);
const stream = await chain.stream({
topic: "parrot",
});
for await (const chunk of stream) {
console.log(`${chunk}|`);
}
|
Sure|
,|
here's|
a|
joke|
for|
you|
:
|
Why|
did|
the|
par|
rot|
sit|
on|
the|
stick|
?
|
Because|
it|
wanted|
to|
be|
a|
"|
pol|
ly|
-stick|
-al|
"|
observer|
!|
|
|
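You do not have to use the LangChain Expression Language to use LangChain. You can instead rely on a standard imperative programming approach: call invoke, batch, or stream on each component individually, assign the results to variables, and then use them downstream as you see fit.
If that works for your needs, then that's fine by us 👌!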
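As a rough sketch of the imperative style, the snippet below wires two components together by hand. fakePrompt and fakeModel are hypothetical stand-ins that only borrow the method names invoke and stream; they are not LangChain classes, and the canned reply is invented for illustration.

```typescript
// Hypothetical stand-ins exposing the method names (invoke / stream)
// mentioned in this guide; they are NOT LangChain classes.
const fakePrompt = {
  invoke: async (vars: { topic: string }): Promise<string> =>
    `Tell me a joke about ${vars.topic}`,
};

const fakeModel = {
  async *stream(prompt: string): AsyncGenerator<string> {
    for (const token of ["(reply", " to:", ` ${prompt})`]) {
      yield token;
    }
  },
};

// Imperative style: call each component yourself and pass results along.
async function tellJoke(topic: string): Promise<string[]> {
  const promptText = await fakePrompt.invoke({ topic });
  const tokens: string[] = [];
  for await (const token of fakeModel.stream(promptText)) {
    tokens.push(token); // each token could be rendered as it arrives
  }
  return tokens;
}
```

The trade-off is that you manage the data flow between steps yourself instead of letting a composed chain do it.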
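Working with Input Streams
What if you wanted to stream JSON from the output as it was being generated?
If you relied on JSON.parse to parse the partial JSON, the parsing would fail because partial JSON is not valid JSON.
You would likely be at a complete loss about what to do and conclude that streaming JSON is not possible.
It turns out there is a way to do it: the parser needs to operate on the input stream and attempt to "auto-complete" the partial JSON into a valid state.
Let's see such a parser in action to understand what this means.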
import { JsonOutputParser } from "@langchain/core/output_parsers";
const chain = model.pipe(new JsonOutputParser());
const stream = await chain.stream(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);
for await (const chunk of stream) {
console.log(chunk);
}
{
countries: [
{ name: 'France', population: 67390000 },
{ name: 'Spain', population: 47350000 },
{ name: 'Japan', population: 125800000 }
]
}
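To illustrate the "auto-complete" idea, here is one simple way it could be approximated: close any string, array, or object that is still open, then try JSON.parse. This is only a sketch under that assumption and is not presented as how JsonOutputParser is implemented; completePartialJson is a hypothetical helper.

```typescript
// Tries to turn a JSON prefix into valid JSON by closing whatever is
// still open. Returns undefined when the prefix cannot be completed yet
// (for example when it ends right after a comma or inside a key).
function completePartialJson(partial: string): unknown {
  const closers: string[] = [];
  let inString = false;
  let escaped = false;

  for (let i = 0; i < partial.length; i++) {
    const ch = partial.charAt(i);
    if (inString) {
      if (escaped) escaped = false;
      else if (ch === "\\") escaped = true;
      else if (ch === '"') inString = false;
      continue;
    }
    if (ch === '"') inString = true;
    else if (ch === "{") closers.push("}");
    else if (ch === "[") closers.push("]");
    else if (ch === "}" || ch === "]") closers.pop();
  }

  // Close an unterminated string first, then brackets innermost-first.
  const candidate =
    partial + (inString ? '"' : "") + closers.reverse().join("");
  try {
    return JSON.parse(candidate);
  } catch {
    return undefined;
  }
}
```

A streaming parser built on this idea would call the helper on the text accumulated so far each time a new chunk arrives and emit a value whenever the result is defined.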
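Now, let's break streaming. We'll use the previous example and append an extraction function at the end that extracts the country names from the finalized JSON. Because this new last step is just a function call with no defined streaming behavior, the streaming output from the previous steps is aggregated and then passed to the function as a single input.
Any step in the chain that operates on finalized inputs rather than on input streams can break streaming functionality via stream.
Later, we will discuss the streamEvents API, which streams results from intermediate steps. This API will stream results from intermediate steps even if the chain contains steps that only operate on finalized inputs.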
// A function that operates on finalized inputs
// rather than on an input stream, and therefore breaks streaming.
const extractCountryNames = (inputs: Record<string, any>) => {
if (!Array.isArray(inputs.countries)) {
return "";
}
return JSON.stringify(inputs.countries.map((country) => country.name));
};
const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);
const stream = await chain.stream(
`output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);
for await (const chunk of stream) {
console.log(chunk);
}
["France","Spain","Japan"]
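The difference between a step that needs finalized input and one that works on the input stream can be sketched with plain async generators. Everything here is hypothetical and independent of LangChain: PartialCountries models the partial objects a JSON parser might emit, and the rule "a name is complete once a later entry has started" is an assumption made for this sketch.

```typescript
type PartialCountries = { countries?: { name?: string }[] };

// Helper: turn an array into an async stream (stands in for parser output).
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item;
}

// Non-streaming step: waits for the last snapshot, then emits once.
async function* namesAtEnd(
  input: AsyncIterable<PartialCountries>
): AsyncGenerator<string[]> {
  let latest: PartialCountries = {};
  for await (const snapshot of input) latest = snapshot;
  yield (latest.countries ?? []).map((c) => c.name ?? "");
}

// Streaming-aware step: emits a name once a later entry has started,
// which this sketch treats as the signal that the name is complete.
async function* namesAsTheyComplete(
  input: AsyncIterable<PartialCountries>
): AsyncGenerator<string> {
  let emitted = 0;
  let latest: { name?: string }[] = [];
  for await (const snapshot of input) {
    latest = snapshot.countries ?? [];
    while (emitted < latest.length - 1) {
      yield latest[emitted]?.name ?? "";
      emitted += 1;
    }
  }
  // The stream is over, so whatever remains is final.
  while (emitted < latest.length) {
    yield latest[emitted]?.name ?? "";
    emitted += 1;
  }
}
```

namesAtEnd produces a single chunk no matter how many snapshots arrive, while namesAsTheyComplete can hand results downstream before the input has finished.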
Non-streaming components
Like the above example, some built-in components such as retrievers do not offer any streaming. What happens if we try to stream them?
import { OpenAIEmbeddings } from "@langchain/openai";
import { MemoryVectorStore } from "langchain/vectorstores/memory";
import { ChatPromptTemplate } from "@langchain/core/prompts";
const template = `Answer the question based only on the following context:
{context}
Question: {question}
`;
const prompt = ChatPromptTemplate.fromTemplate(template);
const vectorstore = await MemoryVectorStore.fromTexts(
["mitochondria is the powerhouse of the cell", "buildings are made of brick"],
[{}, {}],
new OpenAIEmbeddings()
);
const retriever = vectorstore.asRetriever();
const chunks = [];
for await (const chunk of await retriever.stream(
"What is the powerhouse of the cell?"
)) {
chunks.push(chunk);
}
console.log(chunks);
[
[
Document {
pageContent: 'mitochondria is the powerhouse of the cell',
metadata: {},
id: undefined
},
Document {
pageContent: 'buildings are made of brick',
metadata: {},
id: undefined
}
]
]
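Stream just yielded the final result from that component.
This is OK! Not all components have to implement streaming. In some cases streaming is unnecessary, difficult, or just doesn't make sense.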
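One way to picture this behavior is a fallback stream that simply runs the component to completion and yields the result as a single chunk. The sketch below is an illustration of that idea, not a copy of the library's code; Invokable, streamViaInvoke, and fakeRetriever are hypothetical names.

```typescript
// Minimal shape of a component that only knows how to produce a final result.
interface Invokable<I, O> {
  invoke(input: I): Promise<O>;
}

// Fallback "stream": run invoke to completion and yield its result once.
async function* streamViaInvoke<I, O>(
  component: Invokable<I, O>,
  input: I
): AsyncGenerator<O> {
  yield await component.invoke(input);
}

// Hypothetical retriever stand-in that returns all documents at once.
const fakeRetriever: Invokable<string, string[]> = {
  invoke: async () => [
    "mitochondria is the powerhouse of the cell",
    "buildings are made of brick",
  ],
};
```

Iterating over streamViaInvoke(fakeRetriever, "...") yields exactly one chunk containing both documents, which mirrors the nested array in the output above.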
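An LCEL chain constructed using some non-streaming components will still be able to stream in a lot of cases, with streaming of partial output starting after the last non-streaming step in the chain.
Here's an example of this: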
import {
RunnablePassthrough,
RunnableSequence,
} from "@langchain/core/runnables";
import type { Document } from "@langchain/core/documents";
import { StringOutputParser } from "@langchain/core/output_parsers";
const formatDocs = (docs: Document[]) => {
return docs.map((doc) => doc.pageContent).join("\n-----\n");
};
const retrievalChain = RunnableSequence.from([
{
context: retriever.pipe(formatDocs),
question: new RunnablePassthrough(),
},
prompt,
model,
new StringOutputParser(),
]);
const stream = await retrievalChain.stream(
"What is the powerhouse of the cell?"
);
for await (const chunk of stream) {
console.log(`${chunk}|`);
}
|
M|
ito|
ch|
ond|
ria|
is|
the|
powerhouse|
of|
the|
cell|
.|
|
|
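Now that we've seen how the stream method works, let's venture into the world of streaming events!
Using Stream Events
Event streaming is a beta API. This API may change a bit based on feedback.
Introduced in @langchain/core 0.1.27.
For the streamEvents method to work properly:
- Any custom functions / runnables must propagate callbacks.
- Set proper parameters on models to force the LLM to stream tokens.
- Let us know if anything doesn't work as expected!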
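Event Reference
Below is a reference table that shows some events that might be emitted by the various Runnable objects.
When streaming is implemented properly, the inputs to a runnable are generally not known until the input stream has been entirely consumed. This means that inputs will often be included only for end events rather than for start events.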
| event | name | chunk | input | output |
|---|---|---|---|---|
| on_llm_start | [model name] | | {'input': 'hello'} | |
| on_llm_stream | [model name] | 'Hello' or AIMessageChunk(content="hello") | | |
| on_llm_end | [model name] | | 'Hello human!' | {"generations": [...], "llmOutput": None, ...} |
| on_chain_start | format_docs | | | |
| on_chain_stream | format_docs | "hello world!, goodbye world!" | | |
| on_chain_end | format_docs | | [Document(...)] | "hello world!, goodbye world!" |
| on_tool_start | some_tool | | {"x": 1, "y": "2"} | |
| on_tool_stream | some_tool | {"x": 1, "y": "2"} | | |
| on_tool_end | some_tool | | | {"x": 1, "y": "2"} |
| on_retriever_start | [retriever name] | | {"query": "hello"} | |
| on_retriever_chunk | [retriever name] | {documents: [...]} | | |
| on_retriever_end | [retriever name] | | {"query": "hello"} | {documents: [...]} |
| on_prompt_start | [template name] | | {"question": "hello"} | |
| on_prompt_end | [template name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
streamEvents will also emit dispatched custom events in v2. Please see this guide for more information.
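The event names in the reference table follow an on_<kind>_<phase> pattern, which makes them easy to split and tally. The sketch below relies on that observation only; StreamEventLike is an assumed minimal shape based on the fields visible in this guide's output, and splitEventName and countByPhase are hypothetical helpers.

```typescript
// Assumed minimal shape, based on the fields visible in this guide's output.
interface StreamEventLike {
  event: string; // e.g. "on_chain_start"
  name: string;
  run_id: string;
  tags: string[];
  metadata: Record<string, unknown>;
  data: { chunk?: unknown; input?: unknown; output?: unknown };
}

// Splits "on_chain_start" into kind "chain" and phase "start".
function splitEventName(eventName: string): { kind: string; phase: string } {
  const match = /^on_(.+)_([a-z]+)$/.exec(eventName);
  return match
    ? { kind: match[1] ?? "", phase: match[2] ?? "" }
    : { kind: eventName, phase: "" };
}

// Counts how many events belong to each phase (start, stream, end, ...).
function countByPhase(events: StreamEventLike[]): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const e of events) {
    const { phase } = splitEventName(e.event);
    counts[phase] = (counts[phase] ?? 0) + 1;
  }
  return counts;
}
```

A tally like this is a quick sanity check that every start event has a matching end event.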
Chat Model
Let's start off by looking at the events produced by a chat model.
const events = [];
const eventStream = await model.streamEvents("hello", { version: "v2" });
for await (const event of eventStream) {
events.push(event);
}
console.log(events.length);
25
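Hey, what's that funny version: "v2" parameter in the API?! 😾
This is a beta API, and we're almost certainly going to make some changes to it.
This version parameter will allow us to minimize breaking changes to your code.
In short, we are annoying you now, so we don't have to annoy you later.
Let's take a look at a few of the start events and a few of the end events.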
events.slice(0, 3);
[
{
event: 'on_chat_model_start',
data: { input: 'hello' },
name: 'ChatOpenAI',
tags: [],
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
},
{
event: 'on_chat_model_stream',
data: { chunk: [AIMessageChunk] },
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
},
{
event: 'on_chat_model_stream',
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
},
data: { chunk: [AIMessageChunk] }
}
]
events.slice(-2);
[
{
event: 'on_chat_model_stream',
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
},
data: { chunk: [AIMessageChunk] }
},
{
event: 'on_chat_model_end',
data: { output: [AIMessageChunk] },
run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
name: 'ChatOpenAI',
tags: [],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
]
Chain
Let's revisit the example chain that parsed streaming JSON to explore the streaming events API.
const chain = model.pipe(new JsonOutputParser());
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v2" }
);
const events = [];
for await (const event of eventStream) {
events.push(event);
}
console.log(events.length);
83
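If you examine the first few events, you'll notice that there are 3 different start events rather than 2.
The three start events correspond to:
- The chain (model + parser)
- The model
- The parser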
events.slice(0, 3);
[
{
event: 'on_chain_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'RunnableSequence',
tags: [],
run_id: '5dd960b8-4341-4401-8993-7d04d49fcc08',
metadata: {}
},
{
event: 'on_chat_model_start',
data: { input: [Object] },
name: 'ChatOpenAI',
tags: [ 'seq:step:1' ],
run_id: '5d2917b1-886a-47a1-807d-8a0ba4cb4f65',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
},
{
event: 'on_parser_start',
data: {},
name: 'JsonOutputParser',
tags: [ 'seq:step:2' ],
run_id: '756c57d6-d455-484f-a556-79a82c4e1d40',
metadata: {}
}
]
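What do you think you'd see if you looked at the last 3 events? What about the middle?
Let's use this API to output the stream events from the model and the parser. We're ignoring start events, end events, and events from the chain. Note that this snippet requests "v1" events and therefore matches chat model chunks by the on_llm_stream event name, unlike the "v2" examples elsewhere in this guide.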
let eventCount = 0;
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v1" }
);
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 30) {
continue;
}
const eventType = event.event;
if (eventType === "on_llm_stream") {
console.log(`Chat model chunk: ${event.data.chunk.message.content}`);
} else if (eventType === "on_parser_stream") {
console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
}
eventCount += 1;
}
Chat model chunk:
Chat model chunk: ```
Chat model chunk: json
Chat model chunk:
Chat model chunk: {
Chat model chunk:
Chat model chunk: "
Chat model chunk: countries
Chat model chunk: ":
Chat model chunk: [
Chat model chunk:
Chat model chunk: {
Chat model chunk:
Chat model chunk: "
Chat model chunk: name
Chat model chunk: ":
Chat model chunk: "
Chat model chunk: France
Chat model chunk: ",
Chat model chunk:
Chat model chunk: "
Chat model chunk: population
Chat model chunk: ":
Chat model chunk:
Chat model chunk: 652
Chat model chunk: 735
Chat model chunk: 11
Chat model chunk:
Because both the model and the parser support streaming, we see streaming events from both components in real time! Neat! 🦜
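For comparison, the "v2" output shown elsewhere in this guide names the chat model event on_chat_model_stream and carries an AIMessageChunk directly in data.chunk. A formatter for that shape might look like the sketch below. V2EventLike is an assumed minimal type and describeStreamEvent is a hypothetical helper, so treat it as an illustration rather than an official recipe.

```typescript
// Assumed minimal event shape for "v2" events, based on this guide's output.
interface V2EventLike {
  event: string;
  data: { chunk?: unknown };
}

// Returns a printable line for stream events, or undefined for anything else.
function describeStreamEvent(event: V2EventLike): string | undefined {
  if (event.event === "on_chat_model_stream") {
    // In the "v2" output above, chunk is a message chunk with a content field.
    const chunk = event.data.chunk as { content?: unknown } | undefined;
    return `Chat model chunk: ${String(chunk?.content ?? "")}`;
  }
  if (event.event === "on_parser_stream") {
    return `Parser chunk: ${JSON.stringify(event.data.chunk)}`;
  }
  return undefined; // start, end, and chain events are ignored
}
```

Inside a for await loop over an event stream, you would log the returned string whenever it is defined.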
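Filtering Events
Because this API produces so many events, it is useful to be able to filter them.
You can filter by component name, component tags, or component type.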
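To show what filtering by name, type, or tag means, here is a client-side sketch of an equivalent predicate. It is not the library's implementation. Two things are assumptions of this sketch: the component type is derived by stripping the on_ prefix and the trailing phase from the event name, and an event passes when any one of the supplied filters matches.

```typescript
interface FilterableEvent {
  event: string;
  name: string;
  tags: string[];
}

interface IncludeFilter {
  includeNames?: string[];
  includeTypes?: string[];
  includeTags?: string[];
}

// "on_chat_model_stream" -> "chat_model"
function componentType(eventName: string): string {
  return eventName.replace(/^on_/, "").replace(/_[a-z]+$/, "");
}

// Assumption: with no filters everything passes; otherwise any match passes.
function matchesFilter(event: FilterableEvent, filter: IncludeFilter): boolean {
  const { includeNames, includeTypes, includeTags } = filter;
  if (!includeNames && !includeTypes && !includeTags) return true;
  return (
    (includeNames?.includes(event.name) ?? false) ||
    (includeTypes?.includes(componentType(event.event)) ?? false) ||
    (includeTags?.some((tag) => event.tags.includes(tag)) ?? false)
  );
}
```

Such a predicate can be handy for post-filtering events you have already collected into an array.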
By Name
const chain = model
.withConfig({ runName: "model" })
.pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }));
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v2" },
{ includeNames: ["my_parser"] }
);
let eventCount = 0;
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 10) {
continue;
}
console.log(event);
eventCount += 1;
}
{
event: 'on_parser_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'my_parser',
tags: [ 'seq:step:2' ],
run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
metadata: {}
}
{
event: 'on_parser_stream',
run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
name: 'my_parser',
tags: [ 'seq:step:2' ],
metadata: {},
data: { chunk: { countries: [Array] } }
}
{
event: 'on_parser_end',
data: { output: { countries: [Array] } },
run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
name: 'my_parser',
tags: [ 'seq:step:2' ],
metadata: {}
}
By Type
const chain = model
.withConfig({ runName: "model" })
.pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }));
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v2" },
{ includeTypes: ["chat_model"] }
);
let eventCount = 0;
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 10) {
continue;
}
console.log(event);
eventCount += 1;
}
{
event: 'on_chat_model_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'model',
tags: [ 'seq:step:1' ],
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '```',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: 'json',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '\n',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '{\n',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' ',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' "',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: 'countries',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '":',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' [\n',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
name: 'model',
tags: [ 'seq:step:1' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
By Tags
Tags are inherited by child components of a given runnable.
If you're using tags to filter, make sure that this is what you want.
const chain = model
.pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }))
.withConfig({ tags: ["my_chain"] });
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{ version: "v2" },
{ includeTags: ["my_chain"] }
);
let eventCount = 0;
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 10) {
continue;
}
console.log(event);
eventCount += 1;
}
{
event: 'on_chain_start',
data: {
input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
},
name: 'RunnableSequence',
tags: [ 'my_chain' ],
run_id: '1fed60d6-e0b7-4d5e-8ec7-cd7d3ee5c69f',
metadata: {}
}
{
event: 'on_chat_model_start',
data: { input: { messages: [Array] } },
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_parser_start',
data: {},
name: 'my_parser',
tags: [ 'seq:step:2', 'my_chain' ],
run_id: 'caf24a1e-255c-4937-9f38-6e46275d854a',
metadata: {}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: 'Certainly',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: '!',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: " Here's",
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' the',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' JSON',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' format',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
{
event: 'on_chat_model_stream',
data: {
chunk: AIMessageChunk {
lc_serializable: true,
lc_kwargs: [Object],
lc_namespace: [Array],
content: ' output',
name: undefined,
additional_kwargs: {},
response_metadata: [Object],
id: 'chatcmpl-9lO99nzUvCsZWCiq6vNtS1Soa1qNp',
tool_calls: [],
invalid_tool_calls: [],
tool_call_chunks: [],
usage_metadata: undefined
}
},
run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
name: 'ChatOpenAI',
tags: [ 'seq:step:1', 'my_chain' ],
metadata: {
ls_provider: 'openai',
ls_model_name: 'gpt-4o',
ls_model_type: 'chat',
ls_temperature: 1,
ls_max_tokens: undefined,
ls_stop: undefined
}
}
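The output above shows the model carrying both its own 'seq:step:1' tag and the chain's 'my_chain' tag. That inheritance can be modeled with a small recursive helper. ComponentNode and effectiveTags are hypothetical names, and the tree structure is an assumption made purely for illustration.

```typescript
interface ComponentNode {
  name: string;
  tags: string[];
  children: ComponentNode[];
}

// Returns each component's effective tags: its own tags followed by
// everything inherited from its ancestors.
function effectiveTags(
  node: ComponentNode,
  inherited: string[] = []
): Record<string, string[]> {
  const tags = [...node.tags, ...inherited];
  let result: Record<string, string[]> = { [node.name]: tags };
  for (const child of node.children) {
    result = { ...result, ...effectiveTags(child, tags) };
  }
  return result;
}
```

Because every descendant ends up with the parent's tag, filtering on that tag selects the whole subtree, which is why the filtered stream above includes chain, model, and parser events.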
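Streaming events over HTTP
For convenience, streamEvents supports encoding streamed intermediate events as HTTP server-sent events, encoded as bytes. Here's what that looks like (using a TextDecoder to convert the binary data back into a human-readable string):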
const chain = model
.pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }))
.withConfig({ tags: ["my_chain"] });
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{
version: "v2",
encoding: "text/event-stream",
}
);
let eventCount = 0;
const textDecoder = new TextDecoder();
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 3) {
continue;
}
console.log(textDecoder.decode(event));
eventCount += 1;
}
event: data
data: {"event":"on_chain_start","data":{"input":"Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key \"name\" and \"population\""},"name":"RunnableSequence","tags":["my_chain"],"run_id":"41cd92f8-9b8c-4365-8aa0-fda3abdae03d","metadata":{}}
event: data
data: {"event":"on_chat_model_start","data":{"input":{"messages":[[{"lc":1,"type":"constructor","id":["langchain_core","messages","HumanMessage"],"kwargs":{"content":"Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key \"name\" and \"population\"","additional_kwargs":{},"response_metadata":{}}}]]}},"name":"ChatOpenAI","tags":["seq:step:1","my_chain"],"run_id":"a6c2bc61-c868-4570-a143-164e64529ee0","metadata":{"ls_provider":"openai","ls_model_name":"gpt-4o","ls_model_type":"chat","ls_temperature":1}}
event: data
data: {"event":"on_parser_start","data":{},"name":"my_parser","tags":["seq:step:2","my_chain"],"run_id":"402533c5-0e4e-425d-a556-c30a350972d0","metadata":{}}
event: data
data: {"event":"on_chat_model_stream","data":{"chunk":{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":"","tool_call_chunks":[],"additional_kwargs":{},"id":"chatcmpl-9lO9BAQwbKDy2Ou2RNFUVi0VunAsL","tool_calls":[],"invalid_tool_calls":[],"response_metadata":{"prompt":0,"completion":0,"finish_reason":null}}}},"run_id":"a6c2bc61-c868-4570-a143-164e64529ee0","name":"ChatOpenAI","tags":["seq:step:1","my_chain"],"metadata":{"ls_provider":"openai","ls_model_name":"gpt-4o","ls_model_type":"chat","ls_temperature":1}}
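Each decoded chunk above is a server-sent event frame: an `event:` line, a `data:` line carrying the JSON-serialized event, and a blank line as terminator. As a minimal sketch of how such text could be turned back into objects by hand, the hypothetical helper `parseSseFrames` below (not part of LangChain) assumes that every frame arrives complete and has a single `data:` line. A production client would also need to buffer frames that are split across network chunks.

```typescript
// Hypothetical helper, not a LangChain API: parse decoded SSE text into frames.
interface SseFrame {
  event: string;
  data: unknown;
}

function parseSseFrames(text: string): SseFrame[] {
  const frames: SseFrame[] = [];
  // Frames are separated by a blank line.
  for (const block of text.split(/\r?\n\r?\n/)) {
    let event = "message"; // SSE default when no "event:" field is present
    let data: string | undefined;
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith("event:")) {
        event = line.slice("event:".length).trim();
      } else if (line.startsWith("data:")) {
        data = line.slice("data:".length).trim();
      }
    }
    // Skip empty blocks, such as the one after the final terminator.
    if (data !== undefined) {
      frames.push({ event, data: JSON.parse(data) });
    }
  }
  return frames;
}

// Two shortened frames in the same shape as the output above.
const sample =
  'event: data\ndata: {"event":"on_parser_start","name":"my_parser"}\n\n' +
  'event: data\ndata: {"event":"on_chat_model_stream","name":"ChatOpenAI"}\n\n';

const frames = parseSseFrames(sample);
console.log(frames.map((f) => (f.data as { event: string }).event));
```

In practice a library such as the one used on the frontend later in this section handles this parsing, so hand-rolled code like this is mostly useful for debugging.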
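A nice feature of this format is that you can pass the resulting stream directly into a native HTTP response object with the correct headers (commonly used by frameworks like Hono and Next.js), then parse that stream on the frontend. Your server-side handler would look something like this: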
const handler = async () => {
const eventStream = await chain.streamEvents(
`Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
{
version: "v2",
encoding: "text/event-stream",
}
);
return new Response(eventStream, {
headers: {
"content-type": "text/event-stream",
},
});
};
And your frontend could look like this (using the
@microsoft/fetch-event-source
package to fetch and parse the event source):
import { fetchEventSource } from "@microsoft/fetch-event-source";
const makeChainRequest = async () => {
await fetchEventSource("https://your_url_here", {
method: "POST",
body: JSON.stringify({
foo: "bar",
}),
onmessage: (message) => {
if (message.event === "data") {
console.log(message.data);
}
},
onerror: (err) => {
console.log(err);
},
});
};
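In the `onmessage` callback, `message.data` is a JSON string like the `data:` payloads printed earlier. A minimal sketch of pulling model tokens out of it follows. The helper name `tokenFromEventData` is hypothetical, and the `data.chunk.kwargs.content` path is inferred from the serialized `AIMessageChunk` in the sample output above rather than from a documented contract, so treat it as an assumption that may vary across versions.

```typescript
// Hypothetical helper: return the text token carried by an SSE `data` payload,
// or null if the payload is not a chat model stream event.
function tokenFromEventData(raw: string): string | null {
  const parsed = JSON.parse(raw) as {
    event?: string;
    data?: { chunk?: { kwargs?: { content?: unknown } } };
  };
  if (parsed.event !== "on_chat_model_stream") {
    return null;
  }
  const content = parsed.data?.chunk?.kwargs?.content;
  // Content may also be a list of content blocks; this sketch only handles strings.
  return typeof content === "string" ? content : null;
}

// Shortened payloads in the same shape as the serialized events shown earlier.
const payloads = [
  '{"event":"on_parser_start","data":{}}',
  '{"event":"on_chat_model_stream","data":{"chunk":{"kwargs":{"content":"Hello"}}}}',
  '{"event":"on_chat_model_stream","data":{"chunk":{"kwargs":{"content":" world"}}}}',
];

let text = "";
for (const raw of payloads) {
  text += tokenFromEventData(raw) ?? "";
}
console.log(text); // "Hello world"
```

Inside `onmessage`, the same helper could be called as `tokenFromEventData(message.data)` to append each token to the UI as it arrives.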
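Non-streaming components
Remember how some components don't stream well because they don't operate on input streams?
While such components can break streaming of the final output when using stream,
streamEvents will still yield streaming events from intermediate steps that support streaming!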
import { JsonOutputParser } from "@langchain/core/output_parsers";
// A function that operates on finalized inputs rather than on an input stream,
// so it breaks streaming of the final output.
const extractCountryNames = (inputs: Record<string, any>) => {
if (!Array.isArray(inputs.countries)) {
return "";
}
return JSON.stringify(inputs.countries.map((country) => country.name));
};
const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);
const stream = await chain.stream(
`output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);
for await (const chunk of stream) {
console.log(chunk);
}
["France","Spain","Japan"]
As expected, the stream API doesn't stream here: the output arrives as a single chunk because
extractCountryNames doesn't operate on streams.
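To see why a single step can hold up the whole chain, here is a library-free sketch. Synchronous generators stand in for LangChain's async streams (a simplification), and both function names are hypothetical. The first transform emits one output per input chunk; the second must consume every chunk before it can return anything, which is the situation `extractCountryNames` is in.

```typescript
// Streaming-friendly: yields an output as soon as each input chunk arrives.
function* upperCaseChunks(chunks: Iterable<string>): Generator<string> {
  for (const chunk of chunks) {
    yield chunk.toUpperCase();
  }
}

// Not streaming-friendly: needs the finalized input, so it emits exactly once.
function* upperCaseWhenDone(chunks: Iterable<string>): Generator<string> {
  let full = "";
  for (const chunk of chunks) {
    full += chunk;
  }
  yield full.toUpperCase();
}

// Stand-in for chunks produced by an upstream model.
const modelChunks = ["fra", "nce", ", ", "spain"];

const streamed = Array.from(upperCaseChunks(modelChunks));
const blocked = Array.from(upperCaseWhenDone(modelChunks));

console.log(streamed.length); // 4
console.log(blocked.length); // 1
```

Both transforms produce the same text overall; the difference is only in when the output becomes available to the next step.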
Now, let's confirm that with streamEvents
we still see streaming output from the model and the parser.
const eventStream = await chain.streamEvents(
`output a list of the countries france, spain and japan and their populations in JSON format.
Use a dict with an outer key of "countries" which contains a list of countries.
Each country should have the key "name" and "population"
Your output should ONLY contain valid JSON data. Do not include any other text or content in your output.`,
{ version: "v2" }
);
let eventCount = 0;
for await (const event of eventStream) {
// Truncate the output
if (eventCount > 30) {
continue;
}
const eventType = event.event;
if (eventType === "on_chat_model_stream") {
// With version "v2", the chunk is itself an AIMessageChunk, so read content directly.
console.log(`Chat model chunk: ${event.data.chunk.content}`);
} else if (eventType === "on_parser_stream") {
console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
} else {
console.log(eventType);
}
eventCount += 1;
}
Chat model chunk:
Chat model chunk: Here’s
Chat model chunk: how
Chat model chunk: you
Chat model chunk: can
Chat model chunk: represent
Chat model chunk: the
Chat model chunk: countries
Chat model chunk: France
Chat model chunk: ,
Chat model chunk: Spain
Chat model chunk: ,
Chat model chunk: and
Chat model chunk: Japan
Chat model chunk: ,
Chat model chunk: along
Chat model chunk: with
Chat model chunk: their
Chat model chunk: populations
Chat model chunk: ,
Chat model chunk: in
Chat model chunk: JSON
Chat model chunk: format
Chat model chunk: :
Chat model chunk: ```
Chat model chunk: json
Chat model chunk:
Chat model chunk: {