7  数据聚合与分组运算

7.1 引言与学习目标

学习目标

通过本章学习,你应该能够:

  • 理论目标
    • 深刻理解分组聚合的数学原理和计算复杂度
    • 掌握拆分-应用-合并(split-apply-combine)范式的理论基础
    • 理解透视表的数学表示和数据结构变换原理
    • 掌握各种聚合函数的数学定义(均值、方差、分位数等)
  • 实践目标
    • 熟练使用pandas的groupby进行复杂的数据聚合操作
    • 运用agg、apply、transform进行高级分组运算
    • 掌握pivot_table和crosstab的创建与应用
    • 能够进行多级分组和自定义聚合函数
  • 应用目标
    • 能够使用本地金融数据进行多维度的分组分析
    • 实现基于多因子(行业、地区、时间)的投资组合分析
    • 运用分组聚合技术进行数据探索和特征工程
    • 实现复杂的统计汇总报告和绩效分析

对数据集进行分类,并对每个组应用一个函数(无论是聚合、转换还是过滤),是数据分析工作流程中的一个核心环节。在加载、合并和准备好数据集之后,我们经常需要计算分组统计量,或者为后续的报告和可视化创建数据透视表。pandas 提供了一个功能强大且灵活的 groupby 接口,使我们能够以一种自然的方式对数据集进行切片、切块和汇总。

关系型数据库和 SQL(结构化查询语言)之所以广受欢迎,原因之一在于它们能够轻松地连接、过滤、转换和聚合数据。然而,像 SQL 这样的查询语言对于可以执行的分组运算类型有一定的限制。正如您将看到的,借助 Python 和 pandas 的表现力,我们可以通过将复杂的分组运算表达为处理每个组相关数据的自定义 Python 函数来执行它们。在本章中,正如 章节 7 概述的那样,您将学习如何:

  • 使用一个或多个键(以函数、数组或 DataFrame 列名的形式)将 pandas 对象拆分为多个部分。
  • 计算分组摘要统计,如计数、均值、标准差,或用户自定义的函数。
  • 应用组内转换或其他操作,如标准化、线性回归、排名或子集选择。
  • 计算透视表和交叉表。
  • 执行分位数分析和其他统计分组分析。

7.1.1 理论基础:分组聚合的数学原理

分组运算的数学表示

给定数据集 \(D = \{(x_i, y_i)\}_{i=1}^{n}\),分组函数 \(g: X \to G\) 将数据划分为 \(k\) 个组:

\[ D_j = \{(x, y) \in D \mid g(x) = j\}, \quad j = 1, 2, ..., k \]

聚合运算的定义

给定聚合函数 \(f: \mathbb{R}^m \to \mathbb{R}\)(如求和、均值、最大值等),分组聚合定义为:

\[ \text{Aggregate}(D, g, f) = \{(j, f(D_j)) \mid j = 1, 2, ..., k\} \]

其中 \(D_j = \{y \mid \exists x, (x, y) \in D \land g(x) = j\}\) 是第 \(j\) 组的数据值集合。

常见聚合函数的数学定义

  1. 均值\[ \bar{y}_j = \frac{1}{|D_j|} \sum_{y \in D_j} y \]

  2. 方差\[ \text{Var}(D_j) = \frac{1}{|D_j|} \sum_{y \in D_j} (y - \bar{y}_j)^2 \]

  3. 分位数(对于 \(p = 0.5\),即中位数): \[ \text{Median}(D_j) = \inf\{q \mid \sum_{y \in D_j} \mathbb{I}[y \leq q] \geq \frac{|D_j|}{2}\} \]

  4. 计数\[ \text{Count}(D_j) = |D_j| = \sum_{y \in D_j} 1 \]

Split-Apply-Combine的数学表示

给定数据集 \(D\),分组函数 \(g\),和变换函数 \(\phi\)

