第 13 章:编排基础
多 Agent 编排不是让一群 Agent 各干各的,而是让它们像交响乐团一样协作——有指挥、有分工、有配合。但指挥再厉害,乐手水平不行也白搭。
⏱️ 快速通道(5 分钟掌握核心)
- 单 Agent 三大硬伤:串行慢、深度浅、单点故障
- 编排三要素:任务分解、Agent 分配、结果综合
- 自动 vs 配置:简单任务自动匹配,复杂任务显式配置
- 协调成本不可忽略:任务简单时单 Agent 反而更快
- 编排是架构决策,需要权衡并行收益与协调开销
10 分钟路径:13.1-13.3 → 13.5 → Shannon Lab
13.1 为什么单 Agent 不够用?
这章解决一个核心问题:当单个 Agent 无法高效完成任务时,如何让多个 Agent 协作?
想象你在管理一个小型研究项目——需要分析三家竞争对手(Tesla、BYD、Rivian)的电动车战略。如果只有你一个人,你会怎么做?
你会串行处理:今天研究 Tesla,明天研究 BYD,后天研究 Rivian。三天后,你终于把所有信息收集完,开始写对比分析。
但如果你有三个助手呢?你会让他们同时开工:Alice 研究 Tesla,Bob 研究 BYD,Carol 研究 Rivian。一天后,三份报告同时到手,你只需要综合对比就行。
效率提升 3 倍。
单 Agent 就像一个人单打独斗——能完成任务,但效率低、深度浅。多 Agent 编排就是组建团队、分工协作。
但组建团队不是简单地"多雇几个人"。你需要:分配任务、协调进度、整合结果、处理冲突。编排器(Orchestrator)就是做这件事的。
单 Agent 的三大硬伤
先说结论:单 Agent 有三个硬伤。
硬伤一:串行执行,效率太低
三家公司的搜索完全独立,没有依赖关系。但单 Agent 只能一个接一个做。如果并行呢?差距明显:
省了 40 秒。任务越多,差距越大。
硬伤二:通才做专家的活,深度不够
「设计一个 AI 创业公司的商业计划」,这个任务需要什么?
- 市场分析:行业规模、增长趋势、竞争格局
- 技术架构:技术选型、成本估算、可行性评估
- 财务预测:收入模型、成本结构、盈亏分析
- 营销策略:目标用户、获客渠道、品牌定位
让一个「通才」Agent 同时搞定这四件事?它可能每个都懂一点,但每个都不够深。
更好的方式是:4 个专家 Agent,各司其职。
硬伤三:单点故障,没有冗余
一个 Agent 挂了——网络超时、LLM 报错、工具调用失败——整个任务就废了。
多 Agent 系统可以做容错:一个挂了,其他继续;关键任务可以有备份。
多 Agent vs 单 Agent
| 能力 | 单 Agent | 多 Agent |
|---|---|---|
| 并行能力 | 串行执行 | 并发执行 |
| 专业深度 | 通才,样样懂点 | 专家分工,各有所长 |
| 容错能力 | 单点故障 | 冗余容错 |
| 成本控制 | 统一模型 | 按任务选模型(简单任务用便宜模型) |
注意:多 Agent 不是银弹。协调多个 Agent 本身就有开销——通信、同步、结果整合。任务简单的时候,单 Agent 反而更快。只有任务复杂到一定程度,多 Agent 的收益才能覆盖协调成本。我见过有人把「查个天气」都拆成 3 个 Agent——完全没必要,反而更慢。
13.2 编排器:多 Agent 的指挥家
多 Agent 系统需要一个「指挥家」——Orchestrator(编排器)。
它不亲自干活,但它决定:
- 任务怎么拆
- 谁来做什么
- 什么顺序执行
- 结果怎么整合
四大职责
类比一下:编排器就像餐厅的主厨。
客人说「我要一份牛排套餐」。主厨不会自己一个人做,他会:
- 分解:牛排、配菜、酱汁、甜点
- 分发:牛排给烤台、配菜给冷厨、酱汁给酱料师
- 协调:牛排好了再淋酱、配菜和牛排同时出
- 综合:摆盘,确保温度和卖相
主厨不需要每样都会做,但他要知道:谁擅长什么、什么顺序合理、怎么整合成一道菜。
执行流程
13.3 路由决策:该用什么策略?
不是所有任务都需要多 Agent。编排器的第一个决策是:这个任务该走什么路径?
Shannon 的路由逻辑
Shannon 的 OrchestratorWorkflow 是这样判断的:
// 判断是否简单任务
simpleByShape := len(decomp.Subtasks) == 0 ||
(len(decomp.Subtasks) == 1 && !needsTools)
isSimple := decomp.ComplexityScore < simpleThreshold && simpleByShape
// 检查是否有依赖关系
hasDeps := false
for _, st := range decomp.Subtasks {
if len(st.Dependencies) > 0 || len(st.Consumes) > 0 {
hasDeps = true
break
}
}
switch {
case isSimple:
// 简单任务 → 单 Agent 直接执行
return SimpleTaskWorkflow(input)
case len(decomp.Subtasks) > 5 || hasDeps:
// 复杂任务或有依赖 → Supervisor 模式
return SupervisorWorkflow(input)
default:
// 标准任务 → DAG 工作流
return DAGWorkflow(input)
}
实现参考 (Shannon): go/orchestrator/internal/workflows/orchestrator_router.go - OrchestratorWorkflow 函数
决策树
任务进入
│
▼
复杂度 < 0.3 且单子任务且无工具? ──是──► SimpleTaskWorkflow
│ (单 Agent 直接执行)
否
│
▼
子任务 > 5 或有依赖? ──是──► SupervisorWorkflow
│ (复杂多 Agent 协调)
否
│
▼
DAGWorkflow(默认)
(标准多 Agent 并行/串行)
三种策略对比
| 策略 | 适用场景 | 特点 |
|---|---|---|
| SimpleTask | 简单问答、单步任务 | 最轻量,单 Agent |
| DAGWorkflow | 2-5 个子任务,可能有简单依赖 | 并行/串行/混合执行 |
| Supervisor | 6+ 子任务,复杂依赖,需要动态协调 | 团队管理、邮箱通信 |
这三个策略后面几章会详细讲。这里先记住:编排器会根据任务复杂度自动选择策略。
13.4 三种执行模式
不管走哪个策略,最终都要执行 Agent。执行方式有三种:
模式一:并行执行(Parallel)
适用场景:子任务相互独立,没有依赖关系。
核心是信号量控制——限制同时执行的 Agent 数量,防止资源耗尽。
type ParallelConfig struct {
MaxConcurrency int // 最大并发数,默认 5
}
func ExecuteParallel(ctx workflow.Context, tasks []ParallelTask, config ParallelConfig) {
// 信号量控制并发
semaphore := workflow.NewSemaphore(ctx, int64(config.MaxConcurrency))
for i, task := range tasks {
workflow.Go(ctx, func(ctx workflow.Context) {
// 获取信号量(超过并发数会阻塞)
semaphore.Acquire(ctx, 1)
defer semaphore.Release(1)
// 执行任务
executeTask(task)
})
}
}
实现参考 (Shannon): go/orchestrator/internal/workflows/patterns/execution/parallel.go - ExecuteParallel 函数
为什么要限制并发?
假设你有 10 个搜索任务,MaxConcurrency = 3:
t0: [Task 1] [Task 2] [Task 3] ← 3 个同时开始
t1: [1 done] [Task 4 starts] ← 1 完成,4 立即补位
t2: [2 done] [Task 5 starts] ← 2 完成,5 补位
...
如果不限制,10 个 Agent 同时调用 LLM API,很可能触发限流,反而更慢。
模式二:串行执行(Sequential)
适用场景:任务有隐式依赖,后一个需要前一个的结果。
type SequentialConfig struct {
PassPreviousResults bool // 是否把前一个结果传给下一个
}
func ExecuteSequential(ctx workflow.Context, tasks []Task, config SequentialConfig) {
var results []Result
for i, task := range tasks {
// 把前序结果注入上下文
if config.PassPreviousResults && len(results) > 0 {
task.Context["previous_results"] = results
}
result := executeTask(task)
results = append(results, result)
}
}
关键是 结果传递。比如:
Task 1: "获取特斯拉股价"
→ Response: "$250"
↓
Task 2: "计算相比去年的涨幅"
Context: {
previous_results: [
{ response: "$250", numeric_value: 250 }
]
}
→ 可以直接用 250 做计算
模式三:混合执行(Hybrid/DAG)
适用场景:部分任务可并行,部分任务有依赖。
核心是依赖等待——任务只有在所有依赖任务完成后才能开始。
func waitForDependencies(
ctx workflow.Context,
dependencies []string,
completedTasks map[string]bool,
timeout time.Duration,
) bool {
startTime := workflow.Now(ctx)
deadline := startTime.Add(timeout)
for workflow.Now(ctx).Before(deadline) {
// 检查所有依赖是否完成
allDone := true
for _, depID := range dependencies {
if !completedTasks[depID] {
allDone = false
break
}
}
if allDone {
return true
}
// 等 30 秒再检查
workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
// 条件检查
for _, depID := range dependencies {
if !completedTasks[depID] {
return false
}
}
return true
})
}
return false // 超时
}
实现参考 (Shannon): go/orchestrator/internal/workflows/patterns/execution/hybrid.go - waitForDependencies 函数
13.5 结果综合
多个 Agent 跑完了,怎么整合结果?
问题
Agent 的原始输出通常是:
- 冗余的:不同 Agent 可能给出相似信息
- 格式不一:每个 Agent 有自己的输出风格
- 质量参差:有的成功,有的失败,有的半吊子
用户期望的是:一个统一、完整、高质量的回答。
预处理:去重与过滤
func preprocessResults(results []AgentResult) []AgentResult {
// 1. 精确去重(Hash)
seen := make(map[string]bool)
exact := []AgentResult{}
for _, r := range results {
hash := computeHash(r.Response)
if !seen[hash] {
seen[hash] = true
exact = append(exact, r)
}
}
// 2. 相似去重(Jaccard > 0.85)
similar := []AgentResult{}
for _, r := range exact {
isDuplicate := false
for _, s := range similar {
if jaccardSimilarity(r.Response, s.Response) > 0.85 {
isDuplicate = true
break
}
}
if !isDuplicate {
similar = append(similar, r)
}
}
// 3. 质量过滤
filtered := []AgentResult{}
noInfoPatterns := []string{
"unable to retrieve",
"failed to fetch",
"no information available",
"无法访问",
"未找到",
}
for _, r := range similar {
if r.Success && !containsAny(r.Response, noInfoPatterns) {
filtered = append(filtered, r)
}
}
return filtered
}
综合方式
简单综合:直接拼接
适合结果已经很规整的情况:
func simpleSynthesis(results []AgentResult) string {
var parts []string
for _, r := range results {
parts = append(parts, r.Response)
}
return strings.Join(parts, "\n\n")
}
LLM 综合:智能整合
适合需要统一视角、消除矛盾、生成洞察的情况:
func llmSynthesis(query string, results []AgentResult) string {
prompt := fmt.Sprintf(`综合以下研究结果,回答问题:%s
要求:
1. 消除重复信息
2. 解决矛盾(如果有)
3. 突出关键洞察
4. 用统一的格式呈现
`, query)
for i, r := range results {
prompt += fmt.Sprintf("=== 来源 %d ===\n%s\n\n", i+1, r.Response)
}
return callLLM(prompt)
}
13.6 Token 预算分配
多 Agent 场景下,成本控制更重要。
为什么?
单 Agent 烧 1000 个 token,多 Agent 可能烧 5000 个。如果不控制,一个复杂任务就可能把一天的预算用光。
预算分配策略
简单策略:平均分配
func allocateBudgetSimple(totalBudget int, numAgents int) int {
return totalBudget / numAgents
}
// 例:总预算 10000,5 个 Agent → 每个 2000
进阶策略:按复杂度分配
func allocateBudgetByComplexity(totalBudget int, subtasks []Subtask) map[string]int {
budgets := make(map[string]int)
// 计算总复杂度
totalComplexity := 0.0
for _, st := range subtasks {
totalComplexity += st.Complexity
}
// 按比例分配
for _, st := range subtasks {
budgets[st.ID] = int(float64(totalBudget) * st.Complexity / totalComplexity)
}
return budgets
}
// 例:总预算 10000
// Task A (复杂度 0.5) → 5000
// Task B (复杂度 0.3) → 3000
// Task C (复杂度 0.2) → 2000
Shannon 的实现:
// 从路由器传递预算
n := len(decomp.Subtasks)
if n == 0 {
n = 1
}
agentMax := res.RemainingTaskBudget / n
// 可以通过环境变量或请求上下文设置上限
if v := os.Getenv("TOKEN_BUDGET_PER_AGENT"); v != "" {
if cap, err := strconv.Atoi(v); err == nil && cap > 0 && cap < agentMax {
agentMax = cap
}
}
input.Context["budget_agent_max"] = agentMax
13.7 控制信号
编排过程中,用户可能想要:暂停、恢复、取消。
Temporal 的信号机制
Shannon 使用 Temporal 的 Signal 机制实现控制:
// 设置控制信号处理器
controlHandler := &ControlSignalHandler{
WorkflowID: workflowID,
AgentID: "orchestrator",
}
controlHandler.Setup(ctx)
// 在关键点检查信号
checkpoints := []string{
"pre_routing", // 路由决策前
"post_decomposition", // 任务分解后
"pre_dag_workflow", // 进入 DAG 前
}
for _, checkpoint := range checkpoints {
if err := controlHandler.CheckPausePoint(ctx, checkpoint); err != nil {
return TaskResult{Success: false, ErrorMessage: err.Error()}, err
}
}
子工作流注册
当编排器启动子工作流时,需要注册它们以便传递信号:
// 启动子工作流
childFuture := workflow.ExecuteChildWorkflow(ctx, DAGWorkflow, input)
// 获取子工作流 ID
var childExec workflow.Execution
childFuture.GetChildWorkflowExecution().Get(ctx, &childExec)
// 注册(这样暂停/取消信号会传递给子工作流)
controlHandler.RegisterChildWorkflow(childExec.ID)
// 执行完毕后注销
defer controlHandler.UnregisterChildWorkflow(childExec.ID)
13.8 完整示例
把前面的内容串起来,看一个完整的多 Agent 研究任务:
func CompanyResearchWorkflow(ctx workflow.Context, query string) (string, error) {
companies := []string{"Tesla", "BYD", "Rivian"}
// 1. 构建并行任务
tasks := make([]ParallelTask, len(companies))
for i, company := range companies {
tasks[i] = ParallelTask{
ID: fmt.Sprintf("research-%s", strings.ToLower(company)),
Description: fmt.Sprintf("Research %s's 2024 EV strategy", company),
SuggestedTools: []string{"web_search"},
Role: "researcher",
}
}
// 2. 并行执行
config := ParallelConfig{
MaxConcurrency: 3,
EmitEvents: true,
}
result, err := ExecuteParallel(ctx, tasks, sessionID, history, config, budgetPerAgent, userID, modelTier)
if err != nil {
return "", err
}
// 3. 预处理结果
processed := preprocessResults(result.Results)
// 4. LLM 综合
synthesis := llmSynthesis(query, processed)
return synthesis, nil
}
执行时间线:
0s ┌─ 编排器启动
├─ 任务分解: 3 个研究任务 + 1 个综合任务
└─ 路由决策: DAGWorkflow
1s ├─ 并行启动 3 个研究 Agent
│ ├─ Agent A (Tesla): 搜索中...
│ ├─ Agent B (BYD): 搜索中...
│ └─ Agent C (Rivian): 搜索中...
15s ├─ Agent B 完成
20s ├─ Agent C 完成
25s ├─ Agent A 完成 (Tesla 信息最多)
26s ├─ 开始结果综合
│ ├─ 去重: 移除 2 条重复信息
│ ├─ 过滤: 移除 1 条失败结果
│ └─ LLM 综合分析
45s └─ 输出最终报告
总耗时: ~45 秒 (串行需要 ~75 秒)
13.9 常见的坑
坑 1:过度并行
// 危险:并发 100,API 会限流
config := ParallelConfig{MaxConcurrency: 100}
// 合理:根据 API 限制设置
config := ParallelConfig{MaxConcurrency: 5}
我见过有人把并发设成 50,结果 LLM API 返回一堆 429 Too Many Requests。还不如串行执行。
坑 2:忽略失败任务
// 问题:只处理成功的,失败的被无视
for _, r := range results {
if r.Success {
process(r)
}
}
// 改进:监控成功率
successRate := float64(successCount) / float64(total)
if successRate < 0.7 {
logger.Warn("Low success rate", "rate", successRate)
// 可能需要重试或报警
}
坑 3:结果综合丢信息
简单拼接可能导致:
- 信息重复(两个 Agent 都提到「Tesla 市值 8000 亿」)
- 信息矛盾(一个说增长 15%,一个说增长 12%)
- 缺乏洞察(只是罗列,没有对比分析)
用 LLM 综合时,prompt 要明确要求:
synthesisPrompt := `综合以下研究结果:
要求:
1. 消除重复
2. 标注矛盾(如果有)
3. 生成对比分析表格
4. 总结关键洞察(3-5 条)
...
`
坑 4:预算分配不合理
// 问题:简单任务和复杂任务同等预算
budgetPerAgent := totalBudget / numAgents
// 改进:按任务估算 token 分配
for _, st := range subtasks {
budgets[st.ID] = int(float64(totalBudget) * float64(st.EstimatedTokens) / float64(totalEstimated))
}
13.10 其他框架的实现
编排是多 Agent 的核心问题,各框架都有自己的方案:
| 框架 | 编排方式 | 特点 |
|---|---|---|
| LangGraph | 图定义 + 节点执行 | 灵活,需要手动定义图 |
| AutoGen | GroupChat + Manager | 对话驱动,自动选择发言者 |
| CrewAI | Crew + Process | 角色定义清晰,支持顺序/层级 |
| OpenAI Swarm | handoff() | 轻量级,Agent 之间直接交接 |
LangGraph 示例:
from langgraph.graph import StateGraph
# 定义状态
class ResearchState(TypedDict):
query: str
tesla_data: str
byd_data: str
synthesis: str
# 定义图
graph = StateGraph(ResearchState)
graph.add_node("research_tesla", research_tesla_node)
graph.add_node("research_byd", research_byd_node)
graph.add_node("synthesize", synthesize_node)
# 定义边(依赖)
graph.add_edge(START, "research_tesla")
graph.add_edge(START, "research_byd")
graph.add_edge("research_tesla", "synthesize")
graph.add_edge("research_byd", "synthesize")
这章讲完了
核心就一句话:Orchestrator 是多 Agent 的指挥家——分解任务、分发执行、协调依赖、综合结果。
小结
- 单 Agent 三硬伤:串行低效、通才不专、单点故障
- Orchestrator 四职责:Decompose → Dispatch → Coordinate → Synthesize
- 路由决策:简单任务用 SimpleTask,复杂任务用 DAG 或 Supervisor
- 三种执行模式:并行(独立任务)、串行(链式依赖)、混合(DAG)
- 结果综合:去重 → 过滤 → LLM 整合
Shannon Lab(10 分钟上手)
本节帮你在 10 分钟内把本章概念对应到 Shannon 源码。
必读(1 个文件)
orchestrator_router.go:找 OrchestratorWorkflow 函数的路由 switch 语句,理解怎么判断「简单任务」、「需要 Supervisor」、怎么委托给子工作流
选读深挖(2 个,按兴趣挑)
execution/parallel.go:理解信号量控制怎么实现(workflow.NewSemaphore)、为什么用 futuresChan + Selector 收集结果execution/hybrid.go:理解 waitForDependencies 的增量超时检查、为什么用 workflow.AwaitWithTimeout 而不是死等
练习
练习 1:路由决策分析
分析以下任务会走哪个路径:
- 「今天北京天气怎么样」
- 「对比 iPhone 和 Android 的市场份额」
- 「设计一个电商系统的完整架构,包括前端、后端、数据库、缓存、消息队列」
对于每个任务,说明:
- 预期的复杂度评分范围
- 会走哪个工作流(SimpleTask / DAG / Supervisor)
- 为什么
练习 2:并发度设置
假设你的 LLM API 限制是每秒 10 次请求,一个任务需要 3 次 LLM 调用,平均耗时 5 秒。
问题:
- 如果有 20 个子任务,MaxConcurrency 设多少合适?
- 设太高会怎样?
- 设太低会怎样?
练习 3(进阶):设计综合 Prompt
为一个「多公司财报对比分析」任务设计 LLM 综合的 prompt。
要求包含:
- 如何处理信息重复
- 如何处理数据矛盾
- 输出格式要求(表格 + 洞察)
- 引用标注要求
想深入?
- Temporal Workflows - 理解工作流编排的基础设施
- LangGraph Multi-Agent - Python 生态的图编排方案
- AutoGen GroupChat - 微软的对话式多 Agent 框架
下一章预告
编排器决定了「谁来做」,但「怎么做」还没解决。
当任务之间有复杂的依赖关系——A 等 B,B 等 C,C 又可以和 D 并行——简单的串行或并行都搞不定。
下一章讲 DAG 工作流:用有向无环图来建模任务依赖,实现智能调度。
下一章我们继续。