2  python 内置数据结构、函数与文件

2.1 引言与学习目标

Python 作为金融数据分析的强大工具,其核心优势在于丰富的内置数据结构和灵活的编程范式。本章将系统性地介绍 Python 的基础数据结构——元组、列表、字典和集合,以及函数式编程的基本概念。

学习目标

通过本章学习,你应该能够:

  • 理论目标
    • 深刻理解 Python 四种核心数据结构(元组、列表、字典、集合)的数学特性和计算复杂度
    • 掌握可变对象(mutable)与不可变对象(immutable)的本质区别及其在内存管理中的意义
    • 理解列表推导式、生成器表达式的数学基础(集合论与映射理论)
    • 理解函数式编程的核心概念(映射、过滤、规约)及其在数据处理中的应用
  • 实践目标
    • 熟练运用各种数据结构解决实际金融数据分析问题
    • 使用中国股票市场数据进行实际的收益率计算和风险评估
    • 编写面向对象的简单回测系统
    • 运用生成器处理大规模金融数据集,优化内存使用
    • 掌握文件 I/O 操作,处理 CSV 格式的金融数据
  • 应用目标
    • 使用宁波港、宁波银行等长三角地区上市公司的真实数据进行分析
    • 构建移动平均线交叉策略等简单的量化交易系统
    • 计算并分析股票收益率、波动率等关键金融指标

前置知识要求

  • 熟解基本的编程概念:变量、数据类型、控制流(if/else、for/while)
  • 熟悉函数的数学表示:\(f: X \to Y\)(映射概念)
  • 熟悉基本的集合论概念:集合、并集、交集、差集
  • 了解基础的金融概念:收益率、波动率、时间序列数据

本章将讨论 Python 语言内置的功能,这些功能在我们学习金融和经济数据分析的过程中将被广泛使用。虽然像 pandas 和 NumPy 这样强大的第三方库为更大数据集提供了先进的计算功能,但它们的设计初衷是与 Python 灵活的内置数据操作工具协同工作。

2.1.1 理论基础:数据结构的数学性质

在深入探讨具体数据结构之前,我们需要从数学和计算机科学的角度理解数据结构的本质。

数据抽象与抽象数据类型(Abstract Data Type, ADT)

从数学角度看,数据结构是一种抽象数据类型(ADT),由两个要素定义:

  1. 数据对象(Data Object):存储信息的数学域 \(D\)
  2. 操作集合(Operations):定义在数据对象上的数学运算 \(\Omega\)

例如,对于列表(List): - 数据对象:\(D = \{a_1, a_2, ..., a_n\}\)(元素序列) - 操作集合:\(\Omega = \{\text{append}, \text{remove}, \text{sort}, \text{slice}, ...\}\)

计算复杂度理论

在金融数据分析中,选择合适的数据结构需要考虑其时间复杂度(Time Complexity)

操作 列表 字典/集合 说明
访问元素 O(1) O(1) 索引访问
搜索元素 O(n) O(1) 列表线性扫描,哈希表常数时间
插入元素 O(n) O(1) 列表需移动元素
删除元素 O(n) O(1) 列表需移动元素
切片 O(k) N/A k为切片长度

可变性与内存管理

从计算机体系结构角度,理解可变性对内存管理至关重要:

  • 不可变对象(Immutable):对象一旦创建,其内存状态不可改变
    • 优点:线程安全、可作为字典键、内存优化(Python可复用对象)
    • 缺点:任何修改都需要创建新对象
  • 可变对象(Mutable):对象创建后,其内存状态可以改变
    • 优点:原地修改、内存效率高
    • 缺点:非线程安全、不可作为字典键

在金融数据处理中,选择合适的数据结构类型直接影响程序的性能和可靠性。


2.2 数据结构与序列

Python 的数据结构简洁而强大。精通其用法是成为一名高效的经济分析程序员的关键。我们将从元组、列表和字典开始,它们是一些最常用的序列类型。

2.2.1 元组 (Tuple)

元组 (tuple) 是一个固定长度、不可变的 Python 对象序列,一旦被赋值,就不能被更改。这种不可变性并非限制,而是一种特性;它保证了一条数据记录,例如股票交易的价格、数量和时间戳,能够保持恒定,不会被意外修改。创建元组最简单的方法是用括号包裹一个逗号分隔的值序列。

ohlc_data = (10.5, 12.0, 10.2, 11.5)
ohlc_data
列表 2.1
(10.5, 12.0, 10.2, 11.5)

在许多情况下,括号可以省略:

ohlc_data = 10.5, 12.0, 10.2, 11.5
ohlc_data
列表 2.2
(10.5, 12.0, 10.2, 11.5)

你可以通过调用 tuple() 构造函数将任何序列或迭代器转换为元组。这对于“冻结”一个列表的内容非常有用。

列表 2.3
print(tuple(['AAPL', 180.5]))
ticker_chars = tuple('000001')
print(ticker_chars)
('AAPL', 180.5)
('0', '0', '0', '0', '0', '1')

元素可以通过方括号 [] 访问,就像在大多数其他编程语言中一样。按照计算机科学的惯例,Python 中的序列是0索引的。

ticker_chars[0]
列表 2.4
'0'

当你在更复杂的表达式中定义元组时,通常需要将值用括号括起来,例如下面这个创建元组的元组(嵌套元组)的例子:

列表 2.5
# ((股票A, 价格), (股票B, 价格))
portfolio_structure = (('AAPL', 180.5), ('GOOG', 140.2))
print(portfolio_structure)
(('AAPL', 180.5), ('GOOG', 140.2))

关于“不可变性 (Immutability)”的辨析

元组的“不可变性”指的是元组本身所包含的对象的引用是不可更改的,即你不能将元组某个位置上的对象替换成另一个对象。然而,如果元组中的某个对象本身是“可变的”(比如一个列表),那么这个对象的内容是可以被修改的。这个特性在处理复杂数据结构时非常重要。

尽管存储在元组中的对象本身可能是可变的,但一旦元组被创建,就不可能修改每个槽位中存储的对象:

列表 2.6
trade_tuple = tuple(['AAPL', 100])
# 下面这行会引发 TypeError,因为你不能给元组的一个槽位赋一个新的对象
trade_tuple[1] = 200
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[6], line 3
      1 trade_tuple = tuple(['AAPL', 100])
      2 # 下面这行会引发 TypeError,因为你不能给元组的一个槽位赋一个新的对象
----> 3 trade_tuple[1] = 200

TypeError: 'tuple' object does not support item assignment

但是,如果元组内部的一个对象是可变的,比如一个列表,你可以就地修改它。这是一个至关重要的区别。

# 元组表示:(基金名称, 持仓股票列表, 是否活跃)
portfolio_struct = ('Fund_A', ['AAPL', 'GOOG'], True)
portfolio_struct[1].append('MSFT')
portfolio_struct
列表 2.7
('Fund_A', ['AAPL', 'GOOG', 'MSFT'], True)

你可以使用 + 运算符连接元组以产生更长的元组。请注意,这将创建一个的元组;它不会修改原始元组。

('2024-01-01', 'BUY') + ('AAPL', 100) + (185.5,)
列表 2.8
('2024-01-01', 'BUY', 'AAPL', 100, 185.5)

将元组乘以一个整数,效果是连接该元组的多个副本。对象本身不会被复制,只有它们的引用会被复制。在处理大型数据集时,这对内存管理来说是一个重要的细节。

('BUY', 'SELL') * 4
列表 2.9
('BUY', 'SELL', 'BUY', 'SELL', 'BUY', 'SELL', 'BUY', 'SELL')

2.2.1.1 元组解包 (Unpacking tuples)

元组一个非常方便的特性是解包 (unpacking)。如果你试图将一个值赋给一个元组式的变量表达式,Python 会尝试解包等号右侧的值:

列表 2.10
quote = (180.5, 182.0, 179.8)
open_px, high_px, low_px = quote
print(f'high_px is: {high_px}')
high_px is: 182.0

即使是带有嵌套元组的序列也可以被解包,这对于迭代结构化数据非常有用。

列表 2.11
# 交易记录:日期, 方向, (股票, 数量)
transaction = '2024-01-01', 'BUY', ('AAPL', 100)
date, side, (symbol, qty) = transaction
print(f'symbol is: {symbol}')
symbol is: AAPL

这个功能为交换变量值提供了一种优雅的方式,这个任务在许多其他语言中需要一个临时变量。

变量交换的底层机制

bid_price, ask_price = ask_price, bid_price 这种看似神奇的语法,其背后原理正是元组的创建和解包。

  1. 创建元组: Python 首先计算等号右边的表达式 ask_price, bid_price,这会创建一个临时的、未命名的元组。
  2. 解包元组: 然后,Python 将这个临时元组的元素解包到等号左边的变量中。

整个过程是原子性的,确保了交换的正确性,即使变量名相同也不会出错。

列表 2.12
bid_price, ask_price = 10.0, 10.2
print(f'Before swap: bid={bid_price}, ask={ask_price}')
bid_price, ask_price = ask_price, bid_price
print(f'After swap: bid={bid_price}, ask={ask_price}')
Before swap: bid=10.0, ask=10.2
After swap: bid=10.2, ask=10.0

变量解包的一个常见用途是迭代元组或列表的序列:

列表 2.13
orders = [('AAPL', 100, 180.0), ('GOOG', 50, 140.0), ('MSFT', 200, 400.0)]
for symbol, qty, price in orders:
  print(f'symbol={symbol}, qty={qty}, price={price}')
symbol=AAPL, qty=100, price=180.0
symbol=GOOG, qty=50, price=140.0
symbol=MSFT, qty=200, price=400.0

有时,你可能想从序列的开头“摘取”几个元素,并收集其余的元素。*rest 语法可以用于此目的。在经济时间序列分析中,这对于将最新的数据点与历史序列分开非常有用。

列表 2.14
# 最近5日的收盘价 (从最新到最旧)
latest_prices = 105, 104, 103, 102, 100
today, yesterday, *history = latest_prices
print(f'today = {today}')
print(f'yesterday = {yesterday}')
print(f'history = {history}')
today = 105
yesterday = 104
history = [103, 102, 100]

按照惯例,许多 Python 程序员在解包时会使用下划线 (_) 来表示不想要的变量。

列表 2.15
today, yesterday, *_ = latest_prices

2.2.1.2 元组方法 (Tuple methods)

由于元组的大小和内容不能被修改,它的实例方法非常少。一个特别有用的方法是 count(也适用于列表),它计算一个值出现的次数。

ratings = ('Buy', 'Hold', 'Buy', 'Buy', 'Sell', 'Hold', 'Buy')
ratings.count('Buy')
列表 2.16
4

2.2.2 列表 (List)

与元组相反,列表 (list) 的长度是可变的,其内容可以就地修改。它们是可变的 (mutable)。在数据分析中,你可能会用列表来累积模拟结果,或者存放一个正在增量更新的经济时间序列数据。你可以用方括号 []list 类型函数来定义它们:

price_list = [10.5, 11.2, 9.8, None]
tickers_tuple = ('AAPL', 'GOOG', 'MSFT')
ticker_list = list(tickers_tuple)
ticker_list
列表 2.17
['AAPL', 'GOOG', 'MSFT']
ticker_list[1] = 'NVDA'
ticker_list
列表 2.18
['AAPL', 'NVDA', 'MSFT']

列表和元组在语义上相似,在许多函数中可以互换使用。list 函数在数据处理中经常被用来物化 (materialize) 一个迭代器或生成器表达式(我们稍后会探讨这些概念)。

gen = range(10)
print(gen)
list(gen)
range(0, 10)
列表 2.19
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

2.2.2.1 添加和删除元素

可以使用 append 方法将元素添加到列表的末尾:

ticker_list.append('TSLA')
ticker_list
列表 2.20
['AAPL', 'NVDA', 'MSFT', 'TSLA']

使用 insert 可以在列表的特定位置插入一个元素。插入索引必须在 0 和列表长度(包含)之间。

ticker_list.insert(1, 'AMZN')
ticker_list
列表 2.21
['AAPL', 'AMZN', 'NVDA', 'MSFT', 'TSLA']

appendinsert 的性能考量

在处理大规模金融数据集时,理解基本操作的计算成本至关重要:

  • append(value): 将元素添加到列表末尾是一个 O(1) 操作,即其执行时间与列表大小无关,对于实时行情数据的累积非常高效。
  • insert(i, value): 在列表的任意位置 i 插入元素是一个 O(n) 操作。这是因为插入点之后的所有元素都需要向右移动一位。对于大型历史数据集,频繁使用 insert 会导致显著的性能瓶颈。

因此,在构建大型资产列表时,应优先使用 append。如果确实需要在序列两端进行高效操作,可以考虑使用 collections.deque

insert 的逆操作是 pop,它会移除并返回特定索引处的元素:

列表 2.22
ticker_list.pop(2)
print(ticker_list)
['AAPL', 'AMZN', 'MSFT', 'TSLA']

可以使用 remove 按值移除元素,它会找到第一个这样的值并从列表中移除:

列表 2.23
ticker_list.append('AAPL')
print(f'移除前: {ticker_list}')
ticker_list.remove('AAPL')
print(f'移除后: {ticker_list}')
移除前: ['AAPL', 'AMZN', 'MSFT', 'TSLA', 'AAPL']
移除后: ['AMZN', 'MSFT', 'TSLA', 'AAPL']

你可以使用 in 关键字检查列表中是否包含某个值:

'TSLA' in ticker_list
列表 2.24
True

not 关键字可以用来否定 in

'TSLA' not in ticker_list
列表 2.25
False

列表搜索的计算复杂度

检查一个值是否存在于列表中(value in ticker_list)是一个 O(n) 操作。Python 必须从头到尾对列表进行线性扫描,直到找到匹配项或结束。

相比之下,字典(dict)和集合(set)使用了哈希表结构,其查找操作平均时间复杂度为 O(1)。在需要频繁检查某个股票是否在持仓池(portfolio pool)中时,将列表转换为集合能带来巨大的性能提升。

2.2.2.2 连接和组合列表

与元组类似,用 + 将两个列表相加会连接它们:

[10.5, None, 11.2] + [12.0, 11.8, (2, 3)]
列表 2.26
[10.5, None, 11.2, 12.0, 11.8, (2, 3)]

如果你已经定义了一个列表,你可以使用 extend 方法向其追加多个元素:

stock_prices = [10.5, None, 11.2]
stock_prices.extend([12.0, 11.8, (2, 3)])
stock_prices
列表 2.27
[10.5, None, 11.2, 12.0, 11.8, (2, 3)]

请注意,通过加法进行列表连接是一个相对昂贵的操作,因为必须创建一个新列表并将对象复制过去。使用 extend 向现有列表追加元素,尤其是在构建大列表时,通常是更好的选择,因为它会就地修改列表。

2.2.2.3 排序

你可以通过调用列表的 sort 方法对其进行就地排序(不创建新对象):

unsorted_ids = [7, 2, 5, 1, 3]
unsorted_ids.sort()
unsorted_ids
列表 2.28
[1, 2, 3, 5, 7]

sort 有一些偶尔会派上用场的选项。其中之一是传递一个次要排序 (key)——即一个函数,它为每个元素生成一个用于排序的值。例如,我们可以按字符串的长度对一个字符串集合进行排序:

asset_tags = ['saw', 'small', 'He', 'foxes', 'six']
asset_tags.sort(key=len)
asset_tags
列表 2.29
['He', 'saw', 'six', 'small', 'foxes']

稍后,我们将学习 sorted 函数,它可以从一个通用序列生成一个排好序的副本。

2.2.3 切片 (Slicing)

你可以通过使用切片表示法来选择大多数序列类型的部分,其基本形式是 start:stop,传递给索引运算符 []

price_sequence = [7, 2, 5, 1, 3, 6, 8, 9]
price_sequence[1:5]
列表 2.30
[2, 5, 1, 3]

切片也可以被赋值:

price_sequence[3:5] = ['a', 'b']
price_sequence
列表 2.31
[7, 2, 5, 'a', 'b', 6, 8, 9]

虽然 start 索引处的元素被包含在内,但 stop 索引处的元素不被包含,因此结果中的元素数量是 stop - startstartstop 都可以省略,此时它们分别默认为序列的开头或结尾:

列表 2.32
print(price_sequence[:5])
print(price_sequence[3:])
[7, 2, 5, 'a', 'b']
['a', 'b', 6, 8, 9]

负数索引表示从序列末尾开始切片:

列表 2.33
print(price_sequence[-4:])
print(price_sequence[-6:-2])
['b', 6, 8, 9]
[5, 'a', 'b', 6]

切片的语义需要一些时间来适应,特别是如果你之前使用过 R 或 MATLAB。图 2.1 提供了一个有用的图示,说明了使用正整数和负整数进行切片。在图中,索引显示在“箱格”的边缘,以帮助说明切片选择的开始和停止位置。

图 2.1: Python 切片约定示意图

在第二个冒号后还可以使用一个 step,例如,可以用来取每隔一个元素:

# 假设这是过去8天的交易量
daily_volumes = [1000, 2500, 1800, 3200, 1500, 2100, 4000, 1200]
daily_volumes[::2]
列表 2.34
[1000, 1800, 1500, 4000]

一个巧妙的用法是传递 -1,这有一个有用的效果,就是反转一个列表或元组:

daily_volumes[::-1]
列表 2.35
[1200, 4000, 2100, 1500, 3200, 1800, 2500, 1000]

2.2.4 字典 (Dictionary)

字典 (dictionary)dict 可能是最重要的内置 Python 数据结构。在其他编程语言中,字典有时被称为哈希映射 (hash map)关联数组 (associative array)。字典存储键值对的集合。在经济建模中,这是一个非常有用的结构,用于将唯一标识符(如国家的ISO代码或公司的股票代码)映射到相关数据(如GDP、通货膨胀率或市值)。

创建字典的一种方法是使用花括号 {} 和冒号来分隔键和值:

empty_dict = {}
stock_info = {'symbol' : 'AAPL', 'price' : 180.5, 'volume': 50000}
stock_info
列表 2.36
{'symbol': 'AAPL', 'price': 180.5, 'volume': 50000}

你可以使用与访问列表或元组元素相同的语法来访问、插入或设置元素:

列表 2.37
stock_info['price'] = 182.0
print(stock_info)
print(stock_info['price'])
{'symbol': 'AAPL', 'price': 182.0, 'volume': 50000}
182.0

你可以使用与检查列表或元组是否包含某个值相同的语法来检查字典是否包含某个键:

'volume' in stock_info
列表 2.38
True

你可以使用 del 关键字或 pop 方法(它会同时返回值并删除键)来删除值:

列表 2.39
stock_info['temp'] = 'to be deleted'
stock_info['market'] = 'NASDAQ'
print(f'del 之前: {stock_info}')
del stock_info['temp']
print(f'del 之后: {stock_info}')
ret = stock_info.pop('market')
print(f'弹出的值: {ret}')
print(f'pop 之后: {stock_info}')
del 之前: {'symbol': 'AAPL', 'price': 182.0, 'volume': 50000, 'temp': 'to be deleted', 'market': 'NASDAQ'}
del 之后: {'symbol': 'AAPL', 'price': 182.0, 'volume': 50000, 'market': 'NASDAQ'}
弹出的值: NASDAQ
pop 之后: {'symbol': 'AAPL', 'price': 182.0, 'volume': 50000}

keysvalues 方法分别提供字典键和值的迭代器。

列表 2.40
print(list(stock_info.keys()))
print(list(stock_info.values()))
['symbol', 'price', 'volume']
['AAPL', 182.0, 50000]

如果你需要同时迭代键和值,可以使用 items 方法来迭代键值对(作为2元组):

list(stock_info.items())
列表 2.41
[('symbol', 'AAPL'), ('price', 182.0), ('volume', 50000)]

你可以使用 update 方法将一个字典合并到另一个字典中。这将就地修改字典,因此传递给 update 的数据中任何已存在的键,其旧值都将被丢弃。

stock_info.update({'price' : 185.0, 'pe_ratio' : 30.5})
stock_info
列表 2.42
{'symbol': 'AAPL', 'price': 185.0, 'volume': 50000, 'pe_ratio': 30.5}

2.2.4.1 从序列创建字典

通常情况下,你可能会得到两个序列,并希望在字典中将它们按元素配对。zip 函数与 dict 构造函数结合使用,可以轻松实现这一点。

key_list = ['AAPL', 'GOOG', 'MSFT']
# 2024年5月10日的近似收盘价,用于演示
value_list = [183.05, 170.68, 414.74] 
mapping = dict(zip(key_list, value_list))
mapping
列表 2.43
{'AAPL': 183.05, 'GOOG': 170.68, 'MSFT': 414.74}

2.2.4.2 默认值

下面的逻辑很常见:

if key in some_dict:
    value = some_dict[key]
else:
    value = default_value

字典的 get 方法提供了一种更简洁的写法。如果键不存在,get 将返回 None 或一个指定的默认值。

列表 2.44
# 获取IBM的价格,如果找不到则默认为 0.0
value = mapping.get('IBM', 0.0) 
print(value)
value = mapping.get('AAPL', 0.0)
print(value)
0.0
183.05

一个常见的用例是让字典的值是其他集合,比如列表。例如,按首字母对一个单词列表进行分类。

words = ['apple', 'bat', 'bar', 'atom', 'book']
by_letter = {}
for word in words:
  letter = word[0]
  if letter not in by_letter:
    by_letter[letter] = [word]
  else:
    by_letter[letter].append(word)
by_letter
列表 2.45
{'a': ['apple', 'atom'], 'b': ['bat', 'bar', 'book']}

setdefault 方法可以简化这个工作流程。前面的 for 循环可以重写为:

by_letter = {}
for word in words:
  letter = word[0]
  by_letter.setdefault(letter, []).append(word)
by_letter
列表 2.46
{'a': ['apple', 'atom'], 'b': ['bat', 'bar', 'book']}

内置的 collections 模块有一个有用的类 defaultdict,这使得分组更加容易。

from collections import defaultdict
by_letter = defaultdict(list)
for word in words:
  by_letter[word[0]].append(word)
dict(by_letter) # 为了显示,转换回常规字典
列表 2.47
{'a': ['apple', 'atom'], 'b': ['bat', 'bar', 'book']}

2.2.4.3 哈希表的数学原理

在深入理解字典键的限制之前,我们需要从数学角度理解哈希表(Hash Table)的工作原理。

哈希函数的数学定义

哈希函数是一个映射: \[ h: U \to \{0, 1, ..., m-1\} \]

其中: - \(U\) 是键的宇宙(所有可能的键) - \(\{0, 1, ..., m-1\}\) 是哈希值空间(\(m\) 是哈希表的大小) - \(h(k)\) 是键 \(k\) 的哈希值

理想哈希函数的性质

一个理想的哈希函数应满足以下数学性质:

  1. 确定性(Determinism)\[ \forall k \in U: h(k) = \text{constant} \] 相同的键必须始终产生相同的哈希值

  2. 均匀性(Uniformity): 哈希值应在哈希表中均匀分布 \[ P(h(k) = \frac{1}{m}, \quad \forall k \in U \] 其中 \(P\) 是概率分布

  3. 独立性(Independence): 不同键的哈希值应相互独立 \[ \forall k_1 \neq k_2: h(k_1) \bot h(k_2) \approx \text{independent} \]

冲突解决(Collision Resolution)

当两个不同的键 \(k_1 \neq k_2\) 产生相同的哈希值时: \[ h(k_1) = h(k_2) \] 这称为**哈希冲突(Hash Collision)。

Python 使用开放寻址法(Open Addressing)二次探测(Quadratic Probing)来解决冲突。

探测序列定义为: \[ h_i(k) = (h(k) + c_1 \cdot i + c_2 \cdot i^2) \mod m \]

其中 \(c_1, c_2\) 是常数,\(i\) 是探测次数。

平均查找复杂度

在理想情况下(均匀哈希,低负载因子 \(\alpha = n/m\)),哈希表的平均查找复杂度为:

\[ T_{avg}(n) = \Theta\left(\frac{1}{1-\alpha}\right) = O(1) \]

其中: - \(n\) 是存储的键值对数量 - \(m\) 是哈希表的容量 - \(\alpha = n/m\) 是负载因子

这就是为什么字典的查找操作(in 运算)平均为 \(O(1)\) 的数学基础。

2.2.4.4 有效的字典键类型

虽然字典的值可以是任何 Python 对象,但键通常必须是不可变对象,如标量类型(int、float、string)或元组(并且元组中的所有对象也必须是不可变的)。这里的技术术语是可哈希性 (hashability)

关键概念:可哈希性 (Hashability)

字典在内部使用哈希表来存储数据,这使得键的查找速度极快。哈希函数将一个不可变对象(如股票代码字符串)转换为一个整数(哈希值),作为内存索引。

为了让系统可靠工作,一个关键要求是:一个对象的哈希值在其生命周期内必须永远不变

  • 不可变对象(如字符串、数字、元组):值不变,哈希值也不变,是“可哈希的”。
  • 可变对象(如列表、字典):内容可变,哈希值随之改变,无法作为稳定的索引。

这就是为什么金融分析中的唯一标识符(如 order_book_id)必须使用不可变类型的原因。

你可以使用 hash 函数检查一个对象是否是可哈希的:

列表 2.48
print(hash('string'))
print(hash((1, 2, (2, 3))))
# 下面这行会失败,因为列表是可变的,因此不可哈希。
hash((1, 2, [2, 3]))
180890216708858939
-9209053662355515447
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[49], line 4
      2 print(hash((1, 2, (2, 3))))
      3 # 下面这行会失败,因为列表是可变的,因此不可哈希。
----> 4 hash((1, 2, [2, 3]))

TypeError: unhashable type: 'list'

要使用列表作为键,一个选择是将其转换为元组:

d = {}
d[tuple()] = 5
d
列表 2.49
{(): 5}

2.2.5 集合 (Set)

集合 (set) 是一个无序的唯一元素集合。它们在成员资格测试、从序列中移除重复项以及数学运算(如并集、交集和差集)方面非常有用。在经济数据集中,你可以使用集合来查找所代表的唯一国家或行业列表。

集合可以通过两种方式创建:通过 set 函数或通过带有花括号的集合字面量:

列表 2.50
print(set())
print({600000, 600000, 600000, 600036, 601398, 601398})
set()
{600000, 600036, 601398}

集合支持数学集合运算。考虑这两个示例集合:

列表 2.51
portfolio_a = {'AAPL', 'GOOG', 'MSFT', 'AMZN', 'META'}
portfolio_b = {'MSFT', 'AMZN', 'META', 'TSLA', 'NVDA', 'NFLX'}

这两个集合的并集 (union) 是出现在任一集合中的不同元素的集合。这可以通过 union 方法或 | 二元运算符来计算:

列表 2.52
print(portfolio_a.union(portfolio_b))
print(portfolio_a | portfolio_b)
{'NVDA', 'NFLX', 'AMZN', 'MSFT', 'GOOG', 'TSLA', 'AAPL', 'META'}
{'NVDA', 'NFLX', 'AMZN', 'MSFT', 'GOOG', 'TSLA', 'AAPL', 'META'}

交集 (intersection) 包含同时出现在两个集合中的元素。可以使用 & 运算符或 intersection 方法:

列表 2.53
print(portfolio_a.intersection(portfolio_b))
print(portfolio_a & portfolio_b)
{'MSFT', 'AMZN', 'META'}
{'MSFT', 'AMZN', 'META'}

表 2.1 列出了常用的集合方法。

表 2.1: Python 集合运算
函数 替代语法 描述
a.add(x) N/A 将元素 x 添加到集合 a
a.clear() N/A 将集合 a 重置为空状态。
a.remove(x) N/A 从集合 a 中移除元素 x
a.pop() N/A 从集合 a 中移除一个任意元素。
a.union(b) a \| b ab 中的所有唯一元素。
a.update(b) a \|= b a 设置为 ab 的并集。
a.intersection(b) a & b ab 中的所有共同元素。
a.intersection_update(b) a &= b a 设置为 ab 的交集。
a.difference(b) a - b a 中存在但 b 中不存在的元素。
a.difference_update(b) a -= b a 设置为 ab 的差集。
a.symmetric_difference(b) a ^ b 存在于 ab 中但不同时存在的元素。
a.symmetric_difference_update(b) a ^= b a 设置为对称差集。
a.issubset(b) <= b 如果 ab 的子集,则为 True。
a.issuperset(b) >= b 如果 ab 的超集,则为 True。
a.isdisjoint(b) N/A 如果 ab 没有共同元素,则为 True。

所有的逻辑集合运算都有就地 (in-place) 版本,对于非常大的集合,这可能更高效。

列表 2.54
c = portfolio_a.copy()
c |= portfolio_b
print(f'就地并集: {c}')

d = portfolio_a.copy()
d &= portfolio_b
print(f'就地交集: {d}')
就地并集: {'NVDA', 'NFLX', 'AMZN', 'MSFT', 'GOOG', 'TSLA', 'AAPL', 'META'}
就地交集: {'MSFT', 'AMZN', 'META'}

与字典键一样,集合元素也必须是不可变的。要存储类似列表的元素,你必须将它们转换为元组:

my_data = [1, 2, 3]
my_set = {tuple(my_data)}
my_set
列表 2.55
{(1, 2, 3)}

你还可以检查子集和超集:

列表 2.56
a_set = {1, 2, 3, 4, 5}
print({1, 2, 3}.issubset(a_set))
print(a_set.issuperset({1, 2, 3}))
True
True

当且仅当集合的内容相等时,它们才相等;顺序无关紧要。

{1, 2, 3} == {3, 2, 1}
列表 2.57
True

2.3 内置序列函数

Python 有一些有用的序列函数,你应该熟悉它们。

2.3.1 enumerate

在迭代序列时,通常需要跟踪当前项的索引。enumerate 返回一个 (i, value) 元组的序列:

列表 2.58
collection = ['Econ', 'Finance', 'Stats']
for index, value in enumerate(collection):
  print(f'{index}: {value}')
0: Econ
1: Finance
2: Stats

2.3.2 sorted

sorted 函数从任何序列的元素中返回一个的排好序的列表。这与 list.sort() 方法形成对比,后者是就地对列表进行排序。

列表 2.59
print(sorted([7, 1, 2, 6, 0, 3, 2]))
print(sorted('horse race'))
[0, 1, 2, 2, 3, 6, 7]
[' ', 'a', 'c', 'e', 'e', 'h', 'o', 'r', 'r', 's']

2.3.3 zip

zip 将多个列表、元组或其他序列的元素“配对”起来,创建一个元组的列表。

seq1 = ['foo', 'bar', 'baz']
seq2 = ['one', 'two', 'three']
zipped = zip(seq1, seq2)
list(zipped)
列表 2.60
[('foo', 'one'), ('bar', 'two'), ('baz', 'three')]

zip 可以接受任意数量的序列,它产生的元素数量由最短的序列决定。

seq3 = [False, True]
list(zip(seq1, seq2, seq3))
列表 2.61
[('foo', 'one', False), ('bar', 'two', True)]

zip 的一个常见用途是同时迭代多个序列,可能还会与 enumerate 结合使用:

列表 2.62
for index, (a, b) in enumerate(zip(seq1, seq2)):
  print(f'{index}: {a}, {b}')
0: foo, one
1: bar, two
2: baz, three

2.3.4 reversed

reversed 以相反的顺序迭代序列的元素。

list(reversed(range(10)))
列表 2.63
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

请记住,reversed 是一个生成器 (generator)(稍后讨论),所以它在被物化(例如,用 list()for 循环)之前不会创建反转的序列。

2.4 列表、集合与字典推导式

列表推导式 (List comprehensions) 是一个方便且被广泛使用的 Python 语言特性。它们允许你通过一个简洁的表达式,从一个集合中筛选元素并进行转换,从而简洁地形成一个新列表。基本形式是:

2.4.1 推导式的数学基础:集合论与集合构造

从数学角度看,列表推导式是集合概括(Set-Comprehension)思想的程序实现。

集合概括的数学定义

在朴素集合论(Naive Set Theory)中,集合概括定义为:

\[ S = \{x \in A \mid P(x)\} \]

其中: - \(A\) 是源集合 - \(P(x)\) 是关于元素 \(x\) 的性质或谓词 - \(S\) 是满足性质的所有元素的集合

示例:偶数集合的数学表示 \[ E = \{x \in \{1, 2, ..., 100\} \mid (x \mod 2 = 0)\} \]

在 Python 中,这对应于:

even_numbers = [x for x in range(1, 101) if x % 2 == 0]

映射与过滤的数学表示

更复杂的推导式涉及映射(Mapping)过滤(Filtering)的组合。

给定: - 源集合 \(A\) - 映射函数 \(f: A \to B\) - 谓词函数 \(P: A \to \{\text{True}, \text{False}\}\)

过滤后映射定义为: \[ R = \{f(x) \mid x \in A \land P(x)\} \]

这在 Python 中直接对应于:

result = [f(x) for x in A if P(x)]

多重集合概括(嵌套推导式)

对于嵌套结构,我们使用笛卡尔积(Cartesian Product)的概念:

\[ A \times B = \{(a, b) \mid a \in A \land b \in B\} \]

Python 的嵌套推导式:

pairs = [(a, b) for a in list_a for b in list_b]

这正是笛卡尔积的实现,时间复杂度为 \(O(|A| \times |B|)\)

复杂度对比

方法 时间复杂度 空间复杂度 说明
传统 for 循环 O(n) O(n) 需要预分配列表空间
列表推导式 O(n) O(n) 更简洁,Python 内部优化
生成器表达式 O(n) O(1) 惰性求值,节省内存

对于金融数据处理,列表推导式提供了声明式、函数式的编程风格,使代码更易读和维护。 [expr for value in collection if condition]

这等同于以下 for 循环:

result = []
for value in collection:
    if condition:
        result.append(expr)

if condition 部分是可选的。例如,给定一个字符串列表,我们可以过滤掉长度为2或更短的字符串,并将它们转换为大写:

strings = ['a', 'as', 'bat', 'car', 'dove', 'python']
[x.upper() for x in strings if len(x) > 2]
列表 2.64
['BAT', 'CAR', 'DOVE', 'PYTHON']

集合和字典推导式是一个自然的扩展。集合推导式看起来与等效的列表推导式相似,只是用花括号代替:

unique_lengths = {len(x) for x in strings}
unique_lengths
列表 2.65
{1, 2, 3, 4, 6}

字典推导式看起来像这样: {key-expr: value-expr for value in collection if condition} 例如,我们可以创建一个这些字符串到它们在列表中位置的查找映射:

loc_mapping = {val: index for index, val in enumerate(strings)}
loc_mapping
列表 2.66
{'a': 0, 'as': 1, 'bat': 2, 'car': 3, 'dove': 4, 'python': 5}

2.4.2 嵌套列表推导式

假设我们有一个包含一些英文和西班牙文名字的列表的列表。

列表 2.67
all_data = [['John', 'Emily', 'Michael', 'Mary', 'Steven'],
            ['Maria', 'Juan', 'Javier', 'Natalia', 'Pilar']]

假设我们想得到一个包含所有名字中含有两个或更多”a”的单个列表。我们可以用嵌套的 for 循环来做到这一点,但嵌套列表推导式更简洁。

result = [name for names in all_data for name in names if name.count('a') >= 2]
result
列表 2.68
['Maria', 'Natalia']

列表推导式的 for 部分是根据嵌套的顺序排列的。这里是另一个例子,我们将一个元组列表“扁平化”为一个简单的整数列表:

some_tuples = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]
flattened = [x for tup in some_tuples for x in tup]
flattened
列表 2.69
[1, 2, 3, 4, 5, 6, 7, 8, 9]

