第 13 章:编排基础

多 Agent 编排不是让一群 Agent 各干各的,而是让它们像交响乐团一样协作——有指挥、有分工、有配合。但指挥再厉害,乐手水平不行也白搭。


⏱️ 快速通道(5 分钟掌握核心)

  1. 单 Agent 三大硬伤:串行慢、深度浅、单点故障
  2. 编排三要素:任务分解、Agent 分配、结果综合
  3. 自动 vs 配置:简单任务自动匹配,复杂任务显式配置
  4. 协调成本不可忽略:任务简单时单 Agent 反而更快
  5. 编排是架构决策,需要权衡并行收益与协调开销

10 分钟路径:13.1-13.3 → 13.5 → Shannon Lab


13.1 为什么单 Agent 不够用?

这章解决一个核心问题:当单个 Agent 无法高效完成任务时,如何让多个 Agent 协作?

想象你在管理一个小型研究项目——需要分析三家竞争对手(Tesla、BYD、Rivian)的电动车战略。如果只有你一个人,你会怎么做?

你会串行处理:今天研究 Tesla,明天研究 BYD,后天研究 Rivian。三天后,你终于把所有信息收集完,开始写对比分析。

但如果你有三个助手呢?你会让他们同时开工:Alice 研究 Tesla,Bob 研究 BYD,Carol 研究 Rivian。一天后,三份报告同时到手,你只需要综合对比就行。

效率提升 3 倍。

单 Agent 就像一个人单打独斗——能完成任务,但效率低、深度浅。多 Agent 编排就是组建团队、分工协作。

但组建团队不是简单地"多雇几个人"。你需要:分配任务、协调进度、整合结果、处理冲突。编排器(Orchestrator)就是做这件事的。

单 Agent 的三大硬伤

先说结论:单 Agent 有三个硬伤。

硬伤一:串行执行,效率太低

三家公司的搜索完全独立,没有依赖关系。但单 Agent 只能一个接一个做。如果并行呢?差距明显:

串行 vs 并行执行对比

省了 40 秒。任务越多,差距越大。

硬伤二:通才做专家的活,深度不够

「设计一个 AI 创业公司的商业计划」,这个任务需要什么?

  • 市场分析:行业规模、增长趋势、竞争格局
  • 技术架构:技术选型、成本估算、可行性评估
  • 财务预测:收入模型、成本结构、盈亏分析
  • 营销策略:目标用户、获客渠道、品牌定位

让一个「通才」Agent 同时搞定这四件事?它可能每个都懂一点,但每个都不够深。

更好的方式是:4 个专家 Agent,各司其职。

硬伤三:单点故障,没有冗余

一个 Agent 挂了——网络超时、LLM 报错、工具调用失败——整个任务就废了。

多 Agent 系统可以做容错:一个挂了,其他继续;关键任务可以有备份。

多 Agent vs 单 Agent

能力单 Agent多 Agent
并行能力串行执行并发执行
专业深度通才,样样懂点专家分工,各有所长
容错能力单点故障冗余容错
成本控制统一模型按任务选模型(简单任务用便宜模型)

注意:多 Agent 不是银弹。协调多个 Agent 本身就有开销——通信、同步、结果整合。任务简单的时候,单 Agent 反而更快。只有任务复杂到一定程度,多 Agent 的收益才能覆盖协调成本。我见过有人把「查个天气」都拆成 3 个 Agent——完全没必要,反而更慢。


13.2 编排器:多 Agent 的指挥家

多 Agent 系统需要一个「指挥家」——Orchestrator(编排器)。

它不亲自干活,但它决定:

  • 任务怎么拆
  • 谁来做什么
  • 什么顺序执行
  • 结果怎么整合

四大职责

编排器四大职责

类比一下:编排器就像餐厅的主厨。

客人说「我要一份牛排套餐」。主厨不会自己一个人做,他会:

  1. 分解:牛排、配菜、酱汁、甜点
  2. 分发:牛排给烤台、配菜给冷厨、酱汁给酱料师
  3. 协调:牛排好了再淋酱、配菜和牛排同时出
  4. 综合:摆盘,确保温度和卖相

主厨不需要每样都会做,但他要知道:谁擅长什么、什么顺序合理、怎么整合成一道菜。

执行流程

编排执行四步骤


13.3 路由决策:该用什么策略?

不是所有任务都需要多 Agent。编排器的第一个决策是:这个任务该走什么路径?

Shannon 的路由逻辑

Shannon 的 OrchestratorWorkflow 是这样判断的:

