6  数据整理:连接、合并与重塑

6.1 引言与学习目标

学习目标

通过本章学习,你应该能够:

  • 理论目标
    • 深刻理解分层索引(MultiIndex)的数学表示和操作原理
    • 掌握数据合并的数学基础,包括不同连接类型的集合论表示
    • 理解数据重塑的数学变换原理
    • 掌握时间序列重采样的数学定义
  • 实践目标
    • 熟练使用pandas的merge、concat等函数进行多数据源整合
    • 运用stack/unstack进行数据维度变换
    • 掌握透视表和交叉表的创建与应用
  • 应用目标
    • 能够整合多个本地数据源(行情数据、财务数据、宏观数据)
    • 实现面板数据(Panel Data)的构建与操作
    • 运用数据整理技术准备用于计量经济学建模的数据集

在大多数真实世界的应用中,你所需要的数据往往分散在多个文件、数据库或API中。这些数据很少会以一种立即可用于分析的格式呈现。本章将重点介绍 pandas 中用于合并、连接和重排数据,从而将其整理成干净、便于分析的格式的核心工具。

首先,我们将介绍分层索引 (Hierarchical Indexing) 的概念,这是 pandas 的一个强大功能,它允许我们构建更复杂的数据结构。这将是后续许多操作的基石。然后,我们将深入探讨数据操作的具体机制,并通过源自中国经济与金融的实际案例,将这些抽象的工具付诸实践。

6.1.1 理论基础:数据合并的数学原理

数据合并的集合论基础

数据合并操作本质上是基于键(keys)的集合运算。给定两个数据集:

  • 左表 \(D_L\),键集合 \(K_L = \{k_1, k_2, ..., k_m\}\)
  • 右表 \(D_R\),键集合 \(K_R = \{k'_1, k'_2, ..., k'_n\}\)

四种连接类型的数学定义

  1. 内连接 (Inner Join): $ K_{inner} = K_L K_R $ 结果仅包含两个表中都存在的键。

  2. 左连接 (Left Join): $ K_{left} = K_L $ 结果包含左表的所有键,右表中不匹配的键填充缺失值。

  3. 右连接 (Right Join): $ K_{right} = K_R $ 结果包含右表的所有键,左表中不匹配的键填充缺失值。

  4. 外连接 (Outer Join): $ K_{outer} = K_L K_R $ 结果包含两个表的所有键,不匹配的位置填充缺失值。

数据合并的映射表示

设合并操作为 \(\text{Merge}: D_L \times D_R \to D_M\),对于每一行 \(r \in D_M\)

\[ r = \text{Merge}(r_L, r_R) = \begin{cases} (r_L, r_R) & \text{if } k_L = k_R \text{ and match} \\ (r_L, \text{NA}) & \text{if } k_L \notin K_R \text{ (left join)} \\ (\text{NA}, r_R) & \text{if } k_R \notin K_L \text{ (right join)} \\ \text{NA} & \text{if no match} \end{cases} \]

笛卡尔积 (Cartesian Product) 的数学表示

在多对多合并中,如果两个表都有重复的键,结果将产生笛卡尔积:

\[ |D_M| = |D_L| \times |D_R| \]

这是需要特别警惕的情况,因为它可能导致数据量的指数级增长。

6.2 分层索引

分层索引是 pandas 的一个关键特性,它允许你在单个轴上拥有多个(两个或更多)索引层级。你可以将其视为一种在熟悉的二维 DataFrame 结构中表示更高维度数据的方法。这在经济学中对于处理面板数据(Panel Data)尤其有用,面板数据旨在追踪多个主体(如不同省份或公司)在一段时间内的变化。

让我们从一个实际的例子开始。我们将从美国联邦储备经济数据库(FRED)获取中国几个核心经济指标的季度数据:实际国内生产总值(RGDP)、居民消费价格指数(CPI)和工业生产总值(IPI)。这类数据天然适合使用两级索引:一级是指标名称,另一级是日期。

核心概念:分层索引 (MultiIndex) 的直观逻辑

在处理复杂的金融数据集时,你可以将多重索引想象成电子表格中的“合并单元格”架构。

  1. 高维数据的二维表示:外层索引(级别 0,如:经济指标、股票代码)作为主要分类,其下嵌套了内层索引(级别 1,如:日期、财务科目)。
  2. 面板数据结构:这种结构允许我们在二维表格中自然地表达三维甚至更高维度的数据。例如,一个主体的多项指标在多个时间点上的值。
  3. 计算优势:在计量经济学中,这是处理面板数据 (Panel Data) 的核心基础,使得针对特定截面或特定时间段的选择性聚合变得极其高效。

首先,让我们创建一个带有这种多级索引的 Series

列表 6.1
#| label: lst-hierarchical-series
#| lst-cap: '使用真实的中国指数数据创建一个带有 MultiIndex 的 pandas Series'

import pandas as pd
import numpy as np

# 从本地获取多支指数的收盘价
index_data = pd.read_parquet('C:/qiufei/data/index/indexes.parquet')
symbols = ['000001.XSHG', '000300.XSHG', '399001.XSHE']
names = {'000001.XSHG': 'RGDP', '000300.XSHG': 'CPI', '399001.XSHE': 'IPI'} # 模拟指标名称

all_series = []
for sym in symbols:
    temp = index_data[index_data['symbol'] == sym].copy()
    temp['datetime'] = pd.to_datetime(temp['datetime'].astype(str), format='%Y%m%d%H%M%S')
    temp.set_index('datetime', inplace=True)
    s = temp['close'].sort_index().loc['2023-01-01':'2023-12-31']
    s.name = 'Value'
    # 通过添加指标名称创建多重索引
    s.index = pd.MultiIndex.from_product([[names[sym]], s.index], names=['indicator', 'date'])
    all_series.append(s)

# 将所有 Series 连接成一个并排序索引
china_macro_indicators_series = pd.concat(all_series).sort_index()

# 显示结果序列的头部
print(china_macro_indicators_series.head())
indicator  date      
CPI        2023-01-03    3887.8992
           2023-01-04    3892.9477
           2023-01-05    3968.5782
           2023-01-06    3980.8888
           2023-01-09    4013.1196
Name: Value, dtype: float64

你在 列表 6.1 的输出中看到的是一个 Series 的“美化”视图,它的索引是一个 MultiIndex。外层 ‘indicator’ 级别中的空白意味着“使用正上方的标签”。让我们来检查一下索引对象本身。

china_macro_indicators_series.index
列表 6.2
MultiIndex([( 'CPI', '2023-01-03'),
            ( 'CPI', '2023-01-04'),
            ( 'CPI', '2023-01-05'),
            ( 'CPI', '2023-01-06'),
            ( 'CPI', '2023-01-09'),
            ( 'CPI', '2023-01-10'),
            ( 'CPI', '2023-01-11'),
            ( 'CPI', '2023-01-12'),
            ( 'CPI', '2023-01-13'),
            ( 'CPI', '2023-01-16'),
            ...
            ('RGDP', '2023-12-18'),
            ('RGDP', '2023-12-19'),
            ('RGDP', '2023-12-20'),
            ('RGDP', '2023-12-21'),
            ('RGDP', '2023-12-22'),
            ('RGDP', '2023-12-25'),
            ('RGDP', '2023-12-26'),
            ('RGDP', '2023-12-27'),
            ('RGDP', '2023-12-28'),
            ('RGDP', '2023-12-29')],
           names=['indicator', 'date'], length=726)

对于一个分层索引的对象,所谓的部分索引 (partial indexing) 是可行的,它使你能够简洁地选择数据的子集。例如,我们可以轻松地选择所有的CPI数据。

china_macro_indicators_series['CPI']
列表 6.3
date
2023-01-03    3887.8992
2023-01-04    3892.9477
2023-01-05    3968.5782
2023-01-06    3980.8888
2023-01-09    4013.1196
                ...    
2023-12-25    3347.4508
2023-12-26    3324.7902
2023-12-27    3336.3572
2023-12-28    3414.5403
2023-12-29    3431.1099
Name: Value, Length: 242, dtype: float64

你也可以像对标准索引一样,对外层索引进行切片操作。

china_macro_indicators_series['CPI':'IPI']
列表 6.4
indicator  date      
CPI        2023-01-03    3887.8992
           2023-01-04    3892.9477
           2023-01-05    3968.5782
           2023-01-06    3980.8888
           2023-01-09    4013.1196
                           ...    
IPI        2023-12-25    9256.2830
           2023-12-26    9157.2496
           2023-12-27    9191.7415
           2023-12-28    9441.0545
           2023-12-29    9524.6895
Name: Value, Length: 484, dtype: float64

甚至可以从“内层”级别进行选择。在这里,我们可以选择所有指标在特定日期的数据。为此,我们使用 .loc 索引器。第一个位置的冒号 : 表示我们想要选择第一个(外层)索引级别中的所有项。

实战技巧:.loc 索引器的高级选择逻辑

在处理分层索引时,.loc 是实现精确切片的最强工具。

  • 全能语法.loc[(level_0_selection, level_1_selection, ...)]
  • 占位符机制:使用冒号 : 作为占位符,代表“选择该层级的所有标签”。
  • 示例data.loc[:, '2023-01-03'] 意味着在保持指标层级完整的前提下,精确提取 2023 年 1 月 3 日的所有横截面观测值。这在分析突发市场事件(如政策发布日)对所有资产的影响时非常有效。
# 选择 2023 年 1 月 3 日的数据
china_macro_indicators_series.loc[:, '2023-01-03']
列表 6.5
indicator
CPI      3887.8992
IPI     11117.1303
RGDP     3116.5119
Name: Value, dtype: float64

6.2.1 使用 stackunstack 进行重塑

分层索引在数据重塑中扮演着重要角色。一个常见的操作是将带有 MultiIndexSeries 重排成 DataFrame。这可以通过 unstack() 方法完成,该方法会将最内层的索引级别旋转到列索引中。

china_macro_indicators_df = china_macro_indicators_series.unstack()
china_macro_indicators_df.head()
表 6.1: 使用 unstack() 将中国经济指标数据重塑为 DataFrame
date 2023-01-03 2023-01-04 2023-01-05 2023-01-06 2023-01-09 2023-01-10 2023-01-11 2023-01-12 2023-01-13 2023-01-16 ... 2023-12-18 2023-12-19 2023-12-20 2023-12-21 2023-12-22 2023-12-25 2023-12-26 2023-12-27 2023-12-28 2023-12-29
indicator
CPI 3887.8992 3892.9477 3968.5782 3980.8888 4013.1196 4017.4737 4010.0309 4017.8692 4074.3772 4137.9643 ... 3329.3657 3334.0412 3297.5022 3330.8698 3337.2281 3347.4508 3324.7902 3336.3572 3414.5403 3431.1099
IPI 11117.1303 11095.3733 11332.0103 11367.7321 11450.1472 11506.7938 11439.4395 11465.7280 11602.3042 11785.7679 ... 9279.3889 9289.3358 9158.4374 9257.0905 9221.3126 9256.2830 9157.2496 9191.7415 9441.0545 9524.6895
RGDP 3116.5119 3123.5164 3155.2162 3157.6365 3176.0845 3169.5072 3161.8376 3163.4509 3195.3059 3227.5916 ... 2930.8036 2932.3908 2902.1096 2918.7149 2914.7752 2918.8126 2898.8787 2914.6138 2954.7035 2974.9348

3 rows × 242 columns

unstack() 的逆操作是 stack(),它将列旋转到行中,再次生成一个 Series。这些操作对于在“长”格式和“宽”格式数据之间转换至关重要,我们将在 小节 6.4 中深入探讨这个主题。

# 注意:原始数据中存在缺失值,stack 默认会移除它们
# 这就是为什么如果某些指标缺少某些季度的数据,输出会变短
china_macro_indicators_df.stack().head()
列表 6.6
indicator  date      
CPI        2023-01-03    3887.8992
           2023-01-04    3892.9477
           2023-01-05    3968.5782
           2023-01-06    3980.8888
           2023-01-09    4013.1196
dtype: float64

6.2.2 DataFrame 上的分层索引

对于 DataFrame,行轴和列轴都可以拥有分层索引。让我们构建一个更复杂的例子。我们将使用来自 Tushare API 的真实 A 股市场数据——宁波港(601018)宁波银行(002142) 的开盘价、收盘价和日交易量。这将允许我们创建一个在行(股票代码、日期)和列(度量类型)上都具有 MultiIndexDataFrame

import pandas as pd
from pathlib import Path

# 读取宁波港和宁波银行的数据
# 从本地 Parquet 文件读取数据代替 HDF5
ningbo_port = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
ningbo_bank = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '002142.XSHE')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})

# 选择2024年1月的前几个交易日用于演示
ningbo_port_demo = ningbo_port[
    (ningbo_port['trade_date'] >= '2024-01-01') &
    (ningbo_port['trade_date'] <= '2024-01-10')
].copy()
ningbo_bank_demo = ningbo_bank[
    (ningbo_bank['trade_date'] >= '2024-01-01') &
    (ningbo_bank['trade_date'] <= '2024-01-10')
].copy()

# 选择需要的列并重命名
ningbo_port_demo = ningbo_port_demo[['trade_date', 'open', 'high', 'low', 'close', 'volume']]
ningbo_bank_demo = ningbo_bank_demo[['trade_date', 'open', 'high', 'low', 'close', 'volume']]

# 重命名列以符合规范
ningbo_port_demo.columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
ningbo_bank_demo.columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']

# 为每个 DataFrame 添加一个 'ticker' 列
ningbo_port_demo['ticker'] = 'NingboPort'
ningbo_bank_demo['ticker'] = 'NingboBank'

# 连接并设置 MultiIndex
stocks = pd.concat([ningbo_port_demo, ningbo_bank_demo])
stocks = stocks.set_index(['ticker', 'Date'])

# 为列创建 MultiIndex
stock_price_volume_multiindex_df = stocks[['Open', 'Close', 'Volume']].copy()
stock_price_volume_multiindex_df.columns = pd.MultiIndex.from_tuples([
    ('Price', 'Open'),
    ('Price', 'Close'),
    ('Volume', 'Total')
])

