【指南】构建人工智能驱动的业务经理

2024年12月25日 由 alex 发表 73 0

想象一下,通过手机上的一个用户友好界面来管理你的整个业务。虽然同时使用多个应用程序是常见做法,但未来在于将所有交互整合到一个基于聊天的平台上,该平台由大型语言模型(LLM)提供支持。


对于小型企业而言,这种方法具有显著优势。通过在统一的聊天界面中集中数据管理任务,企业主可以节省时间、降低复杂性,并减少对不同软件工具的依赖。这样可以更高效地分配资源,从而将更多精力集中在核心业务增长活动上。


然而,这种潜力不仅限于小型企业。本文将详细介绍的概念和技术也适用于个人用例。从管理待办事项、跟踪支出到整理收藏品,基于聊天的界面提供了一种直观且高效的方式来与你的数据进行交互。


本文将指导你完成从初步构思到实际实施的开发过程。基于之前文章介绍的组件,我们将建立应用程序的基础元素,包括:

  • 设置数据库架构
  • 定义核心应用程序功能
  • 构建项目存储库结构
  • 创建能够使用自然语言命令与多个SQL数据库表进行交互的工具


首先,让我们简要回顾一下前一篇文章的关键要点,以便为我们当前的目标设定背景。


回顾

在之前,我们构建了一个原型代理工作流,能够与工具对象进行交互。我们的目标是减少由底层语言模型(在本例中是gpt-3.5-turbo)生成的工具参数中的幻觉。


为实现这一目标,我们实施了两项关键更改:

  1. 移除了工具架构中的必需参数
  2. 在执行所需函数之前添加了参数验证步骤


通过将所有工具参数设置为可选,并手动检查缺失的参数,我们消除了代理/LLM产生幻觉以填补缺失值的倾向。


前一篇文章介绍的关键对象是:

  • OpenAiAgent:主要的代理工作流类
  • Tool:代表代理可以使用的工具的类
  • ToolResult和StepResult:用于封装工具执行结果的类


这些组件构成了我们代理系统的基础,使其能够处理用户请求、选择适当的工具并生成响应。


为什么小型企业数据管理要选择聊天界面

小型企业在数据维护方面往往面临独特的挑战。与大型企业一样,他们需要定期更新和维护各种类型的数据,如会计记录、时间跟踪、发票等。然而,现代ERP(企业资源规划)系统的复杂性和成本可能让小型企业望而却步。因此,许多人选择使用一系列Excel电子表格来捕获和维护关键数据。


这种方法的问题在于,小型企业主通常不专门负责行政任务,他们无法投入大量时间和精力到复杂的行政管理和控制流程中。关键在于定义精简的流程,并在数据产生时及时更新,以最大限度地减少数据管理的开销。


通过利用大型语言模型的力量并创建一个聊天界面,我们旨在简化和优化小型企业的数据管理。聊天机器人将作为一个统一的界面,允许用户使用自然语言命令输入数据、检索信息和执行各种任务。这消除了在多个电子表格之间导航或开发具有多个表单和仪表板的复杂Web应用程序的需求。


在本文章中,我们将逐步增强聊天机器人的功能,添加诸如基于角色的访问控制、高级查询和评估、多模式支持以及与WhatsApp等流行通信平台的集成等功能。到系列结束时,你将拥有一个强大且灵活的工具,可以根据你的特定需求进行定制,无论你是在经营小型企业还是仅仅希望更有效地组织个人生活。


项目结构

为确保项目组织良好且易于维护,我们系统地构建了存储库,以封装不同的功能和组件。以下是存储库结构的概述:


project-root/

├── database/
│ ├── db.py # Database connection and setup
│ ├── models.py # Database models/schemas
| └── utils.py # Database utilities

├── tools/
│ ├── base.py # Base class for tools
│ ├── add.py # Tool for adding data to the database
│ ├── query.py # Tool for querying data from the database
| └── utils.py # Tool utilities

├── agents/
│ ├── base.py # Main AI agent logic
│ ├── routing.py # Specialized agent for routing tasks
│ ├── task.py # Tool wrapper for OpenAI subagents
| └── utils.py # agent utilities

└── utils.py # Utility functions and classes


这种结构使得关注点清晰分离,从而更容易开发、维护和扩展我们的应用程序。


设置数据库

选择正确的数据库和对象关系映射(Object-Relational Mapping,简称ORM)库对我们的应用程序至关重要。对于这个项目,我们选择了以下框架:

  • SQLAlchemy:一个强大的SQL工具包和Python的对象关系映射(ORM)库。它提供了一系列工具,允许使用Python对象和类与数据库进行交互。
  • SQLModel:一个基于SQLAlchemy和Pydantic构建的库,提供了一种简单直观的方式来定义数据库模型和执行数据库操作。


通过利用SQLModel,我们可以与Pydantic和SQLAlchemy无缝集成,从而实现高效的数据验证和数据库操作,同时消除SQL注入攻击的风险。此外,SQLModel允许我们轻松地在之前设计的Tool类上进行构建,该类使用Pydantic模型来创建工具架构。


为了确保应用程序的安全性和稳健性,我们实施了以下措施:

  1. 基于角色的访问控制:可执行操作与用户角色绑定,确保用户只能执行他们被授权执行的操作。这增加了额外的安全层,并防止未经授权的访问敏感数据。
  2. 防止SQL注入攻击:通过利用ChatGPT的自然语言理解能力,我们可以验证和清理用户输入,从而减轻SQL注入漏洞的风险。SQLModel与Pydantic的集成帮助我们强制执行严格的数据验证规则。


既然我们的技术栈已经确定,接下来让我们深入设置数据库并定义模型。


数据库模型

为了开始构建我们的原型应用程序,我们将定义必要的数据库表及其对应的SQLModel定义。在本文中,我们将重点关注三个核心表:

  • 费用(Expense)
  • 收入(Revenue)
  • 客户(Customer)


这些表将作为我们应用程序的基础,使我们能够展示关键功能和交互。


在数据库目录中创建一个名为models.py的新文件,并使用SQLModel定义这些表:


# database\models.py
from typing import Optional  
  
