Background: MLOpsインフラストラクチャ

デプロイされた瞬間から、モデルは劣化を始めます。MLOpsはオプションの高度な設定ではありません。Quant戦略を生き続けさせるインフラストラクチャです。


「モデルが動く」から「モデルが役立つ」へ

2023年、あるQuant研究者がモメンタム戦略モデルを完成させた:

  • バックテストシャープ: 1.8
  • IC平均: 0.04
  • クリーンなコード、テスト合格

彼は興奮して本番環境にデプロイした。

3ヶ月後:

  • 1ヶ月目: シャープ1.2(「市場状況」)
  • 2ヶ月目: シャープ0.4(「もっと観察しよう」)
  • 3ヶ月目: シャープ-0.3(「モデルが壊れている?」)

何が起きたのか?

調査の結果判明:

  1. 本番環境の特徴量計算コードがバックテストと異なっていた。バグによりRSIが1日ずれていた
  2. モデルバージョン管理が混乱していた。実際に実行されているバージョンが不明
  3. 特徴量スナップショットとモデル入力が保存されていなかったため、問題を追跡する方法がなかった
  4. 問題が発見された時には、いつから始まったか誰も分からなかった

教訓: モデル開発は始まりに過ぎません。再現性、バージョン管理、ドリフト監視が本番システムの核心です。これがMLOpsです。


1. なぜQuantにMLOpsが必要か

Quant固有の課題

従来のMLQuant ML
デプロイ後モデルは比較的安定市場構造は絶えず変化、モデルは必然的に劣化
データ分布は比較的固定金融データは高度に非定常
モデルエラーはユーザー体験に影響モデルエラーは直接的に資本損失を引き起こす
オフラインバッチ予測で許容リアルタイム推論が必要、レイテンシに敏感
特徴量は安定したデータソースから来る特徴量は複数ベンダーから、遅延や欠損の可能性

MLOpsの三本柱

Quant MLOps = Feature Store + Model Registry + Drift Monitor

機能:
1. Feature Store  -> バックテストとライブの特徴量一貫性を保証(再現性)
2. Model Registry -> モデルバージョンとパフォーマンスを追跡(監査可能性)
3. Drift Monitor  -> モデル劣化を検出(タイムリーなストップロス)

2. Feature Store

コア問題: Point-in-Time正確性

Quantにおける最も陰湿なバグはルックアヘッドバイアスです。

誤った例(ルックアヘッドバイアス):

2024-01-15トレーニングサンプル:
  特徴量: RSI = 65(2024-01-15終値を使用して計算)
  ラベル: 翌日のリターン

問題:
  2024-01-15終値は16:00まで分からない
  しかしRSI計算でこの値を使用した
  -> モデルは「未来の情報」から学習

正しいアプローチ:
  2024-01-15トレーニングサンプル:
    特徴量: 2024-01-14終値を使用して計算したRSI
    ラベル: 2024-01-15から2024-01-16のリターン

Feature Storeのコア能力はPoint-in-Timeクエリの保証です: 任意の過去のタイムスタンプを与えると、その時点で既知だった特徴量値を返します。

デュアルタイムスタンプ設計

特徴量イベントテーブル (feature_events):
+--------------+---------------+-----------------+-----------------+---------+
| entity_key   | feature_name  | event_time      | ingest_time     | value   |
+--------------+---------------+-----------------+-----------------+---------+
| AAPL.NASDAQ  | momentum_5d   | 2024-01-15      | 2024-01-15 20:00 | 0.035  |
| AAPL.NASDAQ  | rsi_14        | 2024-01-15      | 2024-01-15 20:00 | 62.5   |
+--------------+---------------+-----------------+-----------------+---------+

2つのタイムスタンプの意味:
- event_time: 特徴量が対応するビジネス時間(例: 「これは2024-01-15のRSI」)
- ingest_time: 特徴量がシステムに書き込まれた時刻(例: 「20:00に計算」)

Point-in-Timeクエリルール:
  WHERE event_time <= as_of_time AND ingest_time <= as_of_time

なぜ2つのタイムスタンプが必要か?

シナリオ: 2024-01-16 09:30の取引決定をバックテスト

event_timeのみ使用の場合:
  クエリ: event_time <= '2024-01-16 09:30'
  event_time='2024-01-15'だがingest_time='2024-01-16 22:00'のデータを返す可能性
  -> ルックアヘッドバイアス!

