Skip to content

多Agent协作

概述

随着任务复杂度的增加,单个Agent往往难以胜任所有工作。多Agent协作系统通过让多个专业化的Agent协同工作,可以更好地完成复杂任务。每个Agent专注于自己擅长的领域,通过通信和协调机制实现高效协作。

多Agent架构

1. 层级架构

python
from typing import List, Dict, Any
from abc import ABC, abstractmethod

class BaseAgent(ABC):
    """Common base for every agent in this chapter.

    Holds the agent's identity (``name``, ``role``) and a simple list-based
    memory. Subclasses must implement :meth:`process`.

    ``notify`` and ``handle_message`` are optional hooks: the Blackboard and
    MessageBus in this file invoke them on their subscribers, so the base
    class provides do-nothing defaults to keep any BaseAgent usable there.
    """

    def __init__(self, name: str, role: str):
        self.name = name
        self.role = role
        self.memory = []

    @abstractmethod
    def process(self, task: Dict) -> Dict:
        """Handle ``task`` and return a result dict (subclass responsibility)."""

    def add_memory(self, item: Any):
        """Append ``item`` to this agent's memory list."""
        self.memory.append(item)

    def notify(self, key: str, value: Any) -> None:
        """Hook for blackboard updates; the default ignores the update."""

    def handle_message(self, envelope: Dict) -> None:
        """Hook for message-bus envelopes; the default ignores the message."""

class ManagerAgent(BaseAgent):
    """Manager in the hierarchical architecture.

    Splits a task into subtasks, routes each subtask to the worker named in
    its ``"assigned_to"`` field, and aggregates the per-subtask results.
    ``decompose`` and ``aggregate`` are placeholders meant to be overridden.
    """

    def __init__(self, name: str, workers: List[BaseAgent]):
        super().__init__(name, "manager")
        # Index workers by name so subtasks can address them directly.
        self.workers = {worker.name: worker for worker in workers}

    def process(self, task: Dict) -> Dict:
        """Run ``task`` through decompose -> dispatch -> aggregate.

        Returns whatever :meth:`aggregate` returns. A subtask whose
        ``"assigned_to"`` is missing or names no registered worker is
        recorded as a failed result instead of raising ``KeyError``.
        """
        # The placeholder decompose() returns None; treat that as "no subtasks"
        # rather than crashing on iteration.
        subtasks = self.decompose(task) or []

        results = {}
        for subtask in subtasks:
            worker_name = subtask.get("assigned_to")
            worker = self.workers.get(worker_name)

            if worker is None:
                results[subtask["id"]] = {
                    "status": "failed",
                    "reason": f"未找到Worker: {worker_name}"
                }
                continue

            results[subtask["id"]] = worker.process(subtask)

        return self.aggregate(results)

    def decompose(self, task: Dict) -> List[Dict]:
        """Placeholder: split ``task`` into subtask dicts.

        ``process`` reads ``"id"`` and ``"assigned_to"`` from each subtask.
        """
        pass

    def aggregate(self, results: Dict) -> Dict:
        """Placeholder: combine the ``{subtask_id: result}`` mapping."""
        pass

class WorkerAgent(BaseAgent):
    """Leaf agent that only accepts tasks matching its single expertise."""

    def __init__(self, name: str, expertise: str):
        super().__init__(name, "worker")
        self.expertise = expertise

    def process(self, task: Dict) -> Dict:
        """Execute ``task`` if it is in scope, remembering the outcome.

        Returns a ``"success"`` dict carrying the execution result, or a
        ``"failed"`` dict when the task type does not match the expertise.
        """
        if self.can_handle(task):
            outcome = self.execute(task)
            self.add_memory({"task": task, "result": outcome})
            return {"status": "success", "result": outcome}

        return {"status": "failed", "reason": "任务超出能力范围"}

    def can_handle(self, task: Dict) -> bool:
        """Return True when the task's ``"type"`` equals this worker's expertise."""
        task_type = task.get("type")
        return task_type == self.expertise

    def execute(self, task: Dict) -> Any:
        """Placeholder for the actual work; returns None until overridden."""
        return None

# Demo: three workers, each created with a name and a single expertise string.
researcher = WorkerAgent("researcher", "research")
writer = WorkerAgent("writer", "writing")
editor = WorkerAgent("editor", "editing")

# The manager indexes these workers by their names.
manager = ManagerAgent("content_manager", [researcher, writer, editor])

# NOTE(review): ManagerAgent.decompose/aggregate are placeholders in this file,
# so this call only does real work once they are given implementations.
result = manager.process({
    "type": "content_creation",
    "topic": "AI发展趋势"
})

2. 对等架构

