构建多智能体AI系统的医学文本处理指南

2024年11月19日 由 alex 发表 112 0

4


介绍

在人工智能这一快速发展的领域,多智能体系统因其能够通过协作处理复杂任务而备受关注。本文探索了一个旨在处理医学文本的多智能体人工智能应用的架构、工作流程以及未来发展前景。该系统使用Streamlit构建前端,并通过Ollama平台利用Llama-3.2:3b模型,包含用于总结医学文本、撰写研究文章以及编辑受保护的健康信息(PHI)的智能体。


在自建多智能体系统与使用如CrewAI、Autogen或OpenAI Swarm等现有智能体框架之间做出选择时,需要权衡每种方法的利弊。以下是自建系统与使用框架的优缺点对比:


自建多智能体系统


优点:

1. 定制化和灵活性:

  • 量身定制的解决方案:自建系统可以根据特定需求和要求进行高度定制,对架构、设计和实现细节拥有完全控制权。
  • 灵活性:可以实现现有框架可能不支持的独特功能。

2. 深入理解:

  • 深入了解:从零开始开发系统能够深入了解底层机制和架构,这对故障排除和优化非常有益。
  • 学习机会:对开发人员来说是一次宝贵的学习经历,能够提升他们在系统设计、架构和人工智能方面的技能。

3. 不依赖外部框架:

  • 独立性:不受第三方框架的限制或更新影响,这些框架有时会引入破坏性更改或弃用功能。
  • 安全性和隐私性:对数据处理和安全措施拥有完全控制权,这对于像医学数据处理这样的敏感应用至关重要。

4. 优化性能:

  • 性能调优:可以根据特定的性能指标(如速度或资源使用)优化系统,而无需承担通用框架可能带来的额外负担。

5. 成本效益:

  • 无许可费用:可以避免使用商业框架可能产生的费用,这些框架可能会为高级功能或企业支持收费。


使用智能体框架(CrewAI、Autogen、OpenAI Swarm)


优点:

1. 快速开发:

  • 节省时间:框架提供预构建的组件和功能,显著缩短开发时间。
  • 经过验证的解决方案:利用经过优化和调试的解决方案,这些解决方案已由社区或公司验证。

2. 可扩展性:

  • 内置可扩展性:许多框架都设计用于处理可扩展性问题,使系统更易于根据需要扩展。
  • 云集成:通常内置对云服务的支持,便于部署和扩展。

3. 社区和支持:

  • 活跃的社区:可以访问开发人员和用户社区,他们可以提供支持、分享最佳实践并为框架的发展做出贡献。
  • 文档和教程:全面的文档和教程可以帮助加速学习曲线。

4. 高级功能:

  • 最先进的功能:框架通常包含开箱即用的高级功能,如自然语言处理、机器学习集成和多智能体协调。
  • 持续更新:可以从框架维护者的持续改进和更新中受益。

5. 互操作性:

  • 与其他工具集成:框架通常提供与其他工具和服务的轻松集成,从而增强系统的功能。


工作流程架构

该应用的架构是模块化的,由几个关键组件组成:

  • 前端(Streamlit):提供直观的网络界面供用户交互。用户可以选择任务、输入数据并查看结果。
  • 智能体管理器:作为中央协调器,将任务分配给适当的主智能体及其对应的验证智能体。


主智能体:

  • 总结智能体:生成医学文本的摘要。
  • 撰写文章智能体:创建研究文章的草稿。
  • 编辑数据智能体:在医学数据中掩盖PHI。


验证智能体:

  • 总结验证智能体:评估摘要的质量。
  • 优化智能体:提高草稿的质量。
  • PHI验证智能体:确保所有PHI都已正确掩盖。
  • 日志记录器:记录所有交互、输入、输出和错误,以便进行监控和调试。


实施所用的技术栈

该应用结合了多种现代技术,以确保效率、可扩展性和易用性:

  • Streamlit:一个强大且易于使用的框架,用于在Python中构建网络应用。它为用户提供了一个直观的界面来与AI系统进行交互,即使是非技术人员也能轻松使用。
  • Ollama:一个平台,便于使用像Llama、Mistral、Gemma等大型语言模型。它允许将AI模型无缝集成到应用中,从而实现复杂的自然语言处理任务。
  • Python:开发该应用的主要编程语言。Python丰富的库和框架生态系统使其成为AI和网络开发的理想选择。
  • Asyncio:一个Python库,用于使用async/await语法编写并发代码。它有助于高效地管理异步任务,这对于同时处理多个AI智能体至关重要。


工作流程

