0%

告别SSE,拥抱Streamable HTTP —— MCP协议的传输机制革新

随着人工智能(AI)技术的迅猛发展,AI助手与应用程序之间的高效通信变得尤为重要。模型上下文协议(Model Context Protocol,简称MCP)应运而生,旨在为大型语言模型(LLMs)与外部数据源和工具之间提供标准化的接口。在MCP的众多特性中,Streamable HTTP传输机制作为一种现代化的通信方式,正在逐渐取代传统的Server-Sent Events(SSE)传输方式,成为AI通信的新标准。

image-20250513181409075

MCP协议概述

MCP是由Anthropic推动的一项开放标准,旨在解决LLMs在执行任务时对外部信息的依赖问题。通过定义一套通用的通信规则和数据格式,MCP使得LLMs能够动态地获取所需的上下文信息,从而增强其功能和使用范围。MCP的核心架构包括:

  • MCP客户端:在LLM或其服务基础设施端的实现,负责构建请求并发送给MCP服务器。
  • MCP服务器:在外部系统端的实现,接收来自MCP客户端的请求,与实际的数据源或工具进行交互,并将获取的数据按照MCP协议规范格式化后返回给客户端。
  • 上下文信息交换:促进LLMs与外部系统之间上下文信息的双向交换。

通过MCP,开发者可以更轻松地将AI助手与各种应用程序和数据源集成,实现更高效的AI通信。

传统SSE的局限性

Server-Sent Events(SSE)是一种基于HTTP的单向通信协议,允许服务器向客户端推送实时更新。然而,SSE在以下方面存在局限性:

  • 通信方向受限SSE仅支持服务器向客户端的单向通信,无法满足双向交互的需求。
  • 会话管理缺失SSE缺乏内建的会话管理机制,难以实现复杂的状态维护。
  • 连接恢复能力弱:在网络中断后,SSE的连接恢复能力有限,可能导致数据丢失。
  • 数据格式支持有限SSE主要支持UTF-8文本,无法处理多种数据格式。

这些局限性使得SSE在现代AI应用中逐渐显得力不从心。

Streamable HTTP的创新之处

Streamable HTTPMCP框架中推荐的传输机制,旨在通过标准HTTP实现高效、双向的数据流通信。其主要特点包括:

  • 单一端点通信:使用单一HTTP端点处理所有MCP通信,简化了网络架构。
  • 多种响应模式:支持批量(JSON)和流式(SSE)响应,满足不同的通信需求。
  • 内建会话管理:通过Mcp-Session-Id头部进行会话管理,简化了状态维护。
  • 连接可恢复性:支持在网络中断后恢复SSE连接,提高通信的稳定性。
  • 灵活的身份验证:支持多种身份验证方式,增强了安全性。
  • 跨域资源共享(CORS)配置:提供灵活的CORS配置,便于Web应用的集成。

这些特性使得Streamable HTTP在现代AI通信中具有显著优势。

技术对比:SSE vs. Streamable HTTP

