多Agent协作
概述
随着任务复杂度的增加,单个Agent往往难以胜任所有工作。多Agent协作系统通过让多个专业化的Agent协同工作,可以更好地完成复杂任务。每个Agent专注于自己擅长的领域,通过通信和协调机制实现高效协作。
多Agent架构
1. 层级架构
python
from typing import List, Dict, Any
from abc import ABC, abstractmethod
class BaseAgent(ABC):
    """Abstract base class shared by the agents in this chapter.

    Attributes:
        name: Identifier other components use to look the agent up.
        role: Free-form role label such as "manager", "worker" or "peer".
        memory: List of items the agent has chosen to remember.
    """

    def __init__(self, name: str, role: str):
        self.name = name
        self.role = role
        self.memory: List[Any] = []

    @abstractmethod
    def process(self, task: Dict) -> Dict:
        """Handle ``task`` and return a result dictionary."""

    def add_memory(self, item: Any) -> None:
        """Append ``item`` to the agent's memory."""
        self.memory.append(item)

    def notify(self, key: str, value: Any) -> None:
        """Receive a blackboard update for ``key``.

        ``Blackboard`` calls this on every subscriber, so the base class has
        to provide it. The default just remembers the update; subclasses
        override it to react.
        """
        self.add_memory({"event": "notify", "key": key, "value": value})

    def handle_message(self, envelope: Dict) -> None:
        """Receive an envelope published on a ``MessageBus`` topic.

        ``MessageBus`` calls this on every subscriber. The default just
        remembers the envelope; subclasses override it to react.
        """
        self.add_memory({"event": "message", "envelope": envelope})
class ManagerAgent(BaseAgent):
    """Manager of the hierarchical architecture.

    Splits a task into subtasks, hands each one to the worker named in its
    ``assigned_to`` field and aggregates the per-subtask results.
    """

    def __init__(self, name: str, workers: List[BaseAgent]):
        super().__init__(name, "manager")
        # Index workers by name so a subtask can address its worker directly.
        self.workers = {worker.name: worker for worker in workers}

    def process(self, task: Dict) -> Dict:
        """Decompose ``task``, dispatch every subtask and aggregate the results.

        A subtask assigned to an unknown worker is recorded as a failed
        result instead of aborting the whole run with ``KeyError``.
        """
        results = {}
        for subtask in self.decompose(task):
            worker_name = subtask["assigned_to"]
            worker = self.workers.get(worker_name)
            if worker is None:
                results[subtask["id"]] = {
                    "status": "failed",
                    "reason": f"未找到名为 {worker_name} 的Worker",
                }
                continue
            results[subtask["id"]] = worker.process(subtask)
        return self.aggregate(results)

    def decompose(self, task: Dict) -> List[Dict]:
        """Split ``task`` into subtasks carrying ``id`` and ``assigned_to``.

        Placeholder: it raises instead of returning ``None`` so a missing
        implementation is reported here rather than as a ``TypeError`` in
        ``process``.
        """
        raise NotImplementedError("子类必须实现 decompose()")

    def aggregate(self, results: Dict) -> Dict:
        """Combine the ``{subtask_id: result}`` mapping into one result.

        Placeholder to be implemented by a concrete manager.
        """
        raise NotImplementedError("子类必须实现 aggregate()")
class WorkerAgent(BaseAgent):
    """Worker that accepts only tasks whose ``type`` equals its expertise."""

    def __init__(self, name: str, expertise: str):
        super().__init__(name, "worker")
        self.expertise = expertise

    def process(self, task: Dict) -> Dict:
        """Run ``task`` if it matches the expertise and report the outcome.

        Returns:
            ``{"status": "failed", "reason": ...}`` when the task type does not
            match, otherwise ``{"status": "success", "result": ...}``. The
            task/result pair is also stored in memory.
        """
        if not self.can_handle(task):
            return {"status": "failed", "reason": "任务超出能力范围"}
        result = self.execute(task)
        self.add_memory({"task": task, "result": result})
        return {"status": "success", "result": result}

    def can_handle(self, task: Dict) -> bool:
        """Return True when the task's ``type`` equals this worker's expertise."""
        return task.get("type") == self.expertise

    def execute(self, task: Dict) -> Any:
        """Do the actual work for ``task``.

        Placeholder: it raises so an unimplemented worker cannot report
        ``success`` with a ``None`` result.
        """
        raise NotImplementedError("子类必须实现 execute()")
researcher = WorkerAgent("researcher", "research")
writer = WorkerAgent("writer", "writing")
editor = WorkerAgent("editor", "editing")
manager = ManagerAgent("content_manager", [researcher, writer, editor])
result = manager.process({
"type": "content_creation",
"topic": "AI发展趋势"
})

