背景知識:モデルドリフトと再訓練戦略

金融市場は非定常的。今日機能するモデルが明日には失敗する可能性がある。ドリフト検出とタイムリーな再訓練は本番環境システムの必須機能である。


1. モデルドリフトとは何か?

モデルドリフトとは、デプロイ後にモデルの予測パフォーマンスが時間とともに徐々に低下する現象を指す。

1.1 ドリフトの2つのタイプ

タイプ定義金融例
Data Drift入力特徴量の分布が変化ボラティリティが15%から40%に上昇(COVID危機)
Concept Drift特徴量とターゲットの関係が変化Momentumファクターが無効化(レジームスイッチ)

1.2 金融市場におけるドリフトの根本原因

なぜ金融モデルは必然的にドリフトするのか?

1. 市場参加者構造の変化
   - 個人投資家の流入  Momentum効果が強化
   - クオンツファンドの増加  Alphaの減衰

2. マクロ経済環境の変化
   - 金利サイクルの変化(QE  引き締め)
   - 経済サイクルの移行(拡大  不況)

3. 規制政策の変化
   - 空売り規制  価格発見メカニズムの変化
   - HFT規制  市場マイクロ構造の変化

4. 技術と情報の変化
   - 新しいデータソースの出現  古いファクターがフロントラン
   - AIの普及  戦略の均質化

2. ドリフト検出方法

2.1 パフォーマンスモニタリング

最も直接的なアプローチ:ローリングウィンドウで戦略パフォーマンスを監視。

import numpy as np

class PerformanceMonitor:
    """パフォーマンスドリフトモニター"""

    def __init__(self, window: int = 30, sharpe_threshold: float = 0.5):
        self.window = window  # ローリングウィンドウ(日数)
        self.sharpe_threshold = sharpe_threshold
        self.returns = []

    def update(self, daily_return: float) -> dict:
        """更新してドリフトをチェック"""
        self.returns.append(daily_return)

        if len(self.returns) < self.window:
            return {'status': 'warming_up'}

        # ローリングSharpeを計算
        recent = self.returns[-self.window:]
        rolling_sharpe = np.mean(recent) / np.std(recent) * np.sqrt(252)

        # ドリフトを検出
        is_drifting = rolling_sharpe < self.sharpe_threshold

        return {
            'rolling_sharpe': rolling_sharpe,
            'is_drifting': is_drifting,
            'alert': 'DRIFT_DETECTED' if is_drifting else 'OK'
        }

閾値設定の推奨

メトリクス警告閾値クリティカル閾値トリガーアクション
Rolling Sharpe< 0.5< 0再訓練をトリガー
Rolling Win Rate< 45%< 40%シグナル品質をチェック
Rolling Return< -5%< -10%ポジションサイズを削減

2.2 統計的検定方法

Kolmogorov-Smirnov検定(K-S検定)

特徴量分布が著しく変化したかを検出。

import numpy as np
from scipy.stats import ks_2samp

def detect_data_drift(
    training_data: np.ndarray,
    recent_data: np.ndarray,
    significance: float = 0.05
) -> dict:
    """
    データドリフト検出のためのK-S検定

    原理:2つのサンプルが同じ分布から来たかを比較
    H0:2つのサンプルは同じ分布から来る
    p < significanceなら、H0を棄却しドリフトが発生したと結論
    """
    statistic, p_value = ks_2samp(training_data, recent_data)

    return {
        'ks_statistic': statistic,  # D値、大きいほど分布の差が大きい
        'p_value': p_value,
        'is_drifting': p_value < significance,
        'interpretation': 'DRIFT' if p_value < significance else 'STABLE'
    }

# 使用例
training_returns = returns['2020-01':'2022-12']
recent_returns = returns['2024-01':'2024-03']

result = detect_data_drift(training_returns, recent_returns)
print(f"K-S statistic: {result['ks_statistic']:.4f}")
print(f"P-value: {result['p_value']:.4f}")
print(f"Status: {result['interpretation']}")

カイ二乗検定

カテゴリ特徴量のドリフト検出に適している。

from scipy.stats import chi2_contingency

def detect_categorical_drift(
    training_counts: dict,
    recent_counts: dict,
    significance: float = 0.05
) -> dict:
    """
    カテゴリ特徴量ドリフトのカイ二乗検定

    例:市場レジームラベル分布が変化したかを検出
    training_counts = {'bull': 120, 'bear': 80, 'sideways': 50}
    recent_counts = {'bull': 10, 'bear': 35, 'sideways': 5}
    """
    # 分割表を構築
    categories = set(training_counts.keys()) | set(recent_counts.keys())
    train_freq = [training_counts.get(c, 0) for c in categories]
    recent_freq = [recent_counts.get(c, 0) for c in categories]

    contingency_table = [train_freq, recent_freq]
    chi2, p_value, dof, expected = chi2_contingency(contingency_table)

    return {
        'chi2_statistic': chi2,
        'p_value': p_value,
        'degrees_of_freedom': dof,
        'is_drifting': p_value < significance
    }

2.3 CUSUM管理図法

Cumulative Sum管理図:予測誤差の持続的なシフトを検出。

class CUSUMDetector:
    """
    CUSUM(Cumulative Sum)ドリフト検出器

    原理:
    - 予測誤差の偏差を累積
    - 誤差がランダムなら、累積和は0付近で変動すべき
    - 系統的バイアスがあれば、累積和は持続的にドリフト
    """

    def __init__(self, threshold: float = 5.0, drift: float = 0.5):
        """
        パラメータ:
        - threshold: アラートトリガー閾値
        - drift: 許容ドリフト量(感度制御)
        """
        self.threshold = threshold
        self.drift = drift
        self.reset()

    def reset(self):
        self.s_pos = 0  # 正の累積和
        self.s_neg = 0  # 負の累積和
        self.history = []

    def update(self, error: float) -> dict:
        """
        CUSUM値を更新

        パラメータ:
        - error: 予測誤差(予測値 - 実際値)

        戻り値:
        - ドリフト検出結果
        """
        # 正規化された誤差
        normalized_error = error

        # 累積和を更新
        self.s_pos = max(0, self.s_pos + normalized_error - self.drift)
        self.s_neg = max(0, self.s_neg - normalized_error - self.drift)

        self.history.append({
            's_pos': self.s_pos,
            's_neg': self.s_neg,
            'error': error
        })

        # ドリフトを検出
        drift_up = self.s_pos > self.threshold
        drift_down = self.s_neg > self.threshold

        if drift_up or drift_down:
            direction = 'UP' if drift_up else 'DOWN'
            return {
                'is_drifting': True,
                'direction': direction,
                'cusum_value': self.s_pos if drift_up else self.s_neg,
                'action': 'RETRAIN_RECOMMENDED'
            }

        return {
            'is_drifting': False,
            'cusum_pos': self.s_pos,
            'cusum_neg': self.s_neg,
            'action': 'CONTINUE_MONITORING'
        }

# 使用例
detector = CUSUMDetector(threshold=5.0, drift=0.5)

for pred, actual in zip(predictions, actuals):
    error = pred - actual
    result = detector.update(error)
    if result['is_drifting']:
        print(f"Drift detected! Direction: {result['direction']}")
        break

CUSUMの利点

  • 徐々に小さい持続的シフトを検出可能
  • 単一点検出より敏感
  • 確固たる統計的基盤がある

2.4 マルチ指標総合検出

本番環境推奨:複数の検出方法を組み合わせて偽陽性率を削減。

