探讨模型上下文协议 (MCP)

2025年03月14日 由 alex 发表 2944 0

虽然模型上下文协议(MCP)目前主要在讨论工具集成方面,但其上下文管理能力同样是该协议的一个重要方面,甚至可以说是更基础的一个方面。


MCP中的上下文管理解决了几个关键挑战:

  • 上下文窗口限制,大型语言模型(LLMs)的上下文窗口是有限的。MCP提供了一种标准化的方法来管理、优先处理和压缩对话历史,以最大限度地有效利用这一有限空间。
  • 有状态的对话,通过在模型外部维护对话状态,MCP使得更长、更连贯的互动成为可能,这些互动超越了单次交流的限制。
  • 内存管理,MCP允许选择性保留重要信息,同时丢弃不太相关的上下文,为AI系统创建了一个更高效的“工作记忆”。


2


跨会话的上下文共享,通过适当的实现,MCP能够在不同的会话甚至不同的模型之间保持上下文的一致性,为用户提供连续的体验。


结构化的知识表示,MCP并非将所有上下文视为简单的标记序列,而是能够在上下文中实现更结构化的知识表示。


检索增强,MCP提供了一个框架,用于从外部来源动态检索相关信息,并将其集成到上下文中。


工具连接方面可能受到了更多的关注,因为它带来的变革更为显而易见——使大型语言模型(LLM)能够执行操作。


但上下文管理能力同样具有革命性,因为它们解决了LLMs随时间与信息交互的根本局限性。


事实上,有效的上下文管理是实现有意义工具使用的前提条件——模型需要理解之前使用了哪些工具,这些工具返回了什么信息,以及这些信息与当前对话状态的关系。


MCP 与上下文


MCP 服务器

以下是一个MCP服务器和客户端管理上下文的实用示例。你可以在你的MacBook上本地运行此MCP配置……操作方法如下……


使用终端窗口,创建一个名为mcp_context_server.py的文件,并将下面的代码窗口复制到文件中并保存。


# mcp_context_server.py
import json
import uuid
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
# In-memory storage for context/sessions
context_store = {}
# Simple weather database
WEATHER_DATA = {
    "New York": {"temperature": 72, "condition": "Sunny", "humidity": 65},
    "London": {"temperature": 60, "condition": "Rainy", "humidity": 80},
    "Tokyo": {"temperature": 75, "condition": "Partly Cloudy", "humidity": 70},
    "Sydney": {"temperature": 80, "condition": "Clear", "humidity": 55},
    "Paris": {"temperature": 68, "condition": "Cloudy", "humidity": 75}
}
class SessionContext:
    """Represents a user session with context data"""
    def __init__(self, session_id):
        self.session_id = session_id
        self.created_at = time.time()
        self.last_accessed = time.time()
        self.data = {
            "recent_searches": [],
            "preferred_unit": "celsius",
            "visits": 0
        }
    
    def update_access(self):
        self.last_accessed = time.time()
        self.data["visits"] += 1
    
    def add_search(self, city):
        # Keep only the 5 most recent searches
        if city not in self.data["recent_searches"]:
            self.data["recent_searches"].insert(0, city)
            self.data["recent_searches"] = self.data["recent_searches"][:5]
    
    def set_preference(self, key, value):
        self.data[key] = value
    
    def to_dict(self):
        return {
            "session_id": self.session_id,
            "created_at": self.created_at,
            "last_accessed": self.last_accessed,
            "data": self.data
        }