stock_price_volume_multiindex_df
表 6.2: 在行轴和列轴上都具有 MultiIndex 的股票数据 DataFrame
Price Volume
Open Close Total
ticker Date
NingboPort 2024-01-02 3.3400 3.3777 24852644.0
2024-01-03 3.3683 3.4059 17502674.0
2024-01-04 3.4153 3.4059 16564281.0
2024-01-05 3.4153 3.3871 15904201.0
2024-01-08 3.3965 3.3494 16029300.0
2024-01-09 3.3306 3.3588 20051699.0
2024-01-10 3.3588 3.3777 19764948.0
NingboBank 2024-01-02 18.7598 18.3594 39575754.0
2024-01-03 18.3222 18.1546 37235409.0
2024-01-04 18.1453 17.7636 50880362.0
2024-01-05 17.6891 18.1546 56130083.0
2024-01-08 18.0243 17.7170 38023194.0
2024-01-09 17.7729 18.0708 42221761.0
2024-01-10 17.9870 18.0987 33169881.0

分层索引的每个级别都可以有名称。让我们来为它们命名。

stock_price_volume_multiindex_df.index.names = ['Ticker', 'TradeDate']
stock_price_volume_multiindex_df.columns.names = ['Category', 'Metric']

stock_price_volume_multiindex_df
列表 6.7
Category Price Volume
Metric Open Close Total
Ticker TradeDate
NingboPort 2024-01-02 3.3400 3.3777 24852644.0
2024-01-03 3.3683 3.4059 17502674.0
2024-01-04 3.4153 3.4059 16564281.0
2024-01-05 3.4153 3.3871 15904201.0
2024-01-08 3.3965 3.3494 16029300.0
2024-01-09 3.3306 3.3588 20051699.0
2024-01-10 3.3588 3.3777 19764948.0
NingboBank 2024-01-02 18.7598 18.3594 39575754.0
2024-01-03 18.3222 18.1546 37235409.0
2024-01-04 18.1453 17.7636 50880362.0
2024-01-05 17.6891 18.1546 56130083.0
2024-01-08 18.0243 17.7170 38023194.0
2024-01-09 17.7729 18.0708 42221761.0
2024-01-10 17.9870 18.0987 33169881.0

我们可以通过访问索引的 nlevels 属性来查看它有多少个级别:

print(f'行索引级别数: {stock_price_volume_multiindex_df.index.nlevels}')
print(f'列索引级别数: {stock_price_volume_multiindex_df.columns.nlevels}')
行索引级别数: 2
列索引级别数: 2

通过部分列索引,你同样可以选择列的分组。例如,要选择所有的 ‘Price’ 数据:

stock_price_volume_multiindex_df['Price']
表 6.3: 选择 Price 类别下的所有列
Metric Open Close
Ticker TradeDate
NingboPort 2024-01-02 3.3400 3.3777
2024-01-03 3.3683 3.4059
2024-01-04 3.4153 3.4059
2024-01-05 3.4153 3.3871
2024-01-08 3.3965 3.3494
2024-01-09 3.3306 3.3588
2024-01-10 3.3588 3.3777
NingboBank 2024-01-02 18.7598 18.3594
2024-01-03 18.3222 18.1546
2024-01-04 18.1453 17.7636
2024-01-05 17.6891 18.1546
2024-01-08 18.0243 17.7170
2024-01-09 17.7729 18.0708
2024-01-10 17.9870 18.0987

6.2.3 重排序和排序级别

有时,你可能需要重新排列轴上级别的顺序,或者根据某个特定级别中的值对数据进行排序。swaplevel() 方法接收两个级别编号或名称,并返回一个级别互换的新对象(但数据本身保持不变)。

stock_price_volume_multiindex_df.swaplevel('Ticker', 'TradeDate')
表 6.4: 交换 TickerTradeDate 索引级别
Category Price Volume
Metric Open Close Total
TradeDate Ticker
2024-01-02 NingboPort 3.3400 3.3777 24852644.0
2024-01-03 NingboPort 3.3683 3.4059 17502674.0
2024-01-04 NingboPort 3.4153 3.4059 16564281.0
2024-01-05 NingboPort 3.4153 3.3871 15904201.0
2024-01-08 NingboPort 3.3965 3.3494 16029300.0
2024-01-09 NingboPort 3.3306 3.3588 20051699.0
2024-01-10 NingboPort 3.3588 3.3777 19764948.0
2024-01-02 NingboBank 18.7598 18.3594 39575754.0
2024-01-03 NingboBank 18.3222 18.1546 37235409.0
2024-01-04 NingboBank 18.1453 17.7636 50880362.0
2024-01-05 NingboBank 17.6891 18.1546 56130083.0
2024-01-08 NingboBank 18.0243 17.7170 38023194.0
2024-01-09 NingboBank 17.7729 18.0708 42221761.0
2024-01-10 NingboBank 17.9870 18.0987 33169881.0

sort_index() 默认使用所有索引级别按字典顺序对数据进行排序。你可以选择按特定级别排序。例如,让我们按 TradeDate 排序。

stock_price_volume_multiindex_df.sort_index(level='TradeDate')
表 6.5: 按 TradeDate 级别对 DataFrame 进行排序
Category Price Volume
Metric Open Close Total
Ticker TradeDate
NingboBank 2024-01-02 18.7598 18.3594 39575754.0
NingboPort 2024-01-02 3.3400 3.3777 24852644.0
NingboBank 2024-01-03 18.3222 18.1546 37235409.0
NingboPort 2024-01-03 3.3683 3.4059 17502674.0
NingboBank 2024-01-04 18.1453 17.7636 50880362.0
NingboPort 2024-01-04 3.4153 3.4059 16564281.0
NingboBank 2024-01-05 17.6891 18.1546 56130083.0
NingboPort 2024-01-05 3.4153 3.3871 15904201.0
NingboBank 2024-01-08 18.0243 17.7170 38023194.0
NingboPort 2024-01-08 3.3965 3.3494 16029300.0
NingboBank 2024-01-09 17.7729 18.0708 42221761.0
NingboPort 2024-01-09 3.3306 3.3588 20051699.0
NingboBank 2024-01-10 17.9870 18.0987 33169881.0
NingboPort 2024-01-10 3.3588 3.3777 19764948.0

性能优化:词典排序 (Lexicographical Sorting) 的重要性

在分层索引的对象上进行数据检索时,索引的排序状态直接决定了底层搜索算法的效率。

  1. 排序逻辑pandas 默认首先按最外层索引排序,然后在每个外层分类内部对次级索引进行排序。
  2. 性能差异:经过 sort_index() 排序后的索引可以利用二分查找等高效算法,避免昂贵的线性扫描。
  3. 最佳实践:在执行复杂的切片(Slicing)或部分索引选择之前,习惯性地调用 sort_index() 能够显著缩短在大规模数据集上的响应时间。

6.2.4 按级别进行汇总统计

DataFrameSeries 上的许多描述性和汇总统计都有一个 level 选项,你可以用它来指定在特定轴上进行聚合的级别。思考一下我们在 表 6.2 中创建的股票 DataFrame。我们可以在行或列上按级别进行聚合。例如,让我们计算每个股票代码的所有指标的平均值。

stock_price_volume_multiindex_df.groupby(level='Ticker').mean()
表 6.6: 计算每个 Ticker 的平均统计数据
Category Price Volume
Metric Open Close Total
Ticker
NingboBank 18.100086 18.045529 4.246235e+07
NingboPort 3.374971 3.380357 1.866711e+07

现在,让我们在列轴上计算每天每个类别的总和。

# 这个操作只需要数值型数据
stock_price_volume_multiindex_df.groupby(level='Category', axis='columns').sum().head()
表 6.7: 计算每个 Category 的总和
Category Price Volume
Ticker TradeDate
NingboPort 2024-01-02 6.7177 24852644.0
2024-01-03 6.7742 17502674.0
2024-01-04 6.8212 16564281.0
2024-01-05 6.8024 15904201.0
2024-01-08 6.7459 16029300.0

6.2.5 使用 DataFrame 的列进行索引

从文件中加载数据时,期望的索引通常存储在一个或多个常规列中。set_index 函数可以使用一个或多个列作为索引来创建一个新的 DataFrame。让我们使用世界银行的数据为例,这些数据通常以“扁平”格式提供。我们将关注中国及其主要的亚洲贸易伙伴。

# 使用本地指数数据代替 wbdata
import pandas as pd

indices = ['000001.XSHG', '000300.XSHG', '399001.XSHE', '399006.XSHE']
names = {'000001.XSHG': 'CHN', '000300.XSHG': 'JPN', '399001.XSHE': 'KOR', '399006.XSHE': 'SGP'}

index_data = pd.read_parquet('C:/qiufei/data/index/indexes.parquet')

df_list = []
for sym in indices:
    temp = index_data[index_data['symbol'] == sym][['datetime', 'close', 'volume']].copy()
    temp['datetime'] = pd.to_datetime(temp['datetime'].astype(str), format='%Y%m%d%H%M%S')
    temp['country'] = names[sym] # 模拟国家
    temp = temp.rename(columns={'datetime': 'date', 'close': 'GDP', 'volume': 'Population'}) 
    df_list.append(temp)

asian_economy_flat_df = pd.concat(df_list)
asian_economy_flat_df = asian_economy_flat_df.dropna().reset_index(drop=True)
asian_economy_flat_df.head()
表 6.8: 一个典型的扁平格式经济指标 DataFrame
date GDP Population country
0 2005-01-04 1242.7740 816177000.0 CHN
1 2005-01-05 1251.9370 867865100.0 CHN
2 2005-01-06 1239.4301 792225400.0 CHN
3 2005-01-07 1244.7460 894087100.0 CHN
4 2005-01-10 1252.4010 723468300.0 CHN

现在,我们使用 set_index 从 ‘country’ 和 ‘date’ 列创建一个 MultiIndex

asian_economy_indexed_df = asian_economy_flat_df.set_index(['country', 'date'])
asian_economy_indexed_df.head()
表 6.9: 从列创建新的分层索引的 DataFrame
GDP Population
country date
CHN 2005-01-04 1242.7740 816177000.0
2005-01-05 1251.9370 867865100.0
2005-01-06 1239.4301 792225400.0
2005-01-07 1244.7460 894087100.0
2005-01-10 1252.4010 723468300.0

默认情况下,用作索引的列会从 DataFrame 中移除。你可以通过传递 drop=False 来保留它们。

asian_economy_flat_df.set_index(['country', 'date'], drop=False).head()
表 6.10: 使用 set_index 同时保留原始列
date GDP Population country
country date
CHN 2005-01-04 2005-01-04 1242.7740 816177000.0 CHN
2005-01-05 2005-01-05 1251.9370 867865100.0 CHN
2005-01-06 2005-01-06 1239.4301 792225400.0 CHN
2005-01-07 2005-01-07 1244.7460 894087100.0 CHN
2005-01-10 2005-01-10 1252.4010 723468300.0 CHN

另一方面,reset_index 的作用与 set_index 相反;分层索引的级别会被移回到列中。

asian_economy_indexed_df.reset_index().head()
表 6.11: 使用 reset_index() 将分层索引级别移回列中
country date GDP Population
0 CHN 2005-01-04 1242.7740 816177000.0
1 CHN 2005-01-05 1251.9370 867865100.0
2 CHN 2005-01-06 1239.4301 792225400.0
3 CHN 2005-01-07 1244.7460 894087100.0
4 CHN 2005-01-10 1252.4010 723468300.0

6.3 合并与连接数据集

pandas 对象中的数据可以通过几种方式进行组合:

  • pandas.merge: 基于一个或多个键连接 DataFrame 中的行。这是数据库 join 操作的基石。
  • pandas.concat: 沿着一个轴连接或“堆叠”对象。
  • combine_first: 将重叠的数据拼接在一起,用一个对象中的值填充另一个对象中的缺失值。

我们将通过应用实例来逐一介绍这些方法。

6.3.1 数据库风格的 DataFrame 连接

合并 (Merge)连接 (Join) 操作通过使用一个或多个键来链接行,从而组合数据集。这些是关系型数据库(例如 SQL)中的基本操作。pandas.merge 函数是在你的数据上使用这些算法的主要入口点。

让我们构建一个场景。我们希望分析中国经济增长与利率水平之间的关系。我们将获取中国的季度实际GDP(NGDPRSAXDCCHQ)和作为基准利率的贷款市场报价利率(LPR, DBLPR1Y)。由于LPR是月度数据,我们需要将其重采样为季度数据。

# 使用本地指数数据模拟 GDP 和 LPR
import numpy as np

index_data = pd.read_parquet('C:/qiufei/data/index/indexes.parquet')
# 模拟 GDP (上证指数)
quarterly_gdp_df_s = index_data[index_data['symbol'] == '000001.XSHG'].copy()
quarterly_gdp_df_s['datetime'] = pd.to_datetime(quarterly_gdp_df_s['datetime'].astype(str), format='%Y%m%d%H%M%S')
quarterly_gdp_df_s.set_index('datetime', inplace=True)
quarterly_gdp_df = quarterly_gdp_df_s.sort_index().loc['2015-01-01':'2023-12-31']['close'].resample('QS').last().to_frame(name='gdp')
quarterly_gdp_df['year'] = quarterly_gdp_df.index.year

# 模拟 LPR (沪深300)
lpr_s = index_data[index_data['symbol'] == '000300.XSHG'].copy()
lpr_s['datetime'] = pd.to_datetime(lpr_s['datetime'].astype(str), format='%Y%m%d%H%M%S')
lpr_s.set_index('datetime', inplace=True)
quarterly_lpr_df = lpr_s.sort_index().loc['2015-01-01':'2023-12-31']['close'].resample('QS').mean().to_frame(name='lpr_rate')
quarterly_lpr_df['year'] = quarterly_lpr_df.index.year

print('--- GDP 数据 (df1) ---')
print(quarterly_gdp_df.head())
print('\n--- LPR 利率数据 (df2) ---')
print(quarterly_lpr_df.head())
--- GDP 数据 (df1) ---
                  gdp  year
datetime                   
2015-01-01  3747.8990  2015
2015-04-01  4277.2219  2015
2015-07-01  3052.7814  2015
2015-10-01  3539.1819  2015
2016-01-01  3003.9152  2016

--- LPR 利率数据 (df2) ---
               lpr_rate  year