class ComprehensiveDriftDetector:
    """総合ドリフト検出器"""

    def __init__(self):
        self.performance_monitor = PerformanceMonitor()
        self.cusum_detector = CUSUMDetector()

    def check_drift(self,
                   daily_return: float,
                   prediction_error: float,
                   training_features: np.array,
                   recent_features: np.array) -> dict:

        results = {}

        # 1. パフォーマンス監視
        perf_result = self.performance_monitor.update(daily_return)
        results['performance'] = perf_result

        # 2. CUSUM検出
        cusum_result = self.cusum_detector.update(prediction_error)
        results['cusum'] = cusum_result

        # 3. K-S検定(定期的に実行、例:週次)
        ks_result = detect_data_drift(training_features, recent_features)
        results['ks_test'] = ks_result

        # 総合判断:多数決投票
        drift_signals = [
            perf_result.get('is_drifting', False),
            cusum_result.get('is_drifting', False),
            ks_result.get('is_drifting', False)
        ]

        drift_count = sum(drift_signals)

        results['overall'] = {
            'drift_count': drift_count,
            'is_drifting': drift_count >= 2,  # 少なくとも2つの検出器がアラーム
            'confidence': drift_count / 3,
            'recommendation': self._get_recommendation(drift_count)
        }

        return results

    def _get_recommendation(self, drift_count: int) -> str:
        if drift_count == 0:
            return 'CONTINUE_NORMAL'
        elif drift_count == 1:
            return 'INCREASE_MONITORING'
        elif drift_count == 2:
            return 'PREPARE_RETRAIN'
        else:
            return 'IMMEDIATE_RETRAIN'

3. 再訓練戦略

3.1 スケジュール再訓練

最もシンプルな戦略:固定スケジュールでモデルを再訓練。

戦略頻度期間適用可能シナリオ長所短所
日次戦略月次中低頻度ファクター戦略シンプル、予測可能遅れる可能性
週次戦略四半期ポートフォリオ配分戦略低コスト突然の変化に適応できない
分次レベル戦略週次高頻度取引タイムリーな更新高コスト
# スケジュール再訓練スケジューラ
class ScheduledRetrainer:

    def __init__(self, retrain_frequency: str = 'monthly'):
        self.frequency = retrain_frequency
        self.last_retrain = None

    def should_retrain(self, current_date) -> bool:
        if self.last_retrain is None:
            return True

        if self.frequency == 'weekly':
            return (current_date - self.last_retrain).days >= 7
        elif self.frequency == 'monthly':
            return (current_date - self.last_retrain).days >= 30
        elif self.frequency == 'quarterly':
            return (current_date - self.last_retrain).days >= 90

        return False

3.2 トリガー再訓練

よりスマートな戦略:ドリフトが検出された場合のみ再訓練をトリガー。

class TriggeredRetrainer:
    """トリガー再訓練器"""

    def __init__(self,
                 performance_threshold: float = 0.3,  # Sharpe閾値
                 cusum_threshold: float = 5.0,
                 min_interval_days: int = 7):  # 最小再訓練間隔
        self.performance_threshold = performance_threshold
        self.cusum_threshold = cusum_threshold
        self.min_interval_days = min_interval_days
        self.last_retrain = None
        self.detector = ComprehensiveDriftDetector()

    def check_and_retrain(self, model, new_data, current_date) -> dict:
        """再訓練が必要かチェック、必要なら実行"""

        # 過度に頻繁な再訓練を防ぐ
        if self.last_retrain:
            days_since = (current_date - self.last_retrain).days
            if days_since < self.min_interval_days:
                return {'action': 'SKIP', 'reason': 'Too soon since last retrain'}

        # ドリフト検出
        drift_result = self.detector.check_drift(...)

        if drift_result['overall']['is_drifting']:
            # 再訓練を実行
            new_model = self._retrain(model, new_data)
            self.last_retrain = current_date

            return {
                'action': 'RETRAINED',
                'drift_confidence': drift_result['overall']['confidence'],
                'new_model': new_model
            }

        return {'action': 'CONTINUE', 'drift_confidence': drift_result['overall']['confidence']}

3.3 オンライン学習

継続的更新:完全な再訓練の代わりに、モデルパラメータを段階的に更新。

