引言与学习目标
学习目标
通过本章学习,你应该能够:
理论目标 :
深刻理解 NumPy ndarray 的数学基础,包括内存布局、步长(stride)和视图机制
掌握向量化计算的数学原理,理解为什么向量化比循环快几个数量级
理解广播机制的数学形式化定义和满足条件
掌握NumPy中线性代数运算的矩阵表示和计算复杂度
实践目标 :
熟练使用 NumPy 进行大规模金融数据的高效计算
运用向量化操作实现技术分析指标(如移动平均、收益率计算)
使用NumPy实现蒙特卡洛模拟和期权定价
掌握使用NumPy进行多维数据的索引、切片和布尔过滤
应用目标 :
能够使用 NumPy 从本地金融数据源读取和处理大规模股票市场数据
实现基于随机游走理论的市场模拟和回测框架
运用NumPy的线性代数功能进行投资组合优化和风险分析
NumPy,全称 Numerical Python,是 Python 中用于数值计算最重要的基础包之一。对于经济学和金融学的学生来说,掌握 NumPy 不仅仅是一项技术练习;它是开启现代经济分析中强大定量工具的钥匙,从计量经济建模到金融工程。许多提供科学计算功能的包,如 pandas 和 statsmodels,都使用 NumPy 的数组对象作为数据交换的标准”通用语言”之一。我们在 章节 3 中涵盖的关于 NumPy 的大部分知识也同样适用于这些更高级的库。
以下是您将在 NumPy 中找到的一些核心功能:
ndarray : 一种高效的多维数组,提供快速的面向数组的算术运算和灵活的广播 (broadcasting)功能。
数学函数 : 用于对整个数据数组进行快速运算,而无需编写显式的 Python 循环。
数据 I/O : 用于在磁盘上读/写数组数据以及处理内存映射文件的工具。
线性代数和随机数生成 : 提供全面的线性代数运算、随机数生成和傅里叶变换功能。
C API : 一个文档齐全的 C 语言 API,用于将 NumPy 与 C、C++ 或 FORTRAN 编写的库连接起来。这一特性是 Python 科学计算生态系统的基石,它使得高性能的遗留代码可以被封装在一个动态、易于访问的 Python 接口中。
虽然 NumPy 本身不提供高级的数据建模功能,但对其数组和面向数组的计算范式的深刻理解,是高效使用像 pandas 这类工具的先决条件。
NumPy 在经济学和金融学中的核心地位
作为经济学专业的学生,你可能会问:为什么我们要从如此底层的库学起?答案很简单:NumPy 是整个 Python 数据科学生态系统的基石。你将来要用到的几乎所有计量经济学、金融建模和机器学习库(例如 pandas, statsmodels, scikit-learn)的底层数据结构都是 NumPy 数组。理解 NumPy 的运行机制——特别是其对内存的有效管理和向量化计算的理念——能够让你写出运行速度快几个数量级的代码。在处理大规模经济数据(如面板数据、高频交易数据)时,这种效率的差异是决定性的。
理论基础:向量化计算的数学原理
向量化计算的理论基础
向量化计算的核心思想是将标量操作 (scalar operations)转换为向量操作 (vector operations),从而避免显式循环。
从数学角度,向量化可以形式化地定义为:
给定: - 向量 \(\mathbf{x} \in \mathbb{R}^n\) - 标量 \(c \in \mathbb{R}\) - 一元函数 \(f: \mathbb{R} \to \mathbb{R}\)
逐元素操作 定义为: \[ f(\mathbf{x})_i = f(x_i), \quad \forall i \in \{1, 2, ..., n\} \]
标量广播 定义为: \[ (c \odot \mathbf{x})_i = c \cdot x_i, \quad \forall i \in \{1, 2, ..., n\} \]
其中 \(\odot\) 表示广播乘法运算。
计算复杂度对比
元素级加法
O(n) 解释执行
O(n) 编译执行
10-100×
元素级乘法
O(n) 解释执行
O(n) 编译执行
10-100×
矩阵乘法
O(n³) 嵌套循环
O(n³) 优化BLAS
5-50×
两个向量点积
O(n) 嵌套循环
O(n) 单个SIMD
20-200×
其中SIMD(Single Instruction, Multiple Data)是现代CPU的并行计算特性,NumPy的向量化操作充分利用了这一点。
NumPy 之所以对数值计算如此重要,其主要原因之一是它为处理大型数据数组而设计的高效率。这种效率源于几个关键的架构决策:
内存布局 : NumPy 在内部将数据存储在连续的内存块中,独立于其他 Python 内置对象。其用 C 语言编写的算法库可以直接在这块内存上操作,无需进行任何类型检查或与 Python 动态类型相关的其他开销。因此,NumPy 数组比内置的 Python 序列使用更少的内存。
向量化操作 : NumPy 允许对整个数组进行复杂的计算,而无需显式的 for 循环。这种被称为向量化的实践,将循环操作委托给了高度优化的、预编译的 C 代码,其速度比解释执行的 Python 代码快几个数量级。
为了说明性能差异,我们来比较一个包含一百万个整数的 NumPy 数组和一个等效的 Python 列表。
正如 列表 3.1 中的计时结果所示,基于 NumPy 的算法通常比纯 Python 的实现快 10 到 100 倍(甚至更多),并且使用的内存也显著减少。
NumPy ndarray:一个多维数组对象
NumPy 的一个关键特性是其 N 维数组对象,即 ndarray。它是一个快速、灵活的容器,适用于 Python 中的大型数据集,允许您使用类似于标量元素之间等效运算的语法,对整块数据执行数学运算。
ndarray 对象内部机制
内存布局的数学表示
从数学角度,NumPy数组可以定义为五元组: \[ \text{ndarray} = (D, S, T, \mathbf{s}, P) \]
其中: - \(D \subseteq \mathbb{R}^N\) 是数据域(\(N\) 个元素) - \(S \in \mathbb{N}^k\) 是形状(\(k\) 维度) - \(T\) 是数据类型(dtype) - \(\mathbf{s} \in \mathbb{N}^k\) 是步长(strides)向量 - \(P \in \mathbb{N}\) 是数据指针(内存地址)
线性索引公式
给定一个数组 \(A\) ,其形状为 \(S = (n_0, n_1, ..., n_{k-1})\) ,步长为 \(\mathbf{s} = (s_0, s_1, ..., s_{k-1})\) ,索引 \(\mathbf{i} = (i_0, i_1, ..., i_{k-1})\) 对应的线性地址为:
\[ \text{offset}(\mathbf{i}) = P + \sum_{j=0}^{k-1} i_j \cdot s_j \]
其中 \(0 \leq i_j < n_j\) 。
行优先序(Row-Major Order)
NumPy默认使用C风格的行优先序,即最后一个维度变化最快: \[ s_{k-1} = \text{sizeof}(T) \] \[ s_{j} = s_{j+1} \cdot n_{j+1}, \quad j = 0, 1, ..., k-2 \]
例如,一个 \((3, 4, 5)\) 形状的 float64 数组,其步长为: \[ \mathbf{s} = (160, 40, 8) \]
视图的数学定义
给定一个ndarray \(A = (D, S, T, \mathbf{s}, P)\) ,一个视图 (view)是通过创建新的元组 \((D, S', T', \mathbf{s}', P)\) 但共享相同数据域 \(D\) 的操作。
切片操作 \(A[\text{slice}]\) 返回一个视图,满足: \[ P' = P + \text{offset}(\text{slice}) \] \[ D' = \{A[\mathbf{i}] \mid \mathbf{i} \in \text{slice}\} \]
要真正理解 ndarray 为何如此高效,我们必须超越其 Python 接口,深入其内部。NumPy ndarray 提供了一种将一块同质类型的数据(连续或跨步的内存块)解释为多维数组对象的方式。
ndarray 之所以如此灵活,部分原因在于每个数组对象都是数据块上的一个跨步视图 (strided view)。例如,像 arr[::2, ::-1] 这样的数组视图不会复制任何数据。原因是 ndarray 不仅仅是一块内存;它还包含了跨步信息,使数组能够以不同的步长在内存中移动。更精确地说,ndarray 内部由以下部分组成:
一个指向数据的指针 ——即指向内存(RAM)或内存映射文件中的数据块。
数据类型 或 dtype,描述数组中固定大小的值单元。
一个表示数组形状 的元组。
一个跨度 (strides)元组——整数元组,指示为了沿某个维度前进一个元素需要“跨过”的字节数。
import matplotlib.pyplot as plt
import matplotlib.patches as patches
fig, ax = plt.subplots(figsize= (10 , 6 ))
ax.set_xlim(0 , 10 )
ax.set_ylim(0 , 7 )
ax.axis('off' )
# Main ndarray object
ax.add_patch(patches.FancyBboxPatch((1 , 4.5 ), 8 , 2 , boxstyle= 'round,pad=0.1' , fc= 'lightblue' , ec= 'black' ))
ax.text(5 , 5.5 , 'NumPy ndarray Object' , ha= 'center' , va= 'center' , fontsize= 16 , fontweight= 'bold' )
# Data Pointer
ax.add_patch(patches.Rectangle((1 , 0.5 ), 2.5 , 2.5 , fc= 'lightgreen' , ec= 'black' ))
ax.text(2.25 , 1.75 , 'Data Pointer \n (to memory block)' , ha= 'center' , va= 'center' , fontsize= 12 )
ax.add_patch(patches.Arrow(5 , 4.5 , 0 , - 1.2 , width= 0.3 , color= 'gray' ))
# Dtype
ax.add_patch(patches.Rectangle((4 , 2.5 ), 2 , 1 , fc= 'lightcoral' , ec= 'black' ))
ax.text(5 , 3 , 'dtype: float64' , ha= 'center' , va= 'center' , fontsize= 12 )
ax.add_patch(patches.Arrow(3.5 , 4.5 , 1 , - 0.8 , width= 0.2 , color= 'gray' ))
# Shape
ax.add_patch(patches.Rectangle((7 , 2.5 ), 2 , 1 , fc= 'lightgoldenrodyellow' , ec= 'black' ))
ax.text(8 , 3 , 'shape: (10, 5)' , ha= 'center' , va= 'center' , fontsize= 12 )
ax.add_patch(patches.Arrow(6.5 , 4.5 , 1 , - 0.8 , width= 0.2 , color= 'gray' ))
# Strides
ax.add_patch(patches.Rectangle((7 , 0.5 ), 2 , 1 , fc= 'plum' , ec= 'black' ))
ax.text(8 , 1 , 'strides: (40, 8)' , ha= 'center' , va= 'center' , fontsize= 12 )
ax.add_patch(patches.Arrow(5 , 4.5 , 2.5 , - 2.8 , width= 0.2 , color= 'gray' ))
ax.set_title('ndarray 内部结构示意图' , fontsize= 18 , pad= 20 )
plt.show()
例如,一个 10 × 5 的数组其形状为 (10, 5)。
一个典型的 C 顺序(行主序)的 3 × 4 × 5 的 float64(8字节)数组,其跨度为 (160, 40, 8)。这个信息对性能至关重要,因为特定轴上的跨度越大,意味着沿着该轴进行计算的成本越高。
np.ones((3 , 4 , 5 ), dtype= np.float64).strides
虽然大多数用户不会直接与跨度(strides)交互,但它们是“零拷贝”视图机制的基础。跨度甚至可以是负数,这使得数组能够“向后”移动通过内存,例如在 obj[::-1] 这样的切片中。
创建 ndarray
为了让您体验 NumPy 如何实现批量计算,我们首先导入 NumPy 并创建一个小数组。我们将遵循标准约定 import numpy as np。
import numpy as np
# 从一个列表的列表创建一个二维数组(模拟波动率数据)
volatility_data = np.array([[1.5 , - 0.1 , 3 ], [0 , - 3 , 6.5 ]])
volatility_data
array([[ 1.5, -0.1, 3. ],
[ 0. , -3. , 6.5]])
现在,我们可以直接在这个 data 对象上执行数学运算:
# 逐元素乘法
volatility_data * 10
array([[ 15., -1., 30.],
[ 0., -30., 65.]])
# 逐元素加法
volatility_data + volatility_data
array([[ 3. , -0.2, 6. ],
[ 0. , -6. , 13. ]])
在第一个例子中,所有元素都乘以 10。在第二个例子中,数组中每个“单元格”的对应值被相加。
ndarray 是一个通用的多维同质数据容器;也就是说,所有元素必须是相同类型。每个数组都有一个 shape(一个表示各维度大小的元组)和一个 dtype(一个描述数组数据类型的对象):
# 获取数组的形状
volatility_data.shape
# 获取数组的数据类型
volatility_data.dtype
创建数组最简单的方法是使用 array 函数。它接受任何序列型对象(包括其他数组),并生成一个包含传入数据的新 NumPy 数组。
# 从列表创建(模拟收益率)
returns_data = [6 , 7.5 , 8 , 0 , 1 ]
returns_arr = np.array(returns_data)
returns_arr
array([6. , 7.5, 8. , 0. , 1. ])
嵌套序列,比如一个等长列表组成的列表,将被转换成一个多维数组:
portfolio_matrix = [[1 , 2 , 3 , 4 ], [5 , 6 , 7 , 8 ]]
portfolio_arr = np.array(portfolio_matrix)
portfolio_arr
array([[1, 2, 3, 4],
[5, 6, 7, 8]])
由于 portfolio_matrix 是一个列表的列表,NumPy 数组 portfolio_arr 有两个维度。我们可以通过检查 ndim 和 shape 属性来确认这一点:
除了 np.array,还有许多其他函数可以创建新数组。np.zeros 和 np.ones 分别创建全为 0 或全为 1 的数组。np.empty 创建一个数组,但不将其值初始化为任何特定值。要用这些方法创建更高维度的数组,只需传入一个元组作为形状参数。
# 创建一个长度为10的全零一维数组
np.zeros(10 )
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
# 创建一个3x6的全零数组
np.zeros((3 , 6 ))
array([[0., 0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0., 0.]])
# 创建一个未初始化的2x3x2数组
np.empty((2 , 3 , 2 ))
array([[[0., 0.],
[1., 0.],
[1., 1.]],
[[0., 1.],
[0., 0.],
[0., 0.]]])
关于 np.empty 的重要警告
假设 numpy.empty 会返回一个全零数组是不安全的。此函数返回的是未初始化的内存,因此可能包含任意的“垃圾”值。在金融建模中,例如初始化投资组合权重矩阵时,你应当只在确定会立即用真实数据覆盖该数组的情况下使用此方法,以避免计算中引入随机不可控的数值误差。
np.arange 是内置 Python range 函数的数组版本:
array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14])
表 3.1 列出了一些标准的数组创建函数。
ndarray 的数据类型
数据类型 或 dtype 是一个特殊的对象,它包含了 ndarray 需要将一块内存解释为特定类型数据的信息(或元数据)。除非显式指定,numpy.array 会尝试推断一个合适的数据类型。
prices_float = np.array([6 , 7.5 , 8 , 0 , 1 ])
prices_int = np.array([[1 , 2 , 3 ], [4 , 5 , 6 ]])
print (f'prices_float 的 dtype: { prices_float. dtype} ' )
print (f'prices_int 的 dtype: { prices_int. dtype} ' )
prices_float 的 dtype: float64
prices_int 的 dtype: int64
因为 arr1 包含浮点数,其 dtype 被推断为 float64。arr2 只包含整数,被推断为 int64(在64位系统上)。
您可以在创建时显式指定 dtype:
prices_float = np.array([1 , 2 , 3 ], dtype= np.float64)
prices_int = np.array([1 , 2 , 3 ], dtype= np.int32)
print (f'prices_float 的 dtype: { prices_float. dtype} ' )
print (f'prices_int 的 dtype: { prices_int. dtype} ' )
prices_float 的 dtype: float64
prices_int 的 dtype: int32
数据类型是 NumPy 与来自其他系统的数据交互灵活性的源泉。它们提供了到磁盘或内存底层表示的直接映射。数值型 dtype 的命名方式是类型名(int、float、complex)后跟一个表示每个元素位数的数字。一个标准的双精度浮点值(Python 的 float 对象所使用的)占用 8 字节或 64 位,因此得名 float64。表 3.2 列出了主要的数据类型。
有符号与无符号整数的财务数据处理细节
在经济和金融数据处理中,负数经常出现(例如季度亏损、负利率或收益率回撤)。因此,必须清楚 signed(有符号)和 unsigned(无符号)整数类型的区别。
signed 整数 : 可以表示正数和负数(如 int8 范围为 -128 到 127)。
unsigned 整数 : 只能表示非负数(如 uint8 范围为 0 到 255)。
在处理会计分录或盈亏数据时,错误地使用 unsigned 类型可能导致负数被错误地解释为一个非常大的正数,从而引发严重的财务计算逻辑泄露。
您可以使用 astype 方法显式地将一个数组从一个 dtype 转换或强制转换 (cast)为另一个。
int_prices = np.array([1 , 2 , 3 , 4 , 5 ])
print (f'原始 dtype: { int_prices. dtype} ' )
float_prices = int_prices.astype(np.float64)
print (f'新的 dtype: { float_prices. dtype} ' )
print (f'新的数组: { float_prices} ' )
原始 dtype: int64
新的 dtype: float64
新的数组: [1. 2. 3. 4. 5.]
如果您将浮点数转换为整数 dtype,小数部分将被截断:
arr = np.array([3.7 , - 1.2 , - 2.6 , 0.5 , 12.9 , 10.1 ])
arr.astype(np.int32)
array([ 3, -1, -2, 0, 12, 10], dtype=int32)
如果您有一个由表示数字的字符串组成的数组,您可以使用 astype 将它们转换为数值形式:
numeric_strings = np.array(['1.25' , '-9.6' , '42' ], dtype= np.bytes_)
numeric_strings.astype(float )
array([ 1.25, -9.6 , 42. ])
astype 总是创建一个新数组
调用 astype 总是会创建一个全新的数据副本,即使新旧 dtype 完全一致。在处理由 financial_statement.parquet 转换而来的数千万行财务数据时,应注意内存峰值,避免因不必要的转换导致内存溢出。
NumPy 数据类型层次结构
您可能偶尔需要检查一个数组的 dtype 是否属于某个通用类别,比如整数或浮点数。数据类型有诸如 np.integer 和 np.floating 这样的超类,可以与 np.issubdtype 函数一起使用:
ints = np.ones(10 , dtype= np.uint16)
floats = np.ones(10 , dtype= np.float32)
print (f'ints.dtype 是 np.integer 的子类型吗? { np. issubdtype(ints.dtype, np.integer)} ' )
print (f'floats.dtype 是 np.floating 的子类型吗? { np. issubdtype(floats.dtype, np.floating)} ' )
ints.dtype 是 np.integer 的子类型吗? True
floats.dtype 是 np.floating 的子类型吗? True
您可以通过调用特定数据类型的 mro 方法来查看它的所有父类:
[numpy.float64,
numpy.floating,
numpy.inexact,
numpy.number,
numpy.generic,
float,
object]
import matplotlib.pyplot as plt
def plot_hierarchy(ax, hierarchy, x, y, dx, dy):
for key, value in hierarchy.items():
ax.text(x, y, key, ha= 'center' , va= 'center' , bbox= dict (boxstyle= 'round,pad=0.5' , fc= 'skyblue' ))
if isinstance (value, dict ):
child_keys = list (value.keys())
num_children = len (child_keys)
child_x_start = x - (num_children - 1 ) * dx / 2
for i, child_key in enumerate (child_keys):
child_x = child_x_start + i * dx
child_y = y - dy
ax.plot([x, child_x], [y - 0.3 , child_y + 0.3 ], 'k-' )
plot_hierarchy(ax, {child_key: value[child_key]}, child_x, child_y, dx / num_children, dy)
elif isinstance (value, list ):
num_children = len (value)
child_x_start = x - (num_children - 1 ) * dx / 2
for i, child_item in enumerate (value):
child_x = child_x_start + i * dx
child_y = y - dy
ax.plot([x, child_x], [y - 0.3 , child_y + 0.3 ], 'k-' )
ax.text(child_x, child_y, child_item, ha= 'center' , va= 'center' , bbox= dict (boxstyle= 'round,pad=0.3' , fc= 'lightgreen' ))
hierarchy = {
'np.generic' : {
'np.number' : {
'np.integer' : {
'np.signedinteger' : ['int8' , 'int16' , 'int32' , 'int64' ],
'np.unsignedinteger' : ['uint8' , 'uint16' , 'uint32' , 'uint64' ]
},
'np.floating' : ['float16' , 'float32' , 'float64' ],
'np.complexfloating' : ['complex64' , 'complex128' ]
},
'np.bool_' : {},
'np.character' : {
'np.bytes_' : [],
'np.unicode_' : []
}
}
}
fig, ax = plt.subplots(figsize= (16 , 10 ))
ax.axis('off' )
plot_hierarchy(ax, hierarchy, 0.5 , 1.0 , 0.4 , 0.15 )
ax.set_title('NumPy Data Type Hierarchy' , fontsize= 20 )
plt.show()
NumPy 数组的运算
数组的重要性在于,它们使您能够表达对数据的批量操作,而无需编写任何 for 循环。NumPy 用户称之为向量化 (vectorization)。任何在等大小数组之间的算术运算都将应用该运算于逐个元素。
# 模拟资产收益率
asset_returns = np.array([[1. , 2. , 3. ], [4. , 5. , 6. ]])
# 逐元素相乘
asset_returns * asset_returns
array([[ 1., 4., 9.],
[16., 25., 36.]])
# 逐元素相减
asset_returns - asset_returns
array([[0., 0., 0.],
[0., 0., 0.]])
与标量的算术运算会将标量参数传播到数组的每个元素。这是广播 (broadcasting)的一种简单形式。
核心概念:广播 (Broadcasting)
广播的数学形式化定义
给定两个数组 \(A\) 和 \(B\) ,其形状分别为 \(S_A \in \mathbb{N}^{n_A}\) 和 \(S_B \in \mathbb{N}^{n_B}\) ,广播是寻找最小公共形状 \(S_C\) 的过程:
\[ S_C = \max(S_A, S_B) \]
其中 \(\max\) 操作定义为逐元素的最大值(维度对齐时),并满足广播三定律:
维度对齐 :从尾部(右侧)开始对齐维度。对于缺失维度,在左侧补 1。
兼容性检查 :两个数组的每个维度要么相等,要么其中一个为 1。
广播描述了 NumPy 在算术运算期间如何处理不同形状的数组。例如,在收益率归一化计算 returns - returns.mean(axis=0) 中,均值向量会被广播到原始矩阵的每一行。这是 NumPy 向量化运算兼顾效率和语义简洁的核心。
array([[1. , 0.5 , 0.33333333],
[0.25 , 0.2 , 0.16666667]])
# 标量指数运算
asset_returns ** 2
array([[ 1., 4., 9.],
[16., 25., 36.]])
同样大小数组之间的比较也会产生布尔数组:
benchmark_returns = np.array([[0. , 4. , 1. ], [7. , 2. , 12. ]])
benchmark_returns > asset_returns
array([[False, True, False],
[ True, False, True]])
基本索引和切片
NumPy 数组索引是一个深度话题,有很多方法可以选择数据的子集。一维数组很简单;它们的操作类似于 Python 列表。
# 模拟日成交量
daily_volume = np.arange(10 )
daily_volume
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
# 获取索引为 5 的元素
daily_volume[5 ]
# 获取从索引 5 到 8 (不包括) 的切片
daily_volume[5 :8 ]
如果你将一个标量值赋给一个切片,比如 arr[5:8] = 12,这个值会被传播(或广播 )到整个选区。
# 广播赋值
daily_volume[5 :8 ] = 12
daily_volume
array([ 0, 1, 2, 3, 4, 12, 12, 12, 8, 9])
核心规则:视图(Views) vs. 副本(Copies)
与 Python 内置列表不同,NumPy 中的数组切片(Slicing)返回的是底层数据的视图 。
优势 : 极大优化内存消耗。处理 GB 级的日内分笔行情时,切片操作几乎瞬间完成且不占用额外内存。
风险 : 对视图的修改会直接“回溯”原数组数据。
解决方案 : 若需独立处理子集,必须显式调用 .copy()(例如 latest_prices = all_prices[:100].copy())。
为了演示这一点,我们首先创建 arr 的一个切片:
# 创建切片视图
volume_slice = daily_volume[5 :8 ]
volume_slice
现在,当我们改变 arr_slice 中的值时,这些改动会反映在原始数组 arr 中:
volume_slice[:] = 12345
daily_volume
array([ 0, 1, 2, 3, 4, 12345, 12345, 12345, 8,
9])
“裸”切片 [:] 会赋值给数组中的所有值:
volume_slice[:] = 64
daily_volume
array([ 0, 1, 2, 3, 4, 64, 64, 64, 8, 9])
对于更高维度的数组,你有更多的选择。在一个二维数组中,每个索引处的元素是一维数组:
# 模拟价格矩阵 (3x3)
price_matrix = np.array([[1 , 2 , 3 ], [4 , 5 , 6 ], [7 , 8 , 9 ]])
# 访问第三行 (索引 2)
price_matrix[2 ]
单个元素可以通过递归方式访问,但更方便的是传递一个逗号分隔的索引列表:
# 这两种形式是等价的
print (f'递归访问: { price_matrix[0 ][2 ]} ' )
print (f'元组访问: { price_matrix[0 , 2 ]} ' )
图 3.3 展示了在一个二维数组上的索引。将轴 0 想象为“行”,轴 1 想象为“列”,会很有帮助。
import matplotlib.pyplot as plt
def plot_indexing_diagram():
fig, ax = plt.subplots(figsize= (4 , 4 ))
ax.set_xlim(- 0.5 , 2.5 )
ax.set_ylim(- 0.5 , 2.5 )
ax.invert_yaxis()
for i in range (3 ):
for j in range (3 ):
ax.add_patch(plt.Rectangle((j- 0.5 , i- 0.5 ), 1 , 1 , fill= True , color= 'skyblue' , alpha= 0.6 , ec= 'black' ))
ax.text(j, i, f'( { i} , { j} )' , ha= 'center' , va= 'center' , fontsize= 14 )
ax.set_xticks(range (3 ))
ax.set_yticks(range (3 ))
ax.xaxis.tick_top()
ax.xaxis.set_label_position('top' )
ax.set_xlabel('轴 1 (列)' , fontsize= 12 )
ax.set_ylabel('轴 0 (行)' , fontsize= 12 )
# Highlight one element
ax.add_patch(plt.Rectangle((2 - 0.5 , 0 - 0.5 ), 1 , 1 , fill= True , color= 'tomato' , alpha= 0.8 , ec= 'black' ))
ax.text(1.5 , - 1.0 , "arr2d[0, 2]" , color= 'tomato' , fontsize= 14 , ha= 'center' )
ax.set_title('二维数组索引' , loc= 'center' , pad= 40 , fontsize= 16 )
plt.show()
plot_indexing_diagram()
在多维数组中,如果你省略了后面的索引,返回的对象将是一个较低维度的 ndarray 切片。对于一个 2 × 2 × 3 的数组 arr3d:
# 模拟三维张量 (2x2x3)
tensor_data = np.array([[[1 , 2 , 3 ], [4 , 5 , 6 ]], [[7 , 8 , 9 ], [10 , 11 , 12 ]]])
# 在轴 0 上切片返回一个 2x3 数组
tensor_data[0 ]
array([[1, 2, 3],
[4, 5, 6]])
使用切片进行索引
和一维数组一样,多维数组也可以被切片。
# 选择 price_matrix 的前两行
price_matrix[:2 ]
array([[1, 2, 3],
[4, 5, 6]])
这是沿着轴 0 进行切片。你可以将 price_matrix[:2] 理解为“选择 price_matrix 的前两行”。你可以传递多个切片:
# 选择前两行以及从索引 1 开始的所有列
price_matrix[:2 , 1 :]
当你这样切片时,你总是得到相同维度的数组视图。通过混合使用整数索引和切片,你可以得到较低维度的切片。例如,要选择第二行但只取前两列:
lower_dim_slice = price_matrix[1 , :2 ]
lower_dim_slice
这里,lower_dim_slice 是一维的。 单独一个冒号 : 表示取整个轴:
# 选择所有行中的第一列
price_matrix[:, :1 ]
图 3.4 提供了切片的可视化总结。
import numpy as np
import matplotlib.pyplot as plt
def show_slice(ax, slice_str, highlight):
arr = np.arange(25 ).reshape((5 , 5 ))
ax.imshow(np.zeros_like(arr), cmap= 'Pastel2' , vmin=- 1 , vmax= 1 )
# Highlight the slice
rows, cols = highlight
if isinstance (rows, int ): rows = slice (rows, rows + 1 )
if isinstance (cols, int ): cols = slice (cols, cols + 1 )
highlight_mask = np.zeros_like(arr, dtype= bool )
highlight_mask[rows, cols] = True
ax.imshow(np.where(highlight_mask, 1 , 0 ), cmap= 'Set1' , alpha= 0.5 , vmin=- 1 , vmax= 1 )
for i in range (5 ):
for j in range (5 ):
ax.text(j, i, str (arr[i, j]), ha= 'center' , va= 'center' , color= 'black' )
ax.set_title(f"arr { slice_str} " , fontsize= 14 )
ax.set_xticks([])
ax.set_yticks([])
fig, axes = plt.subplots(2 , 2 , figsize= (8 , 8 ))
fig.suptitle('二维数组切片示例' , fontsize= 20 )
show_slice(axes[0 , 0 ], '[:2, 1:]' , (slice (0 , 2 ), slice (1 , None )))
show_slice(axes[0 , 1 ], '[2]' , (2 , slice (None , None )))
show_slice(axes[1 , 0 ], '[:, :1]' , (slice (None , None ), slice (0 , 1 )))
show_slice(axes[1 , 1 ], '[1, :2]' , (1 , slice (0 , 2 )))
plt.tight_layout(rect= [0 , 0.03 , 1 , 0.95 ])
plt.show()
布尔索引
布尔索引的集合论基础
布尔索引从数学角度是特征函数 (Characteristic Function)的应用。
给定: - 源集合 \(A = \{a_1, a_2, ..., a_n\}\) - 谓词 \(P: A \to \{\text{True}, \text{False}\}\)
过滤 定义为: \[ S_P = \{a \in A \mid P(a) = \text{True}\} \]
在NumPy中,布尔数组 \(\mathbf{b} \in \{\text{True}, \text{False}\}^n\) 本质上是一个特征函数向量: \[ \mathbf{b}(i) = \begin{cases}
1 & \text{if } A[i] \text{ satisfies } P \\
0 & \text{otherwise}
\end{cases} \]
布尔运算的集合对应 :
与 (AND)
&
\(A \cap B\)
交集
或 (OR)
\|
\(A \cup B\)
并集
非 (NOT)
~
\(A^c\)
补集
异或 (XOR)
^
\((A \cup B) \setminus (A \cap B)\)
对称差
让我们考虑一个例子,我们有一个数组中的数据,以及一个包含重复名字的数组。
tickers = np.array(['AAPL' , 'MSFT' , 'GOOG' , 'AAPL' , 'GOOG' , 'MSFT' , 'MSFT' ])
market_cap_data = np.random.standard_normal((7 , 4 )) # 模拟市值数据(标准化后)
假设每个名字对应 market_cap_data 数组中的一行,我们想选择所有名字为 ‘AAPL’ 的行。像算术运算一样,比较也是向量化的。将 tickers 与字符串 ‘AAPL’ 比较会产生一个布尔数组:
array([ True, False, False, True, False, False, False])
这个布尔数组可以在索引 market_cap_data 数组时传入:
market_cap_data[tickers == 'AAPL' ]
array([[ 1.7213711 , -0.96399913, -0.07747488, 2.18871029],
[ 1.0912576 , 0.47151237, -0.38037469, -0.61363623]])
布尔数组的长度必须与其索引的数组轴的长度相同。你可以将布尔数组与切片或整数混合使用:
# 选择名为 'AAPL' 的行,并显示从第 2 列开始的列
market_cap_data[tickers == 'AAPL' , 2 :]
array([[-0.07747488, 2.18871029],
[-0.38037469, -0.61363623]])
要选择除了 ‘AAPL’ 以外的所有内容,你可以使用 != 或者用 ~ 对条件取反:
market_cap_data[~ (tickers == 'AAPL' )]
array([[ 0.90275117, 1.7680143 , -0.01123834, 0.46155821],
[ 1.00740312, -1.53034078, -1.11947321, -0.17470618],
[-2.63545419, 0.3224616 , 0.96953757, 1.5879225 ],
[ 2.11260971, 0.17585338, -0.69777529, -0.26580363],
[-0.75977764, 1.36271381, 0.46448028, 0.30489181]])
要组合多个布尔条件,使用布尔算术运算符 & (与) 和 | (或)。Python 关键字 and 和 or 不适用于布尔数组。
mask = (tickers == 'AAPL' ) | (tickers == 'MSFT' )
market_cap_data[mask]
array([[ 1.7213711 , -0.96399913, -0.07747488, 2.18871029],
[ 0.90275117, 1.7680143 , -0.01123834, 0.46155821],
[ 1.0912576 , 0.47151237, -0.38037469, -0.61363623],
[ 2.11260971, 0.17585338, -0.69777529, -0.26580363],
[-0.75977764, 1.36271381, 0.46448028, 0.30489181]])
使用布尔数组设置值时,会将右侧的值或多个值赋到布尔数组为 True 的位置。
# 将 market_cap_data 中的所有负值设置为 0
market_cap_data[market_cap_data < 0 ] = 0
market_cap_data
array([[1.7213711 , 0. , 0. , 2.18871029],
[0.90275117, 1.7680143 , 0. , 0.46155821],
[1.00740312, 0. , 0. , 0. ],
[1.0912576 , 0.47151237, 0. , 0. ],
[0. , 0.3224616 , 0.96953757, 1.5879225 ],
[2.11260971, 0.17585338, 0. , 0. ],
[0. , 1.36271381, 0.46448028, 0.30489181]])
布尔索引必创副本
通过布尔掩码(Boolean Mask)选择数据时,NumPy 总是 会创建数据的独立副本。这与切片视图机制截然不同。在进行复杂的数据过滤和清洗时,务必注意这种动态差异对程序总内存占用的影响。
花式索引 (Fancy Indexing)
花式索引的数学表示
花式索引从数学角度是索引映射 (Indexing Map)的应用。
给定: - 源数组 \(A \in \mathbb{R}^n\) - 索引集合 \(I = \{i_1, i_2, ..., i_k\} \subseteq \{0, 1, ..., n-1\}\)
索引映射 定义为: \[ \text{fancy}(A, I) = (A[i_1], A[i_2], ..., A[i_k]) \in \mathbb{R}^k \]
这是从 \(\mathbb{R}^n\) 到 \(\mathbb{R}^k\) 的线性映射。
多维花式索引
对于多维数组 \(A \in \mathbb{R}^{n_0 \times n_1 \times \cdots \times n_{d-1}}\) 和索引集合 \(I_0, I_1, ..., I_{d-1}\) :
\[ \text{fancy\_multi}(A, I_0, ..., I_{d-1}) = (A[I_0[0], I_1[0], ..., I_{d-1}[0]], ..., (A[I_0[k-1], I_1[k-1], ..., I_{d-1}[k-1]]) \]
组合索引
花式索引可以与其他索引类型组合。设 \(S\) 为切片对象,\(\mathbf{b}\) 为布尔数组,\(I\) 为整数索引集合:
\[ A[I, S, \mathbf{b}] \]
这种组合提供了极大的灵活性,但需要注意返回的是副本还是视图。
花式索引 是 NumPy 采纳的一个术语,用来描述使用整数数组进行索引。假设我们有一个 8 × 4 的数组:
simulated_factors = np.empty((8 , 4 ))
for i in range (8 ):
simulated_factors[i] = i
simulated_factors
array([[0., 0., 0., 0.],
[1., 1., 1., 1.],
[2., 2., 2., 2.],
[3., 3., 3., 3.],
[4., 4., 4., 4.],
[5., 5., 5., 5.],
[6., 6., 6., 6.],
[7., 7., 7., 7.]])
要以特定顺序选择行的子集,你可以传递一个指定所需顺序的列表或 ndarray 整数:
# 选择第 4, 3, 0, 和 6 行
simulated_factors[[4 , 3 , 0 , 6 ]]
array([[4., 4., 4., 4.],
[3., 3., 3., 3.],
[0., 0., 0., 0.],
[6., 6., 6., 6.]])
使用负数索引会从末尾选择行:
# 选择倒数第 3, 5, 和 7 行
simulated_factors[[- 3 , - 5 , - 7 ]]
array([[5., 5., 5., 5.],
[3., 3., 3., 3.],
[1., 1., 1., 1.]])
传递多个索引数组会做一些稍微不同的事情;它会选择一个一维数组,其元素对应于每个索引元组。
reshaped_factors = np.arange(32 ).reshape((8 , 4 ))
# 选择元素 (1,0), (5,3), (7,1), 和 (2,2)
reshaped_factors[[1 , 5 , 7 , 2 ], [0 , 3 , 1 , 2 ]]
在这种情况下,花式索引的行为是选择元素 (1, 0)、(5, 3)、(7, 1) 和 (2, 2)。当传递的整数数组数量与轴数相同时,花式索引的结果总是一维的。要选择一个矩形区域,你可以使用以下方法:
# 选择第 1, 5, 7, 2 行,并从这些行中选择第 0, 3, 1, 2 列
reshaped_factors[[1 , 5 , 7 , 2 ]][:, [0 , 3 , 1 , 2 ]]
array([[ 4, 7, 5, 6],
[20, 23, 21, 22],
[28, 31, 29, 30],
[ 8, 11, 9, 10]])
数组转置和轴交换
转置是一种特殊的重塑形式,它同样返回底层数据的视图而不复制任何内容。数组有 transpose 方法和特殊的 T 属性:
factor_data = np.arange(15 ).reshape((3 , 5 ))
factor_data.T
array([[ 0, 5, 10],
[ 1, 6, 11],
[ 2, 7, 12],
[ 3, 8, 13],
[ 4, 9, 14]])
在进行矩阵计算时,这非常常见。例如,当使用 np.dot 计算内积 \(\boldsymbol{X'X}\) 时:
# 真实世界数据背景下的例子
# 让我们模拟一个有两个因子的简单回归,所以我们的 factors 矩阵是 n x 2
# 我们将使用随机数据进行演示
rng = np.random.default_rng(seed= 12345 )
factors = rng.standard_normal((100 , 2 )) # 100 个观测值, 2 个因子
cov_matrix_unscaled = np.dot(factors.T, factors)
cov_matrix_unscaled
array([[89.36134837, 11.49065471],
[11.49065471, 93.25390931]])
@ 中缀运算符是进行矩阵乘法的另一种方式:
array([[89.36134837, 11.49065471],
[11.49065471, 93.25390931]])
使用 .T 进行简单转置是轴交换的一个特例。ndarray 有 swapaxes 方法,它接受一对轴号并交换指定的轴来重新排列数据:
tensor_3d = np.arange(16 ).reshape((2 , 2 , 4 ))
tensor_3d.swapaxes(1 , 2 )
array([[[ 0, 4],
[ 1, 5],
[ 2, 6],
[ 3, 7]],
[[ 8, 12],
[ 9, 13],
[10, 14],
[11, 15]]])
swapaxes 同样返回数据的视图而不进行复制。
通用函数:快速的逐元素数组函数
通用函数的数学定义
通用函数(ufunc)从数学角度是元素级映射 (Element-wise Mapping)的形式化。
给定: - 源数组 \(\mathbf{x} \in \mathbb{R}^n\) - 一元函数 \(f: \mathbb{R} \to \mathbb{R}\)
一元ufunc 定义为: \[ \text{ufunc}_f(\mathbf{x})_i = f(x_i), \quad \forall i \in \{0, 1, ..., n-1\} \]
对于二元函数 \(g: \mathbb{R} \times \mathbb{R} \to \mathbb{R}\) 和两个数组 \(\mathbf{x}, \mathbf{y} \in \mathbb{R}^n\) :
二元ufunc 定义为: \[ \text{ufunc}_g(\mathbf{x}, \mathbf{y})_i = g(x_i, y_i), \quad \forall i \in \{0, 1, ..., n-1\} \]
特殊方法 :
reduce(归约) : \[ \text{reduce}(g, \mathbf{x}) = g(g(...g(g(x_0, x_1), x_2), ..., x_{n-1}) \]
accumulate(累积) : \[ \text{accumulate}(g, \mathbf{x})_i = g(x_0, x_1, ..., x_i), \quad \forall i \]
outer(外积) : \[ \text{outer}(g, \mathbf{x}, \mathbf{y})_{i,j} = g(x_i, y_j), \quad \forall i, j \]
通用函数(universal function),或称 ufunc ,是一种对 ndarray 中的数据执行逐元素操作的函数。你可以将它们看作是简单函数的快速向量化包装器,这些函数接受一个或多个标量值并产生一个或多个标量结果。
许多 ufuncs 是简单的逐元素变换,如 np.sqrt 或 np.exp:
values = np.arange(10 )
np.sqrt(values)
array([0. , 1. , 1.41421356, 1.73205081, 2. ,
2.23606798, 2.44948974, 2.64575131, 2.82842712, 3. ])
这些被称为一元 ufuncs。其他的,如 np.add 或 np.maximum,接受两个数组(因此是二元 ufuncs)并返回一个单一数组作为结果:
rng = np.random.default_rng(seed= 12345 )
vec_a = rng.standard_normal(8 )
vec_b = rng.standard_normal(8 )
np.maximum(vec_a, vec_b)
array([ 0.36105811, 1.26372846, 2.34740965, 0.96849691, -0.07534331,
0.90219827, -0.46695317, 0.6488928 ])
表 3.3 和 表 3.4 列出了一些 NumPy 的 ufuncs。
ufunc 的高级用法
NumPy 的每个二元 ufunc 都有特殊的方法来执行特殊的向量化操作。
reduce 接受一个数组,并通过执行一系列二元操作来聚合其值。例如,np.add.reduce 等同于 arr.sum():
values = np.arange(10 )
np.add.reduce (values)
accumulate 与 reduce 相关,但它会产生一个与输入大小相同的数组,其中包含中间的“累积”值,等同于 cumsum:
matrix_data = np.arange(15 ).reshape((3 , 5 ))
np.add.accumulate(matrix_data, axis= 1 )
array([[ 0, 1, 3, 6, 10],
[ 5, 11, 18, 26, 35],
[10, 21, 33, 46, 60]])
outer 对两个数组执行逐对的叉积。输出数组的形状是输入数组形状的拼接。
vec = np.arange(4 )
np.multiply.outer(vec, np.arange(5 ))
array([[ 0, 0, 0, 0, 0],
[ 0, 1, 2, 3, 4],
[ 0, 2, 4, 6, 8],
[ 0, 3, 6, 9, 12]])
reduceat 执行“局部 reduce”或“分组”操作。它接受一个“分箱边界”序列,指示如何分割和聚合值。
values = np.arange(10 )
# 对 values[0:5], values[5:8], 和 values[8:] 求和
np.add.reduceat(values, [0 , 5 , 8 ])
使用数组进行面向数组编程
使用 NumPy 数组,您可以将许多类型的数据处理任务表达为简洁的数组表达式,而这些任务可能需要编写循环。这种用数组表达式替代显式循环的做法被称为向量化 。
将条件逻辑表达为数组操作
numpy.where 函数是三元表达式 x if condition else y 的向量化版本。
bull_return_scenarios = np.array([1.1 , 1.2 , 1.3 , 1.4 , 1.5 ])
bear_return_scenarios = np.array([2.1 , 2.2 , 2.3 , 2.4 , 2.5 ])
bull_market_flag = np.array([True , False , True , True , False ])
假设我们想在 bull_market_flag 为 True 时从 bull_return_scenarios 中取值,否则从 bear_return_scenarios 中取值。 …
trading_signal = np.where(bull_market_flag, bull_return_scenarios, bear_return_scenarios)
trading_signal
array([1.1, 2.2, 1.3, 1.4, 2.5])
在数据分析中,where 的一个典型用途是根据另一个数组生成一个新值的数组。例如,假设你有一个随机生成的数据矩阵,你想将所有正值替换为 2,所有负值替换为 –2。
# 模拟收益率矩阵
rng = np.random.default_rng(seed= 12345 )
returns_matrix = rng.standard_normal((4 , 4 ))
# 生成交易信号:收益率为正时为 1,为负时为 -1
np.where(returns_matrix > 0 , 1 , - 1 )
array([[-1, 1, -1, -1],
[-1, -1, -1, 1],
[ 1, -1, 1, 1],
[-1, 1, -1, -1]])
数学和统计方法
一组计算整个数组或沿轴的数据统计量的数学函数,可以作为数组类的方法来访问。您可以使用聚合(有时称为归约 )如 sum、mean 和 std(标准差),通过调用数组实例方法(例如 arr.sum())或使用顶层 NumPy 函数(例如 np.sum(arr))。
# 计算均值和总和
returns_matrix = rng.standard_normal((5 , 4 ))
print (f'returns_matrix 的均值: { returns_matrix. mean()} ' )
print (f'returns_matrix 的总和: { returns_matrix. sum ()} ' )
returns_matrix 的均值: 0.17933634979615845
returns_matrix 的总和: 3.586726995923169
像 mean 和 sum 这样的函数接受一个可选的 axis 参数,该参数会计算给定轴上的统计量,结果是一个维度减少一的数组:
# 沿着列计算均值 (axis=1)
returns_matrix.mean(axis= 1 )
array([ 0.37675318, 0.07598404, -0.2834985 , 1.48722347, -0.75978045])
# 沿着行计算总和 (axis=0)
returns_matrix.sum (axis= 0 )
array([ 2.71870476, 0.30188842, -0.49975489, 1.0658887 ])
其他方法如 cumsum 和 cumprod 不进行聚合,而是产生一个包含中间结果的数组:
matrix_data = np.array([[0 , 1 , 2 ], [3 , 4 , 5 ], [6 , 7 , 8 ]])
# 沿列的累积和
matrix_data.cumsum(axis= 1 )
array([[ 0, 1, 3],
[ 3, 7, 12],
[ 6, 13, 21]])
表 3.5 列出了一些基本的数组统计方法。
布尔数组的方法
在前面的方法中,布尔值被强制转换为 1 (True) 和 0 (False)。因此,sum 经常被用作计算布尔数组中 True 值数量的手段:
pnl_vector = rng.standard_normal(100 )
(pnl_vector > 0 ).sum () # 正值的数量
另外两个方法 any 和 all 对布尔数组很有用。any 测试数组中是否有一个或多个值为 True,而 all 检查是否每个值都为 True:
# 模拟每日盈亏状态(已盈利为 True)
profitable_days_mask = np.array([False , False , True , False ])
print (f'是否有盈余天? { profitable_days_mask. any ()} ' )
print (f'是否所有天都盈利? { profitable_days_mask. all ()} ' )
是否有盈余天? True
是否所有天都盈利? False
排序
和 Python 内置的 list 类型一样,NumPy 数组可以使用 sort 方法进行原地排序:
unsorted_returns = rng.standard_normal(6 )
unsorted_returns.sort()
unsorted_returns
array([-0.7758961 , -0.62361213, 0.28208603, 0.41071644, 0.84122103,
1.12182226])
你可以通过将轴号传递给 sort,对多维数组中每个一维部分的值沿一个轴进行原地排序:
matrix_data = rng.standard_normal((5 , 3 ))
matrix_data.sort(axis= 1 ) # 对每一行进行排序
matrix_data
array([[-2.7224161 , -0.6733048 , 1.24622153],
[-0.0292946 , 0.17534089, 0.79020803],
[-1.41951426, -1.35996632, 0.22341156],
[-2.17088985, 0.62848817, 1.76177943],
[-0.86924667, 0.60119653, 0.95075786]])
顶层方法 numpy.sort 返回一个排序后的数组副本 ,而不是在原地修改数组。
唯一值和其他集合逻辑
NumPy 对一维 ndarray 有一些基本的集合操作。一个常用的是 np.unique,它返回数组中排序后的唯一值:
sectors = np.array(['Tech' , 'Energy' , 'Finance' , 'Tech' , 'Energy' , 'Finance' , 'Finance' ])
np.unique(sectors)
array(['Energy', 'Finance', 'Tech'], dtype='<U7')
另一个函数 np.in1d,测试一个数组中的值是否在另一个数组中,返回一个布尔数组:
stock_ids = np.array([600000 , 0 , 0 , 600036 , 601398 , 5 , 600000 ])
np.in1d(stock_ids, [600036 , 601398 , 600000 ])
array([ True, False, False, True, True, False, True])
使用数组进行文件输入和输出
numpy.save 和 numpy.load 是在磁盘上高效保存和加载数组数据的两个主力函数。默认情况下,数组以未压缩的原始二进制格式保存,文件扩展名为 .npy:
saved_data = np.arange(10 )
np.save('some_array' , saved_data)
磁盘上的数组随后可以用 np.load 加载:
np.load('some_array.npy' )
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
你可以使用 numpy.savez 将多个数组保存在一个未压缩的归档文件中,并将数组作为关键字参数传递:
np.savez('array_archive.npz' , a= saved_data, b= saved_data)
加载 .npz 文件时,你会得到一个类似字典的对象,它会延迟加载各个数组:
arch = np.load('array_archive.npz' )
arch['b' ]
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
如果你的数据压缩效果好,你可能希望使用 numpy.savez_compressed。
示例:随机游走
随机游走的模拟为利用数组操作提供了一个说明性的应用。让我们首先考虑一个从 0 开始的简单随机游走,步长为 1 和 –1,且以相等的概率发生。
纯 Python 的实现可能如下所示:
import random
position = 0
walk = [position]
nsteps = 1000
for i in range (nsteps):
step = 1 if random.randint(0 , 1 ) else - 1
position += step
walk.append(position)
图 3.5 展示了前 100 步的图。
import matplotlib.pyplot as plt
plt.plot(walk[:100 ])
plt.title('随机游走的前100步' )
plt.xlabel('步数' )
plt.ylabel('位置' )
plt.grid(True )
plt.show()
这个游走是随机步长的累积和。我们可以使用 NumPy 更高效地计算它。
nsteps = 1000
rng = np.random.default_rng(seed= 12345 )
draws = rng.integers(0 , 2 , size= nsteps)
steps = np.where(draws == 0 , 1 , - 1 )
price_path = steps.cumsum()
由此,我们可以提取统计数据,比如游走轨迹中的最小值和最大值:
print (f'最小值: { price_path. min ()} ' )
print (f'最大值: { price_path. max ()} ' )
一个更复杂的统计量是首次穿越时间 ,即随机游走达到特定值的步数。这里我们可能想知道游走需要多长时间才能距离原点至少 10 步。(np.abs(price_path) >= 10).argmax() 给了我们这个条件首次为真的索引。
(np.abs (price_path) >= 10 ).argmax()
一次模拟多次随机游走
为了模拟多次随机游走,比如 5000 次,我们可以一次性生成所有的随机游走。rng.integers 函数可以生成一个二维的抽样数组,然后我们可以计算每一行的累积和(axis=1)。
nwalks = 5000
nsteps = 1000
draws = rng.integers(0 , 2 , size= (nwalks, nsteps)) # 0 或 1
steps = np.where(draws > 0 , 1 , - 1 )
simulated_paths = steps.cumsum(axis= 1 )
simulated_paths
array([[ 1, 2, 3, ..., 22, 23, 22],
[ 1, 0, -1, ..., -50, -49, -48],
[ 1, 2, 3, ..., 50, 49, 48],
...,
[ -1, -2, -1, ..., -10, -9, -10],
[ -1, -2, -3, ..., 8, 9, 8],
[ -1, 0, 1, ..., -4, -3, -2]], shape=(5000, 1000))
现在,我们可以计算所有游走中获得的最大值和最小值:
print (f'所有游走中的最大值: { simulated_paths. max ()} ' )
print (f'所有游走中的最小值: { simulated_paths. min ()} ' )
所有游走中的最大值: 114
所有游走中的最小值: -120
让我们计算达到 30 或 –30 的最小穿越时间。这有点棘手,因为不是所有的游走都会达到 30。我们可以使用 any 方法来检查这一点:
hits30 = (np.abs (simulated_paths) >= 30 ).any (axis= 1 )
print (f'达到 30 或 -30 的游走次数: { hits30. sum ()} ' )
我们可以使用这个布尔数组来选择那些实际穿越了绝对 30 水平的游走的行,并在 axis=1 上调用 argmax 来获得穿越时间:
crossing_times = (np.abs (simulated_paths[hits30]) >= 30 ).argmax(axis= 1 )
print (f'平均最小穿越时间: { crossing_times. mean()} ' )
平均最小穿越时间: 500.5699558173785
一个真实世界的“随机游走”:标普500指数日收益率
虽然随机游走模型是一个理论构建,但它构成了金融学中有效市场假说的基础。该假说认为,资产价格反映了所有可用信息,因此价格变化应该遵循“随机游走”,是不可预测的。让我们通过绘制标普500指数的累积对数收益率,来看看这如何应用于真实数据。
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path
# 从本地数据获取上证指数数据
# 注意:在实际教学环境中,路径应根据操作系统自动调整
import platform
if platform.system() == 'Windows' :
index_path = 'C:/qiufei/data/index/indexes.parquet'
else :
index_path = '/home/ubuntu/r2_data_mount/qiufei/data/index/indexes.parquet'
index_data = pd.read_parquet(index_path)
sse_index = index_data[index_data['symbol' ] == '000001.XSHG' ].copy()
sse_index['datetime' ] = pd.to_datetime(sse_index['datetime' ].astype(str ), format = '%Y%m %d %H%M%S' )
sse_index.set_index('datetime' , inplace= True )
sse_index = sse_index.sort_index()
# 选取时间范围
sse_index = sse_index.loc['2014-01-01' :'2024-01-01' ]
# 计算对数收益率
log_returns = np.log(sse_index['close' ] / sse_index['close' ].shift(1 )).dropna()
# 通过累积和创建“随机游走”
random_walk_sse = log_returns.cumsum()
# 绘图
plt.rcParams['font.sans-serif' ] = ['Source Han Serif SC' ]
plt.rcParams['axes.unicode_minus' ] = False
plt.figure(figsize= (12 , 6 ))
plt.plot(random_walk_sse)
plt.title('作为随机游走的上证指数累积对数收益率' )
plt.xlabel('日期' )
plt.ylabel('累积对数收益率' )
plt.grid(True )
plt.show()
正如您在 图 3.6 中所看到的,上证指数累积收益率的路径与我们模拟的随机游走相似。这展示了在 NumPy 中开发的强大、抽象的工具如何能被直接应用于分析和理解复杂的经济和金融现象。
习题
基础习题
习题 4.1: 数组创建与基本运算
创建两个数组:
import numpy as np
prices_a = np.array([35.5 , 36.2 , 35.9 , 36.8 , 37.1 ]) # 宁波港5日收盘价
prices_b = np.array([12.3 , 12.5 , 12.4 , 12.8 , 12.7 ]) # 宁波银行5日收盘价
计算两个数组的价格之和
计算两个数组价格的均值和标准差
找出两个数组的最大值和最小值
对数组进行排序
计算价格的相关系数
解答:
import numpy as np
prices_a = np.array([35.5 , 36.2 , 35.9 , 36.8 , 37.1 ])
prices_b = np.array([12.3 , 12.5 , 12.4 , 12.8 , 12.7 ])
# (a) 价格之和
total = prices_a + prices_b
print ('(a) 价格之和:' )
print (total)
# (b) 均值和标准差
mean_a = np.mean(prices_a)
std_a = np.std(prices_a)
mean_b = np.mean(prices_b)
std_b = np.std(prices_b)
print (f' \n (b) 统计量:' )
print (f'宁波港: 均值= { mean_a:.2f} , 标准差= { std_a:.2f} ' )
print (f'宁波银行: 均值= { mean_b:.2f} , 标准差= { std_b:.2f} ' )
# (c) 最大值和最小值
print (f' \n (c) 极值:' )
print (f'宁波港: 最大= { np. max (prices_a):.2f} , 最小= { np. min (prices_a):.2f} ' )
print (f'宁波银行: 最大= { np. max (prices_b):.2f} , 最小= { np. min (prices_b):.2f} ' )
# (d) 排序
print (f' \n (d) 排序后:' )
print (f'宁波港: { np. sort(prices_a)} ' )
print (f'宁波银行: { np. sort(prices_b)} ' )
# (e) 相关系数
corr = np.corrcoef(prices_a, prices_b)[0 , 1 ]
print (f' \n (e) 相关系数: { corr:.4f} ' )
(a) 价格之和:
[47.8 48.7 48.3 49.6 49.8]
(b) 统计量:
宁波港: 均值=36.30, 标准差=0.58
宁波银行: 均值=12.54, 标准差=0.19
(c) 极值:
宁波港: 最大=37.10, 最小=35.50
宁波银行: 最大=12.80, 最小=12.30
(d) 排序后:
宁波港: [35.5 35.9 36.2 36.8 37.1]
宁波银行: [12.3 12.4 12.5 12.7 12.8]
(e) 相关系数: 0.9432
习题 4.2: 数组索引与切片
给定一个10x10的矩阵:
np.random.seed(42 )
matrix = np.random.randint(1 , 100 , size= (10 , 10 ))
提取前3行前3列的子矩阵
提取所有行第2、4、6列
提取对角线元素
将矩阵转置
找出每行的最大值及其索引
解答:
import numpy as np
np.random.seed(42 )
matrix = np.random.randint(1 , 100 , size= (10 , 10 ))
# (a) 前3行前3列
sub_matrix = matrix[:3 , :3 ]
print ('(a) 前3行前3列:' )
print (sub_matrix)
# (b) 所有行的第2、4、6列
selected_cols = matrix[:, [1 , 3 , 5 ]]
print (f' \n (b) 第2、4、6列:' )
print (selected_cols)
# (c) 对角线元素
diagonal = np.diag(matrix)
print (f' \n (c) 对角线元素:' )
print (diagonal)
# (d) 转置
transposed = matrix.T
print (f' \n (d) 转置后形状: { transposed. shape} ' )
# (e) 每行最大值及索引
max_values = np.max (matrix, axis= 1 )
max_indices = np.argmax(matrix, axis= 1 )
print (f' \n (e) 每行最大值:' )
for i in range (10 ):
print (f' 行 { i} : 值= { max_values[i]} , 列= { max_indices[i]} ' )
(a) 前3行前3列:
[[52 93 15]
[88 24 3]
[64 60 21]]
(b) 第2、4、6列:
[[93 72 21]
[24 22 2]
[60 33 58]
[42 60 15]
[55 3 7]
[89 14 90]
[71 8 35]
[ 2 54 54]
[34 62 95]
[62 85 82]]
(c) 对角线元素:
[52 24 21 60 51 90 78 63 72 89]
(d) 转置后形状: (10, 10)
(e) 每行最大值:
行0: 值=93, 列=1
行1: 值=88, 列=0
行2: 值=91, 列=9
行3: 值=92, 列=2
行4: 值=73, 列=7
行5: 值=92, 列=9
行6: 值=81, 列=7
行7: 值=93, 列=6
行8: 值=95, 列=5
行9: 值=89, 列=9
习题 4.3: 布尔索引与条件过滤
给定一个包含股票收益率的数组:
returns = np.array([0.02 , - 0.01 , 0.03 , - 0.02 , 0.01 , 0.04 , - 0.03 , 0.02 ])
找出所有正收益
找出收益率大于2%的数据
计算正收益和负收益的个数
将所有负收益替换为0
计算累计收益率
解答:
import numpy as np
returns = np.array([0.02 , - 0.01 , 0.03 , - 0.02 , 0.01 , 0.04 , - 0.03 , 0.02 ])
# (a) 正收益
positive_mask = returns > 0
positive_returns = returns[positive_mask]
print ('(a) 正收益:' )
print (positive_returns)
# (b) 收益率大于2%
high_returns = returns[returns > 0.02 ]
print (f' \n (b) 收益率大于2%:' )
print (high_returns)
# (c) 正负收益个数
n_positive = np.sum (returns > 0 )
n_negative = np.sum (returns < 0 )
print (f' \n (c) 正收益个数: { n_positive} , 负收益个数: { n_negative} ' )
# (d) 负收益替换为0
returns_clean = np.where(returns > 0 , returns, 0 )
print (f' \n (d) 负收益替换为0后:' )
print (returns_clean)
# (e) 累计收益率
cumulative = np.cumprod(1 + returns) - 1
print (f' \n (e) 累计收益率:' )
print (cumulative)
(a) 正收益:
[0.02 0.03 0.01 0.04 0.02]
(b) 收益率大于2%:
[0.03 0.04]
(c) 正收益个数: 5, 负收益个数: 3
(d) 负收益替换为0后:
[0.02 0. 0.03 0. 0.01 0.04 0. 0.02]
(e) 累计收益率:
[0.02 0.0098 0.040094 0.01929212 0.02948504 0.07066444
0.03854451 0.0593154 ]
进阶习题
习题 4.4: 矩阵运算与线性代数
在投资组合理论中,矩阵运算是核心工具。给定:
# 3个资产的协方差矩阵
cov_matrix = np.array([
[0.04 , 0.02 , 0.01 ],
[0.02 , 0.09 , 0.03 ],
[0.01 , 0.03 , 0.16 ]
])
# 等权重组合
weights = np.array([1 / 3 , 1 / 3 , 1 / 3 ])
计算投资组合方差:σ²p = w^T Σ w
计算投资组合标准差
找出协方差矩阵的特征值和特征向量
解释特征值的经济学含义
解答:
import numpy as np
cov_matrix = np.array([
[0.04 , 0.02 , 0.01 ],
[0.02 , 0.09 , 0.03 ],
[0.01 , 0.03 , 0.16 ]
])
weights = np.array([1 / 3 , 1 / 3 , 1 / 3 ])
# (a) 投资组合方差
portfolio_variance = np.dot(weights.T, np.dot(cov_matrix, weights))
print (f'(a) 投资组合方差: { portfolio_variance:.6f} ' )
# (b) 投资组合标准差
portfolio_std = np.sqrt(portfolio_variance)
print (f'(b) 投资组合标准差: { portfolio_std:.6f} ( { portfolio_std* 100 :.2f} %)' )
# (c) 特征值和特征向量
eigenvalues, eigenvectors = np.linalg.eig(cov_matrix)
print (f' \n (c) 特征值:' )
for i, ev in enumerate (eigenvalues):
print (f' 特征值 { i+ 1 } : { ev:.6f} ' )
print (f' \n 特征向量:' )
for i, ev in enumerate (eigenvectors.T):
print (f' 特征向量 { i+ 1 } : { ev} ' )
# (d) 经济学含义解释
print (f' \n (d) 经济学含义:' )
print (f'特征值代表主成分(Principal Components)解释的方差量。' )
print (f'最大特征值 { eigenvalues. max ():.6f} 对应第一主成分,' )
print (f'解释了 { eigenvalues. max ()/ eigenvalues. sum ()* 100 :.1f} % 的总方差。' )
print (f'这在风险因子分析中很重要,可以识别主要的风险来源。' )
(a) 投资组合方差: 0.045556
(b) 投资组合标准差: 0.213437 (21.34%)
(c) 特征值:
特征值 1: 0.173136
特征值 2: 0.032982
特征值 3: 0.083882
特征向量:
特征向量 1: [-0.12390393 -0.3630554 -0.92349261]
特征向量 2: [-0.94289213 0.33306872 -0.00443355]
特征向量 3: [-0.30919612 -0.87020458 0.3835906 ]
(d) 经济学含义:
特征值代表主成分(Principal Components)解释的方差量。
最大特征值 0.173136 对应第一主成分,
解释了 59.7% 的总方差。
这在风险因子分析中很重要,可以识别主要的风险来源。
投资组合风险的矩阵代数表示
在马科维茨现代投资组合理论中,投资组合方差的计算是核心:
\[ \sigma_p^2 = \mathbf{w}^T \Sigma \mathbf{w} = \sum_{i=1}^{n}\sum_{j=1}^{n} w_i w_j \sigma_{ij} \]
其中 \(\mathbf{w}\) 是权重向量,\(\Sigma\) 是协方差矩阵。
谱分解的经济解释 : 通过对协方差矩阵进行特征值分解(\(\Sigma = Q \Lambda Q^T\) ),我们可以将总风险分解为独立的风险因子。较大的特征值代表了主要的系统性风险来源(如市场因子)。在前几个主成分(Principal Components)往往能捕捉 A 股市场大部分波动的背景下,这种在线性代数层面的分解对风险对冲至关重要。
习题 4.5: 广播机制的应用
NumPy的广播机制使得不同形状的数组可以进行运算。给定:
# 每日收益率 (5天 x 3个资产)
daily_returns = np.array([
[0.01 , 0.02 , 0.015 ],
[- 0.01 , 0.01 , 0.005 ],
[0.02 , - 0.01 , 0.01 ],
[0.015 , 0.005 , - 0.005 ],
[- 0.005 , 0.01 , 0.02 ]
])
# 3个资产的权重
weights = np.array([0.4 , 0.3 , 0.3 ])
使用广播计算每日投资组合收益率
计算累计收益率
如果每个资产初始投资100000元,计算最终价值
添加一个新的资产(权重为0),使投资组合包含4个资产
解答:
import numpy as np
daily_returns = np.array([
[0.01 , 0.02 , 0.015 ],
[- 0.01 , 0.01 , 0.005 ],
[0.02 , - 0.01 , 0.01 ],
[0.015 , 0.005 , - 0.005 ],
[- 0.005 , 0.01 , 0.02 ]
])
weights = np.array([0.4 , 0.3 , 0.3 ])
# (a) 每日投资组合收益率
portfolio_returns = daily_returns @ weights # 矩阵乘法
print ('(a) 每日投资组合收益率:' )
print (portfolio_returns)
# (b) 累计收益率
cumulative_returns = np.cumprod(1 + portfolio_returns) - 1
print (f' \n (b) 累计收益率:' )
print (cumulative_returns)
# (c) 最终价值
initial_value = np.array([100000 , 100000 , 100000 ])
# 每个资产的价值变化
final_values = initial_value * (1 + cumulative_returns[- 1 ])
total_final = np.sum (final_values * weights)
print (f' \n (c) 最终价值: ¥ { total_final:,.2f} ' )
# (d) 添加第4个资产
weights_4 = np.array([0.3 , 0.25 , 0.25 , 0.2 ]) # 新权重
returns_4 = np.column_stack([daily_returns, np.zeros(5 )]) # 添加零收益
portfolio_returns_4 = returns_4 @ weights_4
print (f' \n (d) 4资产组合每日收益率:' )
print (portfolio_returns_4)
(a) 每日投资组合收益率:
[0.0145 0.0005 0.008 0.006 0.007 ]
(b) 累计收益率:
[0.0145 0.01500725 0.02312731 0.02926607 0.03647093]
(c) 最终价值: ¥103,647.09
(d) 4资产组合每日收益率:
[0.01175 0.00075 0.006 0.0045 0.006 ]
应用习题
习题 4.6: 随机游走与布朗运动
使用NumPy模拟几何布朗运动(GBM),这是股票价格的经典模型:
\[ dS_t = \mu S_t dt + \sigma S_t dW_t \]
其中: - \(S_t\) 是股票价格 - \(\mu\) 是漂移率(期望收益率) - \(\sigma\) 是波动率 - \(dW_t\) 是维纳过程增量
参数设置: - S0 = 100(初始价格) - μ = 0.08(年化收益率8%) - σ = 0.2(年化波动率20%) - T = 1年,252个交易日
解答:
import numpy as np
import matplotlib.pyplot as plt
# 参数设置
S0 = 100 # 初始价格
mu = 0.08 # 年化收益率
sigma = 0.2 # 年化波动率
T = 1.0 # 时间跨度(年)
N = 252 # 交易日数
dt = T / N # 时间步长
# 生成随机游走路径
np.random.seed(42 )
# 标准正态随机数
Z = np.random.standard_normal(N)
# 几何布朗运动的离散化
# ln(S_t) = ln(S_0) + (μ - σ²/2)t + σW_t
# 其中 W_t = √dt * ΣZ_i
t = np.arange(1 , N + 1 ) * dt
W = np.cumsum(np.sqrt(dt) * Z)
log_prices = np.log(S0) + (mu - 0.5 * sigma** 2 ) * t + sigma * W
prices = np.exp(log_prices)
print (f'几何布朗运动模拟:' )
print (f'初始价格: ¥ { S0:.2f} ' )
print (f'最终价格: ¥ { prices[- 1 ]:.2f} ' )
print (f'总收益率: { (prices[- 1 ]/ S0 - 1 )* 100 :+.2f} %' )
# 统计特性
returns = np.diff(prices) / prices[:- 1 ]
print (f' \n 收益率统计:' )
print (f'日均值: { np. mean(returns)* 100 :.4f} %' )
print (f'日标准差: { np. std(returns)* 100 :.4f} %' )
print (f'年化均值: { np. mean(returns)* 252 * 100 :.2f} %' )
print (f'年化波动率: { np. std(returns)* np. sqrt(252 )* 100 :.2f} %' )
# 可视化
plt.rcParams['font.sans-serif' ] = ['SimHei' ]
plt.rcParams['axes.unicode_minus' ] = False
plt.figure(figsize= (12 , 6 ))
plt.plot(prices, label= '模拟价格' )
plt.axhline(y= S0, color= 'r' , linestyle= '--' , label= '初始价格' )
plt.title('几何布朗运动模拟的股价路径' )
plt.xlabel('交易日' )
plt.ylabel('价格' )
plt.legend()
plt.grid(True , alpha= 0.3 )
plt.show()
几何布朗运动模拟:
初始价格: ¥100.00
最终价格: ¥104.92
总收益率: +4.92%
收益率统计:
日均值: 0.0240%
日标准差: 1.2206%
年化均值: 6.04%
年化波动率: 19.38%
几何布朗运动与 Black-Scholes 定价逻辑
几何布朗运动(GBM)是金融数学的基石,其显式解展示了资产价格在连续时间下的演化路径:
\[ S_t = S_0 \exp\left[\left(\mu - \frac{\sigma^2}{2}\right)t + \sigma W_t\right] \]
模型核心假设 : 1. 对数正态性 :收益率服从正态分布,而股价永远为正。 2. 波动率恒定 :假设 \(\sigma\) 在有效期内不变(这也是 Black-Scholes 模型常被批评之处)。
通过 NumPy 的 cumsum 和 exp 函数,我们可以瞬间数万次模拟由于随机项 \(W_t \sim N(0, t)\) 驱动的股价路径,从而为复杂的二元期权或亚洲期权寻找数值定价。
挑战习题
习题 4.7: 实现蒙特卡洛期权定价
使用蒙特卡洛方法模拟欧式看涨期权价格。参数: - S0 = 100(当前股价) - K = 105(行权价) - T = 1年 - r = 0.05(无风险利率) - σ = 0.2(波动率) - 模拟次数:10000次
欧式看涨期权到期收益:\(C_T = \max(S_T - K, 0)\)
期权价格 = \(e^{-rT} E[C_T]\)
解答:
import numpy as np
def monte_carlo_call_option(S0, K, T, r, sigma, n_simulations= 10000 ):
"""
蒙特卡洛欧式看涨期权定价
参数:
S0: 当前股价
K: 行权价
T: 到期时间(年)
r: 无风险利率
sigma: 波动率
n_simulations: 模拟次数
"""
np.random.seed(42 )
# 生成标准正态随机数
Z = np.random.standard_normal(n_simulations)
# 模拟到期日股价(GBM解析解)
ST = S0 * np.exp((r - 0.5 * sigma** 2 ) * T + sigma * np.sqrt(T) * Z)
# 计算期权收益
payoffs = np.maximum(ST - K, 0 )
# 贴现到现值
option_price = np.exp(- r * T) * np.mean(payoffs)
# 计算标准误差
std_error = np.std(np.exp(- r * T) * payoffs) / np.sqrt(n_simulations)
return option_price, std_error
# 参数
S0 = 100
K = 105
T = 1.0
r = 0.05
sigma = 0.2
# 运行蒙特卡洛模拟
price, se = monte_carlo_call_option(S0, K, T, r, sigma, n_simulations= 100000 )
print (f'蒙特卡洛期权定价结果:' )
print (f'期权价格: ¥ { price:.4f} ' )
print (f'标准误差: ¥ { se:.6f} ' )
print (f'95%置信区间: [¥ { price - 1.96 * se:.4f} , ¥ { price + 1.96 * se:.4f} ]' )
# 与Black-Scholes公式比较(解析解)
from scipy.stats import norm
d1 = (np.log(S0/ K) + (r + 0.5 * sigma** 2 )* T) / (sigma* np.sqrt(T))
d2 = d1 - sigma* np.sqrt(T)
bs_price = S0* norm.cdf(d1) - K* np.exp(- r* T)* norm.cdf(d2)
print (f' \n Black-Scholes解析解: ¥ { bs_price:.4f} ' )
print (f'蒙特卡洛误差: ¥ { abs (price - bs_price):.6f} ( { abs (price - bs_price)/ bs_price* 100 :.2f} %)' )
蒙特卡洛期权定价结果:
期权价格: ¥8.0416
标准误差: ¥0.041760
95%置信区间: [¥7.9598, ¥8.1235]
Black-Scholes解析解: ¥8.0214
蒙特卡洛误差: ¥0.020268 (0.25%)
蒙特卡洛模拟的收敛与加速
蒙特卡洛估计的标准误差定义为 \(\text{SE} = \sigma / \sqrt{N}\) 。这意味着要将定价精度提高一个数量级,计算量必须增加 100 倍。
在量化金融实务中,传统的蒙特卡洛法常受到“维度灾难”的挑战。为了在有限的算力下获得更高精度,业界常采用方差缩减(Variance Reduction) 技术,如对偶变量法或准蒙特卡洛(Quasi-Monte Carlo)。NumPy 结合底层 BLAS 库的并行算效,使得在普通个人电脑上进行万次甚至十万次期权路径模拟成为现实。
结论
本章我们系统地学习了 NumPy 的核心组件 ndarray 及其向量化计算范式。从内存布局的底层视角到广播机制的数学形式化,NumPy 展现了其作为高性能金融计算基石的强大能力。虽然本教材后续章节将主要使用 Pandas 进行数据建模,但理解 NumPy 的数组导向思维、内存视图与副本的区别,是编写高效、稳健的量化交易系统不可或缺的前提。
学习总结与进阶建议
随着你对 NumPy 基础的掌握,建议通过以下进阶路径深化你的数值计算能力:
1. 深挖 NumPy 内部机制 若想真正优化大规模金融数据的处理速度,必须理解 NumPy 的 C 语言底层实现。Wes McKinney 的 Python for Data Analysis 第 4 章及附录 A 是极佳的起点。进一步地,Efficient NumPy (Juan Nunez-Iglesias 著)深入探讨了内存管理和最佳实践,能帮助你避免在大数据回测中的性能瓶颈。
2. 掌握数值线性代数的严谨性 量化金融中的许多高级模型(如主成分分析或最小二乘回归)本质上都是线性代数运算。Lloyd N. Trefethen 的 Numerical Linear Algebra 能让你从数学层面理解这些运算的稳定性和复杂度。这种严谨性对于构建非套利定价模型或风险管理系统至关重要。
3. 随机过程与蒙特卡洛实战 本章演示的随机游走只是冰山一角。对于希望从事衍生品定价或风险控制的读者,保罗·格拉瑟曼(Paul Glasserman)的经典著作 Monte Carlo Methods in Financial Engineering 是该领域的“圣经”。通过在 NumPy 中实现其中的方差缩减技术,你将能够构建出工业级的期权定价引擎。
4. 量化交易生态系统 NumPy 是工具,而量化是目的。Yves Hilpisch 的 Python for Finance 展示了如何将 NumPy 与 Pandas、SciPy 结合,处理真实的金融时间序列。记住,在实盘环境中,NumPy 为你的策略提供速度,但风险控制和数理逻辑才是你的真正防线。
练习 1:计算股票日收益率与波动率
题目 : 假设我们有一个包含某只股票连续10个交易日收盘价的NumPy数组。请计算其对数收益率(Log Returns),并求出年化波动率(假设一年有252个交易日)。
import numpy as np
# 模拟的收盘价序列
prices = np.array([100.0 , 101.5 , 100.5 , 102.0 , 103.5 , 104.0 , 103.0 , 105.0 , 107.0 , 106.0 ])
解答 :
import numpy as np
prices = np.array([100.0 , 101.5 , 100.5 , 102.0 , 103.5 , 104.0 , 103.0 , 105.0 , 107.0 , 106.0 ])
# 计算对数收益率: ln(P_t / P_{t-1}) = ln(P_t) - ln(P_{t-1})
log_returns = np.log(prices[1 :] / prices[:- 1 ])
print ("Daily Log Returns:" , np.round (log_returns, 4 ))
# 计算年化波动率
# 波动率 = 收益率标准差 * sqrt(252)
daily_vol = np.std(log_returns, ddof= 1 ) # ddof=1 for sample standard deviation
annualized_vol = daily_vol * np.sqrt(252 )
print (f"Annualized Volatility: { annualized_vol:.2%} " )
练习 2:几何布朗运动 (GBM) 模拟
题目 : 使用蒙特卡洛模拟生成 1000 条股票价格路径。假设: - 初始价格 \(S_0 = 100\) - 年化期望收益率 \(\mu = 0.05\) - 年化波动率 \(\sigma = 0.2\) - 时间 \(T = 1\) 年 - 步数 \(steps = 252\)
几何布朗运动公式: \[ S_t = S_{t-1} \cdot \exp\left( (\mu - \frac{1}{2}\sigma^2)\Delta t + \sigma \sqrt{\Delta t} Z_t \right) \] 其中 \(Z_t \sim N(0, 1)\) 。
解答 :
import numpy as np
import matplotlib.pyplot as plt
# 参数设置
S0 = 100
mu = 0.05
sigma = 0.2
T = 1.0
dt = 1 / 252
steps = 252
simulations = 1000
# 生成随机项 Z
# 形状: (steps, simulations)
Z = np.random.standard_normal((steps, simulations))
# 计算漂移项和扩散项
drift = (mu - 0.5 * sigma** 2 ) * dt
diffusion = sigma * np.sqrt(dt) * Z
# 计算日收益率增量
returns = np.exp(drift + diffusion)
# 初始化价格路径矩阵
S = np.zeros((steps + 1 , simulations))
S[0 ] = S0
# 逐步推导价格 (累乘)
# 计算累积乘积路径:S_t = S_0 * product(returns_1 ... returns_t)
S[1 :] = S0 * np.cumprod(returns, axis= 0 )
# 绘制前10条模拟路径
plt.figure(figsize= (10 , 6 ))
plt.plot(S[:, :10 ])
plt.title('Geometric Brownian Motion Simulation (10 Paths)' )
plt.xlabel('Time Steps' )
plt.ylabel('Price' )
plt.grid(True )
plt.show()
练习 3:投资组合风险价值 (VaR) 计算
题目 : 给定一个投资组合包含 3 只股票,其权重向量 \(w\) 和日收益率协方差矩阵 \(\Sigma\) 如下。假设投资组合总价值为 100 万元。请计算该投资组合在 95% 置信水平下的日风险价值 (VaR)。 假设收益率服从正态分布。 \[ VaR_{95\%} = \text{Portfolio Value} \times 1.65 \times \sigma_p \] 其中组合波动率 \(\sigma_p = \sqrt{w^T \Sigma w}\) 。
# 权重向量 (w)
weights = np.array([0.4 , 0.3 , 0.3 ])
# 协方差矩阵 (Sigma) - 假设值
cov_matrix = np.array([
[0.0004 , 0.0002 , 0.0001 ],
[0.0002 , 0.0003 , 0.00015 ],
[0.0001 , 0.00015 , 0.0005 ]
])
portfolio_value = 1_000_000
解答 :
import numpy as np
weights = np.array([0.4 , 0.3 , 0.3 ])
cov_matrix = np.array([
[0.0004 , 0.0002 , 0.0001 ],
[0.0002 , 0.0003 , 0.00015 ],
[0.0001 , 0.00015 , 0.0005 ]
])
portfolio_value = 1_000_000
# 1. 计算组合方差: w^T * Sigma * w
# 使用 @ 符号进行矩阵乘法
port_variance = weights.T @ cov_matrix @ weights
# 2. 计算组合波动率 (标准差)
port_volatility = np.sqrt(port_variance)
print (f"Portfolio Daily Volatility: { port_volatility:.2%} " )
# 3. 计算 95% VaR (Z-score 约为 1.645)
z_score_95 = 1.645
VaR_95 = portfolio_value * z_score_95 * port_volatility
print (f"95% Daily VaR: { VaR_95:.2f} CNY" )