重要的是要区分刚才展示的语法和列表推导式内部的列表推导式,后者会产生一个列表的列表:

[[x for x in tup] for tup in some_tuples]
列表 2.70
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]

2.5 函数 (Functions)

函数是 Python 中代码组织和复用的主要且最重要的方法。根据经验,如果你预计需要重复相同或非常相似的代码超过一次,那么编写一个可复用的函数是值得的。函数用 def 关键字声明。

列表 2.71
def my_function(x, y):
  return x + y

每个函数可以有位置参数 (positional arguments)关键字参数 (keyword arguments)。关键字参数最常用于指定默认值或可选参数。

列表 2.72
def my_function2(x, y, z=1.5):
  if z > 1:
    return z * (x + y)
  else:
    return z / (x + y)

虽然关键字参数是可选的,但在调用函数时必须指定所有位置参数。主要的限制是关键字参数必须跟在位置参数(如果有的话)之后。

列表 2.73
print(my_function2(5, 6, z=0.7))
print(my_function2(10, 20))
0.06363636363636363
45.0

2.5.1 命名空间、作用域和局部函数

函数可以访问不同命名空间 (namespaces) 中的变量。默认情况下,在函数内部赋的任何变量都赋给了局部命名空间 (local namespace)。局部命名空间在函数被调用时创建,并立即由函数的参数填充。函数结束后,局部命名空间被销毁。 考虑这个函数:

列表 2.74
def func():
  a = []
  for i in range(5):
    a.append(i)
# 当 func() 被调用时,列表 `a` 被创建、填充,然后被销毁。
# `a` 在函数外部不存在。

现在,假设我们在函数外部声明了 a。函数可以从更高的作用域访问这个变量。

列表 2.75
a = []
def func():
  for i in range(5):
    a.append(i)

func()
print(a)
func()
print(a)
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4]

对函数作用域之外的变量进行赋值是可能的,但它们必须使用 global 关键字声明。我通常不鼓励使用 global 关键字。

列表 2.76
a = None
def bind_a_variable():
  global a
  a = []
bind_a_variable()
print(a)
[]

2.5.2 返回多个值

Python 函数一个非常方便的特性是能够返回多个值。

列表 2.77
def f():
  a = 5
  b = 6
  c = 7
  return a, b, c

a, b, c = f()
print(f'a={a}, b={b}, c={c}')
a=5, b=6, c=7

这里发生的是,函数实际上只返回一个对象,一个元组,然后这个元组被解包到结果变量中。

return_value = f()
return_value
列表 2.78
(5, 6, 7)

一个可能有吸引力的替代方案是返回一个字典:

def f():
  a = 5
  b = 6
  c = 7
  return {'a': a, 'b': b, 'c': c}
f()
列表 2.79
{'a': 5, 'b': 6, 'c': 7}

2.5.3 函数是对象

由于 Python 函数是对象,你可以将它们作为参数传递给其他函数。这是函数式编程的一个基本概念,对于数据分析非常强大。假设我们正在对一个美国州名列表进行数据清洗。

列表 2.80
states = [' Alabama ', 'Georgia!', 'Georgia', 'georgia', 'FlorIda',
          'south carolina##', 'West virginia?']

为了使这个列表统一,我们需要去除空白、移除标点符号并统一大小写。

import re

def clean_strings(strings):
  result = []
  for value in strings:
    value = value.strip()
    value = re.sub('[!#?]', '', value)
    value = value.title()
    result.append(value)
  return result

clean_strings(states)
列表 2.81
['Alabama',
 'Georgia',
 'Georgia',
 'Georgia',
 'Florida',
 'South Carolina',
 'West Virginia']

一种更灵活、函数式的方法是定义一个你想要应用的操作列表。

def remove_punctuation(value):
  return re.sub('[!#?]', '', value)

clean_ops = [str.strip, remove_punctuation, str.title]

def clean_strings(strings, ops):
  result = []
  for value in strings:
    for func in ops:
      value = func(value)
    result.append(value)
  return result

clean_strings(states, clean_ops)
列表 2.82
['Alabama',
 'Georgia',
 'Georgia',
 'Georgia',
 'Florida',
 'South Carolina',
 'West Virginia']

你还可以将函数用作其他内置函数的参数,比如 map,它将一个函数应用于一个序列。

列表 2.83
for x in map(remove_punctuation, states):
  print(x)
 Alabama 
Georgia
Georgia
georgia
FlorIda
south carolina
West virginia

2.5.4 匿名 (Lambda) 函数

Python 支持所谓的匿名 (anonymous)lambda 函数,这是一种编写只包含单个语句的函数的方式,该语句的结果就是返回值。

列表 2.84
def short_function(x):
  return x * 2

equiv_anon = lambda x: x * 2

它们在数据分析中特别方便,因为许多数据转换函数都接受其他函数作为参数。传递一个 lambda 函数通常比编写一个完整的函数声明更清晰。

def apply_to_list(some_list, f):
  return [f(x) for x in some_list]

ints = [4, 7, 1, 2, 9]
apply_to_list(ints, lambda x: x * 2)
列表 2.85
[8, 14, 2, 4, 18]

再举一个例子,假设你想按每个字符串中不同字母的数量对一个字符串集合进行排序:

strings = ['foo', 'card', 'bar', 'aaaa', 'abab']
strings.sort(key=lambda x: len(set(x)))
strings
列表 2.86
['aaaa', 'foo', 'abab', 'bar', 'card']

2.5.5 生成器 (Generators)

Python 中的许多对象都支持迭代。这是通过迭代器协议 (iterator protocol) 实现的。例如,迭代一个字典会产生字典的键。

列表 2.87
some_dict = {'a': 1, 'b': 2, 'c': 3}
for key in some_dict:
  print(key)
a
b
c

当你写 for key in some_dict 时,Python 解释器首先尝试从 some_dict 创建一个迭代器。

dict_iterator = iter(some_dict)
print(dict_iterator)
list(dict_iterator)
<dict_keyiterator object at 0x0000020234465260>
列表 2.88
['a', 'b', 'c']

生成器 (generator) 是构建新可迭代对象的一种便捷方式。普通函数执行并返回单个结果,而生成器可以通过暂停和恢复执行来返回一系列多个值。要创建一个生成器,使用 yield 关键字而不是 return

2.5.6 惰性求值的数学基础

从计算机科学和函数式编程理论角度理解生成器需要掌握惰性求值的核心概念。

严格求值 vs 惰性求值

在编程语言理论中,表达式的求值策略分为两类:

求值策略 定义 示例 内存模式
严格求值 表达式一被遇到就计算所有值 sum([f(x) for x in S]) 立即分配所有内存
惰性求值 仅在实际需要时才计算值 Python 生成器 按需分配内存

惰性序列的数学表示

一个惰性序列 \(L\) 可以表示为一个** thunk(延迟计算)**:

\[ L = \lambda: [\text{yield } e_1, \text{yield } e_2, ..., \text{yield } e_n] \]

其中每个 \(e_i\) 产生一个值,但只有在被请求时才计算。

流(Streams)的数学定义

在数据处理理论中,流(Stream)是一个无限序列:

\[ S = [s_1, s_2, s_3, ...] \]

惰性求值允许我们处理无限流而不耗尽内存:

# 有限内存处理无限数据流
def process_stream(stream):
    for item in stream:  # 每次只处理一个元素
        yield process(item)  # 转换为下游数据

共递归的数学表示

