Claude Code MCP:模型上下文协议详解

Claude Code MCP:模型上下文协议详解

什么是 MCP?

MCP(Model Context Protocol)是 Anthropic 推出的开放协议,用于连接 AI 助手与外部数据源和工具。通过 MCP,Claude Code 可以:

  • 🔌 连接数据源 - 访问数据库、API、文件系统
  • 🛠️ 集成工具 - 调用外部工具和服务
  • 📊 实时数据 - 获取最新的上下文信息
  • 🔄 双向通信 - 实现 AI 与系统的交互

核心概念

1. MCP 服务器

MCP 服务器是提供资源和工具的组件:

// mcp-server.js
import { MCPServer } from '@anthropic/mcp';

const server = new MCPServer({
  name: 'my-tools-server',
  version: '1.0.0',
  description: '自定义工具服务器'
});

// 注册资源
server.resource({
  name: 'database',
  description: '访问数据库',
  handler: async (params) => {
    // 数据库查询逻辑
    return { data: results };
  }
});

// 注册工具
server.tool({
  name: 'calculate',
  description: '执行计算',
  parameters: {
    expression: { type: 'string', required: true }
  },
  handler: async ({ expression }) => {
    return eval(expression); // 示例,实际应安全处理
  }
});

server.start({ port: 3000 });

2. MCP 客户端

Claude Code 作为 MCP 客户端连接到服务器:

// claude-config.json
{
  "mcp": {
    "servers": [
      {
        "name": "my-tools",
        "url": "http://localhost:3000",
        "auth": {
          "type": "bearer",
          "token": "${MCP_TOKEN}"
        }
      }
    ]
  }
}

3. 资源和工具

资源(Resources):提供数据访问

server.resource({
  name: 'user-data',
  description: '获取用户数据',
  parameters: {
    userId: { type: 'string', required: true }
  },
  handler: async ({ userId }) => {
    const user = await db.users.findById(userId);
    return {
      type: 'json',
      data: user
    };
  }
});

工具(Tools):执行操作

server.tool({
  name: 'send-email',
  description: '发送邮件',
  parameters: {
    to: { type: 'string', required: true },
    subject: { type: 'string', required: true },
    body: { type: 'string', required: true }
  },
  handler: async ({ to, subject, body }) => {
    await emailService.send({ to, subject, body });
    return { success: true, message: '邮件已发送' };
  }
});

实战示例

1. 数据库集成

创建一个 MCP 服务器来访问数据库:

// database-mcp-server.js
import { MCPServer } from '@anthropic/mcp';
import { Pool } from 'pg';

const pool = new Pool({
  connectionString: process.env.DATABASE_URL
});

const server = new MCPServer({
  name: 'database-server',
  version: '1.0.0'
});

// 查询资源
server.resource({
  name: 'query',
  description: '执行 SQL 查询',
  parameters: {
    sql: { type: 'string', required: true },
    params: { type: 'array', required: false }
  },
  handler: async ({ sql, params = [] }) => {
    try {
      const result = await pool.query(sql, params);
      return {
        type: 'table',
        columns: Object.keys(result.rows[0] || {}),
        rows: result.rows
      };
    } catch (error) {
      return {
        type: 'error',
        message: error.message
      };
    }
  }
});

// 表结构资源
server.resource({
  name: 'schema',
  description: '获取表结构',
  parameters: {
    table: { type: 'string', required: true }
  },
  handler: async ({ table }) => {
    const query = `
      SELECT column_name, data_type, is_nullable
      FROM information_schema.columns
      WHERE table_name = $1
    `;
    const result = await pool.query(query, [table]);
    return {
      type: 'schema',
      columns: result.rows
    };
  }
});

server.start({ port: 3001 });

2. API 网关集成

// api-gateway-mcp.js
import { MCPServer } from '@anthropic/mcp';
import axios from 'axios';

const server = new MCPServer({
  name: 'api-gateway',
  version: '1.0.0'
});

// 注册多个 API 端点
const apis = {
  weather: {
    url: 'https://api.weather.com/v1',
    key: process.env.WEATHER_API_KEY
  },
  news: {
    url: 'https://api.news.com/v2',
    key: process.env.NEWS_API_KEY
  },
  stocks: {
    url: 'https://api.stocks.com/v1',
    key: process.env.STOCKS_API_KEY
  }
};

Object.entries(apis).forEach(([name, config]) => {
  server.tool({
    name: `${name}-api`,
    description: `调用 ${name} API`,
    parameters: {
      endpoint: { type: 'string', required: true },
      method: { type: 'string', default: 'GET' },
      data: { type: 'object', required: false }
    },
    handler: async ({ endpoint, method, data }) => {
      try {
        const response = await axios({
          method,
          url: `${config.url}${endpoint}`,
          headers: {
            'Authorization': `Bearer ${config.key}`,
            'Content-Type': 'application/json'
          },
          data
        });
        return response.data;
      } catch (error) {
        return {
          error: error.message,
          status: error.response?.status
        };
      }
    }
  });
});

server.start({ port: 3002 });

3. 文件系统访问