// 判断是否简单任务
simpleByShape := len(decomp.Subtasks) == 0 ||
                 (len(decomp.Subtasks) == 1 && !needsTools)
isSimple := decomp.ComplexityScore < simpleThreshold && simpleByShape

// 检查是否有依赖关系
hasDeps := false
for _, st := range decomp.Subtasks {
    if len(st.Dependencies) > 0 || len(st.Consumes) > 0 {
        hasDeps = true
        break
    }
}

switch {
case isSimple:
    // 简单任务 → 单 Agent 直接执行
    return SimpleTaskWorkflow(input)

case len(decomp.Subtasks) > 5 || hasDeps:
    // 复杂任务或有依赖 → Supervisor 模式
    return SupervisorWorkflow(input)

default:
    // 标准任务 → DAG 工作流
    return DAGWorkflow(input)
}

实现参考 (Shannon): go/orchestrator/internal/workflows/orchestrator_router.go - OrchestratorWorkflow 函数

决策树

任务进入
    
    
复杂度 < 0.3 且单子任务且无工具? ──是──► SimpleTaskWorkflow
                                        ( Agent 直接执行)
    
    
    
子任务 > 5 或有依赖? ──是──► SupervisorWorkflow
                           (复杂多 Agent 协调)
    
    
    
DAGWorkflow(默认)
(标准多 Agent 并行/串行)

三种策略对比

策略适用场景特点
SimpleTask简单问答、单步任务最轻量,单 Agent
DAGWorkflow2-5 个子任务,可能有简单依赖并行/串行/混合执行
Supervisor6+ 子任务,复杂依赖,需要动态协调团队管理、邮箱通信

这三个策略后面几章会详细讲。这里先记住:编排器会根据任务复杂度自动选择策略


13.4 三种执行模式

不管走哪个策略,最终都要执行 Agent。执行方式有三种:

模式一:并行执行(Parallel)

适用场景:子任务相互独立,没有依赖关系。

并行任务执行

核心是信号量控制——限制同时执行的 Agent 数量,防止资源耗尽。

type ParallelConfig struct {
    MaxConcurrency int  // 最大并发数,默认 5
}

func ExecuteParallel(ctx workflow.Context, tasks []ParallelTask, config ParallelConfig) {
    // 信号量控制并发
    semaphore := workflow.NewSemaphore(ctx, int64(config.MaxConcurrency))

    for i, task := range tasks {
        workflow.Go(ctx, func(ctx workflow.Context) {
            // 获取信号量(超过并发数会阻塞)
            semaphore.Acquire(ctx, 1)
            defer semaphore.Release(1)

            // 执行任务
            executeTask(task)
        })
    }
}

实现参考 (Shannon): go/orchestrator/internal/workflows/patterns/execution/parallel.go - ExecuteParallel 函数

为什么要限制并发?

假设你有 10 个搜索任务,MaxConcurrency = 3:

t0: [Task 1] [Task 2] [Task 3]   3 个同时开始
t1: [1 done] [Task 4 starts]     1 完成,4 立即补位
t2: [2 done] [Task 5 starts]     2 完成,5 补位
...

如果不限制,10 个 Agent 同时调用 LLM API,很可能触发限流,反而更慢。

模式二:串行执行(Sequential)

适用场景:任务有隐式依赖,后一个需要前一个的结果。

串行任务执行

type SequentialConfig struct {
    PassPreviousResults bool  // 是否把前一个结果传给下一个
}

func ExecuteSequential(ctx workflow.Context, tasks []Task, config SequentialConfig) {
    var results []Result

    for i, task := range tasks {
        // 把前序结果注入上下文
        if config.PassPreviousResults && len(results) > 0 {
            task.Context["previous_results"] = results
        }

        result := executeTask(task)
        results = append(results, result)
    }
}

关键是 结果传递。比如:

Task 1: "获取特斯拉股价"
         Response: "$250"
        
Task 2: "计算相比去年的涨幅"
        Context: {
          previous_results: [
            { response: "$250", numeric_value: 250 }
          ]
        }
         可以直接用 250 做计算

模式三:混合执行(Hybrid/DAG)

适用场景:部分任务可并行,部分任务有依赖。

任务依赖树

核心是依赖等待——任务只有在所有依赖任务完成后才能开始。