from pydantic import BeforeValidator, model_validator  
from sqlmodel import SQLModel, Field  
from datetime import time, datetime  
from typing_extensions import Annotated 
def validate_date(v):  
    if isinstance(v, datetime):  
        return v  
  
    for f in ["%Y-%m-%d", "%Y-%m-%d %H:%M:%S"]:  
        try:  
            return datetime.strptime(v, f)  
        except ValueError:  
            pass  
  
    raise ValueError("Invalid date format")  
  
  
def numeric_validator(v):  
    if isinstance(v, int):  
        return float(v)  
    elif isinstance(v, float):  
        return v  
    raise ValueError("Value must be a number")  
  
  
DateFormat = Annotated[datetime, BeforeValidator(validate_date)]  
Numeric = Annotated[float, BeforeValidator(numeric_validator)]
  
class Customer(SQLModel, table=True):  
    id: Optional[int] = Field(primary_key=True, default=None)
    company: str
    first_name: str  
    last_name: str  
    phone: str  
    address: str  
    city: str  
    zip: str  
    country: str  
  
  
class Revenue(SQLModel, table=True):  
    id: Optional[int] = Field(primary_key=True, default=None)  
    description: str  
    net_amount: Numeric  
    gross_amount: Numeric  
    tax_rate: Numeric  
    date: DateFormat  
  
  
class Expense(SQLModel, table=True):  
    id: Optional[int] = Field(primary_key=True, default=None)  
    description: str  
    net_amount: Numeric = Field(description="The net amount of the expense")  
    gross_amount: Numeric  
    tax_rate: Numeric  
    date: DateFormat


除了标准的SQLModel字段外,我们还定义了三个自定义类型注解:DateFormat、TimeFormat和Numeric。这些注解利用Pydantic的BeforeValidator来确保输入数据在存储到数据库之前被正确格式化。validate_date函数负责将字符串输入转换为适当的日期时间格式。这种方法使我们能够接受来自大型语言模型的多种日期格式,减少了对提示中严格格式要求的需求。


数据库引擎

定义了模型之后,我们需要一个脚本来设置数据库引擎并创建相应的表。让我们在数据库目录中创建一个db.py文件来处理这个任务:


# database/db.py
from database.models import *  
from sqlmodel import SQLModel, create_engine  
import os  
  
  
# local stored database  
DATABASE_URL = "sqlite:///app.db"  
  
engine = create_engine(DATABASE_URL, echo=True) 
  
  
def create_db_and_tables():  
    SQLModel.metadata.create_all(engine)  
  
  
create_db_and_tables()


在这个脚本中,我们导入了模型以及必要的SQLModel组件。我们定义了DATABASE_URL,指向名为app.db的本地SQLite数据库文件。我们使用SQLModel的create_engine创建了一个引擎,并传入了DATABASE_URL。echo=True参数启用了用于调试的详细输出。


create_db_and_tables函数使用SQLModel.metadata.create_all根据我们定义的模型在数据库中生成相应的表。最后,我们调用这个函数以确保在运行脚本时创建数据库和表。


数据库设置完成后,我们现在可以专注于更新Tool类,以使其与SQLModel无缝协作,并优化工具架构转换过程。


Tool类

在本节中,我们将讨论对Tool类进行的更新,以处理SQLModel实例并改进验证过程。


首先,我们使用Union类型提示将Type[SQLModel]添加为model字段的可能类型。这允许Tool类接受Pydantic的BaseModel和SQLModel的SQLModel作为有效的模型类型。


接下来,我们引入了一个名为exclude_keys的新属性,类型为list[str],默认值为["id"]。此属性的目的是指定在验证过程和OpenAI工具架构生成过程中应排除哪些键。在这种情况下,默认排除的键是id,因为在使用SqlModel创建数据条目时,id会在摄入过程中自动生成。


此外,我们在Tool类中引入了parse_model布尔属性,这样我们基本上可以决定工具函数是与我们的pydantic/SQLModel一起调用,还是与关键字参数一起调用。


在validate_input()方法中,我们添加了一个检查,以确保在验证过程中不考虑exclude_keys中指定的键作为缺失键。这对于像id这样的字段特别有用,因为它们由SQLModel自动生成,并且不应在输入中要求。


同样,在openai_tool_schema属性中,我们添加了一个循环,以从生成的架构中删除排除的键。这确保了排除的键不会包含在发送到OpenAI API的架构中。为了回顾,我们使用openai_tool_schema属性从我们的工具架构中删除必需的参数。这样做是为了消除语言模型的幻觉。


此外,我们将导入从from pydantic.v1 import BaseModel更改为from pydantic import BaseModel。由于SQLModel基于Pydantic v2,我们希望保持一致,并在此时使用Pydantic v2。


以下是Tool类的更新代码:


# tools/base.py
from typing import Type, Callable, Union
from tools.convert import convert_to_openai_tool
from pydantic import BaseModel, ConfigDict
from sqlmodel import SQLModel

class ToolResult(BaseModel):
    content: str
    success: bool

class Tool(BaseModel):
    name: str
    model: Union[Type[BaseModel], Type[SQLModel], None]
    function: Callable
    validate_missing: bool = True
    parse_model: bool = False
    exclude_keys: list[str] = ["id"]
    model_config = ConfigDict(arbitrary_types_allowed=True)
    def run(self, **kwargs) -> ToolResult:
        if self.validate_missing and model is not None:
            missing_values = self.validate_input(**kwargs)
            if missing_values:
                content = f"Missing values: {', '.join(missing_values)}"
                return ToolResult(content=content, success=False)
        if self.parse_model:
            if hasattr(self.model, "model_validate"):
                input_ = self.model.model_validate(kwargs)
            else:
                input_ = self.model(**kwargs)
            result = self.function(input_)
        else:
            result = self.function(**kwargs)
        return ToolResult(content=str(result), success=True)
    def validate_input(self, **kwargs):
        if not self.validate_missing or not self.model:
            return []
        model_keys = set(self.model.__annotations__.keys()) - set(self.exclude_keys)
        input_keys = set(kwargs.keys())
        missing_values = model_keys - input_keys
        return list(missing_values)
    @property
    def openai_tool_schema(self):
        schema = convert_to_openai_tool(self.model)
        # set function name
        schema["function"]["name"] = self.name
        # remove required field
        if schema["function"]["parameters"].get("required"):
            del schema["function"]["parameters"]["required"]
        # remove exclude keys
        if self.exclude_keys:
            for key in self.exclude_keys:
                if key in schema["function"]["parameters"]["properties"]:
                    del schema["function"]["parameters"]["properties"][key]
        return schema