特性 传统SSE传输 Streamable HTTP协议(MCP
通信方向 单向(服务器 → 客户端) 双向(客户端 ↔ 服务器)
会话管理 无内建机制 基于Mcp-Session-Id头部
数据格式支持 仅支持UTF-8文本 支持多种数据格式
自动重连 支持 支持
与现有基础设施的兼容性

从上述对比可以看出,Streamable HTTP在通信灵活性、会话管理和数据格式支持等方面均优于传统的SSE传输方式。

实现与效果演示

基于Streamable HTTP的MCP Server

在前面的文章中,我们一起编写过基于sse协议的天气查询MCP Server并成功部署到线上。

项目地址:https://gitee.com/ming_log/mcp-server

将基于sse协议的MCP Server修改为Streamable HTTP协议,方法非常简单,只需要将MCP Server的执行方式修改为streamable-http即可。

image-20250513175712008

注意:需要首先更新一下mcp[cli]的版本为1.8.0

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP

# 初始化FastMCP服务器
mcp = FastMCP(
name="weather",
host="0.0.0.0",
port=8002,
description="通过城市名称(拼音)或经纬度获取天气信息",
sse_path="/mcp"
)

# 常量
NWS_API_BASE = "https://api.openweathermap.org/data/2.5/weather"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36"

# 温度单位转换,将开尔文转化为摄氏度
def kelvin_to_celsius(kelvin: float) -> float:
return kelvin - 273.15

async def get_weather_from_cityname(cityname: str) -> dict[str, Any] | None:
"""向openweathermap发送请求并进行适当的错误处理。"""
headers = {
"User-Agent": USER_AGENT,
"Accept": "application/geo+json"
}
params = {
"q": cityname,
"appid": "24ecadbe4bb3d55cb1f06ea48a41ac51"
}
async with httpx.AsyncClient() as client:
try:
response = await client.get(NWS_API_BASE, headers=headers, params=params)
response.raise_for_status()
return response.json()
except Exception:
return None


async def get_weather_from_latitude_longitude(latitude: float, longitude: float) -> dict[str, Any] | None:
"""向openweathermap发送请求并进行适当的错误处理。"""
headers = {
"User-Agent": USER_AGENT,
"Accept": "application/geo+json"
}
params = {
"lat": latitude,
"lon": longitude,
"appid": "24ecadbe4bb3d55cb1f06ea48a41ac51"
}
async with httpx.AsyncClient() as client:
try:
response = await client.get(NWS_API_BASE, headers=headers, params=params)
response.raise_for_status()
return response.json()
except Exception:
return None

def format_alert(feature: dict) -> str:
"""将接口返回的天气信息进行格式化文本输出"""
if feature["cod"] == 404:
return "参数异常,请确认城市名称是否正确。"
elif feature["cod"] == 401:
return "API key 异常,请确认API key是否正确。"
elif feature["cod"] == 200:
return f"""
City: {feature.get('name', 'Unknown')}
Weather: {feature.get('weather', [{}])[0].get('description', 'Unknown')}
Temperature: {kelvin_to_celsius(feature.get('main', {}).get('temp', 0)):.2f}°C
Humidity: {feature.get('main', {}).get('humidity', 0)}%
Wind Speed: {feature.get('wind', {}).get('speed', 0):.2f} m/s
"""
else:
return "未知错误,请稍后再试。"

@mcp.tool()
async def get_weather_from_cityname_tool(city: str) -> str:
"""Get weather information for a city.

Args:
city: City name (e.g., "wuhan"). For Chinese cities, please use pinyin
"""
data = await get_weather_from_cityname(city)
return format_alert(data)

@mcp.tool()
async def get_weather_from_latitude_longitude_tool(latitude: float, longitude: float) -> str:
"""Get weather information for a location.

Args:
latitude: Latitude of the location
longitude: Longitude of the location
"""
data = await get_weather_from_latitude_longitude(latitude, longitude)
return format_alert(data)

if __name__ == "__main__":
# 初始化并运行服务器
# mcp.run(transport='stdio')
print("Starting server...")
mcp.run(transport='streamable-http')

开启服务

image-20250513175829712

接下来测试该服务是否可以正常使用。

由于Cursor对于Streamable HTTP协议目前还不支持,所以我们使用Cherry Studio来进行测试,大家可以自行下载。官网地址:https://www.cherry-ai.com/

下载登陆后,点击左下角的设置。

image-20250513180108389

然后点击MCP服务器

image-20250513180143361

接下来点击添加服务器,将我们刚才开启的MCP Server配置进去。

image-20250513180223922

按照下图所示的内容填写。

image-20250513180301435

然后点击右上角的保存。

image-20250513180332988

如果没有问题的话,可以看到图示中显示的,服务器更新成功。如果有问题会出现报错。

为了测试MCP Server服务,还需要大家准备一个LLM,大家根据自己的情况在设置中进行配置即可,我这里选择的是Moonshot

image-20250513180546680

接下来回到聊天助手页面,创建一个聊天助手。在聊天输入框下方选择要使用的MCP Server

image-20250513180650887

到此,我们的准备工作和配置工作就做完了,接下来我们就可以向聊天助手询问天气,测试MCP Server了。

例如:我询问“武汉和北京天气怎么样?”

PixPin_2025-05-13_17-21-22

Streamable HTTP协议在访问MCP Server时是并发的,通过以下动图可以看出来,北京的天气是先请求成功的。

o303p-9167f

基于Streamable HTTP的MCP Client

在前面的文章中,我们手动编写了MCP Client代码,并且同样可以根据mcp.json配置文件,加载对应的MCP Server服务。当时只适配了stdiosse协议,现在加上Streamable HTTP协议。

项目地址:https://gitee.com/ming_log/mcp_client

具体代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import asyncio
from typing import Optional
from contextlib import AsyncExitStack
import json

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client

from dotenv import load_dotenv
import os, re
from openai import OpenAI
from lxml import etree

load_dotenv() # load environment variables from .env

class MCPClient:
def __init__(self):
# Initialize session and client objects
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
# 需要提前在.env文件中设置相关环境变量
self.API_KEY = os.getenv("API_KEY")
self.BASE_URL = os.getenv("BASE_URL")
self.MODEL = os.getenv("MODEL")
self.client = OpenAI(api_key=self.API_KEY, base_url=self.BASE_URL)
self.sessions = {}
self.messages = []
with open("./MCP_Prompt.txt", "r", encoding="utf-8") as file:
self.system_prompt = file.read()

async def mcp_json_config(self, mcp_json_file):
try:
with open(mcp_json_file, 'r') as f:
mcp_config: dict = json.load(f)
except json.JSONDecodeError:
raise ValueError("Invalid MCP config")
servers_config: dict = mcp_config.get('mcpServers', {})
for k, v in servers_config.items():
try:
if v.get('isActive', False) == False:
continue
print('-'*50)
mcp_name = v.get('name', k)
mcp_type: str = v.get('type', 'stdio')
if mcp_type.lower() == 'stdio':
command = v.get('command', None)
args = v.get('args', [])
env = v.get('env', {})
if command is None:
raise ValueError(f'{mcp_name} command is empty.')
if args == []:
raise ValueError(f'{mcp_name} args is empty.')
await self.connect_to_stdio_server(mcp_name, command, args, env)
elif mcp_type.lower() == 'sse':
server_url = v.get('url', None)
if server_url is None:
raise ValueError(f'{mcp_name} server_url is empty.')
await self.connect_to_sse_server(mcp_name, server_url)
elif mcp_type.lower() == 'streamable_http':
server_url = v.get('url', None)
if server_url is None:
raise ValueError(f'{mcp_name} server_url is empty.')
await self.connect_to_streamable_http_server(mcp_name, server_url)
else:
raise ValueError(f'{mcp_name} mcp type must in [stdio, sse, streamable_http].')
except Exception as e:
print(f"Error connecting to {mcp_name}: {e}")

async def connect_to_stdio_server(self, mcp_name, command: str, args: list[str], env: dict[str, str]={}):
server_params = StdioServerParameters(
command=command,
args=args,
env=env
)

stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
self.sessions[mcp_name] = self.session

await self.session.initialize()
# 将MCP信息添加到system_prompt
response = await self.session.list_tools()
available_tools = ['##' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n" + tool.description + "\n" + json.dumps(tool.inputSchema) for tool in response.tools]
self.system_prompt = self.system_prompt.replace("<$MCP_INFO$>", "\n".join(available_tools)+"\n<$MCP_INFO$>")
tools = response.tools
print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

async def connect_to_sse_server(self, mcp_name, server_url: str):
"""Connect to an MCP server

Args:
server_script_path: Path to the server script (.py or .js)
"""
stdio_transport = await self.exit_stack.enter_async_context(sse_client(server_url))
self.sse, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write))
self.sessions[mcp_name] = self.session