该应用的工作流程如下:

  • 用户交互:用户通过Streamlit界面与系统交互,选择任务并提供输入数据。
  • 任务分配:智能体管理器接收任务请求并将其分配给适当的主智能体。
  • 处理:主智能体使用LLaMA模型处理输入数据以生成所需的输出。
  • 验证:输出随后传递给对应的验证智能体,该智能体评估结果的质量和准确性。提供5分制的验证评分以便快速评估。
  • 结果展示:向用户展示最终验证过的内容,并提供导出结果或返回主页的选项。


代码实现

文件夹结构:


medical_ai_agents/

├── agents/

│ ├── __init__.py

│ ├── base_agent.py

│ ├── main_agents.py

│ └── validator_agents.py

├── core/

│ ├── __init__.py

│ ├── agent_manager.py

│ └── logger.py

├── utils/

│ ├── __init__.py

│ └── ollama_utils.py

├── app.py

└── requirements.txt


安装所需依赖项


pip install -r requirements.txttxt


创建基础代理 


base_agent.py


from abc import ABC, abstractmethod
from typing import Any, Dict
import ollama
class BaseAgent(ABC):
    def __init__(self, model_name: str = "llama3.2:3b"):
        self.model_name = model_name
        
    async def get_completion(self, prompt: str) -> str:
        try:
            response = ollama.chat(model=self.model_name, messages=[
                {'role': 'user', 'content': prompt}
            ])
            return response['message']['content']
        except Exception as e:
            raise Exception(f"Error getting completion: {str(e)}")
class MainAgent(BaseAgent):
    @abstractmethod
    async def process(self, input_data: Any) -> Dict[str, Any]:
        pass
class ValidatorAgent(BaseAgent):
    @abstractmethod
    async def validate(self, input_data: Any, output_data: Any) -> Dict[str, bool]:
        pass 


main_agent.py


from typing import Any, Dict
from .base_agent import MainAgent
class SummarizeAgent(MainAgent):
    async def process(self, input_data: str) -> Dict[str, Any]:
        prompt = f"Summarize the following medical text:\n\n{input_data}"
        summary = await self.get_completion(prompt)
        return {"summary": summary}
class WriteArticleAgent(MainAgent):
    async def process(self, input_data: Dict[str, str]) -> Dict[str, Any]:
        prompt = f"""Write a research article with the following:
        Topic: {input_data['topic']}
        Key points: {input_data['key_points']}"""
        article = await self.get_completion(prompt)
        return {"article": article}
class SanitizeDataAgent(MainAgent):
    async def process(self, input_data: str) -> Dict[str, Any]:
        prompt = """Mask all Protected Health Information (PHI) in the following text. 
        Replace with appropriate masks:
        - Patient names with [PATIENT_NAME]
        - Doctor/Provider names with [PROVIDER_NAME]
        - Dates with [DATE]
        - Locations/Addresses with [LOCATION]
        - Phone numbers with [PHONE]
        - Email addresses with [EMAIL]
        - Medical record numbers with [MRN]
        - Social Security numbers with [SSN]
        - Device identifiers with [DEVICE_ID]
        - Any other identifying numbers with [ID]
        - Physical health conditions with [HEALTH_CONDITION]
        - Medications with [MEDICATION]
        - Lab results with [LAB_RESULT]
        - Vital signs with [VITAL_SIGN]
        - Procedures with [PROCEDURE]
        Text to mask:\n\n""" + input_data
        sanitized_data = await self.get_completion(prompt)
        return {"sanitized_data": sanitized_data} 


validator_agent.py


from typing import Any, Dict
from .base_agent import ValidatorAgent
class SummarizeValidatorAgent(ValidatorAgent):
    async def validate(self, input_data: str, output_data: Dict[str, Any]) -> Dict[str, bool]:
        prompt = f"""Evaluate if this summary accurately represents the original text:
        Original: {input_data}
        Summary: {output_data['summary']}
        
        Provide:
        1. A score out of 5 (where 5 is perfect)
        2. 'valid' or 'invalid'
        3. Brief explanation
        
        Format: Score: X/5\nStatus: valid/invalid\nExplanation: ..."""
        
        result = await self.get_completion(prompt)
        is_valid = "valid" in result.lower()
        return {"is_valid": is_valid, "feedback": result}
class RefinerAgent(ValidatorAgent):
    async def validate(self, input_data: Dict[str, str], output_data: Dict[str, Any]) -> Dict[str, bool]:
        prompt = f"""Review this research article for quality and accuracy:
        Article: {output_data['article']}
        
        Provide:
        1. A score out of 5 (where 5 is perfect)
        2. 'valid' or 'invalid'
        3. Brief explanation
        
        Format: Score: X/5\nStatus: valid/invalid\nExplanation: ..."""
        
        result = await self.get_completion(prompt)
        is_valid = "valid" in result.lower()
        return {"is_valid": is_valid, "feedback": result}
class SanitizeValidatorAgent(ValidatorAgent):
    async def validate(self, input_data: str, output_data: Dict[str, Any]) -> Dict[str, bool]:
        prompt = f"""Verify if all Protected Health Information (PHI) has been properly masked in this text:
        Masked text: {output_data['sanitized_data']}
        
        Check for any unmasked:
        - Patient names
        - Doctor/Provider names
        - Dates
        - Locations/Addresses
        - Phone numbers
        - Email addresses
        - Medical record numbers
        - Social Security numbers
        - Device identifiers
        - Other identifying numbers
        - Physical health conditions
        - Medications
        - Lab results
        - Vital signs
        - Procedures
        
        Provide:
        1. A score out of 5 (where 5 means all PHI properly masked)
        2. 'valid' or 'invalid'
        3. List any found unmasked PHI
        
        Format: Score: X/5\nStatus: valid/invalid\nFindings: ..."""
        
        result = await self.get_completion(prompt)
        is_valid = "valid" in result.lower()
        return {"is_valid": is_valid, "feedback": result} 


核心功能 


agent_manager.py


from typing import Dict, Any
from agents.main_agents import SummarizeAgent, WriteArticleAgent, SanitizeDataAgent
from agents.validator_agents import SummarizeValidatorAgent, RefinerAgent, SanitizeValidatorAgent
from core.logger import Logger
class AgentManager:
    def __init__(self):
        self.logger = Logger()
        
        # Initialize main agents
        self.summarize_agent = SummarizeAgent()
        self.write_article_agent = WriteArticleAgent()
        self.sanitize_agent = SanitizeDataAgent()
        
        # Initialize validator agents
        self.summarize_validator = SummarizeValidatorAgent()
        self.refiner_agent = RefinerAgent()
        self.sanitize_validator = SanitizeValidatorAgent()
    async def process_task(self, task_type: str, input_data: Any) -> Dict[str, Any]:
        try:
            self.logger.log_input(task_type, input_data)
            
            if task_type == "summarize":
                result = await self.summarize_agent.process(input_data)
                validation = await self.summarize_validator.validate(input_data, result)
            
            elif task_type == "write_article":
                result = await self.write_article_agent.process(input_data)
                validation = await self.refiner_agent.validate(input_data, result)
            
            elif task_type == "sanitize":
                result = await self.sanitize_agent.process(input_data)
                validation = await self.sanitize_validator.validate(input_data, result)
            else:
                raise ValueError(f"Unknown task type: {task_type}")
            self.logger.log_output(task_type, result, validation)
            return {"result": result, "validation": validation}
        except Exception as e:
            self.logger.log_error(task_type, str(e))
            raise 


logger.py


