使用Claude for Desktop创建并集成Kafka-MCP和Qdrant-MCP服务器

2025年03月28日 由 alex 发表 2130 0

构建真正有用的 AI 应用程序,通过创建Kafka-MCP 服务器并将其与我们现有的Qdrant-MCP 服务器连接,我们改变了团队处理通信和数据检索的方式。当我们将此设置链接到Claude for Desktop时,真正的奇迹发生了- 突然间我们的消息传递变得无缝,矢量搜索变得快如闪电。我们的 AI 和 Agentic 应用程序现在不仅更高效,而且真正直观,可以使用标准协议处理复杂的数据处理任务。


1


架构设计:


客户端(Claude Desktop)

系统从客户端应用程序开始,这些应用程序可能代表用户的前端界面。客户端通过模型上下文协议(MCP)与两个不同的服务器组件通信,该协议促进客户端和服务器之间的结构化数据交换。


Kafka 服务器

其中一条通信路径将客户端连接到Kafka MCP服务器。该服务器与Apache Kafka接口,后者是一个分布式事件流平台,能够处理高吞吐量、容错的实时数据流。图中的发布/消费关系表明:

  • 客户端可以将数据或事件发布到Kafka主题
  • 客户端可以从Kafka主题消费数据流
  • 系统利用Kafka的事件源和实时分析能力


Qdrant 服务器

第二条通信路径将客户端连接到Qdrant MCP服务器。Qdrant是一个向量相似性搜索引擎,专为生产级别的高负载应用程序而设计。图中显示的存储/查找连接表示:

  • 客户端可以在Qdrant中存储向量嵌入或其他结构化数据
  • 客户端可以执行相似性搜索或检索操作
  • 系统可能利用Qdrant进行语义搜索、推荐系统或其他基于向量的操作


图中显示的kafka-qdrant连接器是一个重要细节,表明数据在Kafka生态系统和Qdrant之间流动。该连接器可能确保两个系统之间的数据一致性,允许来自Kafka的事件更新向量存储或在Qdrant中触发操作。


什么是MCP?

模型上下文协议(MCP)是由Anthropic开发的一个开放标准,用于简化AI模型与外部工具、数据源和服务之间的交互。通过提供一个标准化的框架,MCP允许大型语言模型(LLM)访问和处理实时信息,超越它们的静态训练数据,从而增强其功能和适应性。


MCP的主要优势:

增强AI能力:通过使AI模型能够直接与外部系统交互,MCP允许它们执行诸如检索最新信息、在应用程序中执行操作和利用专业工具等任务,从而扩展其核心功能。

可扩展性:MCP的标准化协议促进了AI服务的更轻松扩展,使开发人员能够集成更多工具和数据源,而不会显著增加复杂性。


实施步骤:


安装Confluent Kafka:

Confluent平台提供了多种安装选项,以适应不同的环境和偏好:


ZIP和TAR档案:

适用于开发和生产环境,你可以下载平台作为ZIP或TAR文件。下载后,解压缩内容并根据需要配置环境变量。


软件包管理器:

  • Debian和Ubuntu:使用APT存储库通过systemd安装Confluent平台。此方法适合多节点环境。
  • RHEL、CentOS和Fedora:使用YUM存储库进行安装,确保与systemd集成以进行服务管理。
  • Docker:对于容器化环境,Confluent提供了Docker镜像。此方法有利于开发和测试场景。
  • Confluent CLI:Confluent命令行界面可以单独安装,提供简化的Confluent平台组件管理。适用于macOS、Linux和Windows。


编排安装:

  • Kubernetes上的Confluent:在Kubernetes集群上部署Confluent平台,利用Kubernetes的编排能力。
  • Ansible Playbooks:使用Ansible自动化部署过程,适合在多个环境中实现一致的设置。


一旦Confluent平台安装完成,使其按如下方式启动和运行(Mac版本)。