func waitForDependencies(
    ctx workflow.Context,
    dependencies []string,
    completedTasks map[string]bool,
    timeout time.Duration,
) bool {
    startTime := workflow.Now(ctx)
    deadline := startTime.Add(timeout)

    for workflow.Now(ctx).Before(deadline) {
        // 检查所有依赖是否完成
        allDone := true
        for _, depID := range dependencies {
            if !completedTasks[depID] {
                allDone = false
                break
            }
        }
        if allDone {
            return true
        }

        // 等 30 秒再检查
        workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
            // 条件检查
            for _, depID := range dependencies {
                if !completedTasks[depID] {
                    return false
                }
            }
            return true
        })
    }

    return false  // 超时
}

实现参考 (Shannon): go/orchestrator/internal/workflows/patterns/execution/hybrid.go - waitForDependencies 函数


13.5 结果综合

多个 Agent 跑完了,怎么整合结果?

问题

Agent 的原始输出通常是:

  1. 冗余的:不同 Agent 可能给出相似信息
  2. 格式不一:每个 Agent 有自己的输出风格
  3. 质量参差:有的成功,有的失败,有的半吊子

用户期望的是:一个统一、完整、高质量的回答。

预处理:去重与过滤

func preprocessResults(results []AgentResult) []AgentResult {
    // 1. 精确去重(Hash)
    seen := make(map[string]bool)
    exact := []AgentResult{}
    for _, r := range results {
        hash := computeHash(r.Response)
        if !seen[hash] {
            seen[hash] = true
            exact = append(exact, r)
        }
    }

    // 2. 相似去重(Jaccard > 0.85)
    similar := []AgentResult{}
    for _, r := range exact {
        isDuplicate := false
        for _, s := range similar {
            if jaccardSimilarity(r.Response, s.Response) > 0.85 {
                isDuplicate = true
                break
            }
        }
        if !isDuplicate {
            similar = append(similar, r)
        }
    }

    // 3. 质量过滤
    filtered := []AgentResult{}
    noInfoPatterns := []string{
        "unable to retrieve",
        "failed to fetch",
        "no information available",
        "无法访问",
        "未找到",
    }
    for _, r := range similar {
        if r.Success && !containsAny(r.Response, noInfoPatterns) {
            filtered = append(filtered, r)
        }
    }

    return filtered
}

综合方式

简单综合:直接拼接

适合结果已经很规整的情况:

func simpleSynthesis(results []AgentResult) string {
    var parts []string
    for _, r := range results {
        parts = append(parts, r.Response)
    }
    return strings.Join(parts, "\n\n")
}

LLM 综合:智能整合

适合需要统一视角、消除矛盾、生成洞察的情况:

func llmSynthesis(query string, results []AgentResult) string {
    prompt := fmt.Sprintf(`综合以下研究结果,回答问题:%s

要求:
1. 消除重复信息
2. 解决矛盾(如果有)
3. 突出关键洞察
4. 用统一的格式呈现

`, query)

    for i, r := range results {
        prompt += fmt.Sprintf("=== 来源 %d ===\n%s\n\n", i+1, r.Response)
    }

    return callLLM(prompt)
}

13.6 Token 预算分配

多 Agent 场景下,成本控制更重要。

为什么?

单 Agent 烧 1000 个 token,多 Agent 可能烧 5000 个。如果不控制,一个复杂任务就可能把一天的预算用光。

预算分配策略

简单策略:平均分配

func allocateBudgetSimple(totalBudget int, numAgents int) int {
    return totalBudget / numAgents
}

// 例:总预算 10000,5 个 Agent → 每个 2000

进阶策略:按复杂度分配

func allocateBudgetByComplexity(totalBudget int, subtasks []Subtask) map[string]int {
    budgets := make(map[string]int)

    // 计算总复杂度
    totalComplexity := 0.0
    for _, st := range subtasks {
        totalComplexity += st.Complexity
    }

    // 按比例分配
    for _, st := range subtasks {
        budgets[st.ID] = int(float64(totalBudget) * st.Complexity / totalComplexity)
    }

    return budgets
}

// 例:总预算 10000
//     Task A (复杂度 0.5) → 5000
//     Task B (复杂度 0.3) → 3000
//     Task C (复杂度 0.2) → 2000

Shannon 的实现:

// 从路由器传递预算
n := len(decomp.Subtasks)
if n == 0 {
    n = 1
}
agentMax := res.RemainingTaskBudget / n

// 可以通过环境变量或请求上下文设置上限
if v := os.Getenv("TOKEN_BUDGET_PER_AGENT"); v != "" {
    if cap, err := strconv.Atoi(v); err == nil && cap > 0 && cap < agentMax {
        agentMax = cap
    }
}

input.Context["budget_agent_max"] = agentMax

13.7 控制信号