2. 对等架构
python
class PeerAgent(BaseAgent):
    """Agent of the peer-to-peer architecture.

    A peer handles tasks within its own expertise, forwards other tasks to
    a registered peer that can handle them, and otherwise splits the task
    and spreads the pieces over the group.
    """

    def __init__(self, name: str, expertise: List[str]):
        super().__init__(name, "peer")
        self.expertise = expertise
        self.peers = {}

    def register_peer(self, peer: 'PeerAgent'):
        """Make ``peer`` reachable from this agent (one direction only)."""
        self.peers[peer.name] = peer

    def process(self, task: Dict) -> Dict:
        """Handle ``task`` locally, via one capable peer, or collaboratively."""
        if self.can_handle(task):
            return self.execute(task)
        for peer in self.peers.values():
            if peer.can_handle(task):
                return peer.process(task)
        return self.collaborate(task)

    def can_handle(self, task: Dict) -> bool:
        """Return True when the task's ``type`` is one of this agent's skills."""
        return task.get("type") in self.expertise

    def collaborate(self, task: Dict) -> Dict:
        """Split ``task`` and route each piece to the first agent able to take it.

        This agent is considered before its peers. A piece nobody can take
        is recorded as a failed result rather than being dropped, so
        ``merge_results`` sees every subtask.
        """
        results = []
        for subtask in self.split_task(task):
            if self.can_handle(subtask):
                results.append(self.execute(subtask))
                continue
            helper = next(
                (peer for peer in self.peers.values() if peer.can_handle(subtask)),
                None,
            )
            if helper is None:
                results.append({"status": "failed", "reason": "任务超出能力范围"})
            else:
                results.append(helper.process(subtask))
        return self.merge_results(results)

    def execute(self, task: Dict) -> Dict:
        """Do the actual work for a task this agent can handle.

        ``process`` relies on this method, so it must exist on the class;
        concrete peers implement it.
        """
        raise NotImplementedError("子类必须实现 execute()")

    def split_task(self, task: Dict) -> List[Dict]:
        """Break ``task`` into subtasks. Placeholder for concrete peers."""
        raise NotImplementedError("子类必须实现 split_task()")

    def merge_results(self, results: List[Dict]) -> Dict:
        """Combine subtask results into one result. Placeholder for concrete peers."""
        raise NotImplementedError("子类必须实现 merge_results()")
agent1 = PeerAgent("agent1", ["research", "analysis"])
agent2 = PeerAgent("agent2", ["writing", "editing"])
agent3 = PeerAgent("agent3", ["review", "feedback"])
agent1.register_peer(agent2)
agent1.register_peer(agent3)
agent2.register_peer(agent1)
agent2.register_peer(agent3)
agent3.register_peer(agent1)
agent3.register_peer(agent2)

3. 混合架构
python
class HybridAgentSystem:
    """Hybrid architecture: specialists first, then generalists, then the team.

    Args:
        coordinator: Object providing ``identify_domain``, ``can_handle``,
            ``select_best`` and ``delegate_to_team``. A ``CoordinatorAgent``
            is created when omitted, so existing ``HybridAgentSystem()``
            calls keep working.
    """

    def __init__(self, coordinator=None):
        self.coordinator = CoordinatorAgent() if coordinator is None else coordinator
        self.specialists = {}
        self.generalists = []

    def add_specialist(self, agent: "BaseAgent", domain: str):
        """Register ``agent`` as the specialist for ``domain``, replacing any previous one."""
        self.specialists[domain] = agent

    def add_generalist(self, agent: "BaseAgent"):
        """Add ``agent`` to the pool of generalists."""
        self.generalists.append(agent)

    def process(self, task: Dict) -> Dict:
        """Route ``task`` to the most specific handler available.

        Order: the specialist registered for the task's domain, then the
        best generalist the coordinator accepts, then team delegation.
        """
        domain = self.coordinator.identify_domain(task)
        if domain in self.specialists:
            return self.specialists[domain].process(task)

        suitable_generalists = [
            generalist
            for generalist in self.generalists
            if self.coordinator.can_handle(generalist, task)
        ]
        if suitable_generalists:
            selected = self.coordinator.select_best(suitable_generalists, task)
            return selected.process(task)

        return self.coordinator.delegate_to_team(task)