╭─ ~ ·············································✔  base   at 11:55:15 AM 
╰─ confluent local services start          
# The local commands are intended for a single-node development environment only, NOT for production usage. See more: https://docs.confluent.io/current/cli/index.html
# As of Confluent Platform 8.0, Java 8 will no longer be supported.
# Using CONFLUENT_CURRENT: /var/folders/1x/36nz44_569501n4xs0px2_8m0000gn/T/confluent.288152
# ZooKeeper is [UP]
# Kafka is [UP]
# Schema Registry is [UP]
# Kafka REST is [UP]
# Connect is [UP]
# Starting ksqlDB Server
# ksqlDB Server is [UP]
# Starting Control Center
# Control Center is [UP]


访问 http://localhost:9021


屏幕截图2025-03-28114607


使用 Confluent CLI 按如下方式安装 Kafka-Qdrant 连接器。


╭─ ~ ······················································ ✔  took 37s   base     at 11:55:56 AM  
╰─  confluent-hub install qdrant/qdrant-kafka:1.1.0
The component can be installed in any of the following Confluent Platform installations: 
  1. /Users/pavanmantha/Pavans/confluent-7.9.0 (based on $CONFLUENT_HOME) 
  2. /Users/pavanmantha/Pavans/confluent-7.9.0 (where this tool is installed) 
Choose one of these to continue the installation (1-2): 1
Do you want to install this into /Users/pavanmantha/Pavans/confluent-7.9.0/share/confluent-hub-components? (yN) y
 
Component's license: 
The Apache License, Version 2.0 
https://www.apache.org/licenses/LICENSE-2.0 
I agree to the software license agreement (yN) y
You are about to install 'qdrant-kafka' from Qdrant, as published on Confluent Hub. 
Do you want to continue? (yN) y
Downloading component Qdrant Connector for Apache Kafka 1.1.2, provided by Qdrant from Confluent Hub and installing into /Users/pavanmantha/Pavans/confluent-7.9.0/share/confluent-hub-components 
Detected Worker's configs: 
  1. Standard: /Users/pavanmantha/Pavans/confluent-7.9.0/etc/kafka/connect-distributed.properties 
  2. Standard: /Users/pavanmantha/Pavans/confluent-7.9.0/etc/kafka/connect-standalone.properties 
  3. Standard: /Users/pavanmantha/Pavans/confluent-7.9.0/etc/schema-registry/connect-avro-distributed.properties 
  4. Standard: /Users/pavanmantha/Pavans/confluent-7.9.0/etc/schema-registry/connect-avro-standalone.properties 
  5. Based on CONFLUENT_CURRENT: /var/folders/1x/36nz44_569501n4xs0px2_8m0000gn/T/confluent.288152/connect/connect.properties 
  6. Used by Connect process with PID 29014: /var/folders/1x/36nz44_569501n4xs0px2_8m0000gn/T/confluent.288152/connect/connect.properties 
Do you want to update all detected configs? (yN) y
Adding installation directory to plugin path in the following files: 
  /Users/pavanmantha/Pavans/confluent-7.9.0/etc/kafka/connect-distributed.properties 
  /Users/pavanmantha/Pavans/confluent-7.9.0/etc/kafka/connect-standalone.properties 
  /Users/pavanmantha/Pavans/confluent-7.9.0/etc/schema-registry/connect-avro-distributed.properties 
  /Users/pavanmantha/Pavans/confluent-7.9.0/etc/schema-registry/connect-avro-standalone.properties 
  /var/folders/1x/36nz44_569501n4xs0px2_8m0000gn/T/confluent.288152/connect/connect.properties 
  /var/folders/1x/36nz44_569501n4xs0px2_8m0000gn/T/confluent.288152/connect/connect.properties 
 
Completed


3


编辑 QdrantSinkConnector 的配置,如下所示,或者你可以通过上传以前项目中的预配置进行配置。