编排过程中,用户可能想要:暂停、恢复、取消。

Temporal 的信号机制

Shannon 使用 Temporal 的 Signal 机制实现控制:

// 设置控制信号处理器
controlHandler := &ControlSignalHandler{
    WorkflowID: workflowID,
    AgentID:    "orchestrator",
}
controlHandler.Setup(ctx)

// 在关键点检查信号
checkpoints := []string{
    "pre_routing",         // 路由决策前
    "post_decomposition",  // 任务分解后
    "pre_dag_workflow",    // 进入 DAG 前
}

for _, checkpoint := range checkpoints {
    if err := controlHandler.CheckPausePoint(ctx, checkpoint); err != nil {
        return TaskResult{Success: false, ErrorMessage: err.Error()}, err
    }
}

子工作流注册

当编排器启动子工作流时,需要注册它们以便传递信号:

// 启动子工作流
childFuture := workflow.ExecuteChildWorkflow(ctx, DAGWorkflow, input)

// 获取子工作流 ID
var childExec workflow.Execution
childFuture.GetChildWorkflowExecution().Get(ctx, &childExec)

// 注册(这样暂停/取消信号会传递给子工作流)
controlHandler.RegisterChildWorkflow(childExec.ID)

// 执行完毕后注销
defer controlHandler.UnregisterChildWorkflow(childExec.ID)

13.8 完整示例

把前面的内容串起来,看一个完整的多 Agent 研究任务:

func CompanyResearchWorkflow(ctx workflow.Context, query string) (string, error) {
    companies := []string{"Tesla", "BYD", "Rivian"}

    // 1. 构建并行任务
    tasks := make([]ParallelTask, len(companies))
    for i, company := range companies {
        tasks[i] = ParallelTask{
            ID:          fmt.Sprintf("research-%s", strings.ToLower(company)),
            Description: fmt.Sprintf("Research %s's 2024 EV strategy", company),
            SuggestedTools: []string{"web_search"},
            Role:        "researcher",
        }
    }

    // 2. 并行执行
    config := ParallelConfig{
        MaxConcurrency: 3,
        EmitEvents:     true,
    }
    result, err := ExecuteParallel(ctx, tasks, sessionID, history, config, budgetPerAgent, userID, modelTier)
    if err != nil {
        return "", err
    }

    // 3. 预处理结果
    processed := preprocessResults(result.Results)

    // 4. LLM 综合
    synthesis := llmSynthesis(query, processed)

    return synthesis, nil
}

执行时间线:

0s   ┌─ 编排器启动
     ├─ 任务分解: 3 个研究任务 + 1 个综合任务
     └─ 路由决策: DAGWorkflow

1s   ├─ 并行启动 3 个研究 Agent
        ├─ Agent A (Tesla):  搜索中...
        ├─ Agent B (BYD):    搜索中...
        └─ Agent C (Rivian): 搜索中...

15s  ├─ Agent B 完成
20s  ├─ Agent C 完成
25s  ├─ Agent A 完成 (Tesla 信息最多)

26s  ├─ 开始结果综合
        ├─ 去重: 移除 2 条重复信息
        ├─ 过滤: 移除 1 条失败结果
        └─ LLM 综合分析

45s  └─ 输出最终报告

总耗时: ~45  (串行需要 ~75 )

13.9 常见的坑

坑 1:过度并行

// 危险:并发 100,API 会限流
config := ParallelConfig{MaxConcurrency: 100}

// 合理:根据 API 限制设置
config := ParallelConfig{MaxConcurrency: 5}

我见过有人把并发设成 50,结果 LLM API 返回一堆 429 Too Many Requests。还不如串行执行。

坑 2:忽略失败任务

// 问题:只处理成功的,失败的被无视
for _, r := range results {
    if r.Success {
        process(r)
    }
}

// 改进:监控成功率
successRate := float64(successCount) / float64(total)
if successRate < 0.7 {
    logger.Warn("Low success rate", "rate", successRate)
    // 可能需要重试或报警
}

坑 3:结果综合丢信息

简单拼接可能导致:

  • 信息重复(两个 Agent 都提到「Tesla 市值 8000 亿」)
  • 信息矛盾(一个说增长 15%,一个说增长 12%)
  • 缺乏洞察(只是罗列,没有对比分析)

用 LLM 综合时,prompt 要明确要求:

synthesisPrompt := `综合以下研究结果:

要求:
1. 消除重复
2. 标注矛盾(如果有)
3. 生成对比分析表格
4. 总结关键洞察(3-5 条)

...
`

坑 4:预算分配不合理

// 问题:简单任务和复杂任务同等预算
budgetPerAgent := totalBudget / numAgents

// 改进:按任务估算 token 分配
for _, st := range subtasks {
    budgets[st.ID] = int(float64(totalBudget) * float64(st.EstimatedTokens) / float64(totalEstimated))
}

13.10 其他框架的实现

编排是多 Agent 的核心问题,各框架都有自己的方案:

框架编排方式特点
LangGraph图定义 + 节点执行灵活,需要手动定义图
AutoGenGroupChat + Manager对话驱动,自动选择发言者
CrewAICrew + Process角色定义清晰,支持顺序/层级
OpenAI Swarmhandoff()轻量级,Agent 之间直接交接

LangGraph 示例:

from langgraph.graph import StateGraph

# 定义状态
class ResearchState(TypedDict):
    query: str
    tesla_data: str
    byd_data: str
    synthesis: str

# 定义图
graph = StateGraph(ResearchState)
graph.add_node("research_tesla", research_tesla_node)
graph.add_node("research_byd", research_byd_node)
graph.add_node("synthesize", synthesize_node)

# 定义边(依赖)
graph.add_edge(START, "research_tesla")
graph.add_edge(START, "research_byd")
graph.add_edge("research_tesla", "synthesize")
graph.add_edge("research_byd", "synthesize")

这章讲完了

核心就一句话:Orchestrator 是多 Agent 的指挥家——分解任务、分发执行、协调依赖、综合结果

小结

  1. 单 Agent 三硬伤:串行低效、通才不专、单点故障
  2. Orchestrator 四职责:Decompose → Dispatch → Coordinate → Synthesize
  3. 路由决策:简单任务用 SimpleTask,复杂任务用 DAG 或 Supervisor
  4. 三种执行模式:并行(独立任务)、串行(链式依赖)、混合(DAG)
  5. 结果综合:去重 → 过滤 → LLM 整合

Shannon Lab(10 分钟上手)

本节帮你在 10 分钟内把本章概念对应到 Shannon 源码。

必读(1 个文件)

  • orchestrator_router.go:找 OrchestratorWorkflow 函数的路由 switch 语句,理解怎么判断「简单任务」、「需要 Supervisor」、怎么委托给子工作流

选读深挖(2 个,按兴趣挑)

  • execution/parallel.go:理解信号量控制怎么实现(workflow.NewSemaphore)、为什么用 futuresChan + Selector 收集结果
  • execution/hybrid.go:理解 waitForDependencies 的增量超时检查、为什么用 workflow.AwaitWithTimeout 而不是死等

练习

练习 1:路由决策分析

分析以下任务会走哪个路径:

  1. 「今天北京天气怎么样」
  2. 「对比 iPhone 和 Android 的市场份额」
  3. 「设计一个电商系统的完整架构,包括前端、后端、数据库、缓存、消息队列」

对于每个任务,说明:

  • 预期的复杂度评分范围
  • 会走哪个工作流(SimpleTask / DAG / Supervisor)
  • 为什么

练习 2:并发度设置

假设你的 LLM API 限制是每秒 10 次请求,一个任务需要 3 次 LLM 调用,平均耗时 5 秒。

问题:

  1. 如果有 20 个子任务,MaxConcurrency 设多少合适?
  2. 设太高会怎样?
  3. 设太低会怎样?

练习 3(进阶):设计综合 Prompt

为一个「多公司财报对比分析」任务设计 LLM 综合的 prompt。

要求包含:

  • 如何处理信息重复
  • 如何处理数据矛盾
  • 输出格式要求(表格 + 洞察)
  • 引用标注要求

想深入?


下一章预告

编排器决定了「谁来做」,但「怎么做」还没解决。

当任务之间有复杂的依赖关系——A 等 B,B 等 C,C 又可以和 D 并行——简单的串行或并行都搞不定。

下一章讲 DAG 工作流:用有向无环图来建模任务依赖,实现智能调度。

下一章我们继续。

引用本文 / Cite
Zhang, W. (2026). 第 13 章:编排基础. In AI Agent 架构:从单体到企业级多智能体. https://waylandz.com/ai-agent-book/第13章-编排基础
@incollection{zhang2026aiagent_第13章_编排基础,
  author = {Zhang, Wayland},
  title = {第 13 章:编排基础},
  booktitle = {AI Agent 架构:从单体到企业级多智能体},
  year = {2026},
  url = {https://waylandz.com/ai-agent-book/第13章-编排基础}
}