// filesystem-mcp.js
import { MCPServer } from '@anthropic/mcp';
import fs from 'fs/promises';
import path from 'path';

const server = new MCPServer({
  name: 'filesystem',
  version: '1.0.0'
});

// 读取文件
server.resource({
  name: 'read-file',
  description: '读取文件内容',
  parameters: {
    filepath: { type: 'string', required: true }
  },
  handler: async ({ filepath }) => {
    const safePath = path.resolve(process.env.SAFE_DIR, filepath);
    
    // 安全检查
    if (!safePath.startsWith(process.env.SAFE_DIR)) {
      throw new Error('访问被拒绝');
    }
    
    const content = await fs.readFile(safePath, 'utf-8');
    return {
      type: 'text',
      content,
      path: safePath
    };
  }
});

// 列出目录
server.resource({
  name: 'list-directory',
  description: '列出目录内容',
  parameters: {
    dirpath: { type: 'string', default: '.' }
  },
  handler: async ({ dirpath }) => {
    const safePath = path.resolve(process.env.SAFE_DIR, dirpath);
    
    if (!safePath.startsWith(process.env.SAFE_DIR)) {
      throw new Error('访问被拒绝');
    }
    
    const files = await fs.readdir(safePath, { withFileTypes: true });
    return {
      type: 'directory',
      files: files.map(f => ({
        name: f.name,
        type: f.isDirectory() ? 'directory' : 'file',
        path: path.join(dirpath, f.name)
      }))
    };
  }
});

// 写入文件
server.tool({
  name: 'write-file',
  description: '写入文件',
  parameters: {
    filepath: { type: 'string', required: true },
    content: { type: 'string', required: true }
  },
  handler: async ({ filepath, content }) => {
    const safePath = path.resolve(process.env.SAFE_DIR, filepath);
    
    if (!safePath.startsWith(process.env.SAFE_DIR)) {
      throw new Error('访问被拒绝');
    }
    
    await fs.writeFile(safePath, content, 'utf-8');
    return { success: true, path: safePath };
  }
});

server.start({ port: 3003 });

高级特性

1. 流式响应

支持流式数据传输:

server.resource({
  name: 'logs',
  description: '实时日志流',
  stream: true,
  handler: async function* (params) {
    const logFile = '/var/log/app.log';
    const stream = createReadStream(logFile);
    
    for await (const chunk of stream) {
      yield {
        type: 'log',
        content: chunk.toString(),
        timestamp: new Date().toISOString()
      };
    }
  }
});

2. 身份验证

实现安全的身份验证:

server.use(async (context, next) => {
  const token = context.headers.authorization?.replace('Bearer ', '');
  
  if (!token) {
    throw new Error('未授权');
  }
  
  try {
    const user = await verifyToken(token);
    context.user = user;
  } catch (error) {
    throw new Error('无效的令牌');
  }
  
  return next();
});

3. 缓存策略

实现智能缓存:

const cache = new Map();

server.resource({
  name: 'cached-data',
  description: '缓存的数据资源',
  cache: {
    ttl: 300, // 5 分钟
    key: (params) => JSON.stringify(params)
  },
  handler: async (params) => {
    const cacheKey = JSON.stringify(params);
    
    if (cache.has(cacheKey)) {
      const cached = cache.get(cacheKey);
      if (cached.expires > Date.now()) {
        return cached.data;
      }
    }
    
    const data = await fetchExpensiveData(params);
    cache.set(cacheKey, {
      data,
      expires: Date.now() + 300000
    });
    
    return data;
  }
});

配置 Claude Code

1. 基础配置

.claude/mcp.json:

{
  "servers": [
    {
      "name": "local-tools",
      "url": "http://localhost:3000",
      "description": "本地工具服务器",
      "auth": {
        "type": "bearer",
        "token": "${MCP_LOCAL_TOKEN}"
      },
      "resources": ["database", "filesystem"],
      "tools": ["calculate", "send-email"]
    },
    {
      "name": "remote-api",
      "url": "https://api.example.com/mcp",
      "description": "远程 API 服务",
      "auth": {
        "type": "oauth2",
        "clientId": "${OAUTH_CLIENT_ID}",
        "clientSecret": "${OAUTH_CLIENT_SECRET}"
      }
    }
  ],
  "defaults": {
    "timeout": 30000,
    "retries": 3
  }
}

2. 环境变量

.env:

# MCP 服务器配置
MCP_LOCAL_TOKEN=your-secret-token
MCP_SERVER_PORT=3000

# 数据库配置
DATABASE_URL=postgresql://user:pass@localhost/db

# API 密钥
WEATHER_API_KEY=xxx
NEWS_API_KEY=xxx
STOCKS_API_KEY=xxx

# OAuth 配置
OAUTH_CLIENT_ID=xxx
OAUTH_CLIENT_SECRET=xxx

3. Docker 部署

docker-compose.yml:

version: '3.8'