正しいデュアルタイムスタンプクエリ:
  クエリ: event_time <= '2024-01-16 09:30' AND ingest_time <= '2024-01-16 09:30'
  その時点で実際に利用可能だった特徴量のみを返す

データベース設計(TimescaleDB)

-- TimescaleDBは時系列データに最適化されたPostgreSQL拡張
CREATE TABLE IF NOT EXISTS feature_events (
    entity_key       TEXT NOT NULL,               -- : 'AAPL.NASDAQ'
    feature_name     TEXT NOT NULL,               -- 特徴量名
    feature_version  INT  NOT NULL DEFAULT 1,     -- バージョン(計算ロジック変更時にインクリメント)

    event_time       TIMESTAMPTZ NOT NULL,        -- ビジネス時間
    value_double     DOUBLE PRECISION,            -- 数値特徴量
    value_json       JSONB,                       -- 複雑な特徴量(ベクトルなど)

    ingest_time      TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- トレーサビリティ
    producer         TEXT,                        -- プロデューサー(例: 'momentum_job'
    producer_version TEXT,                        -- コードバージョン(git SHA)
    run_id           TEXT,                        -- ジョブID

    PRIMARY KEY (entity_key, feature_name, feature_version, event_time)
);

-- ハイパーテーブルに変換して自動パーティショニング
SELECT create_hypertable('feature_events', 'event_time', if_not_exists => TRUE);

-- 最新特徴量クエリ最適化
CREATE INDEX IF NOT EXISTS idx_feature_events_latest
    ON feature_events (entity_key, feature_name, feature_version, event_time DESC);

-- 圧縮ポリシー(7日後に圧縮、90%+のスペース節約)
ALTER TABLE feature_events SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'entity_key, feature_name, feature_version',
    timescaledb.compress_orderby = 'event_time DESC'
);
SELECT add_compression_policy('feature_events', INTERVAL '7 days');

Python実装

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any

@dataclass
class FeatureValue:
    """クエリから返される特徴量値"""
    entity_key: str
    feature_name: str
    feature_version: int
    event_time: datetime
    value: float | dict[str, Any]


