chatglm2-2b+sdxl1.0+langchain打造私有AIGC(四)

一、实现目标

第一篇文章就有交代,想要打造一个AIGC,既然都是AI了,那么当然希望人工干预的成分越低越好,所以初步想法就是

1)我输入一些关键字;

2)ChatGLM可以自主识别出我意图;

3)ChatGLM可以自主判断出为了满足我的意图,它需要执行哪些动作

但是ChatGLM只是一个预训练的大语言模型,在执行动作上这个层面上来讲,可行性不大,那么就只有基于ChatGLM做业务层的开发,将单一业务封装成一个一个的工具,通过Prompt预先告诉ChatGLM这些工具的作用,然后让它去判断要使用哪些工具,以及这些工具调用的顺序情况

langchain在这一方面为我们做了很好的封装,但是经过测试,并不完全适合于ChatGLM,网上针对这一块到处的说法都千篇一律,也搞得好像很深奥的样子,其实就是提示词工程

二、Agent概述

2.1 基本概念

Agent主要分为两种:

执行代理:在每一步骤需要执行前,使用所有先前执行的输出来决定下一个执行

计划和执行代理:预先决定完整的操作顺序,然后执行所有操作而不更新计划

Agent里面涉及的其他类包括:

工具:代理可以采取的操作。说直接一点就是具体干活的

这些概念不清楚其实也没关系,主要是明白自己想做什么,明白代理和工具的关系。用户把关键信息告诉Agent,Agent决定使用哪些工具,可能有的朋友会疑惑,不是说ChatGLM决定使用哪些工具的吗?

解惑:Agent里面有一个对象llm,最终决定使用哪些工具是Agent里面的llm(ChatGLM)决定的

 2.2 执行流程及重点类&方法&属性

Agent的大致流程就是这样
Agent的大致流程就是这样

接下来,在清楚流程之后,再重源码上去梳理整个调用关系以及涉及到的类和方法,代码具体的梳理跟踪过程就不一一展示了,主要说一下在整个流程中比较关键的类和方法以及他们的作用。重点,重点,重点

1) AgentExecutor: 这个类就是在实际调用工具方法的类,其中很关键的方法是_call,_take_next_step两个方法,AgentExecutor集成于Chain,_call方法相当于是执行入口,而在_call方法内部调用了_take_next_step方法。_take_next_step方法是核心,这个方法其实就是在循环调用GLM挑选出的工具集合,并返回每个工具执行的结果(记住,这个方法在langchain里面是在最后return的是“每个工具执行的结果集合”)

observation就是工具执行的结果

2)BaseMultiActionAgent:这个类就是一个Agent类,llm就是被包装到这个类里面,其中的关键方法是plan,这个方法就是在挑选工具并做排列工具调用的顺序(在langchain中这是个抽象方法,很明显是拿来被重写的)

3)AgentAction,AgentFinish:这两个类是数据类,并没有方法,主要作用是让AgentExecutor类下的_call方法判断工具是否已经调用完,是否需要返回最终结果,如果是AgentFinish这直接返回最终结果

4)BaseTool:工具类,其中的关键方法是_run,工具的具体执行动作都写在这个方法下,在这个类里面还有一个很重要的属性:return_direct:bool,为True则代表执行完这个方法后直接返回执行结果(后面会利用这个属性做重写)

说明:以上重点类,方法,以及整个执行过程,均为我个人项目里面所涉及到的内容,并不代表langchain的正确使用方式

2.3 源码调用顺序

我们先看客户端是怎样调用代理的

# 1、实例化llm 看不懂这一句的情去看我前几篇文章
llm_helper = ChatGLM_Helper.instance()
# 2、定义工具集合XHS_Note_Tool,Travel_Tools_Base_XC分别是两个不同的工具
tools = [
    XHS_Note_Tool(llm=llm_helper.llm),
    Travel_Tools_Base_XC(llm=llm_helper.llm)]
# 3、实例化Agent类,API_Sequence_Agent是我自己重写的一个类,后面会详细说明
agent = API_Sequence_Agent(tools=tools, llm=llm_helper.llm)
# 4、使用agent和tools实例化出一个AgentExecutor
agent_exec = Sequence_AgentExecutor.from_agent_and_tools(agent=agent,
                                                         tools=tools,
                                                         verbose=True,
                                                         max_iterations=1)
# 5、调用AgentExecutor的run方法直接执行代理,AgentExecutor继承于Chain
# 5、其实是在调用Chain的run方法,而Chain的run方法里面直接返回self(),所以是在调用Chain的__call__
# 5、上篇文章有讲,Chain的__call__其实是在调用子类的_call,也就是AgentExecutor的_call
agent_generate = agent_exec.run(prompt)

从run方法开始现在顺着入口F12跟踪源码查看整个调用链的顺序结构(具体每一步就不做详细介绍了)

在_take_next_step方法中调用plan,让Agent去判断调用哪些工具,以及工具调用的顺序,然后在按顺序调用工具的_run方法获得执行数据

三、langchain框架下打造适配ChatGLM的Agent 

通过上面的执行流程和代码调用流程的了解,我开始打造自己的Agent,langchain本身的源码应该是基于ChatGPT进行打造的Agent,不太适用于ChatGLM,原因就不多说了,所以我需要动手改造的地方如下:

1)定义自己的提示词模板(让LLM判断使用哪些工具的模板)langchain本身是采用边执行边思考的策略,这种策略确实不合适,所以改造

SEQUENCE_EXECUTE_API_TOOLS_PROMPT_TEMPLATE: str = """
    已知工具信息:{intents},你需要根据已知工具信息,自主思考为了满足用户意图,必须从已知工具中按从 
    前到后的逻辑顺序调用哪些工具(某些工具的输出可以作为另一些工具的输入),
    并且你的输出格式必须按照:
    工具调用链:[你认为要调用的工具的名称。如果需要调用多个工具,请用按从前到后的调用顺序输出工具名 
    称,并用逗号分割]
    不能随意更改输出格式

    例如:
    
    用户输入:创作一篇敦煌的游记,并发布到【XXX】平台
    工具调用链:[XXX笔记查询工具,XXX旅行游记生成工具]

    用户输入:'{query}'
    """

2)重写Plan,按我自己的提示词进行计划,按我规定的格式进行输出,然后根据输出得到对应工具信息

import re
from typing import List, Tuple, Any, Union
from langchain.schema import AgentAction, AgentFinish
from langchain.agents import BaseMultiActionAgent
from langchain import LLMChain, PromptTemplate
from langchain.base_language import BaseLanguageModel
from agents.BaseTools.base_tools import functional_Tool
from agents.Config.global_config import SEQUENCE_EXECUTE_API_TOOLS_PROMPT_TEMPLATE