python
class PeerAgent(BaseAgent):
    """Agent in the peer-to-peer architecture.

    Each peer has a list of expertise tags and a registry of other peers.
    A task is handled locally when possible, forwarded to a capable peer
    otherwise, and split for joint handling as a last resort.
    ``execute``, ``split_task`` and ``merge_results`` are placeholders.
    """

    def __init__(self, name: str, expertise: List[str]):
        super().__init__(name, "peer")
        self.expertise = expertise
        self.peers = {}

    def register_peer(self, peer: 'PeerAgent'):
        """Make ``peer`` reachable from this agent, keyed by its name."""
        self.peers[peer.name] = peer

    def process(self, task: Dict) -> Dict:
        """Handle ``task`` locally, via one capable peer, or collaboratively."""
        if self.can_handle(task):
            return self.execute(task)

        for peer in self.peers.values():
            if peer.can_handle(task):
                return peer.process(task)

        return self.collaborate(task)

    def can_handle(self, task: Dict) -> bool:
        """Return True when the task's ``"type"`` is one of this agent's tags."""
        return task.get("type") in self.expertise

    def execute(self, task: Dict) -> Dict:
        """Placeholder for the actual work.

        ``process`` calls this method, so it must exist on the class; it
        returns None until overridden.
        """
        pass

    def collaborate(self, task: Dict) -> Dict:
        """Split ``task`` and route each subtask to the first capable agent.

        This agent is considered before its peers, because it may be able to
        handle a subtask even though it could not handle the whole task.
        Subtasks that nobody can handle are skipped.
        """
        # The placeholder split_task() returns None; treat that as no subtasks.
        subtasks = self.split_task(task) or []

        results = []
        for subtask in subtasks:
            for candidate in (self, *self.peers.values()):
                if candidate.can_handle(subtask):
                    results.append(candidate.process(subtask))
                    break

        return self.merge_results(results)

    def split_task(self, task: Dict) -> List[Dict]:
        """Placeholder: break ``task`` into subtasks."""
        pass

    def merge_results(self, results: List[Dict]) -> Dict:
        """Placeholder: combine the per-subtask results."""
        pass

# Demo: three peers, each with its own list of expertise tags.
agent1 = PeerAgent("agent1", ["research", "analysis"])
agent2 = PeerAgent("agent2", ["writing", "editing"])
agent3 = PeerAgent("agent3", ["review", "feedback"])

# register_peer is one-directional, so every pair is registered both ways.
agent1.register_peer(agent2)
agent1.register_peer(agent3)
agent2.register_peer(agent1)
agent2.register_peer(agent3)
agent3.register_peer(agent1)
agent3.register_peer(agent2)

3. 混合架构

python
class HybridAgentSystem:
    """Mixes domain specialists with general-purpose agents.

    Routing is delegated to a coordinator: a registered specialist for the
    task's domain wins; otherwise the best suitable generalist is used;
    otherwise the coordinator hands the task to a team.
    """

    def __init__(self):
        self.coordinator = CoordinatorAgent()
        self.specialists = {}
        self.generalists = []

    def add_specialist(self, agent: BaseAgent, domain: str):
        """Register ``agent`` as the specialist for ``domain``."""
        self.specialists[domain] = agent

    def add_generalist(self, agent: BaseAgent):
        """Add ``agent`` to the pool of generalists."""
        self.generalists.append(agent)

    def process(self, task: Dict) -> Dict:
        """Route ``task`` to a specialist, a generalist, or the team."""
        coordinator = self.coordinator
        domain = coordinator.identify_domain(task)

        # Only the lookup sits inside the try, so errors raised by the
        # specialist itself are never mistaken for a missing domain.
        try:
            specialist = self.specialists[domain]
        except KeyError:
            pass
        else:
            return specialist.process(task)

        capable = list(filter(
            lambda generalist: coordinator.can_handle(generalist, task),
            self.generalists
        ))

        if not capable:
            return coordinator.delegate_to_team(task)

        chosen = coordinator.select_best(capable, task)
        return chosen.process(task)

class CoordinatorAgent:
    """Routing helper used by HybridAgentSystem.

    Every method here is a placeholder that returns None until it is given
    a real implementation.
    """

    def identify_domain(self, task: Dict) -> str:
        """Placeholder: name the domain ``task`` belongs to."""
        return None

    def can_handle(self, agent: BaseAgent, task: Dict) -> bool:
        """Placeholder: decide whether ``agent`` is suitable for ``task``."""
        return None

    def select_best(self, agents: List[BaseAgent], task: Dict) -> BaseAgent:
        """Placeholder: pick one agent out of ``agents`` for ``task``."""
        return None

    def delegate_to_team(self, task: Dict) -> Dict:
        """Placeholder: fallback when no single agent takes ``task``."""
        return None

Agent通信机制

1. 直接通信

python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class Message:
    """A single message exchanged between two agents.

    ``timestamp`` may be omitted (or passed as None); it is then filled in
    with the current local time when the message is created.
    """

    sender: str
    receiver: str
    content: Any
    message_type: str
    # Optional because None is the "not provided" marker resolved below.
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now()

class DirectCommunication:
    """Mailbox-style point-to-point messaging between registered agents.

    Each registered agent gets its own inbox list; messages are queued on
    send and drained on receive.
    """

    def __init__(self):
        self.agents = {}
        self.message_queue = {}

    def register_agent(self, agent: BaseAgent):
        """Register ``agent`` and make sure it has an inbox.

        Registering the same name again keeps any messages still pending.
        """
        self.agents[agent.name] = agent
        self.message_queue.setdefault(agent.name, [])

    def send_message(self, message: Message):
        """Queue ``message`` for its receiver.

        Returns True when the receiver has an inbox and the message was
        queued, False when the receiver is unknown and the message dropped.
        """
        inbox = self.message_queue.get(message.receiver)
        if inbox is None:
            return False
        inbox.append(message)
        return True

    def receive_messages(self, agent_name: str) -> List[Message]:
        """Return and clear all pending messages for ``agent_name``.

        Unknown names yield an empty list and do NOT get an inbox created,
        so later sends to them are still reported as undelivered.
        """
        if agent_name not in self.message_queue:
            return []
        messages = self.message_queue[agent_name]
        self.message_queue[agent_name] = []
        return messages

    def broadcast(self, sender: str, content: Any, exclude: Optional[List[str]] = None):
        """Send ``content`` to every registered agent except the sender.

        Names listed in ``exclude`` are skipped as well. Each recipient gets
        its own Message with ``message_type="broadcast"``.
        """
        skipped = set(exclude or [])
        skipped.add(sender)

        for agent_name in self.agents:
            if agent_name in skipped:
                continue
            self.send_message(Message(
                sender=sender,
                receiver=agent_name,
                content=content,
                message_type="broadcast"
            ))

# Demo: register two of the peers created earlier, then send one request.
comm = DirectCommunication()
comm.register_agent(agent1)
comm.register_agent(agent2)

# The message is queued for "agent2" until receive_messages("agent2") is called.
comm.send_message(Message(
    sender="agent1",
    receiver="agent2",
    content="请协助完成任务",
    message_type="request"
))

2. 共享黑板

python
class Blackboard:
    """Shared key/value store that agents can write to, read and watch.

    Every write is recorded with its writer and timestamp, kept in a per-key
    history, and announced to the agents subscribed to that key.
    """

    def __init__(self):
        # Local import: the file's own `import threading` appears further
        # down, after the first Blackboard is instantiated.
        import threading

        self.data = {}
        self.subscribers = {}
        self.history = {}
        self.lock = threading.Lock()

    def write(self, key: str, value: Any, writer: str):
        """Store ``value`` under ``key`` and notify that key's subscribers."""
        entry = {
            "value": value,
            "writer": writer,
            "timestamp": datetime.now()
        }

        with self.lock:
            self.data[key] = entry
            self.history.setdefault(key, []).append(entry)

        # Notify only after releasing the lock: threading.Lock is not
        # reentrant, so a subscriber that reads the blackboard from inside
        # notify() would otherwise deadlock.
        self._notify_subscribers(key, value)

    def read(self, key: str) -> Optional[Any]:
        """Return the latest value stored under ``key``, or None if absent."""
        with self.lock:
            entry = self.data.get(key)
        return None if entry is None else entry["value"]

    def subscribe(self, key: str, agent: BaseAgent):
        """Have ``agent.notify(key, value)`` called on every write to ``key``."""
        with self.lock:
            self.subscribers.setdefault(key, []).append(agent)

    def _notify_subscribers(self, key: str, value: Any):
        # Snapshot under the lock, call out without it.
        with self.lock:
            listeners = list(self.subscribers.get(key, ()))
        for agent in listeners:
            agent.notify(key, value)

    def get_history(self, key: str) -> List[Dict]:
        """Return all entries ever written to ``key``, oldest first.

        Each entry is a dict with ``"value"``, ``"writer"`` and
        ``"timestamp"``. Unknown keys yield an empty list.
        """
        with self.lock:
            return list(self.history.get(key, ()))

# Demo: two writers post to the shared blackboard, then one value is read back.
blackboard = Blackboard()

blackboard.write("task_status", {"status": "in_progress"}, "agent1")
# `[...]` is a list holding the Ellipsis object, used here as a stand-in for real data.
blackboard.write("research_results", {"data": [...]}, "researcher")

# read() returns only the stored value, without writer or timestamp.
status = blackboard.read("task_status")

3. 消息总线

python
from collections import defaultdict
from queue import Queue
import threading