许多生成器可以建模为共递归(Corecursion)

\[ f(n) = \text{if } n = 0: \text{then } 1\] \[ f(n) = \text{if } n = 0: \text{then } 1: \text{else } n \times f(n-1)\]

尾递归优化使得: - 时间复杂度:从指数级 \(O(2^n)\) 降到线性 \(O(n)\) - 空间复杂度:从 \(O(n)\) 栈降到 \(O(1)\)(记忆化)

这在金融计算中极其重要,例如:

# 斐波那契数列的共递归实现
def fib_stream(n):
    a, b = 0, 1
    for _ in range(n):
        yield b
        a, b = b, a + b

数据管道(Pipeline)的数学模型

生成器使数据管道成为可能:

\[ \mathrm{数据} \rightarrow \mathrm{过滤} \rightarrow \mathrm{转换} \rightarrow \mathrm{聚合} \]

每个阶段都是一个惰性操作:

def pipeline(data):
    return aggregate(transform(filter(data)))

这种函数式编程风格在量化金融中非常重要,特别是处理大规模 tick 级数据时。

关键概念: 迭代器与生成器

对于处理大规模经济或金融数据集(如逐笔成交数据),理解生成器的“惰性求值 (lazy evaluation)”特性至关重要。

  • 列表 (List): 立即计算并存储所有元素。对于包含数千万条记录的 A 股历史 Tick 数据,这会瞬间耗尽内存。
  • 生成器 (Generator): 仅在被请求时(如 for 循环中)才“屈服 (yield)”一个值。它在任何时刻都只需在内存中保存一个元素。

这种机制允许我们以极低的内存成本处理几乎无限大的数据流,是量化金融工程中的核心技术。

列表 2.89
def squares(n=10):
  print(f'生成从1到{n}的平方数')
  for i in range(1, n + 1):
    yield i ** 2

# 当你调用生成器时,没有代码会立即执行
gen = squares()
print(gen)

# 直到你请求元素时,它才开始执行
for x in gen:
  print(x, end=' ')
<generator object squares at 0x000002023385CB30>
生成从1到10的平方数
1 4 9 16 25 36 49 64 81 100 

2.5.6.1 生成器表达式 (Generator expressions)

制作生成器的另一种方法是使用生成器表达式 (generator expression)。这是列表推导式的生成器版本。要创建一个,将本应是列表推导式的内容用圆括号括起来。

gen = (x ** 2 for x in range(100))
gen
列表 2.90
<generator object <genexpr> at 0x000002023385CC80>

这等同于更冗长的生成器:

def _make_gen():
    for x in range(100):
        yield x ** 2
gen = _make_gen()

生成器表达式可以用作函数参数,代替列表推导式,这样可以更节省内存,也可能更快。

列表 2.91
print(sum(x ** 2 for x in range(100)))
print(dict((i, i ** 2) for i in range(5)))
328350
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16}

2.5.6.2 itertools 模块

标准库 itertools 模块有许多常见数据算法的生成器集合。例如,groupby 接受一个序列和一个函数,按函数的返回值对连续的元素进行分组。

列表 2.92
import itertools
names = ['Alan', 'Adam', 'Wes', 'Will', 'Albert', 'Steven']
def first_letter(x):
  return x
for letter, names_group in itertools.groupby(names, first_letter):
  print(letter, list(names_group)) # names_group 是一个生成器
Alan ['Alan']
Adam ['Adam']
Wes ['Wes']
Will ['Will']
Albert ['Albert']
Steven ['Steven']

表 2.2 列出了一些其他有用的 itertools 函数。

表 2.2: 一些有用的 itertools 函数
函数 描述
chain(*iterables) 通过将迭代器链接在一起来生成一个序列。
combinations(iterable, k) 生成所有可能的k元组,忽略顺序且无放回。
permutations(iterable, k) 生成所有可能的k元组,考虑顺序。
groupby(iterable[, keyfunc]) 为每个唯一键生成 (键, 子迭代器)。
product(*iterables, repeat=1) 生成输入可迭代对象的笛卡尔积(作为元组)。

2.6 错误与异常处理

优雅地处理 Python 错误或异常 (exceptions) 是构建健壮程序的重要部分。在数据分析中,许多函数只对特定类型的输入有效,而真实世界的数据往往很杂乱。例如,float() 在输入不当时会失败,并引发 ValueError

列表 2.93
float('something')
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[94], line 1
----> 1 float('something')

ValueError: could not convert string to float: 'something'

假设我们想要一个能够优雅失败的 float 版本。我们可以通过编写一个函数,将对 float 的调用封装在 try/except 块中来实现。

列表 2.94
def attempt_float(x):
  try:
    return float(x)
  except:
    return x

print(attempt_float('1.2345'))
print(attempt_float('something'))
1.2345
something

你可能只想抑制 ValueError,因为 TypeError 可能表明你的程序中存在一个真正的 bug。要做到这一点,在 except 后面写上异常类型:

列表 2.95
def attempt_float_specific(x):
  try:
    return float(x)
  except ValueError:
    return x

# 这现在会引发 TypeError,正如预期的那样
attempt_float_specific((1, 2))
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[96], line 8
      5     return x
      7 # 这现在会引发 TypeError,正如预期的那样
----> 8 attempt_float_specific((1, 2))

Cell In[96], line 3, in attempt_float_specific(x)
      1 def attempt_float_specific(x):
      2   try:
----> 3     return float(x)
      4   except ValueError:
      5     return x

TypeError: float() argument must be a string or a real number, not 'tuple'

你可以通过编写一个异常类型的元组来捕获多种异常类型:

列表 2.96
def attempt_float_multiple(x):
  try:
    return float(x)
  except (TypeError, ValueError):
    return x

在某些情况下,你可能希望无论 try 块是否成功,都执行一些代码。为此,使用 finally。类似地,你可以使用 else 来执行仅在 try 块成功时才执行的代码。

# 虚构的文件写入示例
# f = open(path, mode='w')
# try:
#     write_to_file(f)
# except:
#     print('失败')
# else:
#     print('成功')
# finally:
#     f.close()

2.7 文件与操作系统

本书大部分内容使用像 pandas.read_csv 这样的高级工具来读取数据文件。然而,理解如何在 Python 中处理文件的基础知识很重要。要打开一个文件进行读写,使用内置的 open 函数。一个最佳实践是传递一个 encoding 参数。

# path = 'examples/segismundo.txt' # 假设这个文件存在
# f = open(path, encoding='utf-8')

默认情况下,文件以只读模式('r')打开。然后我们可以像处理列表一样处理文件对象 f,并迭代其行。行内容会带有行尾(EOL)标记,所以你通常会用 rstrip() 来移除它们。

列表 2.97
# 这段代码假设在指定路径存在一个文件
path = 'examples/segismundo.txt'
lines = [x.rstrip() for x in open(path, encoding='utf-8')]

当你使用 open 创建文件对象时,建议在完成后关闭文件。with 语句通过在退出块时自动关闭文件,使这变得更容易。

列表 2.98
path = 'examples/segismundo.txt'
with open(path, encoding='utf-8') as f:
  lines = [x.rstrip() for x in f]

表 2.3 列出了有效的文件读/写模式。

