AI Agent 开发实战:从概念到部署

AI Agent 开发实战:从概念到部署

什么是 AI Agent

AI Agent 是能够感知环境、做出决策并采取行动以实现特定目标的智能系统。与简单的聊天机器人不同,AI Agent 具有:

  • 自主性:能够独立做出决策
  • 反应性:对环境变化做出响应
  • 主动性:主动采取行动达成目标
  • 社交性:与其他 Agent 或人类协作

核心架构设计

1. Agent 基础架构

from abc import ABC, abstractmethod
from typing import List, Dict, Any
import openai

class BaseAgent(ABC):
    def __init__(self, name: str, llm_model: str = "gpt-4"):
        self.name = name
        self.llm_model = llm_model
        self.memory = []
        self.tools = {}
        
    @abstractmethod
    def perceive(self, environment: Dict[str, Any]) -> Dict[str, Any]:
        """感知环境信息"""
        pass
    
    @abstractmethod
    def decide(self, perception: Dict[str, Any]) -> str:
        """基于感知做出决策"""
        pass
    
    @abstractmethod
    def act(self, decision: str) -> Any:
        """执行决策"""
        pass
    
    def run(self, environment: Dict[str, Any]) -> Any:
        """Agent 主循环"""
        perception = self.perceive(environment)
        decision = self.decide(perception)
        result = self.act(decision)
        
        # 更新记忆
        self.memory.append({
            "perception": perception,
            "decision": decision,
            "result": result
        })
        
        return result

2. 实现具体 Agent

class ResearchAgent(BaseAgent):
    def __init__(self, name: str):
        super().__init__(name)
        self.tools = {
            "search": self.search_tool,
            "summarize": self.summarize_tool,
            "analyze": self.analyze_tool
        }
    
    def perceive(self, environment: Dict[str, Any]) -> Dict[str, Any]:
        """解析研究任务"""
        return {
            "query": environment.get("research_query"),
            "context": environment.get("context", ""),
            "constraints": environment.get("constraints", [])
        }
    
    def decide(self, perception: Dict[str, Any]) -> str:
        """决定研究策略"""
        prompt = f"""
        作为研究助手,请为以下查询制定研究计划:
        查询:{perception['query']}
        上下文:{perception['context']}
        
        请返回要执行的工具和参数,格式如下:
        tool: search
        params: {{...}}
        """
        
        response = openai.ChatCompletion.create(
            model=self.llm_model,
            messages=[{"role": "user", "content": prompt}]
        )
        
        return response.choices[0].message.content
    
    def act(self, decision: str) -> Any:
        """执行研究动作"""
        # 解析决策
        tool_name, params = self.parse_decision(decision)
        
        if tool_name in self.tools:
            return self.tools[tool_name](**params)
        else:
            return {"error": f"Unknown tool: {tool_name}"}
    
    def search_tool(self, query: str, num_results: int = 5) -> List[Dict]:
        """搜索工具实现"""
        # 这里可以集成真实的搜索 API
        return [{"title": f"Result {i}", "content": "..."} 
                for i in range(num_results)]
    
    def summarize_tool(self, text: str) -> str:
        """摘要工具实现"""
        prompt = f"请总结以下内容:\n{text}"
        response = openai.ChatCompletion.create(
            model=self.llm_model,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

工具集成

1. 工具抽象层

from typing import Callable, Dict, Any

class Tool:
    def __init__(self, name: str, description: str, 
                 func: Callable, parameters: Dict[str, Any]):
        self.name = name
        self.description = description
        self.func = func
        self.parameters = parameters
    
    def run(self, **kwargs) -> Any:
        """执行工具"""
        # 验证参数
        for param, param_info in self.parameters.items():
            if param_info.get("required") and param not in kwargs:
                raise ValueError(f"Missing required parameter: {param}")
        
        return self.func(**kwargs)
    
    def to_json_schema(self) -> Dict[str, Any]:
        """转换为 JSON Schema 格式,供 LLM 理解"""
        return {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": self.parameters,
                "required": [k for k, v in self.parameters.items() 
                           if v.get("required")]
            }
        }

# 创建工具集
class ToolKit:
    def __init__(self):
        self.tools = {}
    
    def register(self, tool: Tool):
        """注册工具"""
        self.tools[tool.name] = tool
    
    def get_schemas(self) -> List[Dict[str, Any]]:
        """获取所有工具的 Schema"""
        return [tool.to_json_schema() for tool in self.tools.values()]
    
    def execute(self, tool_name: str, **kwargs) -> Any:
        """执行指定工具"""
        if tool_name not in self.tools:
            raise ValueError(f"Tool not found: {tool_name}")
        
        return self.tools[tool_name].run(**kwargs)

