基于Crew AI、FastAPI和Streamlit的多代理协作聊天机器人框架

2025年03月31日 由 alex 发表 4593 0

介绍

在当今快节奏的世界中,企业需要动态且智能的聊天机器人系统,以处理复杂的交互和多样化的工作流程。虽然大多数聊天机器人可以处理简单的问答或任务自动化,但构建一个可以与多个代理无缝协作以实现细微目标的聊天机器人则完全是另一回事。


在本文中,我将指导你使用Crew AI、FastAPI 和 Streamlit构建Agentic 聊天机器人框架。该框架旨在优化查询、处理反馈、生成结构化响应,并可选择将票证创建作为附加功能进行管理。


动机

随着对情境感知聊天机器人和自动化工作流程的需求不断增长,我们需要能够:

  1. 处理模糊和不明确的用户输入。
  2. 阐明意图但不要让用户感到不知所措。
  3. 提供准确的答复和清晰的解释。
  4. 提供可选的票证创建作为扩展工作流程的一部分。


其核心思想是构建一个模块化、代理式的聊天机器人,利用多个专门的代理进行智能、高效的协作。


架构概述


2


我们的代理聊天机器人框架由以下组件组成:

  1. 主管代理:负责查询的细化和澄清。
  2. 验证代理:确保检索到的响应的准确性和相关性。
  3. 摘要代理:提供简洁、结构化的响应。
  4. 反馈代理:分析用户反馈并检测意图和满意度水平。
  5. 票务代理(插件) :当触发票务创建时生成结构化摘要。


框架如何运作

1. 用户查询处理

当用户提交查询时,它首先由主管代理进行处理,主管代理决定该查询是否需要澄清或细化。

  • 如果查询不明确,代理会返回一个澄清问题。
  • 如果查询定义明确,则会继续进行策略检索。


2. 查询细化和澄清

主管代理利用矢量搜索和相似性匹配来确保用户的查询是有结构的并且具有上下文意义。

  • 此步骤对于确保知识库响应的相关性至关重要。


3. 策略检索和验证

然后,验证代理会验证响应,以确保其准确回答了用户的问题。此步骤可确保输出高质量结果,并消除无关政策带来的干扰。


4. 答复总结

为了使响应对用户友好且易于阅读,摘要代理提供了清晰简洁的输出。

  • 目标是保留基本信息并使其易于理解。


5. 反馈与互动

然后反馈代理确定:

  • 用户是否对答案满意。
  • 答复是否需要进一步完善。
  • 如果问题可能需要创建票证,例如用于支持或服务台任务。


6. 可选票证创建

如果反馈代理识别出需要票证,它会要求用户确认。

  • 如果用户同意,票务代理将根据整个对话生成结构化摘要。
  • 票证可以属于任何类别(IT,服务台,一般支持),从而使系统具有高度灵活性。


技术堆栈

  • Crew AI:多智能体协作框架。
  • FastAPI:用于查询处理和代理通信的后端。
  • Streamlit:为最终用户提供交互式 UI。
  • 向量数据库:知识库数据的有效存储和检索。


为什么选择 CrewAI

在构建复杂的聊天机器人系统时,必须超越简单的问答交互,并实现一个可以处理上下文响应、任务委派和多智能体协作的框架。这就是Crew AI发挥作用的地方。


Crew AI是一个功能强大的框架,旨在实现协作式多智能体架构。与遵循单一逻辑流程的传统聊天机器人不同,Crew AI 允许我们:

  • 在专门代理(如主管、汇总和反馈代理)之间划分任务。
  • 与 LLM 无缝集成,以提高响应准确性和上下文理解。
  • 根据任务要求,有效地协调多个代理并行或顺序工作。


实施演练

1. 设置矢量数据库

要求:请确保所有东西都已正确安装


我建议在安装库之前创建一个虚拟环境,并确保你使用的 Python 版本低于 3.13(我使用的是 3.11.5)


python3 -m venv menv


在继续创建数据库和后端之前,需要安装以下库


pip install fastapi uvicorn streamlit requests openai chromadb python-dotenv crewai


确保你拥有 openai 密钥,我建议将其保存在 .env 文件中。接下来,我们将创建一个 vector_db.py,我们将在其中编写索引数据的代码


如果你有一个根文件夹它应该看起来像下面这样


3


我将使用以下示例数据进行索引