表 2.3: Python 文件模式
模式 描述
r 只读模式
w 只写模式;创建一个新文件(擦除现有数据)
x 只写模式;创建一个新文件,但如果路径已存在则失败
a 追加到现有文件(如果文件不存在则创建)
r+ 读写
b 添加到模式中用于二进制文件(例如,'rb''wb'
t 文本模式文件(默认);自动将字节解码为 Unicode

对于可读文件,一些最常用的方法是 readseektellread 从文件中返回一定数量的字符。

列表 2.99
# 假设 path 指向一个 utf-8 编码的文件
with open(path, encoding='utf-8') as f:
    # 读取前10个字符
    content_text = f.read(10)
    # 获取当前位置
    position_text = f.tell()
    # 将位置移动到第3个字节
    f.seek(3)
    # 从新位置读取一个字符
    char_after_seek = f.read(1)

with open(path, mode='rb') as f: # 二进制模式
    # 读取前10个字节
    content_binary = f.read(10)
    # 获取当前位置
    position_binary = f.tell()

# 用于演示的虚构输出
print(f'文本内容: "{content_text}", 位置: {position_text}')
print(f'二进制内容: {content_binary}, 位置: {position_binary}')
print(f'移动后读取的字符: "{char_after_seek}"')

表 2.4 总结了许多最常用的文件方法。

表 2.4: 重要的 Python 文件方法
方法/属性 描述
read([size]) 以字符串或字节形式返回文件数据,可选大小。
readlines([size]) 返回文件中的行列表,可选大小参数。
write(string) 将传递的字符串写入文件。
writelines(strings) 将传递的字符串序列写入文件。
close() 关闭文件对象。
flush() 将内部 I/O 缓冲区刷新到磁盘。
seek(pos) 移动到指定的文件位置(整数)。
tell() 以整数形式返回当前文件位置。
closed 如果文件已关闭,则为 True。
encoding 用于将字节解释为 Unicode 的编码。

2.8 习题

2.8.1 基础习题

习题 3.1: 列表与字符串操作

给定一个字符串列表:

stocks = ['贵州茅台', '宁波港', '宁波银行', '中国平安', '招商银行']
  1. 将列表中的每个股票名称与其股票代码配对
codes = ['600519.SH', '601018.SH', '002142.SZ', '601318.SH', '600036.SH']
  1. 使用列表推导式创建一个包含股票信息的字典列表
  2. 筛选出以”宁波”开头的股票
  3. 将股票名称按字母顺序排序

解答:

stocks = ['贵州茅台', '宁波港', '宁波银行', '中国平安', '招商银行']
codes = ['600519.SH', '601018.SH', '002142.SZ', '601318.SH', '600036.SH']

# (a) 配对股票名称和代码
paired = list(zip(stocks, codes))
print('(a) 股票配对:')
for stock, code in paired:
    print(f'{stock}: {code}')

# (b) 创建字典列表
stock_list = [{'name': s, 'code': c} for s, c in paired]
print('\n(b) 字典列表:')
print(stock_list)

# (c) 筛选以"宁波"开头的股票
ningbo_stocks = [s for s in stocks if s.startswith('宁波')]
print(f'\n(c) 宁波相关股票: {ningbo_stocks}')

# (d) 按字母顺序排序
sorted_stocks = sorted(stocks)
print(f'\n(d) 排序后: {sorted_stocks}')
(a) 股票配对:
贵州茅台: 600519.SH
宁波港: 601018.SH
宁波银行: 002142.SZ
中国平安: 601318.SH
招商银行: 600036.SH

(b) 字典列表:
[{'name': '贵州茅台', 'code': '600519.SH'}, {'name': '宁波港', 'code': '601018.SH'}, {'name': '宁波银行', 'code': '002142.SZ'}, {'name': '中国平安', 'code': '601318.SH'}, {'name': '招商银行', 'code': '600036.SH'}]

(c) 宁波相关股票: ['宁波港', '宁波银行']

(d) 排序后: ['中国平安', '宁波港', '宁波银行', '招商银行', '贵州茅台']

习题 3.2: 字典操作

创建一个表示长三角地区部分省份2022年GDP的字典(单位:万亿元):

gdp_2022 = {
    '江苏': 12.29,
    '浙江': 7.77,
    '上海': 4.47,
    '安徽': 4.50
}
  1. 添加福建的数据:5.31万亿元
  2. 计算所有省份的总GDP
  3. 找出GDP最高的省份
  4. 将GDP转换为美元(汇率按1美元=7元人民币计算)
  5. 按GDP值降序排列

解答:

gdp_2022 = {
    '江苏': 12.29,
    '浙江': 7.77,
    '上海': 4.47,
    '安徽': 4.50
}

# (a) 添加福建
gdp_2022['福建'] = 5.31
print('(a) 添加福建后的字典:')
print(gdp_2022)

# (b) 计算总GDP
total_gdp = sum(gdp_2022.values())
print(f'\n(b) 总GDP: {total_gdp:.2f} 万亿元')

# (c) 找出GDP最高的省份
max_province = max(gdp_2022, key=gdp_2022.get)
max_gdp = gdp_2022[max_province]
print(f'\n(c) GDP最高省份: {max_province} ({max_gdp} 万亿元)')

# (d) 转换为美元
exchange_rate = 7.0  # 1美元=7人民币
gdp_usd = {k: v / exchange_rate for k, v in gdp_2022.items()}
print(f'\n(d) GDP(万亿美元):')
for province, gdp in gdp_usd.items():
    print(f'  {province}: {gdp:.3f}')

# (e) 按GDP降序排列
sorted_gdp = dict(sorted(gdp_2022.items(), key=lambda x: x[1], reverse=True))
print(f'\n(e) 降序排列:')
for province, gdp in sorted_gdp.items():
    print(f'  {province}: {gdp} 万亿元')
(a) 添加福建后的字典:
{'江苏': 12.29, '浙江': 7.77, '上海': 4.47, '安徽': 4.5, '福建': 5.31}

(b) 总GDP: 34.34 万亿元

(c) GDP最高省份: 江苏 (12.29 万亿元)

(d) GDP(万亿美元):
  江苏: 1.756
  浙江: 1.110
  上海: 0.639
  安徽: 0.643
  福建: 0.759

(e) 降序排列:
  江苏: 12.29 万亿元
  浙江: 7.77 万亿元
  福建: 5.31 万亿元
  安徽: 4.5 万亿元
  上海: 4.47 万亿元

习题 3.3: 集合操作

给定两个集合:

set_a = {'宁波港', '宁波银行', '贵州茅台', '中国平安'}
set_b = {'宁波港', '招商银行', '工商银行', '中国平安'}
  1. 计算交集(两个集合中都有的股票)
  2. 计算并集(两个集合中所有的股票)
  3. 计算差集(在A中但不在B中的股票)
  4. 判断’宁波银行’是否在set_b中

解答:

set_a = {'宁波港', '宁波银行', '贵州茅台', '中国平安'}
set_b = {'宁波港', '招商银行', '工商银行', '中国平安'}

# (a) 交集
intersection = set_a & set_b
print(f'(a) 交集: {intersection}')

# (b) 并集
union = set_a | set_b
print(f'\n(b) 并集: {union}')

# (c) 差集(A-B)
difference = set_a - set_b
print(f'\n(c) A-B差集: {difference}')

# (d) 成员测试
is_in_b = '宁波银行' in set_b
print(f'\n(d) 宁波银行在set_b中: {is_in_b}')
(a) 交集: {'中国平安', '宁波港'}

(b) 并集: {'中国平安', '工商银行', '宁波港', '贵州茅台', '宁波银行', '招商银行'}

(c) A-B差集: {'贵州茅台', '宁波银行'}

(d) 宁波银行在set_b中: False

2.8.2 进阶习题

习题 3.4: 函数与递归

斐波那契数列在金融中有广泛应用,例如技术分析中的斐波那契回调位。数列定义为: - F(0) = 0 - F(1) = 1 - F(n) = F(n-1) + F(n-2) (当 n > 1 时)

  1. 编写一个递归函数计算斐波那契数
  2. 编写一个使用循环的版本(更高效)
  3. 使用记忆化(memoization)优化递归版本
  4. 计算前15个斐波那契数
  5. 计算相邻两项的比值,观察其趋近于什么值(黄金分割率)

解答:

# (a) 递归版本
def fib_recursive(n):
    """递归计算斐波那契数"""
    if n <= 1:
        return n
    return fib_recursive(n-1) + fib_recursive(n-2)

# (b) 循环版本(更高效)
def fib_iterative(n):
    """使用循环计算斐波那契数"""
    if n <= 1:
        return n

    a, b = 0, 1
    for _ in range(2, n + 1):
        a, b = b, a + b
    return b

# (c) 记忆化优化版本
from functools import lru_cache

@lru_cache(maxsize=None)
def fib_memoized(n):
    """使用记忆化优化递归计算"""
    if n <= 1:
        return n
    return fib_memoized(n-1) + fib_memoized(n-2)

# (d) 计算前15个斐波那契数
print('(d) 前15个斐波那契数:')
fib_numbers = [fib_iterative(i) for i in range(15)]
print(fib_numbers)

# (e) 计算相邻两项的比值
print('\n(e) 相邻项比值(趋近于黄金分割率1.618...):')
for i in range(2, 15):
    ratio = fib_numbers[i] / fib_numbers[i-1]
    print(f'F({i})/F({i-1}) = {ratio:.6f}')

print('\n说明:这个比值趋近于黄金分割率 φ ≈ 1.6180339887...')
print('在金融技术分析中,斐波那契回调位(如38.2%, 61.8%)')
print('就是基于黄金分割率及其衍生比例。')
(d) 前15个斐波那契数:
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377]

(e) 相邻项比值(趋近于黄金分割率1.618...):
F(2)/F(1) = 1.000000
F(3)/F(2) = 2.000000
F(4)/F(3) = 1.500000
F(5)/F(4) = 1.666667
F(6)/F(5) = 1.600000
F(7)/F(6) = 1.625000
F(8)/F(7) = 1.615385
F(9)/F(8) = 1.619048
F(10)/F(9) = 1.617647
F(11)/F(10) = 1.618182
F(12)/F(11) = 1.617978
F(13)/F(12) = 1.618056
F(14)/F(13) = 1.618026

说明:这个比值趋近于黄金分割率 φ ≈ 1.6180339887...
在金融技术分析中,斐波那契回调位(如38.2%, 61.8%)
就是基于黄金分割率及其衍生比例。

斐波那契数列与黄金分割率的数学关系

斐波那契数列相邻两项的比值趋近于黄金分割率 \(\phi\),这是一个重要的数学常数:

\[ \phi = \frac{1 + \sqrt{5}}{2} \approx 1.6180339887... \]

\(n \to \infty\) 时,\(F(n+1)/F(n) \to \phi\)。这是因为斐波那契数列的通项公式(Binet’s Formula)为:

\[ F(n) = \frac{\phi^n - \psi^n}{\sqrt{5}} \]

其中 \(\psi = \frac{1 - \sqrt{5}}{2} \approx -0.618\)。在金融技术分析中,斐波那契回调位(Fibonacci Retracement)是常用工具,关键比率如 38.2% (\(1 - \phi^{-1}\)) 和 61.8% (\(\phi^{-1}\)) 均源于此。


习题 3.5: 文件处理与数据清洗

假设你有一个包含股票价格数据的CSV文件(格式:日期,开盘价,最高价,最低价,收盘价,成交量),编写代码:

  1. 读取文件并解析每一行
  2. 跳过标题行
  3. 将数据转换为字典列表
  4. 计算每日涨跌幅
  5. 找出涨幅最大和最小的交易日
  6. 将清洗后的数据写入新文件

解答:

import csv
from datetime import datetime

# 为了演示,先创建一个模拟的CSV文件
sample_data = """日期,开盘价,最高价,最低价,收盘价,成交量
2023-01-03,35.50,36.20,35.30,36.00,125000
2023-01-04,36.10,36.80,35.90,36.50,138000
2023-01-05,36.40,37.20,36.30,37.00,142000
2023-01-06,37.10,37.50,36.80,36.90,115000
2023-01-09,36.80,37.10,36.50,36.70,98000
"""

# 写入模拟文件
with open('stock_data_sample.csv', 'w', encoding='utf-8') as f:
    f.write(sample_data)

# (a)-(c): 读取并解析文件
stock_data = []
with open('stock_data_sample.csv', 'r', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    for row in reader:
        # 转换数据类型
        record = {
            'date': datetime.strptime(row['日期'], '%Y-%m-%d'),
            'open': float(row['开盘价']),
            'high': float(row['最高价']),
            'low': float(row['最低价']),
            'close': float(row['收盘价']),
            'volume': int(row['成交量'])
        }
        stock_data.append(record)

print('(a)-(c) 读取的数据:')
for record in stock_data:
    print(f"  {record['date'].strftime('%Y-%m-%d')}: 收盘价={record['close']}")

# (d) 计算每日涨跌幅
for i in range(len(stock_data)):
    if i == 0:
        stock_data[i]['change_pct'] = 0.0
    else:
        prev_close = stock_data[i-1]['close']
        curr_close = stock_data[i]['close']
        stock_data[i]['change_pct'] = (curr_close - prev_close) / prev_close * 100

print('\n(d) 添加涨跌幅后的数据:')
for record in stock_data:
    if record['change_pct'] != 0:
        print(f"  {record['date'].strftime('%Y-%m-%d')}: {record['change_pct']:+.2f}%")

# (e) 找出涨幅最大和最小的交易日
max_gain = max(stock_data[1:], key=lambda x: x['change_pct'])
max_loss = min(stock_data[1:], key=lambda x: x['change_pct'])

print(f'\n(e) 涨幅最大: {max_gain["date"].strftime("%Y-%m-%d")} (+{max_gain["change_pct"]:.2f}%)')
print(f'    跌幅最大: {max_loss["date"].strftime("%Y-%m-%d")} ({max_loss["change_pct"]:.2f}%)')

# (f) 写入新文件
with open('stock_data_cleaned.csv', 'w', encoding='utf-8', newline='') as f:
    fieldnames = ['date', 'open', 'high', 'low', 'close', 'volume', 'change_pct']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(stock_data)

print('\n(f) 已将清洗后的数据写入 stock_data_cleaned.csv')
(a)-(c) 读取的数据:
  2023-01-03: 收盘价=36.0
  2023-01-04: 收盘价=36.5
  2023-01-05: 收盘价=37.0
  2023-01-06: 收盘价=36.9
  2023-01-09: 收盘价=36.7

(d) 添加涨跌幅后的数据:
  2023-01-04: +1.39%
  2023-01-05: +1.37%
  2023-01-06: -0.27%
  2023-01-09: -0.54%

(e) 涨幅最大: 2023-01-04 (+1.39%)
    跌幅最大: 2023-01-09 (-0.54%)

(f) 已将清洗后的数据写入 stock_data_cleaned.csv

2.8.3 应用习题

习题 3.6: 股票收益率计算

使用真实股票数据(宁波港601018),编写代码:

  1. 读取股票日度价格数据
  2. 计算日收益率、对数收益率
  3. 统计正收益天数和负收益天数
  4. 计算平均收益率和标准差
  5. 找出最大单日涨幅和跌幅及其日期

解答:

import pandas as pd
import numpy as np
from pathlib import Path

# 读取宁波港数据
# 从本地 Parquet 文件读取数据代替 HDF5
ningbo_port = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})