datetime                     
2015-01-01  3594.276088  2015
2015-04-01  4750.967681  2015
2015-07-01  3690.326275  2015
2015-10-01  3660.401720  2015
2016-01-01  3107.658607  2016

这是一个一对一 (one-to-one) 连接的例子,因为在按季度重采样后,两个DataFrame的日期索引都是唯一的。我们可以直接在日期索引上进行合并。

# 在这里,基于索引的合并更清晰。我们将 right_index 和 left_index 设置为 True
integrated_financial_df = pd.merge(quarterly_gdp_df, quarterly_lpr_df, left_index=True, right_index=True)
integrated_financial_df.head()
表 6.12: GDP 和 LPR 数据的一对一合并
gdp year_x lpr_rate year_y
datetime
2015-01-01 3747.8990 2015 3594.276088 2015
2015-04-01 4277.2219 2015 4750.967681 2015
2015-07-01 3052.7814 2015 3690.326275 2015
2015-10-01 3539.1819 2015 3660.401720 2015
2016-01-01 3003.9152 2016 3107.658607 2016

请注意,当在索引上合并时,我不需要指定要连接的列。如果是在列上连接,pandas.merge 会使用重叠的列名作为键。一个好的做法是使用 on 参数明确指定。让我们尝试在 year 列上合并。

# 注意:这将为每年创建一个笛卡尔积
pd.merge(quarterly_gdp_df, quarterly_lpr_df, on='year').head(8)
表 6.13: 在 year 列上显式合并
gdp year lpr_rate
0 3747.8990 2015 3594.276088
1 3747.8990 2015 4750.967681
2 3747.8990 2015 3690.326275
3 3747.8990 2015 3660.401720
4 4277.2219 2015 3594.276088
5 4277.2219 2015 4750.967681
6 4277.2219 2015 3690.326275
7 4277.2219 2015 3660.401720

默认情况下,pandas.merge 执行的是内连接 (inner join)。结果中的键是两个表中键的交集,即共同的部分。其他选项是 "left""right""outer"outer 连接取键的并集。让我们创建两个略有不同的数据集来说明这一点。

我们将获取中国的城镇登记失业率(LMUNRRTTCNM156S)和CPI(CHNCPIALLMINMEI),但选取的时间范围略有不同。

# 使用本地指数数据模拟失业率和通胀率
urban_unemployment_df_s = index_data[index_data['symbol'] == '399001.XSHE'].copy()
urban_unemployment_df_s['datetime'] = pd.to_datetime(urban_unemployment_df_s['datetime'].astype(str), format='%Y%m%d%H%M%S')
urban_unemployment_df_s.set_index('datetime', inplace=True)
urban_unemployment_df = urban_unemployment_df_s.sort_index().loc['2018-01-01':'2021-12-31']['close'].resample('QS').mean().to_frame(name='unemployment')

consumer_inflation_df_s = index_data[index_data['symbol'] == '399006.XSHE'].copy()
consumer_inflation_df_s['datetime'] = pd.to_datetime(consumer_inflation_df_s['datetime'].astype(str), format='%Y%m%d%H%M%S')
consumer_inflation_df_s.set_index('datetime', inplace=True)
consumer_inflation_df = consumer_inflation_df_s.sort_index().loc['2020-01-01':'2023-12-31']['close'].resample('QS').mean().to_frame(name='inflation')

print('--- 失业率数据 (df1) ---')
print(urban_unemployment_df)
print('\n--- 通货膨胀率数据 (df2) ---')
print(consumer_inflation_df)
--- 失业率数据 (df1) ---
            unemployment
datetime                
2018-01-01  11014.491959
2018-04-01  10319.740377
2018-07-01   8720.947461
2018-10-01   7652.099778
2019-01-01   8508.671010
2019-04-01   9404.323467
2019-07-01   9388.073331
2019-10-01   9824.681792
2020-01-01  10827.637905
2020-04-01  10892.488990
2020-07-01  13351.373455
2020-10-01  13747.496943
2021-01-01  14718.413584
2021-04-01  14418.922195
2021-07-01  14656.856244
2021-10-01  14671.669257

--- 通货膨胀率数据 (df2) ---
              inflation
datetime               
2020-01-01  2007.838016
2020-04-01  2111.580805
2020-07-01  2663.731241
2020-10-01  2727.297028
2021-01-01  3009.887488
2021-04-01  3083.179847
2021-07-01  3332.796908
2021-10-01  3372.324298
2022-01-01  2847.481574
2022-04-01  2470.199131
2022-07-01  2628.706325
2022-10-01  2364.190063
2023-01-01  2450.480071
2023-04-01  2279.031581
2023-07-01  2130.840172
2023-10-01  1926.966497

现在,让我们执行一个 outer 连接。

pd.merge(urban_unemployment_df, consumer_inflation_df, left_index=True, right_index=True, how='outer')
表 6.14: 一个显示键的并集的外连接
unemployment inflation
datetime
2018-01-01 11014.491959 NaN
2018-04-01 10319.740377 NaN
2018-07-01 8720.947461 NaN
2018-10-01 7652.099778 NaN
2019-01-01 8508.671010 NaN
2019-04-01 9404.323467 NaN
2019-07-01 9388.073331 NaN
2019-10-01 9824.681792 NaN
2020-01-01 10827.637905 2007.838016
2020-04-01 10892.488990 2111.580805
2020-07-01 13351.373455 2663.731241
2020-10-01 13747.496943 2727.297028
2021-01-01 14718.413584 3009.887488
2021-04-01 14418.922195 3083.179847
2021-07-01 14656.856244 3332.796908
2021-10-01 14671.669257 3372.324298
2022-01-01 NaN 2847.481574
2022-04-01 NaN 2470.199131
2022-07-01 NaN 2628.706325
2022-10-01 NaN 2364.190063
2023-01-01 NaN 2450.480071
2023-04-01 NaN 2279.031581
2023-07-01 NaN 2130.840172
2023-10-01 NaN 1926.966497

outer 连接中,左侧(urban_unemployment_df)或右侧(consumer_inflation_dfDataFrame 对象中,如果某行的键在另一个 DataFrame 中不匹配,那么在另一个 DataFrame 的列中将显示为 NaN 值。表 6.15 总结了 how 选项。

表 6.15
选项 行为
how='inner' 只使用在两个表中都观察到的键组合(交集)。
how='left' 使用在左表中找到的所有键组合。
how='right' 使用在右表中找到的所有键组合。
how='outer' 使用在两个表中观察到的所有键组合(并集)。

多对多 (Many-to-many) 合并会形成匹配键的笛卡尔积。这是一个至关重要的概念。想象一下,你有一个按季度报告的公司财务数据集,和另一个这些公司的分析师评级数据集,其中多个分析师可能在任何给定的季度发布评级。

风险警示:量化建模中的笛卡尔积 (Cartesian Product) 隐患

在执行多对多合并(Many-to-many join)时,左表中每个键的每一行都会与右表中对应键的每一行进行笛卡尔式组合。

  1. 数据爆炸:如果左侧有 \(m\) 行键 \(k\),右侧有 \(n\) 行键 \(k\),合并结果将产生 \(m \times n\) 行。这在处理包含重复时间戳的高频订单簿数据或未清洗的财务报告时,容易导致内存瞬间耗尽。
  2. 逻辑污染:笛卡尔积会在不经意间引入重复的计算样本,导致后续回归模型(如 Fama-French 三因子模型)的权重失真。
  3. 避坑指南:在执行 merge() 之前,强烈建议使用 duplicated() 检查连接键的唯一性,或通过 validate='one_to_one' 等参数进行强制断言校验。

让我们用腾讯和阿里的例子模拟这种情况。

financials = pd.DataFrame({
    'ticker': ['Tencent', 'Tencent', 'Alibaba', 'Alibaba'],
    'quarter': ['2023Q1', '2023Q2', '2023Q1', '2023Q2'],
    'revenue': [1499.9, 1492.1, 2377.6, 2329.3]
})

ratings = pd.DataFrame({
    'ticker': ['Tencent', 'Tencent', 'Tencent', 'Alibaba'],
    'quarter': ['2023Q1', '2023Q1', '2023Q2', '2023Q1'],
    'analyst': ['中信证券', '高盛', '摩根大通', '中信证券'],
    'rating': ['买入', '买入', '持有', '买入']
})

pd.merge(financials, ratings, on=['ticker', 'quarter'])
表 6.16: 一个导致笛卡尔积的多对多合并
ticker quarter revenue analyst rating
0 Tencent 2023Q1 1499.9 中信证券 买入
1 Tencent 2023Q1 1499.9 高盛 买入
2 Tencent 2023Q2 1492.1 摩根大通 持有
3 Alibaba 2023Q1 2377.6 中信证券 买入

注意,对于腾讯在 2023Q1,单一的收入数据与两个分析师评级都匹配了,结果产生了相应的两行数据。

连接时,你可能会有不属于连接键的重叠列名。pandas.merge 有一个 suffixes 选项,可以为这些列名附加字符串。

df_left = pd.DataFrame({'key': ['a', 'b'], 'data': [1, 2]})
df_right = pd.DataFrame({'key': ['a', 'b'], 'data': [3, 4]})

pd.merge(df_left, df_right, on='key', suffixes=('_left_data', '_right_data'))
表 6.17: 使用后缀处理重叠的列名
key data_left_data data_right_data
0 a 1 3
1 b 2 4

pd.merge 的完整参数列表在 表 6.18 中提供。

表 6.18
参数 描述
left 位于左侧要合并的 DataFrame。
right 位于右侧要合并的 DataFrame。
how 应用的连接类型:'inner''outer''left''right' 之一;默认为 'inner'
on 用于连接的列名。必须在两个 DataFrame 对象中都存在。
left_on 左侧 DataFrame 中用作连接键的列。
right_on 右侧 DataFrame 中用作连接键的列。
left_index 使用左侧的行索引作为其连接键。
right_index 使用右侧的行索引作为其连接键。
sort 按连接键对合并后的数据进行词典排序;默认为 False
suffixes 附加到重叠列名上的字符串元组。
validate 验证合并是否为指定类型(例如,'one_to_one')。
indicator 添加一个特殊的 _merge 列,指示每行的来源。

6.3.2 按索引合并

正如我们在 表 6.12 中看到的,在某些情况下,DataFrame 中的合并键可能位于其索引中。在这种情况下,你可以传递 left_index=Trueright_index=True(或两者都传)来指示应使用索引作为合并键。这在处理时间序列数据时极为常见。让我们重新审视我们的 urban_unemployment_dfconsumer_inflation_df DataFrame。

pd.merge(urban_unemployment_df, consumer_inflation_df, left_index=True, right_index=True, how='inner')
表 6.19: 按日期索引合并两个时间序列 DataFrame
unemployment inflation
datetime
2020-01-01 10827.637905 2007.838016
2020-04-01 10892.488990 2111.580805
2020-07-01 13351.373455 2663.731241
2020-10-01 13747.496943 2727.297028
2021-01-01 14718.413584 3009.887488
2021-04-01 14418.922195 3083.179847
2021-07-01 14656.856244 3332.796908
2021-10-01 14671.669257 3372.324298

对于分层索引的数据,按索引连接等同于多键合并。

DataFrame.join 方法提供了一种便捷的按索引合并的方式。它默认执行左连接。

方法进阶:merge().join() 在工程实践中的取舍

pandas 提供的这两种接口虽然功能交织,但在高性能量化系统开发中有明确的应用边界:

  • pd.merge() (通用底层函数)
    • 核心特性:基于列名的灵活连接。
    • 适用场景:处理复杂的异构数据集(如将“高管持股变动表”与“股价行情表”按 symbol 关联)。支持多键、不同列名匹配,以及通过 validate 参数进行严密的逻辑校验。
  • df.join() (高效实例方法)
    • 核心特性:默认基于索引的快速合并。
    • 性能逻辑:在已排序的日期索引上,join 的执行效率通常高于基于非索引列的 merge,因为它在底层利用了索引的指针加速。
    • 适用场景:多支股票收盘价序列(Panel Data)的快速横向对齐,或将主表与已构建好索引的静态特征表合并。

经验法则:当需要业务维度的关联(如外键映射)时,用 pd.merge();当需要时间维度的对齐(如多指标拼接)时,优先考虑 df.join()

让我们使用 .join() 来重现之前的合并操作。

# .join() 默认执行左连接
urban_unemployment_df.join(consumer_inflation_df)
表 6.20: 使用 .join() 方法进行基于索引的合并
unemployment inflation
datetime
2018-01-01 11014.491959 NaN
2018-04-01 10319.740377 NaN
2018-07-01 8720.947461 NaN
2018-10-01 7652.099778 NaN
2019-01-01 8508.671010 NaN
2019-04-01 9404.323467 NaN
2019-07-01 9388.073331 NaN
2019-10-01 9824.681792 NaN
2020-01-01 10827.637905 2007.838016
2020-04-01 10892.488990 2111.580805
2020-07-01 13351.373455 2663.731241
2020-10-01 13747.496943 2727.297028
2021-01-01 14718.413584 3009.887488
2021-04-01 14418.922195 3083.179847
2021-07-01 14656.856244 3332.796908
2021-10-01 14671.669257 3372.324298

为了得到与之前相同的 inner 连接结果,我们指定 how='inner'

urban_unemployment_df.join(consumer_inflation_df, how='inner')
表 6.21: 使用 .join() 方法执行内连接
unemployment inflation
datetime
2020-01-01 10827.637905 2007.838016
2020-04-01 10892.488990 2111.580805
2020-07-01 13351.373455 2663.731241
2020-10-01 13747.496943 2727.297028
2021-01-01 14718.413584 3009.887488
2021-04-01 14418.922195 3083.179847
2021-07-01 14656.856244 3332.796908
2021-10-01 14671.669257 3372.324298

6.3.3 沿轴连接

另一种数据组合方式被称为连接 (concatenation)堆叠 (stacking)pandas.concat 是用于此操作的主要函数。让我们创建一个场景,我们有两个独立的中国城镇失业率数据集:一个用于2018-2019年,另一个用于2020-2021年。我们想将它们组合成一个单一的时间序列。

# 连接两个本地数据片段
urban_unemployment_df_s1 = urban_unemployment_df_s.sort_index().loc['2018-01-01':'2019-12-31']['close']
urban_unemployment_df_s2 = urban_unemployment_df_s.sort_index().loc['2020-01-01':'2021-12-31']['close']

# 连接两个 series
urban_unemployment_df_full = pd.concat([urban_unemployment_df_s1, urban_unemployment_df_s2])
print(urban_unemployment_df_full.head())
print('...')
print(urban_unemployment_df_full.tail())
datetime
2018-01-02    11178.052
2018-01-03    11280.296
2018-01-04    11341.345
2018-01-05    11342.848
2018-01-08    11382.717
Name: close, dtype: float64
...
datetime
2021-12-27    14715.6458
2021-12-28    14837.8687
2021-12-29    14653.8236
2021-12-30    14796.2313
2021-12-31    14857.3453
Name: close, dtype: float64

默认情况下,pd.concat 沿 axis=0(行索引)工作。如果你传递 axis=1axis='columns'),结果将是一个 DataFrame