{
  "name": "QdrantSinkConnectorConnector_0",
  "config": {
    "name": "QdrantSinkConnectorConnector_0",
    "connector.class": "io.qdrant.kafka.QdrantSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "topics": "mcp_topic",
    "errors.deadletterqueue.topic.name": "dead_queue",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "qdrant.grpc.url": "https://0029f2c8-edd5-412d-8924-a7c8f0481362.europe-west3-0.gcp.cloud.qdrant.io",
    "qdrant.api.key": "****************************************************************************************************",
    "value.converter.schemas.cache.size": "1",
    "value.converter.schemas.enable": "false"
  }
}


4


完成并保存后,QdrantSinkConnector 应处于运行状态,并且 CDC 现在已准备就绪。


Kafka MCP服务器:

Kafka MCP服务器的项目结构如下。


.
├── LICENSE
├── README.md
├── kafka_mcp_server.iml
├── requirements.txt
└── src
    ├── __pycache__
    └── kafka_mcp_server
        ├── __init__.py
        ├── kafka.py
        ├── main.py
        ├── server.py
        └── settings.py


settings.py


from pydantic_settings import BaseSettings
from pydantic import Field
from typing import Optional
DEFAULT_TOOL_PUBLISH_DESCRIPTION = (
    "publish the information to the kafka topic for the down stream usage."
)
DEFAULT_TOOL_CONSUME_DESCRIPTION = (
    "Look up topics in kafka. Use this tool when you need to: \n"
    " - consume information from the topics\n"
)

class ToolSettings(BaseSettings):
    """
    Configuration for all the tools.
    """
    tool_publish_description: str = Field(
        default=DEFAULT_TOOL_PUBLISH_DESCRIPTION,
        validation_alias="TOOL_PUBLISH_DESCRIPTION",
    )
    tool_consume_description: str = Field(
        default=DEFAULT_TOOL_CONSUME_DESCRIPTION,
        validation_alias="TOOL_CONSUME_DESCRIPTION",
    )
class KafkaSettings(BaseSettings):
    """
    Configuration for the Kafka connector.
    """
    bootstrap_server: Optional[str] = Field(default=None, validation_alias="KAFKA_BOOTSTRAP_SERVERS")
    topic_name: Optional[str] = Field(default=None, validation_alias="TOPIC_NAME")
    from_beginning: Optional[bool] = Field(default=False, validation_alias="IS_TOPIC_READ_FROM_BEGINNING")
    group_id: Optional[str] = Field(default="kafka-mcp-group", validation_alias="DEFAULT_GROUP_ID_FOR_CONSUMER")

    def get_kafka_bootstrap_server(self) -> str:
        """
        Get the Kafka location from bootstrap URL.
        """
        return self.bootstrap_server


上面的代码是初始加载文件,它将从 .env 文件中读取所需的配置,例如 Kafka 服务器名称和主题名称等。同时,它还将提供适当的工具描述作为全局配置。


kafka.py