对Tool类的这些更新为我们在使用SQLModel实例时提供了对验证过程和架构生成的更多灵活性和控制。


自定义工具架构转换

在我们的Tool类中,我们使用Langchain的convert_to_openai_tool函数从Pydantic模型创建架构。然而,这个函数是基于Pydantic v1的,而SQLModel使用的是Pydantic v2。为了使转换函数兼容,我们需要对其进行调整。让我们创建一个名为convert.py的新脚本:


# tools/convert.py
from langchain_core.utils.function_calling import _rm_titles
from typing import Type, Optional
from langchain_core.utils.json_schema import dereference_refs
from pydantic import BaseModel
def convert_to_openai_tool(
        model: Type[BaseModel],
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
) -> dict:
    """Converts a Pydantic model to a function description for the OpenAI API."""
    function = convert_pydantic_to_openai_function(
        model, name=name, description=description
    )
    return {"type": "function", "function": function}
def convert_pydantic_to_openai_function(
        model: Type[BaseModel],
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
        rm_titles: bool = True,
) -> dict:
    """Converts a Pydantic model to a function description for the OpenAI API."""
    model_schema = model.model_json_schema() if hasattr(model, "model_json_schema") else model.schema()
    
    schema = dereference_refs(model_schema)
    schema.pop("definitions", None)
    title = schema.pop("title", "")
    default_description = schema.pop("description", "")
    return {
        "name": name or title,
        "description": description or default_description,
        "parameters": _rm_titles(schema) if rm_titles else schema,
    }


这个经过调整的转换函数处理了Pydantic v1和v2之间的差异,确保我们的Tool类可以为OpenAI API生成兼容的架构。


接下来,更新tools/base.py中的导入语句,以使用新的convert_to_openai_tool函数:


# tools/base.py
from typing import Type, Callable, Union
from tools.convert import convert_to_openai_tool
from pydantic import BaseModel
from sqlmodel import SQLModel
#...rest of the code ...


随着这些更改的到位,我们的Tool类现在能够处理SQLModel实例,并生成与OpenAI API兼容的架构。


通过调整工具架构转换过程,我们确保了我们的应用程序可以与SQLModel和Pydantic v2无缝协作,使我们能够在保持与OpenAI API兼容的同时,利用这些库的优势。


定义SQL工具

在本节中,我们将创建函数和工具,以便使用SQL与我们的数据库表进行交互。


添加数据工具

首先,让我们定义一个通用的函数add_row_to_table,它接受一个SQLModel实例并将其添加到对应的表中:


# tools/add.py
from sqlmodel import SQLModel, Session, select
def add_row_to_table(model_instance: SQLModel):  
    with Session(engine) as session:  
        session.add(model_instance)  
        session.commit()  
        session.refresh(model_instance)  
    return f"Successfully added {model_instance} to the table"


接下来,我们将创建一个特定于模型的函数add_expense_to_table,它接受一个Expense条目的输入参数,并将其添加到表中:


# tools/add.py
# ...
def add_expense_to_table(**kwargs):
    model_instance = Expense.model_validate(kwargs)
    return add_row_to_table(model_instance)


在add_expense_to_table函数中,我们使用model_validate()方法来触发之前定义的BeforeValidator的执行,并确保数据验证。


为了避免为每个表或SQLModel编写单独的函数,我们可以动态生成这些函数:


# example usage
def add_entry_to_table(sql_model: Type[SQLModel]):  
    # return a Callable that takes a SQLModel instance and adds it to the table  
    return lambda **data: add_row_to_table(model_instance=sql_model.model_validate(data))
    
add_expense_to_table = add_entry_to_table(Expense)


这种方法可以产生相同的结果,并且可以用于为所有其他模型动态生成函数。


有了这些函数之后,我们就可以使用我们的Tool类来创建工具,通过OpenAIAgent向我们的数据库表中添加条目:


add_expense_tool = Tool(
 name="add_expense_tool",
 description="useful for adding expenses to database",
 function=add_entry_to_table(Expense),
 model=Expense,
 validate_missing=True
)
add_revenue_tool = Tool(
 name="add_revenue_tool",
 description="useful for adding revenue to database",
 function=add_entry_to_table(Revenue),
 model=Revenue,
 validate_missing=True
)


查询工具

虽然由于输入架构的不同,我们需要为每个表创建一个add_xxx_tool,但对于查询所有表,我们只需要一个查询工具。为了消除SQL注入的风险,我们将使用SQLAlchemy和SQLModel提供的SQL清理功能。这意味着我们将通过标准的Python类和对象来查询数据库,而不是直接解析SQL语句。


对于我们要在表上执行的查询,我们需要以下逻辑:

  1. select语句 -> SELECT * FROM table_name 参数:列名、表名
  2. where语句 -> WHERE column_name = value 参数:列名、操作符、值


在SQLModel中,当我们想要在Expense表中找到所有与咖啡相关的费用时,这对应于以下经过清理的代码:


result = database.execute(
  select(Expense).where(Expense.description == "Coffee")
)


将其抽象为pydantic模型,


# tools/query.py
from typing import Union, Literal
from pydantic import BaseModel
class WhereStatement(BaseModel):  
    column: str  
    operator: Literal["eq", "gt", "lt", "gte", "lte", "ne", "ct"]  
    value: str  
  
class QueryConfig(BaseModel):  
    table_name: str  
    columns: list[str]  
    where: list[Union[WhereStatement, None]]


QueryConfig模型允许我们设置表名(table_name)、列(columns)和where语句。where属性接受一个WhereStatement模型的列表或一个空列表(当我们想要返回所有值而不进行进一步过滤时)。WhereStatement是一个子模型,定义了列、操作符和值。Literal类型用于将允许的操作符限制为预定义的一组。


接下来,我们定义一个函数,该函数基于QueryConfig执行查询:


# tools/query.py
# ...
from database.models import Expense, Revenue, Customer
TABLES = {
    "expense": Expense,
    "revenue": Revenue,
    "customer": Customer
}