class CoordinatorAgent:
    """Routing helper used by ``HybridAgentSystem`` and ``MultiAgentSystem``.

    Args:
        llm: Optional language-model handle. It is only stored; optional so
            both ``CoordinatorAgent()`` and ``CoordinatorAgent(llm)`` work.
        name: Name under which the coordinator registers with components
            that read ``agent.name``, such as ``DirectCommunication``.
    """

    def __init__(self, llm=None, name: str = "coordinator"):
        self.llm = llm
        self.name = name
        self.role = "coordinator"

    def identify_domain(self, task: Dict) -> str:
        """Return the domain ``task`` belongs to. Placeholder."""
        raise NotImplementedError("子类必须实现 identify_domain()")

    def can_handle(self, agent: "BaseAgent", task: Dict) -> bool:
        """Return True when ``agent`` is able to take ``task``. Placeholder."""
        raise NotImplementedError("子类必须实现 can_handle()")

    def select_best(self, agents: List["BaseAgent"], task: Dict) -> "BaseAgent":
        """Pick the most suitable of ``agents`` for ``task``. Placeholder."""
        raise NotImplementedError("子类必须实现 select_best()")

    def delegate_to_team(self, task: Dict) -> Dict:
        """Handle ``task`` when no single agent can. Placeholder."""
        raise NotImplementedError("子类必须实现 delegate_to_team()")


Agent通信机制
1. 直接通信
python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class Message:
    """A message exchanged between two agents.

    ``timestamp`` defaults to the creation time. Passing ``None``
    explicitly has the same effect.
    """

    sender: str
    receiver: str
    content: Any
    message_type: str
    # Optional because None is the "fill in the current time" marker.
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now()
class DirectCommunication:
    """Point-to-point mailboxes: one list of pending messages per registered agent."""

    def __init__(self):
        self.agents = {}
        self.message_queue = {}

    def register_agent(self, agent: "BaseAgent") -> None:
        """Register ``agent`` under its name and make sure it has a mailbox."""
        self.agents[agent.name] = agent
        # setdefault keeps mail that is already waiting if the agent registers again.
        self.message_queue.setdefault(agent.name, [])

    def send_message(self, message: "Message") -> bool:
        """Queue ``message`` for its receiver.

        Returns:
            True when the receiver is registered and the message was queued,
            False when it was dropped because the receiver is unknown.
        """
        mailbox = self.message_queue.get(message.receiver)
        if mailbox is None:
            return False
        mailbox.append(message)
        return True

    def receive_messages(self, agent_name: str) -> List["Message"]:
        """Return and clear the pending messages of ``agent_name``.

        An unregistered name yields an empty list and does not create a
        mailbox, so later messages to that name are still reported as
        undeliverable.
        """
        if agent_name not in self.message_queue:
            return []
        pending = self.message_queue[agent_name]
        self.message_queue[agent_name] = []
        return pending

    def broadcast(self, sender: str, content: Any, exclude: Optional[List[str]] = None) -> None:
        """Send ``content`` to every registered agent except ``sender`` and ``exclude``."""
        skipped = {sender, *(exclude or ())}
        for agent_name in self.agents:
            if agent_name in skipped:
                continue
            self.send_message(
                Message(
                    sender=sender,
                    receiver=agent_name,
                    content=content,
                    message_type="broadcast",
                )
            )
comm = DirectCommunication()
comm.register_agent(agent1)
comm.register_agent(agent2)
comm.send_message(Message(
sender="agent1",
receiver="agent2",
content="请协助完成任务",
message_type="request"
))

2. 共享黑板
python
class Blackboard:
    """Shared key/value store with change notification and a per-key history."""

    def __init__(self):
        # Imported locally: in the visible file the module-level
        # ``import threading`` appears only after this class is first used.
        import threading

        self.data = {}
        self.subscribers = {}
        self.history = {}
        self.lock = threading.Lock()

    def write(self, key: str, value: Any, writer: str):
        """Store ``value`` under ``key`` and notify the key's subscribers."""
        entry = {
            "value": value,
            "writer": writer,
            "timestamp": datetime.now()
        }
        with self.lock:
            self.data[key] = entry
            self.history.setdefault(key, []).append(entry)
        # Notify after releasing the lock: it is not reentrant, so a
        # subscriber that reads the blackboard from notify() would deadlock.
        self._notify_subscribers(key, value)

    def read(self, key: str) -> Optional[Any]:
        """Return the latest value stored under ``key``, or None if absent."""
        with self.lock:
            entry = self.data.get(key)
            return None if entry is None else entry["value"]

    def subscribe(self, key: str, agent: "BaseAgent"):
        """Ask for ``agent.notify(key, value)`` to be called on every write to ``key``."""
        with self.lock:
            self.subscribers.setdefault(key, []).append(agent)

    def _notify_subscribers(self, key: str, value: Any):
        """Call ``notify`` on a snapshot of the subscribers of ``key``."""
        with self.lock:
            recipients = list(self.subscribers.get(key, ()))
        for agent in recipients:
            agent.notify(key, value)

    def get_history(self, key: str) -> List[Dict]:
        """Return copies of every entry written to ``key``, oldest first.

        Each entry has the keys ``value``, ``writer`` and ``timestamp``. An
        unknown key yields an empty list.
        """
        with self.lock:
            return [dict(entry) for entry in self.history.get(key, ())]