class FeatureStore:
    """
    TimescaleDBベースのFeature Store

    コア機能:
    1. write_features: 特徴量を書き込む
    2. get_latest: 最新の特徴量値を取得
    3. get_point_in_time: バッチPoint-in-Timeクエリ(トレーニングセット構築用)
    """

    def __init__(self, conninfo: str, producer: str | None = None):
        self._conninfo = conninfo
        self._producer = producer

    def write_features(
        self,
        entity_key: str,
        timestamp: datetime,
        features: dict[str, float],
        *,
        feature_version: int = 1,
        availability_lag: timedelta | None = None,
    ) -> int:
        """
        特徴量値を書き込む

        Args:
            entity_key: エンティティ識別子(例: 'AAPL.NASDAQ'
            timestamp: 特徴量のビジネス時間(event_time)
            features: 特徴量辞書 {feature_name: value}
            feature_version: 特徴量バージョン(計算ロジック変更時にインクリメント)
            availability_lag: データ利用可能性遅延(バックフィル用)
                特徴量がT+1でのみ利用可能な場合、availability_lag=timedelta(days=1)
                これによりingest_time = event_time + 1日となる

        Returns:
            書き込まれた特徴量の数
        """
        if not features:
            return 0

        # ingest_timeを計算
        ingest_time = datetime.now()
        if availability_lag is not None:
            ingest_time = timestamp + availability_lag

        # バッチ挿入を構築(ON CONFLICTで冪等性を保証)
        sql = """
            INSERT INTO feature_events
                (entity_key, feature_name, feature_version, event_time, value_double, ingest_time, producer)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (entity_key, feature_name, feature_version, event_time) DO NOTHING
        """

        with self._get_connection() as conn:
            with conn.cursor() as cur:
                for name, value in features.items():
                    cur.execute(sql, [
                        entity_key, name, feature_version,
                        timestamp, float(value), ingest_time, self._producer
                    ])
            conn.commit()

        return len(features)

    def get_latest(
        self,
        entity_key: str,
        feature_names: list[str] | None = None,
        *,
        as_of: datetime | None = None,
    ) -> dict[str, FeatureValue]:
        """
        エンティティの最新特徴量値を取得

        Args:
            entity_key: エンティティ識別子
            feature_names: クエリする特徴量リスト(Noneですべて)
            as_of: Point-in-Timeタイムスタンプ(None で現在)

        Returns:
            {feature_name: FeatureValue}
        """
        # キー: デュアルタイムスタンプフィルタリング
        sql = """
            SELECT DISTINCT ON (feature_name, feature_version)
                feature_name, feature_version, value_double, event_time
            FROM feature_events
            WHERE entity_key = %s
              AND feature_version = 1
        """
        params = [entity_key]

        # Point-in-Timeフィルター
        if as_of is not None:
            sql += " AND event_time <= %s AND ingest_time <= %s"
            params.extend([as_of, as_of])

        # 特徴量名フィルター
        if feature_names:
            sql += " AND feature_name = ANY(%s)"
            params.append(feature_names)

        sql += " ORDER BY feature_name, feature_version, event_time DESC"

        with self._get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                rows = cur.fetchall()

        return {
            row[0]: FeatureValue(
                entity_key=entity_key,
                feature_name=row[0],
                feature_version=row[1],
                event_time=row[3],
                value=row[2],
            )
            for row in rows
        }

    def get_point_in_time(
        self,
        entity_times: list[tuple[str, datetime]],
        feature_names: list[str] | None = None,
    ) -> list[FeatureValue]:
        """
        バッチPoint-in-Timeクエリ(トレーニングセット構築のコアメソッド)

        Args:
            entity_times: [(entity_key, as_of_time), ...]
            feature_names: クエリする特徴量リスト

        Returns:
            各(entity, time)ペアに対して、その時点で利用可能な最新特徴量を返す
        """
        if not entity_times:
            return []

        # CTEとDISTINCT ONを使用した効率的なPITクエリ
        values_sql = ", ".join(["(%s, %s)"] * len(entity_times))
        params = []
        for entity_key, as_of_time in entity_times:
            params.extend([entity_key, as_of_time])

        sql = f"""
        WITH entity_times(entity_key, as_of_time) AS (
            VALUES {values_sql}
        )
        SELECT DISTINCT ON (et.entity_key, fe.feature_name)
            et.entity_key,
            et.as_of_time,
            fe.feature_name,
            fe.feature_version,
            fe.value_double,
            fe.event_time AS feature_time
        FROM entity_times et
        JOIN feature_events fe
            ON fe.entity_key = et.entity_key
           AND fe.event_time <= et.as_of_time
           AND fe.ingest_time <= et.as_of_time
        WHERE fe.feature_version = 1
        ORDER BY et.entity_key, fe.feature_name, fe.event_time DESC
        """

        with self._get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                rows = cur.fetchall()

        return [
            FeatureValue(
                entity_key=row[0],
                feature_name=row[2],
                feature_version=row[3],
                event_time=row[5],
                value=row[4],
            )
            for row in rows
        ]

使用例

# 初期化
store = FeatureStore(
    conninfo="postgres://localhost:5432/trading",
    producer="momentum_job_v2"
)

# 特徴量を書き込む
store.write_features(
    entity_key="AAPL.NASDAQ",
    timestamp=datetime(2024, 1, 15, 16, 0),  # 市場終了
    features={
        "momentum_5d": 0.035,
        "rsi_14": 62.5,
        "volume_ratio": 1.15,
    }
)

# リアルタイム推論: 最新特徴量を取得
latest = store.get_latest("AAPL.NASDAQ", ["momentum_5d", "rsi_14"])
print(f"Latest RSI: {latest['rsi_14'].value}")

# トレーニングセット構築: Point-in-Timeクエリ
training_dates = [
    ("AAPL.NASDAQ", datetime(2024, 1, 10, 9, 30)),
    ("AAPL.NASDAQ", datetime(2024, 1, 11, 9, 30)),
    ("AAPL.NASDAQ", datetime(2024, 1, 12, 9, 30)),
    ("MSFT.NASDAQ", datetime(2024, 1, 10, 9, 30)),
    ("MSFT.NASDAQ", datetime(2024, 1, 11, 9, 30)),
]

features = store.get_point_in_time(training_dates, ["momentum_5d", "rsi_14"])
# 各時点で利用可能な特徴量値を返す、ルックアヘッドバイアスなし

3. Model Registry

なぜモデル登録が必要か?

シナリオ: モデルパフォーマンス低下、調査が必要

レジストリなしの場合:
  - 「今実行しているバージョンは?」 -> 不明
  - 「このバージョンのパラメータは?」 -> ファイルを検索
  - 「前のバージョンはどこ?」 -> 上書きされた可能性
  - 「このバージョンのバックテストパフォーマンスは?」 -> 再実行