import logging
from datetime import datetime
from typing import Any, Dict
class Logger:
    def __init__(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('medical_ai_agents.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    def log_input(self, task_type: str, input_data: Any):
        self.logger.info(f"Task: {task_type} - Input received at {datetime.now()}")
    def log_output(self, task_type: str, result: Dict[str, Any], validation: Dict[str, bool]):
        self.logger.info(f"Task: {task_type} - Output generated at {datetime.now()}")
        self.logger.info(f"Validation result: {validation['is_valid']}")
    def log_error(self, task_type: str, error_message: str):
        self.logger.error(f"Task: {task_type} - Error: {error_message}") 


Streamlit 应用程序 


app.py


import streamlit as st
import asyncio
from core.agent_manager import AgentManager
# Set page configuration with custom theme
st.set_page_config(
    page_title="Medical AI Agents",
    layout="wide",
    initial_sidebar_state="expanded"
)
# Custom CSS for styling
st.markdown("""
    <style>
    .main-header {
        font-size: 2.5rem;
        color: white;
        text-align: center;
        padding: 1.5rem;
        margin-bottom: 1rem;
        font-weight: bold;
        background: linear-gradient(120deg, #1E88E5 0%, #1565C0 100%);
        border-radius: 10px;
        box-shadow: 0 2px 10px rgba(30,136,229,0.2);
    }
    .sub-header {
        font-size: 1.8rem;
        color: #0D47A1;
        padding: 0.5rem 0;
        border-bottom: 2px solid #1E88E5;
        margin-bottom: 1rem;
    }
    .task-container {
        background-color: #F8F9FA;
        padding: 2rem;
        border-radius: 10px;
        box-shadow: 0 2px 4px rgba(0,0,0,0.1);
    }
    .result-box {
        background-color: white;
        padding: 1.5rem;
        border-radius: 8px;
        border-left: 4px solid #1E88E5;
        margin: 1rem 0;
    }
    .validation-box {
        padding: 1rem;
        border-radius: 8px;
        margin-top: 1rem;
    }
    .stButton>button {
        background-color: #1E88E5;
        color: white;
        border-radius: 25px;
        padding: 0.5rem 2rem;
        border: none;
        box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        transition: all 0.3s ease;
    }
    .stButton>button:hover {
        background-color: #1565C0;
        box-shadow: 0 4px 8px rgba(0,0,0,0.2);
        transform: translateY(-2px);
    }
    .stTextArea>div>div {
        border-radius: 8px;
        border: 2px solid #E3F2FD;
    }
    .sidebar-content {
        padding: 1rem;
        background-color: #F8F9FA;
        border-radius: 8px;
    }
    </style>
    """, unsafe_allow_html=True)
@st.cache_resource
def get_agent_manager():
    return AgentManager()
def show_results_page(result_data):
    st.markdown("<h1 class='main-header'>Final Validated Content</h1>", unsafe_allow_html=True)
    
    # Add a subheader based on the content type
    if "summary" in result_data["result"]:
        st.markdown("<h2 class='sub-header'>Medical Text Summary</h2>", unsafe_allow_html=True)
    elif "article" in result_data["result"]:
        st.markdown("<h2 class='sub-header'>Research Article</h2>", unsafe_allow_html=True)
    elif "sanitized_data" in result_data["result"]:
        st.markdown("<h2 class='sub-header'>Redacted PHI Content</h2>", unsafe_allow_html=True)
    
    # Display content in a styled box
    st.markdown("<div class='result-box'>", unsafe_allow_html=True)
    if "summary" in result_data["result"]:
        st.write(result_data["result"]["summary"])
    elif "article" in result_data["result"]:
        st.write(result_data["result"]["article"])
    elif "sanitized_data" in result_data["result"]:
        st.write(result_data["result"]["sanitized_data"])
    st.markdown("</div>", unsafe_allow_html=True)
    
    # Action buttons in columns
    col1, col2, col3 = st.columns([1, 1, 1])
    with col2:
        # Export button
        if st.button("? Export Results"):
            export_data = ""
            if "summary" in result_data["result"]:
                export_data = result_data["result"]["summary"]
            elif "article" in result_data["result"]:
                export_data = result_data["result"]["article"]
            elif "sanitized_data" in result_data["result"]:
                export_data = result_data["result"]["sanitized_data"]
                
            st.download_button(
                label="? Download Content",
                data=export_data,
                file_name="final_content.txt",
                mime="text/plain"
            )
    
    with col3:
        # Return button
        if st.button("? Return to Main Page"):
            st.session_state.show_results = False
            st.rerun()
def main():
    # Sidebar styling
    with st.sidebar:
        st.markdown("<h2 style='text-align: center; color: #1E88E5;'>Tasks</h2>", unsafe_allow_html=True)
        st.markdown("<div class='sidebar-content'>", unsafe_allow_html=True)
        task_type = st.radio(
            "",  # Empty label as we're using custom header
            ["summarize", "write_article", "Redact PHI"],
            format_func=lambda x: {
                "summarize": "? Summarize Medical Text",
                "write_article": "? Write Research Article",
                "Redact PHI": "? Redact PHI"
            }[x]
        )
        st.markdown("</div>", unsafe_allow_html=True)
    
    # Main content - Single header for the entire page
    st.markdown("<h1 class='main-header'>Medical Multi-Agent System</h1>", unsafe_allow_html=True)
    
    # Initialize session state
    if 'show_results' not in st.session_state:
        st.session_state.show_results = False
    if 'result_data' not in st.session_state:
        st.session_state.result_data = None
    
    if st.session_state.show_results:
        show_results_page(st.session_state.result_data)
        return
    
    agent_manager = get_agent_manager()
    
    # Task containers with consistent styling
    st.markdown("<div class='task-container'>", unsafe_allow_html=True)
    
    if task_type == "summarize":
        st.markdown("<h2 class='sub-header'>? Summarize Medical Text</h2>", unsafe_allow_html=True)
        input_text = st.text_area("Enter medical text to summarize", height=200)
        col1, col2 = st.columns(2)
        
        with col1:
            if st.button("? Generate Summary"):
                with st.spinner("Processing..."):
                    result = asyncio.run(agent_manager.process_task("summarize", input_text))
                    st.session_state.result_data = result
                    st.markdown("<div class='result-box'>", unsafe_allow_html=True)
                    st.subheader("Summary")
                    st.write(result["result"]["summary"])
                    st.markdown("</div>", unsafe_allow_html=True)
                    st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
                    st.subheader("Validation")
                    
                    # Extract and display score
                    feedback = result["validation"]["feedback"]
                    if "Score:" in feedback:
                        score = feedback.split("Score:")[1].split("\n")[0].strip()
                        st.markdown(f"""
                            <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
                                <h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
                            </div>
                        """, unsafe_allow_html=True)
                    
                    st.write(feedback)
                    st.markdown("</div>", unsafe_allow_html=True)
        
        with col2:
            if st.session_state.result_data and st.button("?️ View Edited Content"):
                st.session_state.show_results = True
                st.rerun()
    
    elif task_type == "write_article":
        st.markdown("<h2 class='sub-header'>? Write Research Article</h2>", unsafe_allow_html=True)
        topic = st.text_input("Enter research topic")
        key_points = st.text_area("Enter key points (one per line)", height=150)
        col1, col2 = st.columns(2)
        
        with col1:
            if st.button("? Generate Article"):
                with st.spinner("Processing..."):
                    input_data = {"topic": topic, "key_points": key_points}
                    result = asyncio.run(agent_manager.process_task("write_article", input_data))
                    st.session_state.result_data = result
                    st.markdown("<div class='result-box'>", unsafe_allow_html=True)
                    st.subheader("Article")
                    st.write(result["result"]["article"])
                    st.markdown("</div>", unsafe_allow_html=True)
                    st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
                    st.subheader("Validation")
                    
                    # Extract and display score
                    feedback = result["validation"]["feedback"]
                    if "Score:" in feedback:
                        score = feedback.split("Score:")[1].split("\n")[0].strip()
                        st.markdown(f"""
                            <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
                                <h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
                            </div>
                        """, unsafe_allow_html=True)
                    
                    st.write(feedback)
                    st.markdown("</div>", unsafe_allow_html=True)
        
        with col2:
            if st.session_state.result_data and st.button("?️ View Edited Content"):
                st.session_state.show_results = True
                st.rerun()
    
    elif task_type == "Redact PHI":
        st.markdown("<h2 class='sub-header'>? Redact Protected Health Information (PHI)</h2>", unsafe_allow_html=True)
        input_text = st.text_area("Enter medical text to redact PHI", height=200)
        col1, col2 = st.columns(2)
        
        with col1:
            if st.button("? Redact PHI"):
                with st.spinner("Processing..."):
                    result = asyncio.run(agent_manager.process_task("sanitize", input_text))
                    st.session_state.result_data = result
                    st.markdown("<div class='result-box'>", unsafe_allow_html=True)
                    st.subheader("Redacted Text")
                    st.write(result["result"]["sanitized_data"])
                    st.markdown("</div>", unsafe_allow_html=True)
                    st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
                    st.subheader("Validation")
                    
                    # Extract and display score
                    feedback = result["validation"]["feedback"]
                    if "Score:" in feedback:
                        score = feedback.split("Score:")[1].split("\n")[0].strip()
                        st.markdown(f"""
                            <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
                                <h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
                            </div>
                        """, unsafe_allow_html=True)
                    
                    st.write(feedback)
                    st.markdown("</div>", unsafe_allow_html=True)
        
        with col2:
            if st.session_state.result_data and st.button("?️ View Edited Content"):
                st.session_state.show_results = True
                st.rerun()
    
    st.markdown("</div>", unsafe_allow_html=True)
if __name__ == "__main__":
    main() 


运行 Streamlit 应用程序


streamlit run app.pypy


Streamlit 界面已被实例化。你现在可以在浏览器中查看你的 Streamlit 应用程序。本地 URL:http://localhost:8501。


Streamlit 用户界面。


5


已生成摘要


6


生成的摘要的验证结果


7


查看已验证内容


8


点击“导出结果”下载结果


9


撰写研究文章任务


10


生成文章


11


验证结果


12


查看最终编辑内容


13

14


编辑 PHI 任务


15


编辑文本


16


删节文本的验证分数


17


查看最终编辑内容


18


结论

这个多智能体 AI 系统展示了协作智能体在处理复杂医学文本方面的强大功能。通过利用 LLaMA 等高级模型并通过 Streamlit 提供用户友好的界面,该应用程序为摘要、文章撰写和 PHI 编辑等任务提供了实用的解决方案。随着 AI 的不断发展,这样的系统将在改变我们处理和解释医疗数据的方式方面发挥关键作用。


文章来源:https://medium.com/the-ai-forum/building-a-multi-agent-ai-system-from-scratch-for-medical-text-processing-dc6f10fc5f04
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消