マルチエージェント編成って、みんなが勝手に動くことじゃない。オーケストラみたいに、指揮者がいて、分担があって、息が合ってる状態のことさ。ただし、指揮者がいくら優秀でも、演奏者がダメなら意味ないけどね。


5分で要点を掴む

  1. 単体エージェントの3つの限界:直列処理で遅い、専門性が浅い、単一障害点
  2. 編成の3要素:タスク分解、エージェント割り当て、結果統合
  3. 自動 vs 設定:単純なタスクは自動マッチング、複雑なタスクは明示的に設定
  4. 協調コストを忘れるな:タスクが単純なら、単体エージェントの方が速い
  5. 編成はアーキテクチャの決断。並列のメリットと協調オーバーヘッドのトレードオフだ

10分コース:13.1-13.3 → 13.5 → Shannon Lab


13.1 なぜ単体エージェントじゃダメなのか?

この章で解決したいのは1つ。単体エージェントで効率よく片付かないとき、複数のエージェントをどう協力させるか?

ちょっと想像してみて。小さなリサーチプロジェクトを任されたとする。3社の競合(Tesla、BYD、Rivian)のEV戦略を分析してほしいと。一人でやるなら、どうする?

直列でやるよね。今日はTesla、明日はBYD、明後日はRivian。3日後にやっと情報が揃って、比較分析を書き始める。

でも、3人のアシスタントがいたら?同時にやらせるでしょ。AliceはTesla、BobはBYD、CarolはRivian。1日後には3つのレポートが揃って、あとは自分で統合するだけ。

効率3倍。

単体エージェントは一人で戦うようなもの。タスクは完了できるけど、効率は悪いし、深掘りも難しい。マルチエージェント編成は、チームを組んで分業すること。

ただ、チームを組むのは「人を増やす」だけじゃない。タスクの割り当て、進捗の調整、結果の統合、衝突の処理。全部必要になる。Orchestrator(編成器)がそれをやるんだ。

単体エージェントの3つの限界

まず結論から言うと、単体エージェントには3つの限界がある。

限界その1:直列実行、遅すぎる

3社の検索は完全に独立してる。依存関係なし。でも単体エージェントは1つずつしかできない:

タイムライン(単体エージェント):
0秒  ──► Tesla検索 ──► 20秒
20秒 ──► BYD検索   ──► 40秒
40秒 ──► Rivian検索 ──► 60秒

合計:60秒

並列だとどうなる?

タイムライン(マルチエージェント):
0秒  ──┬► Agent A: Tesla検索 ──► 20秒
      ├► Agent B: BYD検索   ──► 20秒
      └► Agent C: Rivian検索 ──► 20秒

合計:20秒

直列 vs 並列実行の比較

40秒の短縮。タスクが増えるほど差は開く。

限界その2:ジェネラリストに専門家の仕事、深さが足りない

「AIスタートアップのビジネスプランを作って」というタスク。何が必要?

  • 市場分析:業界規模、成長トレンド、競合状況
  • 技術アーキテクチャ:技術選定、コスト見積もり、実現可能性評価
  • 財務予測:収益モデル、コスト構造、損益分析
  • マーケティング戦略:ターゲットユーザー、獲得チャネル、ブランドポジショニング

「ジェネラリスト」のエージェント1つで4つ全部?まあ、どれもちょっとは分かるだろうけど、どれも深くない。

ベターな方法は、4人の専門家エージェントで分担すること。

限界その3:単一障害点、冗長性なし

1つのエージェントがダウンしたら(ネットワークタイムアウト、LLMエラー、ツール呼び出し失敗)、タスク全体が終わる。

マルチエージェントシステムならフォールトトレラントにできる。1つ落ちても他は続行。重要なタスクにはバックアップを用意できる。

マルチ vs 単体

能力単体エージェントマルチエージェント
並列能力直列実行並行実行
専門性ジェネラリスト、浅く広く専門家分業、それぞれの強み
耐障害性単一障害点冗長性あり
コスト制御統一モデルタスク別にモデル選択(単純なら安いモデル)

注意:マルチエージェントは万能じゃない。複数エージェントの協調にはオーバーヘッドがある。通信、同期、結果統合。タスクが単純なら、単体エージェントの方が速いこともある。「天気を調べる」だけで3つのエージェントを使う人を見たことあるけど、完全に無駄。かえって遅くなる。