blackboard = Blackboard()
blackboard.write("task_status", {"status": "in_progress"}, "agent1")
blackboard.write("research_results", {"data": [...]}, "researcher")
status = blackboard.read("task_status")

3. 消息总线
python
from collections import defaultdict
from queue import Queue
import threading
class MessageBus:
    """Topic-based publish/subscribe bus with one worker thread per topic."""

    def __init__(self):
        self.topics = defaultdict(list)
        self.queues = defaultdict(Queue)
        self.running = False
        self._workers = {}

    def subscribe(self, topic: str, agent: "BaseAgent"):
        """Subscribe ``agent`` to ``topic``.

        A topic first subscribed after ``start()`` gets its worker right
        away; otherwise its messages would never be delivered.
        """
        self.topics[topic].append(agent)
        if self.running:
            self._ensure_worker(topic)

    def publish(self, topic: str, message: Any, publisher: str):
        """Queue ``message`` on ``topic``, wrapped in an envelope with metadata."""
        envelope = {
            "topic": topic,
            "message": message,
            "publisher": publisher,
            "timestamp": datetime.now()
        }
        self.queues[topic].put(envelope)

    def start(self):
        """Start delivery; calling it again does not spawn duplicate workers."""
        self.running = True
        for topic in list(self.topics):
            self._ensure_worker(topic)

    def stop(self):
        """Ask the workers to stop; each exits once its current queue wait returns."""
        self.running = False

    def _ensure_worker(self, topic: str):
        """Start the worker thread for ``topic`` unless one is already alive."""
        worker = self._workers.get(topic)
        if worker is not None and worker.is_alive():
            return
        # Touch the queue here so the worker never has to create it itself.
        self.queues[topic]
        worker = threading.Thread(
            target=self._process_topic,
            args=(topic,),
            daemon=True
        )
        self._workers[topic] = worker
        worker.start()

    def _process_topic(self, topic: str):
        """Deliver envelopes of ``topic`` to its subscribers until stopped.

        The publisher does not receive its own envelope. A handler that
        raises is logged and skipped so one faulty subscriber cannot stop
        delivery to the others.
        """
        import logging
        from queue import Empty

        log = logging.getLogger(__name__)
        while self.running:
            try:
                envelope = self.queues[topic].get(timeout=1)
            except Empty:
                # Nothing arrived in time; loop to re-check the running flag.
                continue
            for agent in list(self.topics[topic]):
                if agent.name == envelope["publisher"]:
                    continue
                try:
                    agent.handle_message(envelope)
                except Exception:
                    log.exception("subscriber %r failed on topic %r", agent.name, topic)
bus = MessageBus()
bus.subscribe("research", researcher)
bus.subscribe("writing", writer)
bus.publish("research", {"query": "AI trends"}, "coordinator")