class OnlineLearner:
    """
    オンライン学習更新器

    適用可能シナリオ:
    - 市場変化に素早く適応する必要がある
    - 完全な再訓練がコストがかかりすぎる
    - データストリームが継続的に到着

    リスク:
    - 壊滅的忘却(履歴パターンを失う)
    - ノイズに敏感
    """

    def __init__(self, model, learning_rate: float = 0.001):
        self.model = model
        self.learning_rate = learning_rate
        self.update_count = 0

    def incremental_update(self, new_x, new_y):
        """
        モデルを段階的に更新

        小さな学習率で単一ステップ勾配降下を使用
        """
        # 順伝播
        prediction = self.model.predict(new_x)
        error = new_y - prediction

        # 逆伝播(簡略化した説明)
        gradient = self._compute_gradient(new_x, error)

        # パラメータ更新
        for param, grad in zip(self.model.parameters(), gradient):
            param -= self.learning_rate * grad

        self.update_count += 1

        return {
            'prediction': prediction,
            'error': error,
            'update_count': self.update_count
        }

    def _compute_gradient(self, x, error):
        # 実際の実装はモデルタイプに依存
        pass

オンライン学習の落とし穴

  1. 壊滅的忘却:新しいデータが古い知識を上書き
  2. ノイズの蓄積:単一サンプル更新はノイズに惑わされやすい
  3. 学習率の感度:大きすぎる → 不安定、小さすぎる → 適応が遅い

3.4 ハイブリッド戦略(推奨)

ベストプラクティス:スケジュールとトリガー再訓練を組み合わせる。

class HybridRetrainer:
    """ハイブリッド再訓練戦略"""

    def __init__(self):
        self.scheduled_interval_days = 30  # スケジュール:月次
        self.drift_detector = ComprehensiveDriftDetector()
        self.last_scheduled_retrain = None
        self.last_triggered_retrain = None

    def should_retrain(self, current_date, metrics) -> dict:
        """再訓練が必要か判断"""

        # スケジュール再訓練をチェック
        scheduled_due = self._check_scheduled(current_date)

        # トリガー再訓練をチェック
        drift_result = self.drift_detector.check_drift(metrics)
        triggered_due = drift_result['overall']['is_drifting']

        if scheduled_due and triggered_due:
            return {
                'should_retrain': True,
                'reason': 'BOTH_SCHEDULED_AND_DRIFT',
                'priority': 'HIGH'
            }
        elif triggered_due:
            return {
                'should_retrain': True,
                'reason': 'DRIFT_DETECTED',
                'priority': 'HIGH'
            }
        elif scheduled_due:
            return {
                'should_retrain': True,
                'reason': 'SCHEDULED',
                'priority': 'NORMAL'
            }

        return {'should_retrain': False, 'reason': 'NO_TRIGGER'}

4. 再訓練のベストプラクティス

4.1 訓練データ選択

戦略説明長所短所
Expanding Windowすべての履歴データを使用大きなサンプルサイズ古いデータが時代遅れの可能性
Sliding Window最近のN日のみを使用新しいパターンに適応重要な履歴を失う可能性
Weighted Window最近のデータに高い重み履歴と現在をバランス重み選択が困難

推奨:スライディングウィンドウ + 危機期間データを保持

def prepare_training_data(all_data, window_days=252*2, keep_crisis=True):
    """再訓練データを準備"""

    # スライディングウィンドウ
    recent_data = all_data.iloc[-window_days:]

    if keep_crisis:
        # 重要な危機期間データを保持
        crisis_periods = [
            ('2008-09', '2009-03'),  # 金融危機
            ('2020-02', '2020-04'),  # COVID
            ('2022-01', '2022-06'),  # 金利上昇ショック
        ]

        crisis_data = []
        for start, end in crisis_periods:
            if start in all_data.index:
                crisis_data.append(all_data.loc[start:end])

        # マージ
        training_data = pd.concat([recent_data] + crisis_data)
        training_data = training_data.drop_duplicates()

    return training_data

4.2 モデルバージョン管理