class MessageBus:
    """Topic-based publish/subscribe bus with one worker thread per topic.

    ``publish`` only enqueues; delivery happens in background threads once
    ``start`` has been called. A publisher never receives its own message.
    """

    def __init__(self):
        self.topics = defaultdict(list)
        self.queues = defaultdict(Queue)
        self.running = False
        # topic -> worker thread, so each topic has at most one live worker.
        self._workers = {}

    def subscribe(self, topic: str, agent: BaseAgent):
        """Subscribe ``agent`` to ``topic``.

        If the bus is already running, a worker is started for the topic so
        that late subscriptions are served as well.
        """
        self.topics[topic].append(agent)
        if self.running:
            self._ensure_worker(topic)

    def publish(self, topic: str, message: Any, publisher: str):
        """Queue ``message`` on ``topic``, wrapped in an envelope dict."""
        envelope = {
            "topic": topic,
            "message": message,
            "publisher": publisher,
            "timestamp": datetime.now()
        }

        self.queues[topic].put(envelope)

    def start(self):
        """Start delivery for every topic that has subscribers.

        Calling this again does not spawn duplicate workers.
        """
        self.running = True
        for topic in list(self.topics):
            self._ensure_worker(topic)

    def stop(self):
        """Ask the workers to exit; each notices within its 1s poll timeout."""
        self.running = False

    def _ensure_worker(self, topic: str):
        # Start a daemon worker for the topic unless one is still alive.
        worker = self._workers.get(topic)
        if worker is not None and worker.is_alive():
            return

        worker = threading.Thread(
            target=self._process_topic,
            args=(topic,),
            daemon=True
        )
        self._workers[topic] = worker
        worker.start()

    def _process_topic(self, topic: str):
        # Local imports keep the block self-contained; the file only imports
        # Queue from the queue module.
        import logging
        from queue import Empty

        log = logging.getLogger(__name__)

        while self.running:
            try:
                envelope = self.queues[topic].get(timeout=1)
            except Empty:
                # Nothing to deliver yet; loop to re-check self.running.
                continue

            for agent in list(self.topics[topic]):
                if agent.name == envelope["publisher"]:
                    continue
                try:
                    agent.handle_message(envelope)
                except Exception:
                    # A failing subscriber must not kill the worker or starve
                    # the remaining subscribers; record the error and go on.
                    log.exception(
                        "subscriber %r failed on topic %r", agent.name, topic
                    )

# Demo: subscribe two of the workers created earlier to separate topics.
bus = MessageBus()

bus.subscribe("research", researcher)
bus.subscribe("writing", writer)

# NOTE(review): publish() only enqueues the envelope; delivery is done by the
# threads that bus.start() launches, and start() is never called in this snippet.
bus.publish("research", {"query": "AI trends"}, "coordinator")

任务分解与分配

1. 任务分解策略

python
class TaskDecomposer:
    """Breaks a task into subtasks using an LLM or recursive splitting.

    ``llm`` must provide ``generate(prompt) -> str``. For the sequential and
    parallel strategies the reply is parsed as a JSON list of subtasks.
    """

    def __init__(self, llm):
        self.llm = llm

    def decompose(self, task: Dict, strategy: str = "sequential") -> List[Dict]:
        """Decompose ``task`` with the named strategy.

        Supported strategies: ``"sequential"``, ``"parallel"``,
        ``"hierarchical"``.

        Raises:
            ValueError: if ``strategy`` is not one of the supported names, or
                if the LLM reply is not a JSON list.
        """
        strategies = {
            "sequential": self._sequential_decompose,
            "parallel": self._parallel_decompose,
            "hierarchical": self._hierarchical_decompose,
        }

        if strategy not in strategies:
            # Previously this fell through and silently returned None.
            raise ValueError(f"未知的分解策略: {strategy}")

        return strategies[strategy](task)

    def _sequential_decompose(self, task: Dict) -> List[Dict]:
        prompt = f"""
        将以下任务分解为顺序执行的子任务:
        任务:{task}
        
        以JSON格式返回子任务列表,每个子任务包含:
        - id: 子任务ID
        - description: 任务描述
        - dependencies: 依赖的任务ID列表
        - required_expertise: 所需专业能力
        """

        response = self.llm.generate(prompt)
        return self._parse_subtasks(response)

    def _parallel_decompose(self, task: Dict) -> List[Dict]:
        # NOTE(review): unlike the sequential prompt, this one does not ask
        # for JSON output, yet the reply is parsed as JSON below.
        prompt = f"""
        将以下任务分解为可并行执行的子任务:
        任务:{task}
        
        确保子任务之间没有依赖关系。
        """

        response = self.llm.generate(prompt)
        return self._parse_subtasks(response)

    def _hierarchical_decompose(self, task: Dict) -> List[Dict]:
        subtasks = []
        self._decompose_recursive(task, subtasks, level=0)
        return subtasks

    def _decompose_recursive(self, task: Dict, subtasks: List, level: int):
        # Collect atomic tasks depth-first into ``subtasks``.
        if self._is_atomic(task):
            subtasks.append(task)
            return

        # The placeholder _split_task() returns None; treat it as no children.
        children = self._split_task(task) or []
        for child in children:
            child["level"] = level
            self._decompose_recursive(child, subtasks, level + 1)

    def _parse_subtasks(self, response: str) -> List[Dict]:
        # Parse the LLM reply; json.JSONDecodeError is itself a ValueError,
        # so callers see ValueError for every malformed reply.
        import json

        subtasks = json.loads(response)
        if not isinstance(subtasks, list):
            raise ValueError("LLM响应不是子任务列表")
        return subtasks

    def _is_atomic(self, task: Dict) -> bool:
        """Placeholder: decide whether ``task`` needs no further splitting."""
        pass

    def _split_task(self, task: Dict) -> List[Dict]:
        """Placeholder: return the direct children of ``task``."""
        pass