import logging
import json
import uuid
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class KafkaConnector:
    """
    Encapsulates the connection to a kafka server and all the methods to interact with it.
    :param kafka_bootstrap_url: The URL of the kafka server.
    :param topic_name: The topic to which the client will talk to.
    """
    def __init__(self, kafka_bootstrap_url: str, topic_name: str, group_id: str):
        self.KAFKA_BOOTSTRAP_SERVERS = kafka_bootstrap_url
        self.topic_name = topic_name
        self.group_id = group_id
        self.producer = None
    async def create_producer(self):
        """Create and start a Kafka producer."""
        producer = AIOKafkaProducer(bootstrap_servers=self.KAFKA_BOOTSTRAP_SERVERS)
        await producer.start()
        logger.info(f"Kafka producer started, connected to {self.KAFKA_BOOTSTRAP_SERVERS}")
        self.producer = producer
        return producer
    async def close_producer(self):
        """Close the Kafka producer."""
        await self.producer.stop()
        logger.info("Kafka producer stopped")
    async def publish(self, value):
        """
        Publish a message to the specified Kafka topic.
        Args:
            producer: AIOKafkaProducer instance
            topic_name: Topic to publish to
            key: Message key (can be None)
            value: Message value
        """
        try:
            key = str(uuid.uuid4())
            # Convert value to bytes if it's not already
            if isinstance(value, dict):
                value_bytes = json.dumps(value).encode('utf-8')
            elif isinstance(value, str):
                value_bytes = value.encode('utf-8')
            else:
                value_bytes = value
            # Convert key to bytes if it's not None and not already bytes
            key_bytes = None
            if key is not None:
                if isinstance(key, str):
                    key_bytes = key.encode('utf-8')
                else:
                    key_bytes = key
            # Send message
            await self.producer.send_and_wait(self.topic_name, value=value_bytes, key=key_bytes)
            logger.info(f"Published message with key {key} to topic {self.topic_name}")
        except Exception as e:
            logger.error(f"Error publishing message: {e}")
            raise
    async def consume(self, from_beginning=True):
        """
        Consume messages from the specified Kafka topics.
        Args:
            from_beginning: Whether to start consuming from the beginning
        """
        # Convert single topic to list
        if isinstance(self.topic_name, str):
            topics = [self.topic_name]
        # Set auto_offset_reset based on from_beginning
        auto_offset_reset = 'earliest' if from_beginning else 'latest'
        # Create consumer
        consumer = AIOKafkaConsumer(
            *topics,
            bootstrap_servers=self.KAFKA_BOOTSTRAP_SERVERS,
            group_id=self.group_id,
            auto_offset_reset=auto_offset_reset,
            enable_auto_commit=True,
        )
        # Start consumer
        await consumer.start()
        logger.info(f"Kafka consumer started, subscribed to {topics}")
        messages = []
        try:
            # Get a batch of messages with timeout
            batch = await consumer.getmany(timeout_ms=5000)
            for tp, msgs in batch.items():
                for msg in msgs:
                    logger.info(f"Raw message received: {msg}")
                    processed_message = await self._process_message(msg)
                    messages.append(processed_message)
            return messages
        finally:
            # Close consumer
            await consumer.stop()
            logger.info("Kafka consumer stopped")
    async def _process_message(self, msg):
        """
       Process a message received from Kafka.
       Args:
           msg: Message object from Kafka
       """
        try:
            # Decode the message value
            if msg.value:
                try:
                    value = json.loads(msg.value.decode('utf-8'))
                except json.JSONDecodeError:
                    value = msg.value.decode('utf-8')
            else:
                value = None
            # Decode the message key
            key = msg.key.decode('utf-8') if msg.key else None
            logger.info(f"Received message: Topic={msg.topic}, Partition={msg.partition}, "
                        f"Offset={msg.offset}, Key={key}, Value={value}")
            # Your message processing logic here
            return value
        except Exception as e:
            logger.error(f"Error processing message: {e}")
            raise


kafka.py 文件将提供我们与 Kafka 服务器联系并将数据发布到主题所需的所有操作。


server.py


import logging
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator
from mcp.server import Server
from mcp.server.fastmcp import Context, FastMCP
from kafka import KafkaConnector
from settings import KafkaSettings, ToolSettings
# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def server_lifespan(server: Server) -> AsyncIterator[dict]:
    """
    Context manager to handle the lifespan of the server.
    This is used to configure the kafka connector.
    All the configuration is now loaded from the environment variables.
    Settings handle that for us.
    """
    try:
        kafka_configurations = KafkaSettings()
        logger.info(
            f"Connecting to kafka at {kafka_configurations.get_kafka_bootstrap_server()}"
        )
        kafka_connector = KafkaConnector(kafka_bootstrap_url=kafka_configurations.bootstrap_server,
                                         topic_name=kafka_configurations.topic_name,
                                         group_id=kafka_configurations.group_id)
        await kafka_connector.create_producer()
        yield {
            "kafka_connector": kafka_connector,
        }
    except Exception as e:
        logger.error(e)
        raise e
    finally:
        pass

