什么是 AI Agent
AI Agent 是能够感知环境、做出决策并采取行动以实现特定目标的智能系统。与简单的聊天机器人不同,AI Agent 具有以下特征:
- 自主性:能够独立做出决策
- 反应性:对环境变化做出响应
- 主动性:主动采取行动达成目标
- 社交性:与其他 Agent 或人类协作
核心架构设计
1. Agent 基础架构
from abc import ABC, abstractmethod
from typing import List, Dict, Any
import openai
class BaseAgent(ABC):
    """Abstract agent built around a perceive -> decide -> act cycle.

    Subclasses implement the three abstract steps. ``run`` chains them and
    appends one record per cycle to ``memory``.
    """

    def __init__(self, name: str, llm_model: str = "gpt-4"):
        """Initialise identity, model name, and empty memory/tool containers.

        Args:
            name: Stored on the instance as ``name``.
            llm_model: Stored on the instance as ``llm_model``.
        """
        self.tools = {}
        self.memory = []
        self.llm_model = llm_model
        self.name = name

    @abstractmethod
    def perceive(self, environment: Dict[str, Any]) -> Dict[str, Any]:
        """Extract a perception mapping from the environment."""

    @abstractmethod
    def decide(self, perception: Dict[str, Any]) -> str:
        """Choose a decision based on the perception."""

    @abstractmethod
    def act(self, decision: str) -> Any:
        """Carry out the decision and return its result."""

    def run(self, environment: Dict[str, Any]) -> Any:
        """Execute one full agent cycle and remember it.

        Args:
            environment: Passed unchanged to ``perceive``.

        Returns:
            Whatever ``act`` returned for this cycle.
        """
        observed = self.perceive(environment)
        chosen = self.decide(observed)
        outcome = self.act(chosen)

        # Keep a trace of the cycle so later steps can look back at it.
        record = {
            "perception": observed,
            "decision": chosen,
            "result": outcome,
        }
        self.memory.append(record)
        return outcome