2. 任务分配策略

python
class TaskAllocator:
    """Assigns subtasks to agents by expertise, preferring the least loaded.

    Subtasks are processed in dependency order; ``agent_loads`` counts how
    many subtasks each agent has been given so far.
    """

    def __init__(self, agents: List[BaseAgent]):
        self.agents = agents
        self.agent_loads = {agent.name: 0 for agent in agents}

    def allocate(self, subtasks: List[Dict]) -> Dict[str, List[Dict]]:
        """Return ``{agent_name: [subtask, ...]}`` covering all subtasks.

        Raises:
            ValueError: if no agent can handle a subtask, or if the
                dependencies are cyclic or refer to unknown subtask ids.
        """
        allocation = {agent.name: [] for agent in self.agents}

        for subtask in self._topological_sort(subtasks):
            agent = self._select_agent(subtask)
            allocation[agent.name].append(subtask)
            self.agent_loads[agent.name] += 1

        return allocation

    def _select_agent(self, subtask: Dict) -> BaseAgent:
        candidates = [
            agent for agent in self.agents
            if self._can_handle(agent, subtask)
        ]

        if not candidates:
            raise ValueError(f"没有Agent能处理任务: {subtask}")

        # min() returns the first least-loaded candidate, matching the
        # result of a stable sort followed by [0].
        return min(candidates, key=lambda a: self.agent_loads[a.name])

    def _can_handle(self, agent: BaseAgent, subtask: Dict) -> bool:
        required = subtask.get("required_expertise") or []
        agent_expertise = getattr(agent, "expertise", None) or []

        if isinstance(required, str):
            required = [required]
        # An agent may declare a single expertise as a plain string (as
        # WorkerAgent does). Wrap it so `in` compares whole tags instead of
        # doing a substring search ("search" in "research").
        if isinstance(agent_expertise, str):
            agent_expertise = [agent_expertise]

        return all(exp in agent_expertise for exp in required)

    def _topological_sort(self, subtasks: List[Dict]) -> List[Dict]:
        # Kahn's algorithm over the "dependencies" edges.
        from collections import defaultdict, deque

        task_map = {t["id"]: t for t in subtasks}
        graph = defaultdict(list)
        in_degree = {task_id: 0 for task_id in task_map}

        for task in subtasks:
            for dep in task.get("dependencies") or []:
                if dep not in task_map:
                    # An unknown dependency can never be satisfied; without
                    # this check the task would silently vanish from the
                    # result.
                    raise ValueError(
                        f"任务 {task['id']} 依赖未知任务: {dep}"
                    )
                graph[dep].append(task["id"])
                in_degree[task["id"]] += 1

        queue = deque(t["id"] for t in subtasks if in_degree[t["id"]] == 0)
        result = []

        while queue:
            task_id = queue.popleft()
            result.append(task_map[task_id])

            for neighbor in graph[task_id]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)

        # Anything still waiting on a dependency is part of a cycle.
        unresolved = [task_id for task_id, degree in in_degree.items() if degree > 0]
        if unresolved:
            raise ValueError(f"任务依赖存在循环: {unresolved}")

        return result

3. 动态负载均衡

python
class LoadBalancer:
    """Queues tasks per agent and drains each queue on a background thread.

    An agent is ``"idle"`` until it receives a task; a daemon thread then
    processes its queue and flips it back to ``"idle"`` once the queue is
    empty.
    """

    def __init__(self, agents: List[BaseAgent]):
        self.agents = agents
        self.task_queues = {agent.name: [] for agent in agents}
        self.agent_status = {agent.name: "idle" for agent in agents}
        # Guards task_queues and agent_status, which are touched both by
        # callers of assign_task and by the worker threads.
        self._lock = threading.Lock()

    def assign_task(self, task: Dict) -> str:
        """Queue ``task`` on the best agent and return that agent's name.

        Raises:
            ValueError: if no agent can handle ``task``.
        """
        agent = self._find_best_agent(task)

        # Enqueue and claim the idle -> busy transition atomically, so a
        # worker that is just about to go idle cannot strand this task.
        with self._lock:
            self.task_queues[agent.name].append(task)
            needs_start = self.agent_status[agent.name] == "idle"
            if needs_start:
                self.agent_status[agent.name] = "busy"

        if needs_start:
            self._start_agent(agent)

        return agent.name

    def _find_best_agent(self, task: Dict) -> BaseAgent:
        candidates = [
            agent for agent in self.agents
            if self._can_handle(agent, task)
        ]

        if not candidates:
            raise ValueError(f"没有Agent能处理任务: {task}")

        def score(agent):
            # Shorter queues win; a busy agent carries a penalty of 2.
            queue_length = len(self.task_queues[agent.name])
            status_score = 0 if self.agent_status[agent.name] == "idle" else 1

            return queue_length + status_score * 2

        with self._lock:
            return min(candidates, key=score)

    def _can_handle(self, agent: BaseAgent, task: Dict) -> bool:
        # Every tag in the task's "required_expertise" must be among the
        # agent's "expertise"; either side may be a single string or a list.
        required = task.get("required_expertise") or []
        agent_expertise = getattr(agent, "expertise", None) or []

        if isinstance(required, str):
            required = [required]
        if isinstance(agent_expertise, str):
            agent_expertise = [agent_expertise]

        return all(exp in agent_expertise for exp in required)

    def _start_agent(self, agent: BaseAgent):
        self.agent_status[agent.name] = "busy"

        threading.Thread(
            target=self._process_queue,
            args=(agent,),
            daemon=True
        ).start()

    def _process_queue(self, agent: BaseAgent):
        while True:
            # Pop the next task, or go idle, under the lock so the decision
            # cannot interleave with assign_task.
            with self._lock:
                pending = self.task_queues[agent.name]
                if not pending:
                    self.agent_status[agent.name] = "idle"
                    return
                task = pending.pop(0)

            try:
                result = agent.process(task)
                self._handle_result(agent, task, result)
            except Exception as e:
                self._handle_error(agent, task, e)

    def _handle_result(self, agent: BaseAgent, task: Dict, result: Dict):
        """Placeholder: called after ``agent`` finished ``task``."""
        pass

    def _handle_error(self, agent: BaseAgent, task: Dict, error: Exception):
        """Placeholder: called when processing ``task`` raised ``error``."""
        pass

    def get_status(self) -> Dict:
        """Return a snapshot of agent states and queue lengths."""
        with self._lock:
            return {
                "agent_status": dict(self.agent_status),
                "queue_lengths": {
                    name: len(queue)
                    for name, queue in self.task_queues.items()
                }
            }