# FastMCP is an alternative interface for declaring the capabilities
# of the server. Its API is based on FastAPI.
mcp = FastMCP("mcp-server-kafka", lifespan=server_lifespan)
# Load the tool settings from the env variables, if they are set,
# or use the default values otherwise.
tool_settings = ToolSettings()
@mcp.tool(name="kafka-publish", description=tool_settings.tool_publish_description)
async def publish(ctx: Context, information: Any) -> str:
    """
    :param ctx:
    :param information:
    :return:
    """
    await ctx.debug(f"Storing information {information} in kafka topic")
    kafka_connector: KafkaConnector = ctx.request_context.lifespan_context[
        "kafka_connector"
    ]
    await kafka_connector.publish(value=information)
    return f"published: {information}"

@mcp.tool(name="kafka-consume", description=tool_settings.tool_consume_description)
async def consumer(ctx: Context) -> str:
    """
    :param ctx:
    :param information:
    :return:
    """
    await ctx.debug(f"consuming information from kafka")
    kafka_connector: KafkaConnector = ctx.request_context.lifespan_context[
        "kafka_connector"
    ]
    information = await kafka_connector.consume()
    return f"consumed: {information}"


上述 server.py 代码是实际创建 MCP 服务器的文件,并配置了两个工具,即 kafka-publish 和 kafka-consume,这些工具对客户端可见,以便使用。


main.py:这只是一个驱动代码。


import argparse
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
def main():
    """
    Main entry point for the mcp-server-kafka script.
    It runs the MCP server with a specific transport protocol.
    """
    # Parse the command-line arguments to determine the transport protocol.
    parser = argparse.ArgumentParser(description="mcp-server-kafka")
    parser.add_argument(
        "--transport",
        choices=["stdio", "sse"],
        default="stdio",
    )
    args = parser.parse_args()
    # Import is done here to make sure environment variables are loaded
    # only after we make the changes.
    from server import mcp
    mcp.run(transport=args.transport)
if __name__ == "__main__":
    main()


接下来,让我们使 Claude for Desktop 知道 MCP 服务器的存在,因此需要编辑

claude_desktop_config.json。


{
    "mcpServers": {
        "kafka": {
            "command": "/Users/pavanmantha/Pavans/anaconda/anaconda3/bin/python",
            "args": [
                "/Users/pavanmantha/Pavans/PracticeExamples/DataScience_Practice/LLMs/anthropic_tutorials/kafka_mcp_server/src/kafka_mcp_server/main.py"
            ]
        },
    }
}


Fastembed MCP服务器:

Fastembed MCP服务器的项目结构如下。


.
├── LICENSE
├── README.md
├── fastembed_mcp_server.iml
├── requirements.txt
└── src
    └── fastembed_mcp_server
        ├── __init__.py
        ├── fastembed_connector.py
        ├── main.py
        ├── server.py
        └── settings.py


settings.py 该文件首先读取所有所需配置,主要包括 embed_model 名称和全局工具描述,作为 ToolSettings。


from typing import Optional
from pydantic import Field
from pydantic_settings import BaseSettings
DEFAULT_TOOL_LIST_EMBEDDINGS = (
    "List the embeddings that are available with fastembed."
)
DEFAULT_TOOL_EMBED_DOCUMENTS = (
    "Look up given documents to vector embed them. Use this tool when you need to: \n"
    " - vectorize the given documents\n"
)
class ToolSettings(BaseSettings):
    """
    Configuration for all the tools.
    """
    tool_list_embeddings_description: str = Field(
        default=DEFAULT_TOOL_LIST_EMBEDDINGS,
        validation_alias="TOOL_LIST_EMBEDDINGS_DESCRIPTION",
    )
    tool_embed_documents_description: str = Field(
        default=DEFAULT_TOOL_EMBED_DOCUMENTS,
        validation_alias="TOOL_EMBED_DOCUMENTS_DESCRIPTION",
    )

