手把手教你开发 MCP Server:Python/Node.js/Go 三语言完整教程

📅 2026/5/9 ✍️ 小文 📖 约 1 分钟

从零构建 MCP Server 的完整指南,覆盖 Python、Node.js、Go 三种主流语言,包含文件系统、数据库、API 网关等实战案例。

MCP(Model Context Protocol)在 2026 年已成为 AI 连接外部工具的事实标准。但很多人只会配置现成的 MCP Server,遇到定制需求就无从下手。本文手把手教你从零构建 MCP Server,覆盖 Python、Node.js、Go 三种主流语言。

准备工作

了解 MCP SDK

Anthropic 官方提供三种语言的 MCP SDK:

语言包名安装方式
Pythonmcppip install mcp
Node.js@modelcontextprotocol/sdknpm install @modelcontextprotocol/sdk
Gogithub.com/mark3labs/mcp-gogo get github.com/mark3labs/mcp-go

SDK 封装了 MCP 协议的核心通信逻辑(JSON-RPC over stdio/SSE),开发者只需专注于实现具体的工具功能。

MCP Server 的三种运行模式

  1. stdio 模式:通过标准输入输出通信,适合本地工具、CLI 集成
  2. SSE(Server-Sent Events)模式:通过 HTTP 长连接通信,适合远程服务、Web 应用
  3. Streamable HTTP 模式:2026 年新增,支持流式响应,适合大文件传输

本文主要覆盖 stdio 模式(最常用)和 SSE 模式(企业级部署)。


第一章:Python MCP Server 开发

Python 是开发 MCP Server 最流行的语言,生态最丰富。

1.1 基础模板

# server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

app = Server("my-tools")