pd.concat([urban_unemployment_df_s1, urban_unemployment_df_s2], axis=1).head()
表 6.22: 沿列轴连接 Series
close close
datetime
2018-01-02 11178.052 NaN
2018-01-03 11280.296 NaN
2018-01-04 11341.345 NaN
2018-01-05 11342.848 NaN
2018-01-08 11382.717 NaN

一个潜在的问题是,在结果中无法识别连接的各个部分。假设你希望在连接轴上创建一个分层索引。为此,可以使用 keys 参数。

result = pd.concat([urban_unemployment_df_s1, urban_unemployment_df_s2], keys=['2018-2019', '2020-2021'])
result
列表 6.8
           datetime  
2018-2019  2018-01-02    11178.0520
           2018-01-03    11280.2960
           2018-01-04    11341.3450
           2018-01-05    11342.8480
           2018-01-08    11382.7170
                            ...    
2020-2021  2021-12-27    14715.6458
           2021-12-28    14837.8687
           2021-12-29    14653.8236
           2021-12-30    14796.2313
           2021-12-31    14857.3453
Name: close, Length: 973, dtype: float64

同样的逻辑也适用于 DataFrame 对象。让我们创建两个包含不同中国经济指标的 DataFrame 并将它们连接起来。

temp_gdp_chunk = quarterly_gdp_df.loc['2020-01-01':'2020-12-31'][['gdp']]
temp_unemployment_chunk = urban_unemployment_df.loc['2020-01-01':'2020-12-31'][['unemployment']]

pd.concat([temp_gdp_chunk, temp_unemployment_chunk], axis=1)
gdp unemployment
datetime
2020-01-01 2750.2962 10827.637905
2020-04-01 2984.6741 10892.488990
2020-07-01 3218.0521 13351.373455
2020-10-01 3473.0693 13747.496943

最后一个需要考虑的问题是,DataFrame 的行索引不包含任何相关数据的情况。在这种情况下,你可以传递 ignore_index=True,这将丢弃原始索引并分配一个新的默认整数索引。

temp_gdp_chunk.reset_index(drop=True, inplace=True)
temp_unemployment_chunk.reset_index(drop=True, inplace=True)

pd.concat([temp_gdp_chunk, temp_unemployment_chunk], ignore_index=True)
表 6.23: 在连接 DataFrame 时忽略原始索引
gdp unemployment
0 2750.2962 NaN
1 2984.6741 NaN
2 3218.0521 NaN
3 3473.0693 NaN
4 NaN 10827.637905
5 NaN 10892.488990
6 NaN 13351.373455
7 NaN 13747.496943

6.3.4 合并重叠数据

还有一种数据组合情况,既不能表示为合并也不能表示为连接操作。你可能有索引完全或部分重叠的两个数据集,并且你希望用一个数据集中的值来“修补”另一个数据集中的缺失值。这在处理来自不同来源且可能存在数据缺口的数据时是一项常见任务。combine_first 方法就是为此设计的。

让我们创建两个 Series,它们有一些重叠的索引和 NaN 值。

bid_prices = pd.Series([np.nan, 10, np.nan, 30], index=['AAPL', 'GOOG', 'MSFT', 'AMZN'], name='bid')
ask_prices = pd.Series([100, np.nan, 300, 400], index=['AAPL', 'GOOG', 'MSFT', 'AMZN'], name='ask')

# 当 bid_prices 中有 NaN 时,使用 ask_prices 中的值
bid_prices.combine_first(ask_prices)
列表 6.9
AAPL    100.0
GOOG     10.0
MSFT    300.0
AMZN     30.0
Name: bid, dtype: float64

对于 DataFramecombine_first 对每一列执行相同的操作。因此,你可以将其视为用你传递的对象中的数据“修补”调用对象中的缺失数据。

exchange_a = pd.DataFrame({'AAPL': [150., np.nan, 155., np.nan],
                    'GOOG': [np.nan, 2800., np.nan, 2850.]})
exchange_b = pd.DataFrame({'AAPL': [152., 148., np.nan, 153., 158.],
                    'GOOG': [np.nan, 2810., 2820., 2860., 2900.]})

exchange_a.combine_first(exchange_b)
表 6.24: 用另一个 DataFrame 修补一个 DataFrame
AAPL GOOG
0 150.0 NaN
1 148.0 2800.0
2 155.0 2820.0
3 153.0 2850.0
4 158.0 2900.0

使用 DataFrame 对象时,combine_first 的输出将包含所有列名的并集。

6.4 重塑与透视

有许多用于重排表格数据的基本操作。这些操作被称为重塑 (reshape)透视 (pivot) 操作。

6.4.1 使用分层索引进行重塑

正如我们在 小节 6.2.1 中看到的,分层索引为在 DataFrame 中重排数据提供了一种一致的方式。两个主要操作是: * stack: 这个操作将数据从列“旋转”或透视到行。 * unstack: 这个操作将数据从行透视到列。

让我们用一个清晰的例子再次说明这些操作。我们将构建一个包含中国GDP增长率和工业生产增长率的 DataFrame

# 使用本地指数数据模拟
cn_gdp = quarterly_gdp_df.loc['2021-01-01':'2021-12-31']['gdp'].pct_change().dropna()
cn_ipi = consumer_inflation_df.loc['2021-01-01':'2021-12-31']['inflation'].pct_change().dropna()

economic_growth_panel_df = pd.DataFrame({'GDP_Growth': cn_gdp, 'IPI_Growth': cn_ipi})
economic_growth_panel_df.index.name = 'Quarter'
economic_growth_panel_df.columns.name = 'Indicator'
economic_growth_panel_df
Indicator GDP_Growth IPI_Growth
Quarter
2021-04-01 0.043373 0.024351
2021-07-01 -0.006413 0.080961
2021-10-01 0.020069 0.011860

对这个数据使用 stack 方法会将列透视到行,生成一个带有 MultiIndexSeries

stacked_economic_growth_series = economic_growth_panel_df.stack()
stacked_economic_growth_series
列表 6.10
Quarter     Indicator 
2021-04-01  GDP_Growth    0.043373
            IPI_Growth    0.024351
2021-07-01  GDP_Growth   -0.006413
            IPI_Growth    0.080961
2021-10-01  GDP_Growth    0.020069
            IPI_Growth    0.011860
dtype: float64

从这个分层索引的 Series 中,你可以使用 unstack 将数据重新排列回 DataFrame

stacked_economic_growth_series.unstack()
表 6.25: 将 Indicator 索引级别透视回列
Indicator GDP_Growth IPI_Growth
Quarter
2021-04-01 0.043373 0.024351
2021-07-01 -0.006413 0.080961
2021-10-01 0.020069 0.011860

默认情况下,最内层的级别被 unstack。你可以通过传递级别编号或名称来 unstack 不同的级别。例如,让我们 unstack ‘Quarter’ 级别。

stacked_economic_growth_series.unstack(level='Quarter')
表 6.26: 按 Quarter 级别名称进行 unstack
Quarter 2021-04-01 2021-07-01 2021-10-01
Indicator
GDP_Growth 0.043373 -0.006413 0.020069
IPI_Growth 0.024351 0.080961 0.011860

6.4.2 将“长”格式透视为“宽”格式

在数据库和CSV文件中存储多个时间序列的一种常见方式是所谓的长格式 (long)堆叠格式 (stacked)。在这种格式中,每一行都是一个单独的观测值。这通常是用于存储和某些类型分析的首选“整洁”数据格式。

让我们使用 FRED 中中国 realgdpinflunemp 的数据创建一个长格式数据集。

# 使用之前创建的本地数据片段
# 这里我们已经有了 gdp, infl, unemp (其实是模拟的)
macroeconomic_wide_df = pd.DataFrame({'realgdp': quarterly_gdp_df['gdp'], 'infl': consumer_inflation_df['inflation'], 'unemp': urban_unemployment_df['unemployment']}).dropna()
macroeconomic_wide_df.index.name = 'date'
macroeconomic_wide_df.columns.name = 'item'

# 堆叠以创建 long 格式
macroeconomic_long_df = (macroeconomic_wide_df.stack()
             .reset_index()
             .rename(columns={0: 'value'}))

macroeconomic_long_df.head(10)
date item value
0 2020-01-01 realgdp 2750.296200
1 2020-01-01 infl 2007.838016
2 2020-01-01 unemp 10827.637905
3 2020-04-01 realgdp 2984.674100
4 2020-04-01 infl 2111.580805
5 2020-04-01 unemp 10892.488990
6 2020-07-01 realgdp 3218.052100
7 2020-07-01 infl 2663.731241
8 2020-07-01 unemp 13351.373455
9 2020-10-01 realgdp 3473.069300

在这种长格式中,每一行代表一个单一的观测值(特定日期的特定项目)。这是关系型数据库中常见的结构。然而,对于时间序列分析或某些类型的建模,你可能更喜欢宽格式 (wide),即每个不同的 item 都有一列。pivot 方法正是执行这种转换。

pivoted = macroeconomic_long_df.pivot(index='date', columns='item', values='value')
pivoted.head()
表 6.27: 使用 pivot 将数据从长格式转换为宽格式
item infl realgdp unemp
date
2020-01-01 2007.838016 2750.2962 10827.637905
2020-04-01 2111.580805 2984.6741 10892.488990
2020-07-01 2663.731241 3218.0521 13351.373455
2020-10-01 2727.297028 3473.0693 13747.496943
2021-01-01 3009.887488 3441.9115 14718.413584

pivot 的前两个参数分别是将用作行索引和列索引的列。values 参数是包含要填充 DataFrame 的数据的列。如果省略 values,并且你有多个剩余的列,你将得到一个带有分层列的 DataFrame

6.4.3 将“宽”格式透视为“长”格式

对于 DataFramepivot 的逆操作是 pandas.melt。它不是将一列转换为多列,而是将多列合并为一列,生成一个比输入更长的 DataFrame

让我们从一个简单的宽格式 DataFrame 开始,以中国知名公司为例。

china_tech_revenue_df = pd.DataFrame({'公司名称': ['华为', '腾讯', '阿里巴巴'],
                   '收入_2022': [6423, 5545, 8645],
                   '收入_2023': [7042, 5601, 8791]})
china_tech_revenue_df
表 6.28: 一个用于 melt 操作的示例宽格式 DataFrame
公司名称 收入_2022 收入_2023
0 华为 6423 7042
1 腾讯 5545 5601
2 阿里巴巴 8645 8791

'公司名称' 列可以是一个分组指标,其他列是数据值。使用 pandas.melt 时,我们必须指明哪些列是分组指标(id_vars)。

china_tech_revenue_long_df = pd.melt(china_tech_revenue_df, id_vars='公司名称')
china_tech_revenue_long_df
表 6.29: 使用 melt 将宽格式数据转换为长格式
公司名称 variable value
0 华为 收入_2022 6423
1 腾讯 收入_2022 5545
2 阿里巴巴 收入_2022 8645
3 华为 收入_2023 7042
4 腾讯 收入_2023 5601
5 阿里巴巴 收入_2023 8791

你可以使用 pivot 将其重塑回原始布局。

revenue_wide = china_tech_revenue_long_df.pivot(index='公司名称', columns='variable', values='value')
revenue_wide.reset_index()
表 6.30: 将 melt 后的数据透视回其原始的宽格式
variable 公司名称 收入_2022 收入_2023
0 华为 6423 7042
1 腾讯 5545 5601
2 阿里巴巴 8645 8791

你还可以指定一个列的子集作为值列(value_vars)。

pd.melt(china_tech_revenue_df, id_vars='公司名称', value_vars=['收入_2022'])
表 6.31: 使用 melt 并指定 value_vars
公司名称 variable value
0 华为 收入_2022 6423
1 腾讯 收入_2022 5545
2 阿里巴巴 收入_2022 8645

6.5 习题

6.5.1 习题 8.1: 数据合并基础

问题描述

假设你有两个数据集: - 数据集A:宁波港 (601018.SH) 的日度交易数据(日期、开盘价、收盘价) - 数据集B:宁波港的日度成交量数据(日期、成交量)

请完成以下任务: 1. 使用 merge 函数按日期合并两个数据集 2. 使用不同类型的连接(inner、left、right、outer)并比较结果 3. 使用 join 方法合并两个数据集 4. 使用 concat 函数合并两个数据集

完整解答

import pandas as pd
import numpy as np

# 从本地 Parquet 文件读取数据代替 HDF5
stock_data = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
stock_data['datetime'] = pd.to_datetime(stock_data['trade_date'], format='%Y%m%d')

# 筛选宁波港 2023 年 1 月的数据
port_data_jan = stock_data[(stock_data['datetime'] >= '2023-01-01') & (stock_data['datetime'] <= '2023-01-31')].copy()

# 创建数据集A:价格数据
port_price_data = port_data_jan[['datetime', 'open', 'close']].copy()
port_price_data = port_price_data.rename(columns={'datetime': 'date'})

# 创建数据集B:成交量数据(故意少几天,模拟不完整数据)
port_volume_data = port_data_jan[['datetime', 'volume']].iloc[5:].copy()
port_volume_data = port_volume_data.rename(columns={'datetime': 'date'})

print('=== 数据集信息 ===')
print(f'数据集A形状: {port_price_data.shape}')
print(f'数据集B形状: {port_volume_data.shape}')