# 创建Controller来识别用户意图以及实现用户意图需要调用的工具集合
class API_Sequence_Agent(BaseMultiActionAgent):
    tools: List[functional_Tool]
    llm: BaseLanguageModel
    intent_template: str = SEQUENCE_EXECUTE_API_TOOLS_PROMPT_TEMPLATE
    prompt = PromptTemplate.from_template(intent_template)
    llm_chain: LLMChain = None

    def get_llm_chain(self):
        if not self.llm_chain:
            self.llm_chain = LLMChain(llm=self.llm, prompt=self.prompt)

    def output_parser(self, text: str):
        if not text:
            raise ValueError("未能获取到需要解析的文本信息")
        # 使用正则表达式匹配方括号中的内容
        matches = re.findall(r'[(.*?)]', text)
        # 将匹配到的内容构造成一个数组
        result_array = []
        if matches:
            result_array = [match.strip() for match in matches[0].split(',')]
        return result_array

    # 根据提示(prompt)选择工具
    def choose_tools(self, query) -> List[str]:
        self.get_llm_chain()
        tool_names = [{tool.name: tool.description} for tool in self.tools]
        resp = self.llm_chain.predict(intents=tool_names, query=query)
        select_tools = self.output_parser(resp)
        return select_tools

    @property
    def input_keys(self):
        return ["input"]

    # 通过 AgentAction 调用选择的工具,工具的输入是 "input"
    def plan(self, intermediate_steps: List[Tuple[AgentAction, str]],
             **kwargs: Any) -> Union[List[AgentAction], AgentFinish]:
        # 单工具调用
        tools = self.choose_tools(kwargs["input"])
        for tool in self.tools:
            if tool.name == tools[-1]:
                tool.return_direct = True
        result: List[Union[AgentAction, AgentFinish]] = []
        for tool in tools:
            result.append(
                AgentAction(tool=tool, tool_input=kwargs["input"], log=""))
        return result

    async def aplan(self, intermediate_steps: List[Tuple[AgentAction, str]],
                    **kwargs: Any) -> Union[List[AgentAction], AgentFinish]:
        raise NotImplementedError("IntentAgent does not support async")

注意plan方法里有一句:

for tool in self.tools:
            if tool.name == tools[-1]:
                tool.return_direct = True

这是很重要的一个细节点,最后一个工具的return_direct 设置为True,在_take_next_step方法调用工具时才知道哪一个工具执行完之后直接返回结果,langchain原本策略是让chatgpt去判断是否可以返回最终结果,但是ChatGLM....,我的策略就是假设ChatGLM在判断工具( tools = self.choose_tools(kwargs["input"])),并整理工具顺序时就已经完全正确,包含了所有工具信息,排序也完全正确

3)重写_take_next_step,这是关键中的关键

from typing import Dict, List, Optional, Tuple, Union
from langchain.agents import AgentExecutor
from langchain.callbacks.manager import CallbackManagerForChainRun
from langchain.schema.agent import AgentAction, AgentFinish
from langchain.tools.base import BaseTool