await self.session.initialize()
# List available tools
response = await self.session.list_tools()
available_tools = ['##' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n" + tool.description + "\n" + json.dumps(tool.inputSchema) for tool in response.tools]
self.system_prompt = self.system_prompt.replace("<$MCP_INFO$>", "\n".join(available_tools)+"\n<$MCP_INFO$>\n")
tools = response.tools
print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

async def connect_to_streamable_http_server(self, mcp_name, server_url: str):
"""Connect to an MCP server

Args:
server_script_path: Path to the server script (.py or .js)
"""
stdio_transport = await self.exit_stack.enter_async_context(streamablehttp_client(server_url))
self.streamable_http, self.write, _ = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.streamable_http, self.write))
self.sessions[mcp_name] = self.session

await self.session.initialize()
# List available tools
response = await self.session.list_tools()
available_tools = ['##' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n" + tool.description + "\n" + json.dumps(tool.inputSchema) for tool in response.tools]
self.system_prompt = self.system_prompt.replace("<$MCP_INFO$>", "\n".join(available_tools)+"\n<$MCP_INFO$>\n")
tools = response.tools
print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

async def process_query(self, query: str) -> str:
"""Process a query using Claude and available tools"""
self.messages.append(
{
"role": "system",
"content": self.system_prompt
}
)
self.messages.append(
{
"role": "user",
"content": query
}
)

# Initial Claude API call
response = self.client.chat.completions.create(
model=self.MODEL,
max_tokens=1024,
messages=self.messages
)

# Process response and handle tool calls
final_text = []
content = response.choices[0].message.content
if '<use_mcp_tool>' not in content:
final_text.append(content)
else:
# 解析tool_string
server_name, tool_name, tool_args = self.parse_tool_string(content)

# 执行工具调用
result = await self.sessions[server_name].call_tool(tool_name, tool_args)
print(f"[Calling tool {tool_name} with args {tool_args}]")
print("-"*40)
print("Server:", server_name)
print("Tool:", tool_name)
print("Args:", tool_args)
print("-"*40)
print("Result:", result.content[0].text)
print("-"*40)
self.messages.append({
"role": "assistant",
"content": content
})
self.messages.append({
"role": "user",
"content": f"[Tool {tool_name} \n returned: {result}]"
})

response = self.client.chat.completions.create(
model=self.MODEL,
max_tokens=1024,
messages=self.messages
)
final_text.append(response.choices[0].message.content)
return "\n".join(final_text)

def parse_tool_string(self, tool_string: str) -> tuple[str, str, dict]:
tool_string = re.findall("(<use_mcp_tool>.*?</use_mcp_tool>)", tool_string, re.S)[0]
root = etree.fromstring(tool_string)
server_name = root.find('server_name').text
tool_name = root.find('tool_name').text
try:
tool_args = json.loads(root.find('arguments').text)
except json.JSONDecodeError:
raise ValueError("Invalid tool arguments")
return server_name, tool_name, tool_args

async def chat_loop(self):
"""Run an interactive chat loop"""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")
self.messages = []
while True:
try:
query = input("\nQuery: ").strip()

if query.lower() == 'quit':
break
if query.strip() == '':
print("Please enter a query.")
continue
response = await self.process_query(query)
print(response)

except Exception as e:
print(f"\nError: {str(e)}")

async def cleanup(self):
"""Clean up resources"""
await self.exit_stack.aclose()

async def main():
client = MCPClient()
try:
# await client.connect_to_sse_server('amap', 'https://mcp.amap.com/sse?key=d769f05385fe314e9b3ae548ba7d86b1')
mcp_config_file = './mcp.json'
await client.mcp_json_config(mcp_config_file)
await client.chat_loop()
finally:
await client.cleanup()

if __name__ == "__main__":
asyncio.run(main())

接下来,我们使用自己编写的MCP Client连接前面的MCP Server

修改mcp.json文件内容如下所示:

1
2
3
4
5
6
7
8
9
10
{
"mcpServers": {
"weather-http": {
"isActive": true,
"type": "streamable_http",
"url": "http://127.0.0.1:8002/mcp",
"name": "weather-http"
}
}
}

同样可以成功调用到MCP Server的服务。

image-20250513182033814

MCP Server测试,修改mcp.json文件内容如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"mcpServers": {
"time-http": {
"isActive": true,
"type": "streamable_http",
"url": "https://time.mcp.minglog.cn/mcp",
"name": "time-http"
},
"weather-http": {
"isActive": true,
"type": "streamable_http",
"url": "http://127.0.0.1:8002/mcp",
"name": "weather-http"
}
}
}

这里我使用了一个我服务器中的MCP Server,同样是streamable_http协议。

image-20250513182306857

所有工具都可以调用成功。

结语

Streamable HTTP作为MCP协议中的推荐传输机制,结合了HTTP的广泛兼容性和SSE的实时数据推送能力,提供了一种高效、灵活的通信方式。在AI助手与应用程序之间的通信需求日益增长的背景下,Streamable HTTP有望成为AI通信的新标准。

如果您希望深入了解MCP协议和Streamable HTTP的实现细节,可以访问以下资源:

通过这些资源,您可以更深入地了解MCP协议的设计理念和实现方式,探索其在实际应用中的潜力。

-------------本文结束感谢您的阅读-------------