def query_data_function(**kwargs) -> ToolResult:  
    """Query the database via natural language."""
    query_config = QueryConfig.model_validate(kwargs)
    
    if query_config.table_name not in TABLES:  
        return ToolResult(content=f"Table name {query_config.table_name} not found in database models", success=False)  
  
    sql_model = TABLES[query_config.table_name]  
  
    # query_config = validate_query_config(query_config, sql_model)  
    data = sql_query_from_config(query_config, sql_model)  
  
    return ToolResult(content=f"Query results: {data}", success=True)  
    
  
def sql_query_from_config(  
        query_config: QueryConfig,  
        sql_model: Type[SQLModel]):  
  
    with Session(engine) as session:  
        selection = []  
        for column in query_config.select_columns:  
            if column not in sql_model.__annotations__:  
                return f"Column {column} not found in model {sql_model.__name__}"  
            selection.append(getattr(sql_model, column))  
        
        statement = select(*selection)  
        wheres = query_config.where  
        if wheres:  
            for where in wheres:  
  
                if where.column not in sql_model.__annotations__:  # noqa  
                    return (f"Column {where['column']} not found "
                       "in model {sql_model.__name__}")
  
                elif where.operator == "eq":  
                    statement = statement.where(
                     getattr(sql_model, where.column) == where.value)  
                elif where.operator == "gt":  
                    statement = statement.where(
                     getattr(sql_model, where.column) > where.value)  
                elif where.operator == "lt":  
                    statement = statement.where(
                     getattr(sql_model, where.column) < where.value)  
                elif where.operator == "gte":  
                    statement = statement.where(
                     getattr(sql_model, where.column) >= where.value)  
                elif where.operator == "lte":  
                    statement = statement.where(
                     getattr(sql_model, where.column) <= where.value)  
                elif where.operator == "ne":  
                    statement = statement.where(
                     getattr(sql_model, where.column) != where.value)  
                elif where.operator == "ct":  
                    statement = statement.where(
                     getattr(sql_model, where.column).contains(where.value))  
  
        result = session.exec(statement)  
        data = result.all()  
        try:  
            data = [repr(d) for d in data]  
        except:  
            pass  
    return data


query_data_function是一个高级抽象函数,用于从TABLES字典中选择我们的表模型,而sql_query_from_config是在表上(SQLModel)执行QueryConfig的底层函数。


在QueryConfig中,你还可以选择将table_names定义为Literal类型,将可用的表名硬编码到其中。你甚至可以使用我们的TABLES字典动态定义Literal。这样做可以减少table_name的错误参数。目前我选择不使用枚举对象,因为我会为代理提示提供关于当前可用表和它们底层ORM架构的上下文。我计划为我们的未来代理添加一个工具,以便它能够自己创建新表。虽然我可以动态更改代理的提示,但在我们的运行服务器上更改QueryConfig中的枚举对象并不简单。


最后,我们可以定义我们的查询工具:


query_data_tool = Tool(  
    name="query_data_tool",  
    description = "useful to perform queries on a database table",
    model=QueryConfig,  
    function=query_data_function, 
)


有了这些工具,我们的OpenAIAgent现在就能够使用自然语言命令向数据库表中添加和查询数据了。


配置代理

为了使我们先前定义的工具能够成功使用,上一篇文章中的代理将需要更多的上下文信息,特别是为了使用查询工具。代理提示将需要包含有关可用表及其架构的信息。由于我们目前只使用两个表,因此可以在系统提示或用户提示中包含ORM架构和表名。这两种选项都可能效果很好,但我更喜欢在用户提示中包含此类可变信息。这样做,我们可以创建一些少量样本示例,来展示上下文感知的工具使用方法。


为了使我们的代理能够处理系统提示和用户提示中的可变上下文,我们可以如下更新我们的代理类:


import colorama  
from colorama import Fore  
from openai import OpenAI  
from pydantic import BaseModel  
from tools.base import Tool, ToolResult  
from agents.utils import parse_function_args, run_tool_from_response  
  
  
class StepResult(BaseModel):  
    event: str  
    content: str  
    success: bool  
  
  
SYSTEM_MESSAGE = """You are tasked with completing specific objectives and must report the outcomes. At your disposal, you have a variety of tools, each specialized in performing a distinct type of task.  
  
For successful task completion:  
Thought: Consider the task at hand and determine which tool is best suited based on its capabilities and the nature of the work. If you can complete the task or answer a question, soley by the information provided you can use the report_tool directly.  
  
Use the report_tool with an instruction detailing the results of your work or to answer a user question.  
If you encounter an issue and cannot complete the task:  
  
Use the report_tool to communicate the challenge or reason for the task's incompletion.  
You will receive feedback based on the outcomes of each tool's task execution or explanations for any tasks that couldn't be completed. This feedback loop is crucial for addressing and resolving any issues by strategically deploying the available tools.  
  
Return only one tool call at a time.  
  
{context}  
"""