# Unified Knowledge Base (Single Dictionary)
KNOWLEDGE_BASE = {
    "leave policy": "Employees are entitled to 20 annual leaves per year. Unused leaves cannot be carried over. Sick leave requires a medical certificate if taken for more than 2 consecutive days.",
    "maternity leave": "Female employees are entitled to 26 weeks of paid maternity leave. Additional unpaid leave can be requested up to 16 weeks.",
    "paternity leave": "Male employees can avail up to 2 weeks of paid paternity leave.",
    "salary increments": "Annual salary increments are performance-based and reviewed every April. Employees with outstanding performance may receive additional bonuses.",
    "promotion criteria": "Promotions are based on performance reviews, leadership potential, and business needs. Employees can apply for internal job postings after 1 year in their current role.",
    "remote work policy": "Employees can work remotely up to 3 days a week. Fully remote positions require management approval.",
    "overtime policy": "Employees working beyond 40 hours per week are eligible for overtime pay or compensatory time off, subject to approval.",
    "health benefits": "Company provides full medical insurance to employees and dependents, covering hospitalization, consultation, and medications.",
    "retirement plan": "Employees are enrolled in a company-sponsored retirement plan with a 5% employer contribution match.",
    "password reset": "To reset your password, visit the IT portal and click 'Forgot Password'. If locked out, contact IT Support.",
    "vpn issue": "Ensure your VPN software is updated. If issues persist, restart your computer and reconnect.",
    "email access issue": "If you cannot access your email, reset your password via the email portal. If issues persist, check Outlook settings.",
    "software installation": "Submit a request through the IT Helpdesk for software installation. Approval from your manager may be required.",
    "printer not working": "Ensure the printer is powered on and connected. If issues persist, reinstall the drivers or contact IT support.",
    "incident reporting": "Employees must report security breaches within 24 hours to the IT Security Team.",
    "firewall rules": "Strict firewall rules are enforced to block unauthorized access to company systems.",
}


最终的 vector_db.py 应如下所示:


import os
from openai import OpenAI
import chromadb
from chromadb.utils import embedding_functions
import logging
import re
from typing import List
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

#Initialize Logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

#Load OpenAI Credentials
api_key = os.getenv("OPENAI_API_KEY")
# Initialize OpenAI client
client = OpenAI(api_key=api_key)
# Initialize ChromaDB with Persistence
DB_PATH = "./vector_db"
chroma_client = chromadb.PersistentClient(path=DB_PATH)
# Define Custom ChromaDB Embedding Function
class OpenAIEmbeddingFunction:
    """Custom embedding function class for ChromaDB compatible with OpenAI."""
    def __call__(self, input: List[str]) -> List[List[float]]:
        if isinstance(input, str):
            input = [input]
        try:
            response = client.embeddings.create(
                input=input,
                model="text-embedding-ada-002"
            )
            return [item.embedding for item in response.data]
        except Exception as e:
            logger.error(f"Error generating embeddings: {str(e)}")
            return [[0.0] * 1536] * len(input)
# Use the Custom Embedding Function
embedding_function = OpenAIEmbeddingFunction()
# Define Unified Collection in ChromaDB
collection = chroma_client.get_or_create_collection(name="company_policies", embedding_function=embedding_function)
# Unified Knowledge Base (Single Dictionary)
KNOWLEDGE_BASE = {
    "leave policy": "Employees are entitled to 20 annual leaves per year. Unused leaves cannot be carried over. Sick leave requires a medical certificate if taken for more than 2 consecutive days.",
    "maternity leave": "Female employees are entitled to 26 weeks of paid maternity leave. Additional unpaid leave can be requested up to 16 weeks.",
    "paternity leave": "Male employees can avail up to 2 weeks of paid paternity leave.",
    "salary increments": "Annual salary increments are performance-based and reviewed every April. Employees with outstanding performance may receive additional bonuses.",
    "promotion criteria": "Promotions are based on performance reviews, leadership potential, and business needs. Employees can apply for internal job postings after 1 year in their current role.",
    "remote work policy": "Employees can work remotely up to 3 days a week. Fully remote positions require management approval.",
    "overtime policy": "Employees working beyond 40 hours per week are eligible for overtime pay or compensatory time off, subject to approval.",
    "health benefits": "Company provides full medical insurance to employees and dependents, covering hospitalization, consultation, and medications.",
    "retirement plan": "Employees are enrolled in a company-sponsored retirement plan with a 5% employer contribution match.",
    "password reset": "To reset your password, visit the IT portal and click 'Forgot Password'. If locked out, contact IT Support.",
    "vpn issue": "Ensure your VPN software is updated. If issues persist, restart your computer and reconnect.",
    "email access issue": "If you cannot access your email, reset your password via the email portal. If issues persist, check Outlook settings.",
    "software installation": "Submit a request through the IT Helpdesk for software installation. Approval from your manager may be required.",
    "printer not working": "Ensure the printer is powered on and connected. If issues persist, reinstall the drivers or contact IT support.",
    "incident reporting": "Employees must report security breaches within 24 hours to the IT Security Team.",
    "firewall rules": "Strict firewall rules are enforced to block unauthorized access to company systems.",
}
# Generate OpenAI Embeddings for Text
def get_embedding(text: str):
    """Fetch OpenAI embeddings with error handling."""
    if not text or not isinstance(text, str):  #Prevent invalid input
        logger.warning(f"Skipping embedding: Invalid text input - {text}")
        return None
    
    try:
        response = client.embeddings.create(
            model="text-embedding-ada-002",
            input=[text]  #Ensure input is always a list
        )
        return response.data[0].embedding
    except Exception as e:
        logger.error(f"Error generating embedding for '{text}': {str(e)}")
        return None