@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="greet",
            description="向指定名字的人打招呼",
            input_schema={
                "type": "object",
                "properties": {
                    "name": {"type": "string", "description": "你的名字"}
                },
                "required": ["name"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "greet":
        return [TextContent(type="text", text=f"你好,{arguments['name']}!")]
    raise ValueError(f"未知工具: {name}")

async def main():
    async with stdio_server() as streams:
        await app.run(streams[0], streams[1], app.create_initialization_options())

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

运行:python server.py

1.2 实战:文件搜索 MCP Server

这个 Server 让 AI 能在指定目录中搜索文件内容:

# file_search_server.py
import os
import re
from pathlib import Path
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

app = Server("file-search")
SEARCH_DIR = os.environ.get("SEARCH_DIR", ".")  # 从环境变量读取搜索目录

@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="search_in_files",
            description="在文件中搜索指定关键词",
            input_schema={
                "type": "object",
                "properties": {
                    "keyword": {"type": "string", "description": "搜索关键词"},
                    "pattern": {"type": "string", "description": "文件匹配模式,如 *.py、*.md"}
                },
                "required": ["keyword"]
            }
        ),
        Tool(
            name="count_lines",
            description="统计指定模式文件的总行数",
            input_schema={
                "type": "object",
                "properties": {
                    "pattern": {"type": "string", "description": "文件匹配模式,如 *.py"}
                },
                "required": ["pattern"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "search_in_files":
        keyword = arguments["keyword"]
        pattern = arguments.get("pattern", "*")
        results = []
        
        for filepath in Path(SEARCH_DIR).rglob(pattern):
            if filepath.is_file():
                try:
                    content = filepath.read_text(encoding="utf-8", errors="ignore")
                    for i, line in enumerate(content.splitlines(), 1):
                        if keyword in line:
                            results.append(f"{filepath}:{i}: {line.strip()[:200]}")
                except Exception:
                    continue
        
        return [TextContent(
            type="text",
            text=f"在 {len(results)} 处找到 '{keyword}':\n\n" + "\n".join(results[:50])
        )]
    
    elif name == "count_lines":
        pattern = arguments["pattern"]
        total = 0
        for filepath in Path(SEARCH_DIR).rglob(pattern):
            if filepath.is_file():
                total += sum(1 for _ in filepath.open("rb"))
        return [TextContent(type="text", text=f"匹配 {pattern} 的文件共 {total} 行")]

async def main():
    async with stdio_server() as streams:
        await app.run(streams[0], streams[1], app.create_initialization_options())

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

配置到 Claude Desktop:

{
  "mcpServers": {
    "file-search": {
      "command": "python",
      "args": ["/path/to/file_search_server.py"],
      "env": {
        "SEARCH_DIR": "/home/user/projects"
      }
    }
  }
}

1.3 高级:调用外部 API 的 MCP Server

# api_gateway_server.py
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

app = Server("api-gateway")

@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="get_github_stars",
            description="查询 GitHub 仓库的 Star 数",
            input_schema={
                "type": "object",
                "properties": {
                    "owner": {"type": "string", "description": "仓库所有者"},
                    "repo": {"type": "string", "description": "仓库名"}
                },
                "required": ["owner", "repo"]
            }
        ),
        Tool(
            name="fetch_trending_repos",
            description="获取 GitHub 热门仓库",
            input_schema={
                "type": "object",
                "properties": {
                    "language": {"type": "string", "description": "编程语言,如 python"},
                    "since": {"type": "string", "description": "时间范围: daily/weekly/monthly"}
                }
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    async with httpx.AsyncClient(timeout=30) as client:
        if name == "get_github_stars":
            resp = await client.get(
                f"https://api.github.com/repos/{arguments['owner']}/{arguments['repo']}"
            )
            if resp.status_code == 200:
                data = resp.json()
                return [TextContent(
                    type="text",
                    text=f"⭐ {arguments['owner']}/{arguments['repo']}\n"
                         f"Stars: {data['stargazers_count']:,}\n"
                         f"Forks: {data['forks_count']:,}\n"
                         f"语言: {data.get('language', 'N/A')}\n"
                         f"描述: {data.get('description', '无')}"
                )]
            return [TextContent(type="text", text=f"查询失败: {resp.status_code}")]
        
        elif name == "fetch_trending_repos":
            # 使用 GitHub Trending API
            params = {}
            if arguments.get("language"):
                params["language"] = arguments["language"]
            if arguments.get("since"):
                params["since"] = arguments["since"]
            resp = await client.get("https://api.github.com/search/repositories", params={
                "q": "stars:>1000",
                "sort": "stars",
                "order": "desc",
                "per_page": 5
            })
            if resp.status_code == 200:
                repos = resp.json()["items"]
                lines = ["🏆 GitHub 热门仓库 Top 5:\n"]
                for i, repo in enumerate(repos, 1):
                    lines.append(
                        f"{i}. {repo['full_name']}{repo['stargazers_count']:,}\n"
                        f"   {repo['description'] or '无描述'}\n"
                    )
                return [TextContent(type="text", text="\n".join(lines))]

async def main():
    async with stdio_server() as streams:
        await app.run(streams[0], streams[1], app.create_initialization_options())

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

第二章:Node.js MCP Server 开发

2.1 基础模板

// server.js
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  ListToolsRequestSchema,
  CallToolRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';

const server = new Server(
  { name: 'my-tools', version: '1.0.0' },
  { capabilities: { tools: {} } }
);

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [{
    name: 'get_time',
    description: '获取指定时区的当前时间',
    inputSchema: {
      type: 'object',
      properties: {
        timezone: {
          type: 'string',
          description: '时区,如 Asia/Shanghai、America/New_York'
        }
      },
      required: ['timezone']
    }
  }]
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === 'get_time') {
    const { timezone } = request.params.arguments;
    const time = new Date().toLocaleString('zh-CN', { timeZone: timezone });
    return {
      content: [{ type: 'text', text: `当前 ${timezone} 时间:${time}` }]
    };
  }
  throw new Error(`未知工具: ${request.params.name}`);
});

const transport = new StdioServerTransport();
await server.connect(transport);

2.2 实战:PDF 阅读 MCP Server

这个 Server 让 AI 直接读取 PDF 文件内容:

// pdf_reader_server.js
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  ListToolsRequestSchema,
  CallToolRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import fs from 'fs/promises';
import path from 'path';
import pdf from 'pdf-parse';  // npm install pdf-parse

// 简单的 PDF 解析包装
async function extractTextFromPDF(filePath) {
  const buffer = await fs.readFile(filePath);
  // 实际项目中可以使用 pdf-parse 或 pdfjs-dist
  const data = await pdf(buffer);
  return data.text;
}

const server = new Server(
  { name: 'pdf-reader', version: '1.0.0' },
  { capabilities: { tools: {} } }
);

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: 'read_pdf',
      description: '读取 PDF 文件内容',
      inputSchema: {
        type: 'object',
        properties: {
          file_path: {
            type: 'string',
            description: 'PDF 文件路径'
          },
          max_pages: {
            type: 'number',
            description: '最大读取页数(可选)'
          }
        },
        required: ['file_path']
      }
    },
    {
      name: 'list_pdfs',
      description: '列出目录下的所有 PDF 文件',
      inputSchema: {
        type: 'object',
        properties: {
          directory: {
            type: 'string',
            description: '目录路径'
          }
        },
        required: ['directory']
      }
    }
  ]
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;
  
  if (name === 'read_pdf') {
    const text = await extractTextFromPDF(args.file_path);
    const pages = text.split('\f');
    const maxPages = args.max_pages || pages.length;
    return {
      content: [{
        type: 'text',
        text: `📄 ${path.basename(args.file_path)}\n页数: ${pages.length}\n\n${pages.slice(0, maxPages).join('\n---\n')}`
      }]
    };
  }
  
  if (name === 'list_pdfs') {
    const files = await fs.readdir(args.directory);
    const pdfs = files.filter(f => f.toLowerCase().endsWith('.pdf'));
    return {
      content: [{
        type: 'text',
        text: pdfs.length > 0
          ? '📚 PDF 文件列表:\n' + pdfs.map(f => `  📄 ${f}`).join('\n')
          : '该目录下没有 PDF 文件'
      }]
    };
  }
  
  throw new Error(`未知工具: ${name}`);
});