# モデルバージョン管理
class ModelVersionManager:

    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        self.versions = []

    def save_version(self, model, metrics: dict, reason: str):
        """モデルバージョンを保存"""
        version_id = f"v{len(self.versions)+1}_{datetime.now():%Y%m%d_%H%M}"

        version_info = {
            'version_id': version_id,
            'timestamp': datetime.now(),
            'reason': reason,
            'metrics': metrics,
            'model_path': f"{self.storage_path}/{version_id}.pkl"
        }

        # モデルを保存
        joblib.dump(model, version_info['model_path'])

        self.versions.append(version_info)
        return version_id

    def rollback(self, version_id: str):
        """指定バージョンにロールバック"""
        for v in self.versions:
            if v['version_id'] == version_id:
                return joblib.load(v['model_path'])
        raise ValueError(f"Version {version_id} not found")

4.3 A/Bテスト

再訓練後、古いモデルを直接置き換えない。代わりに比較テストを実行。

class ABTester:
    """モデルA/Bテスト"""

    def __init__(self, old_model, new_model, test_days: int = 5):
        self.old_model = old_model
        self.new_model = new_model
        self.test_days = test_days
        self.old_results = []
        self.new_results = []

    def run_comparison(self, data) -> dict:
        """比較テストを実行"""

        for day_data in data:
            old_pred = self.old_model.predict(day_data)
            new_pred = self.new_model.predict(day_data)

            self.old_results.append(old_pred)
            self.new_results.append(new_pred)

        # パフォーマンス比較を計算
        old_sharpe = calculate_sharpe(self.old_results)
        new_sharpe = calculate_sharpe(self.new_results)

        improvement = (new_sharpe - old_sharpe) / abs(old_sharpe) if old_sharpe != 0 else 0

        return {
            'old_sharpe': old_sharpe,
            'new_sharpe': new_sharpe,
            'improvement': improvement,
            'recommendation': 'DEPLOY_NEW' if improvement &gt; 0.1 else 'KEEP_OLD'
        }

5. 本番環境ドリフトモニタリングアーキテクチャ

前のセクションでは理論的なドリフト検出方法を扱った。このセクションでは本番環境のドリフトモニタリングシステム実装を提示する。

5.1 コア設計パターン

本番環境システムに必要なもの:

  • マルチメトリクス監視:IC、PSI、Sharpeを同時に追跡
  • 設定可能な閾値:異なる戦略は異なる許容度を持つ
  • 永続ストレージ:分析と監査のためのドリフト履歴
  • アラートレベル:警告とクリティカルの重大度を区別

AlertConfigパターン

from dataclasses import dataclass

@dataclass
class AlertConfig:
    """アラート閾値設定"""

    # IC(Information Coefficient)閾値
    ic_warning: float = 0.02    # IC &lt; 0.02で警告トリガー
    ic_critical: float = 0.01   # IC &lt; 0.01でクリティカルアラート

    # PSI(Population Stability Index)閾値
    psi_warning: float = 0.10   # PSI &gt; 0.10は分布シフトを示す
    psi_critical: float = 0.25  # PSI &gt; 0.25は重大なシフトを示す

    # Sharpe閾値
    sharpe_warning: float = 0.5   # Sharpe &lt; 0.5でパフォーマンス低下
    sharpe_critical: float = 0.0  # Sharpe &lt; 0で戦略が損失

閾値の解釈

メトリクス警告閾値クリティカル閾値ビジネス意味
IC< 0.02< 0.01シグナル予測力が低下
PSI> 0.10> 0.25特徴量分布がシフト
Sharpe< 0.5< 0.0リスク調整後リターンが悪化

5.2 DriftMetricsデータ構造

ドリフトメトリクスを日次で計算・保存:

from dataclasses import dataclass
from datetime import date

@dataclass
class DriftMetrics:
    """日次ドリフトメトリクス"""

    date: date
    strategy_id: str

    # ICメトリクス(Information Coefficient)
    ic: float | None = None           # 日次IC
    ic_5d_avg: float | None = None    # 5日間ローリング平均
    ic_20d_avg: float | None = None   # 20日間ローリング平均

    # PSIメトリクス(分布安定性)
    psi: float | None = None
    psi_5d_avg: float | None = None

    # Sharpeメトリクス(リスク調整後リターン)
    sharpe_5d: float | None = None    # 5日間Sharpe
    sharpe_20d: float | None = None   # 20日間Sharpe
    sharpe_60d: float | None = None   # 60日間Sharpe

    # ビジネスメトリクス
    daily_return: float | None = None
    cumulative_return: float | None = None
    trade_count: int = 0
    signal_count: int = 0

    # アラート状態
    ic_alert: bool = False
    psi_alert: bool = False
    sharpe_alert: bool = False