# Function to Insert Policies into ChromaDB
def insert_policies():
    """Store all policies in a single ChromaDB collection."""
    logger.info("\nIndexing Policies into Unified Knowledge Base...")
    for key, value in KNOWLEDGE_BASE.items():
        embedding = get_embedding(value)
        if embedding is None:
            logger.warning(f"Skipping policy {key} due to embedding failure.")
            continue
        try:
            collection.add(
                ids=[key],
                embeddings=[embedding],
                metadatas=[{"policy": value}]
            )
            logger.info(f"Stored Policy: {key} → {value[:50]}...")
        except Exception as e:
            logger.error(f"Error inserting policy {key}: {str(e)}")

# Function to Query Policies
def query_policies(query: str) -> str:
    """Query VectorDB for policy information and summarize the best match."""
    
    logger.info(f"Searching Unified Knowledge Base for query: {query}")
    # Generate embedding for query
    query_embedding = get_embedding(query)
    if query_embedding is None:
        logger.warning(f"Embedding failed for query: {query}. Skipping search.")
        return "Error generating query embedding."
    # Step 1: Vector Search
    results = collection.query(query_embeddings=[query_embedding], n_results=3)
    # Check if valid results exist
    if results and results.get("metadatas", [[]])[0]:
        logger.info(f"? Vector search results for query: {query}")
        filtered_policies = []
        for i, metadata in enumerate(results["metadatas"][0]):
            similarity = results.get("distances", [[]])[0][i]
            policy_text = metadata.get("policy", "")
            # Adjust similarity threshold
            if similarity < 0.5:  
                filtered_policies.append(policy_text)
                logger.info(f"   Match {i+1}: {policy_text[:50]}... (Score: {similarity})")
            else:
                logger.info(f"   Ignored Weak Match {i+1}: {policy_text[:50]}... (Score: {similarity})")
        if not filtered_policies:
            return "No relevant policy found."
        # Summarize if multiple results found
        if len(filtered_policies) == 1:
            return filtered_policies[0]
        else:
            summary = "\n\n".join([f"{policy}" for policy in filtered_policies])
            return f"I found multiple relevant policies:\n\n{summary}\n\nWould you like to refine your query?"
    return "No relevant policy found."

# Run This to Index Policies
if __name__ == "__main__":
    insert_policies()
    logger.info("All policies have been indexed successfully!")

    # TESTING QUERIES
    queries = [
        "What is my leave policy?",
        "How do I reset my password?",
        "What are the firewall security rules?",
        "Tell me about maternity leave benefits.",
        "I need help with VPN issues."
    ]
    for test_query in queries:
        response = query_policies(test_query)
        logger.info(f"Query: {test_query}\n Response: {response}\n")


现在要索引数据并创建矢量数据库,你必须运行如下所示的 python 文件:


(menv) suryateja@Suryas-MacBook-Air chat_agents % python vector_db.py


如果成功,你的输出应该如下所示


4


2. 使用 crewai 创建代理并使用 FastAPI 设置后端

现在我们已经创建了数据库,我们将在根文件夹中创建 bot.py,并使用以下代码为我们的项目创建机器人,并使用代理运行游戏


import logging
import json
import os
from fastapi import FastAPI
from pydantic import BaseModel
from crewai import Agent, Task, Crew, LLM
from openai import OpenAI
import vector_db 
import time
import asyncio
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()
#Load OpenAI Credentials
api_key = os.getenv("OPENAI_API_KEY")



#Initialize Logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
#Initialize FastAPI
app = FastAPI(title="Evabot AI Agentic Framework", version="3.2")
#File for storing tickets (simulating a backend)
TICKET_FILE = "tickets.json"
#Initialize app state for storing recent queries and clarification attempts
if not hasattr(app.state, "recent_queries"):
    app.state.recent_queries = {}  # Stores last 5 queries per session
if not hasattr(app.state, "clarification_attempts"):
    app.state.clarification_attempts = {}  # Track clarification attempts per session


#Initialize OpenAI client
client = OpenAI(api_key=api_key)

#Define LLM Instance
llm = LLM(
    model="gpt-4o",
    api_key=api_key
)

#Define FastAPI Request Models
class QueryRequest(BaseModel):
    query: str
    session_id: str = "default_session"  # Used to track multiple queries from the same user
class TicketRequest(BaseModel):
    issue_summary: str
    issue_category: str