协作模式

1. 顺序协作

python
class SequentialCollaboration:
    """Pipeline: each agent processes the output of the previous one."""

    def __init__(self, agents: List[BaseAgent]):
        self.agents = agents

    def execute(self, task: Dict) -> Dict:
        """Run ``task`` through the agents in order.

        Stops at the first result whose ``"status"`` is ``"failed"`` and
        returns it unchanged. Otherwise returns a success dict whose
        ``"output"`` is the data produced by the last agent.
        """
        current_data = task

        for agent in self.agents:
            result = agent.process(current_data)

            if not isinstance(result, dict):
                # A placeholder agent returns None; report that as a failed
                # step instead of crashing on the lookups below.
                agent_name = getattr(agent, "name", type(agent).__name__)
                return {
                    "status": "failed",
                    "reason": f"Agent {agent_name} 未返回有效结果"
                }

            if result.get("status") == "failed":
                return result

            # Prefer "output"; fall back to "result", which is the key
            # WorkerAgent in this file uses for its payload.
            if "output" in result:
                current_data = result["output"]
            elif "result" in result:
                current_data = result["result"]

        return {
            "status": "success",
            "output": current_data
        }

# Demo: a four-stage pipeline; each stage receives the previous stage's output.
# NOTE(review): ResearchAgent, AnalysisAgent, WritingAgent and EditingAgent are
# not defined in the visible part of this file - presumably BaseAgent subclasses.
pipeline = SequentialCollaboration([
    ResearchAgent(),
    AnalysisAgent(),
    WritingAgent(),
    EditingAgent()
])

result = pipeline.execute({"topic": "AI发展报告"})

2. 并行协作

python
import concurrent.futures

class ParallelCollaboration:
    """Fan-out: every agent processes the same task on a thread pool."""

    def __init__(self, agents: List[BaseAgent]):
        self.agents = agents

    def execute(self, task: Dict) -> Dict:
        """Run all agents on ``task`` concurrently and merge their results.

        Results are gathered in completion order, not agent order.
        """
        pool = concurrent.futures.ThreadPoolExecutor()
        with pool:
            pending = {
                pool.submit(member.process, task): member
                for member in self.agents
            }

            collected = [
                finished.result()
                for finished in concurrent.futures.as_completed(pending)
            ]

        return self._merge_results(collected)

    def _merge_results(self, results: List[Dict]) -> Dict:
        """Placeholder: combine the individual results; returns None for now."""
        return None

# Demo: three agents examine the same content concurrently.
# NOTE(review): ResearchAgent, FactCheckAgent and StyleCheckAgent are not
# defined in the visible part of this file.
parallel_team = ParallelCollaboration([
    ResearchAgent(),
    FactCheckAgent(),
    StyleCheckAgent()
])

result = parallel_team.execute({"content": "文章内容"})

3. 投票协作

