我们将共同设计和构建一个高度复杂的多智能体系统,该系统将充分利用分布式计算、边缘设备和语义路由的强大功能。我们将创建一个部署在多个Raspberry Pi 设备上的智能体网络,这些智能体将协同工作,共同收集、处理和聚合信息。真正的魔力在于Qdrant,它是一个语义路由器,能够根据请求的语义含义,智能地将请求导向最合适的智能体。中央机器上运行的本地控制流,使得这个系统展现了分布式人工智能、边缘计算和智能体协作的无限可能。
架构概述:
该系统由一个多智能体设置组成,其核心动力来自于Anthropic的Sonet3.5大型语言模型(LLM)和OpenAI的GPT-4o LLM。Sonet3.5 LLM与金融智能体进行交互,而GPT-4o LLM则与新闻智能体通信,新闻智能体利用AskNews和Qdrant实现其功能。
系统的核心在于Qdrant语义路由器。当收到聊天消息时,该消息会被发送到Qdrant Semanti-Router,路由器会分析查询内容,并根据消息的语义含义确定适当的路由。语义路由器由Qdrant驱动,Qdrant是一个向量相似性搜索引擎,能够实现高效的语义路由。
一旦Semanti-Router确定了适当的路由,就会触发相应的智能体。系统中主要有两个智能体:金融智能体和新闻智能体。每个智能体都在单独的Raspberry上运行,从而实现分布式处理和可扩展性。
如果查询与金融事务相关,则会调用金融智能体。该智能体专门设计用于处理金融分析,并根据用户的查询提供见解。它利用一组专门的工具和资源来生成准确且相关的金融信息。
另一方面,如果查询与新闻或时事相关,则会触发新闻智能体。新闻智能体由AskNews驱动,AskNews是一个专门用于检索和聚合新闻文章的工具。它还利用Qdrant来增强其语义理解能力,并提供更准确的新闻结果。
智能体使用其各自的工具处理查询并生成响应。然后,响应会被发送回主系统,在那里进行汇总并呈现给用户。这种分布式架构能够高效处理各种查询,并使系统能够提供全面且准确的响应。
为了进一步说明工作流程,n8n 流程提供了该架构的可视化表示。当收到聊天消息时,消息会通过语义路由器进行路由,路由器会根据查询内容确定适当的智能体。然后,智能体会与其关联的工具进行交互,如OpenAI聊天模型、金融分析工具或新闻聚合工具。每个工具都有助于生成最终响应,然后将该响应发送回给用户。
由Phidata驱动的智能体使用其各自的工具处理查询并生成响应。然后,响应会被发送回主系统,在那里进行汇总并呈现给用户。这种分布式架构能够高效处理各种查询,并使系统能够提供全面且准确的响应。
通过利用分布式智能体、语义路由和专用工具的力量,这种系统架构能够高效且准确地处理不同领域的用户查询,提供无缝且智能的对话体验。
实现部分:
语义路由器:
requirements.txt 文件内容如下,
# qdrant client for vector similarity search
qdrant-client
# semantic routing
semantic-router[qdrant]
semantic-router[fastembed]
# api serving
fastapi
uvicorn
语义路由功能以API的形式公开,以便可以从n8n流程中调用和使用。
from fastapi import FastAPI
from pydantic import BaseModel
from semantic_router_core import SemanticRoutingSystem
router = SemanticRoutingSystem(
qdrant_url="https://5496bdf1-fe1b-4e36-8715-aa5319aa1bf7.us-east4-0.gcp.cloud.qdrant.io:6333",
qdrant_api_key="<YOUR-KEY>"
)
class RequestPayload(BaseModel):
query: str
app = FastAPI()
@app.post("/api/route")
def route_request(request_data: RequestPayload):
router_response = result = router.process_query(request_data.query)
return router_response
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
语义路由器的核心逻辑如下。
import os
from semantic_router import Route
from semantic_router.encoders import FastEmbedEncoder
from semantic_router.layer import RouteLayer
from semantic_router.index import QdrantIndex
from qdrant_client import QdrantClient
from typing import List, Optional
class SemanticRoutingSystem:
def __init__(
self,
qdrant_url: str,
qdrant_api_key: str,
collection_name: str = "tool_finder",
encoder_name: str = "snowflake/snowflake-arctic-embed-m",
score_threshold: float = 0.80
):
"""
Initialize the Semantic Routing System.
Args:
qdrant_url (str): URL for the Qdrant server
qdrant_api_key (str): API key for Qdrant authentication
collection_name (str): Name of the collection to use in Qdrant
encoder_name (str): Name of the encoder model to use
score_threshold (float): Threshold score for routing decisions
"""
self.qdrant_url = qdrant_url
self.qdrant_api_key = qdrant_api_key
self.collection_name = collection_name
self.encoder_name = encoder_name
self.score_threshold = score_threshold
# Initialize Qdrant client
self.q_client = QdrantClient(
url=self.qdrant_url,
api_key=self.qdrant_api_key
)
# Initialize encoder
self.encoder = FastEmbedEncoder(
name=self.encoder_name,
score_threshold=self.score_threshold
)
# Initialize routes
self.financial_route = self._create_financial_route()
self.news_route = self._create_news_route()
self.train_routes = [self.financial_route, self.news_route]
# Initialize RouteLayer if collection doesn't exist
self.route_layer = self._initialize_route_layer()
def _create_financial_route(self) -> Route:
"""Create and return the financial route with predefined utterances."""
return Route(
name="financial analyser agent",
utterances=[
"I want to know the stock price of Apple",
"What is the latest news about Microsoft?",
"Can you tell me the current exchange rate between USD and EUR?",
"Could you provide an analysis on the performance of Google's stock?",
"Please give me a summary of the economic indicators for Q1 2023.",
"What are the top performing stocks in the tech sector?",
"What is the historical price trend of Tesla?",
"Could you provide insights into the impact of inflation on the stock market?",
"Please give me a summary of the latest financial news and trends.",
"What are the key factors affecting the performance of Amazon's stock?",
"Can you provide an analysis on the potential risks associated with investing in renewable energy stocks?",
"summarize the current market conditions",
"compare the stock prices of Apple and Microsoft and explain the reasons behind any differences.",
"What are the major trends in the financial sector that investors should be aware of?"
]
)
def _create_news_route(self) -> Route:
"""Create and return the news route with predefined utterances."""
return Route(
name="news summary agent",
utterances=[
"Tell me about the latest news on climate change",
"What is the current political situation in Russia?",
"Could you provide an update on the global economy?",
"What are the key events occurring in the tech industry this week?",
"Summarize the most important stories from today's newspaper.",
"What is the latest on the space race between China and the United States?",
"Tell me about the latest developments in artificial intelligence",
"What is the current state of the global pandemic?"
]
)
def _initialize_route_layer(self) -> Optional[RouteLayer]:
"""Initialize the RouteLayer if the collection doesn't exist."""
if not self.q_client.collection_exists(self.collection_name):
return RouteLayer(
encoder=self.encoder,
routes=self.train_routes,
index=QdrantIndex(
location=self.qdrant_url,
api_key=self.qdrant_api_key,
index_name=self.collection_name
)
)
return None
def process_query(self, query: str) -> str:
"""
Process a query through the routing system.
Args:
query (str): The input query to process
Returns:
str: The routing result
"""
if self.route_layer:
return str(self.route_layer(query).name)
else:
raise ValueError("RouteLayer not initialized. Collection may already exist.")
# Example usage:
# if __name__ == "__main__":
# # Initialize the system
# router = SemanticRoutingSystem(
# qdrant_url="https://5496bdf1-fe1b-4e36-8715-aa5319aa1bf7.us-east4-0.gcp.cloud.qdrant.io:6333",
# qdrant_api_key="<YOUR-KEY>"
# )
#
# print("\nWelcome to the Semantic Routing System!")
# print("Type 'quit' or 'bye' to exit")
# print("-" * 50)
#
# while True:
# # Get user input
# query = input("\nEnter your query: ").strip()
#
# # Check for exit commands
# if query.lower() in ['quit', 'bye']:
# print("\nThank you for using the Semantic Routing System. Goodbye!")
# break
#
# # Process the query if not empty
# if query:
# try:
# result = router.process_query(query)
# print("\nResult:", result)
# except Exception as e:
# print(f"\nError processing query: {str(e)}")
# else:
# print("\nPlease enter a valid query.")
金融智能体:
该智能体旨在执行用户查询,特别侧重于金融分析。项目的requirements.txt文件内容如下:
# agent framework
phidata
# fianance libraries
yfinance
# llm libraries
openai
# api framework
fastapi
# api server
uvicorn
# env library
python-dotenv
#misc
packaging
pydantic
该智能体通过FastAPI以API的形式公开。
from fastapi import FastAPI
from pydantic import BaseModel
from fin_agent_main import fin_agent
import uvicorn
app = FastAPI()
class RequestData(BaseModel):
query: str
@app.post("/api/stocks/analyse")
async def analyse_stocks(data: RequestData):
return fin_agent(query=data.query)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
金融智能体的核心逻辑如下。
from typing import Iterator
from phi.agent import Agent
from phi.tools.yfinance import YFinanceTools
from phi.model.anthropic import Claude
from phi.run.response import RunResponse
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
def fin_agent(query: str):
agent = Agent(
model=Claude(id="claude-3-5-sonnet-20241022"),
tools=[YFinanceTools(stock_price=True, analyst_recommendations=True, stock_fundamentals=True)],
show_tool_calls=False,
debug_mode=True,
description="You are an investment analyst that researches stock prices, analyst recommendations, "
"and stock fundamentals.",
instructions=["Format your response using markdown and use tables to display data where possible. "
"Always consider the data from the most recent date available till 6 months in past."
"Provide a brief analysis of the stock based on the provided data."
"Provide different analysis why to invest or not invest in the stock. "
"Assume you are a beginner investor and provide advice accordingly."
"assume i have 10k USD to invest"]
)
response: RunResponse = agent.run(message=query, stream_intermediate_steps=True)
return response
然后,将整个智能体部署到Raspberry Pi,并通过API(192.168.1.6)进行访问,以下是输出结果。
新闻智能体:
新闻智能体是一个复杂的设置,其中有一组智能体负责执行专门的任务。在我们的示例中,我们将仅考虑金融智能体团队。整个系统同样由Phidata、AskNews和Qdrant提供支持。以下是该项目的requirements.txt文件。
# the phidata agentic library
phidata
# openai library for interacting with OpenAI's API
openai
# sqlalchemy for database operations
sqlalchemy
# duckduckgo-search for searching the web
duckduckgo-search
# yfinance for accessing financial data
yfinance
# fastapi and uvicorn for creating a web server
fastapi
uvicorn
# python-dotenv for loading environment variables from a .env file
python-dotenv
# qdrant-client for interacting with Qdrant vector database
qdrant-client
# pypdf for working with PDF files
pypdf
# ollama for interacting with local LLM models
ollama
# asknews library for interacting with the AskNews API to fetch and analyze news articles
asknews
由于该智能体也以API的形式暴露出来,以下是使用FastAPI将智能体暴露为API的代码。
from fastapi import FastAPI
from pydantic import BaseModel
from news_agent_team import agent_team
from phi.run.response import RunResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=['*'], allow_headers=['*'], allow_methods=['*'])
class MessagePayload(BaseModel):
query: str
@app.post("/api/v1/news")
def invoke_agents(payload: MessagePayload):
run_response: RunResponse = agent_team.run(message=payload.query, stream_intermediate_steps=True)
return run_response
if __name__ == "__main__":
uvicorn.run(app=app, host="0.0.0.0", port=8001)
担任主编的主要代理人如下。
from phi.agent import Agent
from agents.business_news_agent import business_news_agent
from agents.climate_news_agent import climate_news_agent
from agents.sports_news_agent import sports_news_agent
from agents.health_news_agent import health_news_agent
from agents.crime_news_agent import crime_news_agent
from agents.military_news_agent import military_news_agent
from agents.science_and_technology_news_agent import science_and_technology_news_agent
from agents.political_news_agent import political_news_agent
from agents.financial_news_agent import financial_news_agent
from phi.playground import Playground, serve_playground_app
from phi.model.openai import OpenAIChat
import asyncio
agent_team = Agent(
name="chief news editor",
model=OpenAIChat(id="gpt-4o"),
team=[business_news_agent, financial_news_agent],
instructions=["Always use tools to fulfil the user query. Always give the link to sources "],
show_tool_calls=True,
reasoning=False,
markdown=True,
show_full_reasoning=True,
add_datetime_to_instructions=True,
stream=True
)
# response = asyncio.run(agent_team.run(message="What are the AI announcements?"))
# print(response)
# app = Playground(agents=[agent_team]).get_app()
#
# if __name__ == "__main__":
# serve_playground_app("news_agent_team:app", reload=True)
专门负责商业和金融的特别智能体如下所示。
from phi.agent import Agent
from asknews_tools.query_tool import query_finance_news
from phi.model.openai import OpenAIChat
from phi.storage.agent.sqlite import SqlAgentStorage
financial_news_agent = Agent(
name="Finance News Agent",
model=OpenAIChat(id="gpt-4o-mini"),
tools=[query_finance_news],
role="Search only for financial news using the tools provided",
instructions=[
"""You are now the Finance News Reporter and News Agent of a major news organization, who decades of experience
in fact-checking of the actual news. Always use the tools provided to fulfil request from editor in chief.
Your role requires:
ANALYSIS APPROACH:
1. First, break down the news piece using Chain of Thought reasoning:
- What are the key claims?
- Who are the primary sources?
- What is the chronological sequence of events?
- What supporting evidence is provided?
2. Then, apply critical analysis:
- Cross-reference dates and statistics with your knowledge base
- Identify potential biases or gaps in reporting
- Evaluate the credibility of sources
- Check for logical consistency in the narrative
3. For data verification:
- Use the most recent available data (specify the year)
- Flag any outdated statistics
- Note any discrepancies between different data sources
- Highlight where additional verification might be needed
OUTPUT STRUCTURE:
- Start with an executive summary
- Present key findings using markdown bullet points
- Include specific dates and sources for all major claims
- Provide confidence levels for each verified claim (High/Medium/Low)
- Add editorial recommendations for further investigation if needed
CRITICAL GUIDELINES:
- Always indicate source links and dates.
- Always use the tools provided.
- Always refer to the latest year.
- Clearly separate verified facts from unverified claims.
- Note any temporal gaps in the narrative.
- Flag any potential misinformation or need for additional context.
When responding, explicitly walk through your reasoning process before presenting conclusions."""
],
storage=SqlAgentStorage(table_name="news_agent", db_file="asknews_tools/agents.db"),
add_history_to_messages=True,
markdown=True,
reasoning=True,
show_full_reasoning=True
)
from phi.agent import Agent
from asknews_tools.query_tool import query_business_news
from phi.model.openai import OpenAIChat
from phi.storage.agent.sqlite import SqlAgentStorage
business_news_agent = Agent(
name="Business News Agent",
model=OpenAIChat(id="gpt-4o-mini"),
tools=[query_business_news],
role="Search only for business news using the tools provided",
instructions=[
"""You are now the Business News Reporter and News Agent of a major news organization, who decades of experience
in fact-checking of the actual news. Always use the tools provided to fulfil request from editor in chief.
Your role requires:
ANALYSIS APPROACH:
1. First, break down the news piece using Chain of Thought reasoning:
- What are the key claims?
- Who are the primary sources?
- What is the chronological sequence of events?
- What supporting evidence is provided?
2. Then, apply critical analysis:
- Cross-reference dates and statistics with your knowledge base
- Identify potential biases or gaps in reporting
- Evaluate the credibility of sources
- Check for logical consistency in the narrative
3. For data verification:
- Use the most recent available data (specify the year)
- Flag any outdated statistics
- Note any discrepancies between different data sources
- Highlight where additional verification might be needed
OUTPUT STRUCTURE:
- Start with an executive summary
- Present key findings using markdown bullet points
- Include specific dates and sources for all major claims
- Provide confidence levels for each verified claim (High/Medium/Low)
- Add editorial recommendations for further investigation if needed
CRITICAL GUIDELINES:
- Always indicate source links and dates.
- Always use the tools provided.
- Always refer to the latest year.
- Clearly separate verified facts from unverified claims.
- Note any temporal gaps in the narrative.
- Flag any potential misinformation or need for additional context.
When responding, explicitly walk through your reasoning process before presenting conclusions."""
],
storage=SqlAgentStorage(table_name="news_agent", db_file="asknews_tools/agents.db"),
add_history_to_messages=True,
markdown=True,
reasoning=True,
show_full_reasoning=True
)
这两个智能体反过来使用专用工具来触发AskNews API,以获取各地域相关领域的最新新闻。
def query_business_news(query_str: str, continents: str, country_code: str) -> Any:
"""Use this function to get top news related to business
Args:
query_str (str): the user query to search for business news.
continents (str): specific news from the geographic region (continent).
country_code (str): specific news in a specific country within the continents.
Returns:
str: JSON object of top story summaries.
"""
print(f"Calling business tool with, query_str: {query_str}, continents: {continents}")
response = asknews_news_client().news.search_news(
query=query_str, # your keyword query
n_articles=10, # control the number of articles to include in the context
return_type="dicts", # you can also ask for "dicts" if you want more information
method="both", # use "nl" for natural language for your search, or "kw" for keyword search,
continents=[continents],
countries=[country_code],
categories=["Business"],
strategy='latest news'
)
return create_json_response(response)
def query_finance_news(query_str: str, continents: str, country_code: str) -> Any:
"""Use this function to get top news related to Finance
Args:
query_str (str): the user query to search for business news.
continents (str): specific news from the geographic region (continent).
country_code (str): specific news in a specific country within the continents.
Returns:
str: JSON object of top story summaries.
"""
print(f"Calling finance tool with, query_str: {query_str}, continents: {continents}")
response = asknews_news_client().news.search_news(
query=query_str, # your keyword query
n_articles=10, # control the number of articles to include in the context
return_type="dicts", # you can also ask for "dicts" if you want more information
method="both", # use "nl" for natural language for your search, or "kw" for keyword search,
continents=[continents],
countries=[country_code],
categories=["Finance"],
strategy='latest news'
)
return create_json_response(response)
这个智能体也被打包并移植到另一台Raspberry Pi,并通过API进行访问。
边缘智能体:
输出:
结论:
在这篇文章中,我们探讨了设计并实现一个由Phidata的AI解决方案驱动的高度复杂的多智能体系统的激动人心之旅。通过利用树莓派上的分布式智能体、Qdrant的语义路由和专用工具,我们创建了一个强大的架构,能够高效处理不同领域的用户查询。Anthropic的Sonet3.5大型语言模型(LLM)、OpenAI的GPT-4o大型语言模型以及Phidata在AI和数据解决方案方面的专业知识相结合,使我们能够构建一个提供准确和全面响应的系统。通过这个实践实验,我们展示了分布式AI、边缘计算和智能体协作的巨大潜力。