# 1. 使用 merge 函数
print('\n=== 1. Merge 不同连接类型 ===')

# Inner join(默认)
merged_inner = pd.merge(port_price_data, port_volume_data, on='date', how='inner')
print(f'\nInner join 形状: {merged_inner.shape}')
print(merged_inner.head())

# Left join
merged_left = pd.merge(port_price_data, port_volume_data, on='date', how='left')
print(f'\nLeft join 形状: {merged_left.shape}')
print(f'Left join 缺失值: {merged_left.isna().sum().sum()}')

# Right join
merged_right = pd.merge(port_price_data, port_volume_data, on='date', how='right')
print(f'\nRight join 形状: {merged_right.shape}')
print(f'Right join 缺失值: {merged_right.isna().sum().sum()}')

# Outer join
merged_outer = pd.merge(port_price_data, port_volume_data, on='date', how='outer')
print(f'\nOuter join 形状: {merged_outer.shape}')
print(f'Outer join 缺失值: {merged_outer.isna().sum().sum()}')

# 2. 使用 join 方法(基于索引)
print('\n=== 2. Join 方法 ===')
port_price_data_indexed = port_price_data.set_index('date')
port_volume_data_indexed = port_volume_data.set_index('date')

joined = port_price_data_indexed.join(port_volume_data_indexed, how='outer')
print(f'Joined 形状: {joined.shape}')
print(joined.head())

# 3. 使用 concat 函数
print('\n=== 3. Concat 方法 ===')
# 沿着列方向合并
concatenated = pd.concat([port_price_data_indexed, port_volume_data_indexed], axis=1)
print(f'Concatenated 形状: {concatenated.shape}')
print(concatenated.head())

# 4. 比较不同方法
print('\n=== 4. 方法对比 ===')
comparison = pd.DataFrame({
    '方法': ['merge_inner', 'merge_left', 'merge_right', 'merge_outer', 'join', 'concat'],
    '形状': [
        str(merged_inner.shape),
        str(merged_left.shape),
        str(merged_right.shape),
        str(merged_outer.shape),
        str(joined.shape),
        str(concatenated.shape)
    ],
    '保留所有日期': ['否', '是(来自A)', '是(来自B)', '是', '是', '是']
})
print(comparison)
=== 数据集信息 ===
数据集A形状: (16, 3)
数据集B形状: (11, 2)

=== 1. Merge 不同连接类型 ===

Inner join 形状: (11, 4)
        date    open   close     volume
0 2023-01-10  3.2669  3.2485  5882400.0
1 2023-01-11  3.2394  3.2394  4453499.0
2 2023-01-12  3.2394  3.2302  4491700.0
3 2023-01-13  3.2302  3.2577  4929600.0
4 2023-01-16  3.2669  3.2761  8098816.0

Left join 形状: (16, 4)
Left join 缺失值: 5

Right join 形状: (11, 4)
Right join 缺失值: 0

Outer join 形状: (16, 4)
Outer join 缺失值: 5

=== 2. Join 方法 ===
Joined 形状: (16, 3)
              open   close  volume
date                              
2023-01-03  3.2761  3.2761     NaN
2023-01-04  3.2669  3.2944     NaN
2023-01-05  3.2944  3.2852     NaN
2023-01-06  3.2852  3.2669     NaN
2023-01-09  3.2669  3.2669     NaN

=== 3. Concat 方法 ===
Concatenated 形状: (16, 3)
              open   close  volume
date                              
2023-01-03  3.2761  3.2761     NaN
2023-01-04  3.2669  3.2944     NaN
2023-01-05  3.2944  3.2852     NaN
2023-01-06  3.2852  3.2669     NaN
2023-01-09  3.2669  3.2669     NaN

=== 4. 方法对比 ===
            方法       形状  保留所有日期
0  merge_inner  (11, 4)       否
1   merge_left  (16, 4)  是(来自A)
2  merge_right  (11, 4)  是(来自B)
3  merge_outer  (16, 4)       是
4         join  (16, 3)       是
5       concat  (16, 3)       是

关键要点: - merge 基于列值合并,最灵活,适合不同索引的数据集 - join 基于索引合并,语法更简洁 - concat 沿着轴简单拼接,不做键值匹配 - Inner join 只保留两个数据集都有的键 - Outer join 保留所有键,可能产生缺失值 - Left/Right join 保留左/右数据集的所有键


6.5.2 习题 8.2: 多对一合并

问题描述

在实际应用中,经常需要将包含多个观测的数据集与包含参考信息的数据集合并。请:

  1. 创建包含股票代码和公司名称的参考数据
  2. 创建包含多只股票(宁波港、宁波银行、贵州茅台)交易数据的数据集
  3. 将公司名称映射到交易数据上
  4. 分析合并后的数据

完整解答

import pandas as pd
import numpy as np

# 1. 创建参考数据:股票代码与公司名称映射
reference_data = pd.DataFrame({
    'stock_code': ['601018.SH', '002142.SZ', '600519.SH', '000001.SZ', '601318.SH'],
    'company_name': ['宁波港', '宁波银行', '贵州茅台', '平安银行', '中国平安'],
    'sector': ['交通运输', '金融服务', '消费品', '金融服务', '金融服务'],
    'city': ['宁波', '宁波', '贵阳', '深圳', '深圳']
})

print('=== 参考数据 ===')
print(reference_data)

# 2. 创建交易数据
# 从本地 Parquet 文件读取数据代替 HDF5
port_stock_data = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
bank_stock_data = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '002142.XSHE')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
maotai_stock_data = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '600519.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
port_stock_data['symbol'] = '601018.SH'
bank_stock_data['symbol'] = '002142.SZ'
maotai_stock_data['symbol'] = '600519.SH'
stock_data = pd.concat([port_stock_data, bank_stock_data, maotai_stock_data])
stock_data['datetime'] = pd.to_datetime(stock_data['trade_date'], format='%Y%m%d')
stock_data = stock_data[(stock_data['datetime'] >= '2023-01-01') & (stock_data['datetime'] <= '2023-01-31')]

# 筛选多只股票的数据
symbols = ['601018.SH', '002142.SZ', '600519.SH']
trading_data = stock_data[stock_data['symbol'].isin(symbols)][['datetime', 'symbol', 'close', 'volume']].copy()
trading_data = trading_data.rename(columns={'symbol': 'stock_code'})

print('\n=== 交易数据示例 ===')
print(trading_data.head(10))

# 3. 多对一合并
# 将公司信息映射到每一笔交易记录
merged_data = pd.merge(trading_data, reference_data, on='stock_code', how='left')

print('\n=== 合并后数据 ===')
print(merged_data.head(15))

# 4. 分析合并后的数据
print('\n=== 按公司统计 ===')
company_stats = merged_data.groupby('company_name').agg({
    'close': ['mean', 'min', 'max'],
    'volume': 'sum'
}).round(2)
print(company_stats)

print('\n=== 按行业统计 ===')
sector_stats = merged_data.groupby('sector').agg({
    'close': 'mean',
    'volume': 'sum'
}).round(2)
print(sector_stats)

print('\n=== 按城市统计 ===')
city_stats = merged_data.groupby('city').agg({
    'close': 'mean',
    'volume': 'sum'
}).round(2)
print(city_stats)

# 5. 验证合并完整性
print('\n=== 合并完整性检查 ===')
print(f'交易数据行数: {len(trading_data)}')
print(f'合并数据行数: {len(merged_data)}')
print(f'缺失值数量: {merged_data.isna().sum().sum()}')
print(f'唯一公司数: {merged_data["company_name"].nunique()}')
=== 参考数据 ===
  stock_code company_name sector city
0  601018.SH          宁波港   交通运输   宁波
1  002142.SZ         宁波银行   金融服务   宁波
2  600519.SH         贵州茅台    消费品   贵阳
3  000001.SZ         平安银行   金融服务   深圳
4  601318.SH         中国平安   金融服务   深圳

=== 交易数据示例 ===
       datetime stock_code   close      volume
2981 2023-01-03  601018.SH  3.2761  10743034.0
2982 2023-01-04  601018.SH  3.2944   8787221.0
2983 2023-01-05  601018.SH  3.2852  10407133.0
2984 2023-01-06  601018.SH  3.2669  10450610.0
2985 2023-01-09  601018.SH  3.2669   6518398.0
2986 2023-01-10  601018.SH  3.2485   5882400.0
2987 2023-01-11  601018.SH  3.2394   4453499.0
2988 2023-01-12  601018.SH  3.2302   4491700.0
2989 2023-01-13  601018.SH  3.2577   4929600.0
2990 2023-01-16  601018.SH  3.2761   8098816.0

=== 合并后数据 ===
     datetime stock_code   close      volume company_name sector city
0  2023-01-03  601018.SH  3.2761  10743034.0          宁波港   交通运输   宁波
1  2023-01-04  601018.SH  3.2944   8787221.0          宁波港   交通运输   宁波
2  2023-01-05  601018.SH  3.2852  10407133.0          宁波港   交通运输   宁波
3  2023-01-06  601018.SH  3.2669  10450610.0          宁波港   交通运输   宁波
4  2023-01-09  601018.SH  3.2669   6518398.0          宁波港   交通运输   宁波
5  2023-01-10  601018.SH  3.2485   5882400.0          宁波港   交通运输   宁波
6  2023-01-11  601018.SH  3.2394   4453499.0          宁波港   交通运输   宁波
7  2023-01-12  601018.SH  3.2302   4491700.0          宁波港   交通运输   宁波
8  2023-01-13  601018.SH  3.2577   4929600.0          宁波港   交通运输   宁波
9  2023-01-16  601018.SH  3.2761   8098816.0          宁波港   交通运输   宁波
10 2023-01-17  601018.SH  3.2761   4898283.0          宁波港   交通运输   宁波
11 2023-01-18  601018.SH  3.2852   7279200.0          宁波港   交通运输   宁波
12 2023-01-19  601018.SH  3.3036   6843700.0          宁波港   交通运输   宁波
13 2023-01-20  601018.SH  3.3403  15032977.0          宁波港   交通运输   宁波
14 2023-01-30  601018.SH  3.3403  14658336.0          宁波港   交通运输   宁波

=== 按公司统计 ===
                close                         volume
                 mean      min      max          sum
company_name                                        
宁波港              3.28     3.23     3.37  132777579.0
宁波银行            30.37    29.38    31.20  447477998.0
贵州茅台          1670.46  1562.39  1732.56   44202953.0

=== 按行业统计 ===
          close       volume
sector                      
交通运输       3.28  132777579.0
消费品     1670.46   44202953.0
金融服务      30.37  447477998.0

=== 按城市统计 ===
        close       volume
city                      
宁波      16.83  580255577.0
贵阳    1670.46   44202953.0

=== 合并完整性检查 ===
交易数据行数: 48
合并数据行数: 48
缺失值数量: 0
唯一公司数: 3

关键要点: - 多对一合并是最常见的场景之一 - 参考数据通常包含维度属性(行业、地区等) - how='left' 确保保留所有交易记录 - 合并后可以进行多维度分析 - 注意检查合并的完整性(缺失值、行数等)


6.5.3 习题 8.3: 多对多合并

问题描述

多对多合并会产生两个数据集键的笛卡尔积。请:

  1. 创建员工-项目关系数据(一个员工可以参与多个项目)
  2. 创建项目-技能要求数据(一个项目需要多种技能)
  3. 执行多对多合并,分析员工技能覆盖情况
  4. 识别技能缺口

完整解答

import pandas as pd
import numpy as np

# 1. 创建员工-项目数据
employee_projects = pd.DataFrame({
    'employee_id': ['E001', 'E001', 'E002', 'E002', 'E003', 'E003', 'E004'],
    'employee_name': ['张三', '张三', '李四', '李四', '王五', '王五', '赵六'],
    'project_id': ['P001', 'P002', 'P001', 'P003', 'P002', 'P003', 'P001'],
    'role': ['开发者', '项目经理', '开发者', '分析师', '测试员', '开发者', '设计师']
})

print('=== 员工-项目关系 ===')
print(employee_projects)

# 2. 创建项目-技能需求数据
project_skills = pd.DataFrame({
    'project_id': ['P001', 'P001', 'P001', 'P002', 'P002', 'P003', 'P003'],
    'required_skill': ['Python', 'SQL', '机器学习', '项目管理', '沟通能力', '数据分析', '可视化'],
    'proficiency_level': ['高级', '中级', '中级', '高级', '中级', '高级', '中级']
})

print('\n=== 项目-技能需求 ===')
print(project_skills)

# 3. 多对多合并
merged = pd.merge(employee_projects, project_skills, on='project_id', how='inner')

print('\n=== 多对多合并结果 ===')
print(f'合并前: 员工-项目 {len(employee_projects)} 行, 项目-技能 {len(project_skills)} 行')
print(f'合并后: {len(merged)} 行')
print('\n合并数据示例:')
print(merged)

# 4. 分析员工技能覆盖情况
print('\n=== 按员工统计所需技能 ===')
employee_skills = merged.groupby('employee_name')['required_skill'].apply(list).reset_index()
employee_skills.columns = ['员工', '所需技能']
print(employee_skills)

print('\n=== 按项目统计参与员工 ===')
project_employees = merged.groupby('project_id')['employee_name'].nunique().reset_index()
project_employees.columns = ['项目ID', '参与员工数']
print(project_employees)

print('\n=== 项目技能需求明细 ===')
project_detail = merged.groupby('project_id').agg({
    'employee_name': lambda x: ', '.join(sorted(set(x))),
    'required_skill': lambda x: ', '.join(sorted(x)),
    'proficiency_level': lambda x: ', '.join(x)
}).reset_index()
project_detail.columns = ['项目ID', '参与员工', '所需技能', '技能等级']
print(project_detail)

# 5. 创建技能矩阵
skill_matrix = merged.pivot_table(
    index='employee_name',
    columns='required_skill',
    values='project_id',
    aggfunc='count',
    fill_value=0
)

print('\n=== 技能覆盖矩阵 ===')
print('(数字表示该员工需要该技能的项目数)')
print(skill_matrix)

# 6. 识别技能热点
print('\n=== 最热门技能 ===')
skill_demand = merged['required_skill'].value_counts().reset_index()
skill_demand.columns = ['技能', '涉及项目数']
print(skill_demand)
=== 员工-项目关系 ===
  employee_id employee_name project_id  role
