3  NumPy 基础

3.1 引言与学习目标

学习目标

通过本章学习,你应该能够:

  • 理论目标
    • 深刻理解 NumPy ndarray 的数学基础,包括内存布局、步长(stride)和视图机制
    • 掌握向量化计算的数学原理,理解为什么向量化比循环快几个数量级
    • 理解广播机制的数学形式化定义和满足条件
    • 掌握NumPy中线性代数运算的矩阵表示和计算复杂度
  • 实践目标
    • 熟练使用 NumPy 进行大规模金融数据的高效计算
    • 运用向量化操作实现技术分析指标(如移动平均、收益率计算)
    • 使用NumPy实现蒙特卡洛模拟和期权定价
    • 掌握使用NumPy进行多维数据的索引、切片和布尔过滤
  • 应用目标
    • 能够使用 NumPy 从本地金融数据源读取和处理大规模股票市场数据
    • 实现基于随机游走理论的市场模拟和回测框架
    • 运用NumPy的线性代数功能进行投资组合优化和风险分析

NumPy,全称 Numerical Python,是 Python 中用于数值计算最重要的基础包之一。对于经济学和金融学的学生来说,掌握 NumPy 不仅仅是一项技术练习;它是开启现代经济分析中强大定量工具的钥匙,从计量经济建模到金融工程。许多提供科学计算功能的包,如 pandasstatsmodels,都使用 NumPy 的数组对象作为数据交换的标准”通用语言”之一。我们在 章节 3 中涵盖的关于 NumPy 的大部分知识也同样适用于这些更高级的库。

以下是您将在 NumPy 中找到的一些核心功能:

  • ndarray: 一种高效的多维数组,提供快速的面向数组的算术运算和灵活的广播(broadcasting)功能。
  • 数学函数: 用于对整个数据数组进行快速运算,而无需编写显式的 Python 循环。
  • 数据 I/O: 用于在磁盘上读/写数组数据以及处理内存映射文件的工具。
  • 线性代数和随机数生成: 提供全面的线性代数运算、随机数生成和傅里叶变换功能。
  • C API: 一个文档齐全的 C 语言 API,用于将 NumPy 与 C、C++ 或 FORTRAN 编写的库连接起来。这一特性是 Python 科学计算生态系统的基石,它使得高性能的遗留代码可以被封装在一个动态、易于访问的 Python 接口中。

虽然 NumPy 本身不提供高级的数据建模功能,但对其数组和面向数组的计算范式的深刻理解,是高效使用像 pandas 这类工具的先决条件。

NumPy 在经济学和金融学中的核心地位

作为经济学专业的学生,你可能会问:为什么我们要从如此底层的库学起?答案很简单:NumPy 是整个 Python 数据科学生态系统的基石。你将来要用到的几乎所有计量经济学、金融建模和机器学习库(例如 pandas, statsmodels, scikit-learn)的底层数据结构都是 NumPy 数组。理解 NumPy 的运行机制——特别是其对内存的有效管理和向量化计算的理念——能够让你写出运行速度快几个数量级的代码。在处理大规模经济数据(如面板数据、高频交易数据)时,这种效率的差异是决定性的。

3.1.1 理论基础:向量化计算的数学原理

向量化计算的理论基础

向量化计算的核心思想是将标量操作(scalar operations)转换为向量操作(vector operations),从而避免显式循环。

从数学角度,向量化可以形式化地定义为:

给定: - 向量 \(\mathbf{x} \in \mathbb{R}^n\) - 标量 \(c \in \mathbb{R}\) - 一元函数 \(f: \mathbb{R} \to \mathbb{R}\)

逐元素操作定义为: \[ f(\mathbf{x})_i = f(x_i), \quad \forall i \in \{1, 2, ..., n\} \]

标量广播定义为: \[ (c \odot \mathbf{x})_i = c \cdot x_i, \quad \forall i \in \{1, 2, ..., n\} \]

其中 \(\odot\) 表示广播乘法运算。

计算复杂度对比

操作类型 Python循环 NumPy向量化 加速比
元素级加法 O(n) 解释执行 O(n) 编译执行 10-100×
元素级乘法 O(n) 解释执行 O(n) 编译执行 10-100×
矩阵乘法 O(n³) 嵌套循环 O(n³) 优化BLAS 5-50×
两个向量点积 O(n) 嵌套循环 O(n) 单个SIMD 20-200×

其中SIMD(Single Instruction, Multiple Data)是现代CPU的并行计算特性,NumPy的向量化操作充分利用了这一点。

NumPy 之所以对数值计算如此重要,其主要原因之一是它为处理大型数据数组而设计的高效率。这种效率源于几个关键的架构决策:

  • 内存布局: NumPy 在内部将数据存储在连续的内存块中,独立于其他 Python 内置对象。其用 C 语言编写的算法库可以直接在这块内存上操作,无需进行任何类型检查或与 Python 动态类型相关的其他开销。因此,NumPy 数组比内置的 Python 序列使用更少的内存。
  • 向量化操作: NumPy 允许对整个数组进行复杂的计算,而无需显式的 for 循环。这种被称为向量化的实践,将循环操作委托给了高度优化的、预编译的 C 代码,其速度比解释执行的 Python 代码快几个数量级。

为了说明性能差异,我们来比较一个包含一百万个整数的 NumPy 数组和一个等效的 Python 列表。

列表 3.1
import numpy as np

price_array = np.arange(1_000_000)
price_list = list(range(1_000_000))

print('NumPy 数组乘以 2:')
%timeit price_array_2 = price_array * 2

print('\nPython 列表乘以 2:')
%timeit price_list_2 = [x * 2 for x in price_list]
NumPy 数组乘以 2:
3.61 ms ± 128 μs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Python 列表乘以 2:
90 ms ± 5.34 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

正如 列表 3.1 中的计时结果所示,基于 NumPy 的算法通常比纯 Python 的实现快 10 到 100 倍(甚至更多),并且使用的内存也显著减少。

3.2 NumPy ndarray:一个多维数组对象

NumPy 的一个关键特性是其 N 维数组对象,即 ndarray。它是一个快速、灵活的容器,适用于 Python 中的大型数据集,允许您使用类似于标量元素之间等效运算的语法,对整块数据执行数学运算。

3.2.1 ndarray 对象内部机制

内存布局的数学表示

从数学角度,NumPy数组可以定义为五元组: \[ \text{ndarray} = (D, S, T, \mathbf{s}, P) \]

其中: - \(D \subseteq \mathbb{R}^N\) 是数据域(\(N\) 个元素) - \(S \in \mathbb{N}^k\) 是形状(\(k\) 维度) - \(T\) 是数据类型(dtype) - \(\mathbf{s} \in \mathbb{N}^k\) 是步长(strides)向量 - \(P \in \mathbb{N}\) 是数据指针(内存地址)

线性索引公式

给定一个数组 \(A\),其形状为 \(S = (n_0, n_1, ..., n_{k-1})\),步长为 \(\mathbf{s} = (s_0, s_1, ..., s_{k-1})\),索引 \(\mathbf{i} = (i_0, i_1, ..., i_{k-1})\) 对应的线性地址为:

\[ \text{offset}(\mathbf{i}) = P + \sum_{j=0}^{k-1} i_j \cdot s_j \]

其中 \(0 \leq i_j < n_j\)

行优先序(Row-Major Order)

NumPy默认使用C风格的行优先序,即最后一个维度变化最快: \[ s_{k-1} = \text{sizeof}(T) \] \[ s_{j} = s_{j+1} \cdot n_{j+1}, \quad j = 0, 1, ..., k-2 \]

例如,一个 \((3, 4, 5)\) 形状的 float64 数组,其步长为: \[ \mathbf{s} = (160, 40, 8) \]

视图的数学定义

