介绍
在人工智能这一快速发展的领域,多智能体系统因其能够通过协作处理复杂任务而备受关注。本文探索了一个旨在处理医学文本的多智能体人工智能应用的架构、工作流程以及未来发展前景。该系统使用Streamlit构建前端,并通过Ollama平台利用Llama-3.2:3b模型,包含用于总结医学文本、撰写研究文章以及编辑受保护的健康信息(PHI)的智能体。
在自建多智能体系统与使用如CrewAI、Autogen或OpenAI Swarm等现有智能体框架之间做出选择时,需要权衡每种方法的利弊。以下是自建系统与使用框架的优缺点对比:
自建多智能体系统
优点:
1. 定制化和灵活性:
2. 深入理解:
3. 不依赖外部框架:
4. 优化性能:
5. 成本效益:
使用智能体框架(CrewAI、Autogen、OpenAI Swarm)
优点:
1. 快速开发:
2. 可扩展性:
3. 社区和支持:
4. 高级功能:
5. 互操作性:
工作流程架构
该应用的架构是模块化的,由几个关键组件组成:
主智能体:
验证智能体:
实施所用的技术栈
该应用结合了多种现代技术,以确保效率、可扩展性和易用性:
工作流程
该应用的工作流程如下:
代码实现
文件夹结构:
medical_ai_agents/
├── agents/
│ ├── __init__.py
│ ├── base_agent.py
│ ├── main_agents.py
│ └── validator_agents.py
├── core/
│ ├── __init__.py
│ ├── agent_manager.py
│ └── logger.py
├── utils/
│ ├── __init__.py
│ └── ollama_utils.py
├── app.py
└── requirements.txt
安装所需依赖项
pip install -r requirements.txttxt
创建基础代理
base_agent.py
from abc import ABC, abstractmethod
from typing import Any, Dict
import ollama
class BaseAgent(ABC):
def __init__(self, model_name: str = "llama3.2:3b"):
self.model_name = model_name
async def get_completion(self, prompt: str) -> str:
try:
response = ollama.chat(model=self.model_name, messages=[
{'role': 'user', 'content': prompt}
])
return response['message']['content']
except Exception as e:
raise Exception(f"Error getting completion: {str(e)}")
class MainAgent(BaseAgent):
@abstractmethod
async def process(self, input_data: Any) -> Dict[str, Any]:
pass
class ValidatorAgent(BaseAgent):
@abstractmethod
async def validate(self, input_data: Any, output_data: Any) -> Dict[str, bool]:
pass
main_agent.py
from typing import Any, Dict
from .base_agent import MainAgent
class SummarizeAgent(MainAgent):
async def process(self, input_data: str) -> Dict[str, Any]:
prompt = f"Summarize the following medical text:\n\n{input_data}"
summary = await self.get_completion(prompt)
return {"summary": summary}
class WriteArticleAgent(MainAgent):
async def process(self, input_data: Dict[str, str]) -> Dict[str, Any]:
prompt = f"""Write a research article with the following:
Topic: {input_data['topic']}
Key points: {input_data['key_points']}"""
article = await self.get_completion(prompt)
return {"article": article}
class SanitizeDataAgent(MainAgent):
async def process(self, input_data: str) -> Dict[str, Any]:
prompt = """Mask all Protected Health Information (PHI) in the following text.
Replace with appropriate masks:
- Patient names with [PATIENT_NAME]
- Doctor/Provider names with [PROVIDER_NAME]
- Dates with [DATE]
- Locations/Addresses with [LOCATION]
- Phone numbers with [PHONE]
- Email addresses with [EMAIL]
- Medical record numbers with [MRN]
- Social Security numbers with [SSN]
- Device identifiers with [DEVICE_ID]
- Any other identifying numbers with [ID]
- Physical health conditions with [HEALTH_CONDITION]
- Medications with [MEDICATION]
- Lab results with [LAB_RESULT]
- Vital signs with [VITAL_SIGN]
- Procedures with [PROCEDURE]
Text to mask:\n\n""" + input_data
sanitized_data = await self.get_completion(prompt)
return {"sanitized_data": sanitized_data}
validator_agent.py
from typing import Any, Dict
from .base_agent import ValidatorAgent
class SummarizeValidatorAgent(ValidatorAgent):
async def validate(self, input_data: str, output_data: Dict[str, Any]) -> Dict[str, bool]:
prompt = f"""Evaluate if this summary accurately represents the original text:
Original: {input_data}
Summary: {output_data['summary']}
Provide:
1. A score out of 5 (where 5 is perfect)
2. 'valid' or 'invalid'
3. Brief explanation
Format: Score: X/5\nStatus: valid/invalid\nExplanation: ..."""
result = await self.get_completion(prompt)
is_valid = "valid" in result.lower()
return {"is_valid": is_valid, "feedback": result}
class RefinerAgent(ValidatorAgent):
async def validate(self, input_data: Dict[str, str], output_data: Dict[str, Any]) -> Dict[str, bool]:
prompt = f"""Review this research article for quality and accuracy:
Article: {output_data['article']}
Provide:
1. A score out of 5 (where 5 is perfect)
2. 'valid' or 'invalid'
3. Brief explanation
Format: Score: X/5\nStatus: valid/invalid\nExplanation: ..."""
result = await self.get_completion(prompt)
is_valid = "valid" in result.lower()
return {"is_valid": is_valid, "feedback": result}
class SanitizeValidatorAgent(ValidatorAgent):
async def validate(self, input_data: str, output_data: Dict[str, Any]) -> Dict[str, bool]:
prompt = f"""Verify if all Protected Health Information (PHI) has been properly masked in this text:
Masked text: {output_data['sanitized_data']}
Check for any unmasked:
- Patient names
- Doctor/Provider names
- Dates
- Locations/Addresses
- Phone numbers
- Email addresses
- Medical record numbers
- Social Security numbers
- Device identifiers
- Other identifying numbers
- Physical health conditions
- Medications
- Lab results
- Vital signs
- Procedures
Provide:
1. A score out of 5 (where 5 means all PHI properly masked)
2. 'valid' or 'invalid'
3. List any found unmasked PHI
Format: Score: X/5\nStatus: valid/invalid\nFindings: ..."""
result = await self.get_completion(prompt)
is_valid = "valid" in result.lower()
return {"is_valid": is_valid, "feedback": result}
核心功能
agent_manager.py
from typing import Dict, Any
from agents.main_agents import SummarizeAgent, WriteArticleAgent, SanitizeDataAgent
from agents.validator_agents import SummarizeValidatorAgent, RefinerAgent, SanitizeValidatorAgent
from core.logger import Logger
class AgentManager:
def __init__(self):
self.logger = Logger()
# Initialize main agents
self.summarize_agent = SummarizeAgent()
self.write_article_agent = WriteArticleAgent()
self.sanitize_agent = SanitizeDataAgent()
# Initialize validator agents
self.summarize_validator = SummarizeValidatorAgent()
self.refiner_agent = RefinerAgent()
self.sanitize_validator = SanitizeValidatorAgent()
async def process_task(self, task_type: str, input_data: Any) -> Dict[str, Any]:
try:
self.logger.log_input(task_type, input_data)
if task_type == "summarize":
result = await self.summarize_agent.process(input_data)
validation = await self.summarize_validator.validate(input_data, result)
elif task_type == "write_article":
result = await self.write_article_agent.process(input_data)
validation = await self.refiner_agent.validate(input_data, result)
elif task_type == "sanitize":
result = await self.sanitize_agent.process(input_data)
validation = await self.sanitize_validator.validate(input_data, result)
else:
raise ValueError(f"Unknown task type: {task_type}")
self.logger.log_output(task_type, result, validation)
return {"result": result, "validation": validation}
except Exception as e:
self.logger.log_error(task_type, str(e))
raise
logger.py
import logging
from datetime import datetime
from typing import Any, Dict
class Logger:
def __init__(self):
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('medical_ai_agents.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def log_input(self, task_type: str, input_data: Any):
self.logger.info(f"Task: {task_type} - Input received at {datetime.now()}")
def log_output(self, task_type: str, result: Dict[str, Any], validation: Dict[str, bool]):
self.logger.info(f"Task: {task_type} - Output generated at {datetime.now()}")
self.logger.info(f"Validation result: {validation['is_valid']}")
def log_error(self, task_type: str, error_message: str):
self.logger.error(f"Task: {task_type} - Error: {error_message}")
Streamlit 应用程序
app.py
import streamlit as st
import asyncio
from core.agent_manager import AgentManager
# Set page configuration with custom theme
st.set_page_config(
page_title="Medical AI Agents",
layout="wide",
initial_sidebar_state="expanded"
)
# Custom CSS for styling
st.markdown("""
<style>
.main-header {
font-size: 2.5rem;
color: white;
text-align: center;
padding: 1.5rem;
margin-bottom: 1rem;
font-weight: bold;
background: linear-gradient(120deg, #1E88E5 0%, #1565C0 100%);
border-radius: 10px;
box-shadow: 0 2px 10px rgba(30,136,229,0.2);
}
.sub-header {
font-size: 1.8rem;
color: #0D47A1;
padding: 0.5rem 0;
border-bottom: 2px solid #1E88E5;
margin-bottom: 1rem;
}
.task-container {
background-color: #F8F9FA;
padding: 2rem;
border-radius: 10px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.result-box {
background-color: white;
padding: 1.5rem;
border-radius: 8px;
border-left: 4px solid #1E88E5;
margin: 1rem 0;
}
.validation-box {
padding: 1rem;
border-radius: 8px;
margin-top: 1rem;
}
.stButton>button {
background-color: #1E88E5;
color: white;
border-radius: 25px;
padding: 0.5rem 2rem;
border: none;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
transition: all 0.3s ease;
}
.stButton>button:hover {
background-color: #1565C0;
box-shadow: 0 4px 8px rgba(0,0,0,0.2);
transform: translateY(-2px);
}
.stTextArea>div>div {
border-radius: 8px;
border: 2px solid #E3F2FD;
}
.sidebar-content {
padding: 1rem;
background-color: #F8F9FA;
border-radius: 8px;
}
</style>
""", unsafe_allow_html=True)
@st.cache_resource
def get_agent_manager():
return AgentManager()
def show_results_page(result_data):
st.markdown("<h1 class='main-header'>Final Validated Content</h1>", unsafe_allow_html=True)
# Add a subheader based on the content type
if "summary" in result_data["result"]:
st.markdown("<h2 class='sub-header'>Medical Text Summary</h2>", unsafe_allow_html=True)
elif "article" in result_data["result"]:
st.markdown("<h2 class='sub-header'>Research Article</h2>", unsafe_allow_html=True)
elif "sanitized_data" in result_data["result"]:
st.markdown("<h2 class='sub-header'>Redacted PHI Content</h2>", unsafe_allow_html=True)
# Display content in a styled box
st.markdown("<div class='result-box'>", unsafe_allow_html=True)
if "summary" in result_data["result"]:
st.write(result_data["result"]["summary"])
elif "article" in result_data["result"]:
st.write(result_data["result"]["article"])
elif "sanitized_data" in result_data["result"]:
st.write(result_data["result"]["sanitized_data"])
st.markdown("</div>", unsafe_allow_html=True)
# Action buttons in columns
col1, col2, col3 = st.columns([1, 1, 1])
with col2:
# Export button
if st.button("? Export Results"):
export_data = ""
if "summary" in result_data["result"]:
export_data = result_data["result"]["summary"]
elif "article" in result_data["result"]:
export_data = result_data["result"]["article"]
elif "sanitized_data" in result_data["result"]:
export_data = result_data["result"]["sanitized_data"]
st.download_button(
label="? Download Content",
data=export_data,
file_name="final_content.txt",
mime="text/plain"
)
with col3:
# Return button
if st.button("? Return to Main Page"):
st.session_state.show_results = False
st.rerun()
def main():
# Sidebar styling
with st.sidebar:
st.markdown("<h2 style='text-align: center; color: #1E88E5;'>Tasks</h2>", unsafe_allow_html=True)
st.markdown("<div class='sidebar-content'>", unsafe_allow_html=True)
task_type = st.radio(
"", # Empty label as we're using custom header
["summarize", "write_article", "Redact PHI"],
format_func=lambda x: {
"summarize": "? Summarize Medical Text",
"write_article": "? Write Research Article",
"Redact PHI": "? Redact PHI"
}[x]
)
st.markdown("</div>", unsafe_allow_html=True)
# Main content - Single header for the entire page
st.markdown("<h1 class='main-header'>Medical Multi-Agent System</h1>", unsafe_allow_html=True)
# Initialize session state
if 'show_results' not in st.session_state:
st.session_state.show_results = False
if 'result_data' not in st.session_state:
st.session_state.result_data = None
if st.session_state.show_results:
show_results_page(st.session_state.result_data)
return
agent_manager = get_agent_manager()
# Task containers with consistent styling
st.markdown("<div class='task-container'>", unsafe_allow_html=True)
if task_type == "summarize":
st.markdown("<h2 class='sub-header'>? Summarize Medical Text</h2>", unsafe_allow_html=True)
input_text = st.text_area("Enter medical text to summarize", height=200)
col1, col2 = st.columns(2)
with col1:
if st.button("? Generate Summary"):
with st.spinner("Processing..."):
result = asyncio.run(agent_manager.process_task("summarize", input_text))
st.session_state.result_data = result
st.markdown("<div class='result-box'>", unsafe_allow_html=True)
st.subheader("Summary")
st.write(result["result"]["summary"])
st.markdown("</div>", unsafe_allow_html=True)
st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
st.subheader("Validation")
# Extract and display score
feedback = result["validation"]["feedback"]
if "Score:" in feedback:
score = feedback.split("Score:")[1].split("\n")[0].strip()
st.markdown(f"""
<div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
<h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
</div>
""", unsafe_allow_html=True)
st.write(feedback)
st.markdown("</div>", unsafe_allow_html=True)
with col2:
if st.session_state.result_data and st.button("?️ View Edited Content"):
st.session_state.show_results = True
st.rerun()
elif task_type == "write_article":
st.markdown("<h2 class='sub-header'>? Write Research Article</h2>", unsafe_allow_html=True)
topic = st.text_input("Enter research topic")
key_points = st.text_area("Enter key points (one per line)", height=150)
col1, col2 = st.columns(2)
with col1:
if st.button("? Generate Article"):
with st.spinner("Processing..."):
input_data = {"topic": topic, "key_points": key_points}
result = asyncio.run(agent_manager.process_task("write_article", input_data))
st.session_state.result_data = result
st.markdown("<div class='result-box'>", unsafe_allow_html=True)
st.subheader("Article")
st.write(result["result"]["article"])
st.markdown("</div>", unsafe_allow_html=True)
st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
st.subheader("Validation")
# Extract and display score
feedback = result["validation"]["feedback"]
if "Score:" in feedback:
score = feedback.split("Score:")[1].split("\n")[0].strip()
st.markdown(f"""
<div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
<h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
</div>
""", unsafe_allow_html=True)
st.write(feedback)
st.markdown("</div>", unsafe_allow_html=True)
with col2:
if st.session_state.result_data and st.button("?️ View Edited Content"):
st.session_state.show_results = True
st.rerun()
elif task_type == "Redact PHI":
st.markdown("<h2 class='sub-header'>? Redact Protected Health Information (PHI)</h2>", unsafe_allow_html=True)
input_text = st.text_area("Enter medical text to redact PHI", height=200)
col1, col2 = st.columns(2)
with col1:
if st.button("? Redact PHI"):
with st.spinner("Processing..."):
result = asyncio.run(agent_manager.process_task("sanitize", input_text))
st.session_state.result_data = result
st.markdown("<div class='result-box'>", unsafe_allow_html=True)
st.subheader("Redacted Text")
st.write(result["result"]["sanitized_data"])
st.markdown("</div>", unsafe_allow_html=True)
st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
st.subheader("Validation")
# Extract and display score
feedback = result["validation"]["feedback"]
if "Score:" in feedback:
score = feedback.split("Score:")[1].split("\n")[0].strip()
st.markdown(f"""
<div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
<h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
</div>
""", unsafe_allow_html=True)
st.write(feedback)
st.markdown("</div>", unsafe_allow_html=True)
with col2:
if st.session_state.result_data and st.button("?️ View Edited Content"):
st.session_state.show_results = True
st.rerun()
st.markdown("</div>", unsafe_allow_html=True)
if __name__ == "__main__":
main()
运行 Streamlit 应用程序
streamlit run app.pypy
Streamlit 界面已被实例化。你现在可以在浏览器中查看你的 Streamlit 应用程序。本地 URL:http://localhost:8501。
Streamlit 用户界面。
已生成摘要
生成的摘要的验证结果
查看已验证内容
点击“导出结果”下载结果
撰写研究文章任务
生成文章
验证结果
查看最终编辑内容
编辑 PHI 任务
编辑文本
删节文本的验证分数
查看最终编辑内容
结论
这个多智能体 AI 系统展示了协作智能体在处理复杂医学文本方面的强大功能。通过利用 LLaMA 等高级模型并通过 Streamlit 提供用户友好的界面,该应用程序为摘要、文章撰写和 PHI 编辑等任务提供了实用的解决方案。随着 AI 的不断发展,这样的系统将在改变我们处理和解释医疗数据的方式方面发挥关键作用。