レジストリありの場合:
  SELECT * FROM models WHERE name = 'momentum_v2';
  -> バージョン、パラメータ、メトリクス、トレーニング時間、コードバージョンが一目で分かる

データベース設計

-- モデルメタデータ
CREATE TABLE IF NOT EXISTS models (
    model_id      UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name          TEXT NOT NULL,
    version       INT NOT NULL,
    strategy_type TEXT,                  -- 'momentum', 'mean_reversion'など
    description   TEXT,
    created_at    TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE(name, version)
);

-- モデルメトリクス
CREATE TABLE IF NOT EXISTS model_metrics (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    model_id      UUID REFERENCES models(model_id),
    metric_name   TEXT NOT NULL,         -- 'sharpe_ratio', 'ic', 'max_drawdown'
    value         DOUBLE PRECISION,
    dataset_type  TEXT,                  -- 'train', 'val', 'test', 'backtest', 'live'
    evaluated_at  TIMESTAMPTZ DEFAULT NOW()
);

-- モデルアーティファクト(重みファイルなど)
CREATE TABLE IF NOT EXISTS model_artifacts (
    artifact_id   UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    model_id      UUID REFERENCES models(model_id),
    artifact_path TEXT NOT NULL,         -- 's3://models/momentum_v2/weights.pkl'
    artifact_type TEXT,                  -- 'weights', 'config', 'scaler', 'onnx'
    checksum      TEXT,                  -- SHA256
    size_bytes    BIGINT,
    created_at    TIMESTAMPTZ DEFAULT NOW()
);

-- トレーニング実行記録
CREATE TABLE IF NOT EXISTS model_training_runs (
    run_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    model_id      UUID REFERENCES models(model_id),
    params        JSONB,                 -- トレーニングハイパーパラメータ
    dataset_start TIMESTAMPTZ,
    dataset_end   TIMESTAMPTZ,
    started_at    TIMESTAMPTZ,
    finished_at   TIMESTAMPTZ,
    status        TEXT DEFAULT 'running' -- 'running', 'completed', 'failed'
);

Python実装

from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from uuid import UUID
import hashlib
import json


@dataclass
class ModelInfo:
    """モデルメタデータ"""
    model_id: UUID
    name: str
    version: int
    strategy_type: str | None
    description: str | None
    created_at: datetime


@dataclass
class ModelWithMetrics:
    """メトリクスを含むモデル"""
    model: ModelInfo
    metrics: dict[str, float]  # {metric_name_dataset: value}