python
class VotingCollaboration:
    """Every agent votes on the task; a decision rule picks the outcome.

    Each agent's result is expected to carry a ``"decision"`` and, for the
    weighted rule, optionally a numeric ``"confidence"`` (default 1.0).
    """

    def __init__(self, agents: List[BaseAgent], decision_rule: str = "majority"):
        self.agents = agents
        self.decision_rule = decision_rule

    def execute(self, task: Dict) -> Dict:
        """Collect one result per agent and apply the decision rule.

        Raises:
            ValueError: if ``decision_rule`` is not ``"majority"``,
                ``"unanimous"`` or ``"weighted"``.
        """
        results = [agent.process(task) for agent in self.agents]
        return self._make_decision(results)

    def _make_decision(self, results: List[Dict]) -> Dict:
        rules = {
            "majority": self._majority_vote,
            "unanimous": self._unanimous_vote,
            "weighted": self._weighted_vote,
        }

        if self.decision_rule not in rules:
            # Previously an unknown rule silently produced None.
            raise ValueError(f"未知的决策规则: {self.decision_rule}")

        return rules[self.decision_rule](results)

    def _majority_vote(self, results: List[Dict]) -> Dict:
        from collections import Counter

        if not results:
            return {"status": "failed", "reason": "没有可用的投票结果"}

        votes = [r.get("decision") for r in results]
        vote_counts = Counter(votes)

        winner = vote_counts.most_common(1)[0][0]

        return {
            "status": "success",
            "decision": winner,
            "confidence": vote_counts[winner] / len(votes)
        }

    def _unanimous_vote(self, results: List[Dict]) -> Dict:
        decisions = [r.get("decision") for r in results]

        if len(set(decisions)) == 1:
            return {
                "status": "success",
                "decision": decisions[0],
                "confidence": 1.0
            }

        return {
            "status": "failed",
            "reason": "未达成一致"
        }

    def _weighted_vote(self, results: List[Dict]) -> Dict:
        if not results:
            return {"status": "failed", "reason": "没有可用的投票结果"}

        weighted_votes = {}

        for result in results:
            decision = result.get("decision")
            weight = result.get("confidence", 1.0)

            weighted_votes[decision] = weighted_votes.get(decision, 0) + weight

        winner = max(weighted_votes, key=weighted_votes.get)
        total_weight = sum(weighted_votes.values())

        return {
            "status": "success",
            "decision": winner,
            # All-zero confidences would otherwise divide by zero.
            "confidence": weighted_votes[winner] / total_weight if total_weight else 0.0
        }

# Demo: three judges vote and the majority decision wins.
# NOTE(review): JudgeAgent is not defined in the visible part of this file.
jury = VotingCollaboration([
    JudgeAgent("judge1"),
    JudgeAgent("judge2"),
    JudgeAgent("judge3")
], decision_rule="majority")

verdict = jury.execute({"case": "案件信息"})

4. 辩论协作

python
class DebateCollaboration:
    """Multi-round debate in which agents see each other's past arguments."""

    def __init__(self, agents: List[BaseAgent], max_rounds: int = 3):
        self.agents = agents
        self.max_rounds = max_rounds

    def execute(self, task: Dict) -> Dict:
        """Run up to ``max_rounds`` rounds, stopping early on consensus.

        In every round each agent is given the task plus the round number
        and the arguments made so far by all other agents. Its reply
        supplies a ``"position"`` (latest one kept) and an ``"argument"``
        (appended to its history).
        """
        names = [agent.name for agent in self.agents]
        positions = dict.fromkeys(names)
        arguments = {name: [] for name in names}

        for round_num in range(self.max_rounds):
            for speaker in self.agents:
                opposing = {
                    name: made
                    for name, made in arguments.items()
                    if name != speaker.name
                }
                reply = speaker.process(
                    dict(task, round=round_num, other_arguments=opposing)
                )

                positions[speaker.name] = reply.get("position")
                arguments[speaker.name].append(reply.get("argument"))

            if self._check_consensus(positions):
                break

        return self._synthesize_result(positions, arguments)

    def _check_consensus(self, positions: Dict) -> bool:
        # Consensus means exactly one distinct position across all agents.
        distinct = {*positions.values()}
        return len(distinct) == 1

    def _synthesize_result(self, positions: Dict, arguments: Dict) -> Dict:
        """Placeholder: build the final outcome; returns None for now."""
        return None

# Demo: a three-party debate using the default limit of three rounds.
# NOTE(review): ProponentAgent, OpponentAgent and ModeratorAgent are not
# defined in the visible part of this file.
debate = DebateCollaboration([
    ProponentAgent(),
    OpponentAgent(),
    ModeratorAgent()
])

result = debate.execute({"topic": "是否应该发展AI"})

冲突解决

1. 优先级机制

python
class PriorityResolver:
    """Settles conflicts by trusting the agent with the highest priority."""

    def __init__(self):
        self.priorities = {}

    def set_priority(self, agent_name: str, priority: int):
        """Assign ``priority`` to ``agent_name`` (higher wins)."""
        self.priorities[agent_name] = priority

    def resolve(self, conflicting_results: List[Dict]) -> Dict:
        """Return the result whose ``"agent"`` has the highest priority.

        Agents without a configured priority count as 0; on a tie the
        earliest result in the list wins.
        """
        def rank(entry):
            return self.priorities.get(entry["agent"], 0)

        return max(conflicting_results, key=rank)

2. 协商机制