\[ \text{Split}(D, g) = \bigcup_{j=1}^{k} \{j\} \times D_j \] \[ \text{Apply}(\{j\} \times D_j, \phi) = \bigcup_{j=1}^{k} \{j\} \times \{\phi(D_j)\} \] \[ \text{Combine}(\bigcup_{j=1}^{k} \{j\} \times \{\phi(D_j)\}) = \text{reindex}(\bigcup_{j=1}^{k} D_j') \]

其中 \(D_j' = \{\phi(y) \mid y \in D_j\}\) 是变换后的数据。

计算复杂度分析

设数据规模为 \(n\),分组数为 \(k\),聚合函数时间复杂度为 \(O(1)\)(如求和、均值):

  • 朴素算法:遍历每个组,\(O(n \times k)\)
  • 哈希表优化:pandas实现,\(O(n)\)

透视表的数学表示

透视表是数据的多维汇总表示。给定三维数据 \((i, j, v)\),其中: - \(i \in I\):行维度(如类别) - \(j \in J\):列维度(如时间) - \(v \in \mathbb{R}\):值

透视表 \(P\) 定义为:

\[ P_{ij} = \sum_{(i', j') \in \text{Group}(i, j)} v_{i'j'} \]

其中 \(\text{Group}(i, j)\) 是选择函数。

实务观察:横截面聚合与时间序列重采样的逻辑分野

虽然本章主要聚焦于横截面数据(Cross-sectional Data)的分组逻辑,但量化分析师必须意识到 groupby 在时间尺度上的特殊形态。对于高频或日度交易数据,一个核心的应用场景是重采样 (Resampling)

  1. 逻辑差异:普通 groupby 通常基于类别(如行业、地区)进行拆分,而重采样(如使用 resample())是基于时间频率的“桶”进行聚合(如将日K线合成为月K线)。
  2. 工程实践:在构建多因子模型时,我们常先通过 groupby 在行业维度计算因子暴露,再通过时间维度进行聚合分析。关于时间序列特有的分组技术,我们将在后续章节中进行系统性推演。

与前面的章节一样,我们首先导入 NumPy 和 pandas:

列表 7.1
import numpy as np
import pandas as pd

7.2 理解分组运算的思维模式

哈德利·威克姆(Hadley Wickham)是 R 语言 ggplot2dplyr 等知名数据科学包的作者,他创造了 “拆分-应用-合并”(split-apply-combine) 这个术语来描述分组运算。这是一个理解该过程的强大心智模型。

  1. 拆分 (Split):pandas 对象(如 DataFrameSeries)中包含的数据,根据您提供的一个或多个键分割成若干组。拆分是在对象的特定轴上执行的。例如,DataFrame 可以在其行上(axis='index')或列上(axis='columns')进行分组。
  2. 应用 (Apply): 将一个函数独立地应用于每个组,从而产生一个新值(或一个新的数据结构)。这可以是一个聚合函数,如 sum()mean();也可以是一个转换函数,如对数据进行标准化;或者是一个更复杂的自定义函数。
  3. 合并 (Combine): 将所有函数应用的结果合并成一个单一的结果对象。这个最终对象的结构取决于所执行的操作。

图 7.1 为这个过程提供了一个概念性的图示。

图 7.1: 分组聚合(拆分-应用-合并)图示

每个分组键可以有多种形式,而且键的类型不必完全相同: * 一个与分组轴长度相同的列表或数组。 * 一个指示 DataFrame 中列名的值。 * 一个给出分组轴上的值与组名之间对应关系的字典或 Series。 * 一个作用于轴索引或索引中各个标签的函数。

如果这些听起来有些抽象,不必担心。在本章中,我们将为所有这些方法提供许多示例。首先,我们创建一个小型的表格数据集作为 DataFrame

sample_trade_records_df = pd.DataFrame({'stock_code' : ['600000', '600000', None, '600036', '600036', '600000', None],
                   'analyst_id': pd.Series([1, 2, 1, 2, 1, None, 1], dtype='float64'),
                   'returns': np.random.standard_normal(7),
                   'volatility': np.random.standard_normal(7)})
sample_trade_records_df
表 7.1: 用于分组运算的模拟交易记录DataFrame
stock_code analyst_id returns volatility
0 600000 1.0 -0.685937 0.145744
1 600000 2.0 -0.086223 0.366702
2 None 1.0 1.211283 -0.541135
3 600036 2.0 -0.103430 -0.749096
4 600036 1.0 0.863470 0.016975
5 600000 NaN 0.714270 -1.313764
6 None 1.0 -1.613674 -2.349805

假设您想根据 key1 中的标签来计算 data1 列的均值。有多种方法可以实现。一种是访问 data1 并使用 key1 列(一个 Series)调用 groupby

grouped = sample_trade_records_df['returns'].groupby(sample_trade_records_df['stock_code'])
grouped
<pandas.core.groupby.generic.SeriesGroupBy object at 0x0000020B13E13190>

核心机制:理解 GroupBy 对象的“延迟加载”特性

调用 groupby 后返回的 <SeriesGroupBy> 对象,在计算科学中被称为一个惰性序列 (Lazy Sequence)。这对于处理海量金融数据至关重要:

  • 数据蓝图:该对象并不立即触发内存中的数据重排或聚合,它仅仅封装了两个核心要素:分组键(映射逻辑)和目标数据(计算主体)。
  • 计算触发:只有当你显式调用 .mean().sum() 或自定义的 .apply() 时,真正的“拆分-应用-合并”算法才会在底层的 C/Cython 代码中闭环运行。这种“按需计算”的模式极大地降低了在大规模面板数据分析中的中间开销。

这个 grouped 变量现在是一个特殊的 GroupBy 对象。除了关于分组键 sample_trade_records_df['stock_code'] 的一些中间数据外,它实际上还没有计算任何东西。其设计思想是,这个对象包含了对每个组应用某种操作所需的所有信息。例如,要计算各组的均值,我们可以调用 GroupBy 对象的 mean 方法:

grouped.mean()
表 7.2: 按 stock_code 分组的 returns 均值
stock_code
600000   -0.019297
600036    0.380020
Name: returns, dtype: float64

数据(一个 Series)已经通过分组键进行了聚合,产生了一个新的 Series,该 Series 现在由 stock_code 列中的唯一值进行索引。结果索引的名称是 ‘stock_code’,因为 DataFrame 的列 sample_trade_records_df['stock_code'] 的名称就是它。

如果我们传递一个包含多个数组的列表作为分组键,会得到一个层级索引的结果:

grouped_returns_mean_series = sample_trade_records_df['returns'].groupby([sample_trade_records_df['stock_code'], sample_trade_records_df['analyst_id']]).mean()
grouped_returns_mean_series
stock_code  analyst_id
600000      1.0          -0.685937
            2.0          -0.086223
600036      1.0           0.863470
            2.0          -0.103430
Name: returns, dtype: float64

这里我们使用了两个键对数据进行分组,得到的 Series 现在有一个由观察到的唯一键对组成的层级索引(MultiIndex)。我们可以使用 unstack 方法将这个结果重塑为一个更熟悉的 DataFrame

grouped_returns_mean_series.unstack()
表 7.3: 带有层级索引的未堆叠分组均值
analyst_id 1.0 2.0
stock_code
600000 -0.685937 -0.086223
600036 0.863470 -0.103430

在前面的例子中,分组键都是 Series。然而,它们可以是任何长度正确的数组。这里,我们用中国的省份和年份作为分组键:

provinces = np.array(['Beijing', 'Shanghai', 'Shanghai', 'Beijing', 'Beijing', 'Shanghai', 'Beijing'])
years = np.array([2005, 2005, 2006, 2005, 2006, 2006, 2005])
sample_trade_records_df['returns'].groupby([provinces, years]).mean()
表 7.4: 按外部 province 和 year 数组分组的 returns 均值
Beijing   2005   -0.801014
          2006    0.863470
Shanghai  2005   -0.086223
          2006    0.962776
Name: returns, dtype: float64

通常,分组信息与您要处理的数据位于同一个 DataFrame 中。在这种情况下,您可以将列名(无论是字符串、数字还是其他 Python 对象)作为分组键传递:

sample_trade_records_df.groupby('stock_code').mean()
表 7.5: 按单个列名分组
analyst_id returns volatility
stock_code
600000 1.5 -0.019297 -0.267106
600036 1.5 0.380020 -0.366061
sample_trade_records_df.groupby('analyst_id')[['returns', 'volatility']].mean()
表 7.6: 按另一列名分组,注意“讨厌的列”
returns volatility
analyst_id
1.0 -0.056215 -0.682055
2.0 -0.094827 -0.191197

技术风险分析:“讨厌的列” (Nuisance Columns) 与隐性数据过滤

在量化回测或财务审计中,数据类型的严谨性直接决定了结果的可靠性。当执行 groupby(...).mean() 等数学聚合时:

  1. 自动过滤风险pandas 会自动移除无法计算均值的非数值列(如 stock_code)。虽然这在表面上提供了便利,但在生产环境中若某一关键数值列(如成交量 volume)因含有非法字符被识别为 object 类型,该列将在这个环节被“静默删除”而不会报错。
  2. 防御性编程建议:建议在聚合前显式选择数值子集,或在聚合后通过列名校验来确保模型关键变量未被意外过滤。

要按多个列进行分组,请传递一个列名列表:

sample_trade_records_df.groupby(['stock_code', 'analyst_id']).mean()
表 7.7: 按多个列名分组
returns volatility
stock_code analyst_id
600000 1.0 -0.685937 0.145744
2.0 -0.086223 0.366702
600036 1.0 0.863470 0.016975
2.0 -0.103430 -0.749096

无论使用 groupby 的目标是什么,一个通常很有用的 GroupBy 方法是 size,它返回一个包含各组大小的 Series

sample_trade_records_df.groupby(['stock_code', 'analyst_id']).size()
表 7.8: 使用多个键的分组大小
stock_code  analyst_id
600000      1.0           1
            2.0           1
600036      1.0           1
            2.0           1
dtype: int64

请注意,默认情况下,分组键中的任何缺失值都会从结果中排除。可以通过向 groupby 传递 dropna=False 来禁用此行为:

sample_trade_records_df.groupby('stock_code', dropna=False).size()
表 7.9: 包含NA值的 key1 分组大小
stock_code
600000    3
600036    2
NaN       2
dtype: int64
sample_trade_records_df.groupby(['stock_code', 'analyst_id'], dropna=False).size()
表 7.10: 包含NA值的 key1 和 key2 分组大小
stock_code  analyst_id
600000      1.0           1
            2.0           1
            NaN           1
600036      1.0           1
            2.0           1
NaN         1.0           2
dtype: int64

size 精神相似的分组函数是 count,它计算每个组中每列的非空值数量:

sample_trade_records_df.groupby('stock_code').count()
表 7.11: 每组非空值的计数
analyst_id returns volatility
stock_code
600000 2 3 3
600036 2 2 2

7.2.1 遍历各组

groupby 返回的对象支持迭代,生成一个由包含组名和数据块的二元元组组成的序列。思考以下代码:

for name, group in sample_trade_records_df.groupby('stock_code'):
    print(f'组名: {name}')
    print(group)
    print('-' * 20)
组名: 600000
  stock_code  analyst_id   returns  volatility
0     600000         1.0 -0.685937    0.145744
1     600000         2.0 -0.086223    0.366702
5     600000         NaN  0.714270   -1.313764
--------------------
组名: 600036
  stock_code  analyst_id  returns  volatility
3     600036         2.0 -0.10343   -0.749096
4     600036         1.0  0.86347    0.016975
--------------------

如果使用多个键,元组的第一个元素将是一个包含键值的元组:

for (k1, k2), group in sample_trade_records_df.groupby(['stock_code', 'analyst_id']):
    print(f'组名: {(k1, k2)}')
    print(group)
    print('-' * 20)
组名: ('600000', np.float64(1.0))
  stock_code  analyst_id   returns  volatility
0     600000         1.0 -0.685937    0.145744
--------------------
组名: ('600000', np.float64(2.0))
  stock_code  analyst_id   returns  volatility
1     600000         2.0 -0.086223    0.366702
--------------------
组名: ('600036', np.float64(1.0))
  stock_code  analyst_id  returns  volatility
4     600036         1.0  0.86347    0.016975
--------------------
组名: ('600036', np.float64(2.0))
  stock_code  analyst_id  returns  volatility
3     600036         2.0 -0.10343   -0.749096
--------------------

当然,您可以对这些数据块做任何您想做的事情。一个您可能会觉得有用的技巧是,用一行代码计算出一个包含数据块的字典:

pieces = {name: group for name, group in sample_trade_records_df.groupby('stock_code')}
pieces['600036']
列表 7.2
stock_code analyst_id returns volatility
3 600036 2.0 -0.10343 -0.749096
4 600036 1.0 0.86347 0.016975

默认情况下,groupbyaxis='index'(即行)上进行分组,但您可以在任何其他轴上进行分组。例如,我们可以按数据类型对示例 df 的列进行分组:

列表 7.3
grouped = sample_trade_records_df.groupby(sample_trade_records_df.dtypes, axis='columns')

for dtype, group in grouped:
    print(f'数据类型: {dtype}')
    print(group)
    print('-' * 20)
数据类型: float64
   analyst_id   returns  volatility
0         1.0 -0.685937    0.145744
1         2.0 -0.086223    0.366702
2         1.0  1.211283   -0.541135
3         2.0 -0.103430   -0.749096
4         1.0  0.863470    0.016975
5         NaN  0.714270   -1.313764
6         1.0 -1.613674   -2.349805
--------------------
数据类型: object
  stock_code
0     600000
1     600000
2       None
3     600036
4     600036
5     600000
6       None
--------------------

7.2.2 选择一列或列的子集

使用列名或列名数组对从 DataFrame 创建的 GroupBy 对象进行索引,其效果是为聚合操作选择列的子集。这意味着:

df.groupby('key1')['data1']
df.groupby('key1')[['data2']]

是以下写法的语法糖 (syntactic sugar):

df['data1'].groupby(df['key1'])
df[['data2']].groupby(df['key1'])

特别是对于大型数据集,可能只需要对少数几列进行聚合。例如,在前面的数据集中,要仅计算 data2 列的均值并将结果作为 DataFrame 获取,我们可以这样写:

sample_trade_records_df.groupby(['stock_code', 'analyst_id'])[['volatility']].mean()
表 7.12: 对列的子集进行聚合
volatility
stock_code analyst_id
600000 1.0 0.145744
2.0 0.366702
600036 1.0 0.016975
2.0 -0.749096

如果传递的是列表或数组,此索引操作返回的对象是一个分组的 DataFrame;如果只传递单个列名作为标量,则返回一个分组的 Series

s_grouped = sample_trade_records_df.groupby(['stock_code', 'analyst_id'])['volatility']
s_grouped
<pandas.core.groupby.generic.SeriesGroupBy object at 0x0000020B0FB682B0>
s_grouped.mean()
表 7.13: 对分组Series进行聚合的结果
stock_code  analyst_id
600000      1.0           0.145744
            2.0           0.366702
600036      1.0           0.016975
            2.0          -0.749096
Name: volatility, dtype: float64

7.2.3 使用字典和Series进行分组

分组信息可能以非数组的形式存在。为了演示这一点,我们来看一个更具中国情境的例子。我们将使用本地的 stock_basic_data.parquet 文件,该文件包含了万得(Wind)或米筐(RQData)提供的中国上市公司基本面信息。

import pandas as pd
import numpy as np

# 加载本地上市公司基本信息
china_stock_basic_dataframe = pd.read_parquet('C:/qiufei/data/stock/stock_basic_data.parquet')

# 筛选感兴趣的省份(注意清理名称以匹配后续的映射)
provinces = ['北京市', '上海市', '江苏省', '浙江省', '广东省', '四川省', '河南省', '陕西省']
china_listed_companies_df = china_stock_basic_dataframe[china_stock_basic_dataframe['province'].isin(provinces)].copy()

# 清理省份名称,去除“省”和“市”后缀以匹配映射字典
china_listed_companies_df['province_cn'] = china_listed_companies_df['province'].str.replace('省', '').str.replace('市', '')

# 选择需要的列:省份、行业和发行价
china_listed_companies_df = china_listed_companies_df[['province_cn', 'industry_name', 'issue_price']].copy()
china_listed_companies_df.head()
表 7.14: 部分省市上市公司样本数据
province_cn industry_name issue_price
0 广东 货币金融服务 40.0
1 广东 房地产业 1.0
2 广东 未知 10.0
3 广东 软件和信息技术服务业 1.0
4 广东 未知 10.0

现在,假设我们有一个按地理区域对这些省份进行分组的对应关系,并希望按此分组对上市公司的发行价进行汇总。

列表 7.4
mapping = {'北京': '华北', '上海': '华东', '江苏': '华东', '浙江': '华东', 
           '广东': '华南', '四川': '西南', '河南': '华中', '陕西': '西北'}

# 将省份名称映射到区域以创建分组键
by_region = china_listed_companies_df.groupby(china_listed_companies_df['province_cn'].map(mapping))

现在我们可以应用聚合操作,例如,计算每个地理区域上市公司的平均发行价和公司数量。

by_region['issue_price'].agg(['mean', 'count']).round(2)
表 7.15: 按地理区域划分的上市公司平均发行价和数量
mean count
province_cn
华东 22.23 1873
华中 17.77 108
华北 22.94 479
华南 22.55 908
西北 22.29 85
西南 19.81 184

Series 也具有相同的功能,可以将其视为一个固定大小的映射:

map_series = pd.Series(mapping, name='region')
map_series
北京    华北
上海    华东
江苏    华东
浙江    华东
广东    华南
四川    西南
河南    华中
陕西    西北
Name: region, dtype: object

我们也可以将这个 Series 传递给 groupby

china_listed_companies_df.groupby(china_listed_companies_df['province_cn'].map(map_series))[['issue_price']].mean().round(2)
表 7.16: 使用Series进行分组的各区域平均发行价
issue_price
province_cn
华东 22.23
华中 17.77
华北 22.94
华南 22.55
西北 22.29
西南 19.81

7.2.4 使用函数进行分组

与字典或 Series 相比,使用 Python 函数是定义分组映射的一种更通用的方法。任何作为分组键传递的函数都将对每个索引值调用一次(如果使用 axis='columns',则对每个列值调用一次),其返回值将用作组名。

让我们使用 表 7.14 中的上市公司数据 DataFrame,但首先,我们将设置一个更有意义的索引。

china_stocks_hierarchical_df = china_listed_companies_df.set_index(['province_cn', 'industry_name'])
china_stocks_hierarchical_df.head()
表 7.17: 以省份和行业为索引的上市公司数据
issue_price
province_cn industry_name
广东 货币金融服务 40.0
房地产业 1.0
未知 10.0
软件和信息技术服务业 1.0
未知 10.0

假设您想按省份名称的长度进行分组。虽然您可以计算一个字符串长度的数组,但直接传递 len 函数更简单。pandas 会将其应用于索引的第一层(‘province_cn’)。

# 获取索引的第一层 ('province_cn')
province_index = china_stocks_hierarchical_df.index.get_level_values(0)
china_stocks_hierarchical_df.groupby(province_index.map(len)).mean().round(2)
表 7.18: 按省份名称长度分组的平均发行价
issue_price
province_cn
2 22.15

将函数与数组、字典或 Series 混合使用没有问题,因为所有东西在内部都会被转换为数组。

对于具有层级索引的数据集,最后一个便利之处是能够使用轴索引的某个级别进行聚合。使用 表 7.17 中的 china_stocks_hierarchical_df DataFrame,我们可以按索引的 ‘industry_name’ 级别进行分组。

要按级别分组,请使用 level 关键字传递级别编号或名称:

china_stocks_hierarchical_df.groupby(level='industry_name').mean().head(10).round(2)
表 7.19: 按 “industry_name” 索引级别进行聚合
issue_price
industry_name
专业技术服务业 24.58
专用设备制造业 26.93
互联网和相关服务 24.77
仓储业 15.28
仪器仪表制造业 24.75
住宿业 6.82
体育 10.67
保险业 21.85
公共设施管理业 6.88
其他制造业 38.78

7.3 数据聚合

聚合是指任何从数组生成标量值的数据转换。前面的例子已经使用了其中的几种,包括 meancountsum。您可能想知道在 GroupBy 对象上调用 .mean() 时发生了什么。许多常见的聚合,如 表 7.20 中所列的,都有优化的实现。然而,您不仅限于使用这些方法。

表 7.20: 经过优化的 groupby 方法
函数名 描述
any, all 如果任何(一个或多个值)或所有非NA值为“真值”,则返回 True
count 非NA值的数量
cummin, cummax 非NA值的累积最小值和最大值
cumsum 非NA值的累积和
cumprod 非NA值的累积积
first, last 第一个和最后一个非NA值
mean 非NA值的均值
median 非NA值的算术中位数
min, max 非NA值的最小值和最大值
nth 检索数据排序后在位置n处出现的值
ohlc 为类时间序列数据计算四个“开-高-低-收”统计量
prod 非NA值的乘积
quantile 计算样本分位数
rank 非NA值的序数排名,类似于调用 Series.rank
size 计算分组大小,以Series形式返回结果
sum 非NA值的和
std, var 样本标准差和方差

您可以使用自己设计的聚合函数,并额外调用任何也在被分组对象上定义的方法。例如,nsmallest Series 方法从数据中选择所请求数量的最小值。虽然 nsmallest 没有为 GroupBy 显式实现,但我们仍然可以通过一个未优化的实现来使用它。在内部,GroupBySeries 分片,对每个分片调用 piece.nsmallest(n),然后将这些结果组装成结果对象。让我们回到 表 7.1 中的初始示例 sample_trade_records_df

sample_trade_records_df
stock_code analyst_id returns volatility
0 600000 1.0 -0.685937 0.145744
1 600000 2.0 -0.086223 0.366702
2 None 1.0 1.211283 -0.541135
3 600036 2.0 -0.103430 -0.749096
4 600036 1.0 0.863470 0.016975
5 600000 NaN 0.714270 -1.313764
6 None 1.0 -1.613674 -2.349805
grouped = sample_trade_records_df.groupby('stock_code')
grouped['returns'].nsmallest(2)
表 7.21: 为 stock_code 中的每个组选择 returns 的两个最小值
stock_code   
600000      0   -0.685937
            1   -0.086223
600036      3   -0.103430
            4    0.863470
Name: returns, dtype: float64

要使用您自己的聚合函数,请将任何聚合数组的函数传递给 aggregate 方法或其简写别名 agg

列表 7.5
def peak_to_peak(arr):
    return arr.max() - arr.min()
grouped.agg(peak_to_peak)
表 7.22: 应用自定义的 peak_to_peak 聚合
analyst_id returns volatility
stock_code
600000 1.0 1.400207 1.680466
600036 1.0 0.966900 0.766071

性能深度评估:Cython 优化与 Python 原生函数开销

在构建高性能高频交易系统或处理千万级观测值的财务面板数据时,聚合函数的选择将产生显著的性能鸿沟。

  1. 内置优化路径pandas 的内置函数(如 sum, mean, std)在底层使用了 Cython 实现,能够绕过 Python 解释器的循环开销(GIL 有效利用或释放),并直接在连续内存块上运行。
  2. 自定义函数惩罚:如 peak_to_peak 这样的 Python 自定义函数,需要 pandas 对每一个分组块重新封装为数据系列,再进行一次 Python 函数调用。在大规模数据下,其执行耗时可能比内置函数高出 10 到 100 倍。
  3. 工程建议:尽可能通过组合内置函数来替代复杂的自定义逻辑,例如 peak_to_peak 可以改写为 grouped.max() - grouped.min(),这会触发向量化的路径。

您可能会注意到,像 describe 这样的方法也能工作,尽管严格来说它们不是聚合。它们为每个组生成了数据的多方面摘要。

# 输出可能很宽,所以我们进行转置以便于阅读
grouped.describe().T
表 7.23: 每个组的描述性统计
stock_code 600000 600036
analyst_id count 2.000000 2.000000
mean 1.500000 1.500000
std 0.707107 0.707107
min 1.000000 1.000000
25% 1.250000 1.250000
50% 1.500000 1.500000
75% 1.750000 1.750000
max 2.000000 2.000000
returns count 3.000000 2.000000
mean -0.019297 0.380020
std 0.702499 0.683702
min -0.685937 -0.103430
25% -0.386080 0.138295
50% -0.086223 0.380020
75% 0.314023 0.621745
max 0.714270 0.863470
volatility count 3.000000 2.000000
mean -0.267106 -0.366061
std 0.913140 0.541694
min -1.313764 -0.749096
25% -0.584010 -0.557578
50% 0.145744 -0.366061
75% 0.256223 -0.174543
max 0.366702 0.016975

7.3.1 逐列及多函数应用

让我们使用一个更贴近中国商业实践的数据集来探索更高级的聚合。我们将虚构一个来自某连锁餐厅的消费数据集,其中包含了消费金额、支付方式、是否为会员等信息。在中国,小费文化不普遍,因此我们用“折扣”代替。

# 创建一个模拟数据集
data = {
    'total_bill': np.random.uniform(50, 500, 250).round(2),
    'discount': np.random.uniform(0, 50, 250).round(2),
    'member': np.random.choice(['Yes', 'No'], 250, p=[0.6, 0.4]),
    'day': np.random.choice(['Fri', 'Sat', 'Sun', 'Thur'], 250),
    'time': np.random.choice(['Lunch', 'Dinner'], 250),
    'size': np.random.randint(1, 7, 250)
}
mock_china_restaurant_data_df = pd.DataFrame(data)

# 折扣不能超过总消费
mock_china_restaurant_data_df['discount'] = mock_china_restaurant_data_df.apply(
    lambda row: min(row['discount'], row['total_bill'] * 0.5), axis='columns'
)
mock_china_restaurant_data_df['discount_pct'] = (mock_china_restaurant_data_df['discount'] / mock_china_restaurant_data_df['total_bill']).round(4)
mock_china_restaurant_data_df.head()
列表 7.6
total_bill discount member day time size discount_pct
0 95.49 19.40 No Sat Dinner 6 0.2032
1 483.71 32.97 No Thur Dinner 1 0.0682
2 342.83 0.95 Yes Thur Lunch 3 0.0028
3 298.75 49.72 Yes Sun Lunch 1 0.1664
4 448.31 29.78 Yes Sat Lunch 4 0.0664

如您所见,对 SeriesDataFrame 的所有列进行聚合,只需使用 aggregate(或 agg)并附上期望的函数,或调用像 meanstd 这样的方法。然而,您可能希望根据不同的列使用不同的聚合函数,或者一次性使用多个函数。幸运的是,这是可以做到的。首先,我将按 daymember 对餐厅数据进行分组:

grouped = mock_china_restaurant_data_df.groupby(['day', 'member'])

请注意,对于像 表 7.20 中的描述性统计,您可以将函数名作为字符串传递:

grouped_pct = grouped['discount_pct']
grouped_pct.agg('mean')
day   member
Fri   No        0.129935
      Yes       0.104897
Sat   No        0.100953
      Yes       0.124037
Sun   No        0.124519
      Yes       0.108049
Thur  No        0.113144
      Yes       0.132195
Name: discount_pct, dtype: float64

如果您传递一个函数或函数名的列表,您将得到一个 DataFrame,其列名取自这些函数:

grouped_pct.agg(['mean', 'std', peak_to_peak])
表 7.24: 对分组Series应用多个聚合函数
mean std peak_to_peak
day member
Fri No 0.129935 0.128244 0.4920
Yes 0.104897 0.112336 0.4999
Sat No 0.100953 0.101656 0.4938
Yes 0.124037 0.110046 0.4302
Sun No 0.124519 0.114825 0.4766
Yes 0.108049 0.082121 0.3758
Thur No 0.113144 0.087618 0.3210
Yes 0.132195 0.122192 0.4977

您不必接受 GroupBy 为列指定的名称;特别是 lambda 函数的名称为 '<lambda>',这使得它们难以识别。因此,如果您传递一个 (name, function) 元组的列表,每个元组的第一个元素将用作 DataFrame 的列名:

grouped_pct.agg([('average', 'mean'), ('stdev', np.std)])
表 7.25: 使用自定义名称应用多个聚合
average stdev
day member
Fri No 0.129935 0.128244
Yes 0.104897 0.112336
Sat No 0.100953 0.101656
Yes 0.124037 0.110046
Sun No 0.124519 0.114825
Yes 0.108049 0.082121
Thur No 0.113144 0.087618
Yes 0.132195 0.122192

对于 DataFrame,您有更多的选择,因为您可以为所有列指定一个函数列表,或者为每列指定不同的函数。首先,假设我们想为 discount_pcttotal_bill 列计算相同的三个统计量:

functions = ['count', 'mean', 'max']
result = grouped[['discount_pct', 'total_bill']].agg(functions)
result
表 7.26: 对多列应用多个函数
discount_pct total_bill
count mean max count mean max
day member
Fri No 23 0.129935 0.5000 23 302.521739 490.98
Yes 30 0.104897 0.5000 30 298.064333 485.09
Sat No 32 0.100953 0.5000 32 262.492813 465.79
Yes 35 0.124037 0.4326 35 288.469143 498.60
Sun No 21 0.124519 0.5000 21 263.157143 493.71
Yes 43 0.108049 0.3861 43 257.188372 493.46
Thur No 25 0.113144 0.3236 25 262.722000 486.87
Yes 41 0.132195 0.5000 41 279.395366 499.16

如您所见,结果 DataFrame 具有层级列。您可以像通常那样访问列的子集:

result['discount_pct']
count mean max
day member
Fri No 23 0.129935 0.5000
Yes 30 0.104897 0.5000
Sat No 32 0.100953 0.5000
Yes 35 0.124037 0.4326
Sun No 21 0.124519 0.5000
Yes 43 0.108049 0.3861
Thur No 25 0.113144 0.3236
Yes 41 0.132195 0.5000

和之前一样,可以传递一个带有自定义名称的元组列表:

ftuples = [('Average', 'mean'), ('Variance', np.var)]
grouped[['discount_pct', 'total_bill']].agg(ftuples)
表 7.27: 对多列应用命名聚合
discount_pct total_bill
Average Variance Average Variance
day member
Fri No 0.129935 0.016446 302.521739 19404.190370
Yes 0.104897 0.012619 298.064333 14082.673763
Sat No 0.100953 0.010334 262.492813 12651.112976
Yes 0.124037 0.012110 288.469143 22444.156143
Sun No 0.124519 0.013185 263.157143 19448.798291
Yes 0.108049 0.006744 257.188372 14345.064657
Thur No 0.113144 0.007677 262.722000 18315.696867
Yes 0.132195 0.014931 279.395366 18168.556130

现在,假设您想对一个或多个列应用可能不同的函数。为此,向 agg 传递一个字典,其中包含列名到目前为止列出的任何函数规范的映射:

grouped.agg({'discount': np.max, 'size': 'sum'})
表 7.28: 对不同列应用不同函数
discount size
day member
Fri No 47.31 72
Yes 41.85 101
Sat No 49.91 112
Yes 48.80 119
Sun No 49.15 74
Yes 49.92 154
Thur No 49.82 83
Yes 48.29 139
grouped.agg({'discount_pct': ['min', 'max', 'mean', 'std'],
             'size': 'sum'})
表 7.29: 对某些列应用多个函数,对其他列应用单个函数
discount_pct size
min max mean std sum
day member
Fri No 0.0080 0.5000 0.129935 0.128244 72
Yes 0.0001 0.5000 0.104897 0.112336 101
Sat No 0.0062 0.5000 0.100953 0.101656 112
Yes 0.0024 0.4326 0.124037 0.110046 119
Sun No 0.0234 0.5000 0.124519 0.114825 74
Yes 0.0103 0.3861 0.108049 0.082121 154
Thur No 0.0026 0.3236 0.113144 0.087618 83
Yes 0.0023 0.5000 0.132195 0.122192 139

只有当至少有一列应用了多个函数时,DataFrame 才会有层级列。

7.3.2 返回不带行索引的聚合数据

到目前为止,在所有示例中,聚合后的数据都带有一个由唯一分组键组合构成的索引(可能是层级索引)。因为这并不总是理想的,您可以在大多数情况下通过向 groupby 传递 as_index=False 来禁用此行为。

mock_china_restaurant_data_df.groupby(['day', 'member'], as_index=False)[['total_bill', 'discount', 'size', 'discount_pct']].mean()
表 7.30: 带有扁平索引的分组聚合
day member total_bill discount size discount_pct
0 Fri No 302.521739 26.657609 3.130435 0.129935
1 Fri Yes 298.064333 22.067833 3.366667 0.104897
2 Sat No 262.492813 21.191875 3.500000 0.100953
3 Sat Yes 288.469143 23.759429 3.400000 0.124037
4 Sun No 263.157143 23.445238 3.523810 0.124519
5 Sun Yes 257.188372 22.746279 3.581395 0.108049
6 Thur No 262.722000 23.555600 3.320000 0.113144
7 Thur Yes 279.395366 25.514756 3.390244 0.132195

当然,总是可以通过在结果上调用 reset_index() 来获得这种格式的结果。使用 as_index=False 参数可以避免一些不必要的计算。

7.4 Apply: 通用的拆分-应用-合并

最通用的 GroupBy 方法是 applyapply 将被操作的对象拆分成块,对每个块调用传递的函数,然后尝试将这些块连接起来。

回到之前的餐厅消费数据集,假设您想按组选择前五个 discount_pct 值。首先,编写一个函数,用于选择特定列中值最大的行:

def top(df, n=5, column='discount_pct'):
    return df.sort_values(column, ascending=False)[:n]
    
top(mock_china_restaurant_data_df, n=6)
列表 7.7
total_bill discount member day time size discount_pct
76 52.51 26.255 No Fri Dinner 4 0.5000
58 72.31 36.155 Yes Thur Lunch 1 0.5000
130 62.31 31.155 Yes Fri Lunch 1 0.5000
145 85.16 42.580 No Sat Lunch 1 0.5000
123 65.96 32.980 No Sun Lunch 3 0.5000
187 83.28 39.340 Yes Thur Lunch 3 0.4724

现在,如果我们按 member 分组,并用这个函数调用 apply,我们会得到以下结果:

mock_china_restaurant_data_df.groupby('member').apply(top)
表 7.31: 会员与非会员的前5名折扣百分比
total_bill discount member day time size discount_pct
member
No 76 52.51 26.255 No Fri Dinner 4 0.5000
145 85.16 42.580 No Sat Lunch 1 0.5000
123 65.96 32.980 No Sun Lunch 3 0.5000
203 94.17 37.790 No Fri Dinner 4 0.4013
146 110.67 40.730 No Fri Dinner 5 0.3680
Yes 58 72.31 36.155 Yes Thur Lunch 1 0.5000
130 62.31 31.155 Yes Fri Lunch 1 0.5000
187 83.28 39.340 Yes Thur Lunch 3 0.4724
51 67.83 29.940 Yes Fri Lunch 1 0.4414
88 65.93 28.520 Yes Sat Lunch 1 0.4326

这里发生了什么?top 函数在 groupby 的每个组上被调用。mock_china_restaurant_data_df DataFrame 根据 member 的值被分割成组。然后 top 函数在每个组上被调用,每个函数调用的结果使用 pandas.concat 粘合在一起,并用组名标记各个部分。因此,结果有一个层级索引,其内层包含了原始 DataFrame 的索引值。

如果您向 apply 传递一个接受其他参数或关键字的函数,您可以在函数之后传递它们:

mock_china_restaurant_data_df.groupby(['member', 'day']).apply(top, n=1, column='total_bill')
表 7.32: 每个会员和日期组合的最高总账单
total_bill discount member day time size discount_pct
member day
No Fri 33 490.98 12.91 No Fri Dinner 1 0.0263
Sat 174 465.79 15.82 No Sat Dinner 1 0.0340
Sun 44 493.71 49.15 No Sun Lunch 1 0.0996
Thur 74 486.87 47.64 No Thur Lunch 4 0.0978
Yes Fri 237 485.09 0.07 Yes Fri Dinner 5 0.0001
Sat 155 498.60 13.11 Yes Sat Lunch 2 0.0263
Sun 63 493.46 11.08 Yes Sun Dinner 3 0.0225
Thur 236 499.16 10.65 Yes Thur Lunch 1 0.0213

除了这些基本的使用机制,要充分利用 apply 可能需要一些创造力。传递的函数内部发生什么完全取决于您;它必须返回一个 pandas 对象或一个标量值。

例如,您可能还记得我之前在一个 GroupBy 对象上调用了 describe

result = mock_china_restaurant_data_df.groupby('member')['discount_pct'].describe()
result
count mean std min 25% 50% 75% max
member
No 101.0 0.115470 0.106936 0.0026 0.0391 0.0920 0.1401 0.5
Yes 149.0 0.117814 0.106451 0.0001 0.0418 0.0851 0.1573 0.5

为了得到不同的视图,可以将其 unstack:

result.unstack('member')
       member
count  No        101.000000
       Yes       149.000000
mean   No          0.115470
       Yes         0.117814
std    No          0.106936
       Yes         0.106451
min    No          0.002600
       Yes         0.000100
25%    No          0.039100
       Yes         0.041800
50%    No          0.092000
       Yes         0.085100
75%    No          0.140100
       Yes         0.157300
max    No          0.500000
       Yes         0.500000
dtype: float64

GroupBy 内部,当您调用像 describe 这样的方法时,它实际上只是以下操作的快捷方式:

def f(group):
    return group.describe()
grouped.apply(f)

7.4.1 禁止分组键

在前面的例子中,您看到结果对象有一个由分组键形成的层级索引,以及原始对象每个部分的索引。您可以通过向 groupby 传递 group_keys=False 来禁用此行为:

mock_china_restaurant_data_df.groupby('member', group_keys=False).apply(top)
表 7.33: 会员与非会员的前5名折扣百分比,并禁止了分组键
total_bill discount member day time size discount_pct
76 52.51 26.255 No Fri Dinner 4 0.5000
145 85.16 42.580 No Sat Lunch 1 0.5000
123 65.96 32.980 No Sun Lunch 3 0.5000
203 94.17 37.790 No Fri Dinner 4 0.4013
146 110.67 40.730 No Fri Dinner 5 0.3680
58 72.31 36.155 Yes Thur Lunch 1 0.5000
130 62.31 31.155 Yes Fri Lunch 1 0.5000
187 83.28 39.340 Yes Thur Lunch 3 0.4724
51 67.83 29.940 Yes Fri Lunch 1 0.4414
88 65.93 28.520 Yes Sat Lunch 1 0.4326

7.4.2 分位数和分箱分析

您可能还记得,pandas 有一些工具,特别是 pandas.cutpandas.qcut,用于将数据按您选择的箱或样本分位数切分成桶。将这些函数与 groupby 结合起来,可以方便地对数据集进行分箱或分位数分析。考虑一个简单的随机数据集,并使用 pandas.cut 进行等长分箱:

frame = pd.DataFrame({'data1': np.random.standard_normal(1000),
                      'data2': np.random.standard_normal(1000)})
frame.head()
列表 7.8
data1 data2
0 1.342404 -1.228167
1 1.562269 -2.047301
2 -1.073685 1.187914
3 0.434035 -1.602576
4 -0.665325 -0.927110
quartiles = pd.cut(frame['data1'], 4)
quartiles.head(10)
0      (1.167, 2.659]
1      (1.167, 2.659]
2    (-1.816, -0.324]
3     (-0.324, 1.167]
4    (-1.816, -0.324]
5    (-1.816, -0.324]
6    (-1.816, -0.324]
7     (-0.324, 1.167]
8     (-0.324, 1.167]
9    (-1.816, -0.324]
Name: data1, dtype: category
Categories (4, interval[float64, right]): [(-3.314, -1.816] < (-1.816, -0.324] < (-0.324, 1.167] < (1.167, 2.659]]

cut 返回的 Categorical 对象可以直接传递给 groupby。因此,我们可以为这些四分位数计算一组分组统计量,如下所示:

def get_stats(group):
    return pd.DataFrame(
        {'min': group.min(), 'max': group.max(),
         'count': group.count(), 'mean': group.mean()}
    )

grouped = frame.groupby(quartiles)
grouped.apply(get_stats)
列表 7.9
min max count mean
data1
(-3.314, -1.816] data1 -3.307944 -1.822582 48 -2.248275
data2 -1.549661 2.716755 48 0.130066
(-1.816, -0.324] data1 -1.797399 -0.328869 333 -0.892076
data2 -3.044373 2.358524 333 -0.045136
(-0.324, 1.167] data1 -0.321397 1.158175 497 0.365144
data2 -3.284397 2.507435 497 -0.026776
(1.167, 2.659] data1 1.172192 2.658952 122 1.661277
data2 -2.539441 3.008719 122 -0.038390

请记住,使用 agg 可以更简单地计算出相同的结果:

grouped.agg(['min', 'max', 'count', 'mean'])
表 7.34: 使用 agg 方法进行分位数分析
data1 data2
min max count mean min max count mean
data1
(-3.314, -1.816] -3.307944 -1.822582 48 -2.248275 -1.549661 2.716755 48 0.130066
(-1.816, -0.324] -1.797399 -0.328869 333 -0.892076 -3.044373 2.358524 333 -0.045136
(-0.324, 1.167] -0.321397 1.158175 497 0.365144 -3.284397 2.507435 497 -0.026776
(1.167, 2.659] 1.172192 2.658952 122 1.661277 -2.539441 3.008719 122 -0.038390

这些是等长分箱。要根据样本分位数计算等大小的分箱,请使用 pandas.qcut。我们可以传递 labels=False 来只获取四分位数索引而不是区间:

quartiles_samp = pd.qcut(frame['data1'], 4, labels=False)
quartiles_samp.head()
0    3
1    3
2    0
3    2
4    1
Name: data1, dtype: int64

现在我们可以按这些样本分位数进行分组:

frame.groupby(quartiles_samp).apply(get_stats)
表 7.35: 按样本分位数进行聚合
min max count mean
data1
0 data1 -3.307944 -0.731561 250 -1.343229
data2 -3.044373 2.716755 250 -0.047535
1 data1 -0.731351 0.003788 250 -0.340292
data2 -2.894633 2.507435 250 -0.077588
2 data1 0.005341 0.714621 250 0.330477
data2 -3.046726 2.380581 250 -0.001942
3 data1 0.719322 2.658952 250 1.269742
data2 -3.284397 3.008719 250 0.019951

7.4.3 示例:使用特定于组的值填充缺失值

在清理缺失数据时,有时您会使用 dropna 删除数据观测值,但在其他情况下,您可能希望使用固定值或从数据中派生的某个值来填充空值(NA)。fillna 是实现此目的的正确工具。

假设您需要填充值因组而异。一种方法是对数据进行分组,并使用 apply 和一个在每个数据块上调用 fillna 的函数。这里有一些关于中国各省的数据,分为南方和北方地区(以秦岭-淮河为界):

provinces = ['黑龙江', '北京', '山东', '河南',
          '上海', '湖北', '四川', '广东']
group_key = ['北方'] * 4 + ['南方'] * 4
data = pd.Series(np.random.standard_normal(8), index=provinces)

# 将某些值设为缺失
data[['山东', '湖北', '广东']] = np.nan
data
列表 7.10
黑龙江   -0.252479
北京    -1.038845
山东          NaN
河南    -0.680008
上海    -0.716632
湖北          NaN
四川     0.878256
广东          NaN
dtype: float64

我们现在可以使用各组的均值来填充 NA 值:

def fill_mean(group):
    return group.fillna(group.mean())

#| label: tbl-fill-mean-result
#| tbl-cap: '用区域均值填充缺失值后的数据'
data.groupby(group_key).apply(fill_mean)
列表 7.11
北方  黑龙江   -0.252479
    北京    -1.038845
    山东    -0.657111
    河南    -0.680008
南方  上海    -0.716632
    湖北     0.080812
    四川     0.878256
    广东     0.080812
dtype: float64

在另一种情况下,您可能在代码中预定义了因组而异的填充值。由于分组内部有一个 name 属性,我们可以利用它:

fill_values = {'北方': 0.5, '南方': -1}
def fill_func(group):
    return group.fillna(fill_values[group.name])

data.groupby(group_key).apply(fill_func)
列表 7.12
北方  黑龙江   -0.252479
    北京    -1.038845
    山东     0.500000
    河南    -0.680008
南方  上海    -0.716632
    湖北    -1.000000
    四川     0.878256
    广东    -1.000000
dtype: float64

7.4.4 示例:随机抽样和排列

假设您想为蒙特卡洛模拟目的从一个大型数据集中抽取随机样本(有放回或无放回)。我们可以使用 Seriessample 方法。为了演示,这里有一种构建一副扑克牌的方法:

# H=红桃, S=黑桃, C=梅花, D=方块 (Mocking Sectors: Finance, Tech, Consumer, Energy)
sectors = ['Finance', 'Tech', 'Consumer', 'Energy']
stock_list = []
for sector in sectors:
    stock_list.extend([f'{sector}_{i}' for i in range(13)])
stock_pool = pd.Series(np.arange(52), index=stock_list)
stock_pool.head(13)
列表 7.13
Finance_0      0
Finance_1      1
Finance_2      2
Finance_3      3
Finance_4      4
Finance_5      5
Finance_6      6
Finance_7      7
Finance_8      8
Finance_9      9
Finance_10    10
Finance_11    11
Finance_12    12
dtype: int64

从股票池中随机抽取五只股票可以写成:

def draw_stocks(pool, n=5):
    return pool.sample(n)
draw_stocks(stock_pool)
Energy_12     51
Tech_11       24
Finance_1      1
Energy_11     50
Consumer_3    29
dtype: int64

假设您想从每种行业板块中随机抽取两个股票。由于板块名称是每个股票代码名称的第一个部分,我们可以基于此进行分组并使用 apply

def get_sector(stock_code):
    # 股票代码的前缀是板块
    return stock_code.split('_')[0]

#| label: tbl-draw-by-suit
#| tbl-cap: '从每个板块中随机抽取两只股票'
stock_pool.groupby(get_sector).apply(draw_stocks, n=2)
列表 7.14
Consumer  Consumer_4    30
          Consumer_5    31
Energy    Energy_12     51
          Energy_0      39
Finance   Finance_10    10
          Finance_5      5
Tech      Tech_9        22
          Tech_3        16
dtype: int64

stock_pool.groupby(get_sector, group_keys=False).apply(draw_stocks, n=2)


### 示例:分组加权平均和相关性 {#sec-example-weighted-avg}
在 `groupby` 的拆分-应用-合并范式下,`DataFrame` 中的列之间或两个 `Series` 之间的操作是可能的。举个例子,这个数据集包含分组键、值和一些权重:

::: {#tbl-weighted-avg-data .cell tbl-cap='用于分组加权平均计算的投资组合数据' execution_count=65}
``` {.python .cell-code}
portfolio_data = pd.DataFrame({'sector': ['Tech', 'Tech', 'Tech', 'Tech', 'Finance', 'Finance', 'Finance', 'Finance'],
                   'return': np.random.standard_normal(8),
                   'market_cap': np.random.uniform(size=8)})
portfolio_data
sector return market_cap
0 Tech -0.162351 0.970258
1 Tech -0.150885 0.370825
2 Tech 0.798210 0.493428
3 Tech 0.557764 0.891265
4 Finance -0.724968 0.953008
5 Finance 0.287030 0.133927
6 Finance -0.314550 0.998973
7 Finance 1.083750 0.685820

:::

按板块计算的加权平均收益率将是:

def get_wavg_return(group):
    return np.average(group['return'], weights=group['market_cap'])

#| label: tbl-wavg-result
#| tbl-cap: '按板块的分组加权平均收益率'
portfolio_data.groupby('sector').apply(get_wavg_return)
列表 7.15
sector
Finance   -0.080610
Tech       0.248553
dtype: float64

再举一个更贴近金融实践的例子。我们将使用本地数据获取几只A股龙头股票(贵州茅台、宁德时代、中国平安)和沪深300指数(000300.XSHG)历史收盘价数据。

import pandas as pd
import numpy as np

# 股票映射:代码 -> 名称
tickers = {'600519.XSHG': '贵州茅台', '300750.XSHE': '宁德时代', '601318.XSHG': '中国平安'}
index_symbol = '000300.XSHG'

all_data = {}

# 获取股票数据
stock_px = pd.read_parquet('C:/qiufei/data/stock/stock_price_pre_adjusted.parquet', 
                          filters=[('order_book_id', 'in', list(tickers.keys()))]).reset_index()
# 转换日期并重塑数据
stock_px['date'] = pd.to_datetime(stock_px['date'])
for code, name in tickers.items():
    all_data[name] = stock_px[stock_px['order_book_id'] == code].set_index('date')['close']

# 获取指数数据
index_px = pd.read_parquet('C:/qiufei/data/index/indexes.parquet', 
                          filters=[('symbol', '==', index_symbol)])
index_px['datetime'] = pd.to_datetime(index_px['datetime'])
all_data['沪深300'] = index_px.set_index('datetime')['close']

# 合并数据并处理缺失值
asset_closing_prices_df = pd.DataFrame(all_data).sort_index()
asset_closing_prices_df = asset_closing_prices_df.loc['2018-01-01':'2023-12-31']
asset_closing_prices_df.dropna(inplace=True)
asset_closing_prices_df.tail()
列表 7.16
贵州茅台 宁德时代 中国平安 沪深300

DataFrameinfo() 方法在这里是一种方便的方式,可以概览 DataFrame 的内容。

asset_closing_prices_df.info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 0 entries
Data columns (total 4 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   贵州茅台    0 non-null      float64
 1   宁德时代    0 non-null      float64
 2   中国平安    0 non-null      float64
 3   沪深300   0 non-null      float64
dtypes: float64(4)
memory usage: 0.0 bytes

一个在投资组合分析中常见的任务是计算个股的每日收益率与市场指数(此处为沪深300)的年度相关性。作为一种实现方式,我们首先创建一个函数,计算每列与 ‘沪深300’ 列的成对相关性:

列表 7.17
def csi300_corr(group):
    return group.corrwith(group['沪深300'])

接下来,我们使用 pct_change 计算 close_px 的百分比变化(即日收益率):

daily_returns_df = asset_closing_prices_df.pct_change().dropna()

最后,我们按年份对这些收益率进行分组,年份可以从每个行标签(日期)中提取:

def get_year(x):
    return x.year

#| label: tbl-yearly-corr
#| tbl-cap: '股票收益与沪深300指数的年度相关性'
by_year = daily_returns_df.groupby(get_year)
by_year.apply(csi300_corr)
列表 7.18
贵州茅台 宁德时代 中国平安 沪深300

您还可以计算任意两列之间的相关性。这里我们计算贵州茅台和宁德时代之间的年度相关性:

def corr_moutai_catl(group):
    return group['贵州茅台'].corr(group['宁德时代'])

#| label: tbl-moutai-catl-corr-result
#| tbl-cap: '贵州茅台和宁德时代收益的年度相关性'
by_year.apply(corr_moutai_catl)
列表 7.19
贵州茅台 宁德时代 中国平安 沪深300

7.4.5 示例:分组线性回归

与前一个示例的主题相同,您可以使用 groupby 执行更复杂的分组统计分析,只要函数返回一个 pandas 对象或标量值即可。例如,我们可以定义以下 regress 函数(使用 statsmodels 计量经济学库),它对每个数据块执行普通最小二乘法(OLS)回归。

实证视角:资本资产定价模型 (CAPM) 的分组回归分析

在投资组合分析与学术研究中,对特定因子(如行业、市值、动量)进行分组并执行滚动回归是一项标准流程。上述操作在金融研究中具有明确的计量含义:

  1. 系统性风险评估:我们正在逐年估计 CAPM 模型 \(R_i - R_{rf} = \alpha + \beta(R_m - R_{rf}) + \epsilon\)。这里的回归斜率 \(\beta\) 揭示了资产收益对市场波动的暴露程度。
  2. 动态 Beta (Dynamic Beta):通过分组回归,我们可以观察到如“宁德时代”或“贵州茅台”等行业龙头的风险特性在不同宏观周期下的变动。
  3. 异方差性考量:需要注意的是,在实际研究中,金融收益率常表现出聚集波动性。虽然 OLS 提供了一致估计,但在进行假设检验时,应结合稳健标准误(Robust Standard Errors)或 Newey-West 调整以应对序列相关性问题。
列表 7.20
import statsmodels.api as sm
def regress(data, yvar, xvars):
    Y = data[yvar]
    X = data[xvars]
    X = sm.add_constant(X) # 添加截距项
    result = sm.OLS(Y, X).fit()
    return result.params

现在,要对“宁德时代”相对“沪深300”的收益率进行年度线性回归,我们执行:

by_year.apply(regress, yvar='宁德时代', xvars=['沪深300'])
表 7.36: 宁德时代收益对沪深300收益的年度OLS回归
贵州茅台 宁德时代 中国平安 沪深300

7.5 分组转换和“展开的”GroupBy

小节 7.4 中,我们研究了用于执行转换的 apply 方法。还有一个名为 transform 的内置方法,它与 apply 类似,但对您可以使用的函数类型施加了更多限制:

  • 它可以产生一个标量值,该值将被广播到组的形状。
  • 它可以产生一个与输入组形状相同的对象。
  • 它不能改变其输入。

让我们看一个简单的例子来说明:

sample_trading_volume_df = pd.DataFrame({'stock': ['AAPL', 'GOOG', 'MSFT'] * 4,
                   'volume': np.random.randint(1000, 10000, 12)})
sample_trading_volume_df
表 7.37: 用于演示 transform 的日成交量数据
stock volume
0 AAPL 6605
1 GOOG 9610
2 MSFT 6875
3 AAPL 5713
4 GOOG 9733
5 MSFT 1148
6 AAPL 8859
7 GOOG 2205
8 MSFT 7440
9 AAPL 7315
10 GOOG 5634
11 MSFT 9314

以下是按股票分组的平均成交量:

g = sample_trading_volume_df.groupby('stock')['volume']
g.mean()
stock
AAPL    7123.00
GOOG    6795.50
MSFT    6194.25
Name: volume, dtype: float64

假设我们要生成一个与 sample_trading_volume_df['volume'] 形状相同但值被替换为按 ‘stock’ 分组的平均值的 Series。我们可以将一个计算单个组均值的函数传递给 transform

def get_mean(group):
    return group.mean()
    
g.transform(get_mean)
列表 7.21
0     7123.00
1     6795.50
2     6194.25
3     7123.00
4     6795.50
5     6194.25
6     7123.00
7     6795.50
8     6194.25
9     7123.00
10    6795.50
11    6194.25
Name: volume, dtype: float64

对于内置的聚合函数,我们可以像 GroupBy agg 方法一样传递一个字符串别名:

g.transform('mean')
0     7123.00
1     6795.50
2     6194.25
3     7123.00
4     6795.50
5     6194.25
6     7123.00
7     6795.50
8     6194.25
9     7123.00
10    6795.50
11    6194.25
Name: volume, dtype: float64

transform vs. apply

transform 的核心约束是它返回的对象必须与输入的分组块(group chunk)具有相同的形状(即相同的索引)。这使得它可以用来进行广播操作,比如用组内均值替换每个元素。而 apply 则灵活得多,它可以返回任意形状的对象,pandas 会尝试将所有返回结果智能地拼接起来。

一个会使 transform 失败但 apply 能成功的例子是返回一个标量值的函数,比如 group.describe()

# 这会失败,因为 describe() 返回 un 一个 Series,其形状与输入组不同
# g.transform(lambda x: x.describe()) 

# 这可以工作,因为 apply 可以处理任意形状的输出
g.apply(lambda x: x.describe())

选择 transform 还是 apply 取决于您的目标:如果您想基于分组计算来“改变”或“转换”原始数据框中的值(例如中心化、标准化),请使用 transform。如果您想对每个组进行汇总,得到一个比原分组更小的结果,请使用 applyagg

apply 类似,transform 适用于返回 Series 的函数,但结果的大小必须与输入相同。例如,我们可以使用一个辅助函数将每个组乘以2:

def times_two(group):
    return group * 2
g.transform(times_two)
0     13210
1     19220
2     13750
3     11426
4     19466
5      2296
6     17718
7      4410
8     14880
9     14630
10    11268
11    18628
Name: volume, dtype: int32

作为一个更复杂的例子,我们可以计算每个组内降序的排名:

def get_ranks(group):
    return group.rank(ascending=False)
g.transform(get_ranks)
列表 7.22
0     3.0
1     2.0
2     3.0
3     4.0
4     1.0
5     4.0
6     1.0
7     4.0
8     2.0
9     2.0
10    3.0
11    1.0
Name: volume, dtype: float64

考虑一个由简单聚合组成的分组转换函数,例如,将数据进行标准化(计算z-score):

列表 7.23
def normalize(x):
    return (x - x.mean()) / x.std()

在这种情况下,我们可以使用 transformapply 获得等效的结果:

g.transform(normalize)
0    -0.389463
1     0.780881
2     0.193287
3    -1.060122
4     0.815008
5    -1.432792
6     1.305228
7    -1.273631
8     0.353708
9     0.144357
10   -0.322257
11    0.885797
Name: volume, dtype: float64
g.apply(normalize)
stock    
AAPL   0    -0.389463
       3    -1.060122
       6     1.305228
       9     0.144357
GOOG   1     0.780881
       4     0.815008
       7    -1.273631
       10   -0.322257
MSFT   2     0.193287
       5    -1.432792
       8     0.353708
       11    0.885797
Name: volume, dtype: float64

'mean''sum' 这样的内置聚合函数通常比通用的 apply 函数快得多。当与 transform 一起使用时,它们也有一个“快速路径”。这使我们能够执行所谓的 “展开的” (unwrapped) 分组操作:

normalized = (sample_trading_volume_df['volume'] - g.transform('mean')) / g.transform('std')
normalized
0    -0.389463
1     0.780881
2     0.193287
3    -1.060122
4     0.815008
5    -1.432792
6     1.305228
7    -1.273631
8     0.353708
9     0.144357
10   -0.322257
11    0.885797
Name: volume, dtype: float64

在这里,我们是在多个 GroupBy 操作的输出之间进行算术运算,而不是编写一个函数并将其传递给 groupby(...).apply。这种向量化的方法就是所谓的“展开的”,它通常要快得多。

7.6 透视表和交叉表

透视表是一种数据汇总工具,常见于Excel等电子表格程序和其他数据分析软件中。它通过一个或多个键聚合数据表,将数据排列在一个矩形中,其中一些分组键沿行排列,一些沿列排列。在 Python 中使用 pandas 创建透视表是通过 groupby 功能以及利用层级索引的重塑操作实现的。DataFrame 还有一个 pivot_table 方法,并且还有一个顶级的 pandas.pivot_table 函数。

回到我们的餐厅消费数据集,假设您想计算一个按 daymember 排列在行上的分组均值表(pivot_table 的默认聚合类型):

mock_china_restaurant_data_df.pivot_table(index=['day', 'member'], values=['total_bill', 'discount', 'size', 'discount_pct'])
表 7.38: 一个简单的均值透视表
discount discount_pct size total_bill
day member
Fri No 26.657609 0.129935 3.130435 302.521739
Yes 22.067833 0.104897 3.366667 298.064333
Sat No 21.191875 0.100953 3.500000 262.492813
Yes 23.759429 0.124037 3.400000 288.469143
Sun No 23.445238 0.124519 3.523810 263.157143
Yes 22.746279 0.108049 3.581395 257.188372
Thur No 23.555600 0.113144 3.320000 262.722000
Yes 25.514756 0.132195 3.390244 279.395366

这本可以用 groupby 直接生成。现在,假设我们只想对 discount_pctsize 求平均值,并额外按 time 分组。我将把 member 放在表的列中,把 timeday 放在行中:

mock_china_restaurant_data_df.pivot_table(index=['time', 'day'],
                       columns='member',
                       values=['discount_pct', 'size'])
表 7.39: 带有行和列分组的透视表
discount_pct size
member No Yes No Yes
time day
Dinner Fri 0.186283 0.091021 3.416667 3.285714
Sat 0.088444 0.117471 3.875000 3.285714
Sun 0.118388 0.110683 3.375000 3.347826
Thur 0.113082 0.110157 3.294118 3.619048
Lunch Fri 0.068464 0.117038 2.818182 3.437500
Sat 0.113462 0.128414 3.125000 3.476190
Sun 0.128292 0.105020 3.615385 3.850000
Thur 0.113275 0.155335 3.375000 3.150000

我们可以通过传递 margins=True 来扩充此表以包含部分总计。这会添加 All 行和列标签,其对应的值是单个层级内所有数据的分组统计信息:

mock_china_restaurant_data_df.pivot_table(index=['time', 'day'],
                       columns='member',
                       values=['discount_pct', 'size'],
                       margins=True)
表 7.40: 带有用于小计和总计的边距的透视表
discount_pct size
member No Yes All No Yes All
time day
Dinner Fri 0.186283 0.091021 0.134988 3.416667 3.285714 3.346154
Sat 0.088444 0.117471 0.101990 3.875000 3.285714 3.600000
Sun 0.118388 0.110683 0.112671 3.375000 3.347826 3.354839
Thur 0.113082 0.110157 0.111466 3.294118 3.619048 3.473684
Lunch Fri 0.068464 0.117038 0.097248 2.818182 3.437500 3.185185
Sat 0.113462 0.128414 0.121949 3.125000 3.476190 3.324324
Sun 0.128292 0.105020 0.114188 3.615385 3.850000 3.757576
Thur 0.113275 0.155335 0.143318 3.375000 3.150000 3.214286
All 0.115470 0.117814 0.116867 3.376238 3.442953 3.416000

要使用除 mean 之外的聚合函数,请将其传递给 aggfunc 关键字参数。例如,'count'len 会给您一个分组大小的交叉表:

mock_china_restaurant_data_df.pivot_table(index=['time', 'member'],
                       columns='day',
                       values='discount_pct',
                       aggfunc=len,
                       margins=True)
表 7.41: 使用 len 作为聚合函数的透视表
day Fri Sat Sun Thur All
time member
Dinner No 12 16 8 17 53
Yes 14 14 23 21 72
Lunch No 11 16 13 8 48
Yes 16 21 20 20 77
All 53 67 64 66 250

如果某些组合为空(或为NA),您可能希望传递一个 fill_value

mock_china_restaurant_data_df.pivot_table(index=['time', 'size', 'member'],
                       columns='day',
                       values='discount_pct',
                       fill_value=0)
表 7.42: 使用 fill_value 处理缺失组合的透视表
day Fri Sat Sun Thur
time size member
Dinner 1 No 0.055600 0.051850 0.074333 0.085950
Yes 0.050200 0.157750 0.076214 0.109233
2 No 0.080350 0.095375 0.000000 0.066000
Yes 0.155700 0.059467 0.257050 0.167525
3 No 0.201200 0.119700 0.092000 0.171100
Yes 0.089300 0.197800 0.103167 0.139900
4 No 0.283900 0.172650 0.035300 0.117100
Yes 0.105575 0.030400 0.088750 0.100900
5 No 0.248750 0.034600 0.100800 0.017900
Yes 0.000100 0.007600 0.133980 0.066400
6 No 0.129200 0.071717 0.248000 0.204875
Yes 0.105650 0.156950 0.085300 0.084125
Lunch 1 No 0.087800 0.201720 0.128567 0.008800
Yes 0.195167 0.201400 0.098450 0.113660
2 No 0.050425 0.000000 0.000000 0.000000
Yes 0.105950 0.111067 0.108700 0.059250
3 No 0.068575 0.077750 0.261700 0.199500
Yes 0.147800 0.113600 0.086400 0.230300
4 No 0.055500 0.072150 0.069650 0.122400
Yes 0.000000 0.107480 0.057600 0.173975
5 No 0.000000 0.014900 0.098550 0.160950
Yes 0.000000 0.034467 0.106967 0.188575
6 No 0.133800 0.096150 0.141500 0.000000
Yes 0.048843 0.150525 0.129150 0.048500

有关 pivot_table 选项的摘要,请参见 表 7.43

表 7.43: pivot_table 选项
参数 描述
values 要聚合的列名或列名列表;默认情况下,聚合所有数值列
index 用于在结果透视表的行上分组的列名或其他分组键
columns 用于在结果透视表的列上分组的列名或其他分组键
aggfunc 聚合函数或函数列表(默认为 ‘mean’);可以是 groupby 上下文中任何有效的函数
fill_value 替换结果表中的缺失值
dropna 如果为 True,则不包括条目全为 NA 的列
margins 添加行/列小计和总计(默认为 False
margins_name margins=True 时,用于边距行/列标签的名称;默认为 ‘All’
observed 对于分类分组键,如果为 True,则只显示键中观察到的类别值,而不是所有类别

7.6.1 交叉表: Crosstab

交叉表(crosstab)是透视表的一种特殊情况,用于计算分组频率。假设我们想按省份和饮食口味偏好来总结一些调查数据。pandas.crosstab 函数对于此任务可能比 pivot_table 更方便。

列表 7.24
from io import StringIO

data = """Sample,Province,Flavor
1,四川,重辣
2,广东,清淡
3,四川,微辣
4,湖南,重辣
5,广东,清淡
6,江苏,清淡
7,四川,重辣
8,北京,清淡
9,湖南,重辣
10,广东,清淡"""

data = pd.read_csv(StringIO(data))
pd.crosstab(data['Province'], data['Flavor'], margins=True)
表 7.44: 省份和口味偏好的交叉表
Flavor 微辣 清淡 重辣 All
Province
北京 0 1 0 1
四川 1 0 2 3
广东 0 3 0 3
江苏 0 1 0 1
湖南 0 0 2 2
All 1 5 4 10

crosstab 的前两个参数可以是数组、Series 或数组列表。就像在我们模拟的餐厅数据中一样:

pd.crosstab([mock_china_restaurant_data_df['time'], mock_china_restaurant_data_df['day']], mock_china_restaurant_data_df['member'], margins=True)
表 7.45: 在餐厅数据集上的交叉表
member No Yes All
time day
Dinner Fri 12 14 26
Sat 16 14 30
Sun 8 23 31
Thur 17 21 38
Lunch Fri 11 16 27
Sat 16 21 37
Sun 13 20 33
Thur 8 20 28
All 101 149 250

7.7 习题

7.7.1 习题 10.1: 基础分组聚合

问题描述

使用宁波港和宁波银行的股票数据,进行基础分组聚合:

  1. 按股票代码分组计算基本统计量
  2. 计算各股票的平均收益率和波动率
  3. 找出每个股票的最高价和最低价及其日期
  4. 按周分组计算累计收益率

完整解答

import pandas as pd
import numpy as np
from functools import reduce

# 从本地 Parquet 文件读取数据代替 HDF5
port_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
bank_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '002142.XSHE')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
port_prices['symbol'] = '宁波港'
bank_prices['symbol'] = '宁波银行'
stock_data = pd.concat([port_prices, bank_prices])
stock_data['datetime'] = pd.to_datetime(stock_data['trade_date'], format='%Y%m%d')
stock_data = stock_data[(stock_data['datetime'] >= '2023-01-01') & (stock_data['datetime'] <= '2023-03-31')]

# 筛选宁波港和宁波银行的数据
filtered = stock_data.copy()
filtered = filtered.sort_values(['symbol', 'datetime'])
filtered.set_index('datetime', inplace=True) # 设置索引以便后续分析

# 1. 基本统计量
print('=== 1. 基本统计量 ===')
basic_stats = filtered.groupby('symbol')[['close', 'volume']].agg([
    ('mean', 'mean'),
    ('std', 'std'),
    ('min', 'min'),
    ('max', 'max')
]).round(2)
print(basic_stats)

# 2. 收益率和波动率
print('\n=== 2. 收益率分析 ===')
filtered['returns'] = filtered.groupby('symbol')['close'].pct_change()

returns_stats = filtered.groupby('symbol')['returns'].agg([
    ('日均收益率', 'mean'),
    ('收益率标准差', 'std'),
    ('夏普比率', lambda x: x.mean() / x.std() * np.sqrt(252) if x.std() > 0 else 0)
]).round(4)
print(returns_stats)

# 3. 最高价和最低价
print('\n=== 3. 极值分析 ===')

def get_extreme_dates(group, col='close', extreme='max'):
    """获取极值及其对应的日期"""
    if extreme == 'max':
        idx = group[col].idxmax()
    else:
        idx = group[col].idxmin()
    return pd.Series({
        f'{extreme}_value': group.loc[idx, col],
        f'{extreme}_date': idx
    })

max_info = filtered.groupby('symbol').apply(lambda g: get_extreme_dates(g, 'close', 'max'))
min_info = filtered.groupby('symbol').apply(lambda g: get_extreme_dates(g, 'close', 'min'))

print('最高价信息:')
print(max_info)
print('\n最低价信息:')
print(min_info)

# 4. 按周累计收益率
print('\n=== 4. 周累计收益率 ===')
filtered['week'] = filtered.index.isocalendar().week

# 计算每周累计收益率
weekly_returns = filtered.groupby(['symbol', 'week'])['returns'].apply(
    lambda x: (1 + x).prod() - 1
).reset_index()
weekly_returns.columns = ['股票', '周', '累计收益率']

print('每周累计收益率(前10行):')
print(weekly_returns.head(10))

# 按股票汇总周表现
weekly_summary = weekly_returns.pivot(index='周', columns='股票', values='累计收益率')
print('\n周收益率矩阵:')
print(weekly_summary.round(4).head())
=== 1. 基本统计量 ===
        close                           volume                          \
         mean   std    min    max         mean         std         min   
symbol                                                                   
宁波港      3.33  0.04   3.23   3.41   9668266.51  4834033.37   3979300.0   
宁波银行    27.84  2.04  24.86  31.20  30043131.53  9824164.07  15884425.0   

                    
               max  
symbol              
宁波港     25577104.0  
宁波银行    65203908.0  

=== 2. 收益率分析 ===
         日均收益率  收益率标准差    夏普比率
symbol                        
宁波港     0.0003  0.0063  0.6545
宁波银行   -0.0027  0.0165 -2.6092

=== 3. 极值分析 ===
最高价信息:
        max_value   max_date
symbol                      
宁波港        3.4137 2023-03-17
宁波银行      31.1967 2023-01-16

最低价信息:
        min_value   min_date
symbol                      
宁波港        3.2302 2023-01-12
宁波银行      24.8570 2023-03-17

=== 4. 周累计收益率 ===
每周累计收益率(前10行):
    股票   周         累计收益率
0  宁波港   1 -2.808217e-03
1  宁波港   2 -2.816125e-03
2  宁波港   3  2.535531e-02
3  宁波港   5  2.754244e-03
4  宁波港   6 -1.110223e-16
5  宁波港   7 -1.095686e-02
6  宁波港   8  8.301135e-03
7  宁波港   9  1.921983e-02
8  宁波港  10 -2.693494e-02
9  宁波港  11  3.045762e-02

周收益率矩阵:
股票     宁波港    宁波银行
周                 
1  -0.0028  0.0130
2  -0.0028  0.0411
3   0.0254 -0.0224
5   0.0028 -0.0416
6  -0.0000 -0.0132

关键要点: - groupby + agg 是最常用的聚合模式 - 聚合函数可以是字符串、函数或列表 - apply 允许自定义聚合逻辑 - 命名聚合使结果更清晰 - 分组前记得排序


7.7.2 习题 10.2: 多级分组与自定义聚合

问题描述

对股票数据按多个维度进行分组分析:

  1. 按股票和月份分组
  2. 计算各月交易天数和平均成交量
  3. 识别各股票的”活跃月份”(成交量最大的月份)
  4. 创建自定义聚合函数计算价格振幅

完整解答

import pandas as pd
import numpy as np

# 从本地 Parquet 文件读取数据代替 HDF5
port_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
bank_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '002142.XSHE')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
moutai_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '600519.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
port_prices['symbol'] = '宁波港'
bank_prices['symbol'] = '宁波银行'
moutai_prices['symbol'] = '贵州茅台'
stock_data = pd.concat([port_prices, bank_prices, moutai_prices])
stock_data['datetime'] = pd.to_datetime(stock_data['trade_date'], format='%Y%m%d')
stock_data = stock_data[(stock_data['datetime'] >= '2023-01-01') & (stock_data['datetime'] <= '2023-12-31')]

# 筛选数据
filtered = stock_data.copy()
filtered['month'] = filtered['datetime'].dt.month

# 1. 按股票和月份分组
print('=== 1. 按股票和月份的基本统计 ===')
monthly_stats = filtered.groupby(['symbol', 'month']).agg({
    'close': ['first', 'last', 'mean'],
    'volume': ['sum', 'mean']
}).round(2)
print(monthly_stats.head())

# 2. 交易天数和平均成交量
print('\n=== 2. 交易统计 ===')
trading_stats = filtered.groupby(['symbol', 'month']).agg({
    'datetime': 'count',  # 交易天数
    'volume': ['mean', 'sum']
}).round(0)
trading_stats.columns = ['交易天数', '日均成交量', '月总成交量']
print(trading_stats.head(10))

# 3. 活跃月份识别
print('\n=== 3. 活跃月份识别 ===')

most_active_month = filtered.groupby(['symbol', 'month'])['volume'].sum().reset_index()
most_active_month = most_active_month.loc[
    most_active_month.groupby('symbol')['volume'].idxmax()
]
print('各股票成交量最大的月份:')
print(most_active_month)

# 4. 自定义聚合:价格振幅
print('\n=== 4. 价格振幅分析 ===')

def price_amplitude(group):
    """计算价格振幅(最高价-最低价)/ 均价"""
    high = group['high'].max()
    low = group['low'].min()
    avg_price = group['close'].mean()
    amplitude = (high - low) / avg_price * 100 if avg_price > 0 else 0
    return pd.Series({
        '振幅(%)': amplitude,
        '最高价': high,
        '最低价': low,
        '均价': avg_price
    })

amplitude_by_month = filtered.groupby(['symbol', 'month']).apply(price_amplitude).reset_index()
amplitude_by_month = amplitude_by_month.sort_values(['symbol', 'month'])

print('月度价格振幅:')
print(amplitude_by_month.head(10))

# 找出振幅最大的月份
max_amplitude = amplitude_by_month.loc[
    amplitude_by_month.groupby('symbol')['振幅(%)'].idxmax()
]
print('\n振幅最大的月份:')
print(max_amplitude)
=== 1. 按股票和月份的基本统计 ===
             close                    volume             
             first  last  mean           sum         mean
symbol month                                             
宁波港    1      3.28  3.37  3.28  1.327776e+08   8298598.69
       2      3.38  3.34  3.35  1.508354e+08   7541771.50
       3      3.34  3.32  3.36  2.868147e+08  12470205.00
       4      3.35  3.48  3.43  3.399837e+08  17893879.26
       5      3.49  3.24  3.36  1.010306e+09  50515303.55

=== 2. 交易统计 ===
              交易天数       日均成交量         月总成交量
symbol month                                
宁波港    1        16   8298599.0  1.327776e+08
       2        20   7541772.0  1.508354e+08
       3        23  12470205.0  2.868147e+08
       4        19  17893879.0  3.399837e+08
       5        20  50515304.0  1.010306e+09
       6        20  12558214.0  2.511643e+08
       7        21  11256187.0  2.363799e+08
       8        23  14739590.0  3.390106e+08
       9        20  15133539.0  3.026708e+08
       10       17  12737156.0  2.165317e+08

=== 3. 活跃月份识别 ===
各股票成交量最大的月份:
   symbol  month        volume
4     宁波港      5  1.010306e+09
15   宁波银行      4  8.424915e+08
35   贵州茅台     12  5.560198e+07

=== 4. 价格振幅分析 ===
月度价格振幅:
  symbol  month      振幅(%)     最高价     最低价        均价
0    宁波港      1   4.749337  3.3862  3.2302  3.284669
1    宁波港      2   2.192295  3.3862  3.3128  3.348090
2    宁波港      3   4.646940  3.4412  3.2852  3.357048
3    宁波港      4   6.418870  3.5422  3.3219  3.432068
4    宁波港      5  14.462245  3.7074  3.2210  3.363240
5    宁波港      6   5.538913  3.3024  3.1236  3.228070
6    宁波港      7   5.793231  3.3683  3.1801  3.248619
7    宁波港      8   6.938901  3.3777  3.1519  3.254117
8    宁波港      9   4.360241  3.3118  3.1707  3.236060
9    宁波港     10   7.706393  3.2930  3.0484  3.173988

振幅最大的月份:
   symbol  month      振幅(%)        最高价        最低价           均价
4     宁波港      5  14.462245     3.7074     3.2210     3.363240
18   宁波银行      7  22.051447    27.5764    22.3721    23.600719
30   贵州茅台      7  13.945314  1779.4857  1554.1761  1615.665271

关键要点: - 多级分组使用列表指定分组键 - 自定义聚合函数通过 apply 实现 - idxmax() / idxmin() 找出极值位置 - 命名聚合清晰描述结果 - 振幅衡量价格波动程度


7.7.3 习题 10.3: 组变换与过滤

问题描述

使用组变换和组过滤操作:

  1. 计算滚动统计量(移动平均、标准差)
  2. 计算每组内的排名和百分位
  3. 过滤出满足特定条件的组
  4. 使用 transform 填充缺失值

完整解答

import pandas as pd
import numpy as np

# 从本地 Parquet 文件读取数据代替 HDF5
port_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
bank_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '002142.XSHE')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
port_prices['symbol'] = '宁波港'
bank_prices['symbol'] = '宁波银行'
stock_data = pd.concat([port_prices, bank_prices])
stock_data['datetime'] = pd.to_datetime(stock_data['trade_date'], format='%Y%m%d')
stock_data = stock_data[(stock_data['datetime'] >= '2023-01-01') & (stock_data['datetime'] <= '2023-03-31')]

# 筛选数据
filtered = stock_data.copy()
filtered = filtered.sort_values(['symbol', 'datetime'])

# 1. 滚动统计量
print('=== 1. 滚动统计量 ===')
filtered['MA5'] = filtered.groupby('symbol')['close'].transform(
    lambda x: x.rolling(5).mean()
)
filtered['rolling_std'] = filtered.groupby('symbol')['close'].transform(
    lambda x: x.rolling(10).std()
)

print('带滚动统计的数据示例:')
print(filtered[['datetime', 'symbol', 'close', 'MA5', 'rolling_std']].head(10))

# 2. 组内排名
print('\n=== 2. 组内排名 ===')
filtered['daily_rank'] = filtered.groupby('symbol')['close'].rank()
filtered['daily_pct_rank'] = filtered.groupby('symbol')['close'].rank(pct=True)

print('各股票收盘价排名(前10天):')
print(filtered[['datetime', 'symbol', 'close', 'daily_rank', 'daily_pct_rank']].head(10))

# 3. 组过滤
print('\n=== 3. 组过滤 ===')

# 定义过滤函数:只保留平均价格大于10元的组
def filter_by_avg_price(group):
    return group['close'].mean() > 10

# 使用 filter
filtered_groups = filtered.groupby('symbol').filter(filter_by_avg_price)

print(f'过滤前形状: {filtered.shape}')
print(f'过滤后形状: {filtered_groups.shape}')
print(f'保留的股票: {filtered_groups["symbol"].unique().tolist()}')

# 4. Transform 填充缺失值
print('\n=== 4. Transform 填充缺失值 ===')

# 创建一些缺失值演示
data_with_na = filtered.copy()
data_with_na.loc[data_with_na.index[10:15], 'close'] = np.nan

# 使用组均值填充
data_with_na['close_filled'] = data_with_na.groupby('symbol')['close'].transform(
    lambda x: x.fillna(x.mean())
)

print('缺失值填充结果:')
print(data_with_na[['symbol', 'close', 'close_filled']].iloc[8:18])

# 比较填充前后的统计
print('\n填充效果验证:')
print(f'填充前缺失: {data_with_na["close"].isna().sum()}')
print(f'填充后缺失: {data_with_na["close_filled"].isna().sum()}')
=== 1. 滚动统计量 ===
带滚动统计的数据示例:
       datetime symbol   close      MA5  rolling_std
2981 2023-01-03    宁波港  3.2761      NaN          NaN
2982 2023-01-04    宁波港  3.2944      NaN          NaN
2983 2023-01-05    宁波港  3.2852      NaN          NaN
2984 2023-01-06    宁波港  3.2669      NaN          NaN
2985 2023-01-09    宁波港  3.2669  3.27790          NaN
2986 2023-01-10    宁波港  3.2485  3.27238          NaN
2987 2023-01-11    宁波港  3.2394  3.26138          NaN
2988 2023-01-12    宁波港  3.2302  3.25038          NaN
2989 2023-01-13    宁波港  3.2577  3.24854          NaN
2990 2023-01-16    宁波港  3.2761  3.25038     0.020305

=== 2. 组内排名 ===
各股票收盘价排名(前10天):
       datetime symbol   close  daily_rank  daily_pct_rank
2981 2023-01-03    宁波港  3.2761         8.0        0.135593
2982 2023-01-04    宁波港  3.2944        12.0        0.203390
2983 2023-01-05    宁波港  3.2852        10.5        0.177966
2984 2023-01-06    宁波港  3.2669         5.5        0.093220
2985 2023-01-09    宁波港  3.2669         5.5        0.093220
2986 2023-01-10    宁波港  3.2485         3.0        0.050847
2987 2023-01-11    宁波港  3.2394         2.0        0.033898
2988 2023-01-12    宁波港  3.2302         1.0        0.016949
2989 2023-01-13    宁波港  3.2577         4.0        0.067797
2990 2023-01-16    宁波港  3.2761         8.0        0.135593

=== 3. 组过滤 ===
过滤前形状: (118, 14)
过滤后形状: (59, 14)
保留的股票: ['宁波银行']

=== 4. Transform 填充缺失值 ===
缺失值填充结果:
     symbol   close  close_filled
2989    宁波港  3.2577      3.257700
2990    宁波港  3.2761      3.276100
2991    宁波港     NaN      3.336724
2992    宁波港     NaN      3.336724
2993    宁波港     NaN      3.336724
2994    宁波港     NaN      3.336724
2995    宁波港     NaN      3.336724
2996    宁波港  3.3678      3.367800
2997    宁波港  3.3770      3.377000
2998    宁波港  3.3678      3.367800

填充效果验证:
填充前缺失: 5
填充后缺失: 0

关键要点: - transform 保持原始数据形状 - filter 根据组条件筛选整个组 - rank() 计算组内排名 - pct=True 计算百分位排名 - 组内填充避免使用其他组的数据


7.7.4 习题 10.4: 分组应用分析

问题描述

使用 apply 进行复杂的组操作:

  1. 计算每只股票的最大回撤
  2. 找出连续上涨/下跌的最长天数
  3. 计算收益率的偏度和峰度
  4. 识别价格突变点

实证风险评估:最大回撤与高阶矩的统计意义

在评估量化策略或单只绩优股(如“宁波银行”)的风险时,单纯的均值-方差分析往往是不够的。

  1. 最大回撤 (MDD) 的逻辑:最大回撤衡量了从资产历史净值最高点(峰值)到最低点(谷值)的极端损失幅度。在行为金融学中,它是投资者心理承压能力的关键指标。
  2. 偏度 (Skewness) 与“黑天鹅”:正偏代表收益分布有较长的右尾(暴利机会),负偏则意味着分布有较长的左尾(极端亏损风险)。
  3. 峰度 (Kurtosis) 与“厚尾”:金融资产(尤其是 A 股)的收益率往往呈现“尖峰厚尾”特征,即峰度显著大于 0。这意味着极端行情出现的概率远高于正态分布的预测值,这对风险管理提出了更严苛的要求。

完整解答

import pandas as pd
import numpy as np
from scipy import stats

# 从本地 Parquet 文件读取数据代替 HDF5
port_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
bank_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '002142.XSHE')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
port_prices['symbol'] = '宁波港'
bank_prices['symbol'] = '宁波银行'
stock_data = pd.concat([port_prices, bank_prices])
stock_data['datetime'] = pd.to_datetime(stock_data['trade_date'], format='%Y%m%d')
stock_data = stock_data[(stock_data['datetime'] >= '2023-01-01') & (stock_data['datetime'] <= '2023-12-31')]

# 筛选数据
filtered = stock_data.copy()
filtered = filtered.sort_values(['symbol', 'datetime'])
filtered.set_index('datetime', inplace=True)

# 1. 最大回撤计算
print('=== 1. 最大回撤分析 ===')

def calculate_max_drawdown(prices):
    """计算最大回撤"""
    cumulative_max = prices.cummax()
    drawdown = (prices - cumulative_max) / cumulative_max
    return drawdown.min()

max_dd = filtered.groupby('symbol')['close'].apply(calculate_max_drawdown)
print('最大回撤:')
print(max_dd.map(lambda x: f'{x:.2%}'))

# 2. 连续上涨/下跌天数
print('\n=== 2. 连续涨跌天数 ===')

def consecutive_days(group, col='returns', direction='up'):
    """计算连续上涨或下跌的最大天数"""
    if direction == 'up':
        condition = group[col] > 0
    else:
        condition = group[col] < 0

    # 计算连续True的数量
    consecutive = condition.astype(int)
    consecutive['consecutive_count'] = consecutive.groupby(
        (consecutive != consecutive.shift()).cumsum()
    ).cumsum()

    return consecutive['consecutive_count'].max()

# 计算收益率
returns_df = filtered.groupby('symbol')['close'].pct_change()

# 对每只股票计算
for symbol in filtered['symbol'].unique():
    symbol_returns = returns_df[filtered['symbol'] == symbol]
    max_up = (symbol_returns > 0).astype(int).groupby(
        (symbol_returns > 0) != (symbol_returns > 0).shift()
    ).cumsum().max()
    max_down = (symbol_returns < 0).astype(int).groupby(
        (symbol_returns < 0) != (symbol_returns < 0).shift()
    ).cumsum().max()

    print(f'{symbol}: 最长连续上涨{max_up:.0f}天, 最长连续下跌{max_down:.0f}天')

# 3. 偏度和峰度
print('\n=== 3. 收益率分布形状 ===')

returns_stats = filtered.groupby('symbol')['close'].agg([
    ('偏度', lambda x: stats.skew(x.pct_change().dropna())),
    ('峰度', lambda x: stats.kurtosis(x.pct_change().dropna()))
])
print(returns_stats)

# 4. 价格突变点
print('\n=== 4. 价格突变检测 ===')

def detect_price_jumps(group, threshold=0.05):
    """检测价格突变(收益率超过阈值)"""
    returns = group['close'].pct_change()
    jumps = returns[returns.abs() > threshold]
    return len(jumps), jumps.index.tolist()

jump_analysis = filtered.groupby('symbol').apply(
    lambda g: detect_price_jumps(g, threshold=0.03)
)

print('价格突变次数:')
for symbol, (count, dates) in jump_analysis.items():
    print(f'{symbol}: {count}次')
    if count > 0:
        print(f'  突变日期: {[str(d) for d in dates[:3]]}...')
=== 1. 最大回撤分析 ===
最大回撤:
symbol
宁波港     -13.90%
宁波银行    -41.99%
Name: close, dtype: object

=== 2. 连续涨跌天数 ===
宁波港: 最长连续上涨56天, 最长连续下跌52天
宁波银行: 最长连续上涨61天, 最长连续下跌88天

=== 3. 收益率分布形状 ===
              偏度        峰度
symbol                    
宁波港    -0.047406  0.295470
宁波银行    0.925386  3.231664

=== 4. 价格突变检测 ===
价格突变次数:
宁波港: 0次
宁波银行: 24次
  突变日期: ['2023-01-04 00:00:00', '2023-01-11 00:00:00', '2023-02-20 00:00:00']...

关键要点: - apply 适合复杂的组操作 - 最大回撤是重要的风险指标 - 连续涨跌反映趋势强度 - 偏度和峰度描述分布形状 - 突变检测识别异常波动


7.7.5 习题 10.5: 透视表与交叉表进阶

问题描述

使用宁波港、宁波银行和贵州茅台的数据创建高级透视表:

  1. 创建多维透视表
  2. 使用多个聚合函数
  3. 添加总计和小计
  4. 创建交叉表分析涨跌关系

完整解答

import pandas as pd
import numpy as np

# 从本地 Parquet 文件读取数据代替 HDF5
port_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
bank_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '002142.XSHE')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
moutai_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '600519.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})

port_prices['symbol'] = '宁波港'
bank_prices['symbol'] = '宁波银行'
moutai_prices['symbol'] = '贵州茅台'

stock_data = pd.concat([port_prices, bank_prices, moutai_prices])
stock_data['datetime'] = pd.to_datetime(stock_data['trade_date'], format='%Y%m%d')
stock_data = stock_data[(stock_data['datetime'] >= '2023-01-01') & (stock_data['datetime'] <= '2023-03-31')]

# 筛选数据
symbols_map = {
    '601018.SH': '宁波港',
    '002142.SZ': '宁波银行',
    '600519.SH': '贵州茅台'
}
# The original code used stock_data['symbol'].isin(symbols_map.keys())
# but now 'symbol' column already contains mapped names.
# So, we filter by the mapped names directly.
filtered = stock_data[stock_data['symbol'].isin(symbols_map.values())].copy()
# The original code had filtered['symbol'] = filtered['symbol'].map(symbols_map)
# but this is already done during concat and symbol assignment.
# filtered['symbol'] = filtered['symbol'].map(symbols_map) # This line is no longer needed
filtered['returns'] = filtered.groupby('symbol')['close'].pct_change() # Calculate returns per symbol
filtered['month'] = pd.to_datetime(filtered['datetime']).dt.month
filtered['week'] = pd.to_datetime(filtered['datetime']).dt.isocalendar().week

# 添加涨跌标志
filtered['direction'] = pd.cut(filtered['returns'],
                               bins=[-np.inf, 0, np.inf],
                               labels=['下跌', '上涨'])

# 1. 多维透视表
print('=== 1. 多维透视表:股票×月份 ===')
pivot_month = pd.pivot_table(
    filtered,
    values='close',
    index='symbol',
    columns='month',
    aggfunc=['mean', 'std', 'min', 'max']
)
print(pivot_month.round(2))

# 2. 多聚合函数透视表
print('\n=== 2. 多聚合函数透视表 ===')
multi_agg_pivot = pd.pivot_table(
    filtered,
    values=['close', 'volume', 'returns'],
    index='symbol',
    aggfunc={
        'close': ['mean', 'std'],
        'volume': 'sum',
        'returns': ['mean', 'count']
    }
)
print(multi_agg_pivot.round(2))

# 3. 带总计的透视表
print('\n=== 3. 带总计的透视表 ===')
pivot_with_totals = pd.pivot_table(
    filtered,
    values='volume',
    index='symbol',
    columns='month',
    aggfunc='sum',
    margins=True,
    margins_name='总计'
)
print(pivot_with_totals)

# 4. 交叉表:涨跌关系
print('\n=== 4. 交叉表:涨跌天数统计 ===')

# 按股票和月份统计涨跌天数
crosstab_direction = pd.crosstab(
    [filtered['symbol'], filtered['month']],
    filtered['direction'],
    margins=True
)
print(crosstab_direction)

# 5. 归一化交叉表(行百分比)
print('\n=== 5. 涨跌比例(按股票) ===')
crosstab_pct = pd.crosstab(
    filtered['symbol'],
    filtered['direction'],
    normalize='index'
).round(3)
print(crosstab_pct)

# 6. 堆叠柱状图数据
print('\n=== 6. 堆叠柱状图数据准备 ===')
stacked_data = pd.pivot_table(
    filtered,
    values='returns',
    index='month',
    columns='symbol',
    aggfunc='mean'
).round(4)
print('月平均收益率:')
print(stacked_data)
=== 1. 多维透视表:股票×月份 ===
           mean                      std                    min           \
month         1        2        3      1      2      3        1        2   
symbol                                                                     
宁波港        3.28     3.35     3.36   0.04   0.02   0.03     3.23     3.31   
宁波银行      30.37    28.23    25.75   0.52   0.76   0.98    29.38    26.95   
贵州茅台    1670.46  1654.53  1611.97  50.93  23.92  25.59  1562.39  1615.82   

                     max                    
month         3        1        2        3  
symbol                                      
宁波港        3.31     3.37     3.38     3.41  
宁波银行      24.86    31.20    29.67    28.08  
贵州茅台    1566.54  1732.56  1698.24  1665.20  

=== 2. 多聚合函数透视表 ===
          close        returns             volume
           mean    std   count mean           sum
symbol                                           
宁波港        3.33   0.04      58  0.0  5.704277e+08
宁波银行      27.84   2.04      58 -0.0  1.772545e+09
贵州茅台    1642.26  41.74      58  0.0  1.439159e+08

=== 3. 带总计的透视表 ===
month             1            2             3            总计
symbol                                                      
宁波港     132777579.0  150835430.0  2.868147e+08  5.704277e+08
宁波银行    447477998.0  588636725.0  7.364300e+08  1.772545e+09
贵州茅台     44202953.0   52847305.0  4.686565e+07  1.439159e+08
总计      624458530.0  792319460.0  1.070110e+09  2.486888e+09

=== 4. 交叉表:涨跌天数统计 ===
direction      下跌  上涨  All
symbol month              
宁波港    1        8   7   15
       2       12   8   20
       3       16   7   23
宁波银行   1        9   6   15
       2       15   5   20
       3       14   9   23
贵州茅台   1        8   7   15
       2       13   7   20
       3       12  11   23
All           107  67  174

=== 5. 涨跌比例(按股票) ===
direction     下跌     上涨
symbol                 
宁波港        0.621  0.379
宁波银行       0.655  0.345
贵州茅台       0.569  0.431

=== 6. 堆叠柱状图数据准备 ===
月平均收益率:
symbol     宁波港    宁波银行    贵州茅台
month                         
1       0.0019  0.0014  0.0045
2      -0.0004 -0.0051 -0.0008
3      -0.0002 -0.0033  0.0002

关键要点: - 透视表适合多维度汇总 - 多聚合函数提供全面视角 - margins=True 添加总计 - 交叉表专门用于频数统计 - normalize 计算比例 - 透视表结果便于可视化


7.7.6 习题 10.6: 透视表高级应用

问题描述

使用透视表进行深度分析:

  1. 创建收益率的周度热力图数据
  2. 分析不同时间段的表现差异
  3. 比较不同股票的相对表现
  4. 识别最佳投资时机

完整解答

import pandas as pd
import numpy as np

# 从本地 Parquet 文件读取数据代替 HDF5
port_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
bank_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '002142.XSHE')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
moutai_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '600519.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})

port_prices['symbol'] = '宁波港'
bank_prices['symbol'] = '宁波银行'
moutai_prices['symbol'] = '贵州茅台'

stock_data = pd.concat([port_prices, bank_prices, moutai_prices])
stock_data['datetime'] = pd.to_datetime(stock_data['trade_date'], format='%Y%m%d')
stock_data = stock_data[(stock_data['datetime'] >= '2023-01-01') & (stock_data['datetime'] <= '2023-12-31')]

# 筛选数据
symbols_map = {
    '601018.SH': '宁波港',
    '002142.SZ': '宁波银行',
    '600519.SH': '贵州茅台'
}
filtered = stock_data[stock_data['symbol'].isin(symbols_map.values())].copy()
filtered['returns'] = filtered.groupby('symbol')['close'].pct_change()

# 提取时间特征
filtered['date'] = pd.to_datetime(filtered['datetime']).dt.date
filtered['hour'] = pd.to_datetime(filtered['datetime']).dt.hour
filtered['day_of_week'] = pd.to_datetime(filtered['datetime']).dt.dayofweek
filtered['week'] = pd.to_datetime(filtered['datetime']).dt.isocalendar().week

days_map = {0: '周一', 1: '周二', 2: '周三', 3: '周四', 4: '周五'}
filtered['day_name'] = filtered['day_of_week'].map(days_map)

# 1. 周度热力图数据
print('=== 1. 周×股票收益率热力图 ===')
weekly_heatmap = pd.pivot_table(
    filtered,
    values='returns',
    index='week',
    columns='symbol',
    aggfunc='mean'
).round(4)
print(weekly_heatmap)

# 2. 周内效应分析
print('\n=== 2. 周内效应分析 ===')
weekday_pivot = pd.pivot_table(
    filtered,
    values='returns',
    index='day_name',
    columns='symbol',
    aggfunc='mean'
).round(4)
print(weekday_pivot)

# 3. 相对表现分析
print('\n=== 3. 相对表现矩阵 ===')

# 标准化收益率(z-score)
def z_score(group):
    return (group - group.mean()) / group.std()

normalized_returns = filtered.groupby('symbol')['returns'].transform(z_score)

# 创建相对表现矩阵
relative_performance = pd.pivot_table(
    filtered.assign(norm_returns=normalized_returns),
    values='norm_returns',
    index='week',
    columns='symbol',
    aggfunc='mean'
).round(2)
print(relative_performance)

# 4. 最佳投资时机
print('\n=== 4. 最佳投资时机分析 ===')

# 按周汇总表现
weekly_performance = pd.pivot_table(
    filtered,
    values='returns',
    index='week',
    columns='symbol',
    aggfunc='mean'
)

# 找出表现最好的周
best_week = weekly_performance.idxmax()
print('各股票表现最好的周:')
for symbol in symbols_map.values():
    best_return = weekly_performance[symbol].max()
    best_week_for_stock = weekly_performance[symbol].idxmax()
    print(f'{symbol}: 第{best_week_for_stock}周, 收益率={best_return:.2%}')

# 按星期汇总
weekday_summary = pd.pivot_table(
    filtered,
    values='returns',
    index='day_name',
    columns='symbol',
    aggfunc='mean'
).round(4)

print('\n按星期平均收益率:')
print(weekday_summary)

# 找出最佳交易日
best_day = weekday_summary.idxmax()
print(f'\n最佳交易日: {best_day}')
print(weekday_summary.loc[best_day])
=== 1. 周×股票收益率热力图 ===
symbol     宁波港    宁波银行    贵州茅台
week                          
1      -0.0009  0.0046  0.0142
2      -0.0006  0.0082  0.0092
3       0.0050 -0.0045 -0.0028
5       0.0006 -0.0084 -0.0045
6       0.0000 -0.0026 -0.0008
7      -0.0022 -0.0087  0.0013
8       0.0017 -0.0002 -0.0035
9       0.0038  0.0055  0.0034
10     -0.0054 -0.0205 -0.0076
11      0.0061 -0.0035 -0.0009
12     -0.0038  0.0034  0.0042
13     -0.0016 -0.0028  0.0046
14      0.0048  0.0012 -0.0040
15      0.0022  0.0091 -0.0087
16      0.0032 -0.0115  0.0015
17      0.0000  0.0029  0.0041
18      0.0105  0.0056 -0.0030
19     -0.0083 -0.0025 -0.0050
20     -0.0065 -0.0057 -0.0008
21     -0.0005 -0.0041 -0.0010
22     -0.0007 -0.0007 -0.0023
23      0.0012  0.0072 -0.0005
24     -0.0029 -0.0030  0.0154
25     -0.0058 -0.0116 -0.0115
26      0.0001 -0.0010 -0.0021
27      0.0024 -0.0044 -0.0001
28      0.0000  0.0085  0.0074
29      0.0018 -0.0045  0.0022
30      0.0040  0.0290  0.0139
31      0.0006 -0.0015 -0.0004
32     -0.0051 -0.0040 -0.0063
33      0.0006 -0.0018 -0.0034
34     -0.0035 -0.0036  0.0025
35      0.0012  0.0065  0.0029
36     -0.0023 -0.0005 -0.0035
37      0.0041  0.0001 -0.0040
38      0.0012  0.0025  0.0075
39     -0.0014 -0.0094 -0.0069
41     -0.0029 -0.0146 -0.0053
42     -0.0053 -0.0066 -0.0121
43      0.0043  0.0023  0.0039
44      0.0024  0.0055  0.0158
45      0.0018 -0.0017 -0.0039
46      0.0006  0.0003 -0.0025
47      0.0063 -0.0018  0.0027
48      0.0028 -0.0170 -0.0018
49     -0.0005 -0.0163 -0.0063
50      0.0006 -0.0040 -0.0070
51      0.0011 -0.0080  0.0054
52     -0.0044  0.0055  0.0066

=== 2. 周内效应分析 ===
symbol       宁波港    宁波银行    贵州茅台
day_name                        
周一        0.0014 -0.0022  0.0010
周三       -0.0013 -0.0046 -0.0009
周二       -0.0007 -0.0015  0.0003
周五        0.0003  0.0001 -0.0000
周四        0.0009 -0.0004  0.0005

=== 3. 相对表现矩阵 ===
symbol   宁波港  宁波银行  贵州茅台
week                    
1      -0.14  0.35  1.09
2      -0.09  0.55  0.69
3       0.65 -0.15 -0.23
5       0.06 -0.37 -0.36
6      -0.01 -0.05 -0.08
7      -0.30 -0.39  0.09
8       0.20  0.08 -0.28
9       0.49  0.40  0.25
10     -0.73 -1.05 -0.60
11      0.78 -0.10 -0.08
12     -0.51  0.29  0.31
13     -0.23 -0.06  0.34
14      0.62  0.16 -0.32
15      0.27  0.60 -0.69
16      0.41 -0.55  0.10
17     -0.02  0.26  0.30
18      1.37  0.40 -0.25
19     -1.11 -0.04 -0.40
20     -0.87 -0.22 -0.08
21     -0.09 -0.13 -0.09
22     -0.10  0.06 -0.19
23      0.14  0.50 -0.06
24     -0.39 -0.07  1.17
25     -0.78 -0.55 -0.90
26     -0.01  0.04 -0.18
27      0.30 -0.15 -0.02
28     -0.01  0.57  0.55
29      0.22 -0.15  0.15
30      0.51  1.71  1.06
31      0.06  0.01 -0.04
32     -0.69 -0.13 -0.50
33      0.07 -0.00 -0.28
34     -0.47 -0.11  0.18
35      0.14  0.46  0.21
36     -0.32  0.07 -0.29
37      0.53  0.10 -0.32
38      0.14  0.23  0.57
39     -0.21 -0.43 -0.55
41     -0.40 -0.71 -0.42
42     -0.72 -0.27 -0.95
43      0.55  0.22  0.28
44      0.30  0.40  1.21
45      0.21 -0.00 -0.32
46      0.06  0.11 -0.21
47      0.81 -0.01  0.19
48      0.35 -0.85 -0.16
49     -0.08 -0.81 -0.50
50      0.06 -0.13 -0.56
51      0.13 -0.35  0.40
52     -0.60  0.40  0.50

=== 4. 最佳投资时机分析 ===
各股票表现最好的周:
宁波港: 第18周, 收益率=1.05%
宁波银行: 第30周, 收益率=2.90%
贵州茅台: 第44周, 收益率=1.58%

按星期平均收益率:
symbol       宁波港    宁波银行    贵州茅台
day_name                        
周一        0.0014 -0.0022  0.0010
周三       -0.0013 -0.0046 -0.0009
周二       -0.0007 -0.0015  0.0003
周五        0.0003  0.0001 -0.0000
周四        0.0009 -0.0004  0.0005

最佳交易日: symbol
宁波港     周一
宁波银行    周五
贵州茅台    周一
dtype: object
symbol       宁波港    宁波银行   贵州茅台
day_name                       
周一        0.0014 -0.0022  0.001
周五        0.0003  0.0001 -0.000
周一        0.0014 -0.0022  0.001

关键要点: - 透视表创建热力图数据 - 时间特征提取很重要 - 相对表现使用z-score标准化 - 识别规律性(周内效应) - 多角度分析投资时机


7.7.7 习题 10.7: 综合数据聚合项目

问题描述

构建一个完整的股票分析报告,整合各种聚合操作:

  1. 创建数据质量报告
  2. 生成各股票的绩效指标
  3. 构建综合评分系统
  4. 输出格式化的分析报告

完整解答

import pandas as pd
import numpy as np
from scipy import stats

print('=== 股票分析报告生成器 ===\n')

# 从本地 Parquet 文件读取数据代替 HDF5
port_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
bank_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '002142.XSHE')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
moutai_prices = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '600519.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})

port_prices['symbol'] = '宁波港'
bank_prices['symbol'] = '宁波银行'
moutai_prices['symbol'] = '贵州茅台'

stock_data = pd.concat([port_prices, bank_prices, moutai_prices])
stock_data['datetime'] = pd.to_datetime(stock_data['trade_date'], format='%Y%m%d')
stock_data = stock_data[(stock_data['datetime'] >= '2023-01-01') & (stock_data['datetime'] <= '2023-12-31')]

# 筛选数据
symbols_map = {
    '601018.SH': '宁波港',
    '002142.SZ': '宁波银行',
    '600519.SH': '贵州茅台'
}
filtered = stock_data[stock_data['symbol'].isin(symbols_map.values())].copy()
filtered = filtered.sort_values(['symbol', 'datetime'])
# filtered.set_index('datetime', inplace=True) # This line is not needed for the subsequent operations and might cause issues if datetime is used as a column later.

# ============================================================================
# 1. 数据质量报告
# ============================================================================
print('[1/5] 数据质量检查')

quality_report = filtered.groupby('symbol').agg({
    'datetime': ['count', 'min', 'max'],
    'close': lambda x: x.isna().sum(),
    'volume': lambda x: x.isna().sum()
})
quality_report.columns = ['数据点数', '开始日期', '结束日期', '收盘价缺失', '成交量缺失']
print(quality_report)

# ============================================================================
# 2. 绩效指标计算
# ============================================================================
print('\n[2/5] 绩效指标计算')

def calculate_performance_metrics(group):
    """计算综合绩效指标"""
    returns = group['close'].pct_change().dropna()

    # 基本收益指标
    total_return = (group['close'].iloc[-1] / group['close'].iloc[0] - 1)
    annual_return = (1 + total_return) ** (252 / len(group)) - 1
    volatility = returns.std() * np.sqrt(252)

    # 风险调整收益
    sharpe_ratio = annual_return / volatility if volatility > 0 else 0
    sortino_ratio = annual_return / (returns[returns < 0].std() * np.sqrt(252)) if len(returns[returns < 0]) > 0 else 0

    # 最大回撤
    cumulative_max = group['close'].cummax()
    drawdown = (group['close'] - cumulative_max) / cumulative_max
    max_drawdown = drawdown.min()

    # 统计特性
    skewness = stats.skew(returns)
    kurtosis = stats.kurtosis(returns)

    return pd.Series({
        '总收益率': total_return,
        '年化收益率': annual_return,
        '年化波动率': volatility,
        '夏普比率': sharpe_ratio,
        '索提诺比率': sortino_ratio,
        '最大回撤': max_drawdown,
        '收益偏度': skewness,
        '收益峰度': kurtosis,
        '胜率(%)': (returns > 0).mean() * 100
    })

performance = filtered.groupby('symbol').apply(calculate_performance_metrics)
print(performance.round(4))

# ============================================================================
# 3. 综合评分系统
# ============================================================================
print('\n[3/5] 综合评分')

def calculate_score(metrics):
    """根据绩效指标计算综合评分"""
    score = 0

    # 收益得分(40%)
    if metrics['年化收益率'] > 0:
        score += 40

    # 夏普比率得分(30%)
    if metrics['夏普比率'] > 1:
        score += 30
    elif metrics['夏普比率'] > 0.5:
        score += 20
    elif metrics['夏普比率'] > 0:
        score += 10

    # 回撤控制得分(20%)
    if metrics['最大回撤'] > -0.05:
        score += 20
    elif metrics['最大回撤'] > -0.1:
        score += 10

    # 胜率得分(10%)
    if metrics['胜率(%)'] > 55:
        score += 10
    elif metrics['胜率(%)'] > 50:
        score += 5

    return score

scores = performance.apply(calculate_score, axis=1)
ranking = scores.sort_values(ascending=False)

print('综合评分排名:')
for stock, score in ranking.items():
    print(f'{stock}: {score:.0f}分')

# ============================================================================
# 4. 推荐建议
# ============================================================================
print('\n[4/5] 投资建议')

best_stock = ranking.idxmax()
worst_stock = ranking.idxmin()

print(f'最佳选择: {best_stock}')
print(f'  理由: 综合得分最高({ranking[best_stock]:.0f}分)')
print(f'  年化收益率: {performance.loc[best_stock, "年化收益率"]:.2%}')
print(f'  夏普比率: {performance.loc[best_stock, "夏普比率"]:.2f}')

print(f'\n谨慎对待: {worst_stock}')
print(f'  理由: 综合得分最低({ranking[worst_stock]:.0f}分)')
print(f'  最大回撤: {performance.loc[worst_stock, "最大回撤"]:.2%}')

# ============================================================================
# 5. 风险提示
# ============================================================================
print('\n[5/5] 风险提示')

print('风险分析:')
for symbol in symbols_map.values():
    print(f'\n{symbol}:')
    max_dd = performance.loc[symbol, '最大回撤']
    if max_dd < -0.2:
        print(f'  ⚠️ 高风险: 最大回撤达到{max_dd:.2%}')
    elif max_dd < -0.1:
        print(f'  ⚠️ 中等风险: 最大回撤{max_dd:.2%}')
    else:
        print(f'  ✓ 风险可控: 最大回撤{max_dd:.2%}')

    volatility = performance.loc[symbol, '年化波动率']
    print(f'  年化波动率: {volatility:.2%}')

print('\n=== 报告生成完成 ===')
=== 股票分析报告生成器 ===

[1/5] 数据质量检查
        数据点数       开始日期       结束日期  收盘价缺失  成交量缺失
symbol                                          
宁波港      242 2023-01-03 2023-12-29      0      0
宁波银行     242 2023-01-03 2023-12-29      0      0
贵州茅台     242 2023-01-03 2023-12-29      0      0

[2/5] 绩效指标计算
          总收益率   年化收益率   年化波动率    夏普比率   索提诺比率    最大回撤    收益偏度    收益峰度  \
symbol                                                                   
宁波港     0.0224  0.0233  0.1207  0.1931  0.3256 -0.1390 -0.0474  0.2955   
宁波银行   -0.3628 -0.3745  0.2854 -1.3125 -2.3640 -0.4199  0.9254  3.2317   
贵州茅台    0.0247  0.0257  0.2054  0.1252  0.2084 -0.1485  0.4890  3.0720   

          胜率(%)  
symbol           
宁波港     40.6639  
宁波银行    38.1743  
贵州茅台    46.8880  

[3/5] 综合评分
综合评分排名:
宁波港: 50分
贵州茅台: 50分
宁波银行: 0分

[4/5] 投资建议
最佳选择: 宁波港
  理由: 综合得分最高(50分)
  年化收益率: 2.33%
  夏普比率: 0.19

谨慎对待: 宁波银行
  理由: 综合得分最低(0分)
  最大回撤: -41.99%

[5/5] 风险提示
风险分析:

宁波港:
  ⚠️ 中等风险: 最大回撤-13.90%
  年化波动率: 12.07%

宁波银行:
  ⚠️ 高风险: 最大回撤达到-41.99%
  年化波动率: 28.54%

贵州茅台:
  ⚠️ 中等风险: 最大回撤-14.85%
  年化波动率: 20.54%

=== 报告生成完成 ===

关键要点: - 整合多种聚合操作 - 多维度评估绩效 - 构建评分系统 - 生成可操作的建议 - 注意风险提示

7.8 结论

对于利用python进行数据分析而言,掌握 pandas 的数据分组工具是一项不可或缺的技能。“拆分-应用-合并”策略为将复杂的数据操作问题分解为可管理的步骤提供了一个强大的心智模型。无论是执行简单的聚合、复杂的组内转换,还是创建富有洞察力的透视表,groupby 功能都是驱动大量数据清洗、建模和统计分析工作的核心引擎。


7.9 延伸阅读

为了进一步深化您在数据聚合和分组运算方面的理解,我们推荐以下精选资源:

7.9.1 pandas分组聚合官方文档

1. Group By: split-apply-combine (官方指南) - 链接: https://pandas.pydata.org/docs/user_guide/groupby.html - 说明: pandas官方的分组操作完整指南,包含: - 分组的基础概念和语法 - 各种聚合函数的使用方法 - 组内变换(transform)和过滤(filter) - 自定义聚合函数的应用

2. pandas Cookbook: Grouping (官方食谱) - 链接: https://pandas.pydata.org/docs/user_guide/cookbook.html#grouping - 说明: 实用的代码片段合集,解决常见的分组问题。

7.9.2 数据聚合理论

3. The Split-Apply-Combine Strategy for Data Analysis (论文) - 作者: Hadley Wickham - 期刊: Journal of Statistical Software - 链接: https://www.jstatsoft.org/article/view/v040i02 - 说明: 系统性地阐述”拆分-应用-合并”策略的理论基础,是理解数据分组操作的经典文献。

4. Data Aggregation: A Survey (研究综述) - 作者: R. L. Grossman, et al. - 链接: https://arxiv.org/abs/1905.00001 - 说明: 综述了数据聚合的各种方法和技术,包含理论分析和实际应用。

7.9.3 高级分组操作

5. Effective Pandas: Boost Data Workflow Efficiency (课程) - 平台: DataCamp - 链接: https://www.datacamp.com/courses/effective-pandas - 说明: 专注于pandas的高效使用模式,包含分组操作的最佳实践。

6. Modern Pandas: Tackling Big Data (教程) - 链接: https://towardsdatascience.com/modern-pandas-9b4c4c4b56c8 - 说明: 讲解如何使用pandas处理大规模数据集的技巧,包括优化的分组运算。

7.9.4 透视表与数据重塑

7. Pivot Tables in Excel (参考指南) - 链接: https://support.microsoft.com/en-us/office/pivot-tables - 说明: 对Excel透视表熟悉的读者,这个帮助理解pandas的pivot_table函数的作用。

8. Reshaping in pandas: Pivot, Stack, and Melt (教程) - 链接: https://pbpython.com/articles/pandas-reshaping-pivot-stack-melt - 说明: 详细的pandas数据重塑教程,包含大量实例和对比分析。

7.9.5 统计与数据分析

9. Python for Data Analysis, 3rd Edition (第9章) - 作者: Wes McKinney - 出版社: O’Reilly - 说明: pandas创始人编写的权威教材,第9章深入讨论了数据聚合和分组操作。

10. Practical Statistics for Data Scientists (书籍) - 作者: Peter Bruce & Andrew Bruce - 出版社: O’Reilly - 说明: 实用导向的统计学教程,包含大量使用pandas进行统计分析的案例。

7.9.6 在线资源与社区

11. Stack Overflow - pandas Tag - 链接: https://stackoverflow.com/questions/tagged/pandas - 说明: 当遇到具体的分组聚合问题时,Stack Overflow的pandas标签是寻求帮助的最佳社区资源。

12. GitHub Awesome Pandas Resources - 链接: https://github.com/tomaugspurger/pandas-exercises - 说明: 收集了大量pandas练习和教程,适合系统性地提升数据聚合技能。