class FastembedSettings(BaseSettings):
    """
    Configuration for the fastembed connector.
    """
    embed_model: Optional[str] = Field(default=None, validation_alias="EMBED_MODEL")
    def get_fastembed_model(self) -> str:
        """
        Get the embedding model name.
        """
        return self.embed_model


fastembed_connector.py 这是主要的文件,它将提供 fastembed 的实际操作,例如列出可用模型或将文档嵌入到服务器。


import logging
from fastembed import TextEmbedding
from typing import List, Dict, Any
# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class FastembedConnector:
    def __init__(self, embed_model: str):
        self.embedding = TextEmbedding(model_name=embed_model)
    async def supported_embeddings(self) -> List[Dict[str, Any]]:
        """
        :return:
        """
        try:
            return TextEmbedding.list_supported_models()
        except Exception as e:
            logger.error(f"Error while fetching supported models: {e}")
            raise
    async def embed_documents(self, value):
        """
        :param value:
        :return:
        """
        embedding = None
        embeddings_generator = None
        try:
            if isinstance(value, str):
                embeddings_generator = self.embedding.embed([value])
                logger.info(f"document {value} embedded to {embeddings_generator}")
            if isinstance(value, list):
                embeddings_generator = self.embedding.embed(value)
                logger.info(f"documents {value} embedded to {embeddings_generator}")
            embedding = list(embeddings_generator)
            logger.info(f"documents {value} embedded to {embedding}")
            return embedding
        except Exception as e:
            logger.error(f"Error embedding documents: {e}")
            raise


server.py 这是实际启动 MCP 服务器的文件,以及对 MCP 客户端(在我们的例子中是 Claude Desktop)公开的工具。


import logging
from mcp.server import Server
from mcp.server.fastmcp import FastMCP, Context
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator
from fastembed_connector import FastembedConnector
from settings import FastembedSettings, ToolSettings
# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def server_lifespan(server: Server) -> AsyncIterator[dict]:
    """
    :param server:
    :return:
    """
    try:
        fastembed_configuration = FastembedSettings()
        logger.info(f"Connecting to fastembed")
        fastembed_connector = FastembedConnector(embed_model=fastembed_configuration.embed_model)
        yield {
            "fastembed_connector": fastembed_connector
        }
    except Exception as e:
        logger.error(e)
        raise e
# FastMCP is an alternative interface for declaring the capabilities
# of the server. Its API is based on FastAPI.
mcp = FastMCP("mcp-server-fastembed", lifespan=server_lifespan)
# Load the tool settings from the env variables, if they are set,
# or use the default values otherwise.
tool_settings = ToolSettings()
@mcp.tool(name="supported-embedding-models", description=tool_settings.tool_list_embeddings_description)
async def list_supported_embedding_models(ctx: Context) -> Any:
    await ctx.debug(f"fetching the list of supported models from fastembed")
    fastembed_connector: FastembedConnector = ctx.request_context.lifespan_context["fastembed_connector"]
    embeddings = await fastembed_connector.supported_embeddings()
    return embeddings
@mcp.tool(name="embed-documents", description=tool_settings.tool_embed_documents_description)
async def list_supported_embedding_models(ctx: Context, documents: Any) -> Any:
    await ctx.debug(f"embedding the provided documents {documents}")
    fastembed_connector: FastembedConnector = ctx.request_context.lifespan_context["fastembed_connector"]
    embeddings = await fastembed_connector.embed_documents(value=documents)
    return embeddings


main.py 驱动程序代码