13.2 編成器:マルチエージェントの指揮者

マルチエージェントシステムには「指揮者」が必要。それがOrchestrator(編成器)。

自分では作業しないけど、こういうことを決める:

  • タスクをどう分解するか
  • 誰が何をやるか
  • どの順序で実行するか
  • 結果をどう統合するか

4つの責務

編成器の4つの責務

例え話をしよう:編成器はレストランの料理長みたいなもの。

お客さんが「ステーキセットを」と言う。料理長は一人で全部作らない。こうする:

  1. 分解:ステーキ、付け合わせ、ソース、デザート
  2. 分発:ステーキはグリル担当、付け合わせは冷菜担当、ソースはソーシエ
  3. 協調:ステーキができたらソースをかける、付け合わせとステーキは同時に出す
  4. 統合:盛り付け、温度と見た目をチェック

料理長は全部できる必要はない。でも、誰が何が得意か、どの順番が合理的か、どう一皿にまとめるか、それは知ってなきゃいけない。

実行フロー

編成実行の4ステップ


13.3 ルーティング決定:どの戦略を使う?

全てのタスクにマルチエージェントが必要なわけじゃない。編成器の最初の判断は:このタスクにどのパスを使うか?

Shannon のルーティングロジック

Shannon の OrchestratorWorkflow はこう判断する:

// 単純タスクかどうかの判定
simpleByShape := len(decomp.Subtasks) == 0 ||
                 (len(decomp.Subtasks) == 1 && !needsTools)
isSimple := decomp.ComplexityScore < simpleThreshold && simpleByShape

// 依存関係のチェック
hasDeps := false
for _, st := range decomp.Subtasks {
    if len(st.Dependencies) > 0 || len(st.Consumes) > 0 {
        hasDeps = true
        break
    }
}

switch {
case isSimple:
    // 単純タスク → 単体エージェントで直接実行
    return SimpleTaskWorkflow(input)

case len(decomp.Subtasks) > 5 || hasDeps:
    // 複雑タスクまたは依存あり → Supervisorモード
    return SupervisorWorkflow(input)

default:
    // 標準タスク → DAGワークフロー
    return DAGWorkflow(input)
}

実装参考 (Shannon): go/orchestrator/internal/workflows/orchestrator_router.go - OrchestratorWorkflow 関数

決定木

タスク到着
    
    
複雑度 < 0.3 かつ サブタスク1つ かつ ツール不要? ──はい──► SimpleTaskWorkflow
                                                  (単体エージェントで直接実行)
    いいえ
    
    
サブタスク > 5 または 依存あり? ──はい──► SupervisorWorkflow
                                   (複雑なマルチエージェント協調)
    いいえ
    
    
DAGWorkflow(デフォルト)
(標準的なマルチエージェント並列/直列)

3つの戦略比較

戦略適用シーン特徴
SimpleTask単純なQ&A、単ステップタスク最軽量、単体エージェント
DAGWorkflow2-5個のサブタスク、単純な依存あり並列/直列/ハイブリッド実行
Supervisor6個以上のサブタスク、複雑な依存、動的協調が必要チーム管理、メールボックス通信

この3つの戦略は後の章で詳しくやる。ここでは覚えておいて:編成器はタスクの複雑度に応じて自動的に戦略を選ぶ


13.4 3つの実行モード

どの戦略を選んでも、最終的にはエージェントを実行する。実行方式は3つある:

モード1:並列実行(Parallel)

適用シーン:サブタスクが互いに独立、依存関係なし。

並列タスク実行

コアはセマフォ制御。同時実行するエージェント数を制限して、リソース枯渇を防ぐ。

type ParallelConfig struct {
    MaxConcurrency int  // 最大同時実行数、デフォルト5
}

func ExecuteParallel(ctx workflow.Context, tasks []ParallelTask, config ParallelConfig) {
    // セマフォで同時実行を制御
    semaphore := workflow.NewSemaphore(ctx, int64(config.MaxConcurrency))

    for i, task := range tasks {
        workflow.Go(ctx, func(ctx workflow.Context) {
            // セマフォ取得(同時実行数を超えるとブロック)
            semaphore.Acquire(ctx, 1)
            defer semaphore.Release(1)

            // タスク実行
            executeTask(task)
        })
    }
}