2. 实现具体 Agent
class ResearchAgent(BaseAgent):
    """Research agent: asks the LLM which tool to run, then dispatches it.

    The LLM is prompted to answer in the form::

        tool: <name>
        params: {<JSON object>}

    ``parse_decision`` turns that text into ``(tool_name, params)`` and
    ``act`` looks the tool up in ``self.tools`` and calls it.

    NOTE(review): LLM calls go through ``openai.ChatCompletion.create``, which
    looks like the pre-1.0 ``openai`` package interface - confirm against the
    installed version.
    """

    def __init__(self, name: str):
        """Create the agent and register its built-in tools.

        Args:
            name: Agent name, forwarded to ``BaseAgent.__init__``.
        """
        super().__init__(name)
        self.tools = {
            "search": self.search_tool,
            "summarize": self.summarize_tool,
            "analyze": self.analyze_tool,
        }

    def perceive(self, environment: Dict[str, Any]) -> Dict[str, Any]:
        """Pick the research task fields out of the environment.

        Returns:
            A dict with ``query`` (``None`` when ``research_query`` is absent),
            ``context`` (default ``""``) and ``constraints`` (default ``[]``).
        """
        return {
            "query": environment.get("research_query"),
            "context": environment.get("context", ""),
            "constraints": environment.get("constraints", []),
        }

    def decide(self, perception: Dict[str, Any]) -> str:
        """Ask the LLM for a tool call and return its raw text answer."""
        prompt = f"""
作为研究助手,请为以下查询制定研究计划:
查询:{perception['query']}
上下文:{perception['context']}
请返回要执行的工具和参数,格式如下:
tool: search
params: {{...}}
"""
        return self._chat(prompt)

    def parse_decision(self, decision: str) -> tuple:
        """Parse ``tool: ...`` / ``params: {...}`` text into a tool call.

        The ``params`` value may span several lines; everything from the
        ``params:`` marker onward is fed to the JSON decoder, and any text
        after the JSON object is ignored.

        Args:
            decision: Raw LLM answer as returned by ``decide``.

        Returns:
            ``(tool_name, params)`` where ``params`` is a dict (empty when the
            answer has no ``params:`` line).

        Raises:
            ValueError: If no tool name is present, or ``params`` is not a
                JSON object (``json.JSONDecodeError`` is a ``ValueError``).
        """
        import json  # local import keeps this method self-contained

        tool_name = ""
        params: Dict[str, Any] = {}
        lines = decision.splitlines()
        for position, raw_line in enumerate(lines):
            key, separator, rest = raw_line.strip().partition(":")
            if not separator:
                continue
            key = key.strip().lower()
            if key == "tool" and not tool_name:
                tool_name = rest.strip()
            elif key == "params":
                payload = "\n".join([rest, *lines[position + 1:]]).strip()
                parsed, _ = json.JSONDecoder().raw_decode(payload)
                if not isinstance(parsed, dict):
                    raise ValueError("params must be a JSON object")
                params = parsed
                break
        if not tool_name:
            raise ValueError("no 'tool:' line found in decision")
        return tool_name, params

    def act(self, decision: str) -> Any:
        """Run the tool named in ``decision``.

        Returns:
            The tool's return value, or ``{"error": ...}`` when the decision
            cannot be parsed or names a tool that is not registered.
        """
        try:
            tool_name, params = self.parse_decision(decision)
        except ValueError as exc:
            # LLM output is free text; report a bad answer instead of crashing.
            return {"error": f"Invalid decision: {exc}"}
        if tool_name in self.tools:
            return self.tools[tool_name](**params)
        return {"error": f"Unknown tool: {tool_name}"}

    def search_tool(self, query: str, num_results: int = 5) -> List[Dict]:
        """Return ``num_results`` placeholder results (no real search yet)."""
        # A real search API can be integrated here.
        return [{"title": f"Result {i}", "content": "..."}
                for i in range(num_results)]

    def summarize_tool(self, text: str) -> str:
        """Ask the LLM to summarise ``text`` and return its answer."""
        return self._chat(f"请总结以下内容:\n{text}")

    def analyze_tool(self, text: str) -> str:
        """Ask the LLM to analyse ``text`` and return its answer."""
        return self._chat(f"请分析以下内容并提炼关键结论:\n{text}")

    def _chat(self, prompt: str) -> str:
        """Send one user message to ``self.llm_model`` and return the reply text."""
        response = openai.ChatCompletion.create(
            model=self.llm_model,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content
工具集成
1. 工具抽象层
from typing import Callable, Dict, Any
class Tool:
    """A named callable plus a description of its keyword parameters.

    ``parameters`` maps each parameter name to a spec dict. Besides ordinary
    JSON Schema keys (``type``, ``description``, ``default``...) a spec may
    carry a boolean ``required`` flag, which is this class's own convention.
    """

    def __init__(self, name: str, description: str,
                 func: Callable, parameters: Dict[str, Any]):
        """Store the tool definition.

        Args:
            name: Tool name exposed in the schema.
            description: Human/LLM-readable description.
            func: Callable invoked by ``run`` with keyword arguments.
            parameters: Mapping of parameter name to spec dict.
        """
        self.name = name
        self.description = description
        self.func = func
        self.parameters = parameters

    def run(self, **kwargs) -> Any:
        """Call the wrapped function after checking required parameters.

        Returns:
            Whatever ``func(**kwargs)`` returns.

        Raises:
            ValueError: If a parameter flagged ``required`` is not supplied.
                The message names the first missing one in declaration order.
        """
        for param, param_info in self.parameters.items():
            if param_info.get("required") and param not in kwargs:
                raise ValueError(f"Missing required parameter: {param}")
        return self.func(**kwargs)

    def to_json_schema(self) -> Dict[str, Any]:
        """Describe the tool as a JSON Schema function definition.

        The per-parameter boolean ``required`` flag is not valid JSON Schema
        inside a property, so it is removed from each property and expressed
        only through the object-level ``required`` array. Property specs are
        copied, so editing the result does not alter ``self.parameters``.
        """
        properties = {
            param: {key: value for key, value in spec.items() if key != "required"}
            for param, spec in self.parameters.items()
        }
        required = [param for param, spec in self.parameters.items()
                    if spec.get("required")]
        return {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        }
# Tool registry
class ToolKit:
    """Name-indexed collection of tools."""

    def __init__(self):
        """Start with no registered tools."""
        self.tools = dict()

    def register(self, tool: Tool):
        """Store ``tool`` under its ``name``, replacing any previous entry."""
        key = tool.name
        self.tools[key] = tool

    def get_schemas(self) -> List[Dict[str, Any]]:
        """Return ``to_json_schema()`` of every tool, in registration order."""
        schemas = []
        for registered in self.tools.values():
            schemas.append(registered.to_json_schema())
        return schemas

    def execute(self, tool_name: str, **kwargs) -> Any:
        """Run the tool registered as ``tool_name`` with ``kwargs``.

        Raises:
            ValueError: If no tool with that name is registered.
        """
        if tool_name in self.tools:
            chosen = self.tools[tool_name]
            return chosen.run(**kwargs)
        raise ValueError(f"Tool not found: {tool_name}")
2. 集成实际工具
import requests
from datetime import datetime
import json
# Web search tool
def web_search(query: str, num_results: int = 5, timeout: float = 10.0) -> List[Dict]:
    """Search the web through the SerpAPI HTTP endpoint.

    Args:
        query: Search query, sent as the ``q`` parameter.
        num_results: Requested result count, sent as ``num``.
        timeout: Seconds to wait for the HTTP response before ``requests``
            raises; without it the call could block indefinitely.

    Returns:
        A list of ``{"title", "snippet", "link"}`` dicts taken from the
        response's ``organic_results``; an empty list when the status code is
        not 200 or that key is absent.

    Raises:
        KeyError: If the ``SERPAPI_KEY`` environment variable is not set.
    """
    import os  # local import: the surrounding file never imports ``os``

    response = requests.get(
        "https://serpapi.com/search",
        params={
            "q": query,
            "num": num_results,
            "api_key": os.environ["SERPAPI_KEY"],
        },
        timeout=timeout,
    )
    if response.status_code != 200:
        return []

    results = response.json().get("organic_results", [])
    return [
        {
            "title": r.get("title"),
            "snippet": r.get("snippet"),
            "link": r.get("link"),
        }
        for r in results
    ]
# Code execution tool
def execute_python(code: str, timeout: float = 10) -> Dict[str, Any]:
    """Run Python source in a child interpreter and capture its output.

    The source is written to a temporary ``.py`` file, executed with the same
    interpreter that runs this process, and the file is removed afterwards.

    NOTE(security): the child process is not sandboxed - it runs with this
    process's privileges. Do not pass untrusted code without extra isolation.

    Args:
        code: Python source to execute.
        timeout: Seconds before the child process is abandoned.

    Returns:
        ``{"success", "stdout", "stderr"}`` when the process finished
        (``success`` is true for exit code 0), or
        ``{"success": False, "error": "Code execution timeout"}`` on timeout.
    """
    import os
    import subprocess
    import sys
    import tempfile

    # Write and CLOSE the file before running it: an open handle blocks the
    # child from reading it on some platforms. UTF-8 matches Python's default
    # source encoding.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False,
                                     encoding='utf-8') as f:
        f.write(code)
        script_path = f.name

    try:
        result = subprocess.run(
            [sys.executable or 'python', script_path],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return {
            "success": False,
            "error": "Code execution timeout",
        }
    finally:
        # delete=False means nobody else cleans this up.
        try:
            os.unlink(script_path)
        except OSError:
            pass  # already gone or not removable; nothing useful to do

    return {
        "success": result.returncode == 0,
        "stdout": result.stdout,
        "stderr": result.stderr,
    }
# Build the shared toolkit and register the two concrete tools.
toolkit = ToolKit()

# Web search: ``query`` is mandatory, ``num_results`` is optional.
toolkit.register(
    Tool(
        name="web_search",
        description="搜索网络获取相关信息",
        func=web_search,
        parameters=dict(
            query=dict(
                type="string",
                description="搜索查询",
                required=True,
            ),
            num_results=dict(
                type="integer",
                description="返回结果数量",
                default=5,
            ),
        ),
    )
)

# Python REPL: takes the source code to run.
toolkit.register(
    Tool(
        name="python_repl",
        description="执行 Python 代码进行计算或数据处理",
        func=execute_python,
        parameters=dict(
            code=dict(
                type="string",
                description="要执行的 Python 代码",
                required=True,
            ),
        ),
    )
)
记忆系统
1. 短期记忆
from collections import deque
from typing import Optional
class ShortTermMemory:
    """Fixed-capacity memory that drops the oldest entry when full."""

    def __init__(self, capacity: int = 10):
        """Create an empty memory holding at most ``capacity`` entries."""
        self.capacity = capacity
        self.memory = deque(maxlen=capacity)

    def add(self, item: Dict[str, Any]):
        """Store a timestamped copy of ``item``.

        The caller's dict is left untouched; the stored entry is a shallow
        copy with a ``timestamp`` key set to ``datetime.now()``.
        """
        entry = dict(item)
        entry["timestamp"] = datetime.now()
        self.memory.append(entry)

    def get_recent(self, n: int = 5) -> List[Dict[str, Any]]:
        """Return the last ``n`` entries, oldest first.

        A non-positive ``n`` yields an empty list. The explicit check is
        needed because ``seq[-0:]`` is the whole sequence, not an empty one.
        """
        if n <= 0:
            return []
        return list(self.memory)[-n:]

    def search(self, query: str) -> List[Dict[str, Any]]:
        """Return entries whose ``str()`` form contains ``query``, ignoring case."""
        needle = query.lower()
        return [item for item in self.memory if needle in str(item).lower()]
2. 长期记忆(向量数据库)
import numpy as np
from typing import List, Tuple
import faiss
class LongTermMemory:
    """Semantic memory: texts are embedded and stored in a FAISS L2 index.

    ``memories[i]`` holds the record for the i-th vector added to ``index``,
    so index labels returned by a search double as list positions.

    NOTE(review): embeddings come from ``openai.Embedding.create``, which
    looks like the pre-1.0 ``openai`` package interface - confirm against the
    installed version.
    """

    def __init__(self, embedding_dim: int = 1536):
        """Create an empty index for vectors of size ``embedding_dim``."""
        self.embedding_dim = embedding_dim
        self.index = faiss.IndexFlatL2(embedding_dim)
        self.memories = []

    def _get_embedding(self, text: str) -> np.ndarray:
        """Return the embedding of ``text`` as a float32 vector."""
        response = openai.Embedding.create(
            model="text-embedding-ada-002",
            input=text
        )
        # FAISS works on float32; a plain np.array() of Python floats is float64.
        return np.asarray(response['data'][0]['embedding'], dtype=np.float32)

    def add(self, content: str, metadata: Optional[Dict[str, Any]] = None):
        """Embed ``content`` and store it with optional ``metadata``.

        The record appended to ``memories`` has ``content``, ``metadata``
        (``{}`` when omitted) and a ``timestamp`` of ``datetime.now()``.
        """
        embedding = self._get_embedding(content)
        vector = np.ascontiguousarray(embedding, dtype=np.float32).reshape(1, -1)
        self.index.add(vector)
        self.memories.append({
            "content": content,
            "metadata": metadata or {},
            "timestamp": datetime.now()
        })

    def search(self, query: str, k: int = 5) -> List[Tuple[Dict, float]]:
        """Return up to ``k`` stored records closest to ``query``.

        Args:
            query: Text to embed and compare against stored vectors.
            k: Maximum number of hits.

        Returns:
            ``(record, distance)`` pairs in the order the index returns them.
            Empty when nothing is stored or ``k`` is not positive (no
            embedding request is made in that case).
        """
        if k <= 0 or not self.memories:
            return []

        query_embedding = self._get_embedding(query)
        query_vector = np.ascontiguousarray(
            query_embedding, dtype=np.float32
        ).reshape(1, -1)
        distances, indices = self.index.search(query_vector, k)

        results = []
        for distance, idx in zip(distances[0], indices[0]):
            # FAISS pads missing hits with label -1; without the lower bound
            # that would silently alias the LAST stored memory.
            if 0 <= idx < len(self.memories):
                results.append((self.memories[idx], float(distance)))
        return results
多 Agent 协作
1. Agent 管理器
class AgentManager:
    """Registry and message router for a set of cooperating agents.

    ``decompose_task`` and ``synthesize_results`` are extension hooks used by
    ``coordinate_task``; override them in a subclass.
    """

    def __init__(self):
        """Create an empty registry and an empty FIFO message queue."""
        from queue import Queue  # local import: the file never imports Queue

        self.agents = {}
        self.message_queue = Queue()

    def register_agent(self, agent: "BaseAgent"):
        """Register ``agent`` under ``agent.name``, replacing any previous one."""
        self.agents[agent.name] = agent

    def send_message(self, from_agent: str, to_agent: str,
                     message: Dict[str, Any]):
        """Queue a message envelope for later delivery by ``process_messages``.

        The envelope has the keys ``from``, ``to``, ``message`` and
        ``timestamp`` (``datetime.now()`` at enqueue time).
        """
        self.message_queue.put({
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "timestamp": datetime.now()
        })

    def process_messages(self):
        """Drain the queue, handing each envelope to its recipient.

        An envelope is delivered by calling the recipient's
        ``receive_message(envelope)``.

        Returns:
            The envelopes that could not be delivered, in queue order:
            either the recipient is not registered or it has no callable
            ``receive_message``.
        """
        undelivered = []
        while not self.message_queue.empty():
            msg = self.message_queue.get()
            recipient = self.agents.get(msg["to"])
            handler = getattr(recipient, "receive_message", None)
            if callable(handler):
                handler(msg)
            else:
                undelivered.append(msg)
        return undelivered

    def decompose_task(self, task: Dict[str, Any]):
        """Split ``task`` into subtasks; must be provided by a subclass.

        Each subtask is expected to be a dict with an ``agent`` name and an
        ``environment`` mapping, as consumed by ``coordinate_task``.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(
            "AgentManager.decompose_task must be overridden to split a task "
            "into {'agent': ..., 'environment': ...} subtasks"
        )

    def synthesize_results(self, results: Dict[str, Any]) -> Any:
        """Combine per-agent results; the default returns them unchanged."""
        return results

    def coordinate_task(self, task: Dict[str, Any]) -> Any:
        """Run each subtask on its agent and combine the results.

        Subtasks naming an unregistered agent are skipped. Results are keyed
        by agent name, so a later subtask for the same agent overwrites an
        earlier one.

        Returns:
            Whatever ``synthesize_results`` returns for the collected results.
        """
        subtasks = self.decompose_task(task)
        results = {}
        for subtask in subtasks:
            agent_name = subtask["agent"]
            if agent_name in self.agents:
                result = self.agents[agent_name].run(subtask["environment"])
                results[agent_name] = result
        return self.synthesize_results(results)
2. 协作示例
# One specialised agent per role.
research_agent = ResearchAgent("researcher")
writer_agent = WriterAgent("writer")
reviewer_agent = ReviewerAgent("reviewer")

# Hand all three to a single manager.
manager = AgentManager()
manager.register_agent(research_agent)
manager.register_agent(writer_agent)
manager.register_agent(reviewer_agent)

# Describe the joint task.
task = dict(
    type="create_article",
    topic="AI Agent 的未来发展",
    requirements=dict(
        length=2000,
        style="technical",
        audience="developers",
    ),
)

# Intended workflow:
#   1. the research agent gathers material
#   2. the writer agent drafts the article
#   3. the reviewer agent reviews and revises it
result = manager.coordinate_task(task)
实战案例:客服 Agent
class CustomerServiceAgent(BaseAgent):
    """Customer-service agent: intent -> knowledge lookup -> reply.

    NOTE(review): ``load_knowledge_base``, ``search_knowledge`` and
    ``generate_response`` are called here but not defined in this class;
    they must be provided elsewhere. The abstract ``perceive`` / ``decide`` /
    ``act`` methods of ``BaseAgent`` are not implemented here either.
    """

    def __init__(self):
        """Name the agent ``customer_service`` and load its knowledge base."""
        super().__init__("customer_service")
        self.knowledge_base = self.load_knowledge_base()
        self.conversation_history = []

    def handle_query(self, user_query: str) -> str:
        """Answer one user query and log the exchange.

        Args:
            user_query: The user's message.

        Returns:
            The reply produced by ``generate_response``.
        """
        # Step 1: work out what the user wants.
        detected_intent = self.understand_intent(user_query)

        # Step 2: look up supporting information for that intent.
        supporting_info = self.search_knowledge(detected_intent)

        # Step 3: produce the reply.
        reply = self.generate_response(user_query, detected_intent, supporting_info)

        # Step 4: keep a record of the turn.
        turn = {
            "user": user_query,
            "agent": reply,
            "timestamp": datetime.now(),
        }
        self.conversation_history.append(turn)
        return reply

    def understand_intent(self, query: str) -> Dict[str, Any]:
        """Ask the LLM to classify ``query`` and parse its JSON answer.

        Returns:
            The decoded JSON object from the model's reply; the prompt asks
            for ``intent``, ``entities`` and ``sentiment`` keys.
        """
        prompt = f"""
分析以下用户查询的意图:
"{query}"
返回 JSON 格式:
{{
"intent": "查询类型",
"entities": ["相关实体"],
"sentiment": "情感倾向"
}}
"""
        chat_messages = [{"role": "user", "content": prompt}]
        completion = openai.ChatCompletion.create(
            model=self.llm_model,
            messages=chat_messages,
            response_format={"type": "json_object"},
        )
        raw_reply = completion.choices[0].message.content
        return json.loads(raw_reply)
部署和优化
1. 异步处理
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncAgent(BaseAgent):
    """Agent whose synchronous steps can be awaited via a thread pool.

    Each ``a*`` coroutine runs the matching synchronous method
    (``perceive`` / ``decide`` / ``act``) on ``self.executor``.
    """

    def __init__(self, name: str):
        """Create the agent with a five-worker thread pool."""
        super().__init__(name)
        self.executor = ThreadPoolExecutor(max_workers=5)

    async def run_async(self, environment: Dict[str, Any]) -> Any:
        """Run one perceive -> decide -> act cycle without blocking the loop.

        Like ``BaseAgent.run``, the cycle is appended to ``self.memory``.

        Returns:
            The result of the ``act`` step.
        """
        perception = await self.aperceive(environment)
        decision = await self.adecide(perception)
        result = await self.aact(decision)
        self.memory.append({
            "perception": perception,
            "decision": decision,
            "result": result
        })
        return result

    async def aperceive(self, environment: Dict[str, Any]) -> Dict[str, Any]:
        """Run ``perceive`` on the thread pool."""
        return await self._in_executor(self.perceive, environment)

    async def adecide(self, perception: Dict[str, Any]) -> str:
        """Run ``decide`` on the thread pool."""
        return await self._in_executor(self.decide, perception)

    async def aact(self, decision: str) -> Any:
        """Run ``act`` on the thread pool."""
        return await self._in_executor(self.act, decision)

    async def _in_executor(self, func, *args) -> Any:
        """Await ``func(*args)`` executed on ``self.executor``."""
        # get_running_loop() is the supported way to reach the loop from
        # inside a coroutine; get_event_loop() is discouraged there.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, func, *args)
2. 性能监控
import time
from functools import wraps
def monitor_performance(func):
    """Decorator that reports how long each call to ``func`` takes.

    After every call - including calls that raise - a dict with the keys
    ``function`` (``func.__name__``), ``duration`` (seconds), ``timestamp``
    (``datetime.now()``) and ``success`` (``False`` when the call raised) is
    passed to ``send_to_monitoring``. The wrapped function's return value or
    exception is passed through unchanged.

    NOTE(review): ``send_to_monitoring`` is not defined in the visible source;
    it must be available at module level when the wrapper runs.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic; time.time() can jump when the system
        # clock is adjusted and produce wrong or negative durations.
        started = time.perf_counter()
        succeeded = False
        try:
            result = func(*args, **kwargs)
            succeeded = True
            return result
        finally:
            send_to_monitoring({
                "function": func.__name__,
                "duration": time.perf_counter() - started,
                "timestamp": datetime.now(),
                "success": succeeded,
            })
    return wrapper
class MonitoredAgent(BaseAgent):
    """Agent whose ``run`` is wrapped by ``monitor_performance``."""

    @monitor_performance
    def run(self, environment: Dict[str, Any]) -> Any:
        """Delegate to ``BaseAgent.run`` and return its result."""
        outcome = super().run(environment)
        return outcome
总结
AI Agent 开发是一个充满挑战和机遇的领域。通过合理的架构设计、工具集成和记忆系统,我们可以构建出真正智能且实用的 Agent 系统。
关键要点:
- 模块化设计:保持 Agent 组件的独立性和可扩展性
- 工具生态:集成丰富的工具提升 Agent 能力
- 记忆系统:结合短期和长期记忆提供上下文
- 协作机制:通过多 Agent 协作解决复杂问题
随着技术的发展,AI Agent 将在更多领域发挥重要作用,成为人类的得力助手。