#Supervisor Agent (Query Refinement)**
supervisor_agent = Agent(
    role="Supervisor Agent",
    goal="Ensure the user's query is specific and well-defined.",
    backstory="You help refine vague queries to ensure the best possible response.",
    llm=llm
)
supervisor_task = Task(
    description="You refine vague queries before retrieval. "
                "Rephrase the query into a more specific form or ask ONE targeted clarifying question. "
                "Return either:\n"
                "- A refined query\n"
                "- A single clarification question (if needed)\n"
                "- The same query (if already clear)\n\n"
                "Recent queries for context:\n{recent_queries}",
    agent=supervisor_agent,
    expected_output="A refined query or a specific clarification question."
)
#Policy Retrieval Agent**
policy_retrieval_agent = Agent(
    role="Policy Retrieval Agent",
    goal="Retrieve the most relevant policy from the unified knowledge base.",
    backstory="You search company policies using semantic search.",
    llm=llm
)
policy_retrieval_task = Task(
    description="Search the company's **unified knowledge base** for the best-matching policy. "
                "If no good match is found, suggest alternative searches or ask for clarification.",
    agent=policy_retrieval_agent,
    expected_output="A relevant policy response."
)
#Validation Agent**
validation_agent = Agent(
    role="Validation Agent",
    goal="Ensure retrieved policies are accurate and clear.",
    backstory="You verify if the retrieved policies fully answer the query.",
    llm=llm
)
validation_task = Task(
    description="Check if the retrieved policies are relevant to the user's query. "
                "- If they fully answer the question, return them as-is.\n"
                "- If they are **partially relevant**, summarize the key points.\n"
                "- If they **do not match**, suggest refining the query.\n\n"
                "**User Query:** {query}\n"
                "**Retrieved Policies:** {retrieved_policies}\n\n"
                "**Expected Output:** A validated, relevant policy response.",
    agent=validation_agent,
    expected_output="A validated response."
)
#Summarization Agent**
summarization_agent = Agent(
    role="Summarization Agent",
    goal="Ensure the final response is concise and structured.",
    backstory="You refine validated responses into clear, well-structured answers.",
    llm=llm
)
summarization_task = Task(
    description="Summarize the validated response into a **concise, structured format**. "
                "- Do **not** change policy details.\n"
                "- Summarize **only** if the response is too long.\n"
                "- If already clear, return as-is.\n\n"
                "**Validated Response:** {validated_response}\n"
                "**Expected Output:** A clear, structured policy response.",
    agent=summarization_agent,
    expected_output="A well-structured final response."
)




#Feedback Agent (Handling User Feedback & IT Issues)**
feedback_agent = Agent(
    role="Feedback Agent",
    goal="Analyze user feedback, determine intent, and detect IT issues while prompting ticket creation when necessary.",
    backstory="You analyze user responses and classify their intent (satisfied, refinement needed, new query, IT issue, or general feedback). If an IT issue is detected, confirm with the user if they want to create a support ticket.",
    llm=llm
)
feedback_task = Task(
    description="Analyze the following user response and determine the intent. "
                "Always check for IT-related issues **first** before considering other intents."
                "\n\n**Intent Prioritization:**"
                "\n1**IT Issue - Prompt Ticket**: If the response mentions an IT problem (e.g., password reset, VPN issue, system failure), classify it as an IT issue and prompt ticket creation."
                "\n2**Satisfied**: If no IT issue is found, check if the user is happy with the answer."
                "\n3**Refine**: If no IT issue is found, determine if the user wants to refine or expand the answer."
                "\n4**New Question**: If the user is asking about a different topic, classify it accordingly."
                "\n5**General Feedback**: If the response is general feedback, classify it as such."
                
                "\n\n**User Response:** {user_response}"
                "\n**Previous Response:** {previous_response}"
                
                "\n\n**Important:** If the user mentions an IT-related issue (e.g., password reset, VPN, account lockout), classify it as an IT issue **before** considering other intents. "
                "Always prioritize IT detection over satisfaction or other intents.",
    agent=feedback_agent,
    expected_output="One of: 'IT Issue - Prompt Ticket', 'Satisfied', 'Refine', 'New Question', or 'General Feedback'."
)




#Ticket Agent**
ticket_agent = Agent(
    role="Ticket Agent",
    goal="Create an IT support ticket if the issue is unresolved.",
    backstory="You generate a structured ticket summary when users require further IT assistance.",
    llm=llm
)
ticket_task = Task(
    description="If the user indicates the issue is unresolved, generate a **structured IT support ticket summary** "
                "using the user's latest query **and** recent conversation context.\n\n"
                "Use the following inputs to ensure a well-structured summary:\n"
                "- **User's Latest Query:** {query}\n"
                "- **Recent Queries for Context:** {recent_queries}\n\n"
                "**Expected Output:** A structured IT support ticket summary that captures the issue comprehensively.\n\n"
                "Then, ask the user to **select a category** before creating the ticket.\n\n"
                "Expected categories: ['Network Issue', 'Password Reset', 'Software Installation', 'Hardware Problem'].",
    agent=ticket_agent,
    expected_output="A structured ticket summary, followed by category selection."
)