任务分解与分配
1. 任务分解策略
python
class TaskDecomposer:
def __init__(self, llm):
self.llm = llm
def decompose(self, task: Dict, strategy: str = "sequential") -> List[Dict]:
if strategy == "sequential":
return self._sequential_decompose(task)
elif strategy == "parallel":
return self._parallel_decompose(task)
elif strategy == "hierarchical":
return self._hierarchical_decompose(task)
def _sequential_decompose(self, task: Dict) -> List[Dict]:
prompt = f"""
将以下任务分解为顺序执行的子任务:
任务:{task}
以JSON格式返回子任务列表,每个子任务包含:
- id: 子任务ID
- description: 任务描述
- dependencies: 依赖的任务ID列表
- required_expertise: 所需专业能力
"""
response = self.llm.generate(prompt)
return self._parse_subtasks(response)
def _parallel_decompose(self, task: Dict) -> List[Dict]:
prompt = f"""
将以下任务分解为可并行执行的子任务:
任务:{task}
确保子任务之间没有依赖关系。
"""
response = self.llm.generate(prompt)
return self._parse_subtasks(response)
def _hierarchical_decompose(self, task: Dict) -> List[Dict]:
subtasks = []
self._decompose_recursive(task, subtasks, level=0)
return subtasks
def _decompose_recursive(self, task: Dict, subtasks: List, level: int):
if self._is_atomic(task):
subtasks.append(task)
return
children = self._split_task(task)
for child in children:
child["level"] = level
self._decompose_recursive(child, subtasks, level + 1)
def _parse_subtasks(self, response: str) -> List[Dict]:
import json
return json.loads(response)
def _is_atomic(self, task: Dict) -> bool:
pass
def _split_task(self, task: Dict) -> List[Dict]:
pass2. 任务分配策略
python
class TaskAllocator:
def __init__(self, agents: List[BaseAgent]):
self.agents = agents
self.agent_loads = {agent.name: 0 for agent in agents}
def allocate(self, subtasks: List[Dict]) -> Dict[str, List[Dict]]:
allocation = {agent.name: [] for agent in self.agents}
sorted_subtasks = self._topological_sort(subtasks)
for subtask in sorted_subtasks:
agent = self._select_agent(subtask)
allocation[agent.name].append(subtask)
self.agent_loads[agent.name] += 1
return allocation
def _select_agent(self, subtask: Dict) -> BaseAgent:
candidates = [
agent for agent in self.agents
if self._can_handle(agent, subtask)
]
if not candidates:
raise ValueError(f"没有Agent能处理任务: {subtask}")
candidates.sort(key=lambda a: self.agent_loads[a.name])
return candidates[0]
def _can_handle(self, agent: BaseAgent, subtask: Dict) -> bool:
required = subtask.get("required_expertise", [])
agent_expertise = getattr(agent, "expertise", [])
if isinstance(required, str):
required = [required]
return all(exp in agent_expertise for exp in required)
def _topological_sort(self, subtasks: List[Dict]) -> List[Dict]:
from collections import defaultdict, deque
graph = defaultdict(list)
in_degree = defaultdict(int)
task_map = {t["id"]: t for t in subtasks}
for task in subtasks:
for dep in task.get("dependencies", []):
graph[dep].append(task["id"])
in_degree[task["id"]] += 1
queue = deque([t["id"] for t in subtasks if in_degree[t["id"]] == 0])
result = []
while queue:
task_id = queue.popleft()
result.append(task_map[task_id])
for neighbor in graph[task_id]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
return result3. 动态负载均衡
python
class LoadBalancer:
    """Queues tasks per agent and drains each queue on a background thread."""

    def __init__(self, agents: List["BaseAgent"]):
        self.agents = agents
        self.task_queues = {agent.name: [] for agent in agents}
        self.agent_status = {agent.name: "idle" for agent in agents}
        # Guards the queues and status flags shared with the worker threads.
        self._lock = threading.Lock()

    def assign_task(self, task: Dict) -> str:
        """Queue ``task`` on the best agent and return that agent's name.

        Raises:
            ValueError: If no agent can handle ``task``.
        """
        agent = self._find_best_agent(task)
        with self._lock:
            self.task_queues[agent.name].append(task)
            # Decide under the lock so a worker about to go idle cannot
            # miss the task that was just queued.
            needs_worker = self.agent_status[agent.name] == "idle"
            if needs_worker:
                self.agent_status[agent.name] = "busy"
        if needs_worker:
            self._start_agent(agent)
        return agent.name

    def _find_best_agent(self, task: Dict) -> "BaseAgent":
        """Return the capable agent with the lowest score.

        The score is the queue length plus a penalty of 2 for a busy agent.
        """
        candidates = [
            agent for agent in self.agents
            if self._can_handle(agent, task)
        ]
        if not candidates:
            raise ValueError(f"没有Agent能处理任务: {task}")

        def score(agent):
            queue_length = len(self.task_queues[agent.name])
            status_score = 0 if self.agent_status[agent.name] == "idle" else 1
            return queue_length + status_score * 2

        return min(candidates, key=score)

    def _can_handle(self, agent: "BaseAgent", task: Dict) -> bool:
        """Return True when ``agent`` has every skill in ``required_expertise``.

        ``_find_best_agent`` relies on this method. Strings are wrapped in a
        list so ``in`` tests membership rather than substrings.
        """
        required = task.get("required_expertise", [])
        if isinstance(required, str):
            required = [required]
        agent_expertise = getattr(agent, "expertise", None) or []
        if isinstance(agent_expertise, str):
            agent_expertise = [agent_expertise]
        return all(skill in agent_expertise for skill in required)

    def _start_agent(self, agent: "BaseAgent"):
        """Mark ``agent`` busy and start a daemon thread that drains its queue."""
        self.agent_status[agent.name] = "busy"
        threading.Thread(
            target=self._process_queue,
            args=(agent,),
            daemon=True
        ).start()

    def _process_queue(self, agent: "BaseAgent"):
        """Process queued tasks of ``agent`` until its queue is empty."""
        name = agent.name
        while True:
            with self._lock:
                if not self.task_queues[name]:
                    # Going idle under the same lock that guards the queue
                    # closes the window in which a task could be stranded.
                    self.agent_status[name] = "idle"
                    return
                task = self.task_queues[name].pop(0)
            try:
                result = agent.process(task)
                self._handle_result(agent, task, result)
            except Exception as e:
                self._handle_error(agent, task, e)

    def _handle_result(self, agent: "BaseAgent", task: Dict, result: Dict):
        """Hook called with each finished task; does nothing by default."""

    def _handle_error(self, agent: "BaseAgent", task: Dict, error: Exception):
        """Hook called when processing a task raised; logs the error by default."""
        import logging

        logging.getLogger(__name__).error(
            "agent %r failed on task %r: %r", agent.name, task, error
        )

    def get_status(self) -> Dict:
        """Return a snapshot of agent states and queue lengths."""
        with self._lock:
            return {
                "agent_status": dict(self.agent_status),
                "queue_lengths": {
                    name: len(queue)
                    for name, queue in self.task_queues.items()
                }
            }