给定一个ndarray \(A = (D, S, T, \mathbf{s}, P)\),一个视图(view)是通过创建新的元组 \((D, S', T', \mathbf{s}', P)\) 但共享相同数据域 \(D\) 的操作。

切片操作 \(A[\text{slice}]\) 返回一个视图,满足: \[ P' = P + \text{offset}(\text{slice}) \] \[ D' = \{A[\mathbf{i}] \mid \mathbf{i} \in \text{slice}\} \]

要真正理解 ndarray 为何如此高效,我们必须超越其 Python 接口,深入其内部。NumPy ndarray 提供了一种将一块同质类型的数据(连续或跨步的内存块)解释为多维数组对象的方式。

ndarray 之所以如此灵活,部分原因在于每个数组对象都是数据块上的一个跨步视图(strided view)。例如,像 arr[::2, ::-1] 这样的数组视图不会复制任何数据。原因是 ndarray 不仅仅是一块内存;它还包含了跨步信息,使数组能够以不同的步长在内存中移动。更精确地说,ndarray 内部由以下部分组成:

  • 一个指向数据的指针——即指向内存(RAM)或内存映射文件中的数据块。
  • 数据类型dtype,描述数组中固定大小的值单元。
  • 一个表示数组形状的元组。
  • 一个跨度(strides)元组——整数元组,指示为了沿某个维度前进一个元素需要“跨过”的字节数。
import matplotlib.pyplot as plt
import matplotlib.patches as patches

fig, ax = plt.subplots(figsize=(10, 6))
ax.set_xlim(0, 10)
ax.set_ylim(0, 7)
ax.axis('off')

# Main ndarray object
ax.add_patch(patches.FancyBboxPatch((1, 4.5), 8, 2, boxstyle='round,pad=0.1', fc='lightblue', ec='black'))
ax.text(5, 5.5, 'NumPy ndarray Object', ha='center', va='center', fontsize=16, fontweight='bold')

# Data Pointer
ax.add_patch(patches.Rectangle((1, 0.5), 2.5, 2.5, fc='lightgreen', ec='black'))
ax.text(2.25, 1.75, 'Data Pointer\n(to memory block)', ha='center', va='center', fontsize=12)
ax.add_patch(patches.Arrow(5, 4.5, 0, -1.2, width=0.3, color='gray'))

# Dtype
ax.add_patch(patches.Rectangle((4, 2.5), 2, 1, fc='lightcoral', ec='black'))
ax.text(5, 3, 'dtype: float64', ha='center', va='center', fontsize=12)
ax.add_patch(patches.Arrow(3.5, 4.5, 1, -0.8, width=0.2, color='gray'))


# Shape
ax.add_patch(patches.Rectangle((7, 2.5), 2, 1, fc='lightgoldenrodyellow', ec='black'))
ax.text(8, 3, 'shape: (10, 5)', ha='center', va='center', fontsize=12)
ax.add_patch(patches.Arrow(6.5, 4.5, 1, -0.8, width=0.2, color='gray'))

# Strides
ax.add_patch(patches.Rectangle((7, 0.5), 2, 1, fc='plum', ec='black'))
ax.text(8, 1, 'strides: (40, 8)', ha='center', va='center', fontsize=12)
ax.add_patch(patches.Arrow(5, 4.5, 2.5, -2.8, width=0.2, color='gray'))

ax.set_title('ndarray 内部结构示意图', fontsize=18, pad=20)
plt.show()
图 3.1: NumPy ndarray 对象的内部结构

例如,一个 10 × 5 的数组其形状为 (10, 5)。

np.ones((10, 5)).shape
(10, 5)

一个典型的 C 顺序(行主序)的 3 × 4 × 5 的 float64(8字节)数组,其跨度为 (160, 40, 8)。这个信息对性能至关重要,因为特定轴上的跨度越大,意味着沿着该轴进行计算的成本越高。

np.ones((3, 4, 5), dtype=np.float64).strides
(160, 40, 8)

虽然大多数用户不会直接与跨度(strides)交互,但它们是“零拷贝”视图机制的基础。跨度甚至可以是负数,这使得数组能够“向后”移动通过内存,例如在 obj[::-1] 这样的切片中。

3.2.2 创建 ndarray

为了让您体验 NumPy 如何实现批量计算,我们首先导入 NumPy 并创建一个小数组。我们将遵循标准约定 import numpy as np

import numpy as np

# 从一个列表的列表创建一个二维数组(模拟波动率数据)
volatility_data = np.array([[1.5, -0.1, 3], [0, -3, 6.5]])
volatility_data
array([[ 1.5, -0.1,  3. ],
       [ 0. , -3. ,  6.5]])

现在,我们可以直接在这个 data 对象上执行数学运算:

# 逐元素乘法
volatility_data * 10
array([[ 15.,  -1.,  30.],
       [  0., -30.,  65.]])
# 逐元素加法
volatility_data + volatility_data
array([[ 3. , -0.2,  6. ],
       [ 0. , -6. , 13. ]])

在第一个例子中,所有元素都乘以 10。在第二个例子中,数组中每个“单元格”的对应值被相加。

ndarray 是一个通用的多维同质数据容器;也就是说,所有元素必须是相同类型。每个数组都有一个 shape(一个表示各维度大小的元组)和一个 dtype(一个描述数组数据类型的对象):

# 获取数组的形状
volatility_data.shape
(2, 3)
# 获取数组的数据类型
volatility_data.dtype
dtype('float64')

创建数组最简单的方法是使用 array 函数。它接受任何序列型对象(包括其他数组),并生成一个包含传入数据的新 NumPy 数组。

# 从列表创建(模拟收益率)
returns_data = [6, 7.5, 8, 0, 1]
returns_arr = np.array(returns_data)
returns_arr
array([6. , 7.5, 8. , 0. , 1. ])

嵌套序列,比如一个等长列表组成的列表,将被转换成一个多维数组:

portfolio_matrix = [[1, 2, 3, 4], [5, 6, 7, 8]]
portfolio_arr = np.array(portfolio_matrix)
portfolio_arr
array([[1, 2, 3, 4],
       [5, 6, 7, 8]])

由于 portfolio_matrix 是一个列表的列表,NumPy 数组 portfolio_arr 有两个维度。我们可以通过检查 ndimshape 属性来确认这一点:

portfolio_arr.ndim
2
portfolio_arr.shape
(2, 4)

除了 np.array,还有许多其他函数可以创建新数组。np.zerosnp.ones 分别创建全为 0 或全为 1 的数组。np.empty 创建一个数组,但不将其值初始化为任何特定值。要用这些方法创建更高维度的数组,只需传入一个元组作为形状参数。

# 创建一个长度为10的全零一维数组
np.zeros(10)
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
# 创建一个3x6的全零数组
np.zeros((3, 6))
array([[0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.]])
# 创建一个未初始化的2x3x2数组
np.empty((2, 3, 2))
array([[[0., 0.],
        [1., 0.],
        [1., 1.]],

       [[0., 1.],
        [0., 0.],
        [0., 0.]]])

关于 np.empty 的重要警告

假设 numpy.empty 会返回一个全零数组是不安全的。此函数返回的是未初始化的内存,因此可能包含任意的“垃圾”值。在金融建模中,例如初始化投资组合权重矩阵时,你应当只在确定会立即用真实数据覆盖该数组的情况下使用此方法,以避免计算中引入随机不可控的数值误差。

np.arange 是内置 Python range 函数的数组版本:

np.arange(15)
array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14])

表 3.1 列出了一些标准的数组创建函数。

表 3.1: 一些重要的 NumPy 数组创建函数。
函数 描述
array 将输入数据(列表、元组、数组或其他序列类型)转换为 ndarray,可推断数据类型或显式指定;默认复制输入数据。
asarray 将输入转换为 ndarray,但如果输入已经是 ndarray 则不复制。
arange 类似于内置的 range,但返回一个 ndarray 而不是列表。
ones, ones_like 生成一个给定形状和数据类型的全 1 数组;ones_like 接受另一个数组并生成一个相同形状和数据类型的全 1 数组。
zeros, zeros_like 类似于 onesones_like,但生成的是全 0 数组。
empty, empty_like 通过分配新内存创建新数组,但不像 ones 和 zeros 那样用任何值填充。
full, full_like 生成一个给定形状和数据类型的数组,所有值都设置为指定的“填充值”;full_like 接受另一个数组并生成一个相同形状和数据类型的填充数组。
eye, identity 创建一个 N × N 的单位矩阵(对角线上为 1,其他地方为 0)。

3.2.3 ndarray 的数据类型

数据类型dtype 是一个特殊的对象,它包含了 ndarray 需要将一块内存解释为特定类型数据的信息(或元数据)。除非显式指定,numpy.array 会尝试推断一个合适的数据类型。

prices_float = np.array([6, 7.5, 8, 0, 1])
prices_int = np.array([[1, 2, 3], [4, 5, 6]])
print(f'prices_float 的 dtype: {prices_float.dtype}')
print(f'prices_int 的 dtype: {prices_int.dtype}')
prices_float 的 dtype: float64
prices_int 的 dtype: int64

因为 arr1 包含浮点数,其 dtype 被推断为 float64arr2 只包含整数,被推断为 int64(在64位系统上)。

您可以在创建时显式指定 dtype

prices_float = np.array([1, 2, 3], dtype=np.float64)
prices_int = np.array([1, 2, 3], dtype=np.int32)
print(f'prices_float 的 dtype: {prices_float.dtype}')
print(f'prices_int 的 dtype: {prices_int.dtype}')
prices_float 的 dtype: float64
prices_int 的 dtype: int32

数据类型是 NumPy 与来自其他系统的数据交互灵活性的源泉。它们提供了到磁盘或内存底层表示的直接映射。数值型 dtype 的命名方式是类型名(intfloatcomplex)后跟一个表示每个元素位数的数字。一个标准的双精度浮点值(Python 的 float 对象所使用的)占用 8 字节或 64 位,因此得名 float64表 3.2 列出了主要的数据类型。

表 3.2: NumPy 数据类型。
类型 类型代码 描述
int8, uint8 i1, u1 有符号和无符号 8 位 (1 字节) 整数
int16, uint16 i2, u2 有符号和无符号 16 位整数
int32, uint32 i4, u4 有符号和无符号 32 位整数
int64, uint64 i8, u8 有符号和无符号 64 位整数
float16 f2 半精度浮点数
float32 f4 标准单精度浮点数
float64 f8 标准双精度浮点数 (与 Python 的 float 兼容)
complex64, complex128 c8, c16 由两个 32 位或 64 位浮点数表示的复数
bool ? 存储 TrueFalse 值的布尔类型
object O Python 对象类型
bytes_ S 固定长度 ASCII 字节串类型 (例如 S10 代表 10 字节字符串)
unicode_ U 固定长度 Unicode 类型 (例如 U10 代表 10 字符字符串)

有符号与无符号整数的财务数据处理细节

在经济和金融数据处理中,负数经常出现(例如季度亏损、负利率或收益率回撤)。因此,必须清楚 signed(有符号)和 unsigned(无符号)整数类型的区别。

  • signed 整数: 可以表示正数和负数(如 int8 范围为 -128 到 127)。
  • unsigned 整数: 只能表示非负数(如 uint8 范围为 0 到 255)。

在处理会计分录或盈亏数据时,错误地使用 unsigned 类型可能导致负数被错误地解释为一个非常大的正数,从而引发严重的财务计算逻辑泄露。

您可以使用 astype 方法显式地将一个数组从一个 dtype 转换或强制转换(cast)为另一个。

int_prices = np.array([1, 2, 3, 4, 5])
print(f'原始 dtype: {int_prices.dtype}')

float_prices = int_prices.astype(np.float64)
print(f'新的 dtype: {float_prices.dtype}')
print(f'新的数组: {float_prices}')
原始 dtype: int64
新的 dtype: float64
新的数组: [1. 2. 3. 4. 5.]

如果您将浮点数转换为整数 dtype,小数部分将被截断:

arr = np.array([3.7, -1.2, -2.6, 0.5, 12.9, 10.1])
arr.astype(np.int32)
array([ 3, -1, -2,  0, 12, 10], dtype=int32)

如果您有一个由表示数字的字符串组成的数组,您可以使用 astype 将它们转换为数值形式:

numeric_strings = np.array(['1.25', '-9.6', '42'], dtype=np.bytes_)
numeric_strings.astype(float)
array([ 1.25, -9.6 , 42.  ])

astype 总是创建一个新数组

调用 astype 总是会创建一个全新的数据副本,即使新旧 dtype 完全一致。在处理由 financial_statement.parquet 转换而来的数千万行财务数据时,应注意内存峰值,避免因不必要的转换导致内存溢出。

3.2.4 NumPy 数据类型层次结构

您可能偶尔需要检查一个数组的 dtype 是否属于某个通用类别,比如整数或浮点数。数据类型有诸如 np.integernp.floating 这样的超类,可以与 np.issubdtype 函数一起使用:

ints = np.ones(10, dtype=np.uint16)
floats = np.ones(10, dtype=np.float32)

print(f'ints.dtype 是 np.integer 的子类型吗? {np.issubdtype(ints.dtype, np.integer)}')
print(f'floats.dtype 是 np.floating 的子类型吗? {np.issubdtype(floats.dtype, np.floating)}')
ints.dtype 是 np.integer 的子类型吗? True
floats.dtype 是 np.floating 的子类型吗? True

您可以通过调用特定数据类型的 mro 方法来查看它的所有父类:

np.float64.mro()
[numpy.float64,
 numpy.floating,
 numpy.inexact,
 numpy.number,
 numpy.generic,
 float,
 object]
import matplotlib.pyplot as plt