class OpenAIAgent:  
  
    def __init__(  
            self,  
            tools: list[Tool],  
            client: OpenAI = OpenAI(),  
            system_message: str = SYSTEM_MESSAGE,  
            model_name: str = "gpt-3.5-turbo-0125",  
            max_steps: int = 5,  
            verbose: bool = True,  
            examples: list[dict] = None,  
            context: str = None,  
            user_context: str = None  
    ):  
        self.tools = tools  
        self.client = client  
        self.model_name = model_name  
        self.system_message = system_message  
        self.step_history = []  
        self.max_steps = max_steps  
        self.verbose = verbose  
        self.examples = examples or []  
        self.context = context or ""  
        self.user_context = user_context  
  
    def to_console(self, tag: str, message: str, color: str = "green"):  
        if self.verbose:  
            color_prefix = Fore.__dict__[color.upper()]  
            print(color_prefix + f"{tag}: {message}{colorama.Style.RESET_ALL}")  
  
    def run(self, user_input: str, context: str = None):  
  
        openai_tools = [tool.openai_tool_schema for tool in self.tools]  
        system_message = self.system_message.format(context=context)  
  
        if self.user_context:  
            context = f"{self.user_context}\n{context}" if context else self.user_context  
  
        if context:  
            user_input = f"{context}\n---\n\nUser Message: {user_input}"  
  
        self.to_console("START", f"Starting Agent with Input:\n'''{user_input}'''")  
  
  
        self.step_history = [  
            {"role": "system", "content": system_message},  
            *self.examples,  
            {"role": "user", "content": user_input}  
        ]  
  
        step_result = None  
        i = 0  
  
        while i < self.max_steps:  
            step_result = self.run_step(self.step_history, openai_tools)  
            if step_result.event == "finish":  
                break  
            elif step_result.event == "error":  
                self.to_console(step_result.event, step_result.content, "red")  
            else:  
                self.to_console(step_result.event, step_result.content, "yellow")  
  
            i += 1  
  
        self.to_console("Final Result", step_result.content, "green")  
  
        return step_result.content  
  
    def run_step(self, messages: list[dict], tools):  
  
        # plan the next step  
        response = self.client.chat.completions.create(  
            model=self.model_name,  
            messages=messages,  
            tools=tools  
        )  
        # check for multiple tool calls  
        if response.choices[0].message.tool_calls and len(response.choices[0].message.tool_calls) > 1:  
            messages = [  
                *self.step_history,  
                {"role": "user", "content": "Error: Please return only one tool call at a time."}  
            ]  
            return self.run_step(messages, tools)  
  
        # add message to history  
        self.step_history.append(response.choices[0].message)  
        # check if tool call is present  
        if not response.choices[0].message.tool_calls:  
            msg = response.choices[0].message.content  
            step_result = StepResult(event="Error", content=f"No tool calls were returned.\nMessage: {msg}", success=False)  
            return step_result  
  
        tool_name = response.choices[0].message.tool_calls[0].function.name  
        tool_kwargs = parse_function_args(response)  
  
        # execute the tool call  
        self.to_console("Tool Call", f"Name: {tool_name}\nArgs: {tool_kwargs}", "magenta")  
        tool_result = run_tool_from_response(response, tools=self.tools)  
        tool_result_msg = self.tool_call_message(response, tool_result)  
        self.step_history.append(tool_result_msg)  
  
        if tool_name == "report_tool":  
            try:  
                step_result = StepResult(  
                    event="finish",  
                    content=tool_result.content,  
                    success=True  
                )  
            except:  
                print(tool_result)  
                raise ValueError("Report Tool failed to run.")  
  
            return step_result  
  
        elif tool_result.success:  
            step_result = StepResult(  
                event="tool_result",  
                content=tool_result.content,  
                success=True)  
        else:  
            step_result = StepResult(  
                event="error",  
                content=tool_result.content,  
                success=False  
            )  
  
        return step_result  
  
    def tool_call_message(self, response, tool_result: ToolResult):  
        tool_call = response.choices[0].message.tool_calls[0]  
        return {  
            "tool_call_id": tool_call.id,  
            "role": "tool",  
            "name": tool_call.function.name,  
            "content": tool_result.content,  
        }


与我们之前的版本相比,主要变化如下:

  • 我们在默认的系统提示中放置了一个“{context}”占位符。
  • 我们向__init__()方法中添加了context和user_context作为输入参数。
  • 我们在run()方法中添加了context。
  • 在run()中,如果定义了context,我们会将其添加到用户消息中。
  • 我们还向__init__()方法中添加了一个examples属性,如果设置了该属性,它将在run()方法中的系统消息和用户消息之间传递。


现在,我们可以在初始化代理时定义系统上下文和用户上下文。此外,在调用run方法时,我们还可以传递一个用户上下文。如果向run方法传递了上下文,那么在该次运行中,它将覆盖初始化时设置的user_context。


向代理提供上下文

在我们可以运行代理之前,让我们定义一个函数来生成上下文信息。我们希望自动生成user_context,然后可以将其作为上文实现的那样传递给代理的run函数。为了保持简单,我们希望每张表都有一行作为上下文信息,应该包括:

  • 表名
  • 列名:<类型>


经过几次尝试和错误之后,以下函数可以完成这项工作:


# utils.py
from typing import Type  
import types  
import typing  
  
import sqlalchemy  
from pydantic import BaseModel
def orm_model_to_string(input_model_cls: Type[BaseModel]):  
    """Get the ORM model string from the input model"""  
  
    def process_field(key, value):  
        if key.startswith("__"):  
            return None  
        if isinstance(value, typing._GenericAlias):  
            if value.__origin__ == sqlalchemy.orm.base.Mapped:  
                return None  
            if isinstance(value, typing._AnnotatedAlias):  # noqa  
                return key, value.__origin__  
            elif isinstance(value, typing._UnionGenericAlias) or isinstance(value, types.UnionType):  
                return key, value.__args__[0]  
        return key, value  
  
    fields = dict(filter(None, (process_field(k, v) for k, v in input_model_cls.__annotations__.items())))  
    return ", ".join([f"{k} = <{v.__name__}>" for k, v in fields.items()])
def generate_context(*table_models) -> str:
   context_str = "You can access the following tables in database:\n"
   for table in table_models:
    context_str += f" - {table.__name__}: {orm_model_to_string(table)}\n" 
   return context_str


如果我们向generate_context()传递Expense和Revenue,我们应该得到以下上下文字符串:


我们希望代理能够知道当前的日期和星期几,这样我们就可以引用正确的日期。所以,让我们在我们的utils类中添加一些日期解析函数:


# utils.py
from datetime import datetime
#... rest of utils.py ...
def weekday_by_date(date: datetime):
    days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
    return days[date.weekday()]

def date_to_string(date: datetime):
    return f"{weekday_by_date(date)} {parse_date(date)}"

def parse_date(date: datetime):
    return date.strftime("%Y-%m-%d")


现在,让我们为查询代理创建上下文。


# utils.py
# ...
def generate_query_context(*table_models) -> str:
    today = f"Today is {date_to_string(datetime.now())}"
    context_str = "You can access the following tables in database:\n"
    for table in table_models:
        context_str += f" - {table.__name__}: {orm_model_to_string(table)}\n" 
    return f"{today}\n{context_str}"


from database.models import Expense, Revenue
print(generate_query_context(Expense, Revenue))


Today is Sunday 2024-04-21
You can access the following tables in database:
 - Expense: id = <int>, description = <str>, net_amount = <float>, gross_amount = <float>, tax_rate = <float>, date = <datetime>
 - Revenue: id = <int>, description = <str>, net_amount = <float>, gross_amount = <float>, tax_rate = <float>, date = <datetime>


路由代理