class ModelRegistry:
    """
    Model Registry

    機能:
    1. register_model: 新しいモデルバージョンを登録
    2. log_metrics: 評価メトリクスを記録
    3. log_artifact: モデルアーティファクトを記録
    4. get_best_model: 最高パフォーマンスのモデルを取得
    """

    def __init__(self, dsn: str):
        self.dsn = dsn

    def register_model(
        self,
        name: str,
        strategy_type: str | None = None,
        params: dict | None = None,
        description: str | None = None,
        version: int | None = None,
    ) -> UUID:
        """
        新しいモデルバージョンを登録

        Args:
            name: モデル名(例: 'momentum_v2'
            strategy_type: 戦略タイプ
            params: トレーニングパラメータ
            description: 説明
            version: バージョン番号(None で自動インクリメント)

        Returns:
            モデルUUID
        """
        with self._get_connection() as conn:
            with conn.cursor() as cur:
                # 自動バージョン番号
                if version is None:
                    cur.execute(
                        "SELECT COALESCE(MAX(version), 0) + 1 FROM models WHERE name = %s",
                        (name,)
                    )
                    version = cur.fetchone()[0]

                # モデル挿入
                cur.execute(
                    """
                    INSERT INTO models (name, version, strategy_type, description)
                    VALUES (%s, %s, %s, %s)
                    RETURNING model_id
                    """,
                    (name, version, strategy_type, description)
                )
                model_id = cur.fetchone()[0]

                # トレーニングパラメータを記録
                if params:
                    cur.execute(
                        """
                        INSERT INTO model_training_runs (model_id, params, started_at, status)
                        VALUES (%s, %s, %s, 'completed')
                        """,
                        (model_id, json.dumps(params), datetime.now())
                    )

            conn.commit()
            return model_id

    def log_metrics(
        self,
        model_id: UUID,
        metrics: dict[str, float],
        dataset_type: str | None = None,
    ) -> None:
        """
        モデルメトリクスを記録

        Args:
            model_id: モデルUUID
            metrics: {metric_name: value}、例: {'sharpe_ratio': 1.5, 'ic': 0.04}
            dataset_type: データセットタイプ('train', 'val', 'test', 'backtest', 'live'
        """
        with self._get_connection() as conn:
            with conn.cursor() as cur:
                for metric_name, value in metrics.items():
                    cur.execute(
                        """
                        INSERT INTO model_metrics (model_id, metric_name, value, dataset_type)
                        VALUES (%s, %s, %s, %s)
                        """,
                        (model_id, metric_name, value, dataset_type)
                    )
            conn.commit()

    def log_artifact(
        self,
        model_id: UUID,
        path: str | Path,
        artifact_type: str | None = None,
    ) -> UUID:
        """
        モデルアーティファクトを記録

        Args:
            model_id: モデルUUID
            path: アーティファクトパス(ローカルまたはS3)
            artifact_type: タイプ('weights', 'config', 'scaler'

        Returns:
            アーティファクトUUID
        """
        path = Path(path)
        checksum = None
        size_bytes = None

        if path.exists():
            size_bytes = path.stat().st_size
            # SHA256を計算
            sha256 = hashlib.sha256()
            with open(path, "rb") as f:
                for chunk in iter(lambda: f.read(8192), b""):
                    sha256.update(chunk)
            checksum = sha256.hexdigest()

        with self._get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO model_artifacts
                        (model_id, artifact_path, artifact_type, checksum, size_bytes)
                    VALUES (%s, %s, %s, %s, %s)
                    RETURNING artifact_id
                    """,
                    (model_id, str(path), artifact_type, checksum, size_bytes)
                )
                artifact_id = cur.fetchone()[0]
            conn.commit()
            return artifact_id

    def get_model(self, name: str, version: int | None = None) -> ModelInfo | None:
        """モデルを取得(デフォルトで最新バージョン)"""
        with self._get_connection() as conn:
            with conn.cursor() as cur:
                if version is None:
                    cur.execute(
                        """
                        SELECT model_id, name, version, strategy_type, description, created_at
                        FROM models WHERE name = %s
                        ORDER BY version DESC LIMIT 1
                        """,
                        (name,)
                    )
                else:
                    cur.execute(
                        """
                        SELECT model_id, name, version, strategy_type, description, created_at
                        FROM models WHERE name = %s AND version = %s
                        """,
                        (name, version)
                    )

                row = cur.fetchone()
                if row:
                    return ModelInfo(*row)
                return None

    def get_best_model(
        self,
        strategy_type: str,
        metric_name: str,
        dataset_type: str = "test",
        higher_is_better: bool = True,
    ) -> ModelWithMetrics | None:
        """
        戦略タイプの最高パフォーマンスモデルを取得

        Args:
            strategy_type: 戦略タイプ
            metric_name: ソートメトリクス(例: 'sharpe_ratio'
            dataset_type: データセットタイプ
            higher_is_better: 高い値が良いかどうか

        Returns:
            最高モデルとそのメトリクス
        """
        order = "DESC" if higher_is_better else "ASC"

        with self._get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    SELECT m.model_id, m.name, m.version, m.strategy_type,
                           m.description, m.created_at, mm.value
                    FROM models m
                    JOIN model_metrics mm ON m.model_id = mm.model_id
                    WHERE m.strategy_type = %s
                      AND mm.metric_name = %s
                      AND mm.dataset_type = %s
                    ORDER BY mm.value {order}
                    LIMIT 1
                    """,
                    (strategy_type, metric_name, dataset_type)
                )

                row = cur.fetchone()
                if not row:
                    return None

                model = ModelInfo(*row[:6])

                # このモデルのすべてのメトリクスを取得
                cur.execute(
                    """
                    SELECT metric_name, value, dataset_type
                    FROM model_metrics
                    WHERE model_id = %s
                    """,
                    (model.model_id,)
                )

                metrics = {
                    f"{r[0]}_{r[2]}": r[1]
                    for r in cur.fetchall()
                }

                return ModelWithMetrics(model=model, metrics=metrics)

使用例

registry = ModelRegistry(dsn="postgres://localhost:5432/trading")

# 新しいモデルを登録
model_id = registry.register_model(
    name="momentum_xgb",
    strategy_type="momentum",
    params={
        "n_estimators": 100,
        "max_depth": 5,
        "learning_rate": 0.1,
        "features": ["ret_5d", "ret_20d", "vol_20d", "rsi_14"],
    },
    description="XGBoost momentum model with RSI features"
)