class Sequence_AgentExecutor(AgentExecutor):

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys.

        :meta private:
        """
        return self.agent.input_keys

    @property
    def output_keys(self) -> List[str]:
        """Return the singular output key.

        :meta private:
        """
        if self.return_intermediate_steps:
            return self.agent.return_values + ["intermediate_steps"]
        else:
            return self.agent.return_values

    def _take_next_step(
        self,
        name_to_tool_map: Dict[str, BaseTool],
        color_mapping: Dict[str, str],
        inputs: Dict[str, str],
        intermediate_steps: List[Tuple[AgentAction, str]],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:

        intermediate_steps = self._prepare_intermediate_steps(
            intermediate_steps)

        output = self.agent.plan(
            intermediate_steps,
            callbacks=run_manager.get_child() if run_manager else None,
            **inputs,
        )

        if isinstance(output, AgentFinish):
            return output
        actions: List[AgentAction]
        if isinstance(output, AgentAction):
            actions = [output]
        else:
            actions = output
        result = []
        for agent_action in actions:
            if run_manager:
                run_manager.on_agent_action(agent_action, color="green")
            if agent_action.tool in name_to_tool_map:
                tool = name_to_tool_map[agent_action.tool]
                return_direct = tool.return_direct
                color = color_mapping[agent_action.tool]
                tool_run_kwargs = self.agent.tool_run_logging_kwargs()
                if return_direct:
                    tool_run_kwargs["llm_prefix"] = ""
                if result is not None and len(result) > 0:
                    a, o = result[-1]
                    observation = tool.run(
                        o,
                        verbose=self.verbose,
                        color=color,
                        callbacks=run_manager.get_child()
                        if run_manager else None,
                        **tool_run_kwargs,
                    )
                else:
                    observation = tool.run(
                        agent_action.tool_input,
                        verbose=self.verbose,
                        color=color,
                        callbacks=run_manager.get_child()
                        if run_manager else None,
                        **tool_run_kwargs,
                    )
                if tool.return_direct is False:
                    result.append((agent_action, observation))
                else:
                    return AgentFinish({"output": observation},
                                       log=observation)
        return result

Sequence_AgentExecutor集成于AgentExecutor,看_take_next_step方法的最后

 if tool.return_direct is False:
                    result.append((agent_action, observation))
                else:
                    return AgentFinish({"output": observation},
                                       log=observation)

如果return_direct 为True就直接返回AgentFinish,AgentExecutor类中的_call在接收到AgentFinish类型的结果后就会认为工具链调用完毕,直接返回结果到客户端。所以在我的策略里面,工具链的最后一个工具的return_direct 属性一定为True,否则就只能等待程序超时或报错

4)封装工具

from typing import Optional
from langchain.tools import BaseTool
from langchain.callbacks.manager import (
    AsyncCallbackManagerForToolRun,
    CallbackManagerForToolRun)

# 重写抽象方法,"call_func"方法执行 tool
class functional_Tool(BaseTool):
    name: str = ""
    description: str = ""

    def _call_func(self, query, **kwargs):
        raise NotImplementedError("subclass needs to overwrite this method")

    def _run(self,
             query: str,
             run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
        return self._call_func(query)

    async def _arun(
        self,
        query: str,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        raise NotImplementedError("APITool does not support async")

定义一个基础工具类,所有的工具都集成于这个类

接着实现一个自定义工具类

from datetime import datetime
from langchain.schema.language_model import BaseLanguageModel
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from agents.BaseTools.base_tools import functional_Tool
from agents.Config.global_config import TOOLS_NOTE_TOOL_PROMPT_TEMPLATE
from utils.pipeline import Pipeline_Item, Pipeline_Process_Task
from utils.genrate.note_generate_utils_xhs import Note_Generate_Utils_XHS
from utils.genrate.note_generate_utils_xc import Note_Generate_Utils_XC
from utils.genrate.note_generate_utils import Content_Type

class XHS_Note_Tool(functional_Tool):
    llm: BaseLanguageModel
    name = "XXX旅行游记生成工具"
    description = "自动生成相关游记文章,并发布到XXX平台"
    qa_template = TOOLS_NOTE_TOOL_PROMPT_TEMPLATE
    prompt = PromptTemplate(input_variables=["intents", "query"],
                            template=qa_template)
    llm_chain: LLMChain = None
    categories = ["XXX", "XXX", "XXX", "XXX", "XXX", "XXX", "XXX", "XXX", "XXX", "XXX"]

    def parser_output(self, output: str):
        if not output:
            raise ValueError("未能获取到正确的输入信息")
        # 按换行符分割字符串
        lines = output.split('
')
        # 提取意图类别和关键字
        intent_category = lines[0].split(':')[1].strip()
        keywords = lines[1].split(':')[1].strip()
        if "、" in keywords:
            keywords = keywords.split('、')[0].strip()
        if "," in keywords:
            keywords = keywords.split(',')[0].strip()
        return intent_category, keywords

    def _call_func(self, query) -> str:

        if not query:
            raise ValueError("未能获取到正确的用户信息")
        self.get_llm_chain()
        ..........
        # 这个方法里面就是需要自己实现的工具执行代码,至于需不需要llm就根据自己的情况来定
        # name和description是必须要的LLM模型就是根据 name和description在判断是否要调用这个工具
        # 可以对照1)中的提示词模板揣摩一下name和description的作用


    def get_llm_chain(self):
        if not self.llm_chain:
            self.llm_chain = LLMChain(llm=self.llm, prompt=self.prompt)

到此为止,自己的工具基本上已经完成,目前在ChatGLM2-6b上跑得很通畅,工具判断和调用几乎都没问题