0        E001            张三       P001   开发者
1        E001            张三       P002  项目经理
2        E002            李四       P001   开发者
3        E002            李四       P003   分析师
4        E003            王五       P002   测试员
5        E003            王五       P003   开发者
6        E004            赵六       P001   设计师

=== 项目-技能需求 ===
  project_id required_skill proficiency_level
0       P001         Python                高级
1       P001            SQL                中级
2       P001           机器学习                中级
3       P002           项目管理                高级
4       P002           沟通能力                中级
5       P003           数据分析                高级
6       P003            可视化                中级

=== 多对多合并结果 ===
合并前: 员工-项目 7 行, 项目-技能 7 行
合并后: 17 行

合并数据示例:
   employee_id employee_name project_id  role required_skill proficiency_level
0         E001            张三       P001   开发者         Python                高级
1         E001            张三       P001   开发者            SQL                中级
2         E001            张三       P001   开发者           机器学习                中级
3         E001            张三       P002  项目经理           项目管理                高级
4         E001            张三       P002  项目经理           沟通能力                中级
5         E002            李四       P001   开发者         Python                高级
6         E002            李四       P001   开发者            SQL                中级
7         E002            李四       P001   开发者           机器学习                中级
8         E002            李四       P003   分析师           数据分析                高级
9         E002            李四       P003   分析师            可视化                中级
10        E003            王五       P002   测试员           项目管理                高级
11        E003            王五       P002   测试员           沟通能力                中级
12        E003            王五       P003   开发者           数据分析                高级
13        E003            王五       P003   开发者            可视化                中级
14        E004            赵六       P001   设计师         Python                高级
15        E004            赵六       P001   设计师            SQL                中级
16        E004            赵六       P001   设计师           机器学习                中级

=== 按员工统计所需技能 ===
   员工                             所需技能
0  张三  [Python, SQL, 机器学习, 项目管理, 沟通能力]
1  李四   [Python, SQL, 机器学习, 数据分析, 可视化]
2  王五          [项目管理, 沟通能力, 数据分析, 可视化]
3  赵六              [Python, SQL, 机器学习]

=== 按项目统计参与员工 ===
   项目ID  参与员工数
0  P001      3
1  P002      2
2  P003      2

=== 项目技能需求明细 ===
   项目ID        参与员工                                               所需技能  \
0  P001  张三, 李四, 赵六  Python, Python, Python, SQL, SQL, SQL, 机器学习, 机...   
1  P002      张三, 王五                             沟通能力, 沟通能力, 项目管理, 项目管理   
2  P003      李四, 王五                               可视化, 可视化, 数据分析, 数据分析   

                                 技能等级  
0  高级, 中级, 中级, 高级, 中级, 中级, 高级, 中级, 中级  
1                      高级, 中级, 高级, 中级  
2                      高级, 中级, 高级, 中级  

=== 技能覆盖矩阵 ===
(数字表示该员工需要该技能的项目数)
required_skill  Python  SQL  可视化  数据分析  机器学习  沟通能力  项目管理
employee_name                                           
张三                   1    1    0     0     1     1     1
李四                   1    1    1     1     1     0     0
王五                   0    0    1     1     0     1     1
赵六                   1    1    0     0     1     0     0

=== 最热门技能 ===
       技能  涉及项目数
0  Python      3
1     SQL      3
2    机器学习      3
3    项目管理      2
4    沟通能力      2
5    数据分析      2
6     可视化      2

关键要点: - 多对多合并会产生键的笛卡尔积 - 结果行数通常大于任一输入数据集 - 适合分析关系型数据(员工-项目-技能) - 需要仔细理解业务逻辑以正确解释结果 - pivot_table 可以将长格式转换为易读的矩阵


6.5.4 习题 8.4: 分层索引操作

问题描述

分层索引是 pandas 中处理多维数据的强大工具。请:

  1. 创建包含股票、日期、指标的分层索引数据
  2. 使用 stackunstack 进行数据重塑
  3. 使用 swaplevelreorder_levels 调整索引层级
  4. 执行分层索引的选择和分组聚合

完整解答

import pandas as pd
import numpy as np

# 从本地 Parquet 文件读取数据代替 HDF5
port_stock_data = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
bank_stock_data = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '002142.XSHE')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
port_stock_data['symbol'] = '601018.SH'
bank_stock_data['symbol'] = '002142.SZ'
stock_data = pd.concat([port_stock_data, bank_stock_data])
stock_data['datetime'] = pd.to_datetime(stock_data['trade_date'], format='%Y%m%d')
stock_data = stock_data[(stock_data['datetime'] >= '2023-01-01') & (stock_data['datetime'] <= '2023-01-15')]

# 筛选宁波港和宁波银行的数据
symbols = ['601018.SH', '002142.SZ']
filtered = stock_data[stock_data['symbol'].isin(symbols)][['datetime', 'symbol', 'open', 'high', 'low', 'close', 'volume']].copy()

# 重命名股票代码
filtered['symbol'] = filtered['symbol'].map({
    '601018.SH': '宁波港',
    '002142.SZ': '宁波银行'
})

# 1. 创建分层索引
hierarchical_data = filtered.set_index(['symbol', 'datetime']).sort_index()

print('=== 分层索引数据结构 ===')
print(f'索引层级: {hierarchical_data.index.names}')
print(f'数据形状: {hierarchical_data.shape}')
print(f'\n数据示例:')
print(hierarchical_data.head(10))

# 2. 使用 unstack 将数据从长格式转换为宽格式
print('\n=== Unstack: 将股票索引转为列 ===')
unstacked = hierarchical_data['close'].unstack(level='symbol')
print(f'Unstacked 形状: {unstacked.shape}')
print(unstacked.head())

# 使用 stack 将数据从宽格式转换为长格式
print('\n=== Stack: 将列转回索引 ===')
restacked = unstacked.stack()
print(f'Restacked 形状: {restacked.shape}')
print(restacked.head())

# 3. 使用 swaplevel 交换索引层级
print('\n=== Swaplevel: 交换索引层级 ===')
swapped = hierarchical_data.swaplevel('symbol', 'datetime')
print(f'原始索引顺序: {hierarchical_data.index.names}')
print(f'交换后索引顺序: {swapped.index.names}')
print(swapped.head())

# 使用 reorder_levels 重新排序
reordered = hierarchical_data.reorder_levels(['datetime', 'symbol'])
print(f'\nReorder 后索引顺序: {reordered.index.names}')

# 4. 分层索引的选择
print('\n=== 选择特定股票的所有数据 ===')
nbz_data = hierarchical_data.loc['宁波港']
print(nbz_data.head())

print('\n=== 选择特定日期的所有股票数据 ===')
specific_date = hierarchical_data.loc[(slice(None), '2023-01-03'), :]
print(specific_date)

print('\n=== 使用 xs 进行跨层级选择 ===')
# xs 允许你从多层索引中选择特定值
nbz_via_xs = hierarchical_data.xs('宁波港', level='symbol')
print(nbz_via_xs.head())

# 5. 分层分组聚合
print('\n=== 按股票分组统计 ===')
by_symbol = hierarchical_data.groupby(level='symbol').agg({
    'close': ['mean', 'std', 'min', 'max'],
    'volume': 'sum'
}).round(2)
print(by_symbol)

print('\n=== 按日期分组统计 ===')
by_date = hierarchical_data.groupby(level='datetime').agg({
    'close': ['mean', 'std'],
    'volume': 'sum'
}).round(2)
print(by_date.head())

# 6. 多级聚合
print('\n=== 高级多级聚合 ===')
multi_agg = hierarchical_data.groupby(['symbol', pd.Grouper(level='datetime', freq='W')]).agg({
    'open': 'first',
    'high': 'max',
    'low': 'min',
    'close': 'last',
    'volume': 'sum'
}).round(2)
print(multi_agg)
=== 分层索引数据结构 ===
索引层级: ['symbol', 'datetime']
数据形状: (18, 5)

数据示例:
                      open     high      low    close      volume
symbol datetime                                                  
宁波港    2023-01-03   3.2761   3.2944   3.2577   3.2761  10743034.0
       2023-01-04   3.2669   3.2944   3.2669   3.2944   8787221.0
       2023-01-05   3.2944   3.3219   3.2761   3.2852  10407133.0
       2023-01-06   3.2852   3.2944   3.2577   3.2669  10450610.0
       2023-01-09   3.2669   3.2852   3.2577   3.2669   6518398.0
       2023-01-10   3.2669   3.2761   3.2485   3.2485   5882400.0
       2023-01-11   3.2394   3.2669   3.2394   3.2394   4453499.0
       2023-01-12   3.2394   3.2577   3.2302   3.2302   4491700.0
       2023-01-13   3.2302   3.2669   3.2302   3.2577   4929600.0
宁波银行   2023-01-03  29.3540  29.5547  28.7338  29.3814  27552512.0

=== Unstack: 将股票索引转为列 ===
Unstacked 形状: (9, 2)
symbol         宁波港     宁波银行
datetime                   
2023-01-03  3.2761  29.3814
2023-01-04  3.2944  30.5399
2023-01-05  3.2852  30.1933
2023-01-06  3.2669  29.7645
2023-01-09  3.2669  29.9470

=== Stack: 将列转回索引 ===
Restacked 形状: (18,)
datetime    symbol
2023-01-03  宁波港        3.2761
            宁波银行      29.3814
2023-01-04  宁波港        3.2944
            宁波银行      30.5399
2023-01-05  宁波港        3.2852
dtype: float64

=== Swaplevel: 交换索引层级 ===
原始索引顺序: ['symbol', 'datetime']
交换后索引顺序: ['datetime', 'symbol']
                     open    high     low   close      volume
datetime   symbol                                            
2023-01-03 宁波港     3.2761  3.2944  3.2577  3.2761  10743034.0
2023-01-04 宁波港     3.2669  3.2944  3.2669  3.2944   8787221.0
2023-01-05 宁波港     3.2944  3.3219  3.2761  3.2852  10407133.0
2023-01-06 宁波港     3.2852  3.2944  3.2577  3.2669  10450610.0
2023-01-09 宁波港     3.2669  3.2852  3.2577  3.2669   6518398.0

Reorder 后索引顺序: ['datetime', 'symbol']

=== 选择特定股票的所有数据 ===
              open    high     low   close      volume
datetime                                              
2023-01-03  3.2761  3.2944  3.2577  3.2761  10743034.0
2023-01-04  3.2669  3.2944  3.2669  3.2944   8787221.0
2023-01-05  3.2944  3.3219  3.2761  3.2852  10407133.0
2023-01-06  3.2852  3.2944  3.2577  3.2669  10450610.0
2023-01-09  3.2669  3.2852  3.2577  3.2669   6518398.0

=== 选择特定日期的所有股票数据 ===
                      open     high      low    close      volume
symbol datetime                                                  
宁波港    2023-01-03   3.2761   3.2944   3.2577   3.2761  10743034.0
宁波银行   2023-01-03  29.3540  29.5547  28.7338  29.3814  27552512.0

=== 使用 xs 进行跨层级选择 ===
              open    high     low   close      volume
datetime                                              
2023-01-03  3.2761  3.2944  3.2577  3.2761  10743034.0
2023-01-04  3.2669  3.2944  3.2669  3.2944   8787221.0
2023-01-05  3.2944  3.3219  3.2761  3.2852  10407133.0
2023-01-06  3.2852  3.2944  3.2577  3.2669  10450610.0
2023-01-09  3.2669  3.2852  3.2577  3.2669   6518398.0

=== 按股票分组统计 ===
        close                           volume
         mean   std    min    max          sum
symbol                                        
宁波港      3.26  0.02   3.23   3.29   66663595.0
宁波银行    30.21  0.56  29.38  30.99  257009431.0

=== 按日期分组统计 ===
            close             volume
             mean    std         sum
datetime                            
2023-01-03  16.33  18.46  38295546.0
2023-01-04  16.92  19.27  46717935.0
2023-01-05  16.74  19.03  35977710.0
2023-01-06  16.52  18.74  50212483.0
2023-01-09  16.61  18.87  32938017.0

=== 高级多级聚合 ===
                    open   high    low  close       volume
symbol datetime                                           
宁波港    2023-01-08   3.28   3.32   3.26   3.27   40387998.0
       2023-01-15   3.27   3.29   3.23   3.26   26275597.0
宁波银行   2023-01-08  29.35  30.93  28.73  29.76  130815676.0
       2023-01-15  29.89  31.30  29.42  30.99  126193755.0

关键要点: - 分层索引允许在单个 DataFrame 中表示多维数据 - stack 将列转换为索引级别(宽→长) - unstack 将索引级别转换为列(长→宽) - swaplevel 交换两个索引级别的位置 - xs 提供了跨层级选择的便捷方法 - groupby 可以基于特定索引层级进行聚合


6.5.5 习题 8.5: 数据重塑与透视表

问题描述

使用宁波港、宁波银行和贵州茅台的交易数据,请:

  1. 创建透视表,显示不同股票在不同日期的收盘价
  2. 使用 pivot 创建交叉表
  3. 使用 melt 将宽格式数据转换为长格式
  4. 计算各股票的涨跌幅并创建热力图数据

完整解答

import pandas as pd
import numpy as np

# 从本地 Parquet 文件读取数据代替 HDF5
stock_data_nbz = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
stock_data_nby = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '002142.XSHE')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
stock_data_m = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '600519.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
stock_data_nbz['symbol'] = '601018.SH'
stock_data_nby['symbol'] = '002142.SZ'
stock_data_m['symbol'] = '600519.SH'
stock_data = pd.concat([stock_data_nbz, stock_data_nby, stock_data_m])
stock_data['datetime'] = pd.to_datetime(stock_data['trade_date'], format='%Y%m%d')
stock_data = stock_data[(stock_data['datetime'] >= '2023-01-01') & (stock_data['datetime'] <= '2023-01-31')]

# 筛选三只股票的数据
symbols_map = {
    '601018.SH': '宁波港',
    '002142.SZ': '宁波银行',
    '600519.SH': '贵州茅台'
}

filtered = stock_data[stock_data['symbol'].isin(symbols_map.keys())][['datetime', 'symbol', 'close', 'volume']].copy()
filtered['symbol'] = filtered['symbol'].map(symbols_map)

