Skip to main content

如何并行调用可运行对象

前提条件

RunnableParallel(也称为 RunnableMap)是一种原始对象,其值是可运行对象(或可以被强制转换为可运行对象的对象,比如函数)。
它会并行运行其所有值,每个值都会使用传给 RunnableParallel 的初始输入进行调用。最终的返回值是一个对象,其键对应各个值的结果。

使用 RunnableParallels 格式化数据

RunnableParallels 适用于并行化操作,但也可以用于将一个可运行对象的输出格式转换为下一个可运行对象所需的输入格式。你可以使用它们来拆分或分支链,使得多个组件可以并行处理输入。之后,其他组件可以合并或整合这些结果,以生成最终响应。这种类型的链会创建一个如下所示的计算图:

     输入
/ \
/ \
分支1 分支2
\ /
\ /
合并

在下面的示例中,RunnableParallel 中每个链的输入都应是一个包含 "topic" 键的对象。
我们可以通过传递一个符合该结构的对象来满足这一要求。

:::提示 请参阅安装集成包的一般说明部分。 :::

npm install @langchain/anthropic @langchain/cohere @langchain/core
import { PromptTemplate } from "@langchain/core/prompts";
import { RunnableMap } from "@langchain/core/runnables";
import { ChatAnthropic } from "@langchain/anthropic";

const model = new ChatAnthropic({});
const jokeChain = PromptTemplate.fromTemplate(
"Tell me a joke about {topic}"
).pipe(model);
const poemChain = PromptTemplate.fromTemplate(
"write a 2-line poem about {topic}"
).pipe(model);

const mapChain = RunnableMap.from({
joke: jokeChain,
poem: poemChain,
});

const result = await mapChain.invoke({ topic: "bear" });
console.log(result);
/*
{
joke: AIMessage {
content: " Here's a silly joke about a bear:\n" +
'\n' +
'What do you call a bear with no teeth?\n' +
'A gummy bear!',
additional_kwargs: {}
},
poem: AIMessage {
content: ' Here is a 2-line poem about a bear:\n' +
'\n' +
'Furry and wild, the bear roams free \n' +
'Foraging the forest, strong as can be',
additional_kwargs: {}
}
}
*/

API Reference:

操作输入/输出

Maps 可以用于将一个可运行对象的输出转换为下一个可运行对象所需的输入格式。

请注意,下面 RunnableSequence.from() 调用中的对象会被自动转换为一个 runnable map。对象的所有键的值必须是可运行对象,或者本身可以被强制转换为可运行对象(函数转换为 RunnableLambda,对象转换为 RunnableMap)。
当你通过 .pipe() 方法组合链时,也会发生类似的强制转换。

import { CohereEmbeddings } from "@langchain/cohere";
import { PromptTemplate } from "@langchain/core/prompts";
import { StringOutputParser } from "@langchain/core/output_parsers";
import {
RunnablePassthrough,
RunnableSequence,
} from "@langchain/core/runnables";
import { Document } from "@langchain/core/documents";
import { ChatAnthropic } from "@langchain/anthropic";
import { MemoryVectorStore } from "langchain/vectorstores/memory";

const model = new ChatAnthropic();
const vectorstore = await MemoryVectorStore.fromDocuments(
[{ pageContent: "mitochondria is the powerhouse of the cell", metadata: {} }],
new CohereEmbeddings({ model: "embed-english-v3.0" })
);
const retriever = vectorstore.asRetriever();
const template = `Answer the question based only on the following context:
{context}

Question: {question}`;

const prompt = PromptTemplate.fromTemplate(template);

const formatDocs = (docs: Document[]) => docs.map((doc) => doc.pageContent);

const retrievalChain = RunnableSequence.from([
{ context: retriever.pipe(formatDocs), question: new RunnablePassthrough() },
prompt,
model,
new StringOutputParser(),
]);

const result = await retrievalChain.invoke(
"what is the powerhouse of the cell?"
);
console.log(result);

/*
Based on the given context, the powerhouse of the cell is mitochondria.
*/

API Reference:

在此示例中,prompt 的输入期望是一个包含 "context" 和 "question" 键的对象。用户的输入只是问题。因此我们需要使用检索器获取上下文,并将用户输入作为 "question" 键的值透传过去。

下一步

你现在了解了一些使用 RunnableParallel 来格式化和并行化链式步骤的方法。

接下来,你可能会对在链中使用自定义逻辑感兴趣。


Was this page helpful?


You can also leave detailed feedback on GitHub.