class MCPRequestHandler(BaseHTTPRequestHandler):
    def _set_headers(self, content_type='application/json'):
        self.send_response(200)
        self.send_header('Content-type', content_type)
        # Now set the cookie if needed
        if hasattr(self, 'should_set_cookie') and self.should_set_cookie:
            self.send_header('Set-Cookie', f'session_id={self.new_session_id}; Path=/')
        self.end_headers()
    
    def _get_or_create_session(self):
        # Check if client sent a session cookie
        session_id = None
        if 'Cookie' in self.headers:
            cookies = self.headers['Cookie'].split('; ')
            for cookie in cookies:
                if cookie.startswith('session_id='):
                    session_id = cookie.split('=')[1]
                    break
        
        # Create new session if none exists or if it's expired
        if not session_id or session_id not in context_store:
            session_id = str(uuid.uuid4())
            context_store[session_id] = SessionContext(session_id)
            # Don't set the cookie header here - it's too early!
            self.should_set_cookie = True  # Flag to set cookie when sending headers
            self.new_session_id = session_id
        else:
            self.should_set_cookie = False
        
        # Update last accessed time
        context_store[session_id].update_access()
        return context_store[session_id]
    
    def _clean_expired_sessions(self, max_age=3600):  # 1 hour
        """Remove sessions that haven't been used for max_age seconds"""
        current_time = time.time()
        expired_keys = []
        for key, session in context_store.items():
            if current_time - session.last_accessed > max_age:
                expired_keys.append(key)
        
        for key in expired_keys:
            del context_store[key]
    
    def do_GET(self):
        self._clean_expired_sessions()
        parsed_url = urlparse(self.path)
        path = parsed_url.path
        query = parse_qs(parsed_url.query)
        
        session = self._get_or_create_session()
        
        # Get user's temperature preference
        unit = query.get('unit', [session.data.get('preferred_unit')])[0]
        
        # Set temperature unit preference if specified
        if 'unit' in query:
            session.set_preference('preferred_unit', unit)
        
        if path == '/api/weather':
            self._set_headers()
            # Get city from query parameters
            if 'city' in query:
                city = query['city'][0]
                session.add_search(city)
                
                if city in WEATHER_DATA:
                    data = WEATHER_DATA[city].copy()
                    
                    # Convert temperature based on user preference
                    if unit == 'fahrenheit' and 'temperature' in data:
                        # Data is stored in Fahrenheit
                        pass
                    elif unit == 'celsius' and 'temperature' in data:
                        # Convert Fahrenheit to Celsius
                        data['temperature'] = round((data['temperature'] - 32) * 5/9, 1)
                    
                    response = {
                        "city": city,
                        "weather": data,
                        "unit": unit
                    }
                else:
                    response = {"error": f"City '{city}' not found"}
            else:
                response = {"cities": list(WEATHER_DATA.keys())}
            
            # Include session context data in response
            response["context"] = session.to_dict()
            self.wfile.write(json.dumps(response).encode())
        
        elif path == '/api/context':
            self._set_headers()
            response = {"context": session.to_dict()}
            self.wfile.write(json.dumps(response).encode())
        
        else:
            # Serve a simple HTML interface for manual testing
            self._set_headers('text/html')
            html = f"""
            <!DOCTYPE html>
            <html>
            <head>
                <title>MCP Weather Service</title>
                <style>
                    body {{ font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }}
                    .card {{ border: 1px solid #ddd; border-radius: 8px; padding: 15px; margin-bottom: 15px; }}
                    .recent {{ color: #666; font-size: 0.9em; }}
                </style>
            </head>
            <body>
                <h1>MCP Weather Service</h1>
                <div class="card">
                    <h2>Your Session</h2>
                    <p>Session ID: {session.session_id}</p>
                    <p>Number of visits: {session.data["visits"]}</p>
                    <p>Temperature unit preference: {session.data["preferred_unit"]}</p>
                    <form>
                        <label>
                            Change temperature unit:
                            <select name="unit" onchange=this.form.submit()">
                                <option value="celsius" {"selected" if session.data["preferred_unit"] == "celsius" else ""}>Celsius</option>
                                <option value="fahrenheit" {"selected" if session.data["preferred_unit"] == "fahrenheit" else ""}>Fahrenheit</option>
                            </select>
                        </label>
                    </form>
                </div>
                
                <div class="card">
                    <h2>Get Weather</h2>
                    <form action="/" method="get">
                        <select name="city">
                            {' '.join([f'<option value="{city}">{city}</option>' for city in WEATHER_DATA.keys()])}
                        </select>
                        <button type="submit">Get Weather</button>
                    </form>
                </div>
                
                {"<div class='card'><h2>Your Recent Searches</h2><ul>" + 
                ''.join([f'<li><a href="/?city={city}">{city}</a></li>' for city in session.data["recent_searches"]]) + 
                "</ul></div>" if session.data["recent_searches"] else ""}
                
                {"<div class='card'><h2>Weather Result</h2>" + 
                f"<h3>Weather in {query['city'][0]}</h3>" +
                f"<p>Temperature: {WEATHER_DATA[query['city'][0]]['temperature']}°F " + 
                f"({round((WEATHER_DATA[query['city'][0]]['temperature'] - 32) * 5/9, 1)}°C)</p>" +
                f"<p>Condition: {WEATHER_DATA[query['city'][0]]['condition']}</p>" +
                f"<p>Humidity: {WEATHER_DATA[query['city'][0]]['humidity']}%</p></div>" 
                if 'city' in query and query['city'][0] in WEATHER_DATA else ""}
            </body>
            </html>
            """
            self.wfile.write(html.encode())
    
    def do_POST(self):
        self._clean_expired_sessions()
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        request_json = json.loads(post_data.decode('utf-8'))
        
        session = self._get_or_create_session()
        
        if self.path == '/api/preferences':
            # Update user preferences
            for key, value in request_json.items():
                session.set_preference(key, value)
            
            self._set_headers()
            response = {
                "status": "success",
                "message": "Preferences updated",
                "context": session.to_dict()
            }
            self.wfile.write(json.dumps(response).encode())
        else:
            self.send_response(404)
            self.end_headers()
            self.wfile.write(json.dumps({"error": "Not found"}).encode())