随着我们添加更多的工具,设置的复杂性可能会开始限制像“gpt-3.5-turbo”这样较便宜模型的可用性。在下一篇文章中,我们可能会考虑切换到Anthropic Claude,因为他们新发布的工具使用API功能看起来很有前景,即使是更实惠的HAIKU模型,也能同时处理多个工具。然而,现在,我们将继续使用OpenAI的GPT模型。


在开发个人用途的应用程序以及创建生产就绪的应用程序之前,我发现优化较小模型(如本例中的gpt-3.5-turbo)的工作流程很有用。这种方法迫使我们创建简化的处理逻辑和提示系统。虽然如果不使用最强大的模型,我们可能无法达到100%的可靠性,但我们能够发现缺陷并识别出不明确的指令。如果你的应用程序在较小模型的情况下有90%的成功率,那么你将拥有一个生产就绪的逻辑,它在更强大的模型下表现会更好。


为了使gpt-3.5-turbo能够可靠地处理多种工具,我们将实现一个路由代理,其唯一目的是将用户查询路由到适当的任务代理。这使我们能够分离执行逻辑并降低复杂性。每个代理都将具有有限的范围,使我们能够在未来分离访问角色和操作。我观察到,即使是gpt-4,也存在代理不知道其任务何时完成的情况。


通过引入路由代理,我们可以将问题分解为更小、更易管理的部分。路由代理将负责理解用户的意图并将查询指向相关的任务代理。这种方法不仅简化了各个代理的职责,还使系统更具模块化且更易于维护。


此外,分离执行逻辑和复杂性将为未来实现基于角色的访问控制铺平道路。每个任务代理都可以被赋予特定的权限和访问级别,确保敏感操作仅由授权代理执行。


虽然路由代理在过程中增加了一个额外步骤,但它最终构建了一个更稳健且可扩展的系统。通过优化较小模型并专注于清晰、简洁的提示,我们可以为将来切换到更强大的模型(如Claude Opus或GPT-4)奠定坚实的基础,并且在这些模型下表现会更好。


现在,让我们来看看路由代理的实现。


# agents/routing.py
from openai import OpenAI
import colorama
from agents.task_agent import TaskAgent
from agents.utils import parse_function_args

SYSTEM_MESSAGE = """You are a helpful assistant.
Role: You are an AI Assistant designed to serve as the primary point of contact for users interacting through a chat interface. 
Your primary role is to understand users' requests related to database operations and route these requests to the appropriate tool.
Capabilities: 
You have access to a variety of tools designed for Create, Read operations on a set of predefined tables in a database. 
Tables:
{table_names}
"""
NOTES = """Important Notes:
Always confirm the completion of the requested operation with the user.
Maintain user privacy and data security throughout the interaction.
If a request is ambiguous or lacks specific details, ask follow-up questions to clarify the user's needs."""



class RoutingAgent:
    def __init__(
            self,
            tools: list[TaskAgent] = None,
            client: OpenAI = OpenAI(),
            system_message: str = SYSTEM_MESSAGE,
            model_name: str = "gpt-3.5-turbo-0125",
            max_steps: int = 5,
            verbose: bool = True,
            prompt_extra: dict = None,
            examples: list[dict] = None,
            context: str = None
    ):
        self.tools = tools or ROUTING_AGENTS
        self.client = client
        self.model_name = model_name
        self.system_message = system_message
        self.memory = []
        self.step_history = []
        self.max_steps = max_steps
        self.verbose = verbose
        self.prompt_extra = prompt_extra or PROMPT_EXTRA
        self.examples = self.load_examples(examples)
        self.context = context or ""
    def load_examples(self, examples: list[dict] = None):
        examples = examples or []
        for agent in self.tools:
            examples.extend(agent.routing_example)
        return examples
    def run(self, user_input: str, employee_id: int = None, **kwargs):
        context = create_routing_agent_context(employee_id)
        if context:
            user_input_with_context = f"{context}\n---\n\nUser Message: {user_input}"
        else:
            user_input_with_context = user_input
        self.to_console("START", f"Starting Task Agent with Input:\n'''{user_input_with_context}'''")
        partial_variables = {**self.prompt_extra, "context": context}
        system_message = self.system_message.format(**partial_variables)
        messages = [
            {"role": "system", "content": system_message},
            *self.examples,
            {"role": "user", "content": user_input}
        ]
        tools = [tool.openai_tool_schema for tool in self.tools]
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=messages,
            tools=tools
        )
        self.step_history.append(response.choices[0].message)
        self.to_console("RESPONSE", response.choices[0].message.content, color="blue")
        tool_kwargs = parse_function_args(response)
        tool_name = response.choices[0].message.tool_calls[0].function.name
        self.to_console("Tool Name", tool_name)
        self.to_console("Tool Args", tool_kwargs)
        agent = self.prepare_agent(tool_name, tool_kwargs)
        return agent.run(user_input)
    def prepare_agent(self, tool_name, tool_kwargs):
        for agent in self.tools:
            if agent.name == tool_name:
                input_kwargs = agent.arg_model.model_validate(tool_kwargs)
                return agent.load_agent(**input_kwargs.dict())
        raise ValueError(f"Agent {tool_name} not found")
    def to_console(self, tag: str, message: str, color: str = "green"):
        if self.verbose:
            color_prefix = colorama.Fore.__dict__[color.upper()]
            print(color_prefix + f"{tag}: {message}{colorama.Style.RESET_ALL}")


与我们之前的OpenAIAgent相比,最大的不同在于:

  • 无开放循环:我们希望路由代理能够将用户的查询路由到适当的代理。因此,我们不是创建一个开放循环,而是通过工具调用选择所需的代理,并将用户查询传递给它。路由代理不应执行任何其他任务或提出后续问题。
  • 代理即工具:路由代理不是调用一个工具,而是设置一个子代理。因此,我们之前定义的OpenAIAgent现在成为了路由代理内的一个工具。


代理即工具 —— 任务代理

为了将我们的OpenAIAgent用作工具,我们需要引入一种专门为代理设计的工具类。我们希望为每个代理定义名称和描述,并自动化初始化过程。因此,在本教程中,我们定义了最后一个类——TaskAgent。


TaskAgent类与Tool类具有相似的功能。我们为每个代理定义了一个名称、一个描述以及一个我们称之为arg_model的输入模型。


from typing import Type, Callable, Optional
from agents.base import OpenAIAgent
from tools.base import Tool
from tools.report_tool import report_tool
from pydantic import BaseModel, ConfigDict, Field
from tools.utils import convert_to_openai_tool