const transport = new StdioServerTransport();
await server.connect(transport);

运行:node pdf_reader_server.js

2.3 SSE 模式服务端

如果需要在远程服务器上运行 MCP Server,使用 SSE 模式:

// sse_server.js
import express from 'express';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';

const app = express();
const transport = new SSEServerTransport('/messages');

const server = new Server(
  { name: 'remote-tools', version: '1.0.0' },
  { capabilities: { tools: {} } }
);

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [/* 工具定义 */]
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  // 工具调用逻辑
});

app.get('/sse', async (req, res) => {
  transport.connect(req, res);
  await server.connect(transport);
});

app.post('/messages', (req, res) => {
  transport.handlePostMessage(req, res);
});

app.listen(3000, () => {
  console.log('MCP Server running on http://localhost:3000/sse');
});

第三章:Go MCP Server 开发

Go 适合构建高性能、生产级 MCP Server。

3.1 基础模板

// main.go
package main

import (
    "context"
    "fmt"
    "os"
    
    "github.com/mark3labs/mcp-go/server"
    "github.com/mark3labs/mcp-go/mcp"
)

func main() {
    s := server.NewMCPServer(
        "my-tools",
        "1.0.0",
        server.WithResourceCapabilities(true, true),
    )
    
    // 添加工具
    s.AddTool(mcp.NewTool("calculate",
        mcp.WithDescription("执行数学计算"),
        mcp.WithString("expression",
            mcp.Required(),
            mcp.Description("数学表达式,如 2 + 3 * 4"),
        ),
    ), func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
        expr := request.Params.Arguments["expression"].(string)
        // 这里可以集成一个表达式计算引擎
        return mcp.NewToolResultText(fmt.Sprintf("表达式: %s (结果待计算)", expr)), nil
    })
    
    // 启动 stdio server
    if err := server.ServeStdio(s); err != nil {
        fmt.Fprintf(os.Stderr, "Server error: %v\n", err)
        os.Exit(1)
    }
}

3.2 实战:系统监控 MCP Server

// system_monitor.go
package main

import (
    "context"
    "fmt"
    "os"
    "runtime"
    "time"
    
    "github.com/mark3labs/mcp-go/server"
    "github.com/mark3labs/mcp-go/mcp"
    "github.com/shirou/gopsutil/cpu"
    "github.com/shirou/gopsutil/mem"
    "github.com/shirou/gopsutil/disk"
)

func main() {
    s := server.NewMCPServer("system-monitor", "1.0.0")
    
    // CPU 信息
    s.AddTool(mcp.NewTool("get_cpu_info",
        mcp.WithDescription("获取 CPU 使用率和核心信息"),
    ), func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
        percent, _ := cpu.Percent(time.Second, false)
        cpuInfo, _ := cpu.Info()
        
        text := fmt.Sprintf("CPU 使用率: %.1f%%\n", percent[0])
        if len(cpuInfo) > 0 {
            text += fmt.Sprintf("型号: %s\n核心数: %d\n", cpuInfo[0].ModelName, runtime.NumCPU())
        }
        return mcp.NewToolResultText(text), nil
    })
    
    // 内存信息
    s.AddTool(mcp.NewTool("get_memory_info",
        mcp.WithDescription("获取系统内存使用情况"),
    ), func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
        memInfo, _ := mem.VirtualMemory()
        text := fmt.Sprintf(
            "总内存: %.2f GB\n已用: %.2f GB (%.1f%%)\n可用: %.2f GB",
            float64(memInfo.Total)/1024/1024/1024,
            float64(memInfo.Used)/1024/1024/1024,
            memInfo.UsedPercent,
            float64(memInfo.Available)/1024/1024/1024,
        )
        return mcp.NewToolResultText(text), nil
    })
    
    // 磁盘信息
    s.AddTool(mcp.NewTool("get_disk_info",
        mcp.WithDescription("获取磁盘使用情况"),
        mcp.WithString("path",
            mcp.Description("磁盘路径,默认 /"),
        ),
    ), func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
        path := "/"
        if p, ok := request.Params.Arguments["path"].(string); ok {
            path = p
        }
        diskInfo, _ := disk.Usage(path)
        text := fmt.Sprintf(
            "路径: %s\n总空间: %.2f GB\n已用: %.2f GB (%.1f%%)\n可用: %.2f GB",
            path,
            float64(diskInfo.Total)/1024/1024/1024,
            float64(diskInfo.Used)/1024/1024/1024,
            diskInfo.UsedPercent,
            float64(diskInfo.Free)/1024/1024/1024,
        )
        return mcp.NewToolResultText(text), nil
    })
    
    if err := server.ServeStdio(s); err != nil {
        fmt.Fprintf(os.Stderr, "Server error: %v\n", err)
        os.Exit(1)
    }
}