#API Endpoint: Handle User Query
@app.post("/query")
async def handle_query(request: QueryRequest):
    """Handles user queries by refining vague queries before retrieval."""
    start_time_total = time.time()
    logger.info(f"Received query: {request.query} (Session: {request.session_id})")
    #Ensure session storage for recent queries and clarifications
    app.state.recent_queries.setdefault(request.session_id, []).append(request.query)
    app.state.recent_queries[request.session_id] = app.state.recent_queries[request.session_id][-5:]
    if not hasattr(app.state, "clarification_attempts"):
        app.state.clarification_attempts = {}  # Store pending clarifications
    if not hasattr(app.state, "pending_refinement"):
        app.state.pending_refinement = {}  # Store user responses that require refinement
    #Handle User Clarifications
    if request.session_id in app.state.clarification_attempts:
        prev_clarification = app.state.clarification_attempts.pop(request.session_id)
        logger.info(f"Processing clarification response: {request.query} (Previous: {prev_clarification})")
        #Use Supervisor Agent to structure the final query meaningfully
        query_rewrite_crew = Crew(agents=[supervisor_agent], tasks=[
            Task(
                description="Rephrase the following query into a structured, self-contained search query. "
                            "Ensure it is meaningful and provides complete context."
                            "\n\nOriginal Query: {prev_query}"
                            "\nUser Clarification: {clarification}",
                agent=supervisor_agent,
                expected_output="A final, context-aware search query."
            )
        ])
        refined_query = await asyncio.to_thread(query_rewrite_crew.kickoff, inputs={
            "prev_query": prev_clarification,
            "clarification": request.query
        })
    else:
        #Supervisor Agent – Initial Query Refinement
        supervisor_crew = Crew(agents=[supervisor_agent], tasks=[supervisor_task])
        refined_query = await asyncio.to_thread(supervisor_crew.kickoff, inputs={
            "query": request.query,
            "recent_queries": "\n".join(app.state.recent_queries[request.session_id])
        })
    #Convert CrewOutput to String
    refined_query_str = str(refined_query).strip()
    #Detect if Supervisor Returned a Clarification Question
    if refined_query_str.endswith("?"):
        logger.info(f"❓ Clarification Needed: {refined_query_str}")
        app.state.clarification_attempts[request.session_id] = request.query  # Store original question
        return {"response": refined_query_str, "clarification_needed": True}
    #Retrieve Policies (Proceed Only If No Clarification Needed)
    policy_response = vector_db.query_policies(refined_query_str)
    #Validate Policies
    validation_crew = Crew(agents=[validation_agent], tasks=[validation_task])
    validated_response = await asyncio.to_thread(validation_crew.kickoff, inputs={
        "query": refined_query_str,
        "retrieved_policies": policy_response
    })
    #Convert CrewOutput to a String
    validated_response_str = str(validated_response).strip()
    #Summarize Response
    summarization_crew = Crew(agents=[summarization_agent], tasks=[summarization_task])
    summarization_result = await asyncio.to_thread(summarization_crew.kickoff, inputs={
        "validated_response": validated_response_str 
    })
    #Extract clean response
    if isinstance(summarization_result, dict) and "tasks_output" in summarization_result:
        final_response = summarization_result["tasks_output"][0].get("raw", "").strip()
    else:
        final_response = str(summarization_result).strip()
    logger.info(f"Final Clean Response: {final_response}")
    # Trigger Feedback Agent**
    logger.info(f"? Triggering Feedback Agent for user response analysis.")
    feedback_crew = Crew(agents=[feedback_agent], tasks=[feedback_task])
    feedback_result = await asyncio.to_thread(feedback_crew.kickoff, inputs={
        "user_response": final_response,
        "previous_response": validated_response_str
    })
    feedback_str = str(feedback_result).strip()
    logger.info(f"? Feedback Agent Analysis Result: {feedback_str}")
    # Handle Feedback Response (Including IT Issues)**
    if "IT Issue - Prompt Ticket" in feedback_str:
        logger.info("? IT-related issue detected by Feedback Agent. Asking user if they want to create a ticket.")
        return {
            "response": final_response,  # how final response normally
            "prompt_ticket": True  # sk user for confirmation in Streamlit
        }
    elif "Satisfied" in feedback_str:
        follow_up_response = "Glad I could help! ? Let me know if you have any other questions."
    elif "Refine" in feedback_str:
        app.state.pending_refinement[request.session_id] = final_response
        follow_up_response = "Please provide more details so I can refine my answer. ✏️"
    elif "New Question" in feedback_str:
        follow_up_response = "Sure! What other policy would you like to ask about? ?"
    else:
        follow_up_response = "Thank you for your feedback! Let me know if I can assist you further. ?"
    total_time = time.time() - start_time_total
    logger.info(f"⏱️ **Total Query Processing Time: {total_time:.2f} seconds**")
    return {"response": f"{final_response}\n\n? *{follow_up_response}*"}