複数の時間ウィンドウを使う理由

  • 5日間ウィンドウ:素早い反応、短期ドリフトを捉える
  • 20日間ウィンドウ:ノイズをフィルタ、トレンドを確認
  • 60日間ウィンドウ:長期ベースライン、構造的変化を識別

5.3 DriftMonitorコア実装

import logging
import numpy as np
import psycopg
from psycopg.rows import dict_row

logger = logging.getLogger(__name__)

class DriftMonitor:
    """
    本番環境ドリフトモニタリングサービス

    責務:
    1. IC、PSI、Sharpeメトリクスを計算
    2. 設定された閾値と比較してアラート
    3. PostgreSQLに永続化
    4. 戦略ごとの分離をサポート
    """

    def __init__(self, dsn: str, strategy_id: str = "default"):
        """
        引数:
            dsn: PostgreSQL接続文字列
            strategy_id: 戦略識別子(マルチ戦略分離をサポート)
        """
        self.dsn = dsn
        self.strategy_id = strategy_id
        self._config: AlertConfig | None = None

    def load_config(self) -> AlertConfig:
        """データベースからアラート設定を読み込み"""
        with psycopg.connect(self.dsn) as conn:
            with conn.cursor(row_factory=dict_row) as cur:
                cur.execute(
                    """
                    SELECT ic_warning, ic_critical, psi_warning, psi_critical,
                           sharpe_warning, sharpe_critical
                    FROM drift_alert_config
                    WHERE strategy_id = %s
                    """,
                    (self.strategy_id,),
                )
                row = cur.fetchone()
                if row:
                    self._config = AlertConfig(**row)
                else:
                    self._config = AlertConfig()  # デフォルトを使用
        return self._config

    def calculate_metrics(self, target_date: date) -> DriftMetrics:
        """
        指定日のすべてのドリフトメトリクスを計算

        コアロジック:
        1. シグナルとリターンを取得、ICを計算
        2. 履歴リターンを取得、ローリングSharpeを計算
        3. 閾値と比較してアラート状態
        """
        if self._config is None:
            self.load_config()

        metrics = DriftMetrics(date=target_date, strategy_id=self.strategy_id)

        # IC(シグナル-リターン相関)を計算
        signals, returns = self.get_signals_and_returns(target_date)
        if len(signals) &gt; 0 and len(returns) &gt; 0:
            metrics.ic = calculate_ic(signals, returns)
            metrics.signal_count = len(signals)

        # ローリングSharpeを計算
        daily_returns = self.get_daily_returns(lookback_days=60)
        if len(daily_returns) >= 5:
            metrics.sharpe_5d = calculate_sharpe(daily_returns[-5:])
        if len(daily_returns) >= 20:
            metrics.sharpe_20d = calculate_sharpe(daily_returns[-20:])
        if len(daily_returns) >= 60:
            metrics.sharpe_60d = calculate_sharpe(daily_returns)

        # アラート状態を判定
        config = self._config or AlertConfig()
        if metrics.ic is not None:
            metrics.ic_alert = metrics.ic < config.ic_critical
        if metrics.psi is not None:
            metrics.psi_alert = metrics.psi > config.psi_critical
        if metrics.sharpe_20d is not None:
            metrics.sharpe_alert = metrics.sharpe_20d < config.sharpe_critical

        return metrics

5.4 PostgreSQL永続化

ドリフトメトリクスの永続化が必要な理由:

  • 履歴トレンド分析
  • コンプライアンス監査
  • 再訓練決定の証拠
