2.3 四种组合文档链方式在LangChain开发中的作用及源码分析
假设阅读一本书,作者和智华合作写的一本关于Spark方面的书籍,一共1300多页,如果从第一页开始,逐渐去遍历检索信息,无论从速度、还是从性价比的角度,这是一个问题,LangChain给我们提供了一些比较经典的方式,例如map_reduce、map_rerank、refine、stuff等四种方式。
Map Reduce方式把一个很大的文件分成多个文件,如果大家做Hadoop、Spark或Flink,对这个概念应该很清楚,这是一种分而治之的思想。
map_reduce.py的代码实现:
- """通过先在文档上映射一个链来组合文档,然后再组合结果."""
- from __future__ import annotations
- from typing import Any, Callable, Dict, List, Optional, Protocol, Tuple
- from pydantic import Extra, root_validator
- from langchain.callbacks.manager import Callbacks
- from langchain.chains.combine_documents.base import BaseCombineDocumentsChain
- from langchain.chains.llm import LLMChain
- from langchain.docstore.document import Document
- class CombineDocsProtocol(Protocol):
- """ combine_docs方法的接口."""
- def __call__(self, docs: List[Document], **kwargs: Any) -> str:
- """combine_docs方法的接口."""
- def _split_list_of_docs(
- docs: List[Document], length_func: Callable, token_max: int, **kwargs: Any
- ) -> List[List[Document]]:
- new_result_doc_list = []
- _sub_result_docs = []
- for doc in docs:
- _sub_result_docs.append(doc)
- _num_tokens = length_func(_sub_result_docs, **kwargs)
- if _num_tokens > token_max:
- if len(_sub_result_docs) == 1:
- raise ValueError(
- "A single document was longer than the context length,"
- " we cannot handle this."
- )
- if len(_sub_result_docs) == 2:
- raise ValueError(
- "A single document was so long it could not be combined "
- "with another document, we cannot handle this."
- )
- new_result_doc_list.append(_sub_result_docs[:-1])
- _sub_result_docs = _sub_result_docs[-1:]
- new_result_doc_list.append(_sub_result_docs)
- return new_result_doc_list
- def _collapse_docs(
- docs: List[Document],
- combine_document_func: CombineDocsProtocol,
- **kwargs: Any,
- ) -> Document:
- result = combine_document_func(docs, **kwargs)
- combined_metadata = {k: str(v) for k, v in docs[0].metadata.items()}
- for doc in docs[1:]:
- for k, v in doc.metadata.items():
- if k in combined_metadata:
- combined_metadata[k] += f", {v}"
- else:
- combined_metadata[k] = str(v)
- return Document(page_content=result, metadata=combined_metadata)
- class MapReduceDocumentsChain(BaseCombineDocumentsChain):
- """通过在文档上映射一个链来组合文档,然后组合结果."""
- llm_chain: LLMChain
- """链单独应用于每个文档"""
- combine_document_chain: BaseCombineDocumentsChain
- """用于组合将llm_chain应用于文档的结果."""
- collapse_document_chain: Optional[BaseCombineDocumentsChain] = None
- """ 链用于在需要时折叠中间结果。如果None,将使用combine_document_chain."""
- document_variable_name: str
- """ llm_chain中用于放置文档的变量名。如果llm_chain中只有一个变量,则不需要提供这个."""
- return_intermediate_steps: bool = False
- """返回输出中映射步骤的结果."""
- @property
- def output_keys(self) -> List[str]:
- """期望输入键
- :meta private:
- """
- _output_keys = super().output_keys
- if self.return_intermediate_steps:
- _output_keys = _output_keys + ["intermediate_steps"]
- return _output_keys
- class Config:
- """pydantic对象的配置"""
- extra = Extra.forbid
- arbitrary_types_allowed = True
- @root_validator(pre=True)
- def get_return_intermediate_steps(cls, values: Dict) -> Dict:
- """为了向后兼容"""
- if "return_map_steps" in values:
- values["return_intermediate_steps"] = values["return_map_steps"]
- del values["return_map_steps"]
- return values
- @root_validator(pre=True)
- def get_default_document_variable_name(cls, values: Dict) -> Dict:
- """获取默认文档变量名(如果没有提供)"""
- if "document_variable_name" not in values:
- llm_chain_variables = values["llm_chain"].prompt.input_variables
- if len(llm_chain_variables) == 1:
- values["document_variable_name"] = llm_chain_variables[0]
- else:
- raise ValueError(
- "document_variable_name must be provided if there are "
- "multiple llm_chain input_variables"
- )
- else:
- llm_chain_variables = values["llm_chain"].prompt.input_variables
- if values["document_variable_name"] not in llm_chain_variables:
- raise ValueError(
- f"document_variable_name {values['document_variable_name']} was "
- f"not found in llm_chain input_variables: {llm_chain_variables}"
- )
- return values
- @property
- def _collapse_chain(self) -> BaseCombineDocumentsChain:
- if self.collapse_document_chain is not None:
- return self.collapse_document_chain
- else:
- return self.combine_document_chain
- def combine_docs(
- self,
- docs: List[Document],
- token_max: int = 3000,
- callbacks: Callbacks = None,
- **kwargs: Any,
- ) -> Tuple[str, dict]:
- """ 以map reduce方式组合文档。
- 通过在所有文档上映射第一个链,然后合并结果来进行组合。
- 如果需要(如果有很多文档),这种合并可以递归地完成。
- """
- results = self.llm_chain.apply(
- # FYI -这是并行化的,所以速度很快
- [{self.document_variable_name: d.page_content, **kwargs} for d in docs],
- callbacks=callbacks,
- )
- return self._process_results(
- results, docs, token_max, callbacks=callbacks, **kwargs
- )
- async def acombine_docs(
- self, docs: List[Document], callbacks: Callbacks = None, **kwargs: Any
- ) -> Tuple[str, dict]:
- """ 以map reduce方式组合文档。
- 通过在所有文档上映射第一个链,然后合并结果来进行组合。
- 如果需要(如果有很多文档),这种合并可以递归地完成。
- """
- results = await self.llm_chain.aapply(
- # FYI - this is parallelized and so it is fast.
- [{**{self.document_variable_name: d.page_content}, **kwargs} for d in docs],
- callbacks=callbacks,
- )
- return self._process_results(results, docs, callbacks=callbacks, **kwargs)
- def _process_results(
- self,
- results: List[Dict],
- docs: List[Document],
- token_max: int = 3000,
- callbacks: Callbacks = None,
- **kwargs: Any,
- ) -> Tuple[str, dict]:
- question_result_key = self.llm_chain.output_key
- result_docs = [
- Document(page_content=r[question_result_key], metadata=docs[i].metadata)
- # 它使用来自文档的元数据,以及来自' results '的文本结果
- for i, r in enumerate(results)
- ]
- length_func = self.combine_document_chain.prompt_length
- num_tokens = length_func(result_docs, **kwargs)
- def _collapse_docs_func(docs: List[Document], **kwargs: Any) -> str:
- return self._collapse_chain.run(
- input_documents=docs, callbacks=callbacks, **kwargs
- )
- while num_tokens is not None and num_tokens > token_max:
- new_result_doc_list = _split_list_of_docs(
- result_docs, length_func, token_max, **kwargs
- )
- result_docs = []
- for docs in new_result_doc_list:
- new_doc = _collapse_docs(docs, _collapse_docs_func, **kwargs)
- result_docs.append(new_doc)
- num_tokens = self.combine_document_chain.prompt_length(
- result_docs, **kwargs
- )
- if self.return_intermediate_steps:
- _results = [r[self.llm_chain.output_key] for r in results]
- extra_return_dict = {"intermediate_steps": _results}
- else:
- extra_return_dict = {}
- output = self.combine_document_chain.run(
- input_documents=result_docs, callbacks=callbacks, **kwargs
- )
- return output, extra_return_dict
- @property
- def _chain_type(self) -> str:
- return "map_reduce_documents_chain"
我们对文档进行切分,把一个大文件分成很多小部分,然后把这小部分再进行合并,这是Map Reduce的方式。在map_reduce.py代码文件中,MapReduceDocumentsChain类用于将多个文档进行处理和合并。该类通过将一个链映射到每个文档上,然后将结果组合起来来实现文档的合并。MapReduceDocumentsChain类包含了一个LLMChain类型的语言模型链,一个BaseCombineDocumentsChain类型的文档合并链,一个可选的BaseCombineDocumentsChain类型的文档折叠链、一个文档变量名,以及一个返回中间结果的布尔值。MapReduceDocumentsChain类的combine_docs方法将语言模型链应用于每个文档,并使用文档合并链将结果组合起来。如果需要,可以使用文档折叠链折叠中间结果。如果结果文档的长度超过了token_max的限制,方法将会递归地将文档拆分、折叠和合并,直到结果文档的长度小于或等于token_max。map_reduce.py文件还实现了一个CombineDocsProtocol接口,用于定义合并文档的方法。该接口定义了一个接受文档列表和其他关键字参数的方法,并返回一个字符串。该方法可以根据需要进行自定义,以适应不同的应用场景。 map_reduce.py代码文件提供了一种方便的方式来将多个文档进行处理和合并。它支持使用不同的语言模型链和文档合并链来适应不同的应用场景,并提供了可选的文档折叠链来折叠中间结果。该类还支持递归拆分、折叠和合并结果文档,以处理长度超过限制的情况。
……..
LangChain组合文档链一共有4种方式,也可以使用自定义的方式,实现自定义子类,BaseCombineDocumentsChain 类继承至ABC类。
base.py的代码实现:
- class BaseCombineDocumentsChain(Chain, ABC):
- """用于组合文档的链的基本接口."""
- input_key: str = "input_documents" #: :meta private:
- output_key: str = "output_text" #: :meta private:
- @property
- def input_keys(self) -> List[str]:
- """期望输入键
- :meta private:
- """
- return [self.input_key]
- @property
- def output_keys(self) -> List[str]:
- """返回输出键
- :meta private:
- """
- return [self.output_key]
- def prompt_length(self, docs: List[Document], **kwargs: Any) -> Optional[int]:
- """ 返回给定传入文档的提示长度。
- 如果方法不依赖于提示符长度,则返回None。
- """
- return None
- @abstractmethod
- def combine_docs(self, docs: List[Document], **kwargs: Any) -> Tuple[str, dict]:
- """将文档组合成单个字符串."""
- @abstractmethod
- async def acombine_docs(
- self, docs: List[Document], **kwargs: Any
- ) -> Tuple[str, dict]:
- """将文档异步地组合成单个字符串."""
- def _call(
- self,
- inputs: Dict[str, List[Document]],
- run_manager: Optional[CallbackManagerForChainRun] = None,
- ) -> Dict[str, str]:
- _run_manager = run_manager or CallbackManagerForChainRun.get_noop_manager()
- docs = inputs[self.input_key]
- # 其他键被认为是LLM预测所需要的
- other_keys = {k: v for k, v in inputs.items() if k != self.input_key}
- output, extra_return_dict = self.combine_docs(
- docs, callbacks=_run_manager.get_child(), **other_keys
- )
- extra_return_dict[self.output_key] = output
- return extra_return_dict
- async def _acall(
- self,
- inputs: Dict[str, List[Document]],
- run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
- ) -> Dict[str, str]:
- _run_manager = run_manager or AsyncCallbackManagerForChainRun.get_noop_manager()
- docs = inputs[self.input_key]
- # 其他键被认为是LLM预测所需要的
- other_keys = {k: v for k, v in inputs.items() if k != self.input_key}
- output, extra_return_dict = await self.acombine_docs(
- docs, callbacks=_run_manager.get_child(), **other_keys
- )
- extra_return_dict[self.output_key] = output
- return extra_return_dict
base.py代码文件中,定义了一个BaseCombineDocumentsChain类,它是 Chain 和 ABC 的子类,用于通过将多个文档组合成单个字符串来处理文档。
BaseCombineDocumentsChain类具有以下属性:
- input_key: 接收文档输入键的字符串名称。
- output_key:输出的字符串键的字符串名称。
BaseCombineDocumentsChain类具有以下方法:
- prompt_length():接收一个 Document 对象的列表以及关键字参数,并返回所需的提示长度。如果该方法不依赖于提示长度,则返回 None。
- combine_docs():是一个抽象方法,接收一个 Document 对象的列表以及关键字参数,并将其组合成单个字符串。
- acombine_docs():combine_docs() 方法的异步版本。
- _call():该方法接收一个由文档列表组成的字典输入。它调用 combine_docs() 方法来组合文档,并将其输出作为字典返回。
- _acall():_call() 方法的异步版本。
BaseCombineDocumentsChain 类提供了一个接口,用于处理文档并将其组合成单个字符串,提供一个prompt_length方法来确定所需的提示长度,并通combine_docs抽象方法实现文档组合的逻辑。BaseCombineDocumentsChain是一个抽象基类,需要子类实现其抽象方法。
ABC类是一个通用的抽象接口,abc.py的代码实现:
- class ABC(metaclass=ABCMeta):
- """ Helper类,它提供了使用继承创建ABC的标准方法.
- """
- __slots__ = ()
如果做ChatGDP或者大模型的开发,基于历史数据和当前的输入信息,检索出本地的信息,例如Notion里面的数据,为什么还需要跟大模型进行交互?这是一个很核心的问题,也是一个很好的面试题,如果对这个问题能有很好理解的话,通过技术面试的第一轮,不会有太大的问题。
《企业级ChatGPT开发入门实战直播21课》报名课程请联系:
Gavin老师:NLP_Matrix_Space
Sam老师:NLP_ChatGPT_LLM
我们的两本最新书籍年底即将出版:
- 《企业级Transformer&ChatGPT解密:原理、源码及案例》
- 《企业级Transformer&Rasa解密:原理、源码及案例》
《企业级Transformer&ChatGPT解密:原理、源码及案例》本书以Transformer和ChatGPT技术为主线,系统剖析了Transformer架构的理论基础、模型设计与实现,Transformer语言模型GPT与BERT,ChatGPT技术及其开源实现,以及相关应用案例。内容涉及贝叶斯数学、注意力机制、语言模型、最大似然与贝叶斯推理等理论,和Transformer架构设计、GPT、BERT、ChatGPT等模型的实现细节,以及OpenAI API、ChatGPT提示工程、类ChatGPT大模型等应用。第一卷介绍了Transformer的Bayesian Transformer思想、架构设计与源码实现,Transformer语言模型的原理与机制,GPT自回归语言模型和BERT自编码语言模型的设计与实现。第二卷深入解析ChatGPT技术,包括ChatGPT发展历史、基本原理与项目实践,OpenAI API基础与高级应用,ChatGPT提示工程与多功能应用,类ChatGPT开源大模型技术与项目实践。
ChatGPT 技术:从基础应用到进阶实践涵盖了ChatGPT技术和OpenAI API的基础和应用,分为8个章节,从ChatGPT技术概述到类ChatGPT开源大模型技术的进阶项目实践。
1. ChatGPT技术概述:主要介绍了GPT-1、GPT-2、GPT-3、GPT-3.5和GPT-4的发展历程和技术特点,以及ChatGPT技术的基本原理和项目案例实战。
2. OpenAI API基础应用实践:主要介绍了OpenAI API模型及接口概述,以及如何使用OpenAI API进行向量检索和文本生成。
3. OpenAI API进阶应用实践:主要介绍了如何使用OpenAI API基于嵌入式向量检索实现问答系统,如何使用OpenAI API对特定领域模型进行微调。
4. ChatGPT提示工程基础知识:主要介绍了如何构建优质提示的两个关键原则,以及如何迭代快速开发构建优质提示。
5. ChatGPT提示工程实现多功能应用:主要介绍了如何使用ChatGPT提示工程实现概括总结、推断任务、文本转换和扩展功能。
6. ChatGPT提示工程构建聊天机器人:主要介绍了聊天机器人的应用场景,以及如何使用ChatGPT提示工程构建聊天机器人和订餐机器人。
7. 类ChatGPT开源大模型技术概述:主要介绍了类ChatGPT开源大模型的发展历程和技术特点,以及ChatGLM项目案例实践和LMFlow项目案例实践。
8. 类ChatGPT开源大模型进阶项目实践:主要介绍了类ChatGPT开源大模型的进阶项目实践,包括基于LoRA SFT+RM+RAFT技术进行模型微调、基于P-Tuning等技术对特定领域数据进行模型微调、基于LLama Index和Langchain技术的全面实践,以及使用向量检索技术对特定领域数据进行模型微调。
本书适用于NLP工程师、AI研究人员以及对Transformer和ChatGPT技术感兴趣的读者。通过学习,读者能够系统掌握Transformer理论基础,模型设计与训练推理全过程,理解ChatGPT技术内幕,并能运用OpenAI API、ChatGPT提示工程等技术进行项目实践。
Transformer作为目前NLP领域最为主流和成功的神经网络架构,ChatGPT作为Transformer技术在对话系统中的典型应用,本书内容涵盖了该领域的最新进展与技术。通过案例实践,使理论知识变成技能,这也是本书的独特之处。
《企业级Transformer&Rasa解密:原理、源码及案例》:是一本深入介绍Rasa对话机器人框架的实战开发指南。本书分为两卷,第一卷主要介绍基于Transformer的Rasa Internals解密,详细介绍了DIETClassifier和TED在Rasa架构中的实现和源码剖析。第二卷主要介绍Rasa 3.X硬核对话机器人应用开发,介绍了基于Rasa Interactive Learning和ElasticSearch的实战案例,以及通过Rasa Interactive Learning发现和解决对话机器人的Bugs案例实战。
第一卷中介绍了Rasa智能对话机器人中的Retrieval Model和Stateful Computations,解析了Rasa中去掉对话系统的Intent的内幕,深入研究了End2End Learning,讲解了全新一代可伸缩的DAG图架构的内幕,介绍了如何定制Graph NLU及Policies组件,讨论了自定义GraphComponent的内幕,从Python角度分析了GraphComponent接口,详细解释了自定义模型的create和load内幕,并讲述了自定义模型的languages及Packages支持。深入剖析了自定义组件Persistence源码,包括自定义对话机器人组件代码示例分析、Resource源码逐行解析、以及ModelStorage、ModelMetadata等逐行解析等。介绍了自定义组件Registering源码的内幕,包括采用Decorator进行Graph Component注册内幕源码分析、不同NLU和Policies组件Registering源码解析、以及手工实现类似于Rasa注册机制的Python Decorator全流程实现。讨论了自定义组件及常见组件源码的解析,包括自定义Dense Message Featurizer和Sparse Message Featurizer源码解析、Rasa的Tokenizer及WhitespaceTokenizer源码解析、以及CountVectorsFeaturizer及SpacyFeaturizer源码解析。深入剖析了框架核心graph.py源码,包括GraphNode源码逐行解析及Testing分析、GraphModelConfiguration、ExecutionContext、GraphNodeHook源码解析以及GraphComponent源码回顾及其应用源码。
第二卷主要介绍了基于Rasa Interactive Learning和ElasticSearch的实战案例,以及通过Rasa Interactive Learning发现和解决对话机器人的Bugs案例实战。介绍了使用Rasa Interactive Learning来调试nlu和prediction的案例实战,使用Rasa Interactive Learning来发现和解决对话机器人的Bugs案例实战介绍了使用Rasa Interactive Learning透视Rasa Form的NLU和Policies的内部工作机制案例实战,使用ElasticSearch来实现对话机器人的知识库功能,并介绍了相关的源码剖析和最佳实践,介绍了Rasa微服务和ElasticSearch整合中的代码架构分析,使用Rasa Interactive Learning对ConcertBot进行源码、流程及对话过程的内幕解密,介绍了使用Rasa来实现Helpdesk Assistant功能,并介绍了如何使用Debug模式进行Bug调试,使用Rasa Interactive Learning纠正Helpdesk Assistant中的NLU和Prediction错误,逐行解密Domain和Action微服务的源码。
本书适合对Rasa有一定了解的开发人员和研究人员,希望通过本书深入了解Rasa对话机器人的内部工作原理及其源代码实现方式。无论您是想要深入了解Rasa的工作原理还是想要扩展和定制Rasa,本书都将为您提供有价值的参考和指导。
《企业级Transformer&ChatGPT解密:原理、源码及案例》、《企业级Transformer&Rasa解密:原理、源码及案例》,是您深入学习的好选择,年底即将重磅出版,欢迎购买!