# 准备数据
ningbo_port['trade_date'] = pd.to_datetime(ningbo_port['trade_date'], format='%Y%m%d')
ningbo_port = ningbo_port.sort_values('trade_date')

# 提取需要的字段
prices = ningbo_port[['trade_date', 'close']].copy()
prices = prices.tail(100)  # 使用最近100个交易日

# (a) 读取数据完成
print(f'(a) 已读取 {len(prices)} 个交易日的数据')
print(f'日期范围: {prices["trade_date"].iloc[0].strftime("%Y-%m-%d")}{prices["trade_date"].iloc[-1].strftime("%Y-%m-%d")}')

# (b) 计算收益率
prices['return'] = prices['close'].pct_change()
prices['log_return'] = np.log(prices['close'] / prices['close'].shift(1))

print('\n(b) 最近5天的收益率:')
print(prices[['trade_date', 'return', 'log_return']].tail())

# (c) 统计正负收益天数
positive_days = (prices['return'] > 0).sum()
negative_days = (prices['return'] < 0).sum()
total_days = len(prices) - 1  # 减去第一天的NaN

print(f'\n(c) 收益天数统计(共{total_days}天):')
print(f'  上涨天数: {positive_days} ({positive_days/total_days*100:.1f}%)')
print(f'  下跌天数: {negative_days} ({negative_days/total_days*100:.1f}%)')

# (d) 计算统计量
mean_return = prices['return'].mean() * 100  # 转为百分比
std_return = prices['return'].std() * 100

print(f'\n(d) 收益率统计:')
print(f'  日平均收益率: {mean_return:.4f}%')
print(f'  日收益率标准差: {std_return:.4f}%')
print(f'  年化波动率: {std_return * np.sqrt(252):.2f}%')

# (e) 最大涨跌幅
max_gain_idx = prices['return'].idxmax()
max_loss_idx = prices['return'].idxmin()

print(f'\n(e) 极端交易日:')
print(f'  最大涨幅: {prices.loc[max_gain_idx, "trade_date"].strftime("%Y-%m-%d")}')
print(f'    涨幅: {prices.loc[max_gain_idx, "return"]*100:+.2f}%')
print(f'  最大跌幅: {prices.loc[max_loss_idx, "trade_date"].strftime("%Y-%m-%d")}')
print(f'    跌幅: {prices.loc[max_loss_idx, "return"]*100:+.2f}%')
(a) 已读取 100 个交易日的数据
日期范围: 2025-08-06 至 2025-12-31

(b) 最近5天的收益率:
     trade_date    return  log_return
3703 2025-12-25  0.005450    0.005435
3704 2025-12-26 -0.005420   -0.005435
3705 2025-12-29 -0.005450   -0.005464
3706 2025-12-30 -0.005479   -0.005495
3707 2025-12-31  0.000000    0.000000

(c) 收益天数统计(共99天):
  上涨天数: 43 (43.4%)
  下跌天数: 42 (42.4%)

(d) 收益率统计:
  日平均收益率: 0.0099%
  日收益率标准差: 0.9855%
  年化波动率: 15.64%

(e) 极端交易日:
  最大涨幅: 2025-09-23
    涨幅: +3.24%
  最大跌幅: 2025-09-25
    跌幅: -2.95%

简单收益率与对数收益率的对比

在金融计量学中,理解两类收益率的区别至关重要:

  1. 简单收益率 (Simple Return): \(R_t = \frac{P_t - P_{t-1}}{P_{t-1}}\)
  2. 对数收益率 (Log Return): \(r_t = \ln(P_t / P_{t-1}) = \ln(P_t) - \ln(P_{t-1})\)

对数收益率的优势: - 时间可加性: \(T\) 期对数收益率等于各单期对数收益率之和,即 \(r_{0,T} = \sum_{t=1}^{T} r_t\)。 - 对称性: 上涨 10% 后下跌 10%,对数收益率之和为 0,而简单收益率由于基数变化会导致净亏损。 - 正态性: 在连续时间资产定价模型(如 Black-Scholes)中,资产价格常被假设服从对数正态分布,这意味着对数收益率服从正态分布。

在实践中,简单收益率更直观,适合向客户展示业绩;而对数收益率因其优良的数学特性,更适合进行学术研究和波动率建模。


习题 3.7: 生成器与内存效率

在处理大量金融数据时,内存效率很重要。比较三种方法:

  1. 使用列表生成10万个随机数
  2. 使用生成器表达式
  3. 使用生成器函数(yield)
  4. 比较三者的内存占用

解答:

import sys
import random

# (a) 列表方法
def get_random_list(n):
    """生成包含n个随机数的列表"""
    return [random.random() for _ in range(n)]

# (b) 生成器表达式
def get_random_generator_expr(n):
    """返回生成器表达式"""
    return (random.random() for _ in range(n))

# (c) 生成器函数
def get_random_generator_func(n):
    """生成器函数"""
    for _ in range(n):
        yield random.random()

n = 100000

# (d) 比较内存占用
random_list = get_random_list(n)
list_size = sys.getsizeof(random_list)

random_gen_expr = get_random_generator_expr(n)
gen_expr_size = sys.getsizeof(random_gen_expr)

random_gen_func = get_random_generator_func(n)
gen_func_size = sys.getsizeof(random_gen_func)

print(f'(d) 内存占用比较 (n={n:,}):')
print(f'  列表: {list_size:,} 字节')
print(f'  生成器表达式: {gen_expr_size} 字节')
print(f'  生成器函数: {gen_func_size} 字节')
print(f'\n内存节省比例:')
print(f'  生成器 vs 列表: {list_size/gen_expr_size:.1f}x')
print(f'\n说明:生成器在处理大规模数据时可以显著节省内存')
print('因为它不会一次性生成所有数据,而是按需生成。')

# 实际使用示例:计算前1000个随机数的平均值
print(f'\n使用生成器计算平均值:')
avg_from_list = sum(get_random_list(1000)) / 1000
avg_from_gen = sum(get_random_generator_func(1000)) / 1000
print(f'  从列表: {avg_from_list:.6f}')
print(f'  从生成器: {avg_from_gen:.6f}')
(d) 内存占用比较 (n=100,000):
  列表: 800,984 字节
  生成器表达式: 104 字节
  生成器函数: 104 字节

内存节省比例:
  生成器 vs 列表: 7701.8x

说明:生成器在处理大规模数据时可以显著节省内存
因为它不会一次性生成所有数据,而是按需生成。

使用生成器计算平均值:
  从列表: 0.490975
  从生成器: 0.494433

生成器在金融大数据中的应用

在处理 A 股高频交易(Tick-by-Tick)数据时,数据量往往以 GB 甚至 TB 计。

核心应用模式: - 流式处理: 逐行读取超大型 CSV 或 H5 文件,避免一次性加载导致的内存崩溃(OOM)。 - 数据管道 (Pipeline): 通过链接多个生成器(如:读取 -> 清洗 -> 聚合),每个环节只处理当前记录。

这种模式特别适合研究长周期的日内高频波动特征或构建极其复杂的阿尔法因子(Alpha Factors)。


2.8.4 挑战习题

习题 3.8: 实现一个简单的回测系统

编写一个面向对象的交易系统,包含以下类:

  1. Stock 类:存储股票信息
  2. Portfolio 类:管理持仓
  3. Backtest 类:执行回测
  4. 使用宁波港的历史数据进行简单回测

解答:

import pandas as pd
import numpy as np
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict
from datetime import datetime

# (a) Stock类
@dataclass
class Stock:
    """股票类"""
    symbol: str
    name: str
    prices: pd.DataFrame

    def __post_init__(self):
        """确保日期是索引"""
        if not isinstance(self.prices.index, pd.DatetimeIndex):
            if 'trade_date' in self.prices.columns:
                self.prices['trade_date'] = pd.to_datetime(self.prices['trade_date'])
                self.prices.set_index('trade_date', inplace=True)

    def get_price(self, date):
        """获取指定日期的价格"""
        if date in self.prices.index:
            return self.prices.loc[date, 'close']
        return None

    def calculate_return(self, start_date, end_date):
        """计算期间收益率"""
        if start_date in self.prices.index and end_date in self.prices.index:
            start_price = self.prices.loc[start_date, 'close']
            end_price = self.prices.loc[end_date, 'close']
            return (end_price - start_price) / start_price
        return None

# (b) Portfolio类
class Portfolio:
    """投资组合类"""

    def __init__(self, initial_capital: float = 100000):
        self.initial_capital = initial_capital
        self.cash = initial_capital
        self.positions = {}  # {symbol: shares}
        self.transaction_history = []

    def buy(self, symbol: str, shares: int, price: float, date: datetime):
        """买入股票"""
        cost = shares * price
        if cost <= self.cash:
            self.cash -= cost
            if symbol in self.positions:
                self.positions[symbol] += shares
            else:
                self.positions[symbol] = shares

            self.transaction_history.append({
                'date': date,
                'action': 'BUY',
                'symbol': symbol,
                'shares': shares,
                'price': price,
                'total': cost
            })
            return True
        return False

    def sell(self, symbol: str, shares: int, price: float, date: datetime):
        """卖出股票"""
        if symbol in self.positions and self.positions[symbol] >= shares:
            proceeds = shares * price
            self.cash += proceeds
            self.positions[symbol] -= shares

            if self.positions[symbol] == 0:
                del self.positions[symbol]

            self.transaction_history.append({
                'date': date,
                'action': 'SELL',
                'symbol': symbol,
                'shares': shares,
                'price': price,
                'total': proceeds
            })
            return True
        return False

    def get_value(self, stock_prices: Dict[str, float]):
        """计算组合总价值"""
        stocks_value = sum(shares * stock_prices.get(symbol, 0)
                          for symbol, shares in self.positions.items())
        return self.cash + stocks_value

    def get_positions(self):
        """获取当前持仓"""
        return self.positions.copy()