2. 集成实际工具

import requests
from datetime import datetime
import json

# Web 搜索工具
def web_search(query: str, num_results: int = 5) -> List[Dict]:
    """使用搜索 API 进行网络搜索"""
    # 示例:使用 SerpAPI
    response = requests.get(
        "https://serpapi.com/search",
        params={
            "q": query,
            "num": num_results,
            "api_key": os.environ["SERPAPI_KEY"]
        }
    )
    
    if response.status_code == 200:
        results = response.json().get("organic_results", [])
        return [
            {
                "title": r.get("title"),
                "snippet": r.get("snippet"),
                "link": r.get("link")
            }
            for r in results
        ]
    return []

# 代码执行工具
def execute_python(code: str) -> Dict[str, Any]:
    """安全执行 Python 代码"""
    import subprocess
    import tempfile
    
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(code)
        f.flush()
        
        try:
            result = subprocess.run(
                ['python', f.name],
                capture_output=True,
                text=True,
                timeout=10
            )
            
            return {
                "success": result.returncode == 0,
                "stdout": result.stdout,
                "stderr": result.stderr
            }
        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "error": "Code execution timeout"
            }

# 创建工具实例
toolkit = ToolKit()

toolkit.register(Tool(
    name="web_search",
    description="搜索网络获取相关信息",
    func=web_search,
    parameters={
        "query": {
            "type": "string",
            "description": "搜索查询",
            "required": True
        },
        "num_results": {
            "type": "integer",
            "description": "返回结果数量",
            "default": 5
        }
    }
))

toolkit.register(Tool(
    name="python_repl",
    description="执行 Python 代码进行计算或数据处理",
    func=execute_python,
    parameters={
        "code": {
            "type": "string",
            "description": "要执行的 Python 代码",
            "required": True
        }
    }
))

记忆系统

1. 短期记忆

from collections import deque
from typing import Optional

class ShortTermMemory:
    def __init__(self, capacity: int = 10):
        self.capacity = capacity
        self.memory = deque(maxlen=capacity)
    
    def add(self, item: Dict[str, Any]):
        """添加记忆"""
        item["timestamp"] = datetime.now()
        self.memory.append(item)
    
    def get_recent(self, n: int = 5) -> List[Dict[str, Any]]:
        """获取最近的 n 条记忆"""
        return list(self.memory)[-n:]
    
    def search(self, query: str) -> List[Dict[str, Any]]:
        """搜索相关记忆"""
        results = []
        for item in self.memory:
            if query.lower() in str(item).lower():
                results.append(item)
        return results

2. 长期记忆(向量数据库)

import numpy as np
from typing import List, Tuple
import faiss

class LongTermMemory:
    def __init__(self, embedding_dim: int = 1536):
        self.embedding_dim = embedding_dim
        self.index = faiss.IndexFlatL2(embedding_dim)
        self.memories = []
    
    def _get_embedding(self, text: str) -> np.ndarray:
        """获取文本嵌入向量"""
        response = openai.Embedding.create(
            model="text-embedding-ada-002",
            input=text
        )
        return np.array(response['data'][0]['embedding'])
    
    def add(self, content: str, metadata: Dict[str, Any] = None):
        """添加长期记忆"""
        embedding = self._get_embedding(content)
        self.index.add(embedding.reshape(1, -1))
        
        self.memories.append({
            "content": content,
            "metadata": metadata or {},
            "timestamp": datetime.now()
        })
    
    def search(self, query: str, k: int = 5) -> List[Tuple[Dict, float]]:
        """语义搜索记忆"""
        query_embedding = self._get_embedding(query)
        
        distances, indices = self.index.search(
            query_embedding.reshape(1, -1), k
        )
        
        results = []
        for i, idx in enumerate(indices[0]):
            if idx < len(self.memories):
                results.append((
                    self.memories[idx],
                    float(distances[0][i])
                ))
        
        return results

多 Agent 协作

1. Agent 管理器