# API Endpoint: Generate Ticket Summary
@app.post("/generate_ticket_summary")
async def generate_ticket_summary(request: QueryRequest):
    """Generates a structured ticket summary using the Ticket Agent."""
    logger.info(" Generating structured summary for IT support ticket.")
    #**Trigger Ticket Agent for Structured Summary**
    ticket_crew = Crew(agents=[ticket_agent], tasks=[ticket_task])
    ticket_summary_result = await asyncio.to_thread(ticket_crew.kickoff, inputs={
        "query": request.query,  # User's original query
        "recent_queries": "\n".join(app.state.recent_queries.get(request.session_id, [])) if app.state.recent_queries.get(request.session_id) else "No recent queries available."
    })
    # Convert Ticket Agent Output to String
    ticket_summary = str(ticket_summary_result).strip()
    logger.info(f"Structured Ticket Summary: {ticket_summary}")
    return {"ticket_summary": ticket_summary}


#  API Endpoint: Create Ticket
@app.post("/create_ticket")
async def create_ticket(request: TicketRequest):
    """Creates an IT support ticket."""
    logger.info(f"Creating IT support ticket.")
    ticket = {
        "ticket_id": f"TCK-{len(load_tickets()) + 1:04d}",
        "issue_summary": request.issue_summary,
        "issue_category": request.issue_category,
        "status": "Open"
    }
    save_ticket(ticket)
    return {
        "response": f" Ticket Created: {ticket['ticket_id']} in category *{ticket['issue_category']}*.",
        "ticket": ticket
    }


def load_tickets():
    """Load existing tickets from a JSON file."""
    if os.path.exists(TICKET_FILE):
        with open(TICKET_FILE, "r") as file:
            return json.load(file)
    return []
def save_ticket(ticket):
    """Save a new ticket to the JSON file."""
    tickets = load_tickets()
    tickets.append(ticket)
    with open(TICKET_FILE, "w") as file:
        json.dump(tickets, file, indent=4)


关键组件

使用 FastAPI 处理用户查询


@app.post("/query")
async def handle_query(request: QueryRequest):
    """Handles user queries by refining vague queries before retrieval."""
    start_time_total = time.time()
    logger.info(f"Received query: {request.query} (Session: {request.session_id})")


此代码使用FastAPI定义一个异步 API 端点来处理用户查询。装饰器指定此端点响应 HTTP POST 请求。@app.post("/query")

  • 该函数记录传入的查询并存储开始时间以便稍后计算处理持续时间。
  • 此设置允许高效的查询处理,同时提供实时日志监控。


使用 Supervisor Agent 进行查询细化


supervisor_crew = Crew(agents=[supervisor_agent], tasks=[supervisor_task])
refined_query = await asyncio.to_thread(supervisor_crew.kickoff, inputs={
    "query": request.query,
    "recent_queries": "\n".join(app.state.recent_queries[request.session_id])
})


该模块使用Supervisor Agent来优化用户的查询并使其更加结构化和更具上下文感知能力。

  • Crew AI 框架利用异步supervisor_agent启动细化任务。
  • 该功能收集最近的查询以提供上下文,使得细化查询更加准确和相关。
  • 在异步线程中运行代理可确保应用程序保持响应,即使在密集处理期间也是如此。


策略检索和验证


policy_response = vector_db.query_policies(refined_query_str)
validation_crew = Crew(agents=[validation_agent], tasks=[validation_task])
validated_response = await asyncio.to_thread(validation_crew.kickoff, inputs={
    "query": refined_query_str,
    "retrieved_policies": policy_response
})


此代码块执行两个关键步骤:


1. 策略检索:

  • 利用向量数据库,根据细化后的查询字符串检索相关策略。
  • 这确保系统提取上下文相关的数据而不是返回不相关或嘈杂的响应。


2. 响应验证:

  • 使用验证代理来验证检索到的策略的准确性和相关性。
  • 此步骤确保仅向用户传递有意义且准确的信息。


总结回应


summarization_crew = Crew(agents=[summarization_agent], tasks=[summarization_task])
summarization_result = await asyncio.to_thread(summarization_crew.kickoff, inputs={
    "validated_response": validated_response_str
})


响应经过验证后,将被传递给摘要代理进行简洁的格式化。

  • 此步骤使输出易于理解,而不会丢失重要信息。
  • 异步运行可以确保响应的延迟最小。


反馈处理和票务


if "IT Issue - Prompt Ticket" in feedback_str:
    logger.info("? IT-related issue detected by Feedback Agent. Asking user if they want to create a ticket.")
    return {
        "response": final_response,
        "prompt_ticket": True
    }


反馈代理分析用户的响应以确定下一步的操作。

  • 如果响应表明存在潜在的 IT 问题,代理会提示用户确认创建票证,而不是立即创建票证。
  • 这个额外的确认步骤减少了误报并提高了用户体验。


生成结构化工单摘要


@app.post("/generate_ticket_summary")
async def generate_ticket_summary(request: QueryRequest):
    """Generates a structured ticket summary using the Ticket Agent."""
    ticket_crew = Crew(agents=[ticket_agent], tasks=[ticket_task])
    ticket_summary_result = await asyncio.to_thread(ticket_crew.kickoff, inputs={
        "query": request.query,
        "recent_queries": "\n".join(app.state.recent_queries.get(request.session_id, [])) or "No recent queries available."
    })