# 计算涨跌幅
filtered = filtered.sort_values(['symbol', 'datetime'])
filtered['return'] = filtered.groupby('symbol')['close'].pct_change()

print('=== 原始数据示例 ===')
print(filtered.head(10))

# 1. 创建透视表
print('\n=== 1. 透视表:收盘价 ===')
pivot_close = pd.pivot_table(
    filtered,
    values='close',
    index='datetime',
    columns='symbol',
    aggfunc='mean'
)
print(pivot_close.head())

# 2. 使用 pivot 创建交叉表
print('\n=== 2. Pivot: 涨跌幅交叉表 ===')
pivot_returns = filtered.pivot(
    index='datetime',
    columns='symbol',
    values='return'
)
print(pivot_returns.head())

# 3. 使用 melt 将宽格式转换为长格式
print('\n=== 3. Melt: 宽格式→长格式 ===')
# pivot_close 是宽格式,每列是一只股票
melted = pivot_close.reset_index().melt(
    id_vars='datetime',
    var_name='股票',
    value_name='收盘价'
)
print(melted.head(10))

# 4. 创建热力图数据
print('\n=== 4. 热力图数据:按周统计涨跌幅 ===')
filtered['week'] = pd.to_datetime(filtered['datetime']).dt.isocalendar().week

heatmap_data = pd.pivot_table(
    filtered,
    values='return',
    index='week',
    columns='symbol',
    aggfunc='mean'
).round(4)

print('周平均涨跌幅:')
print(heatmap_data)

print('\n=== 5. 多值透视表 ===')
multi_value_pivot = pd.pivot_table(
    filtered,
    values=['close', 'volume', 'return'],
    index='symbol',
    aggfunc={
        'close': ['mean', 'std', 'min', 'max'],
        'volume': ['mean', 'sum'],
        'return': ['mean', 'std']
    }
)
print(multi_value_pivot.round(2))

# 6. 使用 crosstab 创建频数表
print('\n=== 6. CrossTab: 涨跌天数统计 ===')
filtered['direction'] = pd.cut(filtered['return'],
                               bins=[-np.inf, 0, np.inf],
                               labels=['下跌', '上涨'])

crosstab_result = pd.crosstab(
    index=filtered['symbol'],
    columns=filtered['direction'],
    margins=True
)
print(crosstab_result)

# 计算上涨比例
crosstab_pct = pd.crosstab(
    index=filtered['symbol'],
    columns=filtered['direction'],
    normalize='index'
).round(3)
print('\n上涨比例:')
print(crosstab_pct)
=== 原始数据示例 ===
       datetime symbol   close      volume    return
2981 2023-01-03    宁波港  3.2761  10743034.0       NaN
2982 2023-01-04    宁波港  3.2944   8787221.0  0.005586
2983 2023-01-05    宁波港  3.2852  10407133.0 -0.002793
2984 2023-01-06    宁波港  3.2669  10450610.0 -0.005570
2985 2023-01-09    宁波港  3.2669   6518398.0  0.000000
2986 2023-01-10    宁波港  3.2485   5882400.0 -0.005632
2987 2023-01-11    宁波港  3.2394   4453499.0 -0.002801
2988 2023-01-12    宁波港  3.2302   4491700.0 -0.002840
2989 2023-01-13    宁波港  3.2577   4929600.0  0.008513
2990 2023-01-16    宁波港  3.2761   8098816.0  0.005648

=== 1. 透视表:收盘价 ===
symbol         宁波港     宁波银行       贵州茅台
datetime                              
2023-01-03  3.2761  29.3814  1566.9150
2023-01-04  3.2944  30.5399  1562.3863
2023-01-05  3.2852  30.1933  1631.2125
2023-01-06  3.2669  29.7645  1633.7213
2023-01-09  3.2669  29.9470  1667.6226

=== 2. Pivot: 涨跌幅交叉表 ===
symbol           宁波港      宁波银行      贵州茅台
datetime                                
2023-01-03       NaN       NaN       NaN
2023-01-04  0.005586  0.039430 -0.002890
2023-01-05 -0.002793 -0.011349  0.044052
2023-01-06 -0.005570 -0.014202  0.001538
2023-01-09  0.000000  0.006131  0.020751

=== 3. Melt: 宽格式→长格式 ===
    datetime   股票     收盘价
0 2023-01-03  宁波港  3.2761
1 2023-01-04  宁波港  3.2944
2 2023-01-05  宁波港  3.2852
3 2023-01-06  宁波港  3.2669
4 2023-01-09  宁波港  3.2669
5 2023-01-10  宁波港  3.2485
6 2023-01-11  宁波港  3.2394
7 2023-01-12  宁波港  3.2302
8 2023-01-13  宁波港  3.2577
9 2023-01-16  宁波港  3.2761

=== 4. 热力图数据:按周统计涨跌幅 ===
周平均涨跌幅:
symbol     宁波港    宁波银行    贵州茅台
week                          
1      -0.0009  0.0046  0.0142
2      -0.0006  0.0082  0.0092
3       0.0050 -0.0045 -0.0028
5       0.0041 -0.0060 -0.0037

=== 5. 多值透视表 ===
          close                          return             volume  \
            max     mean      min    std   mean   std         mean   
symbol                                                               
宁波港        3.37     3.28     3.23   0.04    0.0  0.01   8298598.69   
宁波银行      31.20    30.37    29.38   0.52    0.0  0.02  27967374.88   
贵州茅台    1732.56  1670.46  1562.39  50.93    0.0  0.02   2762684.56   

                     
                sum  
symbol               
宁波港     132777579.0  
宁波银行    447477998.0  
贵州茅台     44202953.0  

=== 6. CrossTab: 涨跌天数统计 ===
direction  下跌  上涨  All
symbol                
宁波港         8   7   15
宁波银行        9   6   15
贵州茅台        8   7   15
All        25  20   45

上涨比例:
direction     下跌     上涨
symbol                 
宁波港        0.533  0.467
宁波银行       0.600  0.400
贵州茅台       0.533  0.467

关键要点: - pivot_table 灵活,可以指定聚合函数 - pivot 简单,要求索引-列值唯一 - melt 将宽格式转为长格式,便于某些分析 - crosstab 专门用于计算频数和交叉表 - 透视表是数据分析和报表的核心工具 - 热力图数据通常是二维矩阵形式


6.5.6 习题 8.6: 数据合并策略与性能优化

问题描述

在处理大规模数据时,合并操作的效率很重要。请:

  1. 比较不同合并方法的性能
  2. 使用索引键合并 vs 列键合并
  3. 处理重复键名
  4. 优化大数据集的合并操作

完整解答

import pandas as pd
import numpy as np
import time

# 创建较大的数据集用于性能测试
np.random.seed(42)
n_rows = 10000

# 数据集A:模拟交易数据
data_a = pd.DataFrame({
    'transaction_id': range(n_rows),
    'date': pd.date_range('2023-01-01', periods=n_rows),
    'stock_code': np.random.choice(['601018.SH', '002142.SZ', '600519.SH'], n_rows),
    'price': np.random.uniform(10, 100, n_rows),
    'volume': np.random.randint(1000, 100000, n_rows)
})

# 数据集B:模拟公司信息
company_info = pd.DataFrame({
    'stock_code': ['601018.SH', '002142.SZ', '600519.SH'],
    'company_name': ['宁波港', '宁波银行', '贵州茅台'],
    'sector': ['交通运输', '金融服务', '消费品'],
    'region': ['宁波', '宁波', '贵阳']
})

print('=== 数据集信息 ===')
print(f'数据集A形状: {data_a.shape}')
print(f'数据集B形状: {company_info.shape}')

# 1. 性能比较:列键合并 vs 索引键合并
print('\n=== 1. 性能比较 ===')

# 方法1: 使用列键合并
start = time.time()
merged_col = pd.merge(data_a, company_info, on='stock_code', how='left')
time_col = time.time() - start
print(f'列键合并耗时: {time_col:.4f} 秒')

# 方法2: 使用索引键合并
data_a_indexed = data_a.set_index('stock_code')
company_info_indexed = company_info.set_index('stock_code')

start = time.time()
merged_idx = data_a_indexed.join(company_info_indexed, how='left')
time_idx = time.time() - start
print(f'索引键合并耗时: {time_idx:.4f} 秒')

print(f'\n性能提升: {(time_col / time_idx):.2f}x')

# 2. 处理重复键名
print('\n=== 2. 处理重复列名 ===')
data_c = pd.DataFrame({
    'stock_code': ['601018.SH', '002142.SZ', '600519.SH'],
    'region': ['华东', '华东', '西南']
})

# 合并时两个数据集都有 'region' 列
merged_suffix = pd.merge(data_a, company_info, on='stock_code', how='left')
merged_suffix = pd.merge(merged_suffix, data_c, on='stock_code', how='left',
                         suffixes=('_company', '_location'))

print(f'合并后列名: {merged_suffix.columns.tolist()}')
print(merged_suffix[['stock_code', 'region_company', 'region_location']].drop_duplicates())

# 3. 使用验证参数检查合并质量
print('\n=== 3. 合并验证 ===')

# one_to_one: 检查键是否唯一
try:
    merged_validate = pd.merge(
        company_info,
        company_info,
        on='stock_code',
        validate='one_to_one'
    )
    print('One-to-one 验证通过')
except pd.errors.MergeError as e:
    print(f'One-to-one 验证失败: {e}')

# one_to_many: 左侧键唯一,右侧可重复
merged_otm = pd.merge(
    company_info,
    data_a,
    on='stock_code',
    validate='one_to_many'
)
print(f'One-to-many 合并成功: {merged_otm.shape}')

# 4. 优化大数据集合并
print('\n=== 4. 优化建议 ===')

tips = """
大数据集合并优化技巧:

1. 使用索引键合并通常比列键合并更快
2. 如果可能,先对键进行排序
3. 使用 'indicator' 参数检查合并结果
4. 对于大型数据集,考虑使用 Dask 或 modin
5. 避免在合并前进行不必要的操作
6. 使用适当的数据类型减少内存占用
"""
print(tips)

# 演示 indicator 参数
merged_indicator = pd.merge(
    data_a.head(100),
    company_info,
    on='stock_code',
    how='left',
    indicator=True
)

print('\n使用 indicator 检查合并来源:')
print(merged_indicator['_merge'].value_counts())

# 5. 批量合并多个数据集
print('\n=== 5. 批量合并策略 ===')

# 模拟多个数据源
data_sources = [company_info, data_c]

# 方法1: 逐步合并
result_step = data_a.copy()
for df in data_sources:
    result_step = pd.merge(result_step, df, on='stock_code', how='left')

print(f'逐步合并结果: {result_step.shape}')

# 方法2: 使用 reduce(需要 functools)
from functools import reduce

def merge_on_stock(left, right):
    return pd.merge(left, right, on='stock_code', how='left')

result_reduce = reduce(merge_on_stock, [data_a] + data_sources)
print(f'Reduce 合并结果: {result_reduce.shape}')

# 验证两种方法结果相同
print(f'\n两种方法结果一致: {result_step.equals(result_reduce)}')
=== 数据集信息 ===
数据集A形状: (10000, 5)
数据集B形状: (3, 4)

=== 1. 性能比较 ===
列键合并耗时: 0.0058 秒
索引键合并耗时: 0.0078 秒

性能提升: 0.74x

=== 2. 处理重复列名 ===
合并后列名: ['transaction_id', 'date', 'stock_code', 'price', 'volume', 'company_name', 'sector', 'region_company', 'region_location']
  stock_code region_company region_location
0  600519.SH             贵阳              西南
1  601018.SH             宁波              华东
7  002142.SZ             宁波              华东

=== 3. 合并验证 ===
One-to-one 验证通过
One-to-many 合并成功: (10000, 8)

=== 4. 优化建议 ===

大数据集合并优化技巧:

1. 使用索引键合并通常比列键合并更快
2. 如果可能,先对键进行排序
3. 使用 'indicator' 参数检查合并结果
4. 对于大型数据集,考虑使用 Dask 或 modin
5. 避免在合并前进行不必要的操作
6. 使用适当的数据类型减少内存占用


使用 indicator 检查合并来源:
_merge
both          100
left_only       0
right_only      0
Name: count, dtype: int64

=== 5. 批量合并策略 ===
逐步合并结果: (10000, 9)
Reduce 合并结果: (10000, 9)

两种方法结果一致: True

关键要点: - 索引键合并通常比列键合并更快 - 使用 validate 参数可以检查合并质量 - indicator 参数帮助识别合并来源 - suffixes 参数处理重复列名 - 对于多个数据集,使用 reduce 更优雅 - 大数据集合并需要考虑内存和性能优化


6.5.7 习题 8.7: 综合数据整合项目

问题描述

假设你正在构建一个股票分析系统,需要从多个数据源整合信息:

  1. 数据源A:本地 HDF5 文件的日度行情数据
  2. 数据源B:财务指标数据(模拟)
  3. 数据源C:行业分类数据(模拟)
  4. 数据源D:分析师评级数据(模拟)

请将这四个数据源整合成一个完整的分析数据集。

完整解答

import pandas as pd
import numpy as np
from datetime import datetime

print('=== 开始数据整合项目 ===\n')

# ============================================================================
# 1. 加载数据源A:日度行情数据
# ============================================================================
print('[1/6] 加载数据源A:日度行情数据')
# 从本地 Parquet 文件读取数据代替 HDF5
port_stock_data = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
bank_stock_data = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '002142.XSHE')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
maotai_stock_data = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '600519.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
port_stock_data['symbol'] = '601018.SH'
bank_stock_data['symbol'] = '002142.SZ'
maotai_stock_data['symbol'] = '600519.SH'
stock_data = pd.concat([port_stock_data, bank_stock_data, maotai_stock_data])
stock_data['datetime'] = pd.to_datetime(stock_data['trade_date'], format='%Y%m%d')
price_data = stock_data[(stock_data['datetime'] >= '2023-01-01') & (stock_data['datetime'] <= '2023-01-31')]

