使用CrewAI和Qdrant为医疗数据构建Agentic RAG管道

2025年01月14日 由 alex 发表 603 0

这篇文章将演示如何利用 Qdrant 和 Crewai 构建数据输入管道和 RAG 代理。我们考虑一个现有的医院管理系统,其中传统数据被输入到数据库(例如 MongoDB)中。这些数据随后被流式传输到 Kafka,然后在 Qdrant 中矢量化。Crewai 代理将与 Qdrant 合作以满足用户请求。


3


连续数据摄取

将任何数据库与Kafka和Qdrant相结合,可以创建一个强大且高效的数据流管道。Qdrant Sink连接器确保了数据能够从Kafka持续不断地摄入到Qdrant中,无需人工干预。这种实时集成对于依赖最新数据进行决策和分析的应用程序至关重要。该管道整合了MongoDB或任何传统数据库(如PostgreSQL)的数据存储能力、Kafka的数据流能力以及Qdrant的向量搜索能力,为大规模数据的管理和实时处理提供了全面的解决方案。该架构的可扩展性、容错性和实时处理能力是其有效性的关键,使其成为当代数据驱动型应用程序的通用解决方案。


假设Confluent Kafka和Qdrant均已启动并运行,请按照以下步骤安装Qdrant Kafka连接器。


confluent-hub install qdrant/qdrant-kafka:1.1.0


此命令直接从Confluent Hub检索并安装指定的连接器到你的Confluent平台或Kafka Connect环境中。安装过程会自动管理所有必需的依赖项,从而简化Qdrant Kafka连接器与你当前配置的无缝集成。安装完成后,可以通过Confluent Control Center或Kafka Connect REST API来配置和管理连接,从而实现Kafka与Qdrant之间高效的数据流传输,无需复杂的人工配置。


4


安装完连接器后,请确保其配置如下。请记住,key.converter和value.converter对于Kafka安全地将消息从主题传递到Qdrant非常重要。


{
  "name": "QdrantSinkConnectorConnector_0",
  "config": {
    "value.converter.schemas.enable": "false",
    "name": "QdrantSinkConnectorConnector_0",
    "connector.class": "io.qdrant.kafka.QdrantSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "topic_62,qdrant_kafka.docs",
    "errors.deadletterqueue.topic.name": "dead_queue",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "qdrant.grpc.url": "http://localhost:6334",
    "qdrant.api.key": "************"
  }
}


按如下方式安装MongoDB Kafka连接器。


confluent-hub install mongodb/kafka-connect-mongodb:latest


安装完MongoDB连接器后,连接器的配置如下所示。


{
  "name": "MongoSourceConnectorConnector_0","name": "MongoSourceConnectorConnector_0",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connection.uri": "mongodb://127.0.0.1:27017/?replicaSet=rs0&directConnection=true",
    "database": "qdrant_kafka",
    "collection": "docs",
    "publish.full._document.only": "true",
    "topic.namespace.map": "{\"*\":\"qdrant_kafka.docs\"}",
    "copy.existing": "true"
  }
}


完成这些步骤后,插入到MongoDB中的数据将无缝流动到Qdrant。


Crewai智能体在行动:

假设有大量记录被导入到Qdrant中(下面是一个示例),现在是我们构建智能体并为这些智能体分配适当任务的时候了,以便从它们那里获得正确的答案并协助用户。在我们的案例中,用户是医生和医院管理人员。


5


实现

下面的代码展示了一个用于通过现代AI方法查询医疗记录的健壮系统。两个基本类相互协作,以促进医疗记录的智能搜索。


第一个类SearchInput是一个基础但至关重要的组件,它定义了搜索查询的框架。它继承自BaseModel,并且只需要一个输入:字符串格式的搜索查询。可以将其视为网站搜索框的数字对应物,尽管它对允许的输入类型有严格的规定。这种标准化保证了搜索请求的一致性,提高了系统的可靠性和可维护性。


第二个类SearchMedicalHistoryTool是真正实现功能的地方。该程序旨在通过一种称为向量相似性的方法来搜索医疗记录。它不仅仅匹配关键词,而是利用OpenAI的嵌入方法将搜索查询转换为数学表示(即向量)。这个向量封装了查询的语义意义,使系统能够理解搜索背后的上下文和意图,而不仅仅是确切的术语。


class SearchInput(BaseModel):
    """Input schema for search tool."""
    query: str = Field(..., description="The search query")

class SearchMedicalHistoryTool(BaseTool):
    name: str = "search_medical_records"
    description: str = "Search through medical records using vector similarity"
    args_schema: Type[BaseModel] = SearchInput
    def _run(self, query: str) -> Any:
        # Use OpenAI embeddings to match data_loader.py
        query_vector = next(embedding_model.query_embed(query=query))
        search_results = qdrant_client.search(
            collection_name='medical_records',
            query_vector=query_vector,
            limit=10,
            score_threshold=0.7
        )
        return [
            {
                "score": hit.score,
                "text": hit.payload.get('text', 'N/A'),
            }
            for hit in search_results
        ]