SYSTEM_MESSAGE = """You are tasked with completing specific objectives and must report the outcomes. At your disposal, you have a variety of tools, each specialized in performing a distinct type of task.
For successful task completion:
Thought: Consider the task at hand and determine which tool is best suited based on its capabilities and the nature of the work. 
If you can complete the task or answer a question, soley by the information provided you can use the report_tool directly.
Use the report_tool with an instruction detailing the results of your work or to answer a user question.
If you encounter an issue and cannot complete the task:
Use the report_tool to communicate the challenge or reason for the task's incompletion.
You will receive feedback based on the outcomes of each tool's task execution or explanations for any tasks that couldn't be completed. This feedback loop is crucial for addressing and resolving any issues by strategically deploying the available tools.
On error: If information are missing consider if you can deduce or calculate the missing information and repeat the tool call with more arguments.
Use the information provided by the user to deduct the correct tool arguments.
Before using a tool think about the arguments and explain each input argument used in the tool. 
Return only one tool call at a time! Explain your thoughts!
{context}
"""

class EmptyArgModel(BaseModel):
    pass

class TaskAgent(BaseModel):
    name: str
    description: str
    arg_model: Type[BaseModel] = EmptyArgModel
    create_context: Callable = None
    create_user_context: Callable = None
    tool_loader: Callable = None
    system_message: str = SYSTEM_MESSAGE
    tools: list[Tool]
    examples: list[dict] = None
    routing_example: list[dict] = Field(default_factory=list)
    model_config = ConfigDict(arbitrary_types_allowed=True)
    def load_agent(self, **kwargs) -> OpenAIAgent:
        input_kwargs = self.arg_model(**kwargs)
        kwargs = input_kwargs.dict()
        context = self.create_context(**kwargs) if self.create_context else None
        user_context = self.create_user_context(**kwargs) if self.create_user_context else None
        if self.tool_loader:
            self.tools.extend(self.tool_loader(**kwargs))
        if report_tool not in self.tools:
            self.tools.append(report_tool)
        return OpenAIAgent(
            tools=self.tools,
            context=context,
            user_context=user_context,
            system_message=self.system_message,
            examples=self.examples,
        )
    @property
    def openai_tool_schema(self):
        return convert_to_openai_tool(self.arg_model, name=self.name, description=self.description)


此外,我们还在TaskAgent类中添加了所有相关属性,这些属性是底层专用OpenAIAgent所需的:

  • create_context / create_user_context:在这里,我们可以传递一个函数来创建上下文或用户上下文,如5.1节所述。
  • tool_loader是另一个可调用的函数,我们可能需要在设置底层代理时使用它。正如我们之前解释的动态工具构建一样,我们可能需要基于用户输入/路由代理输入动态构建的工具。
  • system_message是代理的系统提示。在我们的示例中,它将是每个代理的默认系统提示,但也可以为每个专用代理优化版本。
  • tools:代理应使用的预定义工具。
  • examples:要包含在子代理消息历史中的示例。
  • routing_example:要包含在路由代理消息历史中的示例。


此外,我们还有一个名为EmptyArgModel的空基模型,它是TaskAgent中的默认arg_model。


2


运行代理

现在,是时候测试我们的路由和子代理是否能很好地协同工作了。由于我们引入了示例作为参数,我们可以进行多次测试运行来检查执行中的主要缺陷,并为每个子代理定义示例用法。


首先,让我们定义我们的子代理:


from database.models import Expense, Revenue, Customer
from agents.task import TaskAgent
from utils import generate_query_context
from tools.base import Tool
from tools.query import query_data_tool
from tools.add import add_entry_to_table

query_task_agent = TaskAgent(
    name="query_agent",
    description="An agent that can perform queries on multiple data sources",
    create_user_context=lambda: generate_query_context(Expense, Revenue, Customer),
    tools=[query_data_tool]
)
add_expense_agent = TaskAgent(
    name="add_expense_agent",
    description="An agent that can add an expense to the database",
    create_user_context=lambda: generate_query_context(Expense) + "\nRemarks: The tax rate is 0.19. The user provide the net amount you need to calculate the gross amount.",
    tools=[
        Tool(
            name="add_expense",
            description="Add an expense to the database",
            function=add_entry_to_table(Expense),
            model=Expense
        )
    ]
)
add_revenue_agent = TaskAgent(
    name="add_revenue_agent",
    description="An agent that can add a revenue entry to the database",
    create_user_context=lambda: generate_query_context(Revenue) + "\nRemarks: The tax rate is 0.19. The user provide the gross_amount you should use the tax rate to calculate the net_amount.",
    tools=[
        Tool(
            name="add_revenue",
            description="Add a revenue entry to the database",
            function=add_entry_to_table(Revenue),
            model=Revenue
        )
    ]
)
add_customer_agent = TaskAgent(
    name="add_customer_agent",
    description="An agent that can add a customer to the database",
    create_user_context=lambda: generate_query_context(Customer),
    tools=[
        Tool(
            name="add_customer",
            description="Add a customer to the database",
            function=add_entry_to_table(Customer),
            model=Customer
        )
    ]
)


如你所见,我们在为收入和支出代理创建用户上下文(create_user_context)时添加了一些备注字符串。我们希望子代理能够处理税率,并自动计算净额或总额,以测试子代理的推理能力。


from agents.routing import RoutingAgent
routing_agent = RoutingAgent(
    tools=[
        query_task_agent,
        add_expense_agent,
        add_revenue_agent,
        add_customer_agent
    ]
)