def save_metrics(self, metrics: DriftMetrics) -> None:
    """メトリクスをデータベースに保存(冪等upsertをサポート)"""
    with psycopg.connect(self.dsn) as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO drift_metrics (
                    date, strategy_id, ic, ic_5d_avg, ic_20d_avg,
                    psi, psi_5d_avg, sharpe_5d, sharpe_20d, sharpe_60d,
                    daily_return, cumulative_return, trade_count, signal_count,
                    ic_alert, psi_alert, sharpe_alert
                ) VALUES (
                    %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                    %s, %s, %s, %s, %s, %s, %s
                )
                ON CONFLICT (date, strategy_id) DO UPDATE SET
                    ic = EXCLUDED.ic,
                    sharpe_20d = EXCLUDED.sharpe_20d,
                    ic_alert = EXCLUDED.ic_alert,
                    psi_alert = EXCLUDED.psi_alert,
                    sharpe_alert = EXCLUDED.sharpe_alert
                """,
                (
                    metrics.date, metrics.strategy_id, metrics.ic,
                    metrics.ic_5d_avg, metrics.ic_20d_avg, metrics.psi,
                    metrics.psi_5d_avg, metrics.sharpe_5d, metrics.sharpe_20d,
                    metrics.sharpe_60d, metrics.daily_return,
                    metrics.cumulative_return, metrics.trade_count,
                    metrics.signal_count, metrics.ic_alert,
                    metrics.psi_alert, metrics.sharpe_alert,
                ),
            )
        conn.commit()
    logger.info(f"Saved drift metrics for {metrics.date}")

データベーススキーマ

CREATE TABLE drift_metrics (
    date DATE NOT NULL,
    strategy_id VARCHAR(64) NOT NULL,
    ic FLOAT,
    ic_5d_avg FLOAT,
    ic_20d_avg FLOAT,
    psi FLOAT,
    psi_5d_avg FLOAT,
    sharpe_5d FLOAT,
    sharpe_20d FLOAT,
    sharpe_60d FLOAT,
    daily_return FLOAT,
    cumulative_return FLOAT,
    trade_count INT DEFAULT 0,
    signal_count INT DEFAULT 0,
    ic_alert BOOLEAN DEFAULT FALSE,
    psi_alert BOOLEAN DEFAULT FALSE,
    sharpe_alert BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (date, strategy_id)
);

CREATE TABLE drift_alert_config (
    strategy_id VARCHAR(64) PRIMARY KEY,
    ic_warning FLOAT DEFAULT 0.02,
    ic_critical FLOAT DEFAULT 0.01,
    psi_warning FLOAT DEFAULT 0.10,
    psi_critical FLOAT DEFAULT 0.25,
    sharpe_warning FLOAT DEFAULT 0.5,
    sharpe_critical FLOAT DEFAULT 0.0
);

5.5 日次モニタリングジョブ

def run_daily(self, target_date: date | None = None) -> DriftMetrics:
    """
    日次ドリフトモニタリングジョブエントリポイント

    典型的なデプロイ:市場終了後にcronまたはAirflowで実行
    """
    if target_date is None:
        target_date = date.today()

    logger.info(f"Running drift monitoring for {target_date}")
    metrics = self.calculate_metrics(target_date)
    self.save_metrics(metrics)

    # アラートログ
    if metrics.ic_alert:
        logger.warning(f"IC ALERT: IC={metrics.ic:.4f} below threshold")
    if metrics.psi_alert:
        logger.warning(f"PSI ALERT: PSI={metrics.psi:.4f} above threshold")
    if metrics.sharpe_alert:
        logger.warning(f"SHARPE ALERT: Sharpe={metrics.sharpe_20d:.4f} below threshold")

    return metrics

5.6 統合例:再訓練をトリガーするタイミング

ドリフトモニタリングと再訓練決定を組み合わせる:

class RetrainOrchestrator:
    """再訓練オーケストレーター"""

    def __init__(self, drift_monitor: DriftMonitor):
        self.monitor = drift_monitor
        self.consecutive_alerts = 0
        self.alert_threshold = 3  # 3日連続後にトリガー

    def check_retrain_needed(self, target_date: date) -> dict:
        """
        再訓練をトリガーすべきか判断

        ルール:
        1. IC &lt; 0.01が3日連続 -> トリガー
        2. PSI &gt; 0.25の単一発生 -> トリガー
        3. 20日間Sharpe &lt; 0 -> トリガー
        """
        metrics = self.monitor.run_daily(target_date)

        # 連続アラートを追跡
        if metrics.ic_alert or metrics.sharpe_alert:
            self.consecutive_alerts += 1
        else:
            self.consecutive_alerts = 0

        # トリガー条件を評価
        triggers = []

        if self.consecutive_alerts >= self.alert_threshold:
            triggers.append(f"IC/Sharpe alert for {self.consecutive_alerts} consecutive days")

        if metrics.psi_alert:
            triggers.append(f"PSI={metrics.psi:.3f} exceeds critical threshold")

        if metrics.sharpe_20d is not None and metrics.sharpe_20d < 0:
            triggers.append(f"20-day Sharpe={metrics.sharpe_20d:.2f} is negative")

        should_retrain = len(triggers) &gt; 0

        return {
            'should_retrain': should_retrain,
            'triggers': triggers,
            'metrics': metrics,
            'action': 'RETRAIN' if should_retrain else 'CONTINUE'
        }

# 使用例
monitor = DriftMonitor(
    dsn="postgres://trading:trading@localhost:5432/trading",
    strategy_id="momentum_v2"
)
orchestrator = RetrainOrchestrator(monitor)

result = orchestrator.check_retrain_needed(date.today())
if result['should_retrain']:
    print(f"Triggering retrain, reasons: {result['triggers']}")
    # 再訓練パイプラインを呼び出し

5.7 アーキテクチャまとめ

コンポーネント責務主要設計
AlertConfig閾値設定Dataclass、DB読み込みサポート
DriftMetricsメトリクスコンテナマルチウィンドウ、アラート状態
DriftMonitorコアサービス計算 + 保存 + アラート
PostgreSQL永続化冪等upsert、監査サポート
RetrainOrchestrator決定オーケストレーション連続アラート、マルチ条件トリガー

本番環境デプロイ推奨

  1. 市場終了後T+30分で実行(データ準備を待つ)
  2. アラートをSlack/PagerDutyに接続
  3. IC/PSI/Sharpeトレンドチャートを表示するダッシュボード
  4. 再訓練トリガーは自動的にA/Bテストフローに入る

6. まとめ

検出方法クイックリファレンス

方法検出対象感度計算コスト推奨シナリオ
パフォーマンスモニタリング戦略リターンすべての戦略(必須)
K-S検定特徴量分布定期チェック(週次/月次)
カイ二乗検定カテゴリ特徴量市場レジームラベル
CUSUM予測誤差継続的監視(日次)
総合検出多次元最高本番環境システム(推奨)

再訓練戦略クイックリファレンス

戦略トリガータイプ長所短所適用可能シナリオ
スケジュール時間駆動シンプル、予測可能遅れる可能性安定した市場
トリガードリフト駆動タイムリーな応答高い複雑さ変動の激しい市場
オンライン学習継続的更新最速の適応不安定高頻度シナリオ
ハイブリッドスケジュール + トリガーバランスチューニングが必要本番環境(推奨)

重要な洞察:モデルドリフトは「もし」ではなく「いつ」の問題。堅牢な検出と再訓練メカニズムを確立することがクオンツ戦略の長期的生存の鍵。

この章を引用する
Zhang, Wayland (2026). 背景知識:モデルドリフトと再訓練戦略. In AIクオンツ取引:ゼロからイチへ. https://waylandz.com/quant-book-ja/Model-Drift-and-Retraining
@incollection{zhang2026quant_Model_Drift_and_Retraining,
  author = {Zhang, Wayland},
  title = {背景知識:モデルドリフトと再訓練戦略},
  booktitle = {AIクオンツ取引:ゼロからイチへ},
  year = {2026},
  url = {https://waylandz.com/quant-book-ja/Model-Drift-and-Retraining}
}