协作模式
1. 顺序协作
python
class SequentialCollaboration:
    """Pipeline in which each agent works on the previous agent's output."""

    def __init__(self, agents: List["BaseAgent"]):
        self.agents = agents

    def execute(self, task: Dict) -> Dict:
        """Run ``task`` through the agents in order.

        The data handed to the next agent is the previous result's
        ``output`` or, failing that, its ``result`` (the key ``WorkerAgent``
        uses). When neither is present the data passes on unchanged.

        Returns:
            The first result whose ``status`` is "failed", otherwise
            ``{"status": "success", "output": <final data>}``.
        """
        current_data = task
        for agent in self.agents:
            result = agent.process(current_data)
            # .get: a result without a status counts as not failed.
            if result.get("status") == "failed":
                return result
            if "output" in result:
                current_data = result["output"]
            elif "result" in result:
                current_data = result["result"]
        return {
            "status": "success",
            "output": current_data
        }
pipeline = SequentialCollaboration([
ResearchAgent(),
AnalysisAgent(),
WritingAgent(),
EditingAgent()
])
result = pipeline.execute({"topic": "AI发展报告"})

2. 并行协作
python
import concurrent.futures
class ParallelCollaboration:
    """Runs the same task on all agents concurrently and merges the results."""

    def __init__(self, agents: List["BaseAgent"]):
        self.agents = agents

    def execute(self, task: Dict) -> Dict:
        """Process ``task`` on every agent in a thread pool.

        Results are collected in agent order so the merge input does not
        depend on thread timing. An agent that raises contributes a failed
        result instead of discarding the other agents' work.
        """
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(agent.process, task)
                for agent in self.agents
            ]
        # Leaving the with-block waits for all futures to finish.
        results = []
        for agent, future in zip(self.agents, futures):
            try:
                results.append(future.result())
            except Exception as exc:
                results.append({
                    "status": "failed",
                    "agent": getattr(agent, "name", None),
                    "reason": str(exc),
                })
        return self._merge_results(results)

    def _merge_results(self, results: List[Dict]) -> Dict:
        """Combine the per-agent results into one result. Placeholder."""
        raise NotImplementedError("子类必须实现 _merge_results()")
parallel_team = ParallelCollaboration([
ResearchAgent(),
FactCheckAgent(),
StyleCheckAgent()
])
result = parallel_team.execute({"content": "文章内容"})