def run_server(port=8000):
    server_address = ('', port)
    httpd = HTTPServer(server_address, MCPRequestHandler)
    print(f"Starting MCP context-aware server on port {port}...")
    httpd.serve_forever()
if __name__ == "__main__":
    run_server()


然后,在终端窗口中运行以下命令来启动服务器:python3 mcp_context_server.py。


下面是服务器运行时终端窗口的截图……命令提示符没有返回,而是显示如下内容,这表明服务器正在运行。


请记住,这一切都是在你的本地机器上运行的。


3


MCP 客户端

创建一个名为mcp_context_client.py的文件,MCP客户端将使用此代码运行。


将以下代码复制到文件中……


# mcp_context_client.py
import json
import requests
import sys
import os
class WeatherClient:
    def __init__(self, server_url="http://localhost:8000"):
        self.server_url = server_url
        self.session = requests.Session()  # Use a session to maintain cookies
        self.context = None
        
        # Try to load saved session ID from file
        self.session_file = "session.json"
        if os.path.exists(self.session_file):
            try:
                with open(self.session_file, "r") as f:
                    saved_data = json.load(f)
                    if "session_id" in saved_data:
                        self.session.cookies.set("session_id", saved_data["session_id"])
                    if "context" in saved_data:
                        self.context = saved_data["context"]
                print("Restored previous session")
            except Exception as e:
                print(f"Error loading session: {e}")
    
    def save_session(self):
        """Save the session ID and context to a file"""
        if "session_id" in self.session.cookies:
            session_data = {
                "session_id": self.session.cookies.get("session_id"),
                "context": self.context
            }
            with open(self.session_file, "w") as f:
                json.dump(session_data, f)
    
    def get_cities(self):
        """Get list of available cities"""
        response = self.session.get(f"{self.server_url}/api/weather")
        data = response.json()
        
        # Update context if available
        if "context" in data:
            self.context = data["context"]
            self.save_session()
            
        return data["cities"]
    
    def get_weather(self, city, unit=None):
        """Get weather for a specific city"""
        url = f"{self.server_url}/api/weather?city={city}"
        
        # Use stored preference or provided unit
        if unit:
            url += f"&unit={unit}"
        elif self.context and "data" in self.context and "preferred_unit" in self.context["data"]:
            url += f"&unit={self.context['data']['preferred_unit']}"
            
        response = self.session.get(url)
        data = response.json()
        
        # Update context if available
        if "context" in data:
            self.context = data["context"]
            self.save_session()
            
        return data
    
    def update_preferences(self, preferences):
        """Update user preferences"""
        response = self.session.post(
            f"{self.server_url}/api/preferences",
            json=preferences
        )
        data = response.json()
        
        # Update context if available
        if "context" in data:
            self.context = data["context"]
            self.save_session()
            
        return data
    
    def get_context(self):
        """Get current session context"""
        if not self.context:
            response = self.session.get(f"{self.server_url}/api/context")
            data = response.json()
            self.context = data["context"]
            self.save_session()
        
        return self.context
    
    def display_weather(self, weather_data):
        """Display weather information nicely"""
        if "error" in weather_data:
            print(f"\nError: {weather_data['error']}")
            return
            
        city = weather_data["city"]
        weather = weather_data["weather"]
        unit = weather_data.get("unit", "celsius")
        unit_symbol = "°F" if unit == "fahrenheit" else "°C"
        
        print(f"\nWeather for {city}:")
        print(f"Temperature: {weather['temperature']}{unit_symbol}")
        print(f"Condition: {weather['condition']}")
        print(f"Humidity: {weather['humidity']}%")
    
    def display_context(self):
        """Display current context information"""
        context = self.get_context()
        
        if not context:
            print("\nNo context available")
            return
            
        print("\n=== Your Session Context ===")
        print(f"Session ID: {context['session_id']}")
        print(f"Created: {context['created_at']}")
        print(f"Last accessed: {context['last_accessed']}")
        print("\nPreferences:")
        print(f"Temperature unit: {context['data']['preferred_unit']}")
        print(f"Total visits: {context['data']['visits']}")
        
        if context['data']['recent_searches']:
            print("\nRecent searches:")
            for i, city in enumerate(context['data']['recent_searches'], 1):
                print(f"{i}. {city}")
