
Implementing an MCP Client from Scratch

What is an MCP Client?

The MCP client is a key component of the Model Context Protocol architecture: it is the bridge that connects AI models (large language models such as Claude and GPT) to external data sources, tools, and services.

MCP (Model Context Protocol) is an open standard protocol first proposed and open-sourced by Anthropic in late 2024, designed to solve the problem of connecting large language models (LLMs) to the outside world. Its core value is that it breaks the "information silo" limitation of AI models, letting them access and process real-time data in a standardized way and significantly expanding the range of applications for large models.

The MCP architecture has three key components:

  1. MCP Server: a lightweight service program that interfaces with a specific data source or tool (such as a database or API) and exposes standardized functionality according to the MCP specification. Each MCP server encapsulates a particular capability, such as file retrieval or database queries.
  2. MCP Client: a connector embedded in the AI application that establishes a one-to-one connection with an MCP server and acts as the bridge between the model and the server. It discovers available services, sends invocation requests, retrieves results, and passes this information to the AI model.
  3. Host application: the application or environment that runs the LLM, such as the Claude desktop app or the Cursor IDE. By integrating an MCP client, the host enables its model to invoke the capabilities of external MCP servers.

The MCP client works on top of the JSON-RPC 2.0 protocol, communicating with MCP servers through standardized interfaces. It automatically discovers available MCP servers and the tools they provide, and presents this information to the LLM in a structured way, so the model understands which tools exist and what they do, and can decide when and where to invoke them based on the user's request.
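
To make the JSON-RPC 2.0 framing concrete, here is a sketch of what a tool-discovery request and a tool-invocation request might look like on the wire. The method names `tools/list` and `tools/call` follow the MCP specification; the tool name and arguments are hypothetical examples.

```python
import json

# A hypothetical JSON-RPC 2.0 exchange between an MCP client and server:
# the client first asks which tools exist, then invokes one of them.
list_request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/list",
}
call_request = {
    "jsonrpc": "2.0",
    "id": 2,
    "method": "tools/call",
    "params": {
        "name": "get_forecast",                  # tool name (hypothetical)
        "arguments": {"city": "San Francisco"},  # arguments per the tool's input schema
    },
}

# Messages are serialized as JSON before being written to the transport
wire = json.dumps(call_request)
print(wire)
```

The MCP Python SDK builds and parses these frames for you; the point here is only that nothing about the protocol is exotic — it is plain JSON over a transport.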

Why Write Your Own MCP Client?

There are several good reasons to develop your own MCP client:

  1. Customization: general-purpose MCP clients on the market (e.g., Claude Desktop, Cursor, Cline) may not fit a specific business scenario. Building your own lets you tailor it to your organization's or your own requirements, such as adding specific security checks, data filtering, or domain-specific optimizations.
  2. System integration: integrate the MCP client seamlessly with existing systems. A self-built client can better fit your existing tech stack and architecture, reducing compatibility issues and improving development efficiency.
  3. Data privacy and security: for sensitive data or internal systems, a self-built MCP client can enforce stricter access control and data-protection measures, ensuring sensitive information is not accessed or leaked without authorization.
  4. Performance optimization: optimize for specific use cases. For example, in high-frequency, low-latency scenarios, a custom client can reduce communication overhead and improve response time.
  5. Extended functionality: implement enhancements beyond the standard MCP protocol, such as advanced caching, request queue management, load balancing, or context-handling logic optimized for a particular AI model.
  6. Control and maintainability: for core business that depends on AI capabilities, a self-built MCP client means better control and maintainability. When requirements change or problems arise, you can adjust and fix things quickly without depending on a third-party vendor.
  7. Multi-model support: a self-built MCP client can be designed to support several different LLMs (such as Claude and GPT) at once, dynamically selecting the most suitable model for each task and increasing system flexibility.
  8. Special protocol support: for scenarios that require special communication protocols or data formats, building your own client makes these non-standard requirements achievable.
  9. Reduced dependency risk: reduce reliance on third-party services and strengthen the system's independence and resilience. If a third-party service changes or goes down, a self-built system can adapt more quickly.
  10. Knowledge accumulation: building an MCP client lets the team accumulate expertise in integrating AI with external systems, which is valuable for a long-term AI strategy and capability building.

In practice, as AI applications deepen and spread, more and more organizations are realizing that AI infrastructure (including the MCP client) is a strategic asset. Building these components yourself not only yields a better technical fit, it can also be a source of competitive differentiation, especially in domains where AI is critical to the business.

The openness of the MCP protocol is exactly what makes self-built MCP clients possible: organizations and developers can create customized solutions within a standardized framework while remaining interoperable with the rest of the ecosystem. By writing your own MCP client, you can fully leverage the capabilities of large AI models while keeping complete control over your system architecture and data flow.

Writing the MCP Client

Project Setup

For this section's exercises, you need to prepare an LLM API compatible with the OpenAI protocol, such as DeepSeek V3, the Qwen series, or Moonshot (月之暗面).

To write the MCP client, we reuse the project created in the previous section (Implementing an MCP Server from Scratch).

First, create a .env environment-variable file and put your LLM's connection information in it.

It contains three fields:

  1. OPENAI_API_KEY: the API key for your model
  2. BASE_URL: the model's request endpoint
  3. MODEL: the model name
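
Such a .env file might look like the sketch below. The values are placeholders — substitute your own key, and the Moonshot endpoint and model name shown are examples, not required values.

```shell
# .env — keep this file out of version control
OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxx   # your provider's API key (placeholder)
BASE_URL=https://api.moonshot.cn/v1  # request endpoint (example: Moonshot)
MODEL=moonshot-v1-8k                 # model name (example)
```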

(screenshot: the .env file with the three fields filled in)

There is no restriction on the kind of model: I use Moonshot, but any other LLM works too.

MCP-Prompt

At present, the LLMs that natively support or deeply integrate the MCP protocol are mainly the Claude series and the GPT series.

Domestic Chinese model providers have generally done no MCP-specific training, so to use MCP with these models you must write a structured MCP prompt and deliver it as the system prompt to make the model MCP-compatible.

To build this prompt, I proxied the model through Cloudflare, intercepted Cursor's MCP requests, kept the MCP-related parts of the prompt, and deleted everything irrelevant, producing the prompt below. You are of course free to modify it for your own customization.

Create a file named MCP_Prompt.txt in the project and paste in the following content.

You are an AI assistant, you can help users solve problems, including but not limited to programming, editing files, browsing websites, etc.

====

TOOL USE

You have access to a set of tools that are executed upon the user's approval. You can use one tool per message, and will receive the result of that tool use in the user's response. You use tools step-by-step to accomplish a given task, with each tool use informed by the result of the previous tool use.

# Tool Use Formatting

Tool use is formatted using XML-style tags. The tool name is enclosed in opening and closing tags, and each parameter is similarly enclosed within its own set of tags. Here's the structure:

<tool_name>
<parameter1_name>value1</parameter1_name>
<parameter2_name>value2</parameter2_name>
...
</tool_name>

For example:

<read_file>
<path>src/main.js</path>
</read_file>

Always adhere to this format for the tool use to ensure proper parsing and execution.

# Tools
## use_mcp_tool
Description: Request to use a tool provided by a connected MCP server. Each MCP server can provide multiple tools with different capabilities. Tools have defined input schemas that specify required and optional parameters.
Parameters:
- server_name: (required) The name of the MCP server providing the tool
- tool_name: (required) The name of the tool to execute
- arguments: (required) A JSON object containing the tool's input parameters, following the tool's input schema
Usage:
<use_mcp_tool>
<server_name>server name here</server_name>
<tool_name>tool name here</tool_name>
<arguments>
{
"param1": "value1",
"param2": "value2"
}
</arguments>
</use_mcp_tool>

# Tool Use Examples
## Example 1: Requesting to use an MCP tool

<use_mcp_tool>
<server_name>weather-server</server_name>
<tool_name>get_forecast</tool_name>
<arguments>
{
"city": "San Francisco",
"days": 5
}
</arguments>
</use_mcp_tool>

## Example 2: Another example of using an MCP tool (where the server name is a unique identifier such as a URL)

<use_mcp_tool>
<server_name>github.com/modelcontextprotocol/servers/tree/main/src/github</server_name>
<tool_name>create_issue</tool_name>
<arguments>
{
"owner": "octocat",
"repo": "hello-world",
"title": "Found a bug",
"body": "I'm having a problem with this.",
"labels": ["bug", "help wanted"],
"assignees": ["octocat"]
}
</arguments>
</use_mcp_tool>

====

MCP SERVERS

The Model Context Protocol (MCP) enables communication between the system and locally running MCP servers that provide additional tools and resources to extend your capabilities.

# Connected MCP Servers

When a server is connected, you can use the server's tools via the `use_mcp_tool` tool, and access the server's resources via the `access_mcp_resource` tool.
<$MCP_INFO$>

====

CAPABILITIES
- You have access to MCP servers that may provide additional tools and resources. Each server may provide different capabilities that you can use to accomplish tasks more effectively.

====

RULES
- MCP operations should be used one at a time, similar to other tool usage. Wait for confirmation of success before proceeding with additional operations.

====

OBJECTIVE

You accomplish a given task iteratively, breaking it down into clear steps and working through them methodically.

1. Analyze the user's task and set clear, achievable goals to accomplish it. Prioritize these goals in a logical order.
2. Work through these goals sequentially, utilizing available tools one at a time as necessary. Each goal should correspond to a distinct step in your problem-solving process. You will be informed on the work completed and what's remaining as you go.
3. Once you've completed the user's task, you must use the attempt_completion tool to present the result of the task to the user.
4. The user may provide feedback, which you can use to make improvements and try again. But DO NOT continue in pointless back and forth conversations, i.e. don't end your responses with questions or offers for further assistance.

In this prompt I defined a special marker, <$MCP_INFO$>, which is later used to inject the descriptions of connected MCP Servers and their MCP Tools.
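
The injection step can be sketched as follows: each connected server's tool descriptions are inserted in front of the marker while the marker itself is re-appended, so the next server can insert at the same point. Server and tool names here are illustrative.

```python
# Sketch: replace <$MCP_INFO$> with a server's tool descriptions while
# re-appending the marker, so later servers can keep inserting after it.
system_prompt = "# Connected MCP Servers\n<$MCP_INFO$>"

def register_server(prompt: str, server_name: str, tool_lines: list[str]) -> str:
    block = "## " + server_name + "\n### Available Tools\n" + "\n".join(tool_lines)
    return prompt.replace("<$MCP_INFO$>", block + "\n<$MCP_INFO$>")

system_prompt = register_server(system_prompt, "weather", ["- get_forecast"])
system_prompt = register_server(system_prompt, "amap", ["- maps_geo"])
print(system_prompt)
```

After both calls there is still exactly one marker left at the end, and each server's block appears in connection order.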

The stdio Transport

The stdio transport is the simplest form of communication and is typically used for passing messages between local tools. It uses standard input/output (stdin/stdout) as the data channel and is suited to local inter-process interaction.

Here we start with the weather MCP Server built in the previous section (Implementing an MCP Server from Scratch) and write an MCP client that sends requests to that MCP Server.

Write mcp_client_stdio.py as follows:

import asyncio
from typing import Optional
from contextlib import AsyncExitStack
import json
import os
import re

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from dotenv import load_dotenv
from openai import OpenAI
from lxml import etree

load_dotenv()  # load the .env file into environment variables

class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # These variables must be set in the .env file beforehand
        self.API_KEY = os.getenv("OPENAI_API_KEY")
        self.BASE_URL = os.getenv("BASE_URL")
        self.MODEL = os.getenv("MODEL")
        # Create the LLM client
        self.client = OpenAI(api_key=self.API_KEY, base_url=self.BASE_URL)
        # Conversation history
        self.messages = []
        # Load the prompt template
        with open("./MCP_Prompt.txt", "r", encoding="utf-8") as file:
            self.system_prompt = file.read()

    async def connect_to_stdio_server(self, mcp_name, command: str, args: list[str], env: dict[str, str] | None = None):
        """Connect to an MCP server over stdio

        Args:
            mcp_name: display name used in the system prompt
            command: executable that launches the server (e.g. "python")
            args: command-line arguments (e.g. ["weather.py"])
            env: extra environment variables for the server process
        """
        server_params = StdioServerParameters(
            command=command,
            args=args,
            env=env or {}
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()
        # Inject the MCP server and tool descriptions into the system prompt
        response = await self.session.list_tools()
        available_tools = [
            '## ' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n"
            + tool.description + "\n" + json.dumps(tool.inputSchema)
            for tool in response.tools
        ]
        self.system_prompt = self.system_prompt.replace(
            "<$MCP_INFO$>", "\n".join(available_tools) + "\n<$MCP_INFO$>"
        )
        tools = response.tools
        print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
        """Process a query using the LLM and the available tools"""
        if not self.messages:
            # The system prompt only needs to be added once per conversation
            self.messages.append({
                "role": "system",
                "content": self.system_prompt
            })
        self.messages.append({
            "role": "user",
            "content": query
        })

        # Initial LLM API call
        response = self.client.chat.completions.create(
            model=self.MODEL,
            max_tokens=1024,
            messages=self.messages,
        )

        # Process the response and handle tool calls
        final_text = []
        content = response.choices[0].message.content
        if '<use_mcp_tool>' not in content:
            final_text.append(content)
        else:
            # Parse the tool-call block out of the model's reply
            server_name, tool_name, tool_args = self.parse_tool_string(content)

            # Execute the tool call
            result = await self.session.call_tool(tool_name, tool_args)
            print(f"[Calling tool {tool_name} with args {tool_args}]")
            print("-" * 40)
            print("Server:", server_name)
            print("Tool:", tool_name)
            print("Args:", tool_args)
            print("-" * 40)
            print("Result:", result.content[0].text)
            print("-" * 40)
            self.messages.append({
                "role": "assistant",
                "content": content
            })
            self.messages.append({
                "role": "user",
                "content": f"[Tool {tool_name} \n returned: {result}]"
            })

            response = self.client.chat.completions.create(
                model=self.MODEL,
                max_tokens=1024,
                messages=self.messages
            )
            final_text.append(response.choices[0].message.content)
        return "\n".join(final_text)

    def parse_tool_string(self, tool_string: str) -> tuple[str, str, dict]:
        """Parse the tool-call string returned by the LLM"""
        tool_string = re.findall("(<use_mcp_tool>.*?</use_mcp_tool>)", tool_string, re.S)[0]
        root = etree.fromstring(tool_string)
        server_name = root.find('server_name').text
        tool_name = root.find('tool_name').text
        try:
            tool_args = json.loads(root.find('arguments').text)
        except json.JSONDecodeError:
            raise ValueError("Invalid tool arguments")
        return server_name, tool_name, tool_args

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        self.messages = []
        while True:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break
                if query == '':
                    print("Please enter a query.")
                    continue
                response = await self.process_query(query)
                print(response)

            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()

async def main():
    client = MCPClient()
    try:
        await client.connect_to_stdio_server('weather', 'python', ['weather.py'])
        await client.chat_loop()
    finally:
        await client.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

Note: this calls the weather MCP Server written in the previous section, so weather.py must be in the same directory as mcp_client_stdio.py.

Demo of the code in action:

(screenshot: the client calling the weather server over stdio)

The MCP Server was called successfully!

The SSE Transport

SSE (Server-Sent Events) is a streaming mechanism built on HTTP that lets a server push events to a client over a one-way HTTP connection. SSE suits scenarios where the client needs to receive server-pushed updates, typically real-time data.
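
On the wire, SSE is a plain-text stream of `field: value` lines with a blank line terminating each event. The MCP SDK's `sse_client` handles this for you; the minimal parser below is only to illustrate the format, and the sample stream (the `endpoint` event and session id) is a hypothetical example.

```python
# Minimal sketch of parsing a Server-Sent Events stream. Real clients
# (including the MCP SDK) do this incrementally over an HTTP connection.
def parse_sse(stream: str) -> list[dict]:
    events = []
    for raw_event in stream.split("\n\n"):       # events are blank-line separated
        event = {}
        for line in raw_event.splitlines():
            if ":" in line:
                field, _, value = line.partition(":")  # split at the first colon
                event[field.strip()] = value.strip()
        if event:
            events.append(event)
    return events

sample = (
    "event: endpoint\n"
    "data: /messages?session_id=abc123\n"
    "\n"
    "event: message\n"
    'data: {"jsonrpc": "2.0"}\n'
    "\n"
)
events = parse_sse(sample)
print(events)
```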

Here we can likewise use the weather MCP SSE Server that we deployed on a public host in the previous section.

Write mcp_client_sse.py as follows:

import asyncio
from typing import Optional
from contextlib import AsyncExitStack
import json
import os
import re

from mcp import ClientSession
from mcp.client.sse import sse_client

from dotenv import load_dotenv
from openai import OpenAI
from lxml import etree

load_dotenv()  # load the .env file into environment variables

class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # These variables must be set in the .env file beforehand
        self.API_KEY = os.getenv("OPENAI_API_KEY")
        self.BASE_URL = os.getenv("BASE_URL")
        self.MODEL = os.getenv("MODEL")
        # Create the LLM client
        self.client = OpenAI(api_key=self.API_KEY, base_url=self.BASE_URL)
        # Conversation history
        self.messages = []
        # Load the prompt template
        with open("./MCP_Prompt.txt", "r", encoding="utf-8") as file:
            self.system_prompt = file.read()

    async def connect_to_sse_server(self, mcp_name, server_url: str):
        """Connect to an MCP server over SSE"""
        sse_transport = await self.exit_stack.enter_async_context(sse_client(server_url))
        self.sse, self.write = sse_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write))

        await self.session.initialize()
        # Inject the MCP server and tool descriptions into the system prompt
        response = await self.session.list_tools()
        available_tools = [
            '## ' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n"
            + tool.description + "\n" + json.dumps(tool.inputSchema)
            for tool in response.tools
        ]
        self.system_prompt = self.system_prompt.replace(
            "<$MCP_INFO$>", "\n".join(available_tools) + "\n<$MCP_INFO$>\n"
        )
        tools = response.tools
        print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
        """Process a query using the LLM and the available tools"""
        if not self.messages:
            # The system prompt only needs to be added once per conversation
            self.messages.append({
                "role": "system",
                "content": self.system_prompt
            })
        self.messages.append({
            "role": "user",
            "content": query
        })

        # Initial LLM API call
        response = self.client.chat.completions.create(
            model=self.MODEL,
            max_tokens=1024,
            messages=self.messages,
        )

        # Process the response and handle tool calls
        final_text = []
        content = response.choices[0].message.content
        if '<use_mcp_tool>' not in content:
            final_text.append(content)
        else:
            # Parse the tool-call block out of the model's reply
            server_name, tool_name, tool_args = self.parse_tool_string(content)

            # Execute the tool call
            result = await self.session.call_tool(tool_name, tool_args)
            print(f"[Calling tool {tool_name} with args {tool_args}]")
            print("-" * 40)
            print("Server:", server_name)
            print("Tool:", tool_name)
            print("Args:", tool_args)
            print("-" * 40)
            print("Result:", result.content[0].text)
            print("-" * 40)
            self.messages.append({
                "role": "assistant",
                "content": content
            })
            self.messages.append({
                "role": "user",
                "content": f"[Tool {tool_name} \n returned: {result}]"
            })

            response = self.client.chat.completions.create(
                model=self.MODEL,
                max_tokens=1024,
                messages=self.messages
            )
            final_text.append(response.choices[0].message.content)
        return "\n".join(final_text)

    def parse_tool_string(self, tool_string: str) -> tuple[str, str, dict]:
        """Parse the tool-call string returned by the LLM"""
        tool_string = re.findall("(<use_mcp_tool>.*?</use_mcp_tool>)", tool_string, re.S)[0]
        root = etree.fromstring(tool_string)
        server_name = root.find('server_name').text
        tool_name = root.find('tool_name').text
        try:
            tool_args = json.loads(root.find('arguments').text)
        except json.JSONDecodeError:
            raise ValueError("Invalid tool arguments")
        return server_name, tool_name, tool_args

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        self.messages = []
        while True:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break
                if query == '':
                    print("Please enter a query.")
                    continue
                response = await self.process_query(query)
                print(response)

            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()

async def main():
    client = MCPClient()
    try:
        await client.connect_to_sse_server('weather_sse', 'http://47.113.225.16:8000/sse')
        await client.chat_loop()
    finally:
        await client.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

Demo of the execution:

(screenshot: the client calling the SSE server)

The MCP SSE Server can be called successfully as well.

You can call my deployed MCP SSE Server directly

To make learning easier, you can test against the already-deployed service at http://47.113.225.16:8000/sse. Of course, you are also encouraged to swap in other services and experiment.

Loading Servers from a Configuration File

With the experiments so far, our hand-written MCP client can connect to any MCP Server over either the stdio or the SSE transport, but only one server at a time. Anyone who has used Cursor or another MCP client application knows that they load multiple MCP Servers from a single JSON configuration file. Can our own MCP client do the same?

Of course it can. In the next experiment we will write the code to implement exactly that.

First, we define our own JSON file schema, for example:

{
  "mcpServers": {
    "weather-sse": {
      "isActive": true,
      "type": "stdio",
      "command": "python",
      "args": [
        "weather.py"
      ],
      "name": "weather-sse",
      "env": {}
    },
    "amap-amap-sse": {
      "isActive": true,
      "type": "sse",
      "url": "https://mcp.amap.com/sse?key={高德API KEY}",
      "name": "amap-amap-sse"
    }
  }
}

Note: for {高德API KEY}, you can register an account on the Amap (高德) website and obtain a key for free.

The fields mean the following:

  1. Common fields:
    • isActive: controls whether this MCP Server is enabled.
    • type: the MCP Server type, either stdio or sse.
    • name: an alias for the MCP Server.
  2. stdio-specific fields:
    • command: the command to run.
    • args: the argument list.
    • env: a dictionary of environment variables.
  3. SSE-specific fields:
    • url: the SSE MCP Server's service address.

Write mcp_client_mix.py as follows:

import asyncio
from typing import Optional
from contextlib import AsyncExitStack
import json
import os
import re

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client

from dotenv import load_dotenv
from openai import OpenAI
from lxml import etree

load_dotenv()  # load the .env file into environment variables

class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # These variables must be set in the .env file beforehand
        self.API_KEY = os.getenv("OPENAI_API_KEY")
        self.BASE_URL = os.getenv("BASE_URL")
        self.MODEL = os.getenv("MODEL")
        self.client = OpenAI(api_key=self.API_KEY, base_url=self.BASE_URL)
        # One ClientSession per connected MCP server, keyed by server name
        self.sessions = {}
        self.messages = []
        with open("./MCP_Prompt.txt", "r", encoding="utf-8") as file:
            self.system_prompt = file.read()

    async def mcp_json_config(self, mcp_json_file):
        """Connect to every active MCP server listed in the JSON config file"""
        try:
            with open(mcp_json_file, 'r') as f:
                mcp_config: dict = json.load(f)
        except json.JSONDecodeError:
            raise ValueError("Invalid MCP config")
        servers_config: dict = mcp_config.get('mcpServers', {})
        for k, v in servers_config.items():
            mcp_name = v.get('name', k)
            try:
                print('-' * 50)
                if not v.get('isActive', False):
                    continue
                mcp_type: str = v.get('type', 'stdio')
                if mcp_type.lower() == 'stdio':
                    command = v.get('command', None)
                    args = v.get('args', [])
                    env = v.get('env', {})
                    if command is None:
                        raise ValueError(f'{mcp_name} command is empty.')
                    if args == []:
                        raise ValueError(f'{mcp_name} args is empty.')
                    await self.connect_to_stdio_server(mcp_name, command, args, env)
                elif mcp_type.lower() == 'sse':
                    server_url = v.get('url', None)
                    if server_url is None:
                        raise ValueError(f'{mcp_name} server_url is empty.')
                    await self.connect_to_sse_server(mcp_name, server_url)
                else:
                    raise ValueError(f'{mcp_name} mcp type must be in [stdio, sse].')
            except Exception as e:
                print(f"Error connecting to {mcp_name}: {e}")

    async def connect_to_stdio_server(self, mcp_name, command: str, args: list[str], env: dict[str, str] | None = None):
        """Connect to an MCP server over stdio"""
        server_params = StdioServerParameters(
            command=command,
            args=args,
            env=env or {}
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
        self.sessions[mcp_name] = self.session

        await self.session.initialize()
        # Inject the MCP server and tool descriptions into the system prompt
        response = await self.session.list_tools()
        available_tools = [
            '## ' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n"
            + tool.description + "\n" + json.dumps(tool.inputSchema)
            for tool in response.tools
        ]
        self.system_prompt = self.system_prompt.replace(
            "<$MCP_INFO$>", "\n".join(available_tools) + "\n<$MCP_INFO$>"
        )
        tools = response.tools
        print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

    async def connect_to_sse_server(self, mcp_name, server_url: str):
        """Connect to an MCP server over SSE"""
        sse_transport = await self.exit_stack.enter_async_context(sse_client(server_url))
        self.sse, self.write = sse_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write))
        self.sessions[mcp_name] = self.session

        await self.session.initialize()
        # Inject the MCP server and tool descriptions into the system prompt
        response = await self.session.list_tools()
        available_tools = [
            '## ' + mcp_name + '\n### Available Tools\n- ' + tool.name + "\n"
            + tool.description + "\n" + json.dumps(tool.inputSchema)
            for tool in response.tools
        ]
        self.system_prompt = self.system_prompt.replace(
            "<$MCP_INFO$>", "\n".join(available_tools) + "\n<$MCP_INFO$>\n"
        )
        tools = response.tools
        print(f"Successfully connected to {mcp_name} server with tools:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
        """Process a query using the LLM and the available tools"""
        if not self.messages:
            # The system prompt only needs to be added once per conversation
            self.messages.append({
                "role": "system",
                "content": self.system_prompt
            })
        self.messages.append({
            "role": "user",
            "content": query
        })

        # Initial LLM API call
        response = self.client.chat.completions.create(
            model=self.MODEL,
            max_tokens=1024,
            messages=self.messages,
        )

        # Process the response and handle tool calls
        final_text = []
        content = response.choices[0].message.content
        if '<use_mcp_tool>' not in content:
            final_text.append(content)
        else:
            # Parse the tool-call block out of the model's reply
            server_name, tool_name, tool_args = self.parse_tool_string(content)

            # Execute the tool call on the session belonging to that server
            result = await self.sessions[server_name].call_tool(tool_name, tool_args)
            print(f"[Calling tool {tool_name} with args {tool_args}]")
            print("-" * 40)
            print("Server:", server_name)
            print("Tool:", tool_name)
            print("Args:", tool_args)
            print("-" * 40)
            print("Result:", result.content[0].text)
            print("-" * 40)
            self.messages.append({
                "role": "assistant",
                "content": content
            })
            self.messages.append({
                "role": "user",
                "content": f"[Tool {tool_name} \n returned: {result}]"
            })

            response = self.client.chat.completions.create(
                model=self.MODEL,
                max_tokens=1024,
                messages=self.messages
            )
            final_text.append(response.choices[0].message.content)
        return "\n".join(final_text)

    def parse_tool_string(self, tool_string: str) -> tuple[str, str, dict]:
        """Parse the tool-call string returned by the LLM"""
        tool_string = re.findall("(<use_mcp_tool>.*?</use_mcp_tool>)", tool_string, re.S)[0]
        root = etree.fromstring(tool_string)
        server_name = root.find('server_name').text
        tool_name = root.find('tool_name').text
        try:
            tool_args = json.loads(root.find('arguments').text)
        except json.JSONDecodeError:
            raise ValueError("Invalid tool arguments")
        return server_name, tool_name, tool_args

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        self.messages = []
        while True:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break
                if query == '':
                    print("Please enter a query.")
                    continue
                response = await self.process_query(query)
                print(response)

            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()

async def main():
    client = MCPClient()
    try:
        mcp_config_file = './mcp.json'
        await client.mcp_json_config(mcp_config_file)
        await client.chat_loop()
    finally:
        await client.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

The demo looks like this:

(screenshots: both configured servers being called successfully)

As you can see, both tools can be called successfully.

Suggestions for Further Improvement

In this section we loaded all MCP Servers from an MCP configuration file and verified that every server's tools can be called successfully.

However, the current version cannot chain tool calls between tools: it cannot answer a user request that requires several tools working together. Modifying the existing code to support this is not hard, and I encourage you to try it yourself.
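
One way to support chained tool calls is to turn the single if/else in process_query into a loop: keep calling the model, executing whatever tool it requests, and feeding the result back, until a reply contains no tool call (with a step cap as a safety net). A minimal sketch with stubbed-out model and tool functions follows; the function names and stub behavior are illustrative, not part of the original code.

```python
# Sketch of an iterative tool-call loop: repeat LLM call -> tool call ->
# feed result back, until the model answers without requesting a tool.
def run_agent_loop(call_llm, call_tool, query: str, max_steps: int = 5) -> str:
    messages = [{"role": "user", "content": query}]
    for _ in range(max_steps):
        content = call_llm(messages)
        if "<use_mcp_tool>" not in content:
            return content  # final answer, no further tools requested
        messages.append({"role": "assistant", "content": content})
        # In the real client this is parse_tool_string + sessions[...].call_tool
        result = call_tool(content)
        messages.append({"role": "user", "content": f"[Tool returned: {result}]"})
    return "Stopped: too many tool calls."

# Stub LLM: requests a tool once, then answers using the result it was given.
def fake_llm(messages):
    if any("Tool returned" in m["content"] for m in messages):
        return "The weather is sunny."
    return "<use_mcp_tool>...</use_mcp_tool>"

answer = run_agent_loop(fake_llm, lambda _: "sunny", "What's the weather?")
print(answer)
```

In the real client, `call_llm` would be the OpenAI chat-completions call and `call_tool` the parse-and-dispatch step; the loop structure is the only change needed.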