# バックテストメトリクスを記録
registry.log_metrics(model_id, {
    "sharpe_ratio": 1.65,
    "total_return": 0.28,
    "max_drawdown": 0.12,
    "ic": 0.042,
    "ir": 0.85,
}, dataset_type="backtest")

# テストセットメトリクスを記録
registry.log_metrics(model_id, {
    "sharpe_ratio": 1.35,
    "ic": 0.035,
}, dataset_type="test")

# モデルアーティファクトを保存
registry.log_artifact(model_id, "models/momentum_xgb_v3.pkl", "weights")
registry.log_artifact(model_id, "models/momentum_xgb_v3_config.json", "config")

# 最高のモメンタムモデルを取得
best = registry.get_best_model("momentum", "sharpe_ratio", "test")
if best:
    print(f"Best model: {best.model.name} v{best.model.version}")
    print(f"Test Sharpe: {best.metrics.get('sharpe_ratio_test', 'N/A')}")

4. Drift Monitor

ドリフトの三次元

次元検出メトリクス意味閾値推奨
Data DriftPSI特徴量分布の変化< 0.10 正常、> 0.25 深刻
Prediction DriftIC予測と実際のリターンの相関> 0.02 正常、< 0.01 深刻
Performance DriftRolling Sharpe戦略のリスク調整後リターン> 0.5 正常、< 0 深刻

コアメトリクス計算

import numpy as np
from scipy.stats import spearmanr


def calculate_ic(signals: np.ndarray, returns: np.ndarray) -> float:
    """
    Information Coefficientを計算

    IC = Spearman相関(予測シグナル、実際のリターン)

    解釈:
    - IC &gt; 0.05: 優秀
    - IC 0.02-0.05: 良好
    - IC &lt; 0.02: 要注意
    - IC &lt; 0: モデルに問題がある可能性
    """
    if len(signals) &lt; 2:
        return 0.0

    # NaNを削除
    mask = ~(np.isnan(signals) | np.isnan(returns))
    signals, returns = signals[mask], returns[mask]

    if len(signals) &lt; 2:
        return 0.0

    ic, _ = spearmanr(signals, returns)
    return float(ic) if not np.isnan(ic) else 0.0


def calculate_psi(
    expected: np.ndarray,
    actual: np.ndarray,
    bins: int = 10,
) -> float:
    """
    Population Stability Index (PSI)を計算

    PSI = sum((actual% - expected%) * ln(actual% / expected%))

    解釈:
    - PSI &lt; 0.10: 分布安定
    - PSI 0.10-0.25: 軽度のドリフト、監視
    - PSI &gt; 0.25: 重大なドリフト、対応が必要
    """
    eps = 1e-6

    # ベースライン分布に基づいてビンを作成
    _, bin_edges = np.histogram(expected, bins=bins)

    # 各ビンの割合を計算
    expected_counts, _ = np.histogram(expected, bins=bin_edges)
    actual_counts, _ = np.histogram(actual, bins=bin_edges)

    expected_pct = expected_counts / len(expected) + eps
    actual_pct = actual_counts / len(actual) + eps

    # PSI公式
    psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))

    return float(psi)


def calculate_sharpe(
    returns: np.ndarray,
    periods_per_year: int = 252,
) -> float:
    """
    年率換算シャープレシオを計算

    Sharpe = mean(returns) / std(returns) * sqrt(252)
    """
    returns = returns[~np.isnan(returns)]

    if len(returns) &lt; 2:
        return 0.0

    mean_ret = np.mean(returns)
    std_ret = np.std(returns, ddof=1)

    if std_ret &lt; 1e-10:
        return 0.0

    return (mean_ret / std_ret) * np.sqrt(periods_per_year)

Drift Monitor実装

from dataclasses import dataclass
from datetime import date


@dataclass
class DriftMetrics:
    """日次ドリフトメトリクス"""
    date: date
    strategy_id: str
    ic: float | None = None
    ic_5d_avg: float | None = None
    psi: float | None = None
    sharpe_5d: float | None = None
    sharpe_20d: float | None = None
    ic_alert: bool = False
    psi_alert: bool = False
    sharpe_alert: bool = False