def run_client():
    client = WeatherClient()
    
    while True:
        print("\n--- MCP Weather Client with Context ---")
        print("1. List all cities")
        print("2. Get weather for a city")
        print("3. Change temperature unit preference")
        print("4. View session context")
        print("5. Exit")
        
        choice = input("Enter your choice (1-5): ")
        
        if choice == "1":
            cities = client.get_cities()
            print("\nAvailable cities:")
            for city in cities:
                print(f"- {city}")
        
        elif choice == "2":
            city = input("Enter city name: ")
            try:
                weather_data = client.get_weather(city)
                client.display_weather(weather_data)
            except Exception as e:
                print(f"Error getting weather: {e}")
        
        elif choice == "3":
            unit = input("Choose temperature unit (celsius/fahrenheit): ").lower()
            if unit in ["celsius", "fahrenheit"]:
                try:
                    result = client.update_preferences({"preferred_unit": unit})
                    print(f"Temperature unit updated to {unit}")
                except Exception as e:
                    print(f"Error updating preferences: {e}")
            else:
                print("Invalid unit. Please enter 'celsius' or 'fahrenheit'.")
        
        elif choice == "4":
            client.display_context()
        
        elif choice == "5":
            print("Exiting weather client.")
            sys.exit(0)
        
        else:
            print("Invalid choice. Please try again.")
if __name__ == "__main__":
    run_client()


现在,在终端窗口的命令提示符下运行以下命令来启动客户端:python3 mcp_context_client.py。


下面是用户界面的截图,你可以通过命令行与其交互,该截图还展示了上下文是如何被管理的。


4


关键上下文管理功能:

  • 服务器为每个客户端创建一个唯一的会话ID。
  • 会话使用cookie进行维护。
  • 上下文数据在多个请求之间持久存在。


上下文数据元素:

  • 用户偏好(温度单位:摄氏度/华氏度)
  • 使用历史(最近搜索)
  • 会话统计(访问次数、创建时间)


这展示了在客户端-服务器MCP架构中如何管理上下文,从而在多次交互中保持用户偏好和历史记录。

文章来源:https://medium.com/@cobusgreyling/model-context-protocol-mcp-da3e0f912bbc
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消