マルチエージェント編成って、みんなが勝手に動くことじゃない。オーケストラみたいに、指揮者がいて、分担があって、息が合ってる状態のことさ。ただし、指揮者がいくら優秀でも、演奏者がダメなら意味ないけどね。
5分で要点を掴む
- 単体エージェントの3つの限界:直列処理で遅い、専門性が浅い、単一障害点
- 編成の3要素:タスク分解、エージェント割り当て、結果統合
- 自動 vs 設定:単純なタスクは自動マッチング、複雑なタスクは明示的に設定
- 協調コストを忘れるな:タスクが単純なら、単体エージェントの方が速い
- 編成はアーキテクチャの決断。並列のメリットと協調オーバーヘッドのトレードオフだ
10分コース:13.1-13.3 → 13.5 → Shannon Lab
13.1 なぜ単体エージェントじゃダメなのか?
この章で解決したいのは1つ。単体エージェントで効率よく片付かないとき、複数のエージェントをどう協力させるか?
ちょっと想像してみて。小さなリサーチプロジェクトを任されたとする。3社の競合(Tesla、BYD、Rivian)のEV戦略を分析してほしいと。一人でやるなら、どうする?
直列でやるよね。今日はTesla、明日はBYD、明後日はRivian。3日後にやっと情報が揃って、比較分析を書き始める。
でも、3人のアシスタントがいたら?同時にやらせるでしょ。AliceはTesla、BobはBYD、CarolはRivian。1日後には3つのレポートが揃って、あとは自分で統合するだけ。
効率3倍。
単体エージェントは一人で戦うようなもの。タスクは完了できるけど、効率は悪いし、深掘りも難しい。マルチエージェント編成は、チームを組んで分業すること。
ただ、チームを組むのは「人を増やす」だけじゃない。タスクの割り当て、進捗の調整、結果の統合、衝突の処理。全部必要になる。Orchestrator(編成器)がそれをやるんだ。
単体エージェントの3つの限界
まず結論から言うと、単体エージェントには3つの限界がある。
限界その1:直列実行、遅すぎる
3社の検索は完全に独立してる。依存関係なし。でも単体エージェントは1つずつしかできない:
タイムライン(単体エージェント):
0秒 ──► Tesla検索 ──► 20秒
20秒 ──► BYD検索 ──► 40秒
40秒 ──► Rivian検索 ──► 60秒
合計:60秒
並列だとどうなる?
タイムライン(マルチエージェント):
0秒 ──┬► Agent A: Tesla検索 ──► 20秒
├► Agent B: BYD検索 ──► 20秒
└► Agent C: Rivian検索 ──► 20秒
合計:20秒
40秒の短縮。タスクが増えるほど差は開く。
限界その2:ジェネラリストに専門家の仕事、深さが足りない
「AIスタートアップのビジネスプランを作って」というタスク。何が必要?
- 市場分析:業界規模、成長トレンド、競合状況
- 技術アーキテクチャ:技術選定、コスト見積もり、実現可能性評価
- 財務予測:収益モデル、コスト構造、損益分析
- マーケティング戦略:ターゲットユーザー、獲得チャネル、ブランドポジショニング
「ジェネラリスト」のエージェント1つで4つ全部?まあ、どれもちょっとは分かるだろうけど、どれも深くない。
ベターな方法は、4人の専門家エージェントで分担すること。
限界その3:単一障害点、冗長性なし
1つのエージェントがダウンしたら(ネットワークタイムアウト、LLMエラー、ツール呼び出し失敗)、タスク全体が終わる。
マルチエージェントシステムならフォールトトレラントにできる。1つ落ちても他は続行。重要なタスクにはバックアップを用意できる。
マルチ vs 単体
| 能力 | 単体エージェント | マルチエージェント |
|---|---|---|
| 並列能力 | 直列実行 | 並行実行 |
| 専門性 | ジェネラリスト、浅く広く | 専門家分業、それぞれの強み |
| 耐障害性 | 単一障害点 | 冗長性あり |
| コスト制御 | 統一モデル | タスク別にモデル選択(単純なら安いモデル) |
注意:マルチエージェントは万能じゃない。複数エージェントの協調にはオーバーヘッドがある。通信、同期、結果統合。タスクが単純なら、単体エージェントの方が速いこともある。「天気を調べる」だけで3つのエージェントを使う人を見たことあるけど、完全に無駄。かえって遅くなる。
13.2 編成器:マルチエージェントの指揮者
マルチエージェントシステムには「指揮者」が必要。それがOrchestrator(編成器)。
自分では作業しないけど、こういうことを決める:
- タスクをどう分解するか
- 誰が何をやるか
- どの順序で実行するか
- 結果をどう統合するか
4つの責務
例え話をしよう:編成器はレストランの料理長みたいなもの。
お客さんが「ステーキセットを」と言う。料理長は一人で全部作らない。こうする:
- 分解:ステーキ、付け合わせ、ソース、デザート
- 分発:ステーキはグリル担当、付け合わせは冷菜担当、ソースはソーシエ
- 協調:ステーキができたらソースをかける、付け合わせとステーキは同時に出す
- 統合:盛り付け、温度と見た目をチェック
料理長は全部できる必要はない。でも、誰が何が得意か、どの順番が合理的か、どう一皿にまとめるか、それは知ってなきゃいけない。
実行フロー
13.3 ルーティング決定:どの戦略を使う?
全てのタスクにマルチエージェントが必要なわけじゃない。編成器の最初の判断は:このタスクにどのパスを使うか?
Shannon のルーティングロジック
Shannon の OrchestratorWorkflow はこう判断する:
// 単純タスクかどうかの判定
simpleByShape := len(decomp.Subtasks) == 0 ||
(len(decomp.Subtasks) == 1 && !needsTools)
isSimple := decomp.ComplexityScore < simpleThreshold && simpleByShape
// 依存関係のチェック
hasDeps := false
for _, st := range decomp.Subtasks {
if len(st.Dependencies) > 0 || len(st.Consumes) > 0 {
hasDeps = true
break
}
}
switch {
case isSimple:
// 単純タスク → 単体エージェントで直接実行
return SimpleTaskWorkflow(input)
case len(decomp.Subtasks) > 5 || hasDeps:
// 複雑タスクまたは依存あり → Supervisorモード
return SupervisorWorkflow(input)
default:
// 標準タスク → DAGワークフロー
return DAGWorkflow(input)
}
実装参考 (Shannon): go/orchestrator/internal/workflows/orchestrator_router.go - OrchestratorWorkflow 関数
決定木
タスク到着
│
▼
複雑度 < 0.3 かつ サブタスク1つ かつ ツール不要? ──はい──► SimpleTaskWorkflow
│ (単体エージェントで直接実行)
いいえ
│
▼
サブタスク > 5 または 依存あり? ──はい──► SupervisorWorkflow
│ (複雑なマルチエージェント協調)
いいえ
│
▼
DAGWorkflow(デフォルト)
(標準的なマルチエージェント並列/直列)
3つの戦略比較
| 戦略 | 適用シーン | 特徴 |
|---|---|---|
| SimpleTask | 単純なQ&A、単ステップタスク | 最軽量、単体エージェント |
| DAGWorkflow | 2-5個のサブタスク、単純な依存あり | 並列/直列/ハイブリッド実行 |
| Supervisor | 6個以上のサブタスク、複雑な依存、動的協調が必要 | チーム管理、メールボックス通信 |
この3つの戦略は後の章で詳しくやる。ここでは覚えておいて:編成器はタスクの複雑度に応じて自動的に戦略を選ぶ。
13.4 3つの実行モード
どの戦略を選んでも、最終的にはエージェントを実行する。実行方式は3つある:
モード1:並列実行(Parallel)
適用シーン:サブタスクが互いに独立、依存関係なし。
コアはセマフォ制御。同時実行するエージェント数を制限して、リソース枯渇を防ぐ。
type ParallelConfig struct {
MaxConcurrency int // 最大同時実行数、デフォルト5
}
func ExecuteParallel(ctx workflow.Context, tasks []ParallelTask, config ParallelConfig) {
// セマフォで同時実行を制御
semaphore := workflow.NewSemaphore(ctx, int64(config.MaxConcurrency))
for i, task := range tasks {
workflow.Go(ctx, func(ctx workflow.Context) {
// セマフォ取得(同時実行数を超えるとブロック)
semaphore.Acquire(ctx, 1)
defer semaphore.Release(1)
// タスク実行
executeTask(task)
})
}
}
実装参考 (Shannon): go/orchestrator/internal/workflows/patterns/execution/parallel.go - ExecuteParallel 関数
なぜ同時実行数を制限するのか?
検索タスクが10個あって、MaxConcurrency = 3 だとする:
t0: [Task 1] [Task 2] [Task 3] ← 3つ同時スタート
t1: [1 完了] [Task 4 開始] ← 1が完了、4がすぐ補充
t2: [2 完了] [Task 5 開始] ← 2が完了、5が補充
...
制限しないと、10個のエージェントが同時にLLM APIを叩いて、レート制限に引っかかる可能性が高い。かえって遅くなる。
モード2:直列実行(Sequential)
適用シーン:タスクに暗黙の依存があり、後のタスクが前のタスクの結果を必要とする。
type SequentialConfig struct {
PassPreviousResults bool // 前の結果を次に渡すか
}
func ExecuteSequential(ctx workflow.Context, tasks []Task, config SequentialConfig) {
var results []Result
for i, task := range tasks {
// 前の結果をコンテキストに注入
if config.PassPreviousResults && len(results) > 0 {
task.Context["previous_results"] = results
}
result := executeTask(task)
results = append(results, result)
}
}
ポイントは結果の引き継ぎ。例えば:
Task 1: "Teslaの株価を取得"
→ Response: "$250"
↓
Task 2: "去年からの上昇率を計算"
Context: {
previous_results: [
{ response: "$250", numeric_value: 250 }
]
}
→ 250を使って直接計算できる
モード3:ハイブリッド実行(Hybrid/DAG)
適用シーン:一部のタスクは並列可能、一部に依存関係がある。
コアは依存待ち。タスクは全ての依存タスクが完了してから開始できる。
func waitForDependencies(
ctx workflow.Context,
dependencies []string,
completedTasks map[string]bool,
timeout time.Duration,
) bool {
startTime := workflow.Now(ctx)
deadline := startTime.Add(timeout)
for workflow.Now(ctx).Before(deadline) {
// 全依存が完了したかチェック
allDone := true
for _, depID := range dependencies {
if !completedTasks[depID] {
allDone = false
break
}
}
if allDone {
return true
}
// 30秒待ってから再チェック
workflow.AwaitWithTimeout(ctx, 30*time.Second, func() bool {
// 条件チェック
for _, depID := range dependencies {
if !completedTasks[depID] {
return false
}
}
return true
})
}
return false // タイムアウト
}
実装参考 (Shannon): go/orchestrator/internal/workflows/patterns/execution/hybrid.go - waitForDependencies 関数
13.5 結果統合
複数のエージェントが完了した。結果をどうまとめる?
問題
エージェントの生出力は大体こうなってる:
- 冗長:違うエージェントが似た情報を出すことがある
- フォーマットがバラバラ:各エージェントに独自の出力スタイルがある
- 品質にムラ:成功したもの、失敗したもの、中途半端なもの
ユーザーが期待してるのは:統一された、完全で、高品質な回答。
前処理:重複除去とフィルタリング
func preprocessResults(results []AgentResult) []AgentResult {
// 1. 完全一致の重複除去(Hash)
seen := make(map[string]bool)
exact := []AgentResult{}
for _, r := range results {
hash := computeHash(r.Response)
if !seen[hash] {
seen[hash] = true
exact = append(exact, r)
}
}
// 2. 類似重複の除去(Jaccard > 0.85)
similar := []AgentResult{}
for _, r := range exact {
isDuplicate := false
for _, s := range similar {
if jaccardSimilarity(r.Response, s.Response) > 0.85 {
isDuplicate = true
break
}
}
if !isDuplicate {
similar = append(similar, r)
}
}
// 3. 品質フィルタリング
filtered := []AgentResult{}
noInfoPatterns := []string{
"unable to retrieve",
"failed to fetch",
"no information available",
"アクセスできません",
"見つかりません",
}
for _, r := range similar {
if r.Success && !containsAny(r.Response, noInfoPatterns) {
filtered = append(filtered, r)
}
}
return filtered
}
統合方式
シンプル統合:直接結合
結果がすでに整理されている場合に適している:
func simpleSynthesis(results []AgentResult) string {
var parts []string
for _, r := range results {
parts = append(parts, r.Response)
}
return strings.Join(parts, "\n\n")
}
LLM統合:インテリジェントな整合
統一された視点、矛盾の解消、インサイト生成が必要な場合に適している:
func llmSynthesis(query string, results []AgentResult) string {
prompt := fmt.Sprintf(`以下のリサーチ結果を統合して、質問に回答してください:%s
要件:
1. 重複情報を除去
2. 矛盾があれば解決
3. 重要なインサイトを強調
4. 統一フォーマットで提示
`, query)
for i, r := range results {
prompt += fmt.Sprintf("=== ソース %d ===\n%s\n\n", i+1, r.Response)
}
return callLLM(prompt)
}
13.6 Token予算配分
マルチエージェントシナリオでは、コスト制御がもっと重要になる。
なぜ?
単体エージェントで1000トークン使うところ、マルチエージェントだと5000トークンになることも。制御しないと、1つの複雑なタスクで1日分の予算を使い切る可能性がある。
予算配分戦略
シンプル戦略:均等配分
func allocateBudgetSimple(totalBudget int, numAgents int) int {
return totalBudget / numAgents
}
// 例:総予算 10000、5エージェント → 各2000
アドバンス戦略:複雑度ベース配分
func allocateBudgetByComplexity(totalBudget int, subtasks []Subtask) map[string]int {
budgets := make(map[string]int)
// 総複雑度を計算
totalComplexity := 0.0
for _, st := range subtasks {
totalComplexity += st.Complexity
}
// 比率で配分
for _, st := range subtasks {
budgets[st.ID] = int(float64(totalBudget) * st.Complexity / totalComplexity)
}
return budgets
}
// 例:総予算 10000
// Task A (複雑度 0.5) → 5000
// Task B (複雑度 0.3) → 3000
// Task C (複雑度 0.2) → 2000
Shannon の実装:
// ルーターから予算を引き継ぐ
n := len(decomp.Subtasks)
if n == 0 {
n = 1
}
agentMax := res.RemainingTaskBudget / n
// 環境変数やリクエストコンテキストで上限を設定可能
if v := os.Getenv("TOKEN_BUDGET_PER_AGENT"); v != "" {
if cap, err := strconv.Atoi(v); err == nil && cap > 0 && cap < agentMax {
agentMax = cap
}
}
input.Context["budget_agent_max"] = agentMax
13.7 制御シグナル
編成プロセス中に、ユーザーがこういうことをしたくなるかもしれない:一時停止、再開、キャンセル。
Temporal のシグナル機構
Shannon は Temporal の Signal 機構で制御を実装している:
// 制御シグナルハンドラーを設定
controlHandler := &ControlSignalHandler{
WorkflowID: workflowID,
AgentID: "orchestrator",
}
controlHandler.Setup(ctx)
// 重要なポイントでシグナルをチェック
checkpoints := []string{
"pre_routing", // ルーティング決定前
"post_decomposition", // タスク分解後
"pre_dag_workflow", // DAG開始前
}
for _, checkpoint := range checkpoints {
if err := controlHandler.CheckPausePoint(ctx, checkpoint); err != nil {
return TaskResult{Success: false, ErrorMessage: err.Error()}, err
}
}
子ワークフローの登録
編成器が子ワークフローを起動するとき、シグナルを伝播させるために登録が必要:
// 子ワークフローを起動
childFuture := workflow.ExecuteChildWorkflow(ctx, DAGWorkflow, input)
// 子ワークフローIDを取得
var childExec workflow.Execution
childFuture.GetChildWorkflowExecution().Get(ctx, &childExec)
// 登録(これで一時停止/キャンセルシグナルが子ワークフローに伝播する)
controlHandler.RegisterChildWorkflow(childExec.ID)
// 完了後に登録解除
defer controlHandler.UnregisterChildWorkflow(childExec.ID)
13.8 完全な例
これまでの内容を繋げて、完全なマルチエージェントリサーチタスクを見てみよう:
func CompanyResearchWorkflow(ctx workflow.Context, query string) (string, error) {
companies := []string{"Tesla", "BYD", "Rivian"}
// 1. 並列タスクを構築
tasks := make([]ParallelTask, len(companies))
for i, company := range companies {
tasks[i] = ParallelTask{
ID: fmt.Sprintf("research-%s", strings.ToLower(company)),
Description: fmt.Sprintf("Research %s's 2024 EV strategy", company),
SuggestedTools: []string{"web_search"},
Role: "researcher",
}
}
// 2. 並列実行
config := ParallelConfig{
MaxConcurrency: 3,
EmitEvents: true,
}
result, err := ExecuteParallel(ctx, tasks, sessionID, history, config, budgetPerAgent, userID, modelTier)
if err != nil {
return "", err
}
// 3. 結果の前処理
processed := preprocessResults(result.Results)
// 4. LLM統合
synthesis := llmSynthesis(query, processed)
return synthesis, nil
}
実行タイムライン:
0秒 ┌─ 編成器起動
├─ タスク分解: 3つのリサーチタスク + 1つの統合タスク
└─ ルーティング決定: DAGWorkflow
1秒 ├─ 3つのリサーチエージェントを並列起動
│ ├─ Agent A (Tesla): 検索中...
│ ├─ Agent B (BYD): 検索中...
│ └─ Agent C (Rivian): 検索中...
15秒 ├─ Agent B 完了
20秒 ├─ Agent C 完了
25秒 ├─ Agent A 完了 (Teslaの情報が一番多い)
26秒 ├─ 結果統合開始
│ ├─ 重複除去: 2件の重複情報を削除
│ ├─ フィルタ: 1件の失敗結果を削除
│ └─ LLM統合分析
45秒 └─ 最終レポート出力
合計: 約45秒 (直列だと約75秒)
13.9 よくある落とし穴
落とし穴1:過度な並列化
// 危険:同時実行100、APIがレート制限に引っかかる
config := ParallelConfig{MaxConcurrency: 100}
// 適切:APIの制限に合わせて設定
config := ParallelConfig{MaxConcurrency: 5}
同時実行数を50に設定した人を見たことがある。結果、LLM APIから429 Too Many Requestsの嵐。直列実行の方がマシだった。
落とし穴2:失敗タスクの無視
// 問題:成功したものだけ処理、失敗は無視
for _, r := range results {
if r.Success {
process(r)
}
}
// 改善:成功率を監視
successRate := float64(successCount) / float64(total)
if successRate < 0.7 {
logger.Warn("Low success rate", "rate", successRate)
// リトライやアラートが必要かも
}
落とし穴3:結果統合で情報ロス
単純な結合だと:
- 情報の重複(2つのエージェントが両方「Teslaの時価総額は8000億ドル」と言う)
- 情報の矛盾(1つは成長15%、もう1つは成長12%と言う)
- インサイトの欠如(羅列するだけで、比較分析がない)
LLM統合するとき、プロンプトで明確に要求する:
synthesisPrompt := `以下のリサーチ結果を統合してください:
要件:
1. 重複を除去
2. 矛盾があれば明記
3. 比較分析テーブルを生成
4. 重要なインサイトをまとめる(3-5点)
...
`
落とし穴4:予算配分の不均衡
// 問題:単純タスクと複雑タスクに同じ予算
budgetPerAgent := totalBudget / numAgents
// 改善:タスクの推定トークン数で配分
for _, st := range subtasks {
budgets[st.ID] = int(float64(totalBudget) * float64(st.EstimatedTokens) / float64(totalEstimated))
}
13.10 他のフレームワークでの実装
編成はマルチエージェントの核心的な問題。各フレームワークに独自のアプローチがある:
| フレームワーク | 編成方式 | 特徴 |
|---|---|---|
| LangGraph | グラフ定義 + ノード実行 | 柔軟、手動でグラフを定義する必要あり |
| AutoGen | GroupChat + Manager | 会話駆動、自動で発言者を選択 |
| CrewAI | Crew + Process | ロール定義が明確、順序/階層をサポート |
| OpenAI Swarm | handoff() | 軽量、エージェント間で直接ハンドオフ |
LangGraph の例:
from langgraph.graph import StateGraph
# 状態を定義
class ResearchState(TypedDict):
query: str
tesla_data: str
byd_data: str
synthesis: str
# グラフを定義
graph = StateGraph(ResearchState)
graph.add_node("research_tesla", research_tesla_node)
graph.add_node("research_byd", research_byd_node)
graph.add_node("synthesize", synthesize_node)
# エッジを定義(依存関係)
graph.add_edge(START, "research_tesla")
graph.add_edge(START, "research_byd")
graph.add_edge("research_tesla", "synthesize")
graph.add_edge("research_byd", "synthesize")
この章はここまで
核心は一言で言える:Orchestratorはマルチエージェントの指揮者。タスクを分解し、実行を分発し、依存を協調し、結果を統合する。
まとめ
- 単体エージェントの3つの限界:直列で遅い、ジェネラリストで浅い、単一障害点
- Orchestratorの4つの責務:Decompose → Dispatch → Coordinate → Synthesize
- ルーティング決定:単純タスクはSimpleTask、複雑タスクはDAGかSupervisor
- 3つの実行モード:並列(独立タスク)、直列(チェーン依存)、ハイブリッド(DAG)
- 結果統合:重複除去 → フィルタ → LLM整合
Shannon Lab(10分でハンズオン)
このセクションで、この章のコンセプトをShannonのソースコードにマッピングする。
必読(1ファイル)
orchestrator_router.go:OrchestratorWorkflow関数のルーティングswitch文を探して、「単純タスク」や「Supervisorが必要」の判定方法、子ワークフローへの委譲を理解する
選択深掘り(2つ、興味に応じて)
execution/parallel.go:セマフォ制御の実装方法(workflow.NewSemaphore)、なぜfuturesChan + Selectorで結果を収集するかを理解execution/hybrid.go:waitForDependenciesのインクリメンタルタイムアウトチェック、なぜworkflow.AwaitWithTimeoutを使って無限待ちを避けるかを理解
練習問題
練習1:ルーティング決定の分析
以下のタスクがどのパスを通るか分析してみて:
- 「今日の東京の天気は?」
- 「iPhoneとAndroidの市場シェアを比較して」
- 「ECシステムの完全なアーキテクチャを設計して。フロントエンド、バックエンド、データベース、キャッシュ、メッセージキューを含めて」
各タスクについて:
- 予想される複雑度スコアの範囲
- どのワークフローを通るか(SimpleTask / DAG / Supervisor)
- なぜそうなるか
練習2:同時実行数の設定
LLM APIの制限が秒間10リクエスト、1タスクで3回のLLM呼び出しが必要、平均所要時間5秒だとする。
質問:
- 20個のサブタスクがある場合、MaxConcurrencyはいくつが適切?
- 高すぎるとどうなる?
- 低すぎるとどうなる?
練習3(応用):統合プロンプトの設計
「複数企業の決算比較分析」タスク用のLLM統合プロンプトを設計してみて。
含めるべき内容:
- 情報重複の処理方法
- データ矛盾の処理方法
- 出力フォーマット要件(テーブル + インサイト)
- 引用表記の要件
もっと深く学ぶなら
- Temporal Workflows - ワークフロー編成の基盤インフラを理解
- LangGraph Multi-Agent - Pythonエコシステムのグラフ編成アプローチ
- AutoGen GroupChat - Microsoftの会話型マルチエージェントフレームワーク
次章の予告
編成器は「誰がやるか」を決めた。でも「どうやるか」はまだ解決してない。
タスク間に複雑な依存関係があるとき(AがBを待ち、BがCを待ち、CはDと並列可能)、単純な直列や並列じゃ対処できない。
次章はDAGワークフロー:有向非巡回グラフでタスク依存をモデリングし、インテリジェントなスケジューリングを実現する。
次章で続きをやろう。