class AgentManager:
    def __init__(self):
        self.agents = {}
        self.message_queue = Queue()
    
    def register_agent(self, agent: BaseAgent):
        """注册 Agent"""
        self.agents[agent.name] = agent
    
    def send_message(self, from_agent: str, to_agent: str, 
                     message: Dict[str, Any]):
        """Agent 间通信"""
        self.message_queue.put({
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "timestamp": datetime.now()
        })
    
    def process_messages(self):
        """处理消息队列"""
        while not self.message_queue.empty():
            msg = self.message_queue.get()
            
            if msg["to"] in self.agents:
                # 将消息传递给目标 Agent
                self.agents[msg["to"]].receive_message(msg)
    
    def coordinate_task(self, task: Dict[str, Any]) -> Any:
        """协调多个 Agent 完成任务"""
        # 任务分解
        subtasks = self.decompose_task(task)
        
        results = {}
        for subtask in subtasks:
            agent_name = subtask["agent"]
            if agent_name in self.agents:
                result = self.agents[agent_name].run(subtask["environment"])
                results[agent_name] = result
        
        # 综合结果
        return self.synthesize_results(results)

2. 协作示例

# 创建专门的 Agent
research_agent = ResearchAgent("researcher")
writer_agent = WriterAgent("writer")
reviewer_agent = ReviewerAgent("reviewer")

# 注册到管理器
manager = AgentManager()
manager.register_agent(research_agent)
manager.register_agent(writer_agent)
manager.register_agent(reviewer_agent)

# 执行协作任务
task = {
    "type": "create_article",
    "topic": "AI Agent 的未来发展",
    "requirements": {
        "length": 2000,
        "style": "technical",
        "audience": "developers"
    }
}

# 协作流程
# 1. Research Agent 收集资料
# 2. Writer Agent 撰写文章
# 3. Reviewer Agent 审核修改
result = manager.coordinate_task(task)

实战案例:客服 Agent

class CustomerServiceAgent(BaseAgent):
    def __init__(self):
        super().__init__("customer_service")
        self.knowledge_base = self.load_knowledge_base()
        self.conversation_history = []
        
    def handle_query(self, user_query: str) -> str:
        """处理用户查询"""
        # 1. 理解意图
        intent = self.understand_intent(user_query)
        
        # 2. 搜索相关信息
        relevant_info = self.search_knowledge(intent)
        
        # 3. 生成回复
        response = self.generate_response(
            user_query, 
            intent, 
            relevant_info
        )
        
        # 4. 记录对话
        self.conversation_history.append({
            "user": user_query,
            "agent": response,
            "timestamp": datetime.now()
        })
        
        return response
    
    def understand_intent(self, query: str) -> Dict[str, Any]:
        """意图识别"""
        prompt = f"""
        分析以下用户查询的意图:
        "{query}"
        
        返回 JSON 格式:
        {{
            "intent": "查询类型",
            "entities": ["相关实体"],
            "sentiment": "情感倾向"
        }}
        """
        
        response = openai.ChatCompletion.create(
            model=self.llm_model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        
        return json.loads(response.choices[0].message.content)

部署和优化

1. 异步处理

import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncAgent(BaseAgent):
    def __init__(self, name: str):
        super().__init__(name)
        self.executor = ThreadPoolExecutor(max_workers=5)
    
    async def run_async(self, environment: Dict[str, Any]) -> Any:
        """异步执行 Agent"""
        perception = await self.aperceive(environment)
        decision = await self.adecide(perception)
        result = await self.aact(decision)
        
        return result
    
    async def aperceive(self, environment: Dict[str, Any]) -> Dict[str, Any]:
        """异步感知"""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor, 
            self.perceive, 
            environment
        )

2. 性能监控

import time
from functools import wraps

def monitor_performance(func):
    """性能监控装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        
        # 记录性能数据
        performance_data = {
            "function": func.__name__,
            "duration": end_time - start_time,
            "timestamp": datetime.now()
        }
        
        # 发送到监控系统
        send_to_monitoring(performance_data)
        
        return result
    
    return wrapper

class MonitoredAgent(BaseAgent):
    @monitor_performance
    def run(self, environment: Dict[str, Any]) -> Any:
        return super().run(environment)

总结

AI Agent 开发是一个充满挑战和机遇的领域。通过合理的架构设计、工具集成和记忆系统,我们可以构建出真正智能和实用的 Agent 系统。

关键要点:

  1. 模块化设计:保持 Agent 组件的独立性和可扩展性
  2. 工具生态:集成丰富的工具提升 Agent 能力
  3. 记忆系统:结合短期和长期记忆提供上下文
  4. 协作机制:通过多 Agent 协作解决复杂问题

随着技术的发展,AI Agent 将在更多领域发挥重要作用,成为人类的得力助手。