此API端点仅在必要时触发票务代理。

  • 它收集会话中的最新查询,为票证摘要提供背景信息。
  • 通过将票证创建作为单独的端点,我们使框架更加模块化和可扩展。


要运行后端,你需要启动以下服务


(menv) suryateja@Suryas-MacBook-Air chat_agents % uvicorn bot:app


如果一切正常,并且服务器启动,你应该看到如下所示的内容


(menv) suryateja@Suryas-MacBook-Air chat_agents % uvicorn bot:app
2025-03-29 05:14:18,042 - INFO - Anonymized telemetry enabled. See                     https://docs.trychroma.com/telemetry for more information.
INFO:     Started server process [16204]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


我们将在会话中存储用户的最后 5 个查询,以实现上下文感知


3. 创建流畅的用户界面

现在我们有了可运行的数据库、后端和机器人,我们需要一个简单的用户界面来与机器人交互。


让我们在根文件夹中创建一个 chat_ui.py,如果你到目前为止一直跟着我,你的最终文件夹结构应该如下所示


5


我们将以下代码放入 chat_ui.py 中,以创建一个交互式 UI,左侧显示历史聊天,右侧显示聊天窗口


import streamlit as st
import requests
import json
import os
import uuid  # Unique session IDs
import logging
# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# FastAPI Backend URL
API_URL = "http://127.0.0.1:8000/query"
TICKET_API_URL = "http://127.0.0.1:8000/create_ticket"
TICKET_SUMMARY_URL = "http://127.0.0.1:8000/generate_ticket_summary"
# File to store chat sessions
CHAT_HISTORY_FILE = "chat_sessions.json"
# Function to load chat sessions from JSON file
def load_chat_sessions():
    """Load chat sessions safely, handling potential file errors."""
    if os.path.exists(CHAT_HISTORY_FILE):
        try:
            with open(CHAT_HISTORY_FILE, "r") as file:
                history = json.load(file)
                return history if isinstance(history, list) else []
        except (json.JSONDecodeError, FileNotFoundError):
            return []
    return []
# Function to save chat sessions to JSON file
def save_chat_sessions(history):
    """Safely save chat sessions."""
    with open(CHAT_HISTORY_FILE, "w") as file:
        json.dump(history, file, indent=4)
# Streamlit UI Setup
st.set_page_config(page_title="Evabot AI", layout="wide")
# Sidebar: Chat History
with st.sidebar:
    st.title("⚙️ Settings")
    if st.button("? New Chat"):
        st.session_state.session_id = str(uuid.uuid4())
        st.session_state.messages = []
        st.session_state.session_title = ""  # ✅ Reset session title
        st.session_state.issue_resolved = None
        st.session_state.raise_ticket = False
        st.session_state.ticket_summary = ""
        st.session_state.ticket_submitted = False
        chat_sessions = load_chat_sessions()
        save_chat_sessions(chat_sessions)
        st.rerun()
    st.subheader("? Chat History")
    chat_sessions = load_chat_sessions()
    for i, session in enumerate(reversed(chat_sessions)):
        session_title = session.get("title") or (session["messages"][0]["content"][:30] if session.get("messages") else "Untitled Chat")
        if st.button(f"?️ {session_title}...", key=f"session_{i}"):
            st.session_state.session_id = session["id"]
            st.session_state.messages = session["messages"]
            st.session_state.session_title = session_title
            st.rerun()
# Initialize session state variables
if "session_id" not in st.session_state:
    st.session_state.session_id = str(uuid.uuid4())
if "messages" not in st.session_state:
    st.session_state.messages = []
if "session_title" not in st.session_state:
    st.session_state.session_title = ""
if "issue_resolved" not in st.session_state:
    st.session_state.issue_resolved = None
if "raise_ticket" not in st.session_state:
    st.session_state.raise_ticket = False
if "ticket_summary" not in st.session_state:
    st.session_state.ticket_summary = ""
if "ticket_submitted" not in st.session_state:
    st.session_state.ticket_submitted = False