def plot_hierarchy(ax, hierarchy, x, y, dx, dy):
    for key, value in hierarchy.items():
        ax.text(x, y, key, ha='center', va='center', bbox=dict(boxstyle='round,pad=0.5', fc='skyblue'))
        if isinstance(value, dict):
            child_keys = list(value.keys())
            num_children = len(child_keys)
            child_x_start = x - (num_children - 1) * dx / 2
            for i, child_key in enumerate(child_keys):
                child_x = child_x_start + i * dx
                child_y = y - dy
                ax.plot([x, child_x], [y - 0.3, child_y + 0.3], 'k-')
                plot_hierarchy(ax, {child_key: value[child_key]}, child_x, child_y, dx / num_children, dy)
        elif isinstance(value, list):
            num_children = len(value)
            child_x_start = x - (num_children - 1) * dx / 2
            for i, child_item in enumerate(value):
                 child_x = child_x_start + i * dx
                 child_y = y - dy
                 ax.plot([x, child_x], [y - 0.3, child_y + 0.3], 'k-')
                 ax.text(child_x, child_y, child_item, ha='center', va='center', bbox=dict(boxstyle='round,pad=0.3', fc='lightgreen'))


hierarchy = {
    'np.generic': {
        'np.number': {
            'np.integer': {
                'np.signedinteger': ['int8', 'int16', 'int32', 'int64'],
                'np.unsignedinteger': ['uint8', 'uint16', 'uint32', 'uint64']
            },
            'np.floating': ['float16', 'float32', 'float64'],
            'np.complexfloating': ['complex64', 'complex128']
        },
        'np.bool_': {},
        'np.character': {
            'np.bytes_': [],
            'np.unicode_': []
        }
    }
}

fig, ax = plt.subplots(figsize=(16, 10))
ax.axis('off')
plot_hierarchy(ax, hierarchy, 0.5, 1.0, 0.4, 0.15)
ax.set_title('NumPy Data Type Hierarchy', fontsize=20)
plt.show()
图 3.2: NumPy 数据类型类层次结构

3.3 NumPy 数组的运算

数组的重要性在于,它们使您能够表达对数据的批量操作,而无需编写任何 for 循环。NumPy 用户称之为向量化(vectorization)。任何在等大小数组之间的算术运算都将应用该运算于逐个元素。

# 模拟资产收益率
asset_returns = np.array([[1., 2., 3.], [4., 5., 6.]])
# 逐元素相乘
asset_returns * asset_returns
array([[ 1.,  4.,  9.],
       [16., 25., 36.]])
# 逐元素相减
asset_returns - asset_returns
array([[0., 0., 0.],
       [0., 0., 0.]])

与标量的算术运算会将标量参数传播到数组的每个元素。这是广播(broadcasting)的一种简单形式。

核心概念:广播 (Broadcasting)

广播的数学形式化定义

给定两个数组 \(A\)\(B\),其形状分别为 \(S_A \in \mathbb{N}^{n_A}\)\(S_B \in \mathbb{N}^{n_B}\),广播是寻找最小公共形状 \(S_C\) 的过程:

\[ S_C = \max(S_A, S_B) \]

其中 \(\max\) 操作定义为逐元素的最大值(维度对齐时),并满足广播三定律:

  1. 维度对齐:从尾部(右侧)开始对齐维度。对于缺失维度,在左侧补 1。
  2. 兼容性检查:两个数组的每个维度要么相等,要么其中一个为 1。

广播描述了 NumPy 在算术运算期间如何处理不同形状的数组。例如,在收益率归一化计算 returns - returns.mean(axis=0) 中,均值向量会被广播到原始矩阵的每一行。这是 NumPy 向量化运算兼顾效率和语义简洁的核心。

# 标量除法
1 / asset_returns
array([[1.        , 0.5       , 0.33333333],
       [0.25      , 0.2       , 0.16666667]])
# 标量指数运算
asset_returns ** 2
array([[ 1.,  4.,  9.],
       [16., 25., 36.]])

同样大小数组之间的比较也会产生布尔数组:

benchmark_returns = np.array([[0., 4., 1.], [7., 2., 12.]])
benchmark_returns > asset_returns
array([[False,  True, False],
       [ True, False,  True]])

3.4 基本索引和切片

NumPy 数组索引是一个深度话题,有很多方法可以选择数据的子集。一维数组很简单;它们的操作类似于 Python 列表。

# 模拟日成交量
daily_volume = np.arange(10)
daily_volume
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
# 获取索引为 5 的元素
daily_volume[5]
np.int64(5)
# 获取从索引 5 到 8 (不包括) 的切片
daily_volume[5:8]
array([5, 6, 7])

如果你将一个标量值赋给一个切片,比如 arr[5:8] = 12,这个值会被传播(或广播)到整个选区。

# 广播赋值
daily_volume[5:8] = 12
daily_volume
array([ 0,  1,  2,  3,  4, 12, 12, 12,  8,  9])

核心规则:视图(Views) vs. 副本(Copies)

与 Python 内置列表不同,NumPy 中的数组切片(Slicing)返回的是底层数据的视图

  • 优势: 极大优化内存消耗。处理 GB 级的日内分笔行情时,切片操作几乎瞬间完成且不占用额外内存。
  • 风险: 对视图的修改会直接“回溯”原数组数据。
  • 解决方案: 若需独立处理子集,必须显式调用 .copy()(例如 latest_prices = all_prices[:100].copy())。

为了演示这一点,我们首先创建 arr 的一个切片:

# 创建切片视图
volume_slice = daily_volume[5:8]
volume_slice
array([12, 12, 12])

现在,当我们改变 arr_slice 中的值时,这些改动会反映在原始数组 arr 中:

volume_slice[:] = 12345
daily_volume
array([    0,     1,     2,     3,     4, 12345, 12345, 12345,     8,
           9])

“裸”切片 [:] 会赋值给数组中的所有值:

volume_slice[:] = 64
daily_volume
array([ 0,  1,  2,  3,  4, 64, 64, 64,  8,  9])

对于更高维度的数组,你有更多的选择。在一个二维数组中,每个索引处的元素是一维数组:

# 模拟价格矩阵 (3x3)
price_matrix = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])

# 访问第三行 (索引 2)
price_matrix[2]
array([7, 8, 9])

单个元素可以通过递归方式访问,但更方便的是传递一个逗号分隔的索引列表:

# 这两种形式是等价的
print(f'递归访问: {price_matrix[0][2]}')
print(f'元组访问: {price_matrix[0, 2]}')
递归访问: 3
元组访问: 3

图 3.3 展示了在一个二维数组上的索引。将轴 0 想象为“行”,轴 1 想象为“列”,会很有帮助。

import matplotlib.pyplot as plt

def plot_indexing_diagram():
    fig, ax = plt.subplots(figsize=(4, 4))
    ax.set_xlim(-0.5, 2.5)
    ax.set_ylim(-0.5, 2.5)
    ax.invert_yaxis()
    
    for i in range(3):
        for j in range(3):
            ax.add_patch(plt.Rectangle((j-0.5, i-0.5), 1, 1, fill=True, color='skyblue', alpha=0.6, ec='black'))
            ax.text(j, i, f'({i}, {j})', ha='center', va='center', fontsize=14)

    ax.set_xticks(range(3))
    ax.set_yticks(range(3))
    ax.xaxis.tick_top()
    ax.xaxis.set_label_position('top')
    ax.set_xlabel('轴 1 (列)', fontsize=12)
    ax.set_ylabel('轴 0 (行)', fontsize=12)
    
    # Highlight one element
    ax.add_patch(plt.Rectangle((2-0.5, 0-0.5), 1, 1, fill=True, color='tomato', alpha=0.8, ec='black'))
    ax.text(1.5, -1.0, "arr2d[0, 2]", color='tomato', fontsize=14, ha='center')

    ax.set_title('二维数组索引', loc='center', pad=40, fontsize=16)

    plt.show()

plot_indexing_diagram()
图 3.3: 在一个二维 NumPy 数组中索引元素

在多维数组中,如果你省略了后面的索引,返回的对象将是一个较低维度的 ndarray 切片。对于一个 2 × 2 × 3 的数组 arr3d

# 模拟三维张量 (2x2x3)
tensor_data = np.array([[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], [10, 11, 12]]])
# 在轴 0 上切片返回一个 2x3 数组
tensor_data[0]
array([[1, 2, 3],
       [4, 5, 6]])

3.4.1 使用切片进行索引

和一维数组一样,多维数组也可以被切片。

# 选择 price_matrix 的前两行
price_matrix[:2]
array([[1, 2, 3],
       [4, 5, 6]])

这是沿着轴 0 进行切片。你可以将 price_matrix[:2] 理解为“选择 price_matrix 的前两行”。你可以传递多个切片:

# 选择前两行以及从索引 1 开始的所有列
price_matrix[:2, 1:]
array([[2, 3],
       [5, 6]])

当你这样切片时,你总是得到相同维度的数组视图。通过混合使用整数索引和切片,你可以得到较低维度的切片。例如,要选择第二行但只取前两列:

lower_dim_slice = price_matrix[1, :2]
lower_dim_slice
array([4, 5])

这里,lower_dim_slice 是一维的。 单独一个冒号 : 表示取整个轴:

# 选择所有行中的第一列
price_matrix[:, :1]
array([[1],
       [4],
       [7]])

图 3.4 提供了切片的可视化总结。

import numpy as np
import matplotlib.pyplot as plt

def show_slice(ax, slice_str, highlight):
    arr = np.arange(25).reshape((5, 5))
    ax.imshow(np.zeros_like(arr), cmap='Pastel2', vmin=-1, vmax=1)
    
    # Highlight the slice
    rows, cols = highlight
    if isinstance(rows, int): rows = slice(rows, rows + 1)
    if isinstance(cols, int): cols = slice(cols, cols + 1)
    
    highlight_mask = np.zeros_like(arr, dtype=bool)
    highlight_mask[rows, cols] = True
    ax.imshow(np.where(highlight_mask, 1, 0), cmap='Set1', alpha=0.5, vmin=-1, vmax=1)

    for i in range(5):
        for j in range(5):
            ax.text(j, i, str(arr[i, j]), ha='center', va='center', color='black')
    
    ax.set_title(f"arr{slice_str}", fontsize=14)
    ax.set_xticks([])
    ax.set_yticks([])

fig, axes = plt.subplots(2, 2, figsize=(8, 8))
fig.suptitle('二维数组切片示例', fontsize=20)

show_slice(axes[0, 0], '[:2, 1:]', (slice(0, 2), slice(1, None)))
show_slice(axes[0, 1], '[2]', (2, slice(None, None)))
show_slice(axes[1, 0], '[:, :1]', (slice(None, None), slice(0, 1)))
show_slice(axes[1, 1], '[1, :2]', (1, slice(0, 2)))

plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()
图 3.4: 二维数组切片

3.5 布尔索引

布尔索引的集合论基础

布尔索引从数学角度是特征函数(Characteristic Function)的应用。

给定: - 源集合 \(A = \{a_1, a_2, ..., a_n\}\) - 谓词 \(P: A \to \{\text{True}, \text{False}\}\)

过滤定义为: \[ S_P = \{a \in A \mid P(a) = \text{True}\} \]

在NumPy中,布尔数组 \(\mathbf{b} \in \{\text{True}, \text{False}\}^n\) 本质上是一个特征函数向量: \[ \mathbf{b}(i) = \begin{cases} 1 & \text{if } A[i] \text{ satisfies } P \\ 0 & \text{otherwise} \end{cases} \]

布尔运算的集合对应

布尔运算 NumPy语法 集合论表示 数学含义
与 (AND) & \(A \cap B\) 交集
或 (OR) \| \(A \cup B\) 并集
非 (NOT) ~ \(A^c\) 补集
异或 (XOR) ^ \((A \cup B) \setminus (A \cap B)\) 对称差

让我们考虑一个例子,我们有一个数组中的数据,以及一个包含重复名字的数组。

tickers = np.array(['AAPL', 'MSFT', 'GOOG', 'AAPL', 'GOOG', 'MSFT', 'MSFT'])
market_cap_data = np.random.standard_normal((7, 4)) # 模拟市值数据(标准化后)

假设每个名字对应 market_cap_data 数组中的一行,我们想选择所有名字为 ‘AAPL’ 的行。像算术运算一样,比较也是向量化的。将 tickers 与字符串 ‘AAPL’ 比较会产生一个布尔数组:

tickers == 'AAPL'
array([ True, False, False,  True, False, False, False])

这个布尔数组可以在索引 market_cap_data 数组时传入:

market_cap_data[tickers == 'AAPL']
array([[ 1.7213711 , -0.96399913, -0.07747488,  2.18871029],
       [ 1.0912576 ,  0.47151237, -0.38037469, -0.61363623]])

布尔数组的长度必须与其索引的数组轴的长度相同。你可以将布尔数组与切片或整数混合使用:

# 选择名为 'AAPL' 的行,并显示从第 2 列开始的列
market_cap_data[tickers == 'AAPL', 2:]
array([[-0.07747488,  2.18871029],
       [-0.38037469, -0.61363623]])

要选择除了 ‘AAPL’ 以外的所有内容,你可以使用 != 或者用 ~ 对条件取反:

market_cap_data[~(tickers == 'AAPL')]
array([[ 0.90275117,  1.7680143 , -0.01123834,  0.46155821],
       [ 1.00740312, -1.53034078, -1.11947321, -0.17470618],
       [-2.63545419,  0.3224616 ,  0.96953757,  1.5879225 ],
       [ 2.11260971,  0.17585338, -0.69777529, -0.26580363],
       [-0.75977764,  1.36271381,  0.46448028,  0.30489181]])

要组合多个布尔条件,使用布尔算术运算符 & (与) 和 | (或)。Python 关键字 andor 不适用于布尔数组。

mask = (tickers == 'AAPL') | (tickers == 'MSFT')
market_cap_data[mask]
array([[ 1.7213711 , -0.96399913, -0.07747488,  2.18871029],
       [ 0.90275117,  1.7680143 , -0.01123834,  0.46155821],
       [ 1.0912576 ,  0.47151237, -0.38037469, -0.61363623],
       [ 2.11260971,  0.17585338, -0.69777529, -0.26580363],
       [-0.75977764,  1.36271381,  0.46448028,  0.30489181]])

使用布尔数组设置值时,会将右侧的值或多个值赋到布尔数组为 True 的位置。

# 将 market_cap_data 中的所有负值设置为 0
market_cap_data[market_cap_data < 0] = 0
market_cap_data
array([[1.7213711 , 0.        , 0.        , 2.18871029],
       [0.90275117, 1.7680143 , 0.        , 0.46155821],
       [1.00740312, 0.        , 0.        , 0.        ],
       [1.0912576 , 0.47151237, 0.        , 0.        ],
       [0.        , 0.3224616 , 0.96953757, 1.5879225 ],
       [2.11260971, 0.17585338, 0.        , 0.        ],
       [0.        , 1.36271381, 0.46448028, 0.30489181]])

布尔索引必创副本

通过布尔掩码(Boolean Mask)选择数据时,NumPy 总是会创建数据的独立副本。这与切片视图机制截然不同。在进行复杂的数据过滤和清洗时,务必注意这种动态差异对程序总内存占用的影响。

3.6 花式索引 (Fancy Indexing)

花式索引的数学表示

花式索引从数学角度是索引映射(Indexing Map)的应用。

给定: - 源数组 \(A \in \mathbb{R}^n\) - 索引集合 \(I = \{i_1, i_2, ..., i_k\} \subseteq \{0, 1, ..., n-1\}\)

索引映射定义为: \[ \text{fancy}(A, I) = (A[i_1], A[i_2], ..., A[i_k]) \in \mathbb{R}^k \]

这是从 \(\mathbb{R}^n\)\(\mathbb{R}^k\) 的线性映射。

多维花式索引

对于多维数组 \(A \in \mathbb{R}^{n_0 \times n_1 \times \cdots \times n_{d-1}}\) 和索引集合 \(I_0, I_1, ..., I_{d-1}\)