import argparse
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
def main():
    """
    Main entry point for the mcp-server-fastembed script.
    It runs the MCP server with a specific transport protocol.
    """
    # Parse the command-line arguments to determine the transport protocol.
    parser = argparse.ArgumentParser(description="mcp-server-fastembed")
    parser.add_argument(
        "--transport",
        choices=["stdio", "sse"],
        default="stdio",
    )
    args = parser.parse_args()
    # Import is done here to make sure environment variables are loaded
    # only after we make the changes.
    from server import mcp
    mcp.run(transport=args.transport)
if __name__ == "__main__":
    main()


接下来,让我们使 Claude for Desktop 知道 MCP 服务器的存在,因此需要编辑 claude_desktop_config.json。


{
    "mcpServers": {
        "kafka": {
            "command": "/Users/pavanmantha/Pavans/anaconda/anaconda3/bin/python",
            "args": [
                "/Users/pavanmantha/Pavans/PracticeExamples/DataScience_Practice/LLMs/anthropic_tutorials/kafka_mcp_server/src/kafka_mcp_server/main.py"
            ]
        },
        "fastembed": {
            "command": "/Users/pavanmantha/Pavans/anaconda/anaconda3/bin/python",
            "args": [
                "/Users/pavanmantha/Pavans/PracticeExamples/DataScience_Practice/LLMs/anthropic_tutorials/fastembed_mcp_server/src/fastembed_mcp_server/main.py"
            ]
        }
    }
}


安装 Qdrant MCP 服务器:

你可以按照以下步骤启动并运行 Qdrant MCP 服务器。


git clone https://github.com/qdrant/mcp-server-qdrant
QDRANT_URL="http://localhost:6333" \
COLLECTION_NAME="test" \
EMBEDDING_MODEL="BAAI/bge-small-en-v1.5" \
uvx mcp-server-qdrant
{
    "mcpServers": {
        "kafka": {
            "command": "/Users/pavanmantha/Pavans/anaconda/anaconda3/bin/python",
            "args": [
                "/Users/pavanmantha/Pavans/PracticeExamples/DataScience_Practice/LLMs/anthropic_tutorials/kafka_mcp_server/src/kafka_mcp_server/main.py"
            ]
        },
        "fastembed": {
            "command": "/Users/pavanmantha/Pavans/anaconda/anaconda3/bin/python",
            "args": [
                "/Users/pavanmantha/Pavans/PracticeExamples/DataScience_Practice/LLMs/anthropic_tutorials/fastembed_mcp_server/src/fastembed_mcp_server/main.py"
            ]
        },
        "qdrant": {
            "command": "/Users/pavanmantha/.local/bin/uvx",
            "args": [
                "mcp-server-qdrant"
            ],
            "env": {
                "QDRANT_URL": "http://localhost:6333",
                "QDRANT_API_KEY": "qdrant_db_api_key",
                "COLLECTION_NAME": "test",
                "EMBEDDING_MODEL": "BAAI/bge-small-en-v1.5"
            }
        }
    }
}


结果:

通过以上所有 MCP 服务器和配置,我们使 Claude for Desktop 成为一个强大的代理,可以为我们执行一些复杂的任务。


5


6


7


8


9


10


11


结论:

这种基于 MCP(模型上下文协议)的架构使 Claude Desktop 能够无缝地与 Kafka、Qdrant 和 FastEmbed 进行交互,解锁了高效、可扩展和智能的 AI 驱动管道。通过 Kafka 处理实时消息流,Qdrant 实现基于向量的搜索和检索,以及 FastEmbed 加速生成嵌入,这一系统具备实时上下文理解、快速语义搜索和可扩展的 AI 部署能力。通过 Kafka-Qdrant 连接器进行的集成确保了信息的顺畅流动,使该架构非常适合个性化助手、智能搜索引擎、推荐系统和自动化决策工作流程等应用。这是一种面向未来的 AI 代理设计,提升了 AI 驱动生态系统中的性能和适应性。

文章来源:https://medium.com/towardsdev/kafka-mcp-and-qdrant-mcp-creating-and-integrating-mcp-servers-with-claude-for-desktop-2da1ef7727bf
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消