# (c) Backtest类
class Backtest:
    """回测类"""

    def __init__(self, stock: Stock, strategy='ma_crossover',
                 short_window=5, long_window=20):
        self.stock = stock
        self.strategy = strategy
        self.short_window = short_window
        self.long_window = long_window
        self.portfolio = None
        self.signals = None

    def generate_signals(self):
        """生成交易信号"""
        prices = self.stock.prices['close']

        if self.strategy == 'ma_crossover':
            # 移动平均线交叉策略
            short_ma = prices.rolling(window=self.short_window).mean()
            long_ma = prices.rolling(window=self.long_window).mean()

            # 生成信号:1=买入,-1=卖出,0=持有
            signals = pd.DataFrame(index=prices.index)
            signals['signal'] = 0
            signals.loc[short_ma > long_ma, 'signal'] = 1
            signals.loc[short_ma < long_ma, 'signal'] = -1

            # 只在信号变化时交易
            signals['position'] = signals['signal'].diff()
            signals.loc[signals['position'] == 0, 'position'] = 0

            self.signals = signals
            return signals

    def run(self, start_date, end_date, initial_capital=100000):
        """执行回测"""
        self.portfolio = Portfolio(initial_capital)
        signals = self.generate_signals()

        # 筛选回测期间
        test_period = signals.loc[start_date:end_date]

        for date, signal in test_period.iterrows():
            price = self.stock.get_price(date)
            if price is None:
                continue

            # 买入信号
            if signal['position'] == 1:
                # 使用可用现金的95%买入
                shares = int((self.portfolio.cash * 0.95) / price)
                if shares > 0:
                    self.portfolio.buy(self.stock.symbol, shares, price, date)

            # 卖出信号
            elif signal['position'] == -1:
                current_positions = self.portfolio.get_positions()
                shares = current_positions.get(self.stock.symbol, 0)
                if shares > 0:
                    self.portfolio.sell(self.stock.symbol, shares, price, date)

        return self.portfolio

    def evaluate(self):
        """评估回测结果"""
        if not self.portfolio:
            return None

        # 计算最终价值
        final_date = self.stock.prices.index[-1]
        final_price = self.stock.get_price(final_date)
        final_value = self.portfolio.get_value({self.stock.symbol: final_price})

        # 计算收益率
        total_return = (final_value - self.portfolio.initial_capital) / self.portfolio.initial_capital

        # 计算买入持有策略的收益率
        first_price = self.stock.prices['close'].iloc[0]
        buy_hold_return = (final_price - first_price) / first_price

        results = {
            'initial_capital': self.portfolio.initial_capital,
            'final_value': final_value,
            'total_return': total_return,
            'buy_hold_return': buy_hold_return,
            'excess_return': total_return - buy_hold_return,
            'num_trades': len(self.portfolio.transaction_history)
        }

        return results

# (d) 使用宁波港数据进行回测
print('(d) 宁波港回测示例:\n')

# 加载数据
# 从本地 Parquet 文件读取数据代替 HDF5
ningbo_port_df = pd.read_parquet(
    'C:/qiufei/data/stock/stock_price_pre_adjusted.parquet',
    filters=[('order_book_id', '==', '601018.XSHG')]
).reset_index().rename(columns={'date': 'trade_date', 'vol': 'volume'})
ningbo_port_df['trade_date'] = pd.to_datetime(ningbo_port_df['trade_date'], format='%Y%m%d')

# 创建Stock对象
ningbo_port = Stock(
    symbol='601018.SH',
    name='宁波港',
    prices=ningbo_port_df[['trade_date', 'close', 'high', 'low', 'volume']].copy()
)

print(f'股票: {ningbo_port.name} ({ningbo_port.symbol})')
print(f'数据范围: {ningbo_port.prices.index[0]}{ningbo_port.prices.index[-1]}')
print(f'数据点数: {len(ningbo_port.prices)}')

# 创建回测对象
backtest = Backtest(ningbo_port, strategy='ma_crossover',
                   short_window=5, long_window=20)

# 选择回测期间(最近一年)
start_date = ningbo_port.prices.index[-252]
end_date = ningbo_port.prices.index[-1]

print(f'\n回测期间: {start_date}{end_date}')
print(f'策略: 5日/20日移动平均线交叉')

# 运行回测
portfolio = backtest.run(start_date, end_date, initial_capital=100000)

# 评估结果
results = backtest.evaluate()

print(f'\n回测结果:')
print(f'  初始资金: ¥{results["initial_capital"]:,.2f}')
print(f'  最终价值: ¥{results["final_value"]:,.2f}')
print(f'  策略收益率: {results["total_return"]*100:+.2f}%')
print(f'  买入持有收益率: {results["buy_hold_return"]*100:+.2f}%')
print(f'  超额收益: {results["excess_return"]*100:+.2f}%')
print(f'  交易次数: {results["num_trades"]}')

print(f'\n持仓情况:')
positions = portfolio.get_positions()
if positions:
    for symbol, shares in positions.items():
        print(f'  {symbol}: {shares} 股')
else:
    print('  无持仓')
(d) 宁波港回测示例:

股票: 宁波港 (601018.SH)
数据范围: 2010-09-28 00:00:00 至 2025-12-31 00:00:00
数据点数: 3708

回测期间: 2024-12-19 00:00:00 至 2025-12-31 00:00:00
策略: 5日/20日移动平均线交叉

回测结果:
  初始资金: ¥100,000.00
  最终价值: ¥100,000.00
  策略收益率: +0.00%
  买入持有收益率: +42.33%
  超额收益: -42.33%
  交易次数: 0

持仓情况:
  无持仓

移动平均线交叉策略的数学原理

移动平均线交叉是最经典的技术分析策略之一,其核心在于利用低通滤波器(Low-pass Filter)原理过滤市场白噪声(White Noise),提取股价的长期趋势。

数学定义\(n\) 日简单移动平均线(SMA)定义为:

\[ MA_t(n) = \frac{1}{n}\sum_{i=0}^{n-1} P_{t-i} \]

交易逻辑: 该策略本质上是基于动量(Momentum)效应: - 看涨信号: 短期均线向上穿越长期均线(“金叉”),暗示短期动量超过长期动量。 - 看跌信号: 短期均线向下穿越长期均线(“死叉”),暗示股价进入阶段性下行。

通过此类量化手段,投资者可以克服心理锚定偏差(Anchoring Bias),实现纪律化投资。

2.9 结论

本章我们不仅掌握了 Python 的核心内置数据结构——元组、列表、字典和集合,还深入探讨了函数编程的最佳实践与文件操作的底层机制。这些工具构成了 Python 量化分析生态系统的基石。掌握它们的数学特性(如可哈希性)和计算复杂度(如 \(O(1)\)\(O(n)\) 的区别),对于后续使用 Pandas 处理海量高频金融数据至关重要。

现在,你已经具备了构建复杂交易逻辑的基础,接下来我们将迈入 NumPy 的大门,探索面向数组的向量化计算,这是提升模型性能的关键一步。

2.10 延伸阅读与开发进阶

为了进一步提升你的编程能力和对底层机制的理解,建议你在掌握本章基本语法后,关注以下进阶方向:

1. 深入理解 Python 内部机制 建议阅读 Luciano Ramalho 的 Fluent Python。该书深入解析了 Python 的数据模型,特别是迭代器和生成器的内部运作。理解这些底层的“魔法方法”能帮助你写出更符合 Python 哲学且高性能的量化交易算法。

2. 编写工程级的 Python 代码 在金融机构工作时,代码的可维护性与正确性同等重要。Effective Python(Brett Slatkin 著)提供了大量关于命名规范、异步编程和性能优化的最佳实践。遵循其中的标准能让你的分析工具更具专业性,减少在大规模并发计算中的潜在错误。

3. 算法与复杂度的数学严谨性 对于习惯于定量分析的学生,建议阅读经典的《算法导论》(CLRS)。特别是关于哈希表和概率分析的章节,能让你从数学层面理解为什么字典查找是 \(O(1)\)。这种思维方式对于优化高频交易系统中的关键路径至关重要。

4. 量化金融实战应用 理论最终要落地。Yves Hilpisch 的 Python for Finance 展示了如何将这些基础结构应用于时间序列分析和风险管理。如果你对交易策略感兴趣,Stefan Jansen 的作品提供了从数据获取到回测的完整框架。请务必记住,任何量化策略都涉及高度的市场风险,应在充分验证后谨慎使用。

2.10.1 补充练习

2.10.2 练习 1:筛选股票代码

题目: 给定一个包含股票代码的列表,使用列表推导式筛选出所有以上海证券交易所(.XSHG)或深圳证券交易所(.XSHE)结尾的股票代码。

stock_list = ['600000.XSHG', 'AAPL', '000001.XSHE', 'GOOG', '000002.XSHE', 'MSFT']

解答

stock_list = ['600000.XSHG', 'AAPL', '000001.XSHE', 'GOOG', '000002.XSHE', 'MSFT']
# 筛选以 .XSHG 或 .XSHE 结尾的股票
cn_stocks = [x for x in stock_list if x.endswith(('.XSHG', '.XSHE'))]
print(cn_stocks)

2.10.3 练习 2:计算投资组合加权收益率

题目: 给定两个字典,分别存储了股票的持仓权重和日收益率。请计算该投资组合的加权收益率。

# 浦发银行,招商银行,工商银行
weights = {'600000.XSHG': 0.4, '600036.XSHG': 0.3, '601398.XSHG': 0.3}
# 假设的日收益率
returns = {'600000.XSHG': 0.012, '600036.XSHG': 0.005, '601398.XSHG': -0.002}

解答

weights = {'600000.XSHG': 0.4, '600036.XSHG': 0.3, '601398.XSHG': 0.3}
returns = {'600000.XSHG': 0.012, '600036.XSHG': 0.005, '601398.XSHG': -0.002}

# 方法1:使用循环
portfolio_return = 0.0
for ticker, weight in weights.items():
    portfolio_return += weight * returns[ticker]

print(f"Portfolio Return (Loop): {portfolio_return:.4f}")

# 方法2:使用列表推导式和 sum 函数 (更 Pythonic)
portfolio_return_v2 = sum([weights[t] * returns[t] for t in weights])
print(f"Portfolio Return (List Comp): {portfolio_return_v2:.4f}")

2.10.4 练习 3:金融新闻标题处理

题目: 给定一个包含金融新闻标题的列表,请完成以下任务: 1. 移除每条新闻标题两端的空白字符。 2. 过滤掉不包含“增长”或“下跌”关键词的标题。

headlines = [
    "  贵州茅台净利润增长20%  ",
    "市场展望中性",
    "  宁德时代股价受挫下跌  ",
    "每日财经简报"
]

解答

headlines = [
    "  贵州茅台净利润增长20%  ",
    "市场展望中性",
    "  宁德时代股价受挫下跌  ",
    "每日财经简报"
]

# 处理流程
processed_news = [
    h.strip() 
    for h in headlines 
    if '增长' in h or '下跌' in h
]
print(processed_news)