第 06 课:数据工程的残酷现实
数据问题杀死的策略比模型问题多得多。
一个令人崩溃的周末
周五晚上 11 点,小王部署了他精心开发的美股量化策略。回测年化 80%,夏普比率 2.5,一切看起来完美。
周一早上 9 点,他被告警短信吵醒:"策略异常,无法获取数据"。
他爬起来检查,发现 Alpha Vantage API 返回了 429 Too Many Requests。他的程序每分钟请求 10 次,超过了免费账户的 API 限流(每分钟 5 次)。
修复后,策略终于开始运行。
周一下午 2 点,又一条告警:"策略仓位异常"。
这次是数据问题:API 返回了一个缺失的分钟 K 线,导致 MACD 计算出 NaN,程序把 NaN 当成卖出信号,清空了所有仓位。
修复后,他决定做一个全面的数据检查。然后他发现了更多问题:
- 某些时间戳是 UTC,某些是 ET(美东时间)
- 有几根 K 线的成交量是 0(盘前盘后时段)
- 历史数据和实时数据的字段名不一样(
closevsClose)
整整一周,他没有写一行策略代码,全在处理数据问题。
这就是数据工程的残酷现实。
6.1 数据源选择
免费数据源
| 数据源 | 覆盖市场 | 优点 | 缺点 |
|---|---|---|---|
| Yahoo Finance (yfinance) | 美股、ETF | 免费、历史长 | 近年有不稳定反馈(接口变动/限流/缺失数据等),需要容错 |
| Alpaca Markets | 美股 | IEX 数据免费,API 友好 | 免费版数据覆盖有限 |
| Binance API | 加密货币 | 实时、免费 | 只有 Binance 数据 |
| Alpha Vantage | 股票、外汇、加密 | 免费层可用 | 免费层请求配额通常很低(限流会调整,以官网为准) |
| CCXT | 加密货币 | 统一接口 | 依赖各交易所 API |
注意:数据服务商可能在产品、接口、配额、定价上发生调整甚至停服/迁移。这提醒我们数据源也有"退市风险",建议做好备份方案。
付费数据源(2024-2025 参考价格)
| 数据源 | 覆盖市场 | 月费 | 特点 |
|---|---|---|---|
| Bloomberg | 全球 | $2,500+ | 机构标配,数据质量最高 |
| LSEG Workspace(原 Refinitiv) | 全球 | 按需报价 | LSEG 旗下,原 Reuters 数据 |
| Polygon.io(现已更名为 Massive) | 美股 | $99+ | 实时+历史,开发者友好 |
| Nasdaq Data Link(原 Quandl) | 另类数据 | 按数据集定价 | 卫星、消费数据等 |
如何选择?
个人学习/研究:
→ 免费数据源 + 容忍数据问题
小型基金(< $1M AUM):
→ 付费数据源(基础套餐)+ 自建数据校验
机构级(> $10M AUM):
→ Bloomberg/Refinitiv + 多数据源交叉验证
核心原则:数据质量决定策略上限。垃圾进,垃圾出。
数据源的演进路径
阶段 推荐数据源 说明 学习/原型 Yahoo Finance (yfinance) 免费日线,够用 小规模实盘 Polygon.io / Alpaca 实时行情,API 友好 机构级生产 Databento / 交易所直连 L1/L2 实时数据,低延迟 本课的练习使用 Yahoo Finance 完全够用。当你准备进入实盘交易时,需要升级到专业级数据源——详见第 19 课。
6.2 API 的痛苦现实
你以为的代码 vs 实际的代码
你以为:
data = api.get_history("AAPL", days=365)
实际:
💻 伪代码参考(需替换为具体 SDK,如 yfinance / alpaca / ccxt)
# 注意:以下为伪代码,展示健壮数据获取的设计模式
# api, log, EmptyDataError, RateLimitError 需替换为你使用的具体库
import time
from typing import Optional
import pandas as pd
def get_history_robust(
symbol: str,
days: int,
max_retries: int = 5,
backoff_base: float = 2.0
) -> Optional[pd.DataFrame]:
"""
健壮的数据获取函数
处理的问题:
- Rate Limiting(指数退避重试)
- 空数据(验证+报警)
- 连接错误(自动重连)
- 超时(设置合理超时)
"""
for attempt in range(max_retries):
try:
# 设置超时(替换为你的 API 调用)
data = api.get_history(
symbol,
days=days,
timeout=30
)
# 验证数据(通用逻辑)
if data is None:
raise EmptyDataError(f"No data for {symbol}")
if len(data) == 0:
raise EmptyDataError(f"Empty data for {symbol}")
if len(data) < days * 0.9: # 数据量不足90%
log.warning(f"Data incomplete: got {len(data)}, expected ~{days}")
# 检查异常值
if data['close'].isnull().any():
log.warning("NaN values detected, filling...")
data['close'] = data['close'].ffill()
return data
except RateLimitError:
wait_time = backoff_base ** attempt
log.info(f"Rate limited, waiting {wait_time}s...")
time.sleep(wait_time)
except ConnectionError:
log.warning("Connection lost, reconnecting...")
api.reconnect()
time.sleep(1)
except TimeoutError:
log.warning("Request timeout, retrying...")
except Exception as e:
log.error(f"Unexpected error: {e}")
if attempt == max_retries - 1:
raise
return None常见 API 问题
| 问题 | 表现 | 解决方案 |
|---|---|---|
| Rate Limiting | 429 错误 | 指数退避重试 + 请求队列 |
| 数据缺失 | 空响应或部分数据 | 多数据源备份 + 缺失检测 |
| 数据延迟 | 数据时间戳落后 | 记录延迟 + 调整策略 |
| 格式不一致 | 字段名变化 | 统一数据适配层 |
| 连接不稳定 | 断连、超时 | 心跳检测 + 自动重连 |
Rate Limiting 的真实成本
假设你需要获取 100 个标的的分钟 K 线:
| API 限制 | 完成时间 |
|---|---|
| 1 请求/秒 | 100 秒 |
| 10 请求/秒 | 10 秒 |
| 无限制 | < 1 秒 |
高频策略需要高 API 配额,这是付费数据源的核心价值。
6.3 时间对齐问题
时区混乱
不同数据源使用不同时区:
| 数据源 | 默认时区 |
|---|---|
| Binance | UTC |
| Yahoo Finance | 交易所当地时间 |
| A股数据 | UTC+8 |
| 美股数据 | ET (Eastern Time) |
生产提示:标准化数据模型 不同市场和数据源使用不同的代码格式(
600519.SH、AAPL、0700.HK、BTC/USDT)。生产系统需要一个统一的规范化层(Normalizer),将外部格式转换为内部标准格式(如SYMBOL.EXCHANGE:AAPL.NASDAQ、0700.HKEX)。这是实现多市场交易的前提条件。同时注意符号解析的常见陷阱:退市股票静默失败、公司行为导致代码变更、多段式代码(如 BRK.B)等。详见第 19 课。
如果不处理:
- 9:30 A股数据 + 9:30 美股数据 → 实际相差 12 小时
- 策略用"同一时刻"的数据,实际是不同时刻
解决方案:全部转换为 UTC,存储和计算都用 UTC。
import pandas as pd
def normalize_timezone(df, source_tz='UTC'):
"""统一转换为 UTC"""
if df.index.tz is None:
df.index = df.index.tz_localize(source_tz)
df.index = df.index.tz_convert('UTC')
return df
Tick 到 K 线的聚合
从 Tick 数据聚合 K 线,需要注意:
| 问题 | 说明 |
|---|---|
| 开盘价 | 该时间段第一笔成交价 |
| 收盘价 | 该时间段最后一笔成交价 |
| 最高价 | 该时间段最高成交价 |
| 最低价 | 该时间段最低成交价 |
| 成交量 | 该时间段所有成交量之和 |
陷阱:如果某分钟没有成交,怎么办?
方案 1:用前一分钟的收盘价填充 OHLC
方案 2:标记为缺失,后续处理
方案 3:从聚合中排除该时间点
不同方案会导致不同的指标计算结果。必须统一处理规则。
跨资产数据对齐
股票、期货、外汇的交易时间不同:
| 资产 | 交易时间 |
|---|---|
| A股 | 9:30-11:30, 13:00-15:00 |
| 美股 | 9:30-16:00 ET |
| 外汇 | 7×24(接近) |
| 加密货币 | 7×24 |
如果策略需要跨资产信号:
- A股收盘时,美股还没开盘
- 用 A 股收盘价 + 美股开盘价?会有时差
- 需要仔细设计"同步点"
6.4 数据质量问题
异常值检测与处理
| 异常类型 | 识别方法 | 处理方法 |
|---|---|---|
| 价格跳变 | 涨跌幅 > 20% | 检查是否真实(拆股?) |
| 成交量异常 | 0 或极大值 | 检查交易所状态 |
| 缺失值 | NaN | 前值填充或删除 |
| 重复数据 | 时间戳重复 | 保留第一条或最后一条 |
def detect_anomalies(df, price_col='close', volume_col='volume'):
"""检测数据异常"""
issues = []
# 价格跳变
returns = df[price_col].pct_change()
jumps = returns.abs() > 0.20
if jumps.any():
issues.append(f"Price jumps detected: {jumps.sum()} times")
# 成交量为 0
zero_volume = df[volume_col] == 0
if zero_volume.any():
issues.append(f"Zero volume: {zero_volume.sum()} bars")
# 缺失值
nulls = df.isnull().sum()
if nulls.any():
issues.append(f"Null values: {nulls.to_dict()}")
# 时间戳重复
duplicates = df.index.duplicated()
if duplicates.any():
issues.append(f"Duplicate timestamps: {duplicates.sum()}")
return issues
股票分红、拆股调整
股票的历史价格需要"复权":
| 事件 | 实际变化 | 未复权数据 | 复权后数据 |
|---|---|---|---|
| 10送10 | 股数翻倍,价格减半 | 前后价格断崖 | 平滑连续 |
| 分红 | 扣除分红 | 除息日跳空 | 调整历史价格 |
不复权的后果:
- 回测时,策略会在除权日产生虚假信号
- 计算收益率会出现极端值
期货换月处理
期货合约有到期日,需要"换月":
2024年1月:持有 IF2401(1月合约)
2024年1月到期前:切换到 IF2402(2月合约)
问题:IF2401 收盘价 4000,IF2402 收盘价 4050
不是真实涨跌,而是合约不同
处理方法:
- 拼接时计算价差,调整历史价格
- 或使用"主力合约连续"数据(数据商已处理)
6.5 幸存者偏差 (Survivorship Bias)
什么是幸存者偏差?
如果你只用"现在还存在的股票"做回测:
2010 年的股票池:1000 只
2024 年还存在的:800 只
退市/破产的:200 只
你的回测只用了 800 只,忽略了那 200 只失败的公司
→ 回测收益被高估
幸存者偏差有多严重?
| 研究 | 结论 |
|---|---|
| 学术研究 | 年化收益高估 1-3% |
| 价值投资策略 | 高估更严重(便宜股更容易退市) |
| 小盘股策略 | 偏差最大 |
如何避免?
| 方法 | 实现难度 | 效果 |
|---|---|---|
| 使用含退市股票的数据库 | 高 | 最准确 |
| 使用历史指数成分股 | 中 | 较准确 |
| 在结论中说明偏差 | 低 | 至少诚实 |
付费数据源通常提供"无幸存者偏差"的数据集,这是另一个付费价值。
6.6 另类数据简介
什么是另类数据?
传统数据:价格、成交量、财务报表 另类数据:传统数据之外的一切
| 类型 | 数据源 | 应用场景 |
|---|---|---|
| 卫星数据 | 停车场车辆数、油罐储量 | 预测零售业绩、原油库存 |
| 文本数据 | 新闻、社交媒体、财报 | 情感分析、事件驱动 |
| 信用卡数据 | 消费支出统计 | 预测公司营收 |
| 网络流量 | 网站访问量 | 预测电商业绩 |
| GPS 数据 | 手机位置 | 人流分析 |
另类数据的挑战
| 挑战 | 说明 |
|---|---|
| 噪声大 | 信号/噪声比远低于价格数据 |
| 合规风险 | 隐私问题、数据来源合法性 |
| Alpha 衰减快 | 一旦被广泛使用,优势消失 |
| 成本高 | 数据本身贵 + 处理成本高 |
多智能体视角
6.7 数据管道设计原则
三大原则
| 原则 | 说明 |
|---|---|
| 不可变性 | 原始数据永不修改,处理后生成新文件 |
| 可追溯性 | 每条数据记录来源、处理时间、处理版本 |
| 冗余备份 | 至少两个数据源交叉验证 |
数据管道架构
数据采集层
│
├─→ 原始数据存储(不可变)
│ │
│ ▼
├─→ 数据清洗层
│ - 异常值处理
│ - 缺失值填充
│ - 时区标准化
│ │
│ ▼
├─→ 特征工程层
│ - 技术指标计算
│ - 标签生成
│ │
│ ▼
└─→ 就绪数据存储 → 策略使用
监控与告警
| 监控项 | 阈值建议 | 告警动作 |
|---|---|---|
| 数据延迟 | > 1 分钟 | 警告 |
| 数据缺失率 | > 1% | 警告 |
| 异常值比例 | > 0.1% | 检查 |
| API 错误率 | > 5% | 暂停策略 |
💻 代码实现(可选)
数据质量检查框架
import pandas as pd
from dataclasses import dataclass
from typing import List
@dataclass
class DataQualityReport:
symbol: str
start_date: str
end_date: str
total_rows: int
missing_rows: int
null_values: dict
anomalies: List[str]
is_valid: bool
def check_data_quality(df: pd.DataFrame, symbol: str) -> DataQualityReport:
"""全面的数据质量检查"""
anomalies = []
# 检查时间连续性
# 注意:这里以“日频数据”为例(按工作日粗略估计)。
# 分钟/Tick 数据需要结合交易所交易时段、节假日、午休等生成 expected_index。
idx = pd.DatetimeIndex(df.index)
expected_index = pd.bdate_range(start=idx.min().normalize(), end=idx.max().normalize())
expected_rows = len(expected_index)
actual_rows = len(df)
missing_rows = expected_rows - actual_rows
if missing_rows > expected_rows * 0.05:
anomalies.append(f"High missing rate: {missing_rows/expected_rows:.1%}")
# 检查 NULL 值
null_counts = df.isnull().sum().to_dict()
# 检查价格跳变
if 'close' in df.columns:
returns = df['close'].pct_change()
extreme_moves = (returns.abs() > 0.2).sum()
if extreme_moves > 0:
anomalies.append(f"Extreme price moves: {extreme_moves}")
# 检查成交量
if 'volume' in df.columns:
zero_volume = (df['volume'] == 0).sum()
if zero_volume > 0:
anomalies.append(f"Zero volume periods: {zero_volume}")
# 检查时间戳重复
duplicates = df.index.duplicated().sum()
if duplicates > 0:
anomalies.append(f"Duplicate timestamps: {duplicates}")
return DataQualityReport(
symbol=symbol,
start_date=str(df.index[0]),
end_date=str(df.index[-1]),
total_rows=actual_rows,
missing_rows=missing_rows,
null_values=null_counts,
anomalies=anomalies,
is_valid=len(anomalies) == 0
)
本课交付物
完成本课后,你将获得:
- 对数据问题的深刻认知 - 知道数据工程才是量化的主战场
- API 使用的最佳实践 - 健壮的数据获取代码框架
- 数据质量检查能力 - 能识别和处理常见数据问题
- 数据管道设计思路 - 理解生产级数据系统的架构
本课要点回顾
- 理解免费数据源和付费数据源的权衡
- 掌握处理 API 限流、数据缺失等问题的方法
- 认识时区混乱、幸存者偏差等隐蔽问题
- 了解另类数据的价值和挑战
- 理解数据管道的设计原则
延伸阅读
- 背景知识:交易所与订单簿机制 - Tick 数据的来源
- 背景知识:加密货币交易特点 - 7×24 数据的挑战
下一课预告
第 07 课:回测系统的陷阱
数据没问题,回测可能还是骗你。Look-Ahead Bias、过拟合、交易成本忽略……这些陷阱让无数看起来完美的策略在实盘中惨败。下一课揭开回测的真相。