services:
  mcp-database:
    build: ./mcp-servers/database
    ports:
      - "3001:3001"
    environment:
      - DATABASE_URL=${DATABASE_URL}
      - MCP_TOKEN=${MCP_LOCAL_TOKEN}
    networks:
      - mcp-network

  mcp-api-gateway:
    build: ./mcp-servers/api-gateway
    ports:
      - "3002:3002"
    environment:
      - WEATHER_API_KEY=${WEATHER_API_KEY}
      - NEWS_API_KEY=${NEWS_API_KEY}
      - MCP_TOKEN=${MCP_LOCAL_TOKEN}
    networks:
      - mcp-network

  mcp-filesystem:
    build: ./mcp-servers/filesystem
    ports:
      - "3003:3003"
    volumes:
      - ./workspace:/workspace:ro
    environment:
      - SAFE_DIR=/workspace
      - MCP_TOKEN=${MCP_LOCAL_TOKEN}
    networks:
      - mcp-network

networks:
  mcp-network:
    driver: bridge

最佳实践

1. 安全性

  • 身份验证:始终验证请求来源
  • 权限控制:实施细粒度的访问控制
  • 输入验证:验证所有输入参数
  • 沙箱执行:在隔离环境中执行代码
// 安全的参数验证
server.tool({
  name: 'execute-query',
  parameters: {
    query: { 
      type: 'string', 
      required: true,
      validate: (value) => {
        // 防止 SQL 注入
        const forbidden = ['DROP', 'DELETE', 'TRUNCATE'];
        if (forbidden.some(word => value.toUpperCase().includes(word))) {
          throw new Error('危险操作被拒绝');
        }
        return true;
      }
    }
  },
  handler: async ({ query }) => {
    // 使用参数化查询
    return await db.query(query);
  }
});

2. 性能优化

  • 连接池:复用数据库连接
  • 缓存:缓存频繁访问的数据
  • 批处理:合并多个请求
  • 异步处理:使用队列处理耗时任务
// 批处理示例
const batchQueue = [];
let batchTimer = null;

server.tool({
  name: 'batch-process',
  handler: async (item) => {
    batchQueue.push(item);
    
    if (!batchTimer) {
      batchTimer = setTimeout(async () => {
        const items = [...batchQueue];
        batchQueue.length = 0;
        batchTimer = null;
        
        // 批量处理
        await processBatch(items);
      }, 100);
    }
    
    return { queued: true };
  }
});

3. 监控和日志

// 请求日志中间件
server.use(async (context, next) => {
  const start = Date.now();
  const { method, resource, tool } = context;
  
  try {
    const result = await next();
    
    logger.info({
      type: 'mcp-request',
      method,
      resource,
      tool,
      duration: Date.now() - start,
      status: 'success'
    });
    
    return result;
  } catch (error) {
    logger.error({
      type: 'mcp-request',
      method,
      resource,
      tool,
      duration: Date.now() - start,
      status: 'error',
      error: error.message
    });
    
    throw error;
  }
});

常见用例

1. 知识库集成

server.resource({
  name: 'knowledge-base',
  description: '搜索知识库',
  parameters: {
    query: { type: 'string', required: true },
    limit: { type: 'number', default: 10 }
  },
  handler: async ({ query, limit }) => {
    const results = await vectorDB.search(query, { limit });
    return {
      type: 'knowledge',
      results: results.map(r => ({
        content: r.content,
        score: r.score,
        metadata: r.metadata
      }))
    };
  }
});

2. 任务自动化

server.tool({
  name: 'automation',
  description: '执行自动化任务',
  parameters: {
    workflow: { type: 'string', required: true },
    params: { type: 'object' }
  },
  handler: async ({ workflow, params }) => {
    const steps = workflows[workflow];
    const results = [];
    
    for (const step of steps) {
      const result = await executeStep(step, params);
      results.push(result);
      
      if (result.error) {
        break;
      }
    }
    
    return { workflow, results };
  }
});

3. 实时通知

server.resource({
  name: 'notifications',
  description: '实时通知流',
  stream: true,
  handler: async function* () {
    const eventSource = new EventSource('/events');
    
    for await (const event of eventSource) {
      yield {
        type: 'notification',
        event: event.type,
        data: event.data,
        timestamp: new Date().toISOString()
      };
    }
  }
});

故障排除

常见问题

1. 连接失败

# 检查服务器状态
curl http://localhost:3000/health

# 查看日志
docker logs mcp-server

# 测试连接
claude-code mcp test --server local-tools

2. 认证错误

# 验证令牌
claude-code mcp auth --server local-tools --token $MCP_TOKEN

# 刷新令牌
claude-code mcp refresh-token

3. 超时问题

// 增加超时时间
server.configure({
  timeout: 60000, // 60 秒
  keepAlive: true,
  keepAliveInterval: 30000
});

总结

MCP 为 Claude Code 提供了强大的扩展能力,让 AI 能够:

  • ✅ 访问实时数据和外部系统
  • ✅ 执行复杂的操作和工具
  • ✅ 与现有基础设施无缝集成
  • ✅ 保持安全性和可控性

通过合理使用 MCP,可以将 Claude Code 打造成真正的智能开发助手,不仅能理解代码,还能主动与系统交互,完成复杂的开发任务。


💡 提示:MCP 是一个开放协议,社区正在不断贡献新的服务器实现。查看 MCP Hub 获取更多现成的集成。