3. 投票协作
python
class VotingCollaboration:
    """Lets every agent vote on a task and derives one decision.

    Args:
        agents: The voters; each result's ``decision`` key is its vote.
        decision_rule: "majority", "unanimous" or "weighted".
    """

    def __init__(self, agents: List["BaseAgent"], decision_rule: str = "majority"):
        self.agents = agents
        self.decision_rule = decision_rule

    def execute(self, task: Dict) -> Dict:
        """Collect one result per agent and apply the decision rule."""
        results = [agent.process(task) for agent in self.agents]
        return self._make_decision(results)

    def _make_decision(self, results: List[Dict]) -> Dict:
        """Dispatch to the configured rule.

        Returns a failed result when there are no votes.

        Raises:
            ValueError: If ``decision_rule`` is not a supported name.
        """
        rules = {
            "majority": self._majority_vote,
            "unanimous": self._unanimous_vote,
            "weighted": self._weighted_vote,
        }
        rule = rules.get(self.decision_rule)
        if rule is None:
            raise ValueError(f"未知的决策规则: {self.decision_rule}")
        if not results:
            # Without votes the rules below would fail on an empty sequence.
            return {"status": "failed", "reason": "没有可用的投票结果"}
        return rule(results)

    def _majority_vote(self, results: List[Dict]) -> Dict:
        """Pick the most frequent decision; confidence is its share of the votes."""
        from collections import Counter

        votes = [r.get("decision") for r in results]
        vote_counts = Counter(votes)
        winner = vote_counts.most_common(1)[0][0]
        return {
            "status": "success",
            "decision": winner,
            "confidence": vote_counts[winner] / len(votes)
        }

    def _unanimous_vote(self, results: List[Dict]) -> Dict:
        """Succeed only when every agent returned the same decision."""
        decisions = [r.get("decision") for r in results]
        if len(set(decisions)) == 1:
            return {
                "status": "success",
                "decision": decisions[0],
                "confidence": 1.0
            }
        return {
            "status": "failed",
            "reason": "未达成一致"
        }

    def _weighted_vote(self, results: List[Dict]) -> Dict:
        """Pick the decision with the largest summed ``confidence`` (default 1.0)."""
        weighted_votes = {}
        for result in results:
            decision = result.get("decision")
            weight = result.get("confidence", 1.0)
            weighted_votes[decision] = weighted_votes.get(decision, 0) + weight
        winner = max(weighted_votes, key=weighted_votes.get)
        total = sum(weighted_votes.values())
        return {
            "status": "success",
            "decision": winner,
            # All-zero weights would otherwise divide by zero.
            "confidence": weighted_votes[winner] / total if total else 0.0
        }
jury = VotingCollaboration([
JudgeAgent("judge1"),
JudgeAgent("judge2"),
JudgeAgent("judge3")
], decision_rule="majority")
verdict = jury.execute({"case": "案件信息"})

4. 辩论协作
python
class DebateCollaboration:
    """Lets agents argue in rounds until they agree or the rounds run out.

    Args:
        agents: The debaters, identified by their ``name``.
        max_rounds: Upper bound on the number of rounds.
    """

    def __init__(self, agents: List["BaseAgent"], max_rounds: int = 3):
        self.agents = agents
        self.max_rounds = max_rounds

    def execute(self, task: Dict) -> Dict:
        """Run the debate and synthesize the final result.

        In each round every agent receives the task, the round number and
        the other agents' arguments so far, and answers with ``position``
        and ``argument``. Consensus is checked after each full round.
        """
        positions = {agent.name: None for agent in self.agents}
        arguments = {agent.name: [] for agent in self.agents}
        for round_num in range(self.max_rounds):
            for agent in self.agents:
                reply = agent.process({
                    **task,
                    "round": round_num,
                    # Copies, so an agent cannot edit another agent's record.
                    "other_arguments": {
                        name: list(args)
                        for name, args in arguments.items()
                        if name != agent.name
                    }
                })
                positions[agent.name] = reply.get("position")
                arguments[agent.name].append(reply.get("argument"))
            if self._check_consensus(positions):
                break
        return self._synthesize_result(positions, arguments)

    def _check_consensus(self, positions: Dict) -> bool:
        """Return True when every agent has stated the same position.

        A missing position (None) never counts as agreement. Positions are
        compared with ``==`` so unhashable values also work.
        """
        stated = list(positions.values())
        if not stated or any(position is None for position in stated):
            return False
        return all(position == stated[0] for position in stated)

    def _synthesize_result(self, positions: Dict, arguments: Dict) -> Dict:
        """Turn the final positions and arguments into a result. Placeholder."""
        raise NotImplementedError("子类必须实现 _synthesize_result()")
debate = DebateCollaboration([
ProponentAgent(),
OpponentAgent(),
ModeratorAgent()
])
result = debate.execute({"topic": "是否应该发展AI"})

冲突解决
1. 优先级机制
python
class PriorityResolver:
def __init__(self):
self.priorities = {}
def set_priority(self, agent_name: str, priority: int):
self.priorities[agent_name] = priority
def resolve(self, conflicting_results: List[Dict]) -> Dict:
best_result = max(
conflicting_results,
key=lambda r: self.priorities.get(r["agent"], 0)
)
return best_result2. 协商机制
python
class NegotiationResolver:
def __init__(self, llm):
self.llm = llm
def resolve(self, conflicting_results: List[Dict]) -> Dict:
prompt = f"""
多个Agent产生了不同的结果,请协调并给出最佳解决方案:
冲突结果:
{conflicting_results}
请分析每个结果的优缺点,并给出最终建议。
"""
resolution = self.llm.generate(prompt)
return {
"status": "resolved",
"resolution": resolution,
"original_conflicts": conflicting_results
}3. 仲裁机制
python
class ArbitrationResolver:
    """Resolves conflicts by delegating the decision to an arbitrator agent."""

    def __init__(self, arbitrator: BaseAgent):
        self.arbitrator = arbitrator

    def resolve(self, conflicting_results: List[Dict]) -> Dict:
        """Wrap the conflicts in an ``arbitration`` task and return the arbitrator's result."""
        return self.arbitrator.process(
            {"type": "arbitration", "conflicts": conflicting_results}
        )