第四章:进阶技巧与最佳实践

4.1 工具描述的重要性

MCP 的智能程度高度依赖工具描述的质量。AI 模型通过描述来决定何时调用哪个工具。

差的描述:

{
  "name": "search",
  "description": "搜索功能"
}

好的描述:

{
  "name": "search_database",
  "description": "在用户数据库中按名称、邮箱或ID搜索用户信息,返回匹配用户的详情(含联系方式、注册时间、会员等级)",
  "inputSchema": {
    "type": "object",
    "properties": {
      "query": {
        "type": "string",
        "description": "搜索关键词,支持用户名、邮箱前缀或用户ID"
      },
      "limit": {
        "type": "number",
        "description": "最多返回结果数量,默认10,最大100"
      }
    },
    "required": ["query"]
  }
}

4.2 输入校验

在实际部署中,务必对输入参数做严格校验:

from pydantic import BaseModel, Field, ValidationError

class SearchParams(BaseModel):
    keyword: str = Field(..., min_length=1, max_length=200)
    limit: int = Field(default=10, ge=1, le=100)
    category: str | None = Field(default=None, pattern="^(tech|science|life)$")

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    try:
        params = SearchParams(**arguments)
    except ValidationError as e:
        return [TextContent(type="text", text=f"参数错误: {e}")]
    # 继续处理...

4.3 错误处理

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    try:
        result = await process_request(name, arguments)
        return [TextContent(type="text", text=result)]
    except PermissionError:
        return [TextContent(type="text", text="⚠️ 权限不足,请检查 MCP Server 配置的访问权限")]
    except TimeoutError:
        return [TextContent(type="text", text="⏰ 请求超时,外部服务可能暂时不可用")]
    except Exception as e:
        # 记录错误但不暴露敏感信息
        logger.error(f"Tool {name} failed: {e}", exc_info=True)
        return [TextContent(type="text", text="❌ 工具调用失败,请稍后重试")]

4.4 资源类型的使用

除了 Tools(工具),MCP 还支持 Resources(资源)和 Prompts(提示)。

Resources 用于让 AI 读取数据(而非执行操作):

@app.list_resources()
async def list_resources() -> list[Resource]:
    return [
        Resource(
            uri="config://app",
            name="应用配置",
            description="当前应用的配置文件内容",
            mime_type="application/json"
        )
    ]

@app.read_resource()
async def read_resource(uri: str) -> str:
    if uri == "config://app":
        return json.dumps(load_config(), ensure_ascii=False)
    raise ValueError(f"未知资源: {uri}")

4.5 测试你的 MCP Server

推荐的测试工具:

  1. MCP Inspector(官方):交互式测试工具

    npx @modelcontextprotocol/inspector python my_server.py
  2. claude-code 本地测试:直接让 Claude 调用你的工具

  3. 单元测试:直接测试工具逻辑函数

import pytest

# 测试工具逻辑(不依赖 MCP 协议)
async def test_search_in_files():
    server = FileSearchServer()
    result = await server.search_in_files(keyword="TODO", pattern="*.py")
    assert "TODO" in result
    assert len(result) > 0

总结

语言适合场景生态成熟度学习成本
Python快速原型、数据处理、AI 集成⭐⭐⭐⭐⭐
Node.jsWeb 服务、SSE 模式、前端工具链⭐⭐⭐⭐
Go高性能服务、系统工具、生产部署⭐⭐⭐

最佳实践总结:

  • 工具名字要语义化(search_users 而非 func1
  • 描述要详细,告诉 AI “何时用、怎么用、返回什么”
  • 参数定义要精确,类型、长度、范围都要限制
  • 错误信息要友好,让 AI 能理解并重试
  • 每个 MCP Server 只做一件事,做精

从今天开始,把常用的 API 和数据源包装成 MCP Server,你的 AI 工具链会顺手十倍。

📤 分享到