
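Migrating from MapReduceDocumentsChain

MapReduceDocumentsChain implements a map-reduce strategy over (potentially long) texts. The strategy is as follows:

  • Split a text into smaller documents;
  • Map a process onto the smaller documents;
  • Reduce or consolidate the results of the process into a final result.

Note that the map step is typically parallelized over the input documents.

A common process applied in this context is summarization, in which the map step summarizes individual documents and the reduce step generates a summary of the summaries.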
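The split, map, and reduce steps can be sketched without any LLM at all. The snippet below is only an illustration of the control flow: `split_text`, `toy_summarize`, and `map_reduce_summarize` are hypothetical helpers, and `toy_summarize` simply keeps the first few words where a real application would call a model.

```python
from concurrent.futures import ThreadPoolExecutor


def split_text(text: str, chunk_size: int) -> list[str]:
    """Split text into chunks of at most `chunk_size` whitespace-separated words."""
    words = text.split()
    return [
        " ".join(words[i : i + chunk_size]) for i in range(0, len(words), chunk_size)
    ]


def toy_summarize(text: str, max_words: int = 5) -> str:
    """Hypothetical stand-in for an LLM call: keep only the first few words."""
    return " ".join(text.split()[:max_words])


def map_reduce_summarize(text: str, chunk_size: int = 10) -> str:
    chunks = split_text(text, chunk_size)
    # Map: summarize each chunk independently, in parallel threads.
    with ThreadPoolExecutor() as pool:
        partial_summaries = list(pool.map(toy_summarize, chunks))
    # Reduce: summarize the concatenated partial summaries.
    return toy_summarize(" ".join(partial_summaries), max_words=8)


text = " ".join(f"w{i}" for i in range(25))
print(map_reduce_summarize(text))
```

Because each chunk is summarized independently, the map step parallelizes naturally; only the reduce step needs all partial results.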

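In the reduce step, MapReduceDocumentsChain supports a recursive "collapsing" of the summaries: the inputs are partitioned based on a token limit, and a summary is generated for each partition. This step is repeated until the total length of the summaries is within the desired limit, allowing for the summarization of arbitrary-length text. This is particularly useful for models with smaller context windows.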
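A minimal sketch of this collapsing loop, under loud assumptions: whitespace-separated words stand in for tokens, `toy_summarize` (a hypothetical helper) halves its input instead of calling a model, and the greedy `partition` function is an illustrative grouping rule rather than the library's own.

```python
def count_tokens(text: str) -> int:
    """Assumption: whitespace-separated words stand in for model tokens."""
    return len(text.split())


def partition(summaries: list[str], token_max: int) -> list[list[str]]:
    """Greedily group consecutive summaries so that each group fits token_max."""
    groups: list[list[str]] = []
    current: list[str] = []
    for summary in summaries:
        if current and count_tokens(" ".join(current + [summary])) > token_max:
            groups.append(current)
            current = []
        current.append(summary)
    if current:
        groups.append(current)
    return groups


def toy_summarize(text: str) -> str:
    """Hypothetical stand-in for an LLM call: keep the first half of the words."""
    words = text.split()
    return " ".join(words[: max(1, len(words) // 2)])


def collapse(summaries: list[str], token_max: int) -> tuple[list[str], int]:
    """Repeat partition-and-summarize until the total length fits token_max."""
    if token_max < 2:
        raise ValueError("token_max must be at least 2 for this toy summarizer")
    rounds = 0
    while sum(count_tokens(s) for s in summaries) > token_max:
        groups = partition(summaries, token_max)
        summaries = [toy_summarize(" ".join(group)) for group in groups]
        rounds += 1
    return summaries, rounds


summaries = ["a b c d"] * 6  # six summaries of four "tokens" each
final, rounds = collapse(summaries, token_max=10)
print(len(final), rounds)
```

The loop terminates here because the stand-in summarizer always shortens a group of two or more words; with a real model, a retry or step limit is a sensible safeguard.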

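LangGraph supports map-reduce workflows and offers a number of advantages for this problem:

  • LangGraph allows individual steps (such as successive summarizations) to be streamed, giving greater control over execution;
  • LangGraph's checkpointing supports error recovery, extension with human-in-the-loop workflows, and easier incorporation into conversational applications;
  • The LangGraph implementation is easier to extend, as we will see below.

Below we go through both MapReduceDocumentsChain and a corresponding LangGraph implementation, first on a simple example for illustration, and then on a longer example text to demonstrate the recursive reduce step.

Let's first load a chat model: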

pip install -qU langchain-openai
import getpass
import os

os.environ["OPENAI_API_KEY"] = getpass.getpass()

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

Basic example (short documents)

Let's use the following 3 documents for illustrative purposes.

from langchain_core.documents import Document

documents = [
    Document(page_content="Apples are red", metadata={"title": "apple_book"}),
    Document(page_content="Blueberries are blue", metadata={"title": "blueberry_book"}),
    Document(page_content="Bananas are yellow", metadata={"title": "banana_book"}),
]

Legacy

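Below we show an implementation with MapReduceDocumentsChain. We define the prompt templates for the map and reduce steps, instantiate separate chains for these steps, and finally instantiate the MapReduceDocumentsChain: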

from langchain.chains import MapReduceDocumentsChain, ReduceDocumentsChain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.chains.llm import LLMChain
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import CharacterTextSplitter

# Map
map_template = "Write a concise summary of the following: {docs}."
map_prompt = ChatPromptTemplate([("human", map_template)])
map_chain = LLMChain(llm=llm, prompt=map_prompt)


# Reduce
reduce_template = """
The following is a set of summaries:
{docs}
Take these and distill it into a final, consolidated summary
of the main themes.
"""
reduce_prompt = ChatPromptTemplate([("human", reduce_template)])
reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt)


# Takes a list of documents, combines them into a single string, and passes this to an LLMChain
combine_documents_chain = StuffDocumentsChain(
    llm_chain=reduce_chain, document_variable_name="docs"
)

# Combines and iteratively reduces the mapped documents
reduce_documents_chain = ReduceDocumentsChain(
    # This is the final chain that is called.
    combine_documents_chain=combine_documents_chain,
    # If documents exceed context for `StuffDocumentsChain`
    collapse_documents_chain=combine_documents_chain,
    # The maximum number of tokens to group documents into.
    token_max=1000,
)

# Combining documents by mapping a chain over them, then combining results
map_reduce_chain = MapReduceDocumentsChain(
    # Map chain
    llm_chain=map_chain,
    # Reduce chain
    reduce_documents_chain=reduce_documents_chain,
    # The variable name in the llm_chain to put the documents in
    document_variable_name="docs",
    # Return the results of the map steps in the output
    return_intermediate_steps=False,
)
result = map_reduce_chain.invoke(documents)

print(result["output_text"])
Fruits come in a variety of colors, with apples being red, blueberries being blue, and bananas being yellow.

In the LangSmith trace we observe four LLM calls: one summarizing each of the three input documents, and one summarizing the summaries.

LangGraph

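Below we show a LangGraph implementation, using the same prompt templates as above. The graph includes a node for generating summaries, which is mapped across the list of input documents. This node then flows to a second node that generates the final summary.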
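To build intuition for the fan-out and merge, here is a plain-asyncio sketch that does not use LangGraph. `toy_generate_summary` and `run_map_step` are hypothetical names; the point is only that each mapped call returns a partial update, and the updates are merged into one list with `operator.add`, which is the role a list reducer plays in the graph state.

```python
import asyncio
import operator


async def toy_generate_summary(content: str) -> dict:
    """Hypothetical stand-in for the map node: returns a partial state update."""
    await asyncio.sleep(0)  # placeholder for an awaited model call
    return {"summaries": [f"summary of: {content}"]}


async def run_map_step(contents: list[str]) -> list[str]:
    # Fan out: one concurrent call per input document.
    updates = await asyncio.gather(*(toy_generate_summary(c) for c in contents))
    # Merge: concatenate each partial list into the accumulated list.
    summaries: list[str] = []
    for update in updates:
        summaries = operator.add(summaries, update["summaries"])
    return summaries


summaries = asyncio.run(run_map_step(["Apples are red", "Blueberries are blue"]))
print(summaries)
```

`asyncio.gather` returns results in input order, so the merged list here is deterministic; in a real graph the order in which parallel branches finish may differ.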

We will need to install langgraph:

%pip install -qU langgraph
import operator
from typing import Annotated, List, TypedDict

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph

map_template = "Write a concise summary of the following: {context}."

reduce_template = """
The following is a set of summaries:
{docs}
Take these and distill it into a final, consolidated summary
of the main themes.
"""

map_prompt = ChatPromptTemplate([("human", map_template)])
reduce_prompt = ChatPromptTemplate([("human", reduce_template)])

map_chain = map_prompt | llm | StrOutputParser()
reduce_chain = reduce_prompt | llm | StrOutputParser()

# Graph components: define the components that will make up the graph


# This will be the overall state of the main graph.
# It will contain the input document contents, corresponding
# summaries, and a final summary.
class OverallState(TypedDict):
    contents: List[str]
    # Notice here we use operator.add as the reducer for `summaries`.
    # This is because we want to combine all the summaries we generate
    # from individual nodes back into one list - this is essentially
    # the "reduce" part
    summaries: Annotated[list, operator.add]
    final_summary: str


# This will be the state of the node that we will "map" all
# documents to in order to generate summaries
class SummaryState(TypedDict):
    content: str


# Here we generate a summary, given a document
async def generate_summary(state: SummaryState):
    response = await map_chain.ainvoke(state["content"])
    return {"summaries": [response]}


# Here we define the logic to map out over the documents
# We will use this as an edge in the graph
def map_summaries(state: OverallState):
    # We will return a list of `Send` objects
    # Each `Send` object consists of the name of a node in the graph
    # as well as the state to send to that node
    return [
        Send("generate_summary", {"content": content}) for content in state["contents"]
    ]


# Here we will generate the final summary
async def generate_final_summary(state: OverallState):
    response = await reduce_chain.ainvoke(state["summaries"])
    return {"final_summary": response}


# Construct the graph: here we put everything together to construct our graph
graph = StateGraph(OverallState)
graph.add_node("generate_summary", generate_summary)
graph.add_node("generate_final_summary", generate_final_summary)
graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "generate_final_summary")
graph.add_edge("generate_final_summary", END)
app = graph.compile()
from IPython.display import Image

Image(app.get_graph().draw_mermaid_png())

Note that calling the graph in streaming mode allows us to monitor steps and potentially take action on them during execution.

# Call the graph:
async for step in app.astream({"contents": [doc.page_content for doc in documents]}):
    print(step)
{'generate_summary': {'summaries': ['Apples are typically red in color.']}}
{'generate_summary': {'summaries': ['Bananas are yellow in color.']}}
{'generate_summary': {'summaries': ['Blueberries are a type of fruit that are blue in color.']}}
{'generate_final_summary': {'final_summary': 'The main themes are the colors of different fruits: apples are red, blueberries are blue, and bananas are yellow.'}}

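In the LangSmith trace we recover the same four LLM calls as before.

Summarizing long documents

Map-reduce flows are particularly useful when texts are long compared to the context window of an LLM. MapReduceDocumentsChain supports a recursive "collapsing" of the summaries: the inputs are partitioned based on a token limit, and a summary is generated for each partition. This step is repeated until the total length of the summaries is within the desired limit, allowing for the summarization of arbitrary-length text.

This "collapse" step is implemented as a while loop within MapReduceDocumentsChain. We can demonstrate it on a longer text, the "LLM Powered Autonomous Agents" blog post by Lilian Weng (as featured in the RAG tutorial and other documentation).

First we load the post and chunk it into smaller "sub-documents":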

from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import CharacterTextSplitter

loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
documents = loader.load()

text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=1000, chunk_overlap=0
)
split_docs = text_splitter.split_documents(documents)
print(f"Generated {len(split_docs)} documents.")
USER_AGENT environment variable not set, consider setting it to identify your requests.
Created a chunk of size 1003, which is longer than the specified 1000
Generated 14 documents.

Legacy

We can invoke MapReduceDocumentsChain as before:

result = map_reduce_chain.invoke(split_docs)

print(result["output_text"])
The article discusses the use of Large Language Models (LLMs) to power autonomous agents in various tasks, showcasing their capabilities in problem-solving beyond generating written content. Key components such as planning, memory optimization, and tool use are explored, with proof-of-concept demos like AutoGPT and GPT-Engineer demonstrating the potential of LLM-powered agents. Challenges include limitations in historical information retention and natural language interface reliability, while the potential of LLMs in enhancing reasoning, problem-solving, and planning proficiency for autonomous agents is highlighted. Overall, the article emphasizes the versatility and power of LLMs in creating intelligent agents for tasks like scientific discovery and experiment design.

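Consider the LangSmith trace for the above invocation. When instantiating our ReduceDocumentsChain, we set a token_max of 1,000 tokens. This results in a total of 17 LLM calls:

  • 14 calls summarize the 14 sub-documents generated by our text splitter.
  • These summaries total roughly 1,000 to 2,000 tokens. Because we set a token_max of 1,000, two more calls summarize (or "collapse") these summaries.
  • One final call generates the final summary from the two "collapsed" summaries.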
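The tally above is simple arithmetic, sketched here with a hypothetical helper `count_llm_calls`. It assumes one call per mapped document, one call per partition in each collapse round, and one final reduce call.

```python
def count_llm_calls(num_docs: int, partitions_per_round: list[int]) -> int:
    """Map calls + one call per collapse partition + one final reduce call."""
    map_calls = num_docs
    collapse_calls = sum(partitions_per_round)
    final_calls = 1
    return map_calls + collapse_calls + final_calls


# Long example: 14 sub-documents, one collapse round with 2 partitions.
print(count_llm_calls(14, [2]))  # 17
# Short example: 3 documents, no collapse round needed.
print(count_llm_calls(3, []))  # 4
```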

LangGraph

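We can extend our original map-reduce implementation in LangGraph to implement the same recursive collapsing step. We make the following changes:

  • Add a collapsed_summaries key to the state to store the collapsed summaries;
  • Update the final summarization node to summarize the collapsed summaries;
  • Add a collapse_summaries node that partitions a list of documents based on token length (1,000 tokens here, as before), generates a summary of each partition, and stores the result in collapsed_summaries.

We add a conditional edge from collapse_summaries to itself to form a loop: if the total token count of the collapsed summaries exceeds token_max, we re-run the node.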
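The loop formed by that conditional edge can be pictured as a small routing function applied repeatedly. The sketch below is not LangGraph code: `route` and `run_loop` are hypothetical, each collapse is assumed to halve the token count, and `step_limit` is only a loose analogy for a cap on how many times the loop may run.

```python
def route(total_tokens: int, token_max: int) -> str:
    """Pick the next node based on the current total token count."""
    if total_tokens > token_max:
        return "collapse_summaries"
    return "generate_final_summary"


def run_loop(total_tokens: int, token_max: int, step_limit: int) -> list[str]:
    """Follow the routing decision until the final node is reached."""
    visited: list[str] = []
    node = route(total_tokens, token_max)
    while node == "collapse_summaries":
        if len(visited) >= step_limit:
            raise RuntimeError("step limit reached before the summaries fit")
        visited.append(node)
        total_tokens //= 2  # assumption: each collapse halves the token count
        node = route(total_tokens, token_max)
    visited.append(node)
    return visited


print(run_loop(1800, token_max=1000, step_limit=10))
```

With 1,800 starting tokens the loop runs the collapse node once and then routes to the final summary; a much larger input with a small `step_limit` raises instead of looping indefinitely.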

from typing import Literal

from langchain.chains.combine_documents.reduce import (
    acollapse_docs,
    split_list_of_docs,
)


def length_function(documents: List[Document]) -> int:
    """Get number of tokens for input contents."""
    return sum(llm.get_num_tokens(doc.page_content) for doc in documents)


token_max = 1000


class OverallState(TypedDict):
    contents: List[str]
    summaries: Annotated[list, operator.add]
    collapsed_summaries: List[Document]  # add key for collapsed summaries
    final_summary: str


# Add node to store summaries for collapsing
def collect_summaries(state: OverallState):
    return {
        "collapsed_summaries": [Document(summary) for summary in state["summaries"]]
    }


# Modify final summary to read off collapsed summaries
async def generate_final_summary(state: OverallState):
    response = await reduce_chain.ainvoke(state["collapsed_summaries"])
    return {"final_summary": response}


graph = StateGraph(OverallState)
graph.add_node("generate_summary", generate_summary) # same as before
graph.add_node("collect_summaries", collect_summaries)
graph.add_node("generate_final_summary", generate_final_summary)


# Add node to collapse summaries
async def collapse_summaries(state: OverallState):
    doc_lists = split_list_of_docs(
        state["collapsed_summaries"], length_function, token_max
    )
    results = []
    for doc_list in doc_lists:
        results.append(await acollapse_docs(doc_list, reduce_chain.ainvoke))

    return {"collapsed_summaries": results}


graph.add_node("collapse_summaries", collapse_summaries)


def should_collapse(
    state: OverallState,
) -> Literal["collapse_summaries", "generate_final_summary"]:
    num_tokens = length_function(state["collapsed_summaries"])
    if num_tokens > token_max:
        return "collapse_summaries"
    else:
        return "generate_final_summary"


graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "collect_summaries")
graph.add_conditional_edges("collect_summaries", should_collapse)
graph.add_conditional_edges("collapse_summaries", should_collapse)
graph.add_edge("generate_final_summary", END)
app = graph.compile()

LangGraph allows the graph structure to be plotted to help visualize its function:

from IPython.display import Image

Image(app.get_graph().draw_mermaid_png())

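As before, we can stream the graph to observe its sequence of steps. Below, we simply print out the name of each step.

Note that because the graph contains a loop, it can be helpful to specify a recursion_limit on its execution. This is analogous to ReduceDocumentsChain.token_max: a specific error is raised when the specified limit is exceeded.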

async for step in app.astream(
    {"contents": [doc.page_content for doc in split_docs]},
    {"recursion_limit": 10},
):
    print(list(step.keys()))
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['collect_summaries']
['collapse_summaries']
['generate_final_summary']
print(step)
{'generate_final_summary': {'final_summary': 'The summaries discuss the use of Large Language Models (LLMs) to power autonomous agents in various tasks such as problem-solving, planning, and tool use. Key components like planning, memory, and task decomposition are highlighted, along with challenges such as inefficient planning and hallucination. Techniques like Algorithm Distillation and Maximum Inner Product Search are explored for optimization, while frameworks like ReAct and Reflexion show improvements in knowledge-intensive tasks. The importance of accurate interpretation of user input and well-structured code for functional autonomy is emphasized, along with the potential of LLMs in prompting, reasoning, and emergent social behavior in simulation environments. Challenges in real-world scenarios and the use of LLMs with expert-designed tools for tasks like organic synthesis and drug discovery are also discussed.'}}

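In the corresponding LangSmith trace, we can see the same 17 LLM calls as before, this time grouped under their respective nodes.

Next steps

Check out the LangGraph documentation for details on building with LangGraph, including its guide on the details of map-reduce in LangGraph.

See this tutorial for more LLM-based summarization strategies.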