# 筛选三只股票
symbols_map = {
    '601018.SH': '宁波港',
    '002142.SZ': '宁波银行',
    '600519.SH': '贵州茅台'
}
price_data = price_data[price_data['symbol'].isin(symbols_map.keys())].copy()
price_data['symbol'] = price_data['symbol'].map(symbols_map)

# 提取关键字段
price_data = price_data[['datetime', 'symbol', 'open', 'high', 'low', 'close', 'volume']].copy()
price_data = price_data.rename(columns={'symbol': 'stock_name'})

print(f'  形状: {price_data.shape}')
print(f'  日期范围: {price_data["datetime"].min()}{price_data["datetime"].max()}')
print(f'  股票数量: {price_data["stock_name"].nunique()}')

# ============================================================================
# 2. 创建数据源B:财务指标数据
# ============================================================================
print('\n[2/6] 创建数据源B:财务指标数据')

# 财务数据按月更新
financial_dates = pd.date_range(start='2023-01-01', end='2023-01-31', freq='7D')
financial_data = []

for stock in symbols_map.values():
    for date in financial_dates:
        financial_data.append({
            'stock_name': stock,
            'date': date,
            'pe_ratio': np.random.uniform(8, 50),
            'pb_ratio': np.random.uniform(0.5, 10),
            'roe': np.random.uniform(5, 25),
            'debt_ratio': np.random.uniform(20, 70)
        })

financial_df = pd.DataFrame(financial_data)

print(f'  形状: {financial_df.shape}')
print(f'  列: {financial_df.columns.tolist()}')

# ============================================================================
# 3. 创建数据源C:行业分类数据
# ============================================================================
print('\n[3/6] 创建数据源C:行业分类数据')

industry_data = pd.DataFrame({
    'stock_name': ['宁波港', '宁波银行', '贵州茅台'],
    'industry': ['港口航运', '银行', '白酒'],
    'sector': ['交通运输', '金融服务', '消费品'],
    'region': ['华东-宁波', '华东-宁波', '西南-贵阳']
})

print(f'  形状: {industry_data.shape}')
print(industry_data)

# ============================================================================
# 4. 创建数据源D:分析师评级数据
# ============================================================================
print('\n[4/6] 创建数据源D:分析师评级数据')

# 评级数据随机更新
rating_dates = pd.date_range(start='2023-01-01', end='2023-01-31', freq='5D')
rating_data = []

ratings = ['买入', '增持', '中性', '减持', '卖出']
for stock in symbols_map.values():
    for date in rating_dates:
        rating_data.append({
            'stock_name': stock,
            'rating_date': date,
            'rating': np.random.choice(ratings, p=[0.4, 0.3, 0.2, 0.05, 0.05]),
            'target_price': np.random.uniform(80, 120)
        })

rating_df = pd.DataFrame(rating_data)

print(f'  形状: {rating_df.shape}')
print(f'  评级分布:')
print(rating_df['rating'].value_counts())

# ============================================================================
# 5. 数据整合
# ============================================================================
print('\n[5/6] 整合所有数据源')

# 步骤1: 整合价格数据和行业信息
merged = pd.merge(price_data, industry_data, on='stock_name', how='left')

# 步骤2: 整合财务数据(基于日期匹配)
# 由于财务数据频率较低,使用前向填充
merged['date_only'] = pd.to_datetime(merged['datetime']).dt.normalize()
financial_df['date_only'] = pd.to_datetime(financial_df['date']).dt.normalize()

merged = pd.merge(
    merged,
    financial_df[['stock_name', 'date_only', 'pe_ratio', 'pb_ratio', 'roe', 'debt_ratio']],
    on=['stock_name', 'date_only'],
    how='left'
)

# 步骤3: 整合评级数据
# 确保合并键类型一致(统一为 datetime64[ns])
merged['date_only'] = pd.to_datetime(merged['date_only'])
rating_df['rating_date'] = pd.to_datetime(rating_df['rating_date']).dt.normalize()
merged = pd.merge(
    merged,
    rating_df[['stock_name', 'rating_date', 'rating', 'target_price']],
    left_on=['stock_name', 'date_only'],
    right_on=['stock_name', 'rating_date'],
    how='left'
)

# 步骤4: 填充缺失值
# 财务指标前向填充
merged[['pe_ratio', 'pb_ratio', 'roe', 'debt_ratio']] = merged.groupby('stock_name')[
    ['pe_ratio', 'pb_ratio', 'roe', 'debt_ratio']
].ffill()

# 评级后向填充(今天的评级对未来几天有效)
merged[['rating', 'target_price']] = merged.groupby('stock_name')[
    ['rating', 'target_price']
].bfill()

# 清理辅助列
merged = merged.drop(columns=['date_only', 'rating_date'])

print(f'  整合后形状: {merged.shape}')
print(f'  缺失值: {merged.isna().sum().sum()}')

# ============================================================================
# 6. 创建整合报告
# ============================================================================
print('\n[6/6] 数据整合报告')
print('=' * 60)

print('\n数据完整性:')
completeness = (1 - merged.isna().mean()) * 100
for col in merged.columns:
    print(f'  {col}: {completeness[col]:.2f}%')

print('\n数据范围:')
print(f'  总记录数: {len(merged)}')
print(f'  日期范围: {merged["datetime"].min()}{merged["datetime"].max()}')
print(f'  股票数量: {merged["stock_name"].nunique()}')

print('\n按股票统计:')
stock_stats = merged.groupby('stock_name').agg({
    'close': ['mean', 'min', 'max'],
    'volume': 'sum'
}).round(2)
print(stock_stats)

print('\n按行业统计:')
industry_stats = merged.groupby('industry').agg({
    'close': 'mean',
    'volume': 'sum'
}).round(2)
print(industry_stats)

print('\n评级分布:')
rating_dist = merged['rating'].value_counts()
print(rating_dist)

# 显示最终数据样本
print('\n最终整合数据样本(前10行):')
display_cols = ['datetime', 'stock_name', 'industry', 'close', 'volume',
                'pe_ratio', 'pb_ratio', 'rating', 'target_price']
print(merged[display_cols].head(10).to_string())

# 导出选项
print('\n=== 导出建议 ===')
print('整合后的数据可以:')
print('1. 导出为CSV: merged.to_csv("integrated_data.csv")')
print('2. 导出为Excel: merged.to_excel("integrated_data.xlsx")')
print('3. 导出为Parquet: merged.to_parquet("integrated_data.parquet")')
print('4. 导出为HDF5: merged.to_hdf("integrated_data.h5", key="data")')
=== 开始数据整合项目 ===

[1/6] 加载数据源A:日度行情数据
  形状: (48, 7)
  日期范围: 2023-01-03 00:00:00 至 2023-01-31 00:00:00
  股票数量: 3

[2/6] 创建数据源B:财务指标数据
  形状: (15, 6)
  列: ['stock_name', 'date', 'pe_ratio', 'pb_ratio', 'roe', 'debt_ratio']

[3/6] 创建数据源C:行业分类数据
  形状: (3, 4)
  stock_name industry sector region
0        宁波港     港口航运   交通运输  华东-宁波
1       宁波银行       银行   金融服务  华东-宁波
2       贵州茅台       白酒    消费品  西南-贵阳

[4/6] 创建数据源D:分析师评级数据
  形状: (21, 4)
  评级分布:
rating
买入    7
增持    7
中性    6
卖出    1
Name: count, dtype: int64

[5/6] 整合所有数据源
  整合后形状: (48, 16)
  缺失值: 192

[6/6] 数据整合报告
============================================================

数据完整性:
  datetime: 100.00%
  stock_name: 100.00%
  open: 100.00%
  high: 100.00%
  low: 100.00%
  close: 100.00%
  volume: 100.00%
  industry: 100.00%
  sector: 100.00%
  region: 100.00%
  pe_ratio: 0.00%
  pb_ratio: 0.00%
  roe: 0.00%
  debt_ratio: 0.00%
  rating: 100.00%
  target_price: 100.00%

数据范围:
  总记录数: 48
  日期范围: 2023-01-03 00:00:00 至 2023-01-31 00:00:00
  股票数量: 3

按股票统计:
              close                         volume
               mean      min      max          sum
stock_name                                        
宁波港            3.28     3.23     3.37  132777579.0
宁波银行          30.37    29.38    31.20  447477998.0
贵州茅台        1670.46  1562.39  1732.56   44202953.0

按行业统计:
            close       volume
industry                      
港口航运         3.28  132777579.0
白酒        1670.46   44202953.0
银行          30.37  447477998.0

评级分布:
rating
买入    22
中性    16
增持    10
Name: count, dtype: int64

最终整合数据样本(前10行):
    datetime stock_name industry   close      volume  pe_ratio  pb_ratio rating  target_price
0 2023-01-03        宁波港     港口航运  3.2761  10743034.0       NaN       NaN     买入    102.291453
1 2023-01-04        宁波港     港口航运  3.2944   8787221.0       NaN       NaN     买入    102.291453
2 2023-01-05        宁波港     港口航运  3.2852  10407133.0       NaN       NaN     买入    102.291453
3 2023-01-06        宁波港     港口航运  3.2669  10450610.0       NaN       NaN     买入    102.291453
4 2023-01-09        宁波港     港口航运  3.2669   6518398.0       NaN       NaN     买入     80.572291
5 2023-01-10        宁波港     港口航运  3.2485   5882400.0       NaN       NaN     买入     80.572291
6 2023-01-11        宁波港     港口航运  3.2394   4453499.0       NaN       NaN     买入     80.572291
7 2023-01-12        宁波港     港口航运  3.2302   4491700.0       NaN       NaN     中性     99.596644
8 2023-01-13        宁波港     港口航运  3.2577   4929600.0       NaN       NaN     中性     99.596644
9 2023-01-16        宁波港     港口航运  3.2761   8098816.0       NaN       NaN     中性     99.596644

=== 导出建议 ===
整合后的数据可以:
1. 导出为CSV: merged.to_csv("integrated_data.csv")
2. 导出为Excel: merged.to_excel("integrated_data.xlsx")
3. 导出为Parquet: merged.to_parquet("integrated_data.parquet")
4. 导出为HDF5: merged.to_hdf("integrated_data.h5", key="data")

关键要点: - 真实项目往往需要整合多个异构数据源 - 分步整合比一次性合并更容易调试 - 不同数据源可能需要不同的填充策略 - 在合并时注意键的类型和格式一致性 - 创建详细的整合报告有助于验证数据质量 - 选择合适的存储格式取决于后续使用场景 - 数据整合是数据科学项目的核心环节

6.6 结论

同学们,现在你们已经对 pandas 在数据导入、清洗和重组方面的核心工具有了基础的理解。这些技能——分层索引、合并、连接和重塑——不仅仅是机械的步骤。它们是连接原始、混乱的现实世界信息与严谨的计量经济和金融分析所需结构化数据之间的重要桥梁。有效整理数据的能力,将最终决定你们实证结论的质量和可靠性。


6.7 延伸阅读

为了进一步深化您在数据整理和数据合并方面的理解,我们推荐以下精选资源:

6.7.1 pandas数据操作进阶

1. pandas Cookbook: 可复制的模式 - 链接: https://pandas.pydata.org/docs/user_guide/cookbook.html - 说明: pandas官方的”食谱”(Cookbook)文档,包含大量可复制的数据操作模式,特别推荐: - Combining / Merging / Joining - 合并与连接的高级技巧 - Hierarchical indexing - 分层索引的深入用法 - Reshaping and pivoting - 数据重塑的最佳实践

2. pandas in Action, 2nd Edition by Manning Publications - 作者: Bein Jurafsky - 出版社: Manning - 说明: 实践导向的pandas教程,通过大量实际案例讲解数据操作技巧。第6-8章深入讨论了数据合并、重塑和聚合操作。

6.7.2 SQL与pandas对比

3. pandas vs. SQL Reference Guide - 链接: https://medium.com/@data_39/pandas-vs-sql-a-comprehensive-guide-60c7c0d664 - 说明: 对于有SQL背景的学习者,这个指南将SQL操作映射到pandas的对应方法,帮助快速理解和迁移知识。

4. Modern SQL: A Practical Guide (第4章: Join Operations) - 作者: Alan Beaulieu - 出版社: O’Reilly - 说明: 系统性地讲解SQL中的各种连接操作,理解SQL的join原理有助于更好地使用pandas的merge函数。

6.7.3 数据合并理论

5. Database Management Systems (第15章: Relational Algebra) - 作者: Ramakrishnan & Gehrke - 出版社: McGraw Hill - 说明: 从理论角度讲解关系代数,包括各种连接操作的数学基础和优化策略。

6. Data Integration: The Promise and Perils of Merging Data (论文) - 作者: F. Naumann - 期刊: ACM SIGKDD - 链接: https://dl.acm.org/doi/10.1145/3321110 - 说明: 从学术角度讨论数据集成的挑战、方法和最佳实践。

6.7.4 时间序列数据处理

7. pandas for Time Series Analysis (官方指南) - 链接: https://pandas.pydata.org/docs/user_guide/timeseries.html - 说明: pandas官方的时间序列分析指南,详细讲解了时间序列的索引、重采样、频率转换等操作。

8. Practical Time Series Analysis (第2章: Data Wrangling) - 作者: Aileen Nielsen - 出版社: Packt Publishing - 说明: 面向实践的时间序列分析教程,包含大量数据清洗和整理的案例。

6.7.5 大规模数据处理

9. High-Performance pandas (网络课程) - 平台: DataCamp - 链接: https://www.datacamp.com/courses/high-performance-pandas - 说明: 专注于处理大规模数据集的pandas优化技巧,包括: - 使用Dask进行并行计算 - 内存优化策略 - 高效的数据类型选择

10. Efficient pandas: 20+ Ways to Optimize Your pandas Code - 链接: https://www.dataquest.io/blog/effective-pandas/ - 说明: 总结了20多种pandas代码优化技巧,帮助提升数据操作的效率。

6.7.6 在线资源与社区

11. Stack Overflow - pandas Tag - 链接: https://stackoverflow.com/questions/tagged/pandas - 说明: 当遇到具体的数据合并问题时,Stack Overflow的pandas标签是寻求帮助的最佳社区资源。

12. GitHub Awesome pandas Resources - 链接: https://github.com/tomaugspurger/pandas-exercises - 说明: 收集了大量pandas练习和教程,适合系统性地提升pandas数据整理技能。