# Main Chat UI
st.title("?️ Evabot AI")
# Display chat messages
chat_container = st.container()
with chat_container:
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])
# Chat input box (Fixed at bottom)
user_input = st.chat_input("Ask Eva...")
if user_input:
    # Append user message to chat
    st.session_state.messages.append({"role": "user", "content": user_input})
    # Set session title if first question
    if not st.session_state.session_title:
        st.session_state.session_title = user_input[:30]  # Use first message as title
    with st.chat_message("user"):
        st.markdown(user_input)
    # Send request to FastAPI bot
    response = requests.post(API_URL, json={"query": user_input, "session_id": st.session_state.session_id})
    if response.status_code == 200:
        response_json = response.json()
        bot_reply = response_json.get("response", "Error: No response received.")
        st.session_state.raise_ticket = response_json.get("prompt_ticket", False)
        st.session_state.ticket_summary = bot_reply if st.session_state.raise_ticket else ""
        logger.info(f"Bot Response: {bot_reply}")
    else:
        bot_reply = "Error: Failed to reach API."
        st.session_state.raise_ticket = False
        st.session_state.ticket_summary = ""
    # Append bot response to chat
    st.session_state.messages.append({"role": "assistant", "content": bot_reply})
    with st.chat_message("assistant"):
        st.markdown(bot_reply)
    # Save chat history
    chat_sessions = load_chat_sessions()
    existing_session = next((s for s in chat_sessions if s["id"] == st.session_state.session_id), None)
    if existing_session:
        existing_session["messages"] = st.session_state.messages
        existing_session["title"] = st.session_state.session_title  # Ensure title is stored
    else:
        chat_sessions.append({
            "id": st.session_state.session_id,
            "title": st.session_state.session_title or st.session_state.messages[0]["content"][:30],  # Set title
            "messages": st.session_state.messages
        })
    save_chat_sessions(chat_sessions)
# IT Issue Handling with UI Elements
if st.session_state.raise_ticket and not st.session_state.ticket_submitted:
    st.markdown("❓ *Does this solve your issue?*")
    selected_option = st.radio(
        "Select an option:", 
        ["Yes, resolved", "No, I need IT support"], 
        index=None
    )
    if selected_option:
        st.session_state.issue_resolved = selected_option
    # Only trigger Ticket Agent when user selects "No, I need IT support"
    if st.session_state.issue_resolved == "No, I need IT support":
        st.markdown("? **Your issue might require IT support.** Please create a ticket.")
        # **Trigger Ticket Agent Only When Needed**
        ticket_response = requests.post(TICKET_SUMMARY_URL, json={
            "query": st.session_state.messages[-1]["content"],  # Send last user message
            "session_id": st.session_state.session_id
        })
        if ticket_response.status_code == 200:
            response_json = ticket_response.json()
            st.session_state.ticket_summary = response_json.get("ticket_summary", " Error: Ticket summary not generated.")
        else:
            st.session_state.ticket_summary = "Error: Failed to fetch ticket summary."
        st.markdown(f"**Issue Summary:** {st.session_state.ticket_summary}")
        # Select issue category
        issue_category = st.selectbox(
            "Select Issue Category:", 
            ["Network Issue", "Password Reset", "Software Installation", "Hardware Problem"],
            key="issue_category"
        )
        if st.button("? Submit Ticket"):
            logger.info(f"Submitting ticket with summary: {st.session_state.ticket_summary} and category: {issue_category}")
            ticket_submit_response = requests.post(TICKET_API_URL, json={
                "issue_summary": st.session_state.ticket_summary,
                "issue_category": issue_category
            })
            if ticket_submit_response.status_code == 200:
                ticket_reply = ticket_submit_response.json().get("response", "Error: Ticket creation failed.")
            else:
                ticket_reply = "Error: Ticket creation failed."
            logger.info(f"Ticket creation response: {ticket_reply}")
            st.session_state.messages.append({"role": "assistant", "content": ticket_reply})
            with st.chat_message("assistant"):
                st.markdown(ticket_reply)
            st.session_state.ticket_submitted = True  
            save_chat_sessions(chat_sessions)
            st.rerun()
# Auto-scroll to latest message
st.markdown(
    """<script>
        var chatDiv = window.parent.document.querySelector('.stChat');
        if (chatDiv) {
            chatDiv.scrollTop = chatDiv.scrollHeight;
        }
    </script>""",
    unsafe_allow_html=True
)


要启动 streamlit 应用程序,请运行以下代码


(menv) suryateja@Suryas-MacBook-Air chat_agents % streamlit run chat_ui.py


如果成功的话,你应该在终端中看到类似下面的内容


(menv) suryateja@Suryas-MacBook-Air chat_agents % streamlit run chat_ui.py
  You can now view your Streamlit app in your browser.
  Local URL: http://localhost:8501
  Network URL: http://192.168.0.106:8501
  For better performance, install the Watchdog module:
  $ xcode-select --install
  $ pip install watchdog


现在转到你的浏览器并访问 streamlit 应用程序,转到此 URL http://localhost:8501


你应该会在浏览器中看到类似下面的内容


6


我们成功了。


关键要点

  • 多代理协作是关键:在专门的代理之间划分任务可以提高准确性和响应能力。
  • 动态和模块化:框架根据用户输入进行调整,并在需要时触发必要的代理。
  • 高效的性能:智能代理委派确保快速响应而不牺牲质量。
  • 可扩展性和通用性:虽然这里重点介绍了票务系统,但该框架对于任何结构化任务自动化都很灵活。


文章来源:https://medium.com/@surya.vijjeswarapu/building-an-agentic-chatbot-framework-with-multi-agent-collaboration-using-crew-ai-fastapi-and-617a15fbb1f4
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消