完整多Agent系统示例
python
class MultiAgentSystem:
    """End-to-end example wiring agents, communication, blackboard and allocation.

    Args:
        llm: Language-model handle passed to every agent and the decomposer.
    """

    def __init__(self, llm):
        self.llm = llm
        self.agents = {
            "coordinator": CoordinatorAgent(llm),
            "researcher": ResearchAgent(llm),
            "analyst": AnalystAgent(llm),
            "writer": WriterAgent(llm),
            "reviewer": ReviewerAgent(llm)
        }
        self.communication = DirectCommunication()
        for agent in self.agents.values():
            self.communication.register_agent(agent)
        self.blackboard = Blackboard()
        self.decomposer = TaskDecomposer(llm)
        self.allocator = TaskAllocator(list(self.agents.values()))
        self.load_balancer = LoadBalancer(list(self.agents.values()))

    def execute(self, task: Dict) -> Dict:
        """Decompose ``task``, allocate the subtasks and run them in dependency order.

        The allocation is grouped by agent, so running it group by group
        could start a subtask before a dependency owned by another agent.
        Instead, every pass runs the subtasks whose dependencies have all
        completed.

        Returns:
            The synthesized result, or ``{"status": "failed", ...}`` when
            some subtasks can never become ready.
        """
        self.blackboard.write("original_task", task, "system")
        subtasks = self.decomposer.decompose(task, strategy="sequential")
        self.blackboard.write("subtasks", subtasks, "coordinator")
        allocation = self.allocator.allocate(subtasks)
        self.blackboard.write("allocation", allocation, "coordinator")

        pending = [
            (agent_name, agent_task)
            for agent_name, agent_tasks in allocation.items()
            for agent_task in agent_tasks
        ]
        results = {}
        while pending:
            ready = [
                item for item in pending
                if self._check_dependencies(item[1], results)
            ]
            if not ready:
                # Nothing can make progress: a dependency is missing or cyclic.
                return {
                    "status": "failed",
                    "reason": "依赖未满足"
                }
            for agent_name, agent_task in ready:
                result = self._agent_for(agent_name).process(agent_task)
                results[agent_task["id"]] = result
                self.blackboard.write(
                    f"result_{agent_task['id']}",
                    result,
                    agent_name
                )
            pending = [item for item in pending if item[1]["id"] not in results]

        return self._synthesize_final_result(results)

    def _agent_for(self, agent_name: str):
        """Return the agent an allocation entry refers to.

        Allocations are keyed by ``agent.name``, while ``self.agents`` is
        keyed by role. Both are tried because the file does not show
        whether the two always coincide.

        Raises:
            KeyError: If no agent matches ``agent_name``.
        """
        if agent_name in self.agents:
            return self.agents[agent_name]
        for agent in self.agents.values():
            if getattr(agent, "name", None) == agent_name:
                return agent
        raise KeyError(agent_name)

    def _check_dependencies(self, task: Dict, completed: Dict) -> bool:
        """Return True when every dependency of ``task`` has a completed result."""
        dependencies = task.get("dependencies", [])
        return all(dep in completed for dep in dependencies)

    def _synthesize_final_result(self, results: Dict) -> Dict:
        """Bundle the per-subtask results with a generated summary."""
        return {
            "status": "success",
            "results": results,
            "summary": self._generate_summary(results)
        }

    def _generate_summary(self, results: Dict) -> Optional[str]:
        """Summarize ``results``; placeholder that currently returns None."""
        return None
system = MultiAgentSystem(llm)
result = system.execute({
"type": "research_report",
"topic": "人工智能在医疗领域的应用",
"requirements": {
"length": "3000字",
"style": "学术报告",
"include_references": True
}
})

小结
多Agent协作是解决复杂任务的有效方式,关键设计要点包括:
- 合理架构 - 根据任务特点选择层级、对等或混合架构
- 高效通信 - 直接通信、共享黑板、消息总线各有优势
- 智能分配 - 任务分解、负载均衡、动态调度
- 协作模式 - 顺序、并行、投票、辩论等模式适应不同场景
- 冲突解决 - 优先级、协商、仲裁机制保证系统稳定
下一章我们将探讨Agent设计模式,学习如何构建更加健壮和高效的Agent系统。