@dataclass
class AlertConfig:
    """アラート閾値設定"""
    ic_warning: float = 0.02
    ic_critical: float = 0.01
    psi_warning: float = 0.10
    psi_critical: float = 0.25
    sharpe_warning: float = 0.5
    sharpe_critical: float = 0.0


class DriftMonitor:
    """
    ドリフト監視サービス

    日次実行、IC、PSI、シャープメトリクスを計算、データベースに保存、アラートをトリガー。
    """

    def __init__(self, dsn: str, strategy_id: str = "default"):
        self.dsn = dsn
        self.strategy_id = strategy_id
        self.config = AlertConfig()

    def calculate_metrics(self, target_date: date) -> DriftMetrics:
        """指定された日付のドリフトメトリクスを計算"""
        metrics = DriftMetrics(date=target_date, strategy_id=self.strategy_id)

        # シグナルとリターンを取得
        signals, returns = self._get_signals_and_returns(target_date)
        if len(signals) &gt; 0:
            metrics.ic = calculate_ic(signals, returns)

        # シャープを計算するための過去リターンを取得
        daily_returns = self._get_daily_returns(lookback_days=60)
        if len(daily_returns) &gt;= 5:
            metrics.sharpe_5d = calculate_sharpe(daily_returns[-5:])
        if len(daily_returns) &gt;= 20:
            metrics.sharpe_20d = calculate_sharpe(daily_returns[-20:])

        # アラートをチェック
        if metrics.ic is not None:
            metrics.ic_alert = metrics.ic &lt; self.config.ic_critical
        if metrics.psi is not None:
            metrics.psi_alert = metrics.psi &gt; self.config.psi_critical
        if metrics.sharpe_20d is not None:
            metrics.sharpe_alert = metrics.sharpe_20d &lt; self.config.sharpe_critical

        return metrics

    def save_metrics(self, metrics: DriftMetrics) -> None:
        """メトリクスをデータベースに保存"""
        sql = """
            INSERT INTO drift_metrics (
                date, strategy_id, ic, ic_5d_avg, psi, sharpe_5d, sharpe_20d,
                ic_alert, psi_alert, sharpe_alert
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (date, strategy_id) DO UPDATE SET
                ic = EXCLUDED.ic,
                psi = EXCLUDED.psi,
                sharpe_5d = EXCLUDED.sharpe_5d,
                sharpe_20d = EXCLUDED.sharpe_20d,
                ic_alert = EXCLUDED.ic_alert,
                psi_alert = EXCLUDED.psi_alert,
                sharpe_alert = EXCLUDED.sharpe_alert
        """
        with self._get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, [
                    metrics.date, metrics.strategy_id,
                    metrics.ic, metrics.ic_5d_avg, metrics.psi,
                    metrics.sharpe_5d, metrics.sharpe_20d,
                    metrics.ic_alert, metrics.psi_alert, metrics.sharpe_alert,
                ])
            conn.commit()

    def run_daily(self, target_date: date | None = None) -> DriftMetrics:
        """日次ドリフト監視ジョブ"""
        if target_date is None:
            target_date = date.today()

        print(f"Running drift monitoring for {target_date}")

        metrics = self.calculate_metrics(target_date)
        self.save_metrics(metrics)

        # アラートを出力
        if metrics.ic_alert:
            print(f"[ALERT] IC = {metrics.ic:.4f} below threshold {self.config.ic_critical}")
        if metrics.psi_alert:
            print(f"[ALERT] PSI = {metrics.psi:.4f} above threshold {self.config.psi_critical}")
        if metrics.sharpe_alert:
            print(f"[ALERT] Sharpe = {metrics.sharpe_20d:.4f} below threshold {self.config.sharpe_critical}")

        return metrics

アラート対応マトリクス

アラートタイプ深刻度推奨アクション
IC < 0.02 が5日連続警告特徴量計算が正しいか確認
IC < 0.01深刻ポジションを50%削減、モデル診断開始
IC < 0 が3日連続重大戦略を一時停止、手動レビュー
PSI > 0.10警告後続の変化を監視
PSI > 0.25深刻再トレーニングプロセスをトリガー
シャープ < 0.5 が10日連続警告市場状況を確認
シャープ < 0 が5日連続深刻ポジション削減、再トレーニング準備

5. 統合: 研究から本番へ

完全ワークフロー