\[ \text{fancy\_multi}(A, I_0, ..., I_{d-1}) = (A[I_0[0], I_1[0], ..., I_{d-1}[0]], ..., (A[I_0[k-1], I_1[k-1], ..., I_{d-1}[k-1]]) \]

组合索引

花式索引可以与其他索引类型组合。设 \(S\) 为切片对象,\(\mathbf{b}\) 为布尔数组,\(I\) 为整数索引集合:

\[ A[I, S, \mathbf{b}] \]

这种组合提供了极大的灵活性,但需要注意返回的是副本还是视图。

花式索引是 NumPy 采纳的一个术语,用来描述使用整数数组进行索引。假设我们有一个 8 × 4 的数组:

simulated_factors = np.empty((8, 4))
for i in range(8):
    simulated_factors[i] = i
simulated_factors
array([[0., 0., 0., 0.],
       [1., 1., 1., 1.],
       [2., 2., 2., 2.],
       [3., 3., 3., 3.],
       [4., 4., 4., 4.],
       [5., 5., 5., 5.],
       [6., 6., 6., 6.],
       [7., 7., 7., 7.]])

要以特定顺序选择行的子集,你可以传递一个指定所需顺序的列表或 ndarray 整数:

# 选择第 4, 3, 0, 和 6 行
simulated_factors[[4, 3, 0, 6]]
array([[4., 4., 4., 4.],
       [3., 3., 3., 3.],
       [0., 0., 0., 0.],
       [6., 6., 6., 6.]])

使用负数索引会从末尾选择行:

# 选择倒数第 3, 5, 和 7 行
simulated_factors[[-3, -5, -7]]
array([[5., 5., 5., 5.],
       [3., 3., 3., 3.],
       [1., 1., 1., 1.]])

传递多个索引数组会做一些稍微不同的事情;它会选择一个一维数组,其元素对应于每个索引元组。

reshaped_factors = np.arange(32).reshape((8, 4))
# 选择元素 (1,0), (5,3), (7,1), 和 (2,2)
reshaped_factors[[1, 5, 7, 2], [0, 3, 1, 2]]
array([ 4, 23, 29, 10])

在这种情况下,花式索引的行为是选择元素 (1, 0)(5, 3)(7, 1)(2, 2)。当传递的整数数组数量与轴数相同时,花式索引的结果总是一维的。要选择一个矩形区域,你可以使用以下方法:

# 选择第 1, 5, 7, 2 行,并从这些行中选择第 0, 3, 1, 2 列
reshaped_factors[[1, 5, 7, 2]][:, [0, 3, 1, 2]]
array([[ 4,  7,  5,  6],
       [20, 23, 21, 22],
       [28, 31, 29, 30],
       [ 8, 11,  9, 10]])

3.7 数组转置和轴交换

转置是一种特殊的重塑形式,它同样返回底层数据的视图而不复制任何内容。数组有 transpose 方法和特殊的 T 属性:

factor_data = np.arange(15).reshape((3, 5))
factor_data.T
array([[ 0,  5, 10],
       [ 1,  6, 11],
       [ 2,  7, 12],
       [ 3,  8, 13],
       [ 4,  9, 14]])

在进行矩阵计算时,这非常常见。例如,当使用 np.dot 计算内积 \(\boldsymbol{X'X}\) 时:

# 真实世界数据背景下的例子
# 让我们模拟一个有两个因子的简单回归,所以我们的 factors 矩阵是 n x 2
# 我们将使用随机数据进行演示
rng = np.random.default_rng(seed=12345)
factors = rng.standard_normal((100, 2)) # 100 个观测值, 2 个因子
cov_matrix_unscaled = np.dot(factors.T, factors)
cov_matrix_unscaled
array([[89.36134837, 11.49065471],
       [11.49065471, 93.25390931]])

@ 中缀运算符是进行矩阵乘法的另一种方式:

factors.T @ factors
array([[89.36134837, 11.49065471],
       [11.49065471, 93.25390931]])

使用 .T 进行简单转置是轴交换的一个特例。ndarrayswapaxes 方法,它接受一对轴号并交换指定的轴来重新排列数据:

tensor_3d = np.arange(16).reshape((2, 2, 4))
tensor_3d.swapaxes(1, 2)
array([[[ 0,  4],
        [ 1,  5],
        [ 2,  6],
        [ 3,  7]],

       [[ 8, 12],
        [ 9, 13],
        [10, 14],
        [11, 15]]])

swapaxes 同样返回数据的视图而不进行复制。

3.8 通用函数:快速的逐元素数组函数

通用函数的数学定义

通用函数(ufunc)从数学角度是元素级映射(Element-wise Mapping)的形式化。

给定: - 源数组 \(\mathbf{x} \in \mathbb{R}^n\) - 一元函数 \(f: \mathbb{R} \to \mathbb{R}\)

一元ufunc定义为: \[ \text{ufunc}_f(\mathbf{x})_i = f(x_i), \quad \forall i \in \{0, 1, ..., n-1\} \]

对于二元函数 \(g: \mathbb{R} \times \mathbb{R} \to \mathbb{R}\) 和两个数组 \(\mathbf{x}, \mathbf{y} \in \mathbb{R}^n\)

二元ufunc定义为: \[ \text{ufunc}_g(\mathbf{x}, \mathbf{y})_i = g(x_i, y_i), \quad \forall i \in \{0, 1, ..., n-1\} \]

特殊方法

  1. reduce(归约)\[ \text{reduce}(g, \mathbf{x}) = g(g(...g(g(x_0, x_1), x_2), ..., x_{n-1}) \]

  2. accumulate(累积)\[ \text{accumulate}(g, \mathbf{x})_i = g(x_0, x_1, ..., x_i), \quad \forall i \]

  3. outer(外积)\[ \text{outer}(g, \mathbf{x}, \mathbf{y})_{i,j} = g(x_i, y_j), \quad \forall i, j \]

通用函数(universal function),或称 ufunc,是一种对 ndarray 中的数据执行逐元素操作的函数。你可以将它们看作是简单函数的快速向量化包装器,这些函数接受一个或多个标量值并产生一个或多个标量结果。

许多 ufuncs 是简单的逐元素变换,如 np.sqrtnp.exp

values = np.arange(10)
np.sqrt(values)
array([0.        , 1.        , 1.41421356, 1.73205081, 2.        ,
       2.23606798, 2.44948974, 2.64575131, 2.82842712, 3.        ])

这些被称为一元 ufuncs。其他的,如 np.addnp.maximum,接受两个数组(因此是二元 ufuncs)并返回一个单一数组作为结果:

rng = np.random.default_rng(seed=12345)
vec_a = rng.standard_normal(8)
vec_b = rng.standard_normal(8)
np.maximum(vec_a, vec_b)
array([ 0.36105811,  1.26372846,  2.34740965,  0.96849691, -0.07534331,
        0.90219827, -0.46695317,  0.6488928 ])

表 3.3表 3.4 列出了一些 NumPy 的 ufuncs。

表 3.3: 一些一元通用函数。
函数 描述
abs, fabs 计算整数、浮点数或复数的绝对值。对于非复数,fabs 速度更快。
sqrt 计算每个元素的平方根。相当于 arr ** 0.5
square 计算每个元素的平方。相当于 arr ** 2
exp 计算每个元素的指数 \(e^x\)
log, log10, log2, log1p 分别为自然对数(底e)、底10对数、底2对数和 log(1 + x)
sign 计算每个元素的符号:1(正数),0(零),-1(负数)。
ceil 计算每个元素的上取整(大于等于该值的最小整数)。
floor 计算每个元素的下取整(小于等于该值的最大整数)。
rint 将每个元素四舍五入到最近的整数,保留 dtype。
modf 将数组的小数和整数部分作为两个独立的数组返回。
isnan 返回一个布尔数组,指示每个值是否为 NaN(非数字)。
isfinite, isinf 分别返回布尔数组,指示每个元素是有限的(非 inf,非 NaN)还是无限的。
cos, cosh, sin, sinh, tan, tanh 常规和双曲三角函数。
arccos, arccosh, … 反三角函数。
logical_not 对数组中的每个元素计算 not x。相当于 ~arr
表 3.4: 一些二元通用函数。
函数 描述
add 将两个数组中对应的元素相加。
subtract 从第一个数组的元素中减去第二个数组的元素。
multiply 将数组元素相乘。
divide, floor_divide 除法或向下取整的除法(丢弃余数)。
power 将第一个数组中的元素作为底,第二个数组中的元素作为指数,进行幂运算。
maximum, fmax 逐元素计算最大值。fmax 忽略 NaN
minimum, fmin 逐元素计算最小值。fmin 忽略 NaN
mod 逐元素计算模数(余数)。
copysign 将第二个数组中值的符号复制到第一个数组的值上。
greater, ..., not_equal 逐元素比较,产生布尔数组。
logical_and, logical_or, logical_xor 逐元素计算逻辑与、或、异或。

3.8.1 ufunc 的高级用法

NumPy 的每个二元 ufunc 都有特殊的方法来执行特殊的向量化操作。

reduce 接受一个数组,并通过执行一系列二元操作来聚合其值。例如,np.add.reduce 等同于 arr.sum()

values = np.arange(10)
np.add.reduce(values)
np.int64(45)

accumulatereduce 相关,但它会产生一个与输入大小相同的数组,其中包含中间的“累积”值,等同于 cumsum

matrix_data = np.arange(15).reshape((3, 5))
np.add.accumulate(matrix_data, axis=1)
array([[ 0,  1,  3,  6, 10],
       [ 5, 11, 18, 26, 35],
       [10, 21, 33, 46, 60]])

outer 对两个数组执行逐对的叉积。输出数组的形状是输入数组形状的拼接。

vec = np.arange(4)
np.multiply.outer(vec, np.arange(5))
array([[ 0,  0,  0,  0,  0],
       [ 0,  1,  2,  3,  4],
       [ 0,  2,  4,  6,  8],
       [ 0,  3,  6,  9, 12]])

reduceat 执行“局部 reduce”或“分组”操作。它接受一个“分箱边界”序列,指示如何分割和聚合值。

values = np.arange(10)
# 对 values[0:5], values[5:8], 和 values[8:] 求和
np.add.reduceat(values, [0, 5, 8])
array([10, 18, 17])

3.9 使用数组进行面向数组编程

使用 NumPy 数组,您可以将许多类型的数据处理任务表达为简洁的数组表达式,而这些任务可能需要编写循环。这种用数组表达式替代显式循环的做法被称为向量化

3.9.1 将条件逻辑表达为数组操作

numpy.where 函数是三元表达式 x if condition else y 的向量化版本。

bull_return_scenarios = np.array([1.1, 1.2, 1.3, 1.4, 1.5])
bear_return_scenarios = np.array([2.1, 2.2, 2.3, 2.4, 2.5])
bull_market_flag = np.array([True, False, True, True, False])

假设我们想在 bull_market_flagTrue 时从 bull_return_scenarios 中取值,否则从 bear_return_scenarios 中取值。 …

trading_signal = np.where(bull_market_flag, bull_return_scenarios, bear_return_scenarios)
trading_signal
array([1.1, 2.2, 1.3, 1.4, 2.5])

在数据分析中,where 的一个典型用途是根据另一个数组生成一个新值的数组。例如,假设你有一个随机生成的数据矩阵,你想将所有正值替换为 2,所有负值替换为 –2。

# 模拟收益率矩阵
rng = np.random.default_rng(seed=12345)
returns_matrix = rng.standard_normal((4, 4))
# 生成交易信号:收益率为正时为 1,为负时为 -1
np.where(returns_matrix > 0, 1, -1)
array([[-1,  1, -1, -1],
       [-1, -1, -1,  1],
       [ 1, -1,  1,  1],
       [-1,  1, -1, -1]])

3.9.2 数学和统计方法

一组计算整个数组或沿轴的数据统计量的数学函数,可以作为数组类的方法来访问。您可以使用聚合(有时称为归约)如 summeanstd(标准差),通过调用数组实例方法(例如 arr.sum())或使用顶层 NumPy 函数(例如 np.sum(arr))。

# 计算均值和总和
returns_matrix = rng.standard_normal((5, 4))
print(f'returns_matrix 的均值: {returns_matrix.mean()}')
print(f'returns_matrix 的总和: {returns_matrix.sum()}')
returns_matrix 的均值: 0.17933634979615845
returns_matrix 的总和: 3.586726995923169

meansum 这样的函数接受一个可选的 axis 参数,该参数会计算给定轴上的统计量,结果是一个维度减少一的数组:

# 沿着列计算均值 (axis=1)
returns_matrix.mean(axis=1)
array([ 0.37675318,  0.07598404, -0.2834985 ,  1.48722347, -0.75978045])
# 沿着行计算总和 (axis=0)
returns_matrix.sum(axis=0)
array([ 2.71870476,  0.30188842, -0.49975489,  1.0658887 ])

其他方法如 cumsumcumprod 不进行聚合,而是产生一个包含中间结果的数组:

matrix_data = np.array([[0, 1, 2], [3, 4, 5], [6, 7, 8]])
# 沿列的累积和
matrix_data.cumsum(axis=1)
array([[ 0,  1,  3],
       [ 3,  7, 12],
       [ 6, 13, 21]])

表 3.5 列出了一些基本的数组统计方法。

表 3.5: 基本数组统计方法。
方法 描述
sum 数组中所有元素的总和,或沿一个轴的总和;零长度数组的总和为 0。
mean 算术平均值;在零长度数组上无效(返回 NaN)。
std, var 分别为标准差和方差。
min, max 最小值和最大值。
argmin, argmax 分别为最小值和最大值元素的索引。
cumsum 从 0 开始的元素的累积和。
cumprod 从 1 开始的元素的累积积。

3.9.3 布尔数组的方法

在前面的方法中,布尔值被强制转换为 1 (True) 和 0 (False)。因此,sum 经常被用作计算布尔数组中 True 值数量的手段:

pnl_vector = rng.standard_normal(100)
(pnl_vector > 0).sum() # 正值的数量
np.int64(51)

另外两个方法 anyall 对布尔数组很有用。any 测试数组中是否有一个或多个值为 True,而 all 检查是否每个值都为 True

# 模拟每日盈亏状态(已盈利为 True)
profitable_days_mask = np.array([False, False, True, False])
print(f'是否有盈余天? {profitable_days_mask.any()}')
print(f'是否所有天都盈利? {profitable_days_mask.all()}')
是否有盈余天? True
是否所有天都盈利? False

3.9.4 排序

和 Python 内置的 list 类型一样,NumPy 数组可以使用 sort 方法进行原地排序:

unsorted_returns = rng.standard_normal(6)
unsorted_returns.sort()
unsorted_returns
array([-0.7758961 , -0.62361213,  0.28208603,  0.41071644,  0.84122103,
        1.12182226])

你可以通过将轴号传递给 sort,对多维数组中每个一维部分的值沿一个轴进行原地排序:

matrix_data = rng.standard_normal((5, 3))
matrix_data.sort(axis=1) # 对每一行进行排序
matrix_data
array([[-2.7224161 , -0.6733048 ,  1.24622153],
       [-0.0292946 ,  0.17534089,  0.79020803],
       [-1.41951426, -1.35996632,  0.22341156],
       [-2.17088985,  0.62848817,  1.76177943],
       [-0.86924667,  0.60119653,  0.95075786]])

顶层方法 numpy.sort 返回一个排序后的数组副本,而不是在原地修改数组。

3.9.5 唯一值和其他集合逻辑

NumPy 对一维 ndarray 有一些基本的集合操作。一个常用的是 np.unique,它返回数组中排序后的唯一值:

sectors = np.array(['Tech', 'Energy', 'Finance', 'Tech', 'Energy', 'Finance', 'Finance'])
np.unique(sectors)
array(['Energy', 'Finance', 'Tech'], dtype='<U7')

另一个函数 np.in1d,测试一个数组中的值是否在另一个数组中,返回一个布尔数组:

stock_ids = np.array([600000, 0, 0, 600036, 601398, 5, 600000])
np.in1d(stock_ids, [600036, 601398, 600000])
array([ True, False, False,  True,  True, False,  True])

3.10 使用数组进行文件输入和输出

numpy.savenumpy.load 是在磁盘上高效保存和加载数组数据的两个主力函数。默认情况下,数组以未压缩的原始二进制格式保存,文件扩展名为 .npy

saved_data = np.arange(10)
np.save('some_array', saved_data)

磁盘上的数组随后可以用 np.load 加载:

np.load('some_array.npy')
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

你可以使用 numpy.savez 将多个数组保存在一个未压缩的归档文件中,并将数组作为关键字参数传递:

np.savez('array_archive.npz', a=saved_data, b=saved_data)

加载 .npz 文件时,你会得到一个类似字典的对象,它会延迟加载各个数组:

arch = np.load('array_archive.npz')
arch['b']
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

如果你的数据压缩效果好,你可能希望使用 numpy.savez_compressed

3.11 示例:随机游走

随机游走的模拟为利用数组操作提供了一个说明性的应用。让我们首先考虑一个从 0 开始的简单随机游走,步长为 1 和 –1,且以相等的概率发生。

纯 Python 的实现可能如下所示:

import random
position = 0
walk = [position]
nsteps = 1000
for i in range(nsteps):
    step = 1 if random.randint(0, 1) else -1
    position += step
    walk.append(position)

图 3.5 展示了前 100 步的图。

import matplotlib.pyplot as plt
plt.plot(walk[:100])
plt.title('随机游走的前100步')
plt.xlabel('步数')
plt.ylabel('位置')
plt.grid(True)
plt.show()
图 3.5: 一个简单的随机游走

这个游走是随机步长的累积和。我们可以使用 NumPy 更高效地计算它。

nsteps = 1000
rng = np.random.default_rng(seed=12345)
draws = rng.integers(0, 2, size=nsteps)
steps = np.where(draws == 0, 1, -1)
price_path = steps.cumsum()

由此,我们可以提取统计数据,比如游走轨迹中的最小值和最大值:

print(f'最小值: {price_path.min()}')
print(f'最大值: {price_path.max()}')
最小值: -8
最大值: 50

一个更复杂的统计量是首次穿越时间,即随机游走达到特定值的步数。这里我们可能想知道游走需要多长时间才能距离原点至少 10 步。(np.abs(price_path) >= 10).argmax() 给了我们这个条件首次为真的索引。

(np.abs(price_path) >= 10).argmax()
np.int64(155)

3.11.1 一次模拟多次随机游走

为了模拟多次随机游走,比如 5000 次,我们可以一次性生成所有的随机游走。rng.integers 函数可以生成一个二维的抽样数组,然后我们可以计算每一行的累积和(axis=1)。

nwalks = 5000
nsteps = 1000
draws = rng.integers(0, 2, size=(nwalks, nsteps)) # 0 或 1
steps = np.where(draws > 0, 1, -1)
simulated_paths = steps.cumsum(axis=1)
simulated_paths
array([[  1,   2,   3, ...,  22,  23,  22],
       [  1,   0,  -1, ..., -50, -49, -48],
       [  1,   2,   3, ...,  50,  49,  48],
       ...,
       [ -1,  -2,  -1, ..., -10,  -9, -10],
       [ -1,  -2,  -3, ...,   8,   9,   8],
       [ -1,   0,   1, ...,  -4,  -3,  -2]], shape=(5000, 1000))

现在,我们可以计算所有游走中获得的最大值和最小值:

print(f'所有游走中的最大值: {simulated_paths.max()}')
print(f'所有游走中的最小值: {simulated_paths.min()}')
所有游走中的最大值: 114
所有游走中的最小值: -120

让我们计算达到 30 或 –30 的最小穿越时间。这有点棘手,因为不是所有的游走都会达到 30。我们可以使用 any 方法来检查这一点:

hits30 = (np.abs(simulated_paths) >= 30).any(axis=1)
print(f'达到 30 或 -30 的游走次数: {hits30.sum()}')
达到 30 或 -30 的游走次数: 3395

我们可以使用这个布尔数组来选择那些实际穿越了绝对 30 水平的游走的行,并在 axis=1 上调用 argmax 来获得穿越时间:

crossing_times = (np.abs(simulated_paths[hits30]) >= 30).argmax(axis=1)
print(f'平均最小穿越时间: {crossing_times.mean()}')
平均最小穿越时间: 500.5699558173785

3.11.2 一个真实世界的“随机游走”:标普500指数日收益率

虽然随机游走模型是一个理论构建,但它构成了金融学中有效市场假说的基础。该假说认为,资产价格反映了所有可用信息,因此价格变化应该遵循“随机游走”,是不可预测的。让我们通过绘制标普500指数的累积对数收益率,来看看这如何应用于真实数据。

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path

# 从本地数据获取上证指数数据
# 注意:在实际教学环境中,路径应根据操作系统自动调整
import platform
if platform.system() == 'Windows':
    index_path = 'C:/qiufei/data/index/indexes.parquet'
else:
    index_path = '/home/ubuntu/r2_data_mount/qiufei/data/index/indexes.parquet'

index_data = pd.read_parquet(index_path)
sse_index = index_data[index_data['symbol'] == '000001.XSHG'].copy()
sse_index['datetime'] = pd.to_datetime(sse_index['datetime'].astype(str), format='%Y%m%d%H%M%S')
sse_index.set_index('datetime', inplace=True)
sse_index = sse_index.sort_index()

# 选取时间范围
sse_index = sse_index.loc['2014-01-01':'2024-01-01']

# 计算对数收益率
log_returns = np.log(sse_index['close'] / sse_index['close'].shift(1)).dropna()

# 通过累积和创建“随机游走”
random_walk_sse = log_returns.cumsum()

# 绘图
plt.rcParams['font.sans-serif'] = ['Source Han Serif SC']
plt.rcParams['axes.unicode_minus'] = False 
plt.figure(figsize=(12, 6))
plt.plot(random_walk_sse)
plt.title('作为随机游走的上证指数累积对数收益率')
plt.xlabel('日期')
plt.ylabel('累积对数收益率')
plt.grid(True)
plt.show()
图 3.6: 上证指数的累积对数收益率 (2014-2024)

正如您在 图 3.6 中所看到的,上证指数累积收益率的路径与我们模拟的随机游走相似。这展示了在 NumPy 中开发的强大、抽象的工具如何能被直接应用于分析和理解复杂的经济和金融现象。

3.12 习题

3.12.1 基础习题

习题 4.1: 数组创建与基本运算

创建两个数组:

import numpy as np
prices_a = np.array([35.5, 36.2, 35.9, 36.8, 37.1])  # 宁波港5日收盘价
prices_b = np.array([12.3, 12.5, 12.4, 12.8, 12.7])  # 宁波银行5日收盘价
  1. 计算两个数组的价格之和
  2. 计算两个数组价格的均值和标准差
  3. 找出两个数组的最大值和最小值
  4. 对数组进行排序
  5. 计算价格的相关系数

解答:

import numpy as np

prices_a = np.array([35.5, 36.2, 35.9, 36.8, 37.1])
prices_b = np.array([12.3, 12.5, 12.4, 12.8, 12.7])

# (a) 价格之和
total = prices_a + prices_b
print('(a) 价格之和:')
print(total)

# (b) 均值和标准差
mean_a = np.mean(prices_a)
std_a = np.std(prices_a)
mean_b = np.mean(prices_b)
std_b = np.std(prices_b)
print(f'\n(b) 统计量:')
print(f'宁波港: 均值={mean_a:.2f}, 标准差={std_a:.2f}')
print(f'宁波银行: 均值={mean_b:.2f}, 标准差={std_b:.2f}')

# (c) 最大值和最小值
print(f'\n(c) 极值:')
print(f'宁波港: 最大={np.max(prices_a):.2f}, 最小={np.min(prices_a):.2f}')
print(f'宁波银行: 最大={np.max(prices_b):.2f}, 最小={np.min(prices_b):.2f}')

# (d) 排序
print(f'\n(d) 排序后:')
print(f'宁波港: {np.sort(prices_a)}')
print(f'宁波银行: {np.sort(prices_b)}')

# (e) 相关系数
corr = np.corrcoef(prices_a, prices_b)[0, 1]
print(f'\n(e) 相关系数: {corr:.4f}')
(a) 价格之和:
[47.8 48.7 48.3 49.6 49.8]

(b) 统计量:
宁波港: 均值=36.30, 标准差=0.58
宁波银行: 均值=12.54, 标准差=0.19

(c) 极值:
宁波港: 最大=37.10, 最小=35.50
宁波银行: 最大=12.80, 最小=12.30

(d) 排序后:
宁波港: [35.5 35.9 36.2 36.8 37.1]
宁波银行: [12.3 12.4 12.5 12.7 12.8]

(e) 相关系数: 0.9432

习题 4.2: 数组索引与切片

给定一个10x10的矩阵:

np.random.seed(42)
matrix = np.random.randint(1, 100, size=(10, 10))
  1. 提取前3行前3列的子矩阵
  2. 提取所有行第2、4、6列
  3. 提取对角线元素
  4. 将矩阵转置
  5. 找出每行的最大值及其索引

解答:

import numpy as np

np.random.seed(42)
matrix = np.random.randint(1, 100, size=(10, 10))

# (a) 前3行前3列
sub_matrix = matrix[:3, :3]
print('(a) 前3行前3列:')
print(sub_matrix)

# (b) 所有行的第2、4、6列
selected_cols = matrix[:, [1, 3, 5]]
print(f'\n(b) 第2、4、6列:')
print(selected_cols)

# (c) 对角线元素
diagonal = np.diag(matrix)
print(f'\n(c) 对角线元素:')
print(diagonal)

# (d) 转置
transposed = matrix.T
print(f'\n(d) 转置后形状: {transposed.shape}')

# (e) 每行最大值及索引
max_values = np.max(matrix, axis=1)
max_indices = np.argmax(matrix, axis=1)
print(f'\n(e) 每行最大值:')
for i in range(10):
    print(f'  行{i}: 值={max_values[i]}, 列={max_indices[i]}')
(a) 前3行前3列:
[[52 93 15]
 [88 24  3]
 [64 60 21]]

(b) 第2、4、6列:
[[93 72 21]
 [24 22  2]
 [60 33 58]
 [42 60 15]
 [55  3  7]
 [89 14 90]
 [71  8 35]
 [ 2 54 54]
 [34 62 95]
 [62 85 82]]

(c) 对角线元素:
[52 24 21 60 51 90 78 63 72 89]

(d) 转置后形状: (10, 10)

(e) 每行最大值:
  行0: 值=93, 列=1
  行1: 值=88, 列=0
  行2: 值=91, 列=9
  行3: 值=92, 列=2
  行4: 值=73, 列=7
  行5: 值=92, 列=9
  行6: 值=81, 列=7
  行7: 值=93, 列=6
  行8: 值=95, 列=5
  行9: 值=89, 列=9

习题 4.3: 布尔索引与条件过滤

给定一个包含股票收益率的数组:

returns = np.array([0.02, -0.01, 0.03, -0.02, 0.01, 0.04, -0.03, 0.02])
  1. 找出所有正收益
  2. 找出收益率大于2%的数据
  3. 计算正收益和负收益的个数
  4. 将所有负收益替换为0
  5. 计算累计收益率

解答:

import numpy as np

returns = np.array([0.02, -0.01, 0.03, -0.02, 0.01, 0.04, -0.03, 0.02])

# (a) 正收益
positive_mask = returns > 0
positive_returns = returns[positive_mask]
print('(a) 正收益:')
print(positive_returns)

# (b) 收益率大于2%
high_returns = returns[returns > 0.02]
print(f'\n(b) 收益率大于2%:')
print(high_returns)

# (c) 正负收益个数
n_positive = np.sum(returns > 0)
n_negative = np.sum(returns < 0)
print(f'\n(c) 正收益个数: {n_positive}, 负收益个数: {n_negative}')

# (d) 负收益替换为0
returns_clean = np.where(returns > 0, returns, 0)
print(f'\n(d) 负收益替换为0后:')
print(returns_clean)

# (e) 累计收益率
cumulative = np.cumprod(1 + returns) - 1
print(f'\n(e) 累计收益率:')
print(cumulative)
(a) 正收益:
[0.02 0.03 0.01 0.04 0.02]

(b) 收益率大于2%:
[0.03 0.04]

(c) 正收益个数: 5, 负收益个数: 3

(d) 负收益替换为0后:
[0.02 0.   0.03 0.   0.01 0.04 0.   0.02]

(e) 累计收益率:
[0.02       0.0098     0.040094   0.01929212 0.02948504 0.07066444
 0.03854451 0.0593154 ]

3.12.2 进阶习题

习题 4.4: 矩阵运算与线性代数

在投资组合理论中,矩阵运算是核心工具。给定:

# 3个资产的协方差矩阵
cov_matrix = np.array([
    [0.04, 0.02, 0.01],
    [0.02, 0.09, 0.03],
    [0.01, 0.03, 0.16]
])

# 等权重组合
weights = np.array([1/3, 1/3, 1/3])
  1. 计算投资组合方差:σ²p = w^T Σ w
  2. 计算投资组合标准差
  3. 找出协方差矩阵的特征值和特征向量
  4. 解释特征值的经济学含义

解答:

import numpy as np

cov_matrix = np.array([
    [0.04, 0.02, 0.01],
    [0.02, 0.09, 0.03],
    [0.01, 0.03, 0.16]
])
weights = np.array([1/3, 1/3, 1/3])

# (a) 投资组合方差
portfolio_variance = np.dot(weights.T, np.dot(cov_matrix, weights))
print(f'(a) 投资组合方差: {portfolio_variance:.6f}')

# (b) 投资组合标准差
portfolio_std = np.sqrt(portfolio_variance)
print(f'(b) 投资组合标准差: {portfolio_std:.6f} ({portfolio_std*100:.2f}%)')

# (c) 特征值和特征向量
eigenvalues, eigenvectors = np.linalg.eig(cov_matrix)
print(f'\n(c) 特征值:')
for i, ev in enumerate(eigenvalues):
    print(f'  特征值 {i+1}: {ev:.6f}')

print(f'\n特征向量:')
for i, ev in enumerate(eigenvectors.T):
    print(f'  特征向量 {i+1}: {ev}')

# (d) 经济学含义解释
print(f'\n(d) 经济学含义:')
print(f'特征值代表主成分(Principal Components)解释的方差量。')
print(f'最大特征值 {eigenvalues.max():.6f} 对应第一主成分,')
print(f'解释了 {eigenvalues.max()/eigenvalues.sum()*100:.1f}% 的总方差。')
print(f'这在风险因子分析中很重要,可以识别主要的风险来源。')
(a) 投资组合方差: 0.045556
(b) 投资组合标准差: 0.213437 (21.34%)

(c) 特征值:
  特征值 1: 0.173136
  特征值 2: 0.032982
  特征值 3: 0.083882

特征向量:
  特征向量 1: [-0.12390393 -0.3630554  -0.92349261]
  特征向量 2: [-0.94289213  0.33306872 -0.00443355]
  特征向量 3: [-0.30919612 -0.87020458  0.3835906 ]

(d) 经济学含义:
特征值代表主成分(Principal Components)解释的方差量。
最大特征值 0.173136 对应第一主成分,
解释了 59.7% 的总方差。
这在风险因子分析中很重要,可以识别主要的风险来源。

投资组合风险的矩阵代数表示

在马科维茨现代投资组合理论中,投资组合方差的计算是核心:

\[ \sigma_p^2 = \mathbf{w}^T \Sigma \mathbf{w} = \sum_{i=1}^{n}\sum_{j=1}^{n} w_i w_j \sigma_{ij} \]

其中 \(\mathbf{w}\) 是权重向量,\(\Sigma\) 是协方差矩阵。

谱分解的经济解释: 通过对协方差矩阵进行特征值分解(\(\Sigma = Q \Lambda Q^T\)),我们可以将总风险分解为独立的风险因子。较大的特征值代表了主要的系统性风险来源(如市场因子)。在前几个主成分(Principal Components)往往能捕捉 A 股市场大部分波动的背景下,这种在线性代数层面的分解对风险对冲至关重要。


习题 4.5: 广播机制的应用

NumPy的广播机制使得不同形状的数组可以进行运算。给定:

# 每日收益率 (5天 x 3个资产)
daily_returns = np.array([
    [0.01, 0.02, 0.015],
    [-0.01, 0.01, 0.005],
    [0.02, -0.01, 0.01],
    [0.015, 0.005, -0.005],
    [-0.005, 0.01, 0.02]
])

# 3个资产的权重
weights = np.array([0.4, 0.3, 0.3])
  1. 使用广播计算每日投资组合收益率
  2. 计算累计收益率
  3. 如果每个资产初始投资100000元,计算最终价值
  4. 添加一个新的资产(权重为0),使投资组合包含4个资产

解答:

import numpy as np

daily_returns = np.array([
    [0.01, 0.02, 0.015],
    [-0.01, 0.01, 0.005],
    [0.02, -0.01, 0.01],
    [0.015, 0.005, -0.005],
    [-0.005, 0.01, 0.02]
])
weights = np.array([0.4, 0.3, 0.3])

# (a) 每日投资组合收益率
portfolio_returns = daily_returns @ weights  # 矩阵乘法
print('(a) 每日投资组合收益率:')
print(portfolio_returns)

# (b) 累计收益率
cumulative_returns = np.cumprod(1 + portfolio_returns) - 1
print(f'\n(b) 累计收益率:')
print(cumulative_returns)

# (c) 最终价值
initial_value = np.array([100000, 100000, 100000])
# 每个资产的价值变化
final_values = initial_value * (1 + cumulative_returns[-1])
total_final = np.sum(final_values * weights)
print(f'\n(c) 最终价值: ¥{total_final:,.2f}')

# (d) 添加第4个资产
weights_4 = np.array([0.3, 0.25, 0.25, 0.2])  # 新权重
returns_4 = np.column_stack([daily_returns, np.zeros(5)])  # 添加零收益
portfolio_returns_4 = returns_4 @ weights_4
print(f'\n(d) 4资产组合每日收益率:')
print(portfolio_returns_4)
(a) 每日投资组合收益率:
[0.0145 0.0005 0.008  0.006  0.007 ]

(b) 累计收益率:
[0.0145     0.01500725 0.02312731 0.02926607 0.03647093]

(c) 最终价值: ¥103,647.09

(d) 4资产组合每日收益率:
[0.01175 0.00075 0.006   0.0045  0.006  ]

3.12.3 应用习题

习题 4.6: 随机游走与布朗运动

使用NumPy模拟几何布朗运动(GBM),这是股票价格的经典模型:

\[ dS_t = \mu S_t dt + \sigma S_t dW_t \]

其中: - \(S_t\) 是股票价格 - \(\mu\) 是漂移率(期望收益率) - \(\sigma\) 是波动率 - \(dW_t\) 是维纳过程增量

参数设置: - S0 = 100(初始价格) - μ = 0.08(年化收益率8%) - σ = 0.2(年化波动率20%) - T = 1年,252个交易日

解答:

import numpy as np
import matplotlib.pyplot as plt

# 参数设置
S0 = 100  # 初始价格
mu = 0.08  # 年化收益率
sigma = 0.2  # 年化波动率
T = 1.0  # 时间跨度(年)
N = 252  # 交易日数
dt = T / N  # 时间步长

# 生成随机游走路径
np.random.seed(42)
# 标准正态随机数
Z = np.random.standard_normal(N)

# 几何布朗运动的离散化
# ln(S_t) = ln(S_0) + (μ - σ²/2)t + σW_t
# 其中 W_t = √dt * ΣZ_i
t = np.arange(1, N + 1) * dt
W = np.cumsum(np.sqrt(dt) * Z)

log_prices = np.log(S0) + (mu - 0.5 * sigma**2) * t + sigma * W
prices = np.exp(log_prices)

print(f'几何布朗运动模拟:')
print(f'初始价格: ¥{S0:.2f}')
print(f'最终价格: ¥{prices[-1]:.2f}')
print(f'总收益率: {(prices[-1]/S0 - 1)*100:+.2f}%')

# 统计特性
returns = np.diff(prices) / prices[:-1]
print(f'\n收益率统计:')
print(f'日均值: {np.mean(returns)*100:.4f}%')
print(f'日标准差: {np.std(returns)*100:.4f}%')
print(f'年化均值: {np.mean(returns)*252*100:.2f}%')
print(f'年化波动率: {np.std(returns)*np.sqrt(252)*100:.2f}%')

# 可视化
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

plt.figure(figsize=(12, 6))
plt.plot(prices, label='模拟价格')
plt.axhline(y=S0, color='r', linestyle='--', label='初始价格')
plt.title('几何布朗运动模拟的股价路径')
plt.xlabel('交易日')
plt.ylabel('价格')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
几何布朗运动模拟:
初始价格: ¥100.00
最终价格: ¥104.92
总收益率: +4.92%

收益率统计:
日均值: 0.0240%
日标准差: 1.2206%
年化均值: 6.04%
年化波动率: 19.38%

几何布朗运动与 Black-Scholes 定价逻辑

几何布朗运动(GBM)是金融数学的基石,其显式解展示了资产价格在连续时间下的演化路径:

\[ S_t = S_0 \exp\left[\left(\mu - \frac{\sigma^2}{2}\right)t + \sigma W_t\right] \]

模型核心假设: 1. 对数正态性:收益率服从正态分布,而股价永远为正。 2. 波动率恒定:假设 \(\sigma\) 在有效期内不变(这也是 Black-Scholes 模型常被批评之处)。

通过 NumPy 的 cumsumexp 函数,我们可以瞬间数万次模拟由于随机项 \(W_t \sim N(0, t)\) 驱动的股价路径,从而为复杂的二元期权或亚洲期权寻找数值定价。


3.12.4 挑战习题

习题 4.7: 实现蒙特卡洛期权定价

使用蒙特卡洛方法模拟欧式看涨期权价格。参数: - S0 = 100(当前股价) - K = 105(行权价) - T = 1年 - r = 0.05(无风险利率) - σ = 0.2(波动率) - 模拟次数:10000次

欧式看涨期权到期收益:\(C_T = \max(S_T - K, 0)\)

期权价格 = \(e^{-rT} E[C_T]\)

解答:

import numpy as np

def monte_carlo_call_option(S0, K, T, r, sigma, n_simulations=10000):
    """
    蒙特卡洛欧式看涨期权定价

    参数:
    S0: 当前股价
    K: 行权价
    T: 到期时间(年)
    r: 无风险利率
    sigma: 波动率
    n_simulations: 模拟次数
    """
    np.random.seed(42)

    # 生成标准正态随机数
    Z = np.random.standard_normal(n_simulations)

    # 模拟到期日股价(GBM解析解)
    ST = S0 * np.exp((r - 0.5 * sigma**2) * T + sigma * np.sqrt(T) * Z)

    # 计算期权收益
    payoffs = np.maximum(ST - K, 0)

    # 贴现到现值
    option_price = np.exp(-r * T) * np.mean(payoffs)

    # 计算标准误差
    std_error = np.std(np.exp(-r * T) * payoffs) / np.sqrt(n_simulations)

    return option_price, std_error

# 参数
S0 = 100
K = 105
T = 1.0
r = 0.05
sigma = 0.2

# 运行蒙特卡洛模拟
price, se = monte_carlo_call_option(S0, K, T, r, sigma, n_simulations=100000)

print(f'蒙特卡洛期权定价结果:')
print(f'期权价格: ¥{price:.4f}')
print(f'标准误差: ¥{se:.6f}')
print(f'95%置信区间: [¥{price - 1.96*se:.4f}, ¥{price + 1.96*se:.4f}]')

# 与Black-Scholes公式比较(解析解)
from scipy.stats import norm
d1 = (np.log(S0/K) + (r + 0.5*sigma**2)*T) / (sigma*np.sqrt(T))
d2 = d1 - sigma*np.sqrt(T)
bs_price = S0*norm.cdf(d1) - K*np.exp(-r*T)*norm.cdf(d2)

print(f'\nBlack-Scholes解析解: ¥{bs_price:.4f}')
print(f'蒙特卡洛误差: ¥{abs(price - bs_price):.6f} ({abs(price - bs_price)/bs_price*100:.2f}%)')
蒙特卡洛期权定价结果:
期权价格: ¥8.0416
标准误差: ¥0.041760
95%置信区间: [¥7.9598, ¥8.1235]

Black-Scholes解析解: ¥8.0214
蒙特卡洛误差: ¥0.020268 (0.25%)

蒙特卡洛模拟的收敛与加速

蒙特卡洛估计的标准误差定义为 \(\text{SE} = \sigma / \sqrt{N}\)。这意味着要将定价精度提高一个数量级,计算量必须增加 100 倍。

在量化金融实务中,传统的蒙特卡洛法常受到“维度灾难”的挑战。为了在有限的算力下获得更高精度,业界常采用方差缩减(Variance Reduction)技术,如对偶变量法或准蒙特卡洛(Quasi-Monte Carlo)。NumPy 结合底层 BLAS 库的并行算效,使得在普通个人电脑上进行万次甚至十万次期权路径模拟成为现实。


3.13 结论

本章我们系统地学习了 NumPy 的核心组件 ndarray 及其向量化计算范式。从内存布局的底层视角到广播机制的数学形式化,NumPy 展现了其作为高性能金融计算基石的强大能力。虽然本教材后续章节将主要使用 Pandas 进行数据建模,但理解 NumPy 的数组导向思维、内存视图与副本的区别,是编写高效、稳健的量化交易系统不可或缺的前提。

3.14 学习总结与进阶建议

随着你对 NumPy 基础的掌握,建议通过以下进阶路径深化你的数值计算能力:

1. 深挖 NumPy 内部机制 若想真正优化大规模金融数据的处理速度,必须理解 NumPy 的 C 语言底层实现。Wes McKinney 的 Python for Data Analysis 第 4 章及附录 A 是极佳的起点。进一步地,Efficient NumPy(Juan Nunez-Iglesias 著)深入探讨了内存管理和最佳实践,能帮助你避免在大数据回测中的性能瓶颈。

2. 掌握数值线性代数的严谨性 量化金融中的许多高级模型(如主成分分析或最小二乘回归)本质上都是线性代数运算。Lloyd N. Trefethen 的 Numerical Linear Algebra 能让你从数学层面理解这些运算的稳定性和复杂度。这种严谨性对于构建非套利定价模型或风险管理系统至关重要。

3. 随机过程与蒙特卡洛实战 本章演示的随机游走只是冰山一角。对于希望从事衍生品定价或风险控制的读者,保罗·格拉瑟曼(Paul Glasserman)的经典著作 Monte Carlo Methods in Financial Engineering 是该领域的“圣经”。通过在 NumPy 中实现其中的方差缩减技术,你将能够构建出工业级的期权定价引擎。

4. 量化交易生态系统 NumPy 是工具,而量化是目的。Yves Hilpisch 的 Python for Finance 展示了如何将 NumPy 与 Pandas、SciPy 结合,处理真实的金融时间序列。记住,在实盘环境中,NumPy 为你的策略提供速度,但风险控制和数理逻辑才是你的真正防线。

3.14.1 补充练习

3.14.2 练习 1:计算股票日收益率与波动率

题目: 假设我们有一个包含某只股票连续10个交易日收盘价的NumPy数组。请计算其对数收益率(Log Returns),并求出年化波动率(假设一年有252个交易日)。

import numpy as np

# 模拟的收盘价序列
prices = np.array([100.0, 101.5, 100.5, 102.0, 103.5, 104.0, 103.0, 105.0, 107.0, 106.0])

解答

import numpy as np

prices = np.array([100.0, 101.5, 100.5, 102.0, 103.5, 104.0, 103.0, 105.0, 107.0, 106.0])

# 计算对数收益率: ln(P_t / P_{t-1}) = ln(P_t) - ln(P_{t-1})
log_returns = np.log(prices[1:] / prices[:-1])
print("Daily Log Returns:", np.round(log_returns, 4))

# 计算年化波动率
# 波动率 = 收益率标准差 * sqrt(252)
daily_vol = np.std(log_returns, ddof=1) # ddof=1 for sample standard deviation
annualized_vol = daily_vol * np.sqrt(252)

print(f"Annualized Volatility: {annualized_vol:.2%}")

3.14.3 练习 2:几何布朗运动 (GBM) 模拟

题目: 使用蒙特卡洛模拟生成 1000 条股票价格路径。假设: - 初始价格 \(S_0 = 100\) - 年化期望收益率 \(\mu = 0.05\) - 年化波动率 \(\sigma = 0.2\) - 时间 \(T = 1\) 年 - 步数 \(steps = 252\)

几何布朗运动公式: \[ S_t = S_{t-1} \cdot \exp\left( (\mu - \frac{1}{2}\sigma^2)\Delta t + \sigma \sqrt{\Delta t} Z_t \right) \] 其中 \(Z_t \sim N(0, 1)\)

解答

import numpy as np
import matplotlib.pyplot as plt

# 参数设置
S0 = 100
mu = 0.05
sigma = 0.2
T = 1.0
dt = 1/252
steps = 252
simulations = 1000

# 生成随机项 Z
# 形状: (steps, simulations)
Z = np.random.standard_normal((steps, simulations))

# 计算漂移项和扩散项
drift = (mu - 0.5 * sigma**2) * dt
diffusion = sigma * np.sqrt(dt) * Z

# 计算日收益率增量
returns = np.exp(drift + diffusion)

# 初始化价格路径矩阵
S = np.zeros((steps + 1, simulations))
S[0] = S0

# 逐步推导价格 (累乘)
# 计算累积乘积路径:S_t = S_0 * product(returns_1 ... returns_t)
S[1:] = S0 * np.cumprod(returns, axis=0)

# 绘制前10条模拟路径
plt.figure(figsize=(10, 6))
plt.plot(S[:, :10])
plt.title('Geometric Brownian Motion Simulation (10 Paths)')
plt.xlabel('Time Steps')
plt.ylabel('Price')
plt.grid(True)
plt.show()

3.14.4 练习 3:投资组合风险价值 (VaR) 计算

题目: 给定一个投资组合包含 3 只股票,其权重向量 \(w\) 和日收益率协方差矩阵 \(\Sigma\) 如下。假设投资组合总价值为 100 万元。请计算该投资组合在 95% 置信水平下的日风险价值 (VaR)。 假设收益率服从正态分布。 \[ VaR_{95\%} = \text{Portfolio Value} \times 1.65 \times \sigma_p \] 其中组合波动率 \(\sigma_p = \sqrt{w^T \Sigma w}\)

# 权重向量 (w)
weights = np.array([0.4, 0.3, 0.3])

# 协方差矩阵 (Sigma) - 假设值
cov_matrix = np.array([
    [0.0004, 0.0002, 0.0001],
    [0.0002, 0.0003, 0.00015],
    [0.0001, 0.00015, 0.0005]
])
portfolio_value = 1_000_000

解答

import numpy as np

weights = np.array([0.4, 0.3, 0.3])
cov_matrix = np.array([
    [0.0004, 0.0002, 0.0001],
    [0.0002, 0.0003, 0.00015],
    [0.0001, 0.00015, 0.0005]
])
portfolio_value = 1_000_000

# 1. 计算组合方差: w^T * Sigma * w
# 使用 @ 符号进行矩阵乘法
port_variance = weights.T @ cov_matrix @ weights

# 2. 计算组合波动率 (标准差)
port_volatility = np.sqrt(port_variance)
print(f"Portfolio Daily Volatility: {port_volatility:.2%}")

# 3. 计算 95% VaR (Z-score 约为 1.645)
z_score_95 = 1.645
VaR_95 = portfolio_value * z_score_95 * port_volatility

print(f"95% Daily VaR: {VaR_95:.2f} CNY")