ohlc_data = (10.5, 12.0, 10.2, 11.5)
ohlc_data(10.5, 12.0, 10.2, 11.5)
Python 作为金融数据分析的强大工具,其核心优势在于丰富的内置数据结构和灵活的编程范式。本章将系统性地介绍 Python 的基础数据结构——元组、列表、字典和集合,以及函数式编程的基本概念。
学习目标
通过本章学习,你应该能够:
前置知识要求
本章将讨论 Python 语言内置的功能,这些功能在我们学习金融和经济数据分析的过程中将被广泛使用。虽然像 pandas 和 NumPy 这样强大的第三方库为更大数据集提供了先进的计算功能,但它们的设计初衷是与 Python 灵活的内置数据操作工具协同工作。
在深入探讨具体数据结构之前,我们需要从数学和计算机科学的角度理解数据结构的本质。
数据抽象与抽象数据类型(Abstract Data Type, ADT)
从数学角度看,数据结构是一种抽象数据类型(ADT),由两个要素定义:
例如,对于列表(List): - 数据对象:\(D = \{a_1, a_2, ..., a_n\}\)(元素序列) - 操作集合:\(\Omega = \{\text{append}, \text{remove}, \text{sort}, \text{slice}, ...\}\)
计算复杂度理论
在金融数据分析中,选择合适的数据结构需要考虑其时间复杂度(Time Complexity):
| 操作 | 列表 | 字典/集合 | 说明 |
|---|---|---|---|
| 访问元素 | O(1) | O(1) | 索引访问 |
| 搜索元素 | O(n) | O(1) | 列表线性扫描,哈希表常数时间 |
| 插入元素 | O(n) | O(1) | 列表需移动元素 |
| 删除元素 | O(n) | O(1) | 列表需移动元素 |
| 切片 | O(k) | N/A | k为切片长度 |
可变性与内存管理
从计算机体系结构角度,理解可变性对内存管理至关重要:
在金融数据处理中,选择合适的数据结构类型直接影响程序的性能和可靠性。
Python 的数据结构简洁而强大。精通其用法是成为一名高效的经济分析程序员的关键。我们将从元组、列表和字典开始,它们是一些最常用的序列类型。
元组 (tuple) 是一个固定长度、不可变的 Python 对象序列,一旦被赋值,就不能被更改。这种不可变性并非限制,而是一种特性;它保证了一条数据记录,例如股票交易的价格、数量和时间戳,能够保持恒定,不会被意外修改。创建元组最简单的方法是用括号包裹一个逗号分隔的值序列。
在许多情况下,括号可以省略:
你可以通过调用 tuple() 构造函数将任何序列或迭代器转换为元组。这对于“冻结”一个列表的内容非常有用。
元素可以通过方括号 [] 访问,就像在大多数其他编程语言中一样。按照计算机科学的惯例,Python 中的序列是0索引的。
当你在更复杂的表达式中定义元组时,通常需要将值用括号括起来,例如下面这个创建元组的元组(嵌套元组)的例子:
关于“不可变性 (Immutability)”的辨析
元组的“不可变性”指的是元组本身所包含的对象的引用是不可更改的,即你不能将元组某个位置上的对象替换成另一个对象。然而,如果元组中的某个对象本身是“可变的”(比如一个列表),那么这个对象的内容是可以被修改的。这个特性在处理复杂数据结构时非常重要。
尽管存储在元组中的对象本身可能是可变的,但一旦元组被创建,就不可能修改每个槽位中存储的对象:
--------------------------------------------------------------------------- TypeError Traceback (most recent call last) Cell In[6], line 3 1 trade_tuple = tuple(['AAPL', 100]) 2 # 下面这行会引发 TypeError,因为你不能给元组的一个槽位赋一个新的对象 ----> 3 trade_tuple[1] = 200 TypeError: 'tuple' object does not support item assignment
但是,如果元组内部的一个对象是可变的,比如一个列表,你可以就地修改它。这是一个至关重要的区别。
('Fund_A', ['AAPL', 'GOOG', 'MSFT'], True)
你可以使用 + 运算符连接元组以产生更长的元组。请注意,这将创建一个新的元组;它不会修改原始元组。
将元组乘以一个整数,效果是连接该元组的多个副本。对象本身不会被复制,只有它们的引用会被复制。在处理大型数据集时,这对内存管理来说是一个重要的细节。
元组一个非常方便的特性是解包 (unpacking)。如果你试图将一个值赋给一个元组式的变量表达式,Python 会尝试解包等号右侧的值:
即使是带有嵌套元组的序列也可以被解包,这对于迭代结构化数据非常有用。
这个功能为交换变量值提供了一种优雅的方式,这个任务在许多其他语言中需要一个临时变量。
变量交换的底层机制
bid_price, ask_price = ask_price, bid_price 这种看似神奇的语法,其背后原理正是元组的创建和解包。
ask_price, bid_price,这会创建一个临时的、未命名的元组。整个过程是原子性的,确保了交换的正确性,即使变量名相同也不会出错。
变量解包的一个常见用途是迭代元组或列表的序列:
有时,你可能想从序列的开头“摘取”几个元素,并收集其余的元素。*rest 语法可以用于此目的。在经济时间序列分析中,这对于将最新的数据点与历史序列分开非常有用。
按照惯例,许多 Python 程序员在解包时会使用下划线 (_) 来表示不想要的变量。
由于元组的大小和内容不能被修改,它的实例方法非常少。一个特别有用的方法是 count(也适用于列表),它计算一个值出现的次数。
与元组相反,列表 (list) 的长度是可变的,其内容可以就地修改。它们是可变的 (mutable)。在数据分析中,你可能会用列表来累积模拟结果,或者存放一个正在增量更新的经济时间序列数据。你可以用方括号 [] 或 list 类型函数来定义它们:
['AAPL', 'GOOG', 'MSFT']
列表和元组在语义上相似,在许多函数中可以互换使用。list 函数在数据处理中经常被用来物化 (materialize) 一个迭代器或生成器表达式(我们稍后会探讨这些概念)。
可以使用 append 方法将元素添加到列表的末尾:
使用 insert 可以在列表的特定位置插入一个元素。插入索引必须在 0 和列表长度(包含)之间。
append 与 insert 的性能考量
在处理大规模金融数据集时,理解基本操作的计算成本至关重要:
append(value): 将元素添加到列表末尾是一个 O(1) 操作,即其执行时间与列表大小无关,对于实时行情数据的累积非常高效。insert(i, value): 在列表的任意位置 i 插入元素是一个 O(n) 操作。这是因为插入点之后的所有元素都需要向右移动一位。对于大型历史数据集,频繁使用 insert 会导致显著的性能瓶颈。因此,在构建大型资产列表时,应优先使用 append。如果确实需要在序列两端进行高效操作,可以考虑使用 collections.deque。
insert 的逆操作是 pop,它会移除并返回特定索引处的元素:
可以使用 remove 按值移除元素,它会找到第一个这样的值并从列表中移除:
你可以使用 in 关键字检查列表中是否包含某个值:
not 关键字可以用来否定 in:
列表搜索的计算复杂度
检查一个值是否存在于列表中(value in ticker_list)是一个 O(n) 操作。Python 必须从头到尾对列表进行线性扫描,直到找到匹配项或结束。
相比之下,字典(dict)和集合(set)使用了哈希表结构,其查找操作平均时间复杂度为 O(1)。在需要频繁检查某个股票是否在持仓池(portfolio pool)中时,将列表转换为集合能带来巨大的性能提升。
与元组类似,用 + 将两个列表相加会连接它们:
如果你已经定义了一个列表,你可以使用 extend 方法向其追加多个元素:
[10.5, None, 11.2, 12.0, 11.8, (2, 3)]
请注意,通过加法进行列表连接是一个相对昂贵的操作,因为必须创建一个新列表并将对象复制过去。使用 extend 向现有列表追加元素,尤其是在构建大列表时,通常是更好的选择,因为它会就地修改列表。
你可以通过调用列表的 sort 方法对其进行就地排序(不创建新对象):
sort 有一些偶尔会派上用场的选项。其中之一是传递一个次要排序键 (key)——即一个函数,它为每个元素生成一个用于排序的值。例如,我们可以按字符串的长度对一个字符串集合进行排序:
['He', 'saw', 'six', 'small', 'foxes']
稍后,我们将学习 sorted 函数,它可以从一个通用序列生成一个排好序的副本。
你可以通过使用切片表示法来选择大多数序列类型的部分,其基本形式是 start:stop,传递给索引运算符 []。
切片也可以被赋值:
虽然 start 索引处的元素被包含在内,但 stop 索引处的元素不被包含,因此结果中的元素数量是 stop - start。start 或 stop 都可以省略,此时它们分别默认为序列的开头或结尾:
负数索引表示从序列末尾开始切片:
切片的语义需要一些时间来适应,特别是如果你之前使用过 R 或 MATLAB。图 2.1 提供了一个有用的图示,说明了使用正整数和负整数进行切片。在图中,索引显示在“箱格”的边缘,以帮助说明切片选择的开始和停止位置。
在第二个冒号后还可以使用一个 step,例如,可以用来取每隔一个元素:
[1000, 1800, 1500, 4000]
一个巧妙的用法是传递 -1,这有一个有用的效果,就是反转一个列表或元组:
字典 (dictionary) 或 dict 可能是最重要的内置 Python 数据结构。在其他编程语言中,字典有时被称为哈希映射 (hash map) 或关联数组 (associative array)。字典存储键值对的集合。在经济建模中,这是一个非常有用的结构,用于将唯一标识符(如国家的ISO代码或公司的股票代码)映射到相关数据(如GDP、通货膨胀率或市值)。
创建字典的一种方法是使用花括号 {} 和冒号来分隔键和值:
{'symbol': 'AAPL', 'price': 180.5, 'volume': 50000}
你可以使用与访问列表或元组元素相同的语法来访问、插入或设置元素:
你可以使用与检查列表或元组是否包含某个值相同的语法来检查字典是否包含某个键:
你可以使用 del 关键字或 pop 方法(它会同时返回值并删除键)来删除值:
del 之前: {'symbol': 'AAPL', 'price': 182.0, 'volume': 50000, 'temp': 'to be deleted', 'market': 'NASDAQ'}
del 之后: {'symbol': 'AAPL', 'price': 182.0, 'volume': 50000, 'market': 'NASDAQ'}
弹出的值: NASDAQ
pop 之后: {'symbol': 'AAPL', 'price': 182.0, 'volume': 50000}
keys 和 values 方法分别提供字典键和值的迭代器。
如果你需要同时迭代键和值,可以使用 items 方法来迭代键值对(作为2元组):
你可以使用 update 方法将一个字典合并到另一个字典中。这将就地修改字典,因此传递给 update 的数据中任何已存在的键,其旧值都将被丢弃。
{'symbol': 'AAPL', 'price': 185.0, 'volume': 50000, 'pe_ratio': 30.5}
通常情况下,你可能会得到两个序列,并希望在字典中将它们按元素配对。zip 函数与 dict 构造函数结合使用,可以轻松实现这一点。
下面的逻辑很常见:
字典的 get 方法提供了一种更简洁的写法。如果键不存在,get 将返回 None 或一个指定的默认值。
一个常见的用例是让字典的值是其他集合,比如列表。例如,按首字母对一个单词列表进行分类。
{'a': ['apple', 'atom'], 'b': ['bat', 'bar', 'book']}
setdefault 方法可以简化这个工作流程。前面的 for 循环可以重写为:
{'a': ['apple', 'atom'], 'b': ['bat', 'bar', 'book']}
内置的 collections 模块有一个有用的类 defaultdict,这使得分组更加容易。
在深入理解字典键的限制之前,我们需要从数学角度理解哈希表(Hash Table)的工作原理。
哈希函数的数学定义
哈希函数是一个映射: \[ h: U \to \{0, 1, ..., m-1\} \]
其中: - \(U\) 是键的宇宙(所有可能的键) - \(\{0, 1, ..., m-1\}\) 是哈希值空间(\(m\) 是哈希表的大小) - \(h(k)\) 是键 \(k\) 的哈希值
理想哈希函数的性质
一个理想的哈希函数应满足以下数学性质:
确定性(Determinism): \[ \forall k \in U: h(k) = \text{constant} \] 相同的键必须始终产生相同的哈希值
均匀性(Uniformity): 哈希值应在哈希表中均匀分布 \[ P(h(k) = \frac{1}{m}, \quad \forall k \in U \] 其中 \(P\) 是概率分布
独立性(Independence): 不同键的哈希值应相互独立 \[ \forall k_1 \neq k_2: h(k_1) \bot h(k_2) \approx \text{independent} \]
冲突解决(Collision Resolution)
当两个不同的键 \(k_1 \neq k_2\) 产生相同的哈希值时: \[ h(k_1) = h(k_2) \] 这称为**哈希冲突(Hash Collision)。
Python 使用开放寻址法(Open Addressing)和二次探测(Quadratic Probing)来解决冲突。
探测序列定义为: \[ h_i(k) = (h(k) + c_1 \cdot i + c_2 \cdot i^2) \mod m \]
其中 \(c_1, c_2\) 是常数,\(i\) 是探测次数。
平均查找复杂度
在理想情况下(均匀哈希,低负载因子 \(\alpha = n/m\)),哈希表的平均查找复杂度为:
\[ T_{avg}(n) = \Theta\left(\frac{1}{1-\alpha}\right) = O(1) \]
其中: - \(n\) 是存储的键值对数量 - \(m\) 是哈希表的容量 - \(\alpha = n/m\) 是负载因子
这就是为什么字典的查找操作(in 运算)平均为 \(O(1)\) 的数学基础。
虽然字典的值可以是任何 Python 对象,但键通常必须是不可变对象,如标量类型(int、float、string)或元组(并且元组中的所有对象也必须是不可变的)。这里的技术术语是可哈希性 (hashability)。
关键概念:可哈希性 (Hashability)
字典在内部使用哈希表来存储数据,这使得键的查找速度极快。哈希函数将一个不可变对象(如股票代码字符串)转换为一个整数(哈希值),作为内存索引。
为了让系统可靠工作,一个关键要求是:一个对象的哈希值在其生命周期内必须永远不变。
这就是为什么金融分析中的唯一标识符(如 order_book_id)必须使用不可变类型的原因。
你可以使用 hash 函数检查一个对象是否是可哈希的:
180890216708858939
-9209053662355515447
--------------------------------------------------------------------------- TypeError Traceback (most recent call last) Cell In[49], line 4 2 print(hash((1, 2, (2, 3)))) 3 # 下面这行会失败,因为列表是可变的,因此不可哈希。 ----> 4 hash((1, 2, [2, 3])) TypeError: unhashable type: 'list'
要使用列表作为键,一个选择是将其转换为元组:
集合 (set) 是一个无序的唯一元素集合。它们在成员资格测试、从序列中移除重复项以及数学运算(如并集、交集和差集)方面非常有用。在经济数据集中,你可以使用集合来查找所代表的唯一国家或行业列表。
集合可以通过两种方式创建:通过 set 函数或通过带有花括号的集合字面量:
集合支持数学集合运算。考虑这两个示例集合:
这两个集合的并集 (union) 是出现在任一集合中的不同元素的集合。这可以通过 union 方法或 | 二元运算符来计算:
交集 (intersection) 包含同时出现在两个集合中的元素。可以使用 & 运算符或 intersection 方法:
表 2.1 列出了常用的集合方法。
| 函数 | 替代语法 | 描述 |
|---|---|---|
a.add(x) |
N/A | 将元素 x 添加到集合 a。 |
a.clear() |
N/A | 将集合 a 重置为空状态。 |
a.remove(x) |
N/A | 从集合 a 中移除元素 x。 |
a.pop() |
N/A | 从集合 a 中移除一个任意元素。 |
a.union(b) |
a \| b |
a 和 b 中的所有唯一元素。 |
a.update(b) |
a \|= b |
将 a 设置为 a 和 b 的并集。 |
a.intersection(b) |
a & b |
a 和 b 中的所有共同元素。 |
a.intersection_update(b) |
a &= b |
将 a 设置为 a 和 b 的交集。 |
a.difference(b) |
a - b |
a 中存在但 b 中不存在的元素。 |
a.difference_update(b) |
a -= b |
将 a 设置为 a 和 b 的差集。 |
a.symmetric_difference(b) |
a ^ b |
存在于 a 或 b 中但不同时存在的元素。 |
a.symmetric_difference_update(b) |
a ^= b |
将 a 设置为对称差集。 |
a.issubset(b) |
<= b |
如果 a 是 b 的子集,则为 True。 |
a.issuperset(b) |
>= b |
如果 a 是 b 的超集,则为 True。 |
a.isdisjoint(b) |
N/A | 如果 a 和 b 没有共同元素,则为 True。 |
所有的逻辑集合运算都有就地 (in-place) 版本,对于非常大的集合,这可能更高效。
与字典键一样,集合元素也必须是不可变的。要存储类似列表的元素,你必须将它们转换为元组:
你还可以检查子集和超集:
当且仅当集合的内容相等时,它们才相等;顺序无关紧要。
Python 有一些有用的序列函数,你应该熟悉它们。
在迭代序列时,通常需要跟踪当前项的索引。enumerate 返回一个 (i, value) 元组的序列:
sorted 函数从任何序列的元素中返回一个新的排好序的列表。这与 list.sort() 方法形成对比,后者是就地对列表进行排序。
zip 将多个列表、元组或其他序列的元素“配对”起来,创建一个元组的列表。
[('foo', 'one'), ('bar', 'two'), ('baz', 'three')]
zip 可以接受任意数量的序列,它产生的元素数量由最短的序列决定。
[('foo', 'one', False), ('bar', 'two', True)]
zip 的一个常见用途是同时迭代多个序列,可能还会与 enumerate 结合使用:
reversed 以相反的顺序迭代序列的元素。
请记住,reversed 是一个生成器 (generator)(稍后讨论),所以它在被物化(例如,用 list() 或 for 循环)之前不会创建反转的序列。
列表推导式 (List comprehensions) 是一个方便且被广泛使用的 Python 语言特性。它们允许你通过一个简洁的表达式,从一个集合中筛选元素并进行转换,从而简洁地形成一个新列表。基本形式是:
从数学角度看,列表推导式是集合概括(Set-Comprehension)思想的程序实现。
集合概括的数学定义
在朴素集合论(Naive Set Theory)中,集合概括定义为:
\[ S = \{x \in A \mid P(x)\} \]
其中: - \(A\) 是源集合 - \(P(x)\) 是关于元素 \(x\) 的性质或谓词 - \(S\) 是满足性质的所有元素的集合
示例:偶数集合的数学表示 \[ E = \{x \in \{1, 2, ..., 100\} \mid (x \mod 2 = 0)\} \]
在 Python 中,这对应于:
映射与过滤的数学表示
更复杂的推导式涉及映射(Mapping)和过滤(Filtering)的组合。
给定: - 源集合 \(A\) - 映射函数 \(f: A \to B\) - 谓词函数 \(P: A \to \{\text{True}, \text{False}\}\)
过滤后映射定义为: \[ R = \{f(x) \mid x \in A \land P(x)\} \]
这在 Python 中直接对应于:
多重集合概括(嵌套推导式)
对于嵌套结构,我们使用笛卡尔积(Cartesian Product)的概念:
\[ A \times B = \{(a, b) \mid a \in A \land b \in B\} \]
Python 的嵌套推导式:
这正是笛卡尔积的实现,时间复杂度为 \(O(|A| \times |B|)\)。
复杂度对比
| 方法 | 时间复杂度 | 空间复杂度 | 说明 |
|---|---|---|---|
| 传统 for 循环 | O(n) | O(n) | 需要预分配列表空间 |
| 列表推导式 | O(n) | O(n) | 更简洁,Python 内部优化 |
| 生成器表达式 | O(n) | O(1) | 惰性求值,节省内存 |
对于金融数据处理,列表推导式提供了声明式、函数式的编程风格,使代码更易读和维护。 [expr for value in collection if condition]
这等同于以下 for 循环:
if condition 部分是可选的。例如,给定一个字符串列表,我们可以过滤掉长度为2或更短的字符串,并将它们转换为大写:
['BAT', 'CAR', 'DOVE', 'PYTHON']
集合和字典推导式是一个自然的扩展。集合推导式看起来与等效的列表推导式相似,只是用花括号代替:
字典推导式看起来像这样: {key-expr: value-expr for value in collection if condition} 例如,我们可以创建一个这些字符串到它们在列表中位置的查找映射:
假设我们有一个包含一些英文和西班牙文名字的列表的列表。
假设我们想得到一个包含所有名字中含有两个或更多”a”的单个列表。我们可以用嵌套的 for 循环来做到这一点,但嵌套列表推导式更简洁。
['Maria', 'Natalia']
列表推导式的 for 部分是根据嵌套的顺序排列的。这里是另一个例子,我们将一个元组列表“扁平化”为一个简单的整数列表:
[1, 2, 3, 4, 5, 6, 7, 8, 9]
重要的是要区分刚才展示的语法和列表推导式内部的列表推导式,后者会产生一个列表的列表:
函数是 Python 中代码组织和复用的主要且最重要的方法。根据经验,如果你预计需要重复相同或非常相似的代码超过一次,那么编写一个可复用的函数是值得的。函数用 def 关键字声明。
每个函数可以有位置参数 (positional arguments) 和关键字参数 (keyword arguments)。关键字参数最常用于指定默认值或可选参数。
虽然关键字参数是可选的,但在调用函数时必须指定所有位置参数。主要的限制是关键字参数必须跟在位置参数(如果有的话)之后。
函数可以访问不同命名空间 (namespaces) 中的变量。默认情况下,在函数内部赋的任何变量都赋给了局部命名空间 (local namespace)。局部命名空间在函数被调用时创建,并立即由函数的参数填充。函数结束后,局部命名空间被销毁。 考虑这个函数:
现在,假设我们在函数外部声明了 a。函数可以从更高的作用域访问这个变量。
对函数作用域之外的变量进行赋值是可能的,但它们必须使用 global 关键字声明。我通常不鼓励使用 global 关键字。
Python 函数一个非常方便的特性是能够返回多个值。
这里发生的是,函数实际上只返回一个对象,一个元组,然后这个元组被解包到结果变量中。
一个可能有吸引力的替代方案是返回一个字典:
由于 Python 函数是对象,你可以将它们作为参数传递给其他函数。这是函数式编程的一个基本概念,对于数据分析非常强大。假设我们正在对一个美国州名列表进行数据清洗。
为了使这个列表统一,我们需要去除空白、移除标点符号并统一大小写。
['Alabama',
'Georgia',
'Georgia',
'Georgia',
'Florida',
'South Carolina',
'West Virginia']
一种更灵活、函数式的方法是定义一个你想要应用的操作列表。
['Alabama',
'Georgia',
'Georgia',
'Georgia',
'Florida',
'South Carolina',
'West Virginia']
你还可以将函数用作其他内置函数的参数,比如 map,它将一个函数应用于一个序列。
Python 支持所谓的匿名 (anonymous) 或 lambda 函数,这是一种编写只包含单个语句的函数的方式,该语句的结果就是返回值。
它们在数据分析中特别方便,因为许多数据转换函数都接受其他函数作为参数。传递一个 lambda 函数通常比编写一个完整的函数声明更清晰。
[8, 14, 2, 4, 18]
再举一个例子,假设你想按每个字符串中不同字母的数量对一个字符串集合进行排序:
Python 中的许多对象都支持迭代。这是通过迭代器协议 (iterator protocol) 实现的。例如,迭代一个字典会产生字典的键。
当你写 for key in some_dict 时,Python 解释器首先尝试从 some_dict 创建一个迭代器。
<dict_keyiterator object at 0x0000020234465260>
['a', 'b', 'c']
生成器 (generator) 是构建新可迭代对象的一种便捷方式。普通函数执行并返回单个结果,而生成器可以通过暂停和恢复执行来返回一系列多个值。要创建一个生成器,使用 yield 关键字而不是 return。
从计算机科学和函数式编程理论角度理解生成器需要掌握惰性求值的核心概念。
严格求值 vs 惰性求值
在编程语言理论中,表达式的求值策略分为两类:
| 求值策略 | 定义 | 示例 | 内存模式 |
|---|---|---|---|
| 严格求值 | 表达式一被遇到就计算所有值 | sum([f(x) for x in S]) |
立即分配所有内存 |
| 惰性求值 | 仅在实际需要时才计算值 | Python 生成器 | 按需分配内存 |
惰性序列的数学表示
一个惰性序列 \(L\) 可以表示为一个** thunk(延迟计算)**:
\[ L = \lambda: [\text{yield } e_1, \text{yield } e_2, ..., \text{yield } e_n] \]
其中每个 \(e_i\) 产生一个值,但只有在被请求时才计算。
流(Streams)的数学定义
在数据处理理论中,流(Stream)是一个无限序列:
\[ S = [s_1, s_2, s_3, ...] \]
惰性求值允许我们处理无限流而不耗尽内存:
共递归的数学表示
许多生成器可以建模为共递归(Corecursion):
\[ f(n) = \text{if } n = 0: \text{then } 1\] \[ f(n) = \text{if } n = 0: \text{then } 1: \text{else } n \times f(n-1)\]
尾递归优化使得: - 时间复杂度:从指数级 \(O(2^n)\) 降到线性 \(O(n)\) - 空间复杂度:从 \(O(n)\) 栈降到 \(O(1)\)(记忆化)
这在金融计算中极其重要,例如:
数据管道(Pipeline)的数学模型
生成器使数据管道成为可能:
\[ \mathrm{数据} \rightarrow \mathrm{过滤} \rightarrow \mathrm{转换} \rightarrow \mathrm{聚合} \]
每个阶段都是一个惰性操作:
这种函数式编程风格在量化金融中非常重要,特别是处理大规模 tick 级数据时。
关键概念: 迭代器与生成器
对于处理大规模经济或金融数据集(如逐笔成交数据),理解生成器的“惰性求值 (lazy evaluation)”特性至关重要。
for 循环中)才“屈服 (yield)”一个值。它在任何时刻都只需在内存中保存一个元素。这种机制允许我们以极低的内存成本处理几乎无限大的数据流,是量化金融工程中的核心技术。
制作生成器的另一种方法是使用生成器表达式 (generator expression)。这是列表推导式的生成器版本。要创建一个,将本应是列表推导式的内容用圆括号括起来。
这等同于更冗长的生成器:
生成器表达式可以用作函数参数,代替列表推导式,这样可以更节省内存,也可能更快。
标准库 itertools 模块有许多常见数据算法的生成器集合。例如,groupby 接受一个序列和一个函数,按函数的返回值对连续的元素进行分组。
Alan ['Alan']
Adam ['Adam']
Wes ['Wes']
Will ['Will']
Albert ['Albert']
Steven ['Steven']
表 2.2 列出了一些其他有用的 itertools 函数。
| 函数 | 描述 |
|---|---|
chain(*iterables) |
通过将迭代器链接在一起来生成一个序列。 |
combinations(iterable, k) |
生成所有可能的k元组,忽略顺序且无放回。 |
permutations(iterable, k) |
生成所有可能的k元组,考虑顺序。 |
groupby(iterable[, keyfunc]) |
为每个唯一键生成 (键, 子迭代器)。 |
product(*iterables, repeat=1) |
生成输入可迭代对象的笛卡尔积(作为元组)。 |
优雅地处理 Python 错误或异常 (exceptions) 是构建健壮程序的重要部分。在数据分析中,许多函数只对特定类型的输入有效,而真实世界的数据往往很杂乱。例如,float() 在输入不当时会失败,并引发 ValueError:
假设我们想要一个能够优雅失败的 float 版本。我们可以通过编写一个函数,将对 float 的调用封装在 try/except 块中来实现。
你可能只想抑制 ValueError,因为 TypeError 可能表明你的程序中存在一个真正的 bug。要做到这一点,在 except 后面写上异常类型:
--------------------------------------------------------------------------- TypeError Traceback (most recent call last) Cell In[96], line 8 5 return x 7 # 这现在会引发 TypeError,正如预期的那样 ----> 8 attempt_float_specific((1, 2)) Cell In[96], line 3, in attempt_float_specific(x) 1 def attempt_float_specific(x): 2 try: ----> 3 return float(x) 4 except ValueError: 5 return x TypeError: float() argument must be a string or a real number, not 'tuple'
你可以通过编写一个异常类型的元组来捕获多种异常类型:
在某些情况下,你可能希望无论 try 块是否成功,都执行一些代码。为此,使用 finally。类似地,你可以使用 else 来执行仅在 try 块成功时才执行的代码。
本书大部分内容使用像 pandas.read_csv 这样的高级工具来读取数据文件。然而,理解如何在 Python 中处理文件的基础知识很重要。要打开一个文件进行读写,使用内置的 open 函数。一个最佳实践是传递一个 encoding 参数。
默认情况下,文件以只读模式('r')打开。然后我们可以像处理列表一样处理文件对象 f,并迭代其行。行内容会带有行尾(EOL)标记,所以你通常会用 rstrip() 来移除它们。
当你使用 open 创建文件对象时,建议在完成后关闭文件。with 语句通过在退出块时自动关闭文件,使这变得更容易。
表 2.3 列出了有效的文件读/写模式。
| 模式 | 描述 |
|---|---|
r |
只读模式 |
w |
只写模式;创建一个新文件(擦除现有数据) |
x |
只写模式;创建一个新文件,但如果路径已存在则失败 |
a |
追加到现有文件(如果文件不存在则创建) |
r+ |
读写 |
b |
添加到模式中用于二进制文件(例如,'rb' 或 'wb') |
t |
文本模式文件(默认);自动将字节解码为 Unicode |
对于可读文件,一些最常用的方法是 read、seek 和 tell。read 从文件中返回一定数量的字符。
# 假设 path 指向一个 utf-8 编码的文件
with open(path, encoding='utf-8') as f:
# 读取前10个字符
content_text = f.read(10)
# 获取当前位置
position_text = f.tell()
# 将位置移动到第3个字节
f.seek(3)
# 从新位置读取一个字符
char_after_seek = f.read(1)
with open(path, mode='rb') as f: # 二进制模式
# 读取前10个字节
content_binary = f.read(10)
# 获取当前位置
position_binary = f.tell()
# 用于演示的虚构输出
print(f'文本内容: "{content_text}", 位置: {position_text}')
print(f'二进制内容: {content_binary}, 位置: {position_binary}')
print(f'移动后读取的字符: "{char_after_seek}"')表 2.4 总结了许多最常用的文件方法。
| 方法/属性 | 描述 |
|---|---|
read([size]) |
以字符串或字节形式返回文件数据,可选大小。 |
readlines([size]) |
返回文件中的行列表,可选大小参数。 |
write(string) |
将传递的字符串写入文件。 |
writelines(strings) |
将传递的字符串序列写入文件。 |
close() |
关闭文件对象。 |
flush() |
将内部 I/O 缓冲区刷新到磁盘。 |
seek(pos) |
移动到指定的文件位置(整数)。 |
tell() |
以整数形式返回当前文件位置。 |
closed |
如果文件已关闭,则为 True。 |
encoding |
用于将字节解释为 Unicode 的编码。 |
习题 3.1: 列表与字符串操作
给定一个字符串列表:
解答:
stocks = ['贵州茅台', '宁波港', '宁波银行', '中国平安', '招商银行']
codes = ['600519.SH', '601018.SH', '002142.SZ', '601318.SH', '600036.SH']
# (a) 配对股票名称和代码
paired = list(zip(stocks, codes))
print('(a) 股票配对:')
for stock, code in paired:
print(f'{stock}: {code}')
# (b) 创建字典列表
stock_list = [{'name': s, 'code': c} for s, c in paired]
print('\n(b) 字典列表:')
print(stock_list)
# (c) 筛选以"宁波"开头的股票
ningbo_stocks = [s for s in stocks if s.startswith('宁波')]
print(f'\n(c) 宁波相关股票: {ningbo_stocks}')
# (d) 按字母顺序排序
sorted_stocks = sorted(stocks)
print(f'\n(d) 排序后: {sorted_stocks}')(a) 股票配对:
贵州茅台: 600519.SH
宁波港: 601018.SH
宁波银行: 002142.SZ
中国平安: 601318.SH
招商银行: 600036.SH
(b) 字典列表:
[{'name': '贵州茅台', 'code': '600519.SH'}, {'name': '宁波港', 'code': '601018.SH'}, {'name': '宁波银行', 'code': '002142.SZ'}, {'name': '中国平安', 'code': '601318.SH'}, {'name': '招商银行', 'code': '600036.SH'}]
(c) 宁波相关股票: ['宁波港', '宁波银行']
(d) 排序后: ['中国平安', '宁波港', '宁波银行', '招商银行', '贵州茅台']
习题 3.2: 字典操作
创建一个表示长三角地区部分省份2022年GDP的字典(单位:万亿元):
解答:
gdp_2022 = {
'江苏': 12.29,
'浙江': 7.77,
'上海': 4.47,
'安徽': 4.50
}
# (a) 添加福建
gdp_2022['福建'] = 5.31
print('(a) 添加福建后的字典:')
print(gdp_2022)
# (b) 计算总GDP
total_gdp = sum(gdp_2022.values())
print(f'\n(b) 总GDP: {total_gdp:.2f} 万亿元')
# (c) 找出GDP最高的省份
max_province = max(gdp_2022, key=gdp_2022.get)
max_gdp = gdp_2022[max_province]
print(f'\n(c) GDP最高省份: {max_province} ({max_gdp} 万亿元)')
# (d) 转换为美元
exchange_rate = 7.0 # 1美元=7人民币
gdp_usd = {k: v / exchange_rate for k, v in gdp_2022.items()}
print(f'\n(d) GDP(万亿美元):')
for province, gdp in gdp_usd.items():
print(f' {province}: {gdp:.3f}')
# (e) 按GDP降序排列
sorted_gdp = dict(sorted(gdp_2022.items(), key=lambda x: x[1], reverse=True))
print(f'\n(e) 降序排列:')
for province, gdp in sorted_gdp.items():
print(f' {province}: {gdp} 万亿元')(a) 添加福建后的字典:
{'江苏': 12.29, '浙江': 7.77, '上海': 4.47, '安徽': 4.5, '福建': 5.31}
(b) 总GDP: 34.34 万亿元
(c) GDP最高省份: 江苏 (12.29 万亿元)
(d) GDP(万亿美元):
江苏: 1.756
浙江: 1.110
上海: 0.639
安徽: 0.643
福建: 0.759
(e) 降序排列:
江苏: 12.29 万亿元
浙江: 7.77 万亿元
福建: 5.31 万亿元
安徽: 4.5 万亿元
上海: 4.47 万亿元
习题 3.3: 集合操作
给定两个集合:
解答:
set_a = {'宁波港', '宁波银行', '贵州茅台', '中国平安'}
set_b = {'宁波港', '招商银行', '工商银行', '中国平安'}
# (a) 交集
intersection = set_a & set_b
print(f'(a) 交集: {intersection}')
# (b) 并集
union = set_a | set_b
print(f'\n(b) 并集: {union}')
# (c) 差集(A-B)
difference = set_a - set_b
print(f'\n(c) A-B差集: {difference}')
# (d) 成员测试
is_in_b = '宁波银行' in set_b
print(f'\n(d) 宁波银行在set_b中: {is_in_b}')(a) 交集: {'中国平安', '宁波港'}
(b) 并集: {'中国平安', '工商银行', '宁波港', '贵州茅台', '宁波银行', '招商银行'}
(c) A-B差集: {'贵州茅台', '宁波银行'}
(d) 宁波银行在set_b中: False
习题 3.4: 函数与递归
斐波那契数列在金融中有广泛应用,例如技术分析中的斐波那契回调位。数列定义为: - F(0) = 0 - F(1) = 1 - F(n) = F(n-1) + F(n-2) (当 n > 1 时)
解答:
# (a) 递归版本
def fib_recursive(n):
"""递归计算斐波那契数"""
if n <= 1:
return n
return fib_recursive(n-1) + fib_recursive(n-2)
# (b) 循环版本(更高效)
def fib_iterative(n):
"""使用循环计算斐波那契数"""
if n <= 1:
return n
a, b = 0, 1
for _ in range(2, n + 1):
a, b = b, a + b
return b
# (c) 记忆化优化版本
from functools import lru_cache
@lru_cache(maxsize=None)
def fib_memoized(n):
"""使用记忆化优化递归计算"""
if n <= 1:
return n
return fib_memoized(n-1) + fib_memoized(n-2)
# (d) 计算前15个斐波那契数
print('(d) 前15个斐波那契数:')
fib_numbers = [fib_iterative(i) for i in range(15)]
print(fib_numbers)
# (e) 计算相邻两项的比值
print('\n(e) 相邻项比值(趋近于黄金分割率1.618...):')
for i in range(2, 15):
ratio = fib_numbers[i] / fib_numbers[i-1]
print(f'F({i})/F({i-1}) = {ratio:.6f}')
print('\n说明:这个比值趋近于黄金分割率 φ ≈ 1.6180339887...')
print('在金融技术分析中,斐波那契回调位(如38.2%, 61.8%)')
print('就是基于黄金分割率及其衍生比例。')(d) 前15个斐波那契数:
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377]
(e) 相邻项比值(趋近于黄金分割率1.618...):
F(2)/F(1) = 1.000000
F(3)/F(2) = 2.000000
F(4)/F(3) = 1.500000
F(5)/F(4) = 1.666667
F(6)/F(5) = 1.600000
F(7)/F(6) = 1.625000
F(8)/F(7) = 1.615385
F(9)/F(8) = 1.619048
F(10)/F(9) = 1.617647
F(11)/F(10) = 1.618182
F(12)/F(11) = 1.617978
F(13)/F(12) = 1.618056
F(14)/F(13) = 1.618026
说明:这个比值趋近于黄金分割率 φ ≈ 1.6180339887...
在金融技术分析中,斐波那契回调位(如38.2%, 61.8%)
就是基于黄金分割率及其衍生比例。
斐波那契数列与黄金分割率的数学关系
斐波那契数列相邻两项的比值趋近于黄金分割率 \(\phi\),这是一个重要的数学常数:
\[ \phi = \frac{1 + \sqrt{5}}{2} \approx 1.6180339887... \]
当 \(n \to \infty\) 时,\(F(n+1)/F(n) \to \phi\)。这是因为斐波那契数列的通项公式(Binet’s Formula)为:
\[ F(n) = \frac{\phi^n - \psi^n}{\sqrt{5}} \]
其中 \(\psi = \frac{1 - \sqrt{5}}{2} \approx -0.618\)。在金融技术分析中,斐波那契回调位(Fibonacci Retracement)是常用工具,关键比率如 38.2% (\(1 - \phi^{-1}\)) 和 61.8% (\(\phi^{-1}\)) 均源于此。
习题 3.5: 文件处理与数据清洗
假设你有一个包含股票价格数据的CSV文件(格式:日期,开盘价,最高价,最低价,收盘价,成交量),编写代码:
解答:
import csv
from datetime import datetime
# 为了演示,先创建一个模拟的CSV文件
sample_data = """日期,开盘价,最高价,最低价,收盘价,成交量
2023-01-03,35.50,36.20,35.30,36.00,125000
2023-01-04,36.10,36.80,35.90,36.50,138000
2023-01-05,36.40,37.20,36.30,37.00,142000
2023-01-06,37.10,37.50,36.80,36.90,115000
2023-01-09,36.80,37.10,36.50,36.70,98000
"""
# 写入模拟文件
with open('stock_data_sample.csv', 'w', encoding='utf-8') as f:
f.write(sample_data)
# (a)-(c): 读取并解析文件
stock_data = []
with open('stock_data_sample.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
# 转换数据类型
record = {
'date': datetime.strptime(row['日期'], '%Y-%m-%d'),
'open': float(row['开盘价']),
'high': float(row['最高价']),
'low': float(row['最低价']),
'close': float(row['收盘价']),
'volume': int(row['成交量'])
}
stock_data.append(record)
print('(a)-(c) 读取的数据:')
for record in stock_data:
print(f" {record['date'].strftime('%Y-%m-%d')}: 收盘价={record['close']}")
# (d) 计算每日涨跌幅
for i in range(len(stock_data)):
if i == 0:
stock_data[i]['change_pct'] = 0.0
else:
prev_close = stock_data[i-1]['close']
curr_close = stock_data[i]['close']
stock_data[i]['change_pct'] = (curr_close - prev_close) / prev_close * 100
print('\n(d) 添加涨跌幅后的数据:')
for record in stock_data:
if record['change_pct'] != 0:
print(f" {record['date'].strftime('%Y-%m-%d')}: {record['change_pct']:+.2f}%")
# (e) 找出涨幅最大和最小的交易日
max_gain = max(stock_data[1:], key=lambda x: x['change_pct'])
max_loss = min(stock_data[1:], key=lambda x: x['change_pct'])
print(f'\n(e) 涨幅最大: {max_gain["date"].strftime("%Y-%m-%d")} (+{max_gain["change_pct"]:.2f}%)')
print(f' 跌幅最大: {max_loss["date"].strftime("%Y-%m-%d")} ({max_loss["change_pct"]:.2f}%)')
# (f) 写入新文件
with open('stock_data_cleaned.csv', 'w', encoding='utf-8', newline='') as f:
fieldnames = ['date', 'open', 'high', 'low', 'close', 'volume', 'change_pct']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(stock_data)
print('\n(f) 已将清洗后的数据写入 stock_data_cleaned.csv')(a)-(c) 读取的数据:
2023-01-03: 收盘价=36.0
2023-01-04: 收盘价=36.5
2023-01-05: 收盘价=37.0
2023-01-06: 收盘价=36.9
2023-01-09: 收盘价=36.7
(d) 添加涨跌幅后的数据:
2023-01-04: +1.39%
2023-01-05: +1.37%
2023-01-06: -0.27%
2023-01-09: -0.54%
(e) 涨幅最大: 2023-01-04 (+1.39%)
跌幅最大: 2023-01-09 (-0.54%)
(f) 已将清洗后的数据写入 stock_data_cleaned.csv
习题 3.6: 股票收益率计算
使用真实股票数据(宁波港601018),编写代码:
解答:
import pandas as pd
import numpy as np
from pathlib import Path
# 读取宁波港数据
# 从本地 Parquet 文件读取数据代替 HDF5
ningbo_port = pd.read_parquet(
'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
# 准备数据
ningbo_port['trade_date'] = pd.to_datetime(ningbo_port['trade_date'], format='%Y%m%d')
ningbo_port = ningbo_port.sort_values('trade_date')
# 提取需要的字段
prices = ningbo_port[['trade_date', 'close']].copy()
prices = prices.tail(100) # 使用最近100个交易日
# (a) 读取数据完成
print(f'(a) 已读取 {len(prices)} 个交易日的数据')
print(f'日期范围: {prices["trade_date"].iloc[0].strftime("%Y-%m-%d")} 至 {prices["trade_date"].iloc[-1].strftime("%Y-%m-%d")}')
# (b) 计算收益率
prices['return'] = prices['close'].pct_change()
prices['log_return'] = np.log(prices['close'] / prices['close'].shift(1))
print('\n(b) 最近5天的收益率:')
print(prices[['trade_date', 'return', 'log_return']].tail())
# (c) 统计正负收益天数
positive_days = (prices['return'] > 0).sum()
negative_days = (prices['return'] < 0).sum()
total_days = len(prices) - 1 # 减去第一天的NaN
print(f'\n(c) 收益天数统计(共{total_days}天):')
print(f' 上涨天数: {positive_days} ({positive_days/total_days*100:.1f}%)')
print(f' 下跌天数: {negative_days} ({negative_days/total_days*100:.1f}%)')
# (d) 计算统计量
mean_return = prices['return'].mean() * 100 # 转为百分比
std_return = prices['return'].std() * 100
print(f'\n(d) 收益率统计:')
print(f' 日平均收益率: {mean_return:.4f}%')
print(f' 日收益率标准差: {std_return:.4f}%')
print(f' 年化波动率: {std_return * np.sqrt(252):.2f}%')
# (e) 最大涨跌幅
max_gain_idx = prices['return'].idxmax()
max_loss_idx = prices['return'].idxmin()
print(f'\n(e) 极端交易日:')
print(f' 最大涨幅: {prices.loc[max_gain_idx, "trade_date"].strftime("%Y-%m-%d")}')
print(f' 涨幅: {prices.loc[max_gain_idx, "return"]*100:+.2f}%')
print(f' 最大跌幅: {prices.loc[max_loss_idx, "trade_date"].strftime("%Y-%m-%d")}')
print(f' 跌幅: {prices.loc[max_loss_idx, "return"]*100:+.2f}%')(a) 已读取 100 个交易日的数据
日期范围: 2025-08-06 至 2025-12-31
(b) 最近5天的收益率:
trade_date return log_return
3703 2025-12-25 0.005450 0.005435
3704 2025-12-26 -0.005420 -0.005435
3705 2025-12-29 -0.005450 -0.005464
3706 2025-12-30 -0.005479 -0.005495
3707 2025-12-31 0.000000 0.000000
(c) 收益天数统计(共99天):
上涨天数: 43 (43.4%)
下跌天数: 42 (42.4%)
(d) 收益率统计:
日平均收益率: 0.0099%
日收益率标准差: 0.9855%
年化波动率: 15.64%
(e) 极端交易日:
最大涨幅: 2025-09-23
涨幅: +3.24%
最大跌幅: 2025-09-25
跌幅: -2.95%
简单收益率与对数收益率的对比
在金融计量学中,理解两类收益率的区别至关重要:
对数收益率的优势: - 时间可加性: \(T\) 期对数收益率等于各单期对数收益率之和,即 \(r_{0,T} = \sum_{t=1}^{T} r_t\)。 - 对称性: 上涨 10% 后下跌 10%,对数收益率之和为 0,而简单收益率由于基数变化会导致净亏损。 - 正态性: 在连续时间资产定价模型(如 Black-Scholes)中,资产价格常被假设服从对数正态分布,这意味着对数收益率服从正态分布。
在实践中,简单收益率更直观,适合向客户展示业绩;而对数收益率因其优良的数学特性,更适合进行学术研究和波动率建模。
习题 3.7: 生成器与内存效率
在处理大量金融数据时,内存效率很重要。比较三种方法:
解答:
import sys
import random
# (a) 列表方法
def get_random_list(n):
"""生成包含n个随机数的列表"""
return [random.random() for _ in range(n)]
# (b) 生成器表达式
def get_random_generator_expr(n):
"""返回生成器表达式"""
return (random.random() for _ in range(n))
# (c) 生成器函数
def get_random_generator_func(n):
"""生成器函数"""
for _ in range(n):
yield random.random()
n = 100000
# (d) 比较内存占用
random_list = get_random_list(n)
list_size = sys.getsizeof(random_list)
random_gen_expr = get_random_generator_expr(n)
gen_expr_size = sys.getsizeof(random_gen_expr)
random_gen_func = get_random_generator_func(n)
gen_func_size = sys.getsizeof(random_gen_func)
print(f'(d) 内存占用比较 (n={n:,}):')
print(f' 列表: {list_size:,} 字节')
print(f' 生成器表达式: {gen_expr_size} 字节')
print(f' 生成器函数: {gen_func_size} 字节')
print(f'\n内存节省比例:')
print(f' 生成器 vs 列表: {list_size/gen_expr_size:.1f}x')
print(f'\n说明:生成器在处理大规模数据时可以显著节省内存')
print('因为它不会一次性生成所有数据,而是按需生成。')
# 实际使用示例:计算前1000个随机数的平均值
print(f'\n使用生成器计算平均值:')
avg_from_list = sum(get_random_list(1000)) / 1000
avg_from_gen = sum(get_random_generator_func(1000)) / 1000
print(f' 从列表: {avg_from_list:.6f}')
print(f' 从生成器: {avg_from_gen:.6f}')(d) 内存占用比较 (n=100,000):
列表: 800,984 字节
生成器表达式: 104 字节
生成器函数: 104 字节
内存节省比例:
生成器 vs 列表: 7701.8x
说明:生成器在处理大规模数据时可以显著节省内存
因为它不会一次性生成所有数据,而是按需生成。
使用生成器计算平均值:
从列表: 0.490975
从生成器: 0.494433
生成器在金融大数据中的应用
在处理 A 股高频交易(Tick-by-Tick)数据时,数据量往往以 GB 甚至 TB 计。
核心应用模式: - 流式处理: 逐行读取超大型 CSV 或 H5 文件,避免一次性加载导致的内存崩溃(OOM)。 - 数据管道 (Pipeline): 通过链接多个生成器(如:读取 -> 清洗 -> 聚合),每个环节只处理当前记录。
这种模式特别适合研究长周期的日内高频波动特征或构建极其复杂的阿尔法因子(Alpha Factors)。
习题 3.8: 实现一个简单的回测系统
编写一个面向对象的交易系统,包含以下类:
Stock 类:存储股票信息Portfolio 类:管理持仓Backtest 类:执行回测解答:
import pandas as pd
import numpy as np
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict
from datetime import datetime
# (a) Stock类
@dataclass
class Stock:
"""股票类"""
symbol: str
name: str
prices: pd.DataFrame
def __post_init__(self):
"""确保日期是索引"""
if not isinstance(self.prices.index, pd.DatetimeIndex):
if 'trade_date' in self.prices.columns:
self.prices['trade_date'] = pd.to_datetime(self.prices['trade_date'])
self.prices.set_index('trade_date', inplace=True)
def get_price(self, date):
"""获取指定日期的价格"""
if date in self.prices.index:
return self.prices.loc[date, 'close']
return None
def calculate_return(self, start_date, end_date):
"""计算期间收益率"""
if start_date in self.prices.index and end_date in self.prices.index:
start_price = self.prices.loc[start_date, 'close']
end_price = self.prices.loc[end_date, 'close']
return (end_price - start_price) / start_price
return None
# (b) Portfolio类
class Portfolio:
"""投资组合类"""
def __init__(self, initial_capital: float = 100000):
self.initial_capital = initial_capital
self.cash = initial_capital
self.positions = {} # {symbol: shares}
self.transaction_history = []
def buy(self, symbol: str, shares: int, price: float, date: datetime):
"""买入股票"""
cost = shares * price
if cost <= self.cash:
self.cash -= cost
if symbol in self.positions:
self.positions[symbol] += shares
else:
self.positions[symbol] = shares
self.transaction_history.append({
'date': date,
'action': 'BUY',
'symbol': symbol,
'shares': shares,
'price': price,
'total': cost
})
return True
return False
def sell(self, symbol: str, shares: int, price: float, date: datetime):
"""卖出股票"""
if symbol in self.positions and self.positions[symbol] >= shares:
proceeds = shares * price
self.cash += proceeds
self.positions[symbol] -= shares
if self.positions[symbol] == 0:
del self.positions[symbol]
self.transaction_history.append({
'date': date,
'action': 'SELL',
'symbol': symbol,
'shares': shares,
'price': price,
'total': proceeds
})
return True
return False
def get_value(self, stock_prices: Dict[str, float]):
"""计算组合总价值"""
stocks_value = sum(shares * stock_prices.get(symbol, 0)
for symbol, shares in self.positions.items())
return self.cash + stocks_value
def get_positions(self):
"""获取当前持仓"""
return self.positions.copy()
# (c) Backtest类
class Backtest:
"""回测类"""
def __init__(self, stock: Stock, strategy='ma_crossover',
short_window=5, long_window=20):
self.stock = stock
self.strategy = strategy
self.short_window = short_window
self.long_window = long_window
self.portfolio = None
self.signals = None
def generate_signals(self):
"""生成交易信号"""
prices = self.stock.prices['close']
if self.strategy == 'ma_crossover':
# 移动平均线交叉策略
short_ma = prices.rolling(window=self.short_window).mean()
long_ma = prices.rolling(window=self.long_window).mean()
# 生成信号:1=买入,-1=卖出,0=持有
signals = pd.DataFrame(index=prices.index)
signals['signal'] = 0
signals.loc[short_ma > long_ma, 'signal'] = 1
signals.loc[short_ma < long_ma, 'signal'] = -1
# 只在信号变化时交易
signals['position'] = signals['signal'].diff()
signals.loc[signals['position'] == 0, 'position'] = 0
self.signals = signals
return signals
def run(self, start_date, end_date, initial_capital=100000):
"""执行回测"""
self.portfolio = Portfolio(initial_capital)
signals = self.generate_signals()
# 筛选回测期间
test_period = signals.loc[start_date:end_date]
for date, signal in test_period.iterrows():
price = self.stock.get_price(date)
if price is None:
continue
# 买入信号
if signal['position'] == 1:
# 使用可用现金的95%买入
shares = int((self.portfolio.cash * 0.95) / price)
if shares > 0:
self.portfolio.buy(self.stock.symbol, shares, price, date)
# 卖出信号
elif signal['position'] == -1:
current_positions = self.portfolio.get_positions()
shares = current_positions.get(self.stock.symbol, 0)
if shares > 0:
self.portfolio.sell(self.stock.symbol, shares, price, date)
return self.portfolio
def evaluate(self):
"""评估回测结果"""
if not self.portfolio:
return None
# 计算最终价值
final_date = self.stock.prices.index[-1]
final_price = self.stock.get_price(final_date)
final_value = self.portfolio.get_value({self.stock.symbol: final_price})
# 计算收益率
total_return = (final_value - self.portfolio.initial_capital) / self.portfolio.initial_capital
# 计算买入持有策略的收益率
first_price = self.stock.prices['close'].iloc[0]
buy_hold_return = (final_price - first_price) / first_price
results = {
'initial_capital': self.portfolio.initial_capital,
'final_value': final_value,
'total_return': total_return,
'buy_hold_return': buy_hold_return,
'excess_return': total_return - buy_hold_return,
'num_trades': len(self.portfolio.transaction_history)
}
return results
# (d) 使用宁波港数据进行回测
print('(d) 宁波港回测示例:\n')
# 加载数据
# 从本地 Parquet 文件读取数据代替 HDF5
ningbo_port_df = pd.read_parquet(
'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
ningbo_port_df['trade_date'] = pd.to_datetime(ningbo_port_df['trade_date'], format='%Y%m%d')
# 创建Stock对象
ningbo_port = Stock(
symbol='601018.SH',
name='宁波港',
prices=ningbo_port_df[['trade_date', 'close', 'high', 'low', 'volume']].copy()
)
print(f'股票: {ningbo_port.name} ({ningbo_port.symbol})')
print(f'数据范围: {ningbo_port.prices.index[0]} 至 {ningbo_port.prices.index[-1]}')
print(f'数据点数: {len(ningbo_port.prices)}')
# 创建回测对象
backtest = Backtest(ningbo_port, strategy='ma_crossover',
short_window=5, long_window=20)
# 选择回测期间(最近一年)
start_date = ningbo_port.prices.index[-252]
end_date = ningbo_port.prices.index[-1]
print(f'\n回测期间: {start_date} 至 {end_date}')
print(f'策略: 5日/20日移动平均线交叉')
# 运行回测
portfolio = backtest.run(start_date, end_date, initial_capital=100000)
# 评估结果
results = backtest.evaluate()
print(f'\n回测结果:')
print(f' 初始资金: ¥{results["initial_capital"]:,.2f}')
print(f' 最终价值: ¥{results["final_value"]:,.2f}')
print(f' 策略收益率: {results["total_return"]*100:+.2f}%')
print(f' 买入持有收益率: {results["buy_hold_return"]*100:+.2f}%')
print(f' 超额收益: {results["excess_return"]*100:+.2f}%')
print(f' 交易次数: {results["num_trades"]}')
print(f'\n持仓情况:')
positions = portfolio.get_positions()
if positions:
for symbol, shares in positions.items():
print(f' {symbol}: {shares} 股')
else:
print(' 无持仓')(d) 宁波港回测示例:
股票: 宁波港 (601018.SH)
数据范围: 2010-09-28 00:00:00 至 2025-12-31 00:00:00
数据点数: 3708
回测期间: 2024-12-19 00:00:00 至 2025-12-31 00:00:00
策略: 5日/20日移动平均线交叉
回测结果:
初始资金: ¥100,000.00
最终价值: ¥100,000.00
策略收益率: +0.00%
买入持有收益率: +42.33%
超额收益: -42.33%
交易次数: 0
持仓情况:
无持仓
移动平均线交叉策略的数学原理
移动平均线交叉是最经典的技术分析策略之一,其核心在于利用低通滤波器(Low-pass Filter)原理过滤市场白噪声(White Noise),提取股价的长期趋势。
数学定义: \(n\) 日简单移动平均线(SMA)定义为:
\[ MA_t(n) = \frac{1}{n}\sum_{i=0}^{n-1} P_{t-i} \]
交易逻辑: 该策略本质上是基于动量(Momentum)效应: - 看涨信号: 短期均线向上穿越长期均线(“金叉”),暗示短期动量超过长期动量。 - 看跌信号: 短期均线向下穿越长期均线(“死叉”),暗示股价进入阶段性下行。
通过此类量化手段,投资者可以克服心理锚定偏差(Anchoring Bias),实现纪律化投资。
本章我们不仅掌握了 Python 的核心内置数据结构——元组、列表、字典和集合,还深入探讨了函数编程的最佳实践与文件操作的底层机制。这些工具构成了 Python 量化分析生态系统的基石。掌握它们的数学特性(如可哈希性)和计算复杂度(如 \(O(1)\) 与 \(O(n)\) 的区别),对于后续使用 Pandas 处理海量高频金融数据至关重要。
现在,你已经具备了构建复杂交易逻辑的基础,接下来我们将迈入 NumPy 的大门,探索面向数组的向量化计算,这是提升模型性能的关键一步。
为了进一步提升你的编程能力和对底层机制的理解,建议你在掌握本章基本语法后,关注以下进阶方向:
1. 深入理解 Python 内部机制 建议阅读 Luciano Ramalho 的 Fluent Python。该书深入解析了 Python 的数据模型,特别是迭代器和生成器的内部运作。理解这些底层的“魔法方法”能帮助你写出更符合 Python 哲学且高性能的量化交易算法。
2. 编写工程级的 Python 代码 在金融机构工作时,代码的可维护性与正确性同等重要。Effective Python(Brett Slatkin 著)提供了大量关于命名规范、异步编程和性能优化的最佳实践。遵循其中的标准能让你的分析工具更具专业性,减少在大规模并发计算中的潜在错误。
3. 算法与复杂度的数学严谨性 对于习惯于定量分析的学生,建议阅读经典的《算法导论》(CLRS)。特别是关于哈希表和概率分析的章节,能让你从数学层面理解为什么字典查找是 \(O(1)\)。这种思维方式对于优化高频交易系统中的关键路径至关重要。
4. 量化金融实战应用 理论最终要落地。Yves Hilpisch 的 Python for Finance 展示了如何将这些基础结构应用于时间序列分析和风险管理。如果你对交易策略感兴趣,Stefan Jansen 的作品提供了从数据获取到回测的完整框架。请务必记住,任何量化策略都涉及高度的市场风险,应在充分验证后谨慎使用。
题目: 给定一个包含股票代码的列表,使用列表推导式筛选出所有以上海证券交易所(.XSHG)或深圳证券交易所(.XSHE)结尾的股票代码。
解答:
题目: 给定两个字典,分别存储了股票的持仓权重和日收益率。请计算该投资组合的加权收益率。
解答:
weights = {'600000.XSHG': 0.4, '600036.XSHG': 0.3, '601398.XSHG': 0.3}
returns = {'600000.XSHG': 0.012, '600036.XSHG': 0.005, '601398.XSHG': -0.002}
# 方法1:使用循环
portfolio_return = 0.0
for ticker, weight in weights.items():
portfolio_return += weight * returns[ticker]
print(f"Portfolio Return (Loop): {portfolio_return:.4f}")
# 方法2:使用列表推导式和 sum 函数 (更 Pythonic)
portfolio_return_v2 = sum([weights[t] * returns[t] for t in weights])
print(f"Portfolio Return (List Comp): {portfolio_return_v2:.4f}")题目: 给定一个包含金融新闻标题的列表,请完成以下任务: 1. 移除每条新闻标题两端的空白字符。 2. 过滤掉不包含“增长”或“下跌”关键词的标题。
解答: