第 21 課:プロジェクト実践
目標:最初の20レッスンからの知識を統合して、ゼロから実行可能なMulti-Agent Trading Systemプロトタイプを構築する。
プロジェクト概要
このレッスンでは、エンドツーエンドのMulti-Agent Trading Systemの構築を案内します。これはおもちゃのプロジェクトではなく、実際の取引システムに拡張できるフレームワークです。
システム目標
| 次元 | 目標 | 非目標 |
|---|---|---|
| 機能 | 市場Regimeを特定、シグナルを生成、リスクを制御、取引を実行 | High-frequency trading、複雑なデリバティブ |
| 市場 | US株式日次頻度戦略 | 分未満の戦略 |
| 規模 | 10-50銘柄のポートフォリオ | 数千からの銘柄選択 |
| 運用 | ローカル開発環境 + Paper Trading | 直接ライブ取引 |
最終成果物
multi-agent-trading-system/
├── agents/
│ ├── regime_agent.py # 市場Regime検出
│ ├── signal_agent.py # シグナル生成
│ ├── risk_agent.py # リスクコントロール
│ ├── execution_agent.py # 注文実行
│ └── monitor_agent.py # システム監視
├── core/
│ ├── data_manager.py # データ管理
│ ├── portfolio.py # ポートフォリオ管理
│ └── order.py # 注文モデル
├── strategies/
│ ├── momentum.py # Momentum戦略
│ └── mean_revert.py # Mean reversion戦略
├── config/
│ └── settings.yaml # 設定ファイル
├── tests/
│ └── ... # テストケース
└── main.py # エントリーポイント
21.1 システムアーキテクチャ
全体アーキテクチャ図
データフロー
Market Data ---------> Data Manager ---------> All Agents
|
v
Positions/Balance <---- Portfolio <---------- Execution Agent
|
v
Logs/Metrics --------> Monitor Agent --------> Alerts
Modular Monolith First
上記のアーキテクチャ図は、クリーンな境界を持つ個別のAgentを示しています。よくある間違いは、各Agentを独自のプロセス、メッセージキュー、データベースを持つ別々のマイクロサービスとしてすぐにデプロイすることです。これをしないでください。
Modular Monolithから始める:モジュール間でクリーンなモジュール境界と共有Protobuf契約を持つ単一プロセス。
上記のAgentは論理的な分離であり、デプロイメント境界ではありません。最初のバージョンでは、regime_agent.py、signal_agent.py、risk_agent.py、execution_agent.pyはすべて同じプロセスで実行され、明確に定義されたデータ契約(dataclassesまたはProtobufメッセージ)を使用して関数呼び出しを通じて通信します。
次の場合にのみ別々のサービスに抽出します:
- スケーリングが独立したデプロイメントを必要とする(例:シグナル研究はGPUが必要だが実行は不要)
- 異なるコンポーネントが根本的に異なるレイテンシ層を持つ(例:Rustでのサブミリ秒のリスクチェック vs. Pythonでの秒単位の研究)
- チームサイズが独立したリリースサイクルを必要とする
重要な洞察:モジュール境界は設計決定です。サービス境界はインフラストラクチャ決定です。 最初に設計を正しくします。早すぎる分割は、ネットワークレイテンシ、分散デバッグの複雑さ、運用オーバーヘッドを追加します - いずれも戦略が機能するかどうかを検証するのに役立ちません。
21.2 ステップバイステップ実装
ステップ1:Data Manager
目標:市場データを取得して管理する。
Data Manager責任:
- 履歴相場を取得(バックテスト用)
- リアルタイム相場を取得(ライブ取引用)
- データクリーニングと標準化
- テクニカル指標を計算
紙設計:Data Interface
| メソッド | 入力 | 出力 | 目的 |
|---|---|---|---|
get_history | symbol, days | DataFrame | 履歴データを取得 |
get_latest | symbols | Dict | 最新価格を取得 |
calculate_indicators | df | df with indicators | テクニカル指標を計算 |
validate | df | bool, errors | データ品質チェック |
コードフレームワーク(エンジニアリファレンス)
import yfinance as yf
import pandas as pd
import numpy as np
from typing import List, Dict, Optional
class DataManager:
"""Data Manager"""
def __init__(self, cache_dir: str = "./data_cache"):
self.cache_dir = cache_dir
def get_history(
self,
symbol: str,
days: int = 252,
end_date: Optional[str] = None
) -> pd.DataFrame:
"""履歴市場データを取得"""
ticker = yf.Ticker(symbol)
df = ticker.history(period=f"{days}d")
# 列名を標準化
df = df.rename(columns={
"Open": "open",
"High": "high",
"Low": "low",
"Close": "close",
"Volume": "volume"
})
return df[["open", "high", "low", "close", "volume"]]
def calculate_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""テクニカル指標を計算"""
# 移動平均
df["sma_20"] = df["close"].rolling(20).mean()
df["sma_50"] = df["close"].rolling(50).mean()
# ボラティリティ
df["volatility"] = df["close"].pct_change().rolling(20).std() * np.sqrt(252)
# RSI
delta = df["close"].diff()
gain = delta.where(delta > 0, 0).rolling(14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
df["rsi"] = 100 - (100 / (1 + gain / loss))
# ATR
high_low = df["high"] - df["low"]
high_close = (df["high"] - df["close"].shift()).abs()
low_close = (df["low"] - df["close"].shift()).abs()
tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
df["atr"] = tr.rolling(14).mean()
return df
def validate(self, df: pd.DataFrame) -> tuple:
"""データ品質チェック"""
errors = []
if df.empty:
errors.append("DataFrame is empty")
if df["close"].isnull().any():
errors.append(f"Missing close prices: {df['close'].isnull().sum()}")
if (df["close"] <= 0).any():
errors.append("Invalid prices (<=0)")
return len(errors) == 0, errorsステップ2:Regime Agent
目標:現在の市場Regimeを特定する。
Regime Agent出力:
- regime: "trending" | "mean_reverting" | "crisis" | "uncertain"
- confidence: 0.0 - 1.0
- regime_weights: {"trending": 0.6, "mean_reverting": 0.3, "crisis": 0.1}
紙設計:Regime検出ルール
| 条件 | 状態 | 重み配分 |
|---|---|---|
| ADX >25 and Vol < 25% | Trending | Trend 80%、Mean reversion 15%、Defensive 5% |
| ADX < 20 and Vol < 20% | Range-bound | Trend 20%、Mean reversion 70%、Defensive 10% |
| Vol >30% and Corr >0.7 | Crisis | Trend 10%、Mean reversion 10%、Defensive 80% |
| その他 | Uncertain | 各33% |
コードフレームワーク(エンジニアリファレンス)
from dataclasses import dataclass
from typing import Dict
@dataclass
class RegimeState:
regime: str
confidence: float
weights: Dict[str, float]
class RegimeAgent:
"""市場Regime検出Agent"""
def __init__(self, config: dict = None):
self.config = config or {}
self.adx_threshold = self.config.get("adx_threshold", 25)
self.vol_crisis = self.config.get("vol_crisis", 0.30)
def detect(self, market_data: dict) -> RegimeState:
"""
市場Regimeを検出
market_data: {
"adx": float,
"volatility": float,
"correlation": float,
"trend_strength": float
}
"""
adx = market_data.get("adx", 20)
vol = market_data.get("volatility", 0.15)
corr = market_data.get("correlation", 0.5)
# Crisis検出が優先
if vol > self.vol_crisis and corr > 0.7:
return RegimeState(
regime="crisis",
confidence=0.8,
weights={"momentum": 0.1, "mean_revert": 0.1, "defensive": 0.8}
)
# Trend検出
if adx > self.adx_threshold and vol < 0.25:
return RegimeState(
regime="trending",
confidence=0.7,
weights={"momentum": 0.7, "mean_revert": 0.2, "defensive": 0.1}
)
# Range-bound検出
if adx < 20 and vol < 0.20:
return RegimeState(
regime="mean_reverting",
confidence=0.6,
weights={"momentum": 0.2, "mean_revert": 0.7, "defensive": 0.1}
)
# Uncertain
return RegimeState(
regime="uncertain",
confidence=0.3,
weights={"momentum": 0.33, "mean_revert": 0.33, "defensive": 0.34}
)ステップ3:Signal Agent
目標:戦略に基づいて取引シグナルを生成する。
Signal Agent出力:
- signals: [
{"symbol": "AAPL", "direction": "long", "strength": 0.7, "source": "momentum"},
{"symbol": "MSFT", "direction": "short", "strength": 0.5, "source": "mean_revert"}
]
紙設計:シグナル集約ロジック
| シナリオ | 処理 |
|---|---|
| 単一戦略シグナル | 直接出力、strength = 戦略シグナル x Regime重み |
| 複数戦略が一致 | シグナル強度を強化、加重平均を取る |
| 複数戦略が衝突 | より高い重みの戦略を取る、または取引しない |
コードフレームワーク(エンジニアリファレンス)
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class Signal:
symbol: str
direction: str # "long" | "short" | "close"
strength: float # 0.0 - 1.0
source: str
timestamp: str
class SignalAgent:
"""シグナル生成Agent"""
def __init__(self, strategies: list):
self.strategies = strategies
def generate_signals(
self,
market_data: dict,
regime_weights: dict
) -> List[Signal]:
"""すべての戦略からシグナルを集約"""
all_signals = {}
for strategy in self.strategies:
strategy_name = strategy.name
weight = regime_weights.get(strategy_name, 0.33)
raw_signals = strategy.generate(market_data)
for sig in raw_signals:
key = (sig.symbol, sig.direction)
weighted_strength = sig.strength * weight
if key in all_signals:
# 同じ方向のシグナルをスタック
all_signals[key].strength += weighted_strength
else:
all_signals[key] = Signal(
symbol=sig.symbol,
direction=sig.direction,
strength=weighted_strength,
source=strategy_name,
timestamp=sig.timestamp
)
# 弱いシグナルをフィルター
return [s for s in all_signals.values() if s.strength > 0.3]ステップ4:Risk Agent
目標:シグナルをレビュー、リスクをコントロールする。
Risk Agent決定:
- APPROVE: 合格、実行可能
- REDUCE: 合格だがポジションサイズを削減
- REJECT: 拒否
紙設計:レビュールール
| ルール | チェック内容 | トリガーアクション |
|---|---|---|
| 単一取引制限 | ポジション >10% | 10%に削減 |
| シンボル制限 | 同じシンボル >20% | 拒否または削減 |
| 総ポジション制限 | 総エクスポージャー >80% | 拒否 |
| Drawdownコントロール | 現在のDrawdown >10% | すべての新規ポジションを拒否 |
| Circuit breaker | Drawdown >15% | 強制デレバレッジ |
コードフレームワーク(エンジニアリファレンス)
from enum import Enum
from dataclasses import dataclass
from typing import Optional
class Decision(Enum):
APPROVE = "approve"
REDUCE = "reduce"
REJECT = "reject"
@dataclass
class RiskDecision:
decision: Decision
reason: str
adjusted_size: Optional[float] = None
class RiskAgent:
"""Risk Control Agent"""
def __init__(self, config: dict):
self.max_single = config.get("max_single_position", 0.10)
self.max_symbol = config.get("max_symbol_exposure", 0.20)
self.max_total = config.get("max_total_exposure", 0.80)
self.drawdown_stop = config.get("drawdown_stop", 0.10)
self.drawdown_circuit = config.get("drawdown_circuit", 0.15)
def check(
self,
signal: Signal,
proposed_size: float,
portfolio: dict,
current_drawdown: float
) -> RiskDecision:
"""取引シグナルをレビュー"""
# Circuit breakerチェック
if current_drawdown >= self.drawdown_circuit:
return RiskDecision(Decision.REJECT, "Circuit breaker active")
# Drawdownコントロール
if current_drawdown >= self.drawdown_stop:
if signal.direction != "close":
return RiskDecision(Decision.REJECT, "Drawdown limit reached")
# 単一取引制限
if proposed_size > self.max_single:
return RiskDecision(
Decision.REDUCE,
f"Size exceeds single limit",
adjusted_size=self.max_single
)
# シンボル制限
current_exposure = portfolio.get(signal.symbol, 0)
if current_exposure + proposed_size > self.max_symbol:
allowed = self.max_symbol - current_exposure
if allowed <= 0:
return RiskDecision(Decision.REJECT, "Symbol limit reached")
return RiskDecision(
Decision.REDUCE,
"Symbol limit",
adjusted_size=allowed
)
# 総エクスポージャー制限
total_exposure = sum(portfolio.values()) + proposed_size
if total_exposure > self.max_total:
return RiskDecision(Decision.REJECT, "Total exposure limit")
return RiskDecision(Decision.APPROVE, "Passed all checks")ステップ5:Execution Agent
目標:取引注文を実行する。
Execution Agent責任:
- シグナルを注文に変換
- 注文タイプを選択(market/limit)
- 注文ステータスを管理
- 実行結果を記録
コードフレームワーク(エンジニアリファレンス)
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
import uuid
class OrderStatus(Enum):
PENDING = "pending"
SUBMITTED = "submitted"
FILLED = "filled"
CANCELLED = "cancelled"
REJECTED = "rejected"
@dataclass
class Order:
order_id: str
symbol: str
side: str # "buy" | "sell"
quantity: int
order_type: str # "market" | "limit"
limit_price: float = None
status: OrderStatus = OrderStatus.PENDING
filled_price: float = None
filled_time: str = None
class ExecutionAgent:
"""注文実行Agent"""
def __init__(self, broker_client=None, is_paper: bool = True):
self.broker = broker_client
self.is_paper = is_paper
self.orders = {}
def create_order(
self,
symbol: str,
direction: str,
size: float,
portfolio_value: float,
current_price: float
) -> Order:
"""注文を作成"""
# 株数を計算
dollar_amount = portfolio_value * size
quantity = int(dollar_amount / current_price)
if quantity <= 0:
return None
side = "buy" if direction == "long" else "sell"
order = Order(
order_id=str(uuid.uuid4())[:8],
symbol=symbol,
side=side,
quantity=quantity,
order_type="market"
)
self.orders[order.order_id] = order
return order
def submit_order(self, order: Order) -> bool:
"""注文を送信"""
if self.is_paper:
# フィルをシミュレート
order.status = OrderStatus.FILLED
order.filled_time = datetime.now().isoformat()
# Paper tradingは現在価格でのフィルを仮定
return True
# ライブ取引はブローカーAPIを呼び出す
# response = self.broker.submit_order(order)
# ...
return Trueステップ6:Mainループ
目標:すべてのAgentを一緒に接続する。
# Mainループ疑似コード
def main_loop():
# 1. データを取得
market_data = data_manager.get_latest(symbols)
# 2. Regimeを検出
regime_state = regime_agent.detect(market_data)
# 3. シグナルを生成
signals = signal_agent.generate(market_data, regime_state.weights)
# 4. リスクレビュー
for signal in signals:
proposed_size = calculate_position_size(signal)
decision = risk_agent.check(signal, proposed_size, portfolio, drawdown)
if decision.decision == Decision.APPROVE:
size = proposed_size
elif decision.decision == Decision.REDUCE:
size = decision.adjusted_size
else:
continue
# 5. 注文を実行
order = execution_agent.create_order(
signal.symbol, signal.direction, size, portfolio_value, current_price
)
execution_agent.submit_order(order)
# 6. ポートフォリオを更新
portfolio.update()
# 7. ログ
monitor_agent.log_daily_summary()
教育コードから本番システムへ
このレッスンは、アクセシビリティのためにすべてのAgentをPythonで実装しています。本番では、異なるコンポーネントは通常、レイテンシ要件に基づいて異なる言語を使用します:
- Risk Engine -- Rust(不変のハードコードされた制限、ナノ秒レイテンシ)
- Order Management -- Go(高並行性、goroutineモデル)
- Research/ML -- Python(豊富なMLエコシステム)
重要なのは言語の選択ではなく、クリーンなモジュール境界と標準化された通信プロトコル(例:gRPC + Protobuf)です。PythonプロトタイプがAgent間に明確に定義されたインターフェースを持っている場合、パフォーマンスクリティカルなパスをRustやGoに後で移行することは、書き直しではなくリファクタリングです。
21.3 Backtest検証
Backtestingフレームワーク
Backtestingは戦略が機能することを証明するためではなく、戦略の問題を発見するためです。
Backtestチェックリスト(レッスン7 Quality Gate参照):
| チェック項目 | 合格基準 |
|---|---|
| 先見バイアスなし | すべてのデータがT+1で利用可能 |
| 現実的なコストモデリング | スリッページ、手数料を含む |
| Out-of-sample検証 | OOS Sharpe > IS Sharpe x 0.7 |
| パラメータの安定性 | パラメータ+/-20%で戦略が崩壊しない |
| 極端なシナリオテスト | 2008、2020を別々にテスト |
主要Backtestメトリクス
| メトリクス | 目標 | 警告レベル |
|---|---|---|
| 年次リターン | >15% | < 10% |
| Sharpe Ratio | >1.0 | < 0.5 |
| Max Drawdown | < 20% | >30% |
| Win Rate | >50% | < 40% |
| Profit/Loss Ratio | >1.5 | < 1.0 |
| Turnover | < 500%/年 | >1000%/年 |
21.4 PaperからLive Tradingへ
段階的検証
ステージ1:Historical Backtest(2-4週間)
├── 戦略ロジックが正しいことを検証
├── システムが完全に実行できることを検証
└── 目標:Backtestメトリクスが基準を満たす
ステージ2:Paper Trading(2-4週間)
├── 実際の相場、偽の取引を使用
├── リアルタイムデータ処理を検証
└── 目標:実行がエラーフリー
ステージ3:Small Capital Live(4-8週間)
├── 計画資本の5-10%をデプロイ
├── 実際の実行を検証
└── 目標:ライブパフォーマンスがBacktestに近い
ステージ4:Gradual Scale-Up(継続中)
├── 月次評価、段階的に資本を増やす
├── 継続的な監視と調整
└── 目標:安定した収益性
Pre-Liveチェックリスト
[ ] システムチェック
+-- すべてのAgentプロセスが安定して >1週間実行
+-- アラートシステムがテスト済み
+-- ログが完全
+-- 災害復旧プロセスが検証済み
[ ] 戦略チェック
+-- BacktestがQuality Gateを通過
+-- Paper trading >2週間
+-- スリッページ見積もりが合理的
+-- 極端なシナリオの緊急時計画
[ ] 運用チェック
+-- 市場前/後のチェックリストが準備完了
+-- 連絡先情報が更新済み
+-- バックアッププランが準備済み
+-- 資本移転が確認済み
[ ] 精神的準備
+-- リスク許容度が明確
+-- 明確なストップロスラインを設定
+-- Drawdownに直面する準備ができている
21.5 プロジェクト拡張方向
オプションの拡張機能
| 方向 | 説明 | 複雑さ |
|---|---|---|
| Multi-market | A株、HK株、暗号通貨をサポート | 中 |
| Multi-timeframe | 分レベルの戦略をサポート | 高 |
| LLM統合 | ニュース分析、調査レポート解析 | 中 |
| Online Learning | 戦略自動進化 | 高 |
| Webインターフェース | ビジュアル監視 | 中 |
| 分散デプロイメント | マルチサーバー運用 | 高 |
継続的改善サイクル
+------------------------------------------------------------+
| |
| Monitor ------> 問題を発見 ------> 原因を分析 |
| ^ | |
| | v |
| Deploy <------- テスト検証 <------- 改善計画 |
| |
+------------------------------------------------------------+
合格基準
プロジェクト合格チェックリスト
| ステージ | 基準 | 合格標準 |
|---|---|---|
| コード | システムが実行される | python main.py エラーなし |
| すべてのAgentが動作 | ログがAgent出力を表示 | |
| 設定が変更可能 | settings.yamlへの変更が有効 | |
| Backtest | Backtestが実行される | Backtestレポートを生成 |
| メトリクスが正しく計算される | 2-3取引を手動で検証 | |
| 明らかなバグなし | Backtest曲線が合理的 | |
| Paper Trading | リアルタイムデータを取得できる | ログが最新価格を表示 |
| シグナルを生成できる | シグナルリストが空でない | |
| リスクコントロールが機能 | 制限超過注文が拒否される | |
| ドキュメンテーション | READMEがある | 他の人が実行方法を理解できる |
| 設定ドキュメントがある | パラメータの意味が明確 |
自己評価質問
プロジェクトを完了した後、これらの質問に答えてください:
- あなたのシステムはどの市場Regimeで最高/最悪のパフォーマンスを発揮しますか?
- 最大のDrawdownの原因は何でしたか?
- 明日ライブに移行する場合、最も心配することは何ですか?
- システムのどの側面がまだ改善できますか?
レッスン成果物
このレッスンを完了すると、以下が得られます:
- 完全なコードフレームワーク - 実行可能なMulti-Agentシステム
- Backtest検証結果 - 戦略パフォーマンス評価
- Pre-Liveチェックリスト - PaperからLive Tradingへのパス
- 拡張方向ガイド - 将来の改善のためのアイデア
重要なポイント
- Multi-Agentシステム全体のアーキテクチャを理解
- 各Agentの責任とインターフェース設計をマスター
- 最初の20レッスンからの知識を1つのシステムに統合できる
- BacktestからLive Tradingへの検証パスを理解
さらなる読み物
- レッスン01:Quantitative Tradingの完全な景観 - 出発点をレビュー
- レッスン07:Backtest System Pitfalls - Backtest Quality Gate
- レッスン20:Production Operations - 運用ベストプラクティス
次回レッスンプレビュー
レッスン22:SummaryとAdvanced Directions
このコースはまもなく終了します。最後のレッスンでは、全体の学習の旅をレビューし、コアインサイトをまとめ、高度な方向を展望します - 個人プロジェクトからキャリア開発、技術的深さから業界の視点まで。