7  金融大数据处理与分布式计算

在金融工程领域,传统的计量经济学模型往往基于低频数据(如日度、月度数据)。然而,随着交易技术的进步和信息获取手段的多样化,“大数据”已成为量化投资和风险管理的核心资产。本章将探讨金融大数据的特征,并重点介绍如何利用 Python 中的分布式计算框架 Dask 来处理超出内存限制的巨量数据集。我们将以长三角(Yangtze River Delta, YRD)地区的代表性上市公司代码为例,演示高频特征的并行提取过程。

7.1 金融大数据的特征与挑战

金融大数据通常被描述为具有 3V 特征:

  1. 体量 (Volume):Tick 级别的逐笔成交数据(L1/L2 Market Data)、期权波动率曲面数据,其体量远超 Excel 或即便是单机 Pandas 所能处理的范畴。
  2. 速度 (Velocity):高频交易要求在微秒级处理数据流。即便是在非高频场景下,每日盘后处理全市场数千只股票的分钟级数据也是巨大的计算挑战。
  3. 多样性 (Variety):除了结构化的行情数据,现代金融工程越来越多地利用另类数据 (Alternative Data),例如:
    • 卫星图像(分析港口集装箱吞吐量以预测贸易额)。
    • 社交媒体与新闻文本(用于情绪分析)。
    • 物联网传感器数据(用于供应链监控)。

在处理这些数据时,传统的单线程循环处理方式效率极低。我们需要借助并行计算和分布式处理技术。

7.2 分布式计算框架 Dask

Dask 是一个灵活的 Python 并行计算库。它的核心优势在于:它不仅能处理比内存更大的数据集,而且其 API 设计与 Pandas 和 NumPy 高度兼容。这意味着金融工程师可以以很低的学习成本迁移到大数据处理环境。

7.2.1 惰性求值与任务图 (DAG)

与 Pandas 的“立即执行”不同,Dask 采用惰性求值 (Lazy Evaluation) 机制。当你对一个 Dask DataFrame 进行操作(如 groupby, mean)时,它不会立即计算结果,而是构建一个有向无环图 (DAG) 来表示计算任务。只有当你调用 .compute() 时,Dask 才会根据 DAG 调度任务,利用多核 CPU 或集群资源并行执行。

注记

什么是 DAG?为什么它能解释 Dask 的‘惰性’?

把 DAG 想象成一张“施工计划图”:

  • 节点(node)是具体任务(如读取某个 Parquet 分块、计算某一列的滚动求和)。
  • 有向边(edge)表示依赖关系:箭头从“先做的任务”指向“后做的任务”。
  • 无环(acyclic)意味着你沿着箭头走,永远不会绕回原点。数学上,这保证任务存在拓扑排序:总能找到一个从上游到下游的执行顺序。

因此,Dask 在你写 ddf.groupby(...).apply(...) 时并不是立刻‘算’,而是先把这张计划图搭好;当你执行 .compute() 才真正开工。

注意

常见误区:把 .compute() 当作‘显示结果’按钮

在 Dask 语境下,.compute() 的含义更接近“立刻执行并把结果搬回本机内存”。两点务必注意:

  • 如果你在同一段分析里多次 .compute(),可能会重复计算;更高效的写法常常是 persist() 或复用中间结果。
  • 写在 apply 里的函数应尽量纯函数(无外部状态、无随机副作用、无写文件等),否则在分布式调度下容易出现难以复现的结果。

7.2.2 Dask 架构示意图

下图展示了 Dask 如何将一个巨大的数据集(Logical DataFrame)切分为多个小的分块(Partitions/Chunks),并映射(Map)到不同的计算核心上进行并行处理,最后归约(Reduce)得到结果。

海量数据源 (CSV/Parquet) 分块 1 分块 2 分块 N 计算任务 (Map) 计算任务 (Map) 计算任务 (Map) 结果
图 7.1: Dask 分布式计算任务图

7.3 案例研究:长三角上市公司高频特征提取

长三角地区是中国经济最活跃的区域之一,汇聚了大量优质上市公司。本节以本项目的真实本地数据快照为起点,处理多家龙头企业多年的交易数据,演示在较大规模数据上如何并行计算一个在机构交易中常用的指标——成交量加权平均价格 (Volume Weighted Average Price, VWAP)

7.3.1 7.3.1 数学模型:VWAP

VWAP 是算法交易和机构投资者绩效评估的重要基准。对于一个给定的时间窗口,其计算公式为:

\[ VWAP = \frac{\sum_{j=1}^{n} (P_j \times V_j)}{\sum_{j=1}^{n} V_j} \]