python
class NegotiationResolver:
    """Settles conflicts by asking an LLM to reconcile the results.

    ``llm`` must provide ``generate(prompt) -> str``.
    """

    def __init__(self, llm):
        self.llm = llm

    def resolve(self, conflicting_results: List[Dict]) -> Dict:
        """Return the LLM's reconciliation together with the original inputs."""
        prompt = f"""
        多个Agent产生了不同的结果,请协调并给出最佳解决方案:
        
        冲突结果:
        {conflicting_results}
        
        请分析每个结果的优缺点,并给出最终建议。
        """

        return dict(
            status="resolved",
            resolution=self.llm.generate(prompt),
            original_conflicts=conflicting_results,
        )

3. 仲裁机制

python
class ArbitrationResolver:
    """Settles conflicts by handing them to a dedicated arbitrator agent."""

    def __init__(self, arbitrator: BaseAgent):
        self.arbitrator = arbitrator

    def resolve(self, conflicting_results: List[Dict]) -> Dict:
        """Wrap the conflicts in an ``"arbitration"`` task and return the verdict."""
        return self.arbitrator.process(
            dict(type="arbitration", conflicts=conflicting_results)
        )

完整多Agent系统示例

python
class MultiAgentSystem:
    """End-to-end example wiring agents, communication and task allocation.

    A task is decomposed, the subtasks are allocated to agents, executed in
    dependency order, and every intermediate artifact is posted to the
    shared blackboard.
    """

    def __init__(self, llm):
        self.llm = llm

        self.agents = {
            "coordinator": CoordinatorAgent(llm),
            "researcher": ResearchAgent(llm),
            "analyst": AnalystAgent(llm),
            "writer": WriterAgent(llm),
            "reviewer": ReviewerAgent(llm)
        }

        self.communication = DirectCommunication()
        for agent in self.agents.values():
            self.communication.register_agent(agent)

        self.blackboard = Blackboard()

        self.decomposer = TaskDecomposer(llm)
        self.allocator = TaskAllocator(list(self.agents.values()))
        self.load_balancer = LoadBalancer(list(self.agents.values()))

    def execute(self, task: Dict) -> Dict:
        """Decompose, allocate and run ``task``; return the combined result.

        Returns a ``"failed"`` dict with reason ``"依赖未满足"`` when some
        subtask's dependencies can never be completed.
        """
        self.blackboard.write("original_task", task, "system")

        subtasks = self.decomposer.decompose(task, strategy="sequential")
        self.blackboard.write("subtasks", subtasks, "coordinator")

        allocation = self.allocator.allocate(subtasks)
        self.blackboard.write("allocation", allocation, "coordinator")

        # The allocation is grouped by agent, which loses the global
        # dependency order. Flatten it and run whatever is ready in repeated
        # passes, so a task may depend on work assigned to any other agent.
        pending = [
            (agent_name, agent_task)
            for agent_name, agent_tasks in allocation.items()
            for agent_task in agent_tasks
        ]

        results = {}
        while pending:
            deferred = []
            progressed = False

            for agent_name, agent_task in pending:
                if not self._check_dependencies(agent_task, results):
                    deferred.append((agent_name, agent_task))
                    continue

                result = self.agents[agent_name].process(agent_task)
                results[agent_task["id"]] = result
                progressed = True

                self.blackboard.write(
                    f"result_{agent_task['id']}",
                    result,
                    agent_name
                )

            if not progressed:
                # A full pass ran nothing: the remaining tasks wait on
                # dependencies that will never complete.
                return {
                    "status": "failed",
                    "reason": "依赖未满足"
                }

            pending = deferred

        final_result = self._synthesize_final_result(results)

        return final_result

    def _check_dependencies(self, task: Dict, completed: Dict) -> bool:
        # True when every dependency id already has a recorded result.
        dependencies = task.get("dependencies") or []
        return all(dep in completed for dep in dependencies)

    def _synthesize_final_result(self, results: Dict) -> Dict:
        return {
            "status": "success",
            "results": results,
            "summary": self._generate_summary(results)
        }

    def _generate_summary(self, results: Dict) -> str:
        """Placeholder: summarise ``results``; returns None for now."""
        pass

# Demo: build the full system and run one research-report task through it.
# NOTE(review): `llm` is not defined in the visible part of this file; the code
# above only requires it to provide generate(prompt).
system = MultiAgentSystem(llm)
result = system.execute({
    "type": "research_report",
    "topic": "人工智能在医疗领域的应用",
    "requirements": {
        "length": "3000字",
        "style": "学术报告",
        "include_references": True
    }
})

小结

多Agent协作是解决复杂任务的有效方式,关键设计要点包括:

  1. 合理架构 - 根据任务特点选择层级、对等或混合架构
  2. 高效通信 - 直接通信、共享黑板、消息总线各有优势
  3. 智能分配 - 任务分解、负载均衡、动态调度
  4. 协作模式 - 顺序、并行、投票、辩论等模式适应不同场景
  5. 冲突解决 - 优先级、协商、仲裁机制保证系统稳定

下一章我们将探讨Agent设计模式,学习如何构建更加健壮和高效的Agent系统。