routing_agent.run("I have spent 5 € on a office stuff. Last Thursday")
START: Starting Routing Agent with Input:
I have spent 5 € on a office stuff. Last Thursday
Tool Name: add_expense_agent
Tool Args: {}
START: Starting Task Agent with Input:
"""Today is Sunday 2024-04-21
You can access the following tables in database:
 - expense: id = <int>, description = <str>, net_amount = <float>, gross_amount = <float>, tax_rate = <float>, date = <datetime>
Remarks: The tax rate is 0.19. The user provide the net amount you need to calculate the gross amount.
---
User Message: I have spent 5 € on a office stuff. Last Thursday"""
Tool Call: Name: add_expense
Args: {'description': 'office stuff', 'net_amount': 5, 'tax_rate': 0.19, 'date': '2024-04-18'}
Message: None
error: Missing values: gross_amount
Tool Call: Name: add_expense
Args: {'description': 'office stuff', 'net_amount': 5, 'tax_rate': 0.19, 'date': '2024-04-18', 'gross_amount': 5.95}
Message: None
tool_result: Successfully added net_amount=5.0 id=2 gross_amount=5.95 description='office stuff' date=datetime.datetime(2024, 4, 18, 0, 0) tax_rate=0.19 to the table
Error: No tool calls were returned.
Message: I have successfully added the expense for office stuff with a net amount of 5€, calculated the gross amount, and recorded it in the database.
Tool Call: Name: report_tool
Args: {'report': 'Expense for office stuff with a net amount of 5€ has been successfully added. Gross amount calculated as 5.95€.'}
Message: None
Final Result: Expense for office stuff with a net amount of 5€ has been successfully added. Gross amount calculated as 5.95€.


现在让我们添加一个“收入:”


routing_agent.run("Two weeks ago on Saturday we had a revenue of 1000 € in the shop")


START: Starting Routing Agent with Input:
Two weeks ago on Saturday we had a revenue of 1000 € in the shop
Tool Name: add_revenue_agent
Tool Args: {}
START: Starting Task Agent with Input:
"""Today is Sunday 2024-04-21
You can access the following tables in database:
 - revenue: id = <int>, description = <str>, net_amount = <float>, gross_amount = <float>, tax_rate = <float>, date = <datetime>
Remarks: The tax rate is 0.19. The user provide the gross_amount you should use the tax rate to calculate the net_amount.
---
User Message: Two weeks ago on Saturday we had a revenue of 1000 € in the shop"""
Tool Call: Name: add_revenue
Args: {'description': 'Revenue from the shop', 'gross_amount': 1000, 'tax_rate': 0.19, 'date': '2024-04-06'}
Message: None
error: Missing values: net_amount
Tool Call: Name: add_revenue
Args: {'description': 'Revenue from the shop', 'gross_amount': 1000, 'tax_rate': 0.19, 'date': '2024-04-06', 'net_amount': 840.34}
Message: None
tool_result: Successfully added net_amount=840.34 gross_amount=1000.0 tax_rate=0.19 description='Revenue from the shop' id=1 date=datetime.datetime(2024, 4, 6, 0, 0) to the table
Error: No tool calls were returned.
Message: The revenue entry for the shop on April 6, 2024, with a gross amount of 1000€ has been successfully added to the database. The calculated net amount after applying the tax rate is 840.34€.
Tool Call: Name: report_tool
Args: {'report': 'completed'}
Message: None
Final Result: completed


并且,作为最后一个测试,让我们尝试查询从数据库中创建的收入:


routing_agent.run("How much revenue did we made this month?")


START: Starting Routing Agent with Input:
How much revenue did we made this month?
Tool Name: query_agent
Tool Args: {}
START: Starting Agent with Input:
"""Today is Sunday 2024-04-21
You can access the following tables in database:
 - expense: id = <int>, description = <str>, net_amount = <float>, gross_amount = <float>, tax_rate = <float>, date = <datetime>
 - revenue: id = <int>, description = <str>, net_amount = <float>, gross_amount = <float>, tax_rate = <float>, date = <datetime>
 - customer: id = <int>, company_name = <str>, first_name = <str>, last_name = <str>, phone = <str>, address = <str>, city = <str>, zip = <str>, country = <str>
---
User Message: How much revenue did we made this month?"""
Tool Call: Name: query_data_tool
Args: {'table_name': 'revenue', 'select_columns': ['gross_amount'], 'where': [{'column': 'date', 'operator': 'gte', 'value': '2024-04-01'}, {'column': 'date', 'operator': 'lte', 'value': '2024-04-30'}]}
Message: None
tool_result: content="Query results: ['1000.0']" success=True
Error: No tool calls were returned.
Message: The revenue made this month is $1000.00.
Tool Call: Name: report_tool
Args: {'report': 'The revenue made this month is $1000.00.'}
Message: None
Final Result: The revenue made this month is $1000.00.


所有工具都按预期工作。路由代理运行得非常好。对于任务代理,我不得不多次更新提示。


当不使用像gpt-4这样的最先进模型时,我建议为每个任务代理添加一些示例工具调用。总的来说,我建议通过示例和更直观的设计来解决缺陷,而不是通过提示工程。反复出现的缺陷表明设计不够直接。例如,当代理在计算总额或净额时遇到困难时,只需添加一个“calculate_gross_amount_tool”或“calculate_net_amount_tool”。另一方面,GPT-4会毫不犹豫地处理这样的用例。


结论

在本文中,我们在使用大型语言模型为小型企业创建全面的基于聊天的界面的旅程中迈出了重要的一步。


通过设置数据库模式、定义核心功能和构建项目仓库结构,我们为应用程序的开发奠定了坚实的基础。


我们首先使用SQLModel设计了数据库模型,这使我们能够无缝地与Pydantic和SQLAlchemy集成。这种方法确保了高效的数据验证和数据库操作,同时最大限度地降低了SQL注入攻击的风险。


然后,我们更新了工具类以处理SQLModel实例并改进验证过程。接下来,我们实现了SQL工具,用于向数据库表中添加数据并使用自然语言命令查询数据。通过利用SQLModel和Pydantic的强大功能,我们创建了一个强大且灵活的系统,该系统可以处理广泛的用户输入并生成准确的SQL查询。


我们通过更新代理类以处理系统提示和用户提示中的可变上下文,配置了OpenAIAgent以提供上下文感知的工具使用。这使我们的代理能够理解可用的表及其模式,从而实现更准确、更高效的工具使用。虽然我们已经取得了显著的进展,但还有更多需要探索和实现的内容。


为了进一步完善我们的聊天机器人,我们引入了TaskAgent类,该类与Tool类具有相似的功能。TaskAgent允许我们为每个代理定义名称、描述和输入模型,从而自动化初始化过程。


最后,我们通过定义用于查询数据、添加支出和添加收入的子代理来测试我们的路由和子代理。我们展示了代理如何处理税率并自动计算净额或总额,从而展示了我们子代理的推理能力。



文章来源:https://towardsdatascience.com/building-an-ai-powered-business-manager-e2a31a2fe984
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消