其中: * \(P_j\) 是第 \(j\) 笔交易(或时间单元)的价格。 * \(V_j\) 是第 \(j\) 笔交易(或时间单元)的成交量。

在大数据场景下,我们需要对每只股票、在很长的时间跨度内、基于滑动窗口(Rolling Window)并行计算该指标。

7.3.2 样本选择:长三角核心资产

我们将选取以下四家具有代表性的长三角地区上市公司,涵盖上海、江苏和浙江:

  1. 上汽集团 (600104.SH):上海,汽车制造业龙头。
  2. 恒瑞医药 (600276.SH):江苏连云港,医药生物行业领军者。
  3. 海康威视 (002415.SZ):浙江杭州,安防与科技巨头。
  4. 宁波银行 (002142.SZ):浙江宁波,优秀的城商行代表。

7.3.3 Python 代码实现

以下代码展示了如何从本项目的本地数据快照(Parquet)读取数据,并利用 dask 进行并行计算。为了强调工程规范,本书的分析文档不再直接联网获取数据。

import dask.dataframe as dd
import numpy as np
import pandas as pd
from pathlib import Path

# 1. 准备数据:直接从本地 Parquet 读取(离线、可复现)
yrd_stocks = {
    '601018.SH': '宁波港',
    '600276.SH': '恒瑞医药',
    '002415.SZ': '海康威视',
    '002142.SZ': '宁波银行',
}

ddf = dd.read_parquet('data/cn_equity_daily_latest.parquet')

# 仅保留需要的股票与字段,降低计算规模
ddf = ddf[ddf['ts_code'].isin(list(yrd_stocks.keys()))][['ts_code', 'trade_date', 'volume', 'amount', 'adj_close']]
ddf = ddf.dropna(subset=['volume', 'amount', 'adj_close'])

# trade_date 统一转换为 datetime
ddf['trade_date'] = dd.to_datetime(ddf['trade_date'])

# 2. 定义计算逻辑
# 目标:计算 20 日滚动 VWAP
# VWAP ≈ 滚动总成交额 / 滚动总成交量 (这是一种近似计算,严谨计算需用高频数据)
# 针对日线数据,更严谨的 VWAP 需要高频成交数据;在日线层面,我们用“滚动成交额 / 滚动成交量”近似。
# 这里我们直接使用 Amount (成交额) 和 Volume (成交量) 计算一段时间的均价。

def compute_vwap_pandas_style(df):
    df = df.sort_values('trade_date')
    # 计算 20 日累计
    roll_amount = df['amount'].rolling(20).sum()
    roll_volume = df['volume'].rolling(20).sum()
    df['vwap_20'] = roll_amount / roll_volume
    return df

# meta 参数告诉 Dask 预期输出的结构
meta = {
    'ts_code': 'object',
    'trade_date': 'datetime64[ns]',
    'volume': 'float64',
    'amount': 'float64',
    'adj_close': 'float64',
    'vwap_20': 'float64',
}

result_dask = ddf.groupby('ts_code', group_keys=False).apply(compute_vwap_pandas_style, meta=meta)

# 3. 触发计算 (Lazy Evaluation -> Execution)
print('\n开始 Dask 并行计算任务 (DAG Execution)...')
try:
    final_result = result_dask.compute()
    final_result['name'] = final_result['ts_code'].map(yrd_stocks)
    print('\n计算完成!长三角股票 VWAP 特征预览:')
    
    # 展示部分结果
    display_cols = ['ts_code', 'name', 'trade_date', 'adj_close', 'vwap_20']
    preview = final_result[display_cols].dropna().groupby('ts_code').tail(3)
    print(preview)
    
except Exception as e:
    print(f'计算过程中发生错误: {e}')

开始 Dask 并行计算任务 (DAG Execution)...

计算完成!长三角股票 VWAP 特征预览:
         ts_code  name trade_date  adj_close   vwap_20
1448   002142.SZ  宁波银行 2023-12-27      19.48  2.080194
1449   002142.SZ  宁波银行 2023-12-28      20.20  2.063405
1450   002142.SZ  宁波银行 2023-12-29      20.11  2.050670
4360   002415.SZ  海康威视 2023-12-27      32.94  3.350882
4361   002415.SZ  海康威视 2023-12-28      33.85  3.348252
4362   002415.SZ  海康威视 2023-12-29      34.72  3.347820
8731   600276.SH  恒瑞医药 2023-12-27      44.09  4.537537
8732   600276.SH  恒瑞医药 2023-12-28      44.75  4.520619
8733   600276.SH  恒瑞医药 2023-12-29      45.23  4.507200
10188  601018.SH   宁波港 2023-12-27       3.61  0.363504
10189  601018.SH   宁波港 2023-12-28       3.57  0.363345
10190  601018.SH   宁波港 2023-12-29       3.56  0.362917

