第 06 课:数据工程的残酷现实

数据问题杀死的策略比模型问题多得多。


一个令人崩溃的周末

周五晚上 11 点,小王部署了他精心开发的美股量化策略。回测年化 80%,夏普比率 2.5,一切看起来完美。

周一早上 9 点,他被告警短信吵醒:"策略异常,无法获取数据"。

他爬起来检查,发现 Alpha Vantage API 返回了 429 Too Many Requests。他的程序每分钟请求 10 次,超过了免费账户的 API 限流(每分钟 5 次)。

修复后,策略终于开始运行。

周一下午 2 点,又一条告警:"策略仓位异常"。

这次是数据问题:API 返回了一个缺失的分钟 K 线,导致 MACD 计算出 NaN,程序把 NaN 当成卖出信号,清空了所有仓位。

修复后,他决定做一个全面的数据检查。然后他发现了更多问题:

  • 某些时间戳是 UTC,某些是 ET(美东时间)
  • 有几根 K 线的成交量是 0(盘前盘后时段)
  • 历史数据和实时数据的字段名不一样(close vs Close

整整一周,他没有写一行策略代码,全在处理数据问题。

这就是数据工程的残酷现实。


6.1 数据源选择

免费数据源

数据源覆盖市场优点缺点
Yahoo Finance (yfinance)美股、ETF免费、历史长近年有不稳定反馈(接口变动/限流/缺失数据等),需要容错
Alpaca Markets美股IEX 数据免费,API 友好免费版数据覆盖有限
Binance API加密货币实时、免费只有 Binance 数据
Alpha Vantage股票、外汇、加密免费层可用免费层请求配额通常很低(限流会调整,以官网为准)
CCXT加密货币统一接口依赖各交易所 API

注意:数据服务商可能在产品、接口、配额、定价上发生调整甚至停服/迁移。这提醒我们数据源也有"退市风险",建议做好备份方案。

付费数据源(2024-2025 参考价格)

数据源覆盖市场月费特点
Bloomberg全球$2,500+机构标配,数据质量最高
LSEG Workspace(原 Refinitiv)全球按需报价LSEG 旗下,原 Reuters 数据
Polygon.io(现已更名为 Massive)美股$99+实时+历史,开发者友好
Nasdaq Data Link(原 Quandl)另类数据按数据集定价卫星、消费数据等

如何选择?

个人学习/研究:
     免费数据源 + 容忍数据问题

小型基金(< $1M AUM):
     付费数据源(基础套餐)+ 自建数据校验

机构级(> $10M AUM):
     Bloomberg/Refinitiv + 多数据源交叉验证

核心原则数据质量决定策略上限。垃圾进,垃圾出。

数据源的演进路径

阶段推荐数据源说明
学习/原型Yahoo Finance (yfinance)免费日线,够用
小规模实盘Polygon.io / Alpaca实时行情,API 友好
机构级生产Databento / 交易所直连L1/L2 实时数据,低延迟

本课的练习使用 Yahoo Finance 完全够用。当你准备进入实盘交易时,需要升级到专业级数据源——详见第 19 课。


6.2 API 的痛苦现实

你以为的代码 vs 实际的代码

你以为

data = api.get_history("AAPL", days=365)

实际

💻 伪代码参考(需替换为具体 SDK,如 yfinance / alpaca / ccxt)
# 注意:以下为伪代码,展示健壮数据获取的设计模式
# api, log, EmptyDataError, RateLimitError 需替换为你使用的具体库

import time
from typing import Optional
import pandas as pd

def get_history_robust(
    symbol: str,
    days: int,
    max_retries: int = 5,
    backoff_base: float = 2.0
) -> Optional[pd.DataFrame]:
    """
    健壮的数据获取函数

    处理的问题:
    - Rate Limiting(指数退避重试)
    - 空数据(验证+报警)
    - 连接错误(自动重连)
    - 超时(设置合理超时)
    """
    for attempt in range(max_retries):
        try:
            # 设置超时(替换为你的 API 调用)
            data = api.get_history(
                symbol,
                days=days,
                timeout=30
            )

            # 验证数据(通用逻辑)
            if data is None:
                raise EmptyDataError(f"No data for {symbol}")

            if len(data) == 0:
                raise EmptyDataError(f"Empty data for {symbol}")

            if len(data) < days * 0.9:  # 数据量不足90%
                log.warning(f"Data incomplete: got {len(data)}, expected ~{days}")

            # 检查异常值
            if data['close'].isnull().any():
                log.warning("NaN values detected, filling...")
                data['close'] = data['close'].ffill()

            return data

        except RateLimitError:
            wait_time = backoff_base ** attempt
            log.info(f"Rate limited, waiting {wait_time}s...")
            time.sleep(wait_time)

        except ConnectionError:
            log.warning("Connection lost, reconnecting...")
            api.reconnect()
            time.sleep(1)

        except TimeoutError:
            log.warning("Request timeout, retrying...")

        except Exception as e:
            log.error(f"Unexpected error: {e}")
            if attempt == max_retries - 1:
                raise

    return None

常见 API 问题

问题表现解决方案
Rate Limiting429 错误指数退避重试 + 请求队列
数据缺失空响应或部分数据多数据源备份 + 缺失检测
数据延迟数据时间戳落后记录延迟 + 调整策略
格式不一致字段名变化统一数据适配层
连接不稳定断连、超时心跳检测 + 自动重连

Rate Limiting 的真实成本

假设你需要获取 100 个标的的分钟 K 线:

API 限制完成时间
1 请求/秒100 秒
10 请求/秒10 秒
无限制< 1 秒

高频策略需要高 API 配额,这是付费数据源的核心价值。


6.3 时间对齐问题

时区混乱

不同数据源使用不同时区:

数据源默认时区
BinanceUTC
Yahoo Finance交易所当地时间
A股数据UTC+8
美股数据ET (Eastern Time)

生产提示:标准化数据模型 不同市场和数据源使用不同的代码格式(600519.SHAAPL0700.HKBTC/USDT)。生产系统需要一个统一的规范化层(Normalizer),将外部格式转换为内部标准格式(如 SYMBOL.EXCHANGEAAPL.NASDAQ0700.HKEX)。这是实现多市场交易的前提条件。同时注意符号解析的常见陷阱:退市股票静默失败、公司行为导致代码变更、多段式代码(如 BRK.B)等。详见第 19 课。

如果不处理

  • 9:30 A股数据 + 9:30 美股数据 → 实际相差 12 小时
  • 策略用"同一时刻"的数据,实际是不同时刻

解决方案全部转换为 UTC,存储和计算都用 UTC。

import pandas as pd

def normalize_timezone(df, source_tz='UTC'):
    """统一转换为 UTC"""
    if df.index.tz is None:
        df.index = df.index.tz_localize(source_tz)
    df.index = df.index.tz_convert('UTC')
    return df

Tick 到 K 线的聚合

从 Tick 数据聚合 K 线,需要注意:

问题说明
开盘价该时间段第一笔成交价
收盘价该时间段最后一笔成交价
最高价该时间段最高成交价
最低价该时间段最低成交价
成交量该时间段所有成交量之和

陷阱:如果某分钟没有成交,怎么办?

方案 1:用前一分钟的收盘价填充 OHLC
方案 2:标记为缺失,后续处理
方案 3:从聚合中排除该时间点

不同方案会导致不同的指标计算结果。必须统一处理规则

跨资产数据对齐

股票、期货、外汇的交易时间不同:

资产交易时间
A股9:30-11:30, 13:00-15:00
美股9:30-16:00 ET
外汇7×24(接近)
加密货币7×24

如果策略需要跨资产信号

  • A股收盘时,美股还没开盘
  • 用 A 股收盘价 + 美股开盘价?会有时差
  • 需要仔细设计"同步点"

6.4 数据质量问题

异常值检测与处理

异常类型识别方法处理方法
价格跳变涨跌幅 > 20%检查是否真实(拆股?)
成交量异常0 或极大值检查交易所状态
缺失值NaN前值填充或删除
重复数据时间戳重复保留第一条或最后一条
def detect_anomalies(df, price_col='close', volume_col='volume'):
    """检测数据异常"""
    issues = []

    # 价格跳变
    returns = df[price_col].pct_change()
    jumps = returns.abs() > 0.20
    if jumps.any():
        issues.append(f"Price jumps detected: {jumps.sum()} times")

    # 成交量为 0
    zero_volume = df[volume_col] == 0
    if zero_volume.any():
        issues.append(f"Zero volume: {zero_volume.sum()} bars")

    # 缺失值
    nulls = df.isnull().sum()
    if nulls.any():
        issues.append(f"Null values: {nulls.to_dict()}")

    # 时间戳重复
    duplicates = df.index.duplicated()
    if duplicates.any():
        issues.append(f"Duplicate timestamps: {duplicates.sum()}")

    return issues

股票分红、拆股调整

股票的历史价格需要"复权":

事件实际变化未复权数据复权后数据
10送10股数翻倍,价格减半前后价格断崖平滑连续
分红扣除分红除息日跳空调整历史价格

不复权的后果

  • 回测时,策略会在除权日产生虚假信号
  • 计算收益率会出现极端值

期货换月处理

期货合约有到期日,需要"换月":

2024年1月:持有 IF2401(1月合约)
2024年1月到期前:切换到 IF2402(2月合约)

问题:IF2401 收盘价 4000,IF2402 收盘价 4050
      不是真实涨跌,而是合约不同

处理方法

  • 拼接时计算价差,调整历史价格
  • 或使用"主力合约连续"数据(数据商已处理)

6.5 幸存者偏差 (Survivorship Bias)

什么是幸存者偏差?

如果你只用"现在还存在的股票"做回测:

2010 年的股票池:1000 
2024 年还存在的:800 
退市/破产的:200 

你的回测只用了 800 只,忽略了那 200 只失败的公司
 回测收益被高估

幸存者偏差有多严重?

研究结论
学术研究年化收益高估 1-3%
价值投资策略高估更严重(便宜股更容易退市)
小盘股策略偏差最大

如何避免?

方法实现难度效果
使用含退市股票的数据库最准确
使用历史指数成分股较准确
在结论中说明偏差至少诚实

付费数据源通常提供"无幸存者偏差"的数据集,这是另一个付费价值。


6.6 另类数据简介

什么是另类数据?

传统数据:价格、成交量、财务报表 另类数据:传统数据之外的一切

类型数据源应用场景
卫星数据停车场车辆数、油罐储量预测零售业绩、原油库存
文本数据新闻、社交媒体、财报情感分析、事件驱动
信用卡数据消费支出统计预测公司营收
网络流量网站访问量预测电商业绩
GPS 数据手机位置人流分析

另类数据的挑战

挑战说明
噪声大信号/噪声比远低于价格数据
合规风险隐私问题、数据来源合法性
Alpha 衰减快一旦被广泛使用,优势消失
成本高数据本身贵 + 处理成本高

多智能体视角

数据管道架构

6.7 数据管道设计原则

三大原则

原则说明
不可变性原始数据永不修改,处理后生成新文件
可追溯性每条数据记录来源、处理时间、处理版本
冗余备份至少两个数据源交叉验证

数据管道架构

数据采集层
    
    ├─→ 原始数据存储(不可变)
             
             
    ├─→ 数据清洗层
         - 异常值处理
         - 缺失值填充
         - 时区标准化
             
             
    ├─→ 特征工程层
         - 技术指标计算
         - 标签生成
             
             
    └─→ 就绪数据存储  策略使用

监控与告警

监控项阈值建议告警动作
数据延迟> 1 分钟警告
数据缺失率> 1%警告
异常值比例> 0.1%检查
API 错误率> 5%暂停策略

💻 代码实现(可选)

数据质量检查框架

import pandas as pd
from dataclasses import dataclass
from typing import List

@dataclass
class DataQualityReport:
    symbol: str
    start_date: str
    end_date: str
    total_rows: int
    missing_rows: int
    null_values: dict
    anomalies: List[str]
    is_valid: bool

def check_data_quality(df: pd.DataFrame, symbol: str) -> DataQualityReport:
    """全面的数据质量检查"""

    anomalies = []

    # 检查时间连续性
    # 注意:这里以“日频数据”为例(按工作日粗略估计)。
    # 分钟/Tick 数据需要结合交易所交易时段、节假日、午休等生成 expected_index。
    idx = pd.DatetimeIndex(df.index)
    expected_index = pd.bdate_range(start=idx.min().normalize(), end=idx.max().normalize())
    expected_rows = len(expected_index)
    actual_rows = len(df)
    missing_rows = expected_rows - actual_rows

    if missing_rows > expected_rows * 0.05:
        anomalies.append(f"High missing rate: {missing_rows/expected_rows:.1%}")

    # 检查 NULL 
    null_counts = df.isnull().sum().to_dict()

    # 检查价格跳变
    if 'close' in df.columns:
        returns = df['close'].pct_change()
        extreme_moves = (returns.abs() > 0.2).sum()
        if extreme_moves > 0:
            anomalies.append(f"Extreme price moves: {extreme_moves}")

    # 检查成交量
    if 'volume' in df.columns:
        zero_volume = (df['volume'] == 0).sum()
        if zero_volume > 0:
            anomalies.append(f"Zero volume periods: {zero_volume}")

    # 检查时间戳重复
    duplicates = df.index.duplicated().sum()
    if duplicates > 0:
        anomalies.append(f"Duplicate timestamps: {duplicates}")

    return DataQualityReport(
        symbol=symbol,
        start_date=str(df.index[0]),
        end_date=str(df.index[-1]),
        total_rows=actual_rows,
        missing_rows=missing_rows,
        null_values=null_counts,
        anomalies=anomalies,
        is_valid=len(anomalies) == 0
    )

本课交付物

完成本课后,你将获得:

  1. 对数据问题的深刻认知 - 知道数据工程才是量化的主战场
  2. API 使用的最佳实践 - 健壮的数据获取代码框架
  3. 数据质量检查能力 - 能识别和处理常见数据问题
  4. 数据管道设计思路 - 理解生产级数据系统的架构

本课要点回顾

  • 理解免费数据源和付费数据源的权衡
  • 掌握处理 API 限流、数据缺失等问题的方法
  • 认识时区混乱、幸存者偏差等隐蔽问题
  • 了解另类数据的价值和挑战
  • 理解数据管道的设计原则

延伸阅读

  • 背景知识:交易所与订单簿机制 - Tick 数据的来源
  • 背景知识:加密货币交易特点 - 7×24 数据的挑战

下一课预告

第 07 课:回测系统的陷阱

数据没问题,回测可能还是骗你。Look-Ahead Bias、过拟合、交易成本忽略……这些陷阱让无数看起来完美的策略在实盘中惨败。下一课揭开回测的真相。

Cite this chapter
Zhang, Wayland (2026). 第06课:数据工程的残酷现实. In AI Quantitative Trading: From Zero to One. https://waylandz.com/quant-book/第06课:数据工程的残酷现实
@incollection{zhang2026quant_第06课:数据工程的残酷现实,
  author = {Zhang, Wayland},
  title = {第06课:数据工程的残酷现实},
  booktitle = {AI Quantitative Trading: From Zero to One},
  year = {2026},
  url = {https://waylandz.com/quant-book/第06课:数据工程的残酷现实}
}