该程序随后使用特定的数据库客户端(Qdrant)来识别最相关的医疗记录。它寻找与搜索查询在数学上相似的记录,并返回最多10条相似性阈值超过0.7的结果(在1.0表示完全匹配的尺度上)。每个结果都包含一个相关性得分以及医疗记录中对应的文本。


该系统的优势在于其能够理解医学术语和上下文的细微差别。例如,如果医生查询“胸痛伴呼吸困难的病例”,系统可能会使用不同但相关的医学术语来识别相关记录,这对于需要快速获取重要患者数据的医疗从业者来说是一个宝贵的资源。


def trigger_crew(query: str) -> str:
    # initialize the tools
    search_tool = SearchMedicalHistoryTool()
    # Create agents
    medical_data_search = Agent(
        role='Medical Records Search Assistant',
        goal='Find and analyze relevant information',
        backstory="""You are an expert at finding and analyzing information.
                  You know when to search medical history records, and when 
                  to perform detailed analysis.""",
        tools=[search_tool],
        verbose=True
    )
    data_synthesizer = Agent(
        role='Information Synthesizer',
        goal='Create comprehensive and clear responses',
        backstory="""You excel at taking raw information and analysis
                  and creating clear, and present them as actionable insights.""",
        verbose=True
    )
    # Create tasks with expected_output
    data_search_task = Task(
        description=f"""Process this query: '{query}'
                    2. If it needs medical history information, use the search tool.
                    3. For detailed analysis, use search tool.
                    Explain your tool selection and process.""",
        expected_output="""A dictionary containing:
                       - The tools used
                       - The raw results from each tool
                       - Any analysis performed""",
        agent=medical_data_search
    )
    data_synthesis_task = Task(
        description="""Take the research results and create a clear response.
                    Explain the process used and why it was appropriate.
                    Make sure the response directly addresses the original query.""",
        expected_output="""A clear, structured response that includes:
                       - Direct answer to the query
                       - Supporting evidence from the research
                       - present it in the form of bullets""",
        agent=data_synthesizer
    )
    # Create and run crew
    crew = Crew(
        agents=[medical_data_search, data_synthesizer],
        tasks=[data_search_task, data_synthesis_task],
        verbose=True
    )
    result = crew.kickoff()
    return str(result)


上述代码建立了一个 Agentic AI 驱动的医学研究系统,该系统使用一组专门的 AI 代理来处理和解释医学咨询。它类似于一群熟练的助手在协作,每个助手都有不同的专业功能


trigger_crew函数是这个数字团队的指挥者。它接受一个查询字符串作为输入,并利用两个专门的人工智能智能体来协调响应。这种方法类似于医疗团队的合作,当各种专业人员运用他们的知识来解决一个复杂问题时。


第一个智能体medical_data_search被设计为医疗记录检索和分析的专家。可以将其视为一个熟练的研究助手,擅长准确判断何时以及如何检查病史。它配备了之前提到的搜索功能,使其能够以语义理解来分析医疗记录。


第二个智能体data_synthesizer充当团队的沟通专家。它的功能是将第一个智能体的初步研究和分析转化为连贯、可操作的发现。这类似于医疗从业者如何解释复杂的测试数据,以既准确又易懂的方式表达出来。


通过Task类为每个智能体分配了不同的职责。医疗搜索任务旨在确定查询是否需要过去的医疗数据,并在适当时进行深入分析。合成任务则强调制定一个连贯、有条理且直接针对初始询问的响应,并附上佐证证据。


Crew类集成了所有组件,管理智能体之间的工作流程及其指定职责。它就像一个项目经理,确保每个团队成员的贡献能够无缝过渡到下一阶段,最终为初始询问提供全面的解决方案。


将所有这些组合在一起,可以作为一个独立的智能体检索增强(RAG)应用程序来运行。


import os
import sys
from qdrant_client import QdrantClient
from fastembed.text import TextEmbedding
from pydantic import BaseModel, Field
from typing import Type, Any
from crewai import Agent, Task, Crew
from crewai.tools import BaseTool
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
# Initialize qdrant client
qdrant_client = QdrantClient(url=os.environ.get('QDRANT_URL'), api_key=os.environ.get('QDRANT_API_KEY'))
# initialize the text embedding
embedding_model = TextEmbedding(model_name='snowflake/snowflake-arctic-embed-m')

class SearchInput(BaseModel):
    """Input schema for search tool."""
    query: str = Field(..., description="The search query")

class SearchMedicalHistoryTool(BaseTool):
    name: str = "search_medical_records"
    description: str = "Search through medical records using vector similarity"
    args_schema: Type[BaseModel] = SearchInput
    def _run(self, query: str) -> Any:
        # Use OpenAI embeddings to match data_loader.py
        query_vector = next(embedding_model.query_embed(query=query))
        search_results = qdrant_client.search(
            collection_name='medical_records',
            query_vector=query_vector,
            limit=10,
            score_threshold=0.7
        )
        return [
            {
                "score": hit.score,
                "text": hit.payload.get('text', 'N/A'),
            }
            for hit in search_results
        ]