+----------------------------------------------------------------------+
|                          研究フェーズ                                  |
+----------------------------------------------------------------------+
|  1. 特徴量開発                                                         |
|     +-> Feature Storeに書き込む(正しいavailability_lagを設定)        |
|                                                                       |
|  2. トレーニングセット構築                                             |
|     +-> FeatureStore.get_point_in_time()                              |
|     +-> Parquetエクスポート(不変スナップショット)                     |
|                                                                       |
|  3. モデルトレーニング                                                 |
|     +-> パラメータ、コードバージョンを記録                             |
|     +-> Model Registryに登録                                          |
|                                                                       |
|  4. バックテスト評価                                                   |
|     +-> log_metrics(dataset_type='backtest')                          |
+----------------------------------------------------------------------+
                               |
                               v
+----------------------------------------------------------------------+
|                          デプロイフェーズ                              |
+----------------------------------------------------------------------+
|  5. モデル選択                                                         |
|     +-> get_best_model(strategy_type, metric, dataset_type='test')    |
|                                                                       |
|  6. モデルロード                                                       |
|     +-> artifact_pathから重みをロード                                 |
|     +-> チェックサムを検証                                             |
+----------------------------------------------------------------------+
                               |
                               v
+----------------------------------------------------------------------+
|                          ランタイムフェーズ                            |
+----------------------------------------------------------------------+
|  7. リアルタイム推論                                                   |
|     +-> FeatureStore.get_latest()で特徴量を取得                        |
|     +-> モデル予測                                                     |
|     +-> シグナル出力                                                   |
|                                                                       |
|  8. 日次監視                                                           |
|     +-> Drift MonitorがIC/PSI/シャープを計算                          |
|     +-> アラートをトリガー                                             |
|                                                                       |
|  9. 再トレーニング(必要に応じて)                                     |
|     +-> ステップ2に戻る                                                |
+----------------------------------------------------------------------+

再現性チェックリスト

チェック項目実装方法検証方法
コードバージョンgit SHAを記録producer_versionフィールド
特徴量バージョンfeature_versionカラムクエリでバージョン指定
トレーニングデータParquetスナップショット + フィンガープリント再トレーニングで同じ結果
モデルパラメータmodel_training_runs.paramsJSON保存
モデル重みmodel_artifacts.checksumSHA256検証
評価メトリクスmodel_metricsテーブル時間で追跡

日次運用スクリプト例

from datetime import date, datetime

def daily_mlops_job(
    feature_store: FeatureStore,
    model_registry: ModelRegistry,
    drift_monitor: DriftMonitor,
    strategy_id: str,
):
    """日次MLOpsジョブ"""
    today = date.today()
    print(f"=== MLOps Daily Job: {today} ===")

    # 1. 特徴量ヘルスチェック
    print("\n[1] Feature Health Check")
    latest = feature_store.get_latest("AAPL.NASDAQ")
    for name, fv in latest.items():
        age_hours = (datetime.now() - fv.event_time).total_seconds() / 3600
        if age_hours &gt; 24:
            print(f"  WARNING: {name} is {age_hours:.1f} hours old")
        else:
            print(f"  OK: {name} updated {age_hours:.1f} hours ago")

    # 2. モデルステータスチェック
    print("\n[2] Model Status Check")
    current_model = model_registry.get_model("momentum_xgb")
    if current_model:
        print(f"  Current: {current_model.name} v{current_model.version}")
        print(f"  Created: {current_model.created_at}")

    # 3. ドリフト監視
    print("\n[3] Drift Monitoring")
    drift_metrics = drift_monitor.run_daily(today)
    print(f"  IC: {drift_metrics.ic}")
    print(f"  Sharpe (20d): {drift_metrics.sharpe_20d}")

    # 4. 決定
    if drift_metrics.ic_alert or drift_metrics.sharpe_alert:
        print("\n[ACTION REQUIRED] Consider retraining or reducing position size")
    else:
        print("\n[OK] All metrics within normal range")


# スケジュールジョブ(例: cron)
# 0 6 * * * python -c "from mlops import daily_mlops_job; daily_mlops_job(...)"

さらなる読書

この章を引用する
Zhang, Wayland (2026). 背景知識: MLOpsインフラストラクチャ. In AIクオンツ取引:ゼロからイチへ. https://waylandz.com/quant-book-ja/MLOps-Infrastructure
@incollection{zhang2026quant_MLOps_Infrastructure,
  author = {Zhang, Wayland},
  title = {背景知識: MLOpsインフラストラクチャ},
  booktitle = {AIクオンツ取引:ゼロからイチへ},
  year = {2026},
  url = {https://waylandz.com/quant-book-ja/MLOps-Infrastructure}
}