7.4 练习题

练习7.1:大样本统计(Dask groupby)

用 Dask 对全体股票(data/cn_equity_daily_latest.parquet)计算:每只股票在样本期内的交易天数、日收益率均值、日收益率标准差(使用对数收益率),并给出按波动率排序的前 10 只股票。

练习7.2:特征落地与一致性检查

基于本章 VWAP(20) 计算结果,随机抽取 1 只股票,用 Pandas 单线程实现同样的 VWAP(20) 计算,并与 Dask 结果做一致性比较(例如最大绝对误差)。

7.5 练习题完整解答

import dask.dataframe as dd
import pandas as pd
import numpy as np

ddf = dd.read_parquet('data/cn_equity_daily_latest.parquet')
ddf['trade_date'] = dd.to_datetime(ddf['trade_date'])

# 练习7.1:按股票统计交易天数与波动率
ddf = ddf.dropna(subset=['ts_code', 'adj_close'])
ddf = ddf.map_partitions(lambda pdf: pdf.sort_values(['ts_code', 'trade_date']))

def add_log_ret(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf = pdf.sort_values('trade_date')
    pdf['log_ret'] = np.log(pdf['adj_close'] / pdf['adj_close'].shift(1))
    return pdf

meta = ddf._meta.copy()
meta['log_ret'] = np.float64()
ddf2 = ddf.groupby('ts_code', group_keys=False).apply(add_log_ret, meta=meta)

stats = ddf2.groupby('ts_code').agg({
    'log_ret': ['count', 'mean', 'std']
}).compute()
stats.columns = ['n_days', 'mean_log_ret', 'std_log_ret']
print(stats.sort_values('std_log_ret', ascending=False).head(10))

# 练习7.2:一致性比较(以宁波银行 002142.SZ 为例)
target = '002142.SZ'
ddf_small = ddf2[ddf2['ts_code'].eq(target)][['ts_code', 'trade_date', 'volume', 'amount']]

def compute_vwap_pandas_style(df: pd.DataFrame) -> pd.DataFrame:
    df = df.sort_values('trade_date')
    df['vwap_20'] = df['amount'].rolling(20).sum() / df['volume'].rolling(20).sum()
    return df

meta2 = {
    'ts_code': 'object',
    'trade_date': 'datetime64[ns]',
    'volume': 'float64',
    'amount': 'float64',
    'vwap_20': 'float64',
}

vwap_dask = ddf_small.groupby('ts_code', group_keys=False).apply(compute_vwap_pandas_style, meta=meta2).compute()
vwap_pd = compute_vwap_pandas_style(vwap_dask[['ts_code', 'trade_date', 'volume', 'amount']].copy())
err = (vwap_dask['vwap_20'] - vwap_pd['vwap_20']).abs().max()
print({'max_abs_error': float(err)})
           n_days  mean_log_ret  std_log_ret
ts_code                                     
603501.SH    1370      0.000899     0.033643
002230.SZ    1456      0.000105     0.028233
002415.SZ    1454      0.000002     0.024115
688981.SH     841     -0.000532     0.023406
600276.SH    1456      0.000280     0.022348
601601.SH    1456     -0.000239     0.021902
002142.SZ    1450      0.000162     0.021168
600104.SH    1456     -0.000415     0.018727
601018.SH    1456     -0.000191     0.016613
600000.SH    1456     -0.000283     0.011979
{'max_abs_error': 0.0}

7.5.1 结果分析与分布式优势

在上述代码中,dask_df.groupby(...).apply(...) 构建了一个复杂的任务图。Dask 会自动处理:

  1. 数据经过:如果数据量超过内存,Dask 会自动将分块溢写到磁盘(Spilling),防止内存溢出(OOM)。
  2. 并行调度:4 个分块(Partition)可能被调度到 4 个 CPU 核心上同时进行运算。
  3. 扩展性:同样的代码,如果部署到 Dask Cluster(多台服务器),只需修改连接配置,即可处理存储在 HDFS 或 S3 上的 TB 级数据。

通过对上汽集团、恒瑞医药等长三角企业的分析,我们可以看到大数据处理技术如何帮助我们从海量历史交易中快速提取有效因子,为进一步的量化策略构建奠定基础。