def trigger_crew(query: str) -> str:
    # initialize the tools
    search_tool = SearchMedicalHistoryTool()
    # Create agents
    medical_data_search = Agent(
        role='Medical Records Search Assistant',
        goal='Find and analyze relevant information',
        backstory="""You are an expert at finding and analyzing information.
                  You know when to search medical history records, and when 
                  to perform detailed analysis.""",
        tools=[search_tool],
        verbose=True
    )
    data_synthesizer = Agent(
        role='Information Synthesizer',
        goal='Create comprehensive and clear responses',
        backstory="""You excel at taking raw information and analysis
                  and creating clear, and present them as actionable insights.""",
        verbose=True
    )
    # Create tasks with expected_output
    data_search_task = Task(
        description=f"""Process this query: '{query}'
                    2. If it needs medical history information, use the search tool.
                    3. For detailed analysis, use search tool.
                    Explain your tool selection and process.""",
        expected_output="""A dictionary containing:
                       - The tools used
                       - The raw results from each tool
                       - Any analysis performed""",
        agent=medical_data_search
    )
    data_synthesis_task = Task(
        description="""Take the research results and create a clear response.
                    Explain the process used and why it was appropriate.
                    Make sure the response directly addresses the original query.""",
        expected_output="""A clear, structured response that includes:
                       - Direct answer to the query
                       - Supporting evidence from the research
                       - present it in the form of bullets""",
        agent=data_synthesizer
    )
    # Create and run crew
    crew = Crew(
        agents=[medical_data_search, data_synthesizer],
        tasks=[data_search_task, data_synthesis_task],
        verbose=True
    )
    result = crew.kickoff()
    return str(result)

if __name__ == "__main__":
    while True:
        query = input("\nEnter your query (type 'bye' or 'quit' to exit): ").strip()
        if query.lower() in ['bye', 'quit']:
            print("Goodbye!")
            break
        if not query:
            print("Please enter a valid query.")
            continue
        try:
            result = trigger_crew(query)
            print(f"\nResult: {result}")
        except Exception as e:
            print(f"Error processing query: {str(e)}")


构建用户界面

以下代码使用Streamlit为医疗记录搜索系统创建了一个简洁的网页界面。该界面包含一个简单的查询输入框,该输入框连接到我们之前讨论的由智能体AI驱动的搜索后端。


界面以聊天形式的历史记录展示所有搜索及其结果,按时间倒序排列。每个查询和响应都存储在会话状态中,并以可展开的部分显示。该应用程序包含强大的错误处理功能,以清晰显示搜索过程中出现的任何问题。


import streamlit as st
import traceback
from rag_agents import trigger_crew
import openlit
openlit.init(otlp_endpoint="http://127.0.0.1:4318")

def main():
    st.set_page_config(
        page_title="Query Interface",
        page_icon="?",
        layout="wide"
    )
    st.title("Medical Case History Retriever")
    st.markdown("Enter your query below to get started.")
    # Initialize session state
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    # Query input using a form
    with st.form(key='query_form'):
        query = st.text_input(
            "Enter your query:",
            key="query_input",
            placeholder="Type your query here..."
        )
        submit_button = st.form_submit_button("Submit")
    # Clear history button outside the form
    if st.button("Clear History"):
        st.session_state.chat_history = []
    # Process the query when form is submitted
    if submit_button and query:
        try:
            result = trigger_crew(query)
            # Add to chat history
            st.session_state.chat_history.append({
                "query": query,
                "result": result,
                "error": None
            })
        except Exception as e:
            error_msg = f"Error: {str(e)}\n{traceback.format_exc()}"
            st.session_state.chat_history.append({
                "query": query,
                "result": None,
                "error": error_msg
            })
    # Display chat history in reverse chronological order
    st.markdown("### Chat History")
    for item in reversed(st.session_state.chat_history):
        with st.expander(f"Query: {item['query']}", expanded=True):
            if item['result']:
                st.success(item['result'])
            if item['error']:
                st.error(item['error'])
    # Add some helpful information at the bottom
    st.markdown("---")
    st.markdown("""
    **Tips:**
    - Enter your query in the text box above
    - Press Enter or click Submit to process your query
    - Click Clear History to start fresh
    - Each query and its result will be saved in the chat history
    """)

if __name__ == "__main__":
    main()


关键功能包括一个主要的搜索表单、一个清除历史按钮以及底部的一个提示部分,以帮助用户导航界面。设计注重简洁性和可用性,同时为医疗专业人员提供了有效搜索医疗记录所需的所有必要功能。


结论

这个医疗记录搜索系统展示了将当代人工智能方法与实用的医疗保健应用相结合的有效性。该技术利用向量相似度搜索、一组专门的AI智能体以及直观的界面,为医疗从业者提供了一种有效的检索和评估患者记录的手段。搜索中融入的语义理解以及清晰的结果展示,使得该系统成为需要快速、准确获取患者信息的医疗从业者不可或缺的资源。

文章来源:https://medium.com/@manthapavankumar11/building-agentic-rag-pipelines-for-medical-data-with-crewai-and-qdrant-3a00a48fb0d1
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消