手把手教你开发 MCP Server:Python/Node.js/Go 三语言完整教程
从零构建 MCP Server 的完整指南,覆盖 Python、Node.js、Go 三种主流语言,包含文件系统、数据库、API 网关等实战案例。
MCP(Model Context Protocol)在 2026 年已成为 AI 连接外部工具的事实标准。但很多人只会配置现成的 MCP Server,遇到定制需求就无从下手。本文手把手教你从零构建 MCP Server,覆盖 Python、Node.js、Go 三种主流语言。
准备工作
了解 MCP SDK
Anthropic 官方提供三种语言的 MCP SDK:
| 语言 | 包名 | 安装方式 |
|---|---|---|
| Python | mcp | pip install mcp |
| Node.js | @modelcontextprotocol/sdk | npm install @modelcontextprotocol/sdk |
| Go | github.com/mark3labs/mcp-go | go get github.com/mark3labs/mcp-go |
SDK 封装了 MCP 协议的核心通信逻辑(JSON-RPC over stdio/SSE),开发者只需专注于实现具体的工具功能。
MCP Server 的三种运行模式
- stdio 模式:通过标准输入输出通信,适合本地工具、CLI 集成
- SSE(Server-Sent Events)模式:通过 HTTP 长连接通信,适合远程服务、Web 应用
- Streamable HTTP 模式:2026 年新增,支持流式响应,适合大文件传输
本文主要覆盖 stdio 模式(最常用)和 SSE 模式(企业级部署)。
第一章:Python MCP Server 开发
Python 是开发 MCP Server 最流行的语言,生态最丰富。
1.1 基础模板
# server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
app = Server("my-tools")
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="greet",
description="向指定名字的人打招呼",
input_schema={
"type": "object",
"properties": {
"name": {"type": "string", "description": "你的名字"}
},
"required": ["name"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "greet":
return [TextContent(type="text", text=f"你好,{arguments['name']}!")]
raise ValueError(f"未知工具: {name}")
async def main():
async with stdio_server() as streams:
await app.run(streams[0], streams[1], app.create_initialization_options())
if __name__ == "__main__":
import asyncio
asyncio.run(main())
运行:python server.py
1.2 实战:文件搜索 MCP Server
这个 Server 让 AI 能在指定目录中搜索文件内容:
# file_search_server.py
import os
import re
from pathlib import Path
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
app = Server("file-search")
SEARCH_DIR = os.environ.get("SEARCH_DIR", ".") # 从环境变量读取搜索目录
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="search_in_files",
description="在文件中搜索指定关键词",
input_schema={
"type": "object",
"properties": {
"keyword": {"type": "string", "description": "搜索关键词"},
"pattern": {"type": "string", "description": "文件匹配模式,如 *.py、*.md"}
},
"required": ["keyword"]
}
),
Tool(
name="count_lines",
description="统计指定模式文件的总行数",
input_schema={
"type": "object",
"properties": {
"pattern": {"type": "string", "description": "文件匹配模式,如 *.py"}
},
"required": ["pattern"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "search_in_files":
keyword = arguments["keyword"]
pattern = arguments.get("pattern", "*")
results = []
for filepath in Path(SEARCH_DIR).rglob(pattern):
if filepath.is_file():
try:
content = filepath.read_text(encoding="utf-8", errors="ignore")
for i, line in enumerate(content.splitlines(), 1):
if keyword in line:
results.append(f"{filepath}:{i}: {line.strip()[:200]}")
except Exception:
continue
return [TextContent(
type="text",
text=f"在 {len(results)} 处找到 '{keyword}':\n\n" + "\n".join(results[:50])
)]
elif name == "count_lines":
pattern = arguments["pattern"]
total = 0
for filepath in Path(SEARCH_DIR).rglob(pattern):
if filepath.is_file():
total += sum(1 for _ in filepath.open("rb"))
return [TextContent(type="text", text=f"匹配 {pattern} 的文件共 {total} 行")]
async def main():
async with stdio_server() as streams:
await app.run(streams[0], streams[1], app.create_initialization_options())
if __name__ == "__main__":
import asyncio
asyncio.run(main())
配置到 Claude Desktop:
{
"mcpServers": {
"file-search": {
"command": "python",
"args": ["/path/to/file_search_server.py"],
"env": {
"SEARCH_DIR": "/home/user/projects"
}
}
}
}
1.3 高级:调用外部 API 的 MCP Server
# api_gateway_server.py
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
app = Server("api-gateway")
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="get_github_stars",
description="查询 GitHub 仓库的 Star 数",
input_schema={
"type": "object",
"properties": {
"owner": {"type": "string", "description": "仓库所有者"},
"repo": {"type": "string", "description": "仓库名"}
},
"required": ["owner", "repo"]
}
),
Tool(
name="fetch_trending_repos",
description="获取 GitHub 热门仓库",
input_schema={
"type": "object",
"properties": {
"language": {"type": "string", "description": "编程语言,如 python"},
"since": {"type": "string", "description": "时间范围: daily/weekly/monthly"}
}
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
async with httpx.AsyncClient(timeout=30) as client:
if name == "get_github_stars":
resp = await client.get(
f"https://api.github.com/repos/{arguments['owner']}/{arguments['repo']}"
)
if resp.status_code == 200:
data = resp.json()
return [TextContent(
type="text",
text=f"⭐ {arguments['owner']}/{arguments['repo']}\n"
f"Stars: {data['stargazers_count']:,}\n"
f"Forks: {data['forks_count']:,}\n"
f"语言: {data.get('language', 'N/A')}\n"
f"描述: {data.get('description', '无')}"
)]
return [TextContent(type="text", text=f"查询失败: {resp.status_code}")]
elif name == "fetch_trending_repos":
# 使用 GitHub Trending API
params = {}
if arguments.get("language"):
params["language"] = arguments["language"]
if arguments.get("since"):
params["since"] = arguments["since"]
resp = await client.get("https://api.github.com/search/repositories", params={
"q": "stars:>1000",
"sort": "stars",
"order": "desc",
"per_page": 5
})
if resp.status_code == 200:
repos = resp.json()["items"]
lines = ["🏆 GitHub 热门仓库 Top 5:\n"]
for i, repo in enumerate(repos, 1):
lines.append(
f"{i}. {repo['full_name']} ⭐{repo['stargazers_count']:,}\n"
f" {repo['description'] or '无描述'}\n"
)
return [TextContent(type="text", text="\n".join(lines))]
async def main():
async with stdio_server() as streams:
await app.run(streams[0], streams[1], app.create_initialization_options())
if __name__ == "__main__":
import asyncio
asyncio.run(main())
第二章:Node.js MCP Server 开发
2.1 基础模板
// server.js
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
ListToolsRequestSchema,
CallToolRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
const server = new Server(
{ name: 'my-tools', version: '1.0.0' },
{ capabilities: { tools: {} } }
);
server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [{
name: 'get_time',
description: '获取指定时区的当前时间',
inputSchema: {
type: 'object',
properties: {
timezone: {
type: 'string',
description: '时区,如 Asia/Shanghai、America/New_York'
}
},
required: ['timezone']
}
}]
}));
server.setRequestHandler(CallToolRequestSchema, async (request) => {
if (request.params.name === 'get_time') {
const { timezone } = request.params.arguments;
const time = new Date().toLocaleString('zh-CN', { timeZone: timezone });
return {
content: [{ type: 'text', text: `当前 ${timezone} 时间:${time}` }]
};
}
throw new Error(`未知工具: ${request.params.name}`);
});
const transport = new StdioServerTransport();
await server.connect(transport);
2.2 实战:PDF 阅读 MCP Server
这个 Server 让 AI 直接读取 PDF 文件内容:
// pdf_reader_server.js
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
ListToolsRequestSchema,
CallToolRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import fs from 'fs/promises';
import path from 'path';
import pdf from 'pdf-parse'; // npm install pdf-parse
// 简单的 PDF 解析包装
async function extractTextFromPDF(filePath) {
const buffer = await fs.readFile(filePath);
// 实际项目中可以使用 pdf-parse 或 pdfjs-dist
const data = await pdf(buffer);
return data.text;
}
const server = new Server(
{ name: 'pdf-reader', version: '1.0.0' },
{ capabilities: { tools: {} } }
);
server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: 'read_pdf',
description: '读取 PDF 文件内容',
inputSchema: {
type: 'object',
properties: {
file_path: {
type: 'string',
description: 'PDF 文件路径'
},
max_pages: {
type: 'number',
description: '最大读取页数(可选)'
}
},
required: ['file_path']
}
},
{
name: 'list_pdfs',
description: '列出目录下的所有 PDF 文件',
inputSchema: {
type: 'object',
properties: {
directory: {
type: 'string',
description: '目录路径'
}
},
required: ['directory']
}
}
]
}));
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (name === 'read_pdf') {
const text = await extractTextFromPDF(args.file_path);
const pages = text.split('\f');
const maxPages = args.max_pages || pages.length;
return {
content: [{
type: 'text',
text: `📄 ${path.basename(args.file_path)}\n页数: ${pages.length}\n\n${pages.slice(0, maxPages).join('\n---\n')}`
}]
};
}
if (name === 'list_pdfs') {
const files = await fs.readdir(args.directory);
const pdfs = files.filter(f => f.toLowerCase().endsWith('.pdf'));
return {
content: [{
type: 'text',
text: pdfs.length > 0
? '📚 PDF 文件列表:\n' + pdfs.map(f => ` 📄 ${f}`).join('\n')
: '该目录下没有 PDF 文件'
}]
};
}
throw new Error(`未知工具: ${name}`);
});
const transport = new StdioServerTransport();
await server.connect(transport);
运行:node pdf_reader_server.js
2.3 SSE 模式服务端
如果需要在远程服务器上运行 MCP Server,使用 SSE 模式:
// sse_server.js
import express from 'express';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
const app = express();
const transport = new SSEServerTransport('/messages');
const server = new Server(
{ name: 'remote-tools', version: '1.0.0' },
{ capabilities: { tools: {} } }
);
server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [/* 工具定义 */]
}));
server.setRequestHandler(CallToolRequestSchema, async (request) => {
// 工具调用逻辑
});
app.get('/sse', async (req, res) => {
transport.connect(req, res);
await server.connect(transport);
});
app.post('/messages', (req, res) => {
transport.handlePostMessage(req, res);
});
app.listen(3000, () => {
console.log('MCP Server running on http://localhost:3000/sse');
});
第三章:Go MCP Server 开发
Go 适合构建高性能、生产级 MCP Server。
3.1 基础模板
// main.go
package main
import (
"context"
"fmt"
"os"
"github.com/mark3labs/mcp-go/server"
"github.com/mark3labs/mcp-go/mcp"
)
func main() {
s := server.NewMCPServer(
"my-tools",
"1.0.0",
server.WithResourceCapabilities(true, true),
)
// 添加工具
s.AddTool(mcp.NewTool("calculate",
mcp.WithDescription("执行数学计算"),
mcp.WithString("expression",
mcp.Required(),
mcp.Description("数学表达式,如 2 + 3 * 4"),
),
), func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
expr := request.Params.Arguments["expression"].(string)
// 这里可以集成一个表达式计算引擎
return mcp.NewToolResultText(fmt.Sprintf("表达式: %s (结果待计算)", expr)), nil
})
// 启动 stdio server
if err := server.ServeStdio(s); err != nil {
fmt.Fprintf(os.Stderr, "Server error: %v\n", err)
os.Exit(1)
}
}
3.2 实战:系统监控 MCP Server
// system_monitor.go
package main
import (
"context"
"fmt"
"os"
"runtime"
"time"
"github.com/mark3labs/mcp-go/server"
"github.com/mark3labs/mcp-go/mcp"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/disk"
)
func main() {
s := server.NewMCPServer("system-monitor", "1.0.0")
// CPU 信息
s.AddTool(mcp.NewTool("get_cpu_info",
mcp.WithDescription("获取 CPU 使用率和核心信息"),
), func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
percent, _ := cpu.Percent(time.Second, false)
cpuInfo, _ := cpu.Info()
text := fmt.Sprintf("CPU 使用率: %.1f%%\n", percent[0])
if len(cpuInfo) > 0 {
text += fmt.Sprintf("型号: %s\n核心数: %d\n", cpuInfo[0].ModelName, runtime.NumCPU())
}
return mcp.NewToolResultText(text), nil
})
// 内存信息
s.AddTool(mcp.NewTool("get_memory_info",
mcp.WithDescription("获取系统内存使用情况"),
), func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
memInfo, _ := mem.VirtualMemory()
text := fmt.Sprintf(
"总内存: %.2f GB\n已用: %.2f GB (%.1f%%)\n可用: %.2f GB",
float64(memInfo.Total)/1024/1024/1024,
float64(memInfo.Used)/1024/1024/1024,
memInfo.UsedPercent,
float64(memInfo.Available)/1024/1024/1024,
)
return mcp.NewToolResultText(text), nil
})
// 磁盘信息
s.AddTool(mcp.NewTool("get_disk_info",
mcp.WithDescription("获取磁盘使用情况"),
mcp.WithString("path",
mcp.Description("磁盘路径,默认 /"),
),
), func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
path := "/"
if p, ok := request.Params.Arguments["path"].(string); ok {
path = p
}
diskInfo, _ := disk.Usage(path)
text := fmt.Sprintf(
"路径: %s\n总空间: %.2f GB\n已用: %.2f GB (%.1f%%)\n可用: %.2f GB",
path,
float64(diskInfo.Total)/1024/1024/1024,
float64(diskInfo.Used)/1024/1024/1024,
diskInfo.UsedPercent,
float64(diskInfo.Free)/1024/1024/1024,
)
return mcp.NewToolResultText(text), nil
})
if err := server.ServeStdio(s); err != nil {
fmt.Fprintf(os.Stderr, "Server error: %v\n", err)
os.Exit(1)
}
}
第四章:进阶技巧与最佳实践
4.1 工具描述的重要性
MCP 的智能程度高度依赖工具描述的质量。AI 模型通过描述来决定何时调用哪个工具。
差的描述:
{
"name": "search",
"description": "搜索功能"
}
好的描述:
{
"name": "search_database",
"description": "在用户数据库中按名称、邮箱或ID搜索用户信息,返回匹配用户的详情(含联系方式、注册时间、会员等级)",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词,支持用户名、邮箱前缀或用户ID"
},
"limit": {
"type": "number",
"description": "最多返回结果数量,默认10,最大100"
}
},
"required": ["query"]
}
}
4.2 输入校验
在实际部署中,务必对输入参数做严格校验:
from pydantic import BaseModel, Field, ValidationError
class SearchParams(BaseModel):
keyword: str = Field(..., min_length=1, max_length=200)
limit: int = Field(default=10, ge=1, le=100)
category: str | None = Field(default=None, pattern="^(tech|science|life)$")
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
try:
params = SearchParams(**arguments)
except ValidationError as e:
return [TextContent(type="text", text=f"参数错误: {e}")]
# 继续处理...
4.3 错误处理
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
try:
result = await process_request(name, arguments)
return [TextContent(type="text", text=result)]
except PermissionError:
return [TextContent(type="text", text="⚠️ 权限不足,请检查 MCP Server 配置的访问权限")]
except TimeoutError:
return [TextContent(type="text", text="⏰ 请求超时,外部服务可能暂时不可用")]
except Exception as e:
# 记录错误但不暴露敏感信息
logger.error(f"Tool {name} failed: {e}", exc_info=True)
return [TextContent(type="text", text="❌ 工具调用失败,请稍后重试")]
4.4 资源类型的使用
除了 Tools(工具),MCP 还支持 Resources(资源)和 Prompts(提示)。
Resources 用于让 AI 读取数据(而非执行操作):
@app.list_resources()
async def list_resources() -> list[Resource]:
return [
Resource(
uri="config://app",
name="应用配置",
description="当前应用的配置文件内容",
mime_type="application/json"
)
]
@app.read_resource()
async def read_resource(uri: str) -> str:
if uri == "config://app":
return json.dumps(load_config(), ensure_ascii=False)
raise ValueError(f"未知资源: {uri}")
4.5 测试你的 MCP Server
推荐的测试工具:
-
MCP Inspector(官方):交互式测试工具
npx @modelcontextprotocol/inspector python my_server.py -
claude-code 本地测试:直接让 Claude 调用你的工具
-
单元测试:直接测试工具逻辑函数
import pytest
# 测试工具逻辑(不依赖 MCP 协议)
async def test_search_in_files():
server = FileSearchServer()
result = await server.search_in_files(keyword="TODO", pattern="*.py")
assert "TODO" in result
assert len(result) > 0
总结
| 语言 | 适合场景 | 生态成熟度 | 学习成本 |
|---|---|---|---|
| Python | 快速原型、数据处理、AI 集成 | ⭐⭐⭐⭐⭐ | 低 |
| Node.js | Web 服务、SSE 模式、前端工具链 | ⭐⭐⭐⭐ | 低 |
| Go | 高性能服务、系统工具、生产部署 | ⭐⭐⭐ | 中 |
最佳实践总结:
- 工具名字要语义化(
search_users而非func1) - 描述要详细,告诉 AI “何时用、怎么用、返回什么”
- 参数定义要精确,类型、长度、范围都要限制
- 错误信息要友好,让 AI 能理解并重试
- 每个 MCP Server 只做一件事,做精
从今天开始,把常用的 API 和数据源包装成 MCP Server,你的 AI 工具链会顺手十倍。