実装参考 (Shannon): go/orchestrator/internal/workflows/patterns/execution/parallel.go - ExecuteParallel 関数

なぜ同時実行数を制限するのか?

検索タスクが10個あって、MaxConcurrency = 3 だとする:

t0: [Task 1] [Task 2] [Task 3]   3つ同時スタート
t1: [1 完了] [Task 4 開始]     1が完了、4がすぐ補充
t2: [2 完了] [Task 5 開始]     2が完了、5が補充
...

制限しないと、10個のエージェントが同時にLLM APIを叩いて、レート制限に引っかかる可能性が高い。かえって遅くなる。

モード2:直列実行(Sequential)

適用シーン:タスクに暗黙の依存があり、後のタスクが前のタスクの結果を必要とする。

直列タスク実行

type SequentialConfig struct {
    PassPreviousResults bool  // 前の結果を次に渡すか
}

func ExecuteSequential(ctx workflow.Context, tasks []Task, config SequentialConfig) {
    var results []Result

    for i, task := range tasks {
        // 前の結果をコンテキストに注入
        if config.PassPreviousResults && len(results) > 0 {
            task.Context["previous_results"] = results
        }

        result := executeTask(task)
        results = append(results, result)
    }
}

ポイントは結果の引き継ぎ。例えば:

Task 1: "Teslaの株価を取得"
         Response: "$250"
        
Task 2: "去年からの上昇率を計算"
        Context: {
          previous_results: [
            { response: "$250", numeric_value: 250 }
          ]
        }
         250を使って直接計算できる

モード3:ハイブリッド実行(Hybrid/DAG)

適用シーン:一部のタスクは並列可能、一部に依存関係がある。

タスク依存ツリー

コアは依存待ち。タスクは全ての依存タスクが完了してから開始できる。

func waitForDependencies(
    ctx workflow.Context,
    dependencies []string,
    completedTasks map[string]bool,
    timeout time.Duration,
) bool {
    startTime := workflow.Now(ctx)
    deadline := startTime.Add(timeout)

    for workflow.Now(ctx).Before(deadline) {
        // 全依存が完了したかチェック
        allDone := true
        for _, depID := range dependencies {
            if !completedTasks[depID] {
                allDone = false
                break
            }
        }
        if allDone {
            return true
        }

        // 30秒待ってから再チェック
        workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
            // 条件チェック
            for _, depID := range dependencies {
                if !completedTasks[depID] {
                    return false
                }
            }
            return true
        })
    }

    return false  // タイムアウト
}

実装参考 (Shannon): go/orchestrator/internal/workflows/patterns/execution/hybrid.go - waitForDependencies 関数


13.5 結果統合

複数のエージェントが完了した。結果をどうまとめる?

問題

エージェントの生出力は大体こうなってる:

  1. 冗長:違うエージェントが似た情報を出すことがある
  2. フォーマットがバラバラ:各エージェントに独自の出力スタイルがある
  3. 品質にムラ:成功したもの、失敗したもの、中途半端なもの

ユーザーが期待してるのは:統一された、完全で、高品質な回答。

前処理:重複除去とフィルタリング

func preprocessResults(results []AgentResult) []AgentResult {
    // 1. 完全一致の重複除去(Hash)
    seen := make(map[string]bool)
    exact := []AgentResult{}
    for _, r := range results {
        hash := computeHash(r.Response)
        if !seen[hash] {
            seen[hash] = true
            exact = append(exact, r)
        }
    }

    // 2. 類似重複の除去(Jaccard > 0.85)
    similar := []AgentResult{}
    for _, r := range exact {
        isDuplicate := false
        for _, s := range similar {
            if jaccardSimilarity(r.Response, s.Response) > 0.85 {
                isDuplicate = true
                break
            }
        }
        if !isDuplicate {
            similar = append(similar, r)
        }
    }

    // 3. 品質フィルタリング
    filtered := []AgentResult{}
    noInfoPatterns := []string{
        "unable to retrieve",
        "failed to fetch",
        "no information available",
        "アクセスできません",
        "見つかりません",
    }
    for _, r := range similar {
        if r.Success && !containsAny(r.Response, noInfoPatterns) {
            filtered = append(filtered, r)
        }
    }

    return filtered
}

統合方式

シンプル統合:直接結合

結果がすでに整理されている場合に適している:

func simpleSynthesis(results []AgentResult) string {
    var parts []string
    for _, r := range results {
        parts = append(parts, r.Response)
    }
    return strings.Join(parts, "\n\n")
}

LLM統合:インテリジェントな整合

統一された視点、矛盾の解消、インサイト生成が必要な場合に適している:

func llmSynthesis(query string, results []AgentResult) string {
    prompt := fmt.Sprintf(`以下のリサーチ結果を統合して、質問に回答してください:%s

要件:
1. 重複情報を除去
2. 矛盾があれば解決
3. 重要なインサイトを強調
4. 統一フォーマットで提示

`, query)

    for i, r := range results {
        prompt += fmt.Sprintf("=== ソース %d ===\n%s\n\n", i+1, r.Response)
    }

    return callLLM(prompt)
}

13.6 Token予算配分

マルチエージェントシナリオでは、コスト制御がもっと重要になる。

なぜ?

単体エージェントで1000トークン使うところ、マルチエージェントだと5000トークンになることも。制御しないと、1つの複雑なタスクで1日分の予算を使い切る可能性がある。

予算配分戦略

シンプル戦略:均等配分

func allocateBudgetSimple(totalBudget int, numAgents int) int {
    return totalBudget / numAgents
}

// 例:総予算 10000、5エージェント → 各2000

アドバンス戦略:複雑度ベース配分

func allocateBudgetByComplexity(totalBudget int, subtasks []Subtask) map[string]int {
    budgets := make(map[string]int)

    // 総複雑度を計算
    totalComplexity := 0.0
    for _, st := range subtasks {
        totalComplexity += st.Complexity
    }

    // 比率で配分
    for _, st := range subtasks {
        budgets[st.ID] = int(float64(totalBudget) * st.Complexity / totalComplexity)
    }

    return budgets
}

// 例:総予算 10000
//     Task A (複雑度 0.5) → 5000
//     Task B (複雑度 0.3) → 3000
//     Task C (複雑度 0.2) → 2000

Shannon の実装:

// ルーターから予算を引き継ぐ
n := len(decomp.Subtasks)
if n == 0 {
    n = 1
}
agentMax := res.RemainingTaskBudget / n

// 環境変数やリクエストコンテキストで上限を設定可能
if v := os.Getenv("TOKEN_BUDGET_PER_AGENT"); v != "" {
    if cap, err := strconv.Atoi(v); err == nil && cap > 0 && cap < agentMax {
        agentMax = cap
    }
}

input.Context["budget_agent_max"] = agentMax

13.7 制御シグナル

編成プロセス中に、ユーザーがこういうことをしたくなるかもしれない:一時停止、再開、キャンセル。

Temporal のシグナル機構

Shannon は Temporal の Signal 機構で制御を実装している:

// 制御シグナルハンドラーを設定
controlHandler := &ControlSignalHandler{
    WorkflowID: workflowID,
    AgentID:    "orchestrator",
}
controlHandler.Setup(ctx)

// 重要なポイントでシグナルをチェック
checkpoints := []string{
    "pre_routing",         // ルーティング決定前
    "post_decomposition",  // タスク分解後
    "pre_dag_workflow",    // DAG開始前
}

for _, checkpoint := range checkpoints {
    if err := controlHandler.CheckPausePoint(ctx, checkpoint); err != nil {
        return TaskResult{Success: false, ErrorMessage: err.Error()}, err
    }
}

子ワークフローの登録

編成器が子ワークフローを起動するとき、シグナルを伝播させるために登録が必要:

// 子ワークフローを起動
childFuture := workflow.ExecuteChildWorkflow(ctx, DAGWorkflow, input)

// 子ワークフローIDを取得
var childExec workflow.Execution
childFuture.GetChildWorkflowExecution().Get(ctx, &childExec)

// 登録(これで一時停止/キャンセルシグナルが子ワークフローに伝播する)
controlHandler.RegisterChildWorkflow(childExec.ID)

// 完了後に登録解除
defer controlHandler.UnregisterChildWorkflow(childExec.ID)

13.8 完全な例

これまでの内容を繋げて、完全なマルチエージェントリサーチタスクを見てみよう:

func CompanyResearchWorkflow(ctx workflow.Context, query string) (string, error) {
    companies := []string{"Tesla", "BYD", "Rivian"}

    // 1. 並列タスクを構築
    tasks := make([]ParallelTask, len(companies))
    for i, company := range companies {
        tasks[i] = ParallelTask{
            ID:          fmt.Sprintf("research-%s", strings.ToLower(company)),
            Description: fmt.Sprintf("Research %s's 2024 EV strategy", company),
            SuggestedTools: []string{"web_search"},
            Role:        "researcher",
        }
    }

    // 2. 並列実行
    config := ParallelConfig{
        MaxConcurrency: 3,
        EmitEvents:     true,
    }
    result, err := ExecuteParallel(ctx, tasks, sessionID, history, config, budgetPerAgent, userID, modelTier)
    if err != nil {
        return "", err
    }

    // 3. 結果の前処理
    processed := preprocessResults(result.Results)

    // 4. LLM統合
    synthesis := llmSynthesis(query, processed)

    return synthesis, nil
}

実行タイムライン:

0秒   ┌─ 編成器起動
      ├─ タスク分解: 3つのリサーチタスク + 1つの統合タスク
      └─ ルーティング決定: DAGWorkflow

1秒   ├─ 3つのリサーチエージェントを並列起動
         ├─ Agent A (Tesla):  検索中...
         ├─ Agent B (BYD):    検索中...
         └─ Agent C (Rivian): 検索中...

15秒  ├─ Agent B 完了
20秒  ├─ Agent C 完了
25秒  ├─ Agent A 完了 (Teslaの情報が一番多い)

26秒  ├─ 結果統合開始
         ├─ 重複除去: 2件の重複情報を削除
         ├─ フィルタ: 1件の失敗結果を削除
         └─ LLM統合分析

45秒  └─ 最終レポート出力

合計: 約45秒 (直列だと約75秒)

13.9 よくある落とし穴

落とし穴1:過度な並列化

// 危険:同時実行100、APIがレート制限に引っかかる
config := ParallelConfig{MaxConcurrency: 100}

// 適切:APIの制限に合わせて設定
config := ParallelConfig{MaxConcurrency: 5}

同時実行数を50に設定した人を見たことがある。結果、LLM APIから429 Too Many Requestsの嵐。直列実行の方がマシだった。

落とし穴2:失敗タスクの無視

// 問題:成功したものだけ処理、失敗は無視
for _, r := range results {
    if r.Success {
        process(r)
    }
}

// 改善:成功率を監視
successRate := float64(successCount) / float64(total)
if successRate < 0.7 {
    logger.Warn("Low success rate", "rate", successRate)
    // リトライやアラートが必要かも
}

落とし穴3:結果統合で情報ロス

単純な結合だと:

  • 情報の重複(2つのエージェントが両方「Teslaの時価総額は8000億ドル」と言う)
  • 情報の矛盾(1つは成長15%、もう1つは成長12%と言う)
  • インサイトの欠如(羅列するだけで、比較分析がない)

LLM統合するとき、プロンプトで明確に要求する:

synthesisPrompt := `以下のリサーチ結果を統合してください:

要件:
1. 重複を除去
2. 矛盾があれば明記
3. 比較分析テーブルを生成
4. 重要なインサイトをまとめる(3-5点)

...
`

落とし穴4:予算配分の不均衡

// 問題:単純タスクと複雑タスクに同じ予算
budgetPerAgent := totalBudget / numAgents

// 改善:タスクの推定トークン数で配分
for _, st := range subtasks {
    budgets[st.ID] = int(float64(totalBudget) * float64(st.EstimatedTokens) / float64(totalEstimated))
}

13.10 他のフレームワークでの実装

編成はマルチエージェントの核心的な問題。各フレームワークに独自のアプローチがある:

フレームワーク編成方式特徴
LangGraphグラフ定義 + ノード実行柔軟、手動でグラフを定義する必要あり
AutoGenGroupChat + Manager会話駆動、自動で発言者を選択
CrewAICrew + Processロール定義が明確、順序/階層をサポート
OpenAI Swarmhandoff()軽量、エージェント間で直接ハンドオフ

LangGraph の例:

from langgraph.graph import StateGraph

# 状態を定義
class ResearchState(TypedDict):
    query: str
    tesla_data: str
    byd_data: str
    synthesis: str

# グラフを定義
graph = StateGraph(ResearchState)
graph.add_node("research_tesla", research_tesla_node)
graph.add_node("research_byd", research_byd_node)
graph.add_node("synthesize", synthesize_node)

# エッジを定義(依存関係)
graph.add_edge(START, "research_tesla")
graph.add_edge(START, "research_byd")
graph.add_edge("research_tesla", "synthesize")
graph.add_edge("research_byd", "synthesize")

この章はここまで

核心は一言で言える:Orchestratorはマルチエージェントの指揮者。タスクを分解し、実行を分発し、依存を協調し、結果を統合する

まとめ

  1. 単体エージェントの3つの限界:直列で遅い、ジェネラリストで浅い、単一障害点
  2. Orchestratorの4つの責務:Decompose → Dispatch → Coordinate → Synthesize
  3. ルーティング決定:単純タスクはSimpleTask、複雑タスクはDAGかSupervisor
  4. 3つの実行モード:並列(独立タスク)、直列(チェーン依存)、ハイブリッド(DAG)
  5. 結果統合:重複除去 → フィルタ → LLM整合

Shannon Lab(10分でハンズオン)

このセクションで、この章のコンセプトをShannonのソースコードにマッピングする。

必読(1ファイル)

  • orchestrator_router.go:OrchestratorWorkflow関数のルーティングswitch文を探して、「単純タスク」や「Supervisorが必要」の判定方法、子ワークフローへの委譲を理解する

選択深掘り(2つ、興味に応じて)

  • execution/parallel.go:セマフォ制御の実装方法(workflow.NewSemaphore)、なぜfuturesChan + Selectorで結果を収集するかを理解
  • execution/hybrid.go:waitForDependenciesのインクリメンタルタイムアウトチェック、なぜworkflow.AwaitWithTimeoutを使って無限待ちを避けるかを理解

練習問題

練習1:ルーティング決定の分析

以下のタスクがどのパスを通るか分析してみて:

  1. 「今日の東京の天気は?」
  2. 「iPhoneとAndroidの市場シェアを比較して」
  3. 「ECシステムの完全なアーキテクチャを設計して。フロントエンド、バックエンド、データベース、キャッシュ、メッセージキューを含めて」

各タスクについて:

  • 予想される複雑度スコアの範囲
  • どのワークフローを通るか(SimpleTask / DAG / Supervisor)
  • なぜそうなるか

練習2:同時実行数の設定

LLM APIの制限が秒間10リクエスト、1タスクで3回のLLM呼び出しが必要、平均所要時間5秒だとする。

質問:

  1. 20個のサブタスクがある場合、MaxConcurrencyはいくつが適切?
  2. 高すぎるとどうなる?
  3. 低すぎるとどうなる?

練習3(応用):統合プロンプトの設計

「複数企業の決算比較分析」タスク用のLLM統合プロンプトを設計してみて。

含めるべき内容:

  • 情報重複の処理方法
  • データ矛盾の処理方法
  • 出力フォーマット要件(テーブル + インサイト)
  • 引用表記の要件

もっと深く学ぶなら


次章の予告

編成器は「誰がやるか」を決めた。でも「どうやるか」はまだ解決してない。

タスク間に複雑な依存関係があるとき(AがBを待ち、BがCを待ち、CはDと並列可能)、単純な直列や並列じゃ対処できない。

次章はDAGワークフロー:有向非巡回グラフでタスク依存をモデリングし、インテリジェントなスケジューリングを実現する。

次章で続きをやろう。

この記事を引用する / Cite
Zhang, W. (2026). 第 13 章:オーケストレーション基礎. In AI Agent アーキテクチャ:単体からエンタープライズ級マルチエージェントへ. https://waylandz.com/ai-agent-book-ja/第13章-オーケストレーション基礎
@incollection{zhang2026aiagent_ja_第13章_オーケストレーション基礎,
  author = {Zhang, Wayland},
  title = {第 13 章:オーケストレーション基礎},
  booktitle = {AI Agent アーキテクチャ:単体からエンタープライズ級マルチエージェントへ},
  year = {2026},
  url = {https://waylandz.com/ai-agent-book-ja/第13章-オーケストレーション基礎}
}