import dask.dataframe as dd
# 使用Dask读取CSV
# blocksize参数告诉Dask将文件分成大约64MB的块进行处理
ddf = dd.read_csv('large_transactions.csv', blocksize='64MB')
# 注意,这行代码瞬间完成!因为Dask还没有真正读取任何数据。
print('Dask DataFrame已创建!')
Dask DataFrame已创建!
我们将循序渐进,探索大数据的世界:
在这堂课结束时,我希望你们能够理解:
为什么处理海量数据是一个巨大的挑战,以及它与传统数据分析的根本区别。
并且,能够了解:
Dask和Spark这两个业界主流的大数据处理框架的核心思想。
最终,能够实践并掌握:
我们之前所有的数据分析工作,都有一个隐含的前提…
所有数据都能一次性加载到计算机的内存(RAM)中。
pandas
就是这样工作的。当你调用 pd.read_csv()
时,它会尝试将整个文件读入内存。
这在小数据集上表现优异,但在大数据时代,这个前提本身就是问题的根源。
我们希望用我们有限的内存,去分析无限增长的数据。
让我们来看一下现实:
这就像…
…试图用一个水杯去装整个游泳池的水。
让我们把问题具体化,思考一下支付宝的月度交易数据。
这会产生多大的数据量?
让我们来做个简单的计算:
\[ \large{ \text{数据量} = \underbrace{10^9}_{\text{用户数}} \times \underbrace{50}_{\text{交易数}} \times \underbrace{10}_{\text{变量数}} \times \underbrace{8}_{\text{字节/变量}} } \]
计算结果是:
\[ \large{ \text{结果} \approx 4 \times 10^{12} \text{ 字节} \approx \textbf{4 Terabytes} } \]
4TB 的数据,没有任何个人电脑能够直接将其读入内存。
传统的分析方法在这里完全失效。
既然我们无法一次性处理整个数据集,那么唯一的办法就是:
将一个大任务(处理整个数据集)分解成许多可以在小块数据上执行的小任务,然后将结果汇总起来。
这正是大数据处理框架的精髓所在。
在Python世界里,有两个强大的程序库专门用来实现“分而治之”的思想:
我们今天将主要以Dask为例进行讲解,因为它对熟悉Pandas的你来说最容易上手。
Dask: 轻量级、Python原生、与现有库(Pandas, NumPy)无缝集成,学习曲线平缓。
Apache Spark: 行业标准、功能强大、生态成熟、支持多语言(通过PySpark
在Python中使用)。
Dask是一个开源项目,旨在将Python数据科学生态系统(Pandas, Scikit-Learn, NumPy)的能力扩展到大数据和并行计算领域。
它的设计哲学是:尽可能地模仿现有库的API,让你用最小的学习成本处理更大的数据集。
Dask最吸引人的一点在于,你不需要学习一套全新的语法。
这使得从Pandas迁移到Dask变得异常平滑。
我们将逐一深入探讨这些概念。
这是Dask与Pandas最本质的区别。
相反,Dask会构建一个任务图(Task Graph),记录下你想要执行的所有计算步骤。
任务图是Dask的“作战计划”。它是一个有向无环图(DAG),描述了:
计算只会在你明确要求结果时(通过调用.compute()
方法)才真正发生。
想象一下做一道复杂的菜:
[.column width=“50%”] 立即执行 (Pandas)
::: 懒惰执行 (Dask)
:::
Dask的这种方式让它有机会在执行前对整个计算流程进行优化。
懒惰执行构建了任务图,而任务调度器(Task Scheduler)则是负责执行这个图的“大脑”。
Dask会将大的计算任务分解成许多小的、独立的任务,然后智能地安排它们的执行顺序,并分配到不同的CPU核心或机器上并行处理。
Dask在处理大数据时,会非常智能地管理内存。
它只会在计算某个数据块(chunk)时才将其加载到内存中,计算完成后,如果该数据块不再被需要,Dask会立即将其从内存中释放。
这种“即用即取,用完即弃”的策略是它能够处理远超内存大小数据的关键。
Dask的强大之处在于它的可伸缩性:
并行计算 (Parallel)
::: 分布式计算 (Distributed)
:::
Dask不仅仅是模仿了Pandas,它与整个Python科学计算栈都深度集成。
这意味着你可以用很小的代码改动,就将现有分析流程从处理小数据升级为处理大数据。
下表直观地展示了Dask与Pandas在基本操作上的相似与不同之处。
操作 | Pandas | Dask |
---|---|---|
数据结构 | DataFrame, Series | Dask DataFrame, Dask Series |
导入库 | import pandas as pd |
import dask.dataframe as dd |
读取数据 | pd.read_csv(...) |
dd.read_csv(...) |
查看数据 | df.head() , df.tail() |
df.head() , df.tail() |
数据过滤 | df[df['col'] > 0] |
df[df['col'] > 0] |
分组聚合 | df.groupby('col').sum() |
df.groupby('col').sum() |
数据合并 | pd.concat() , df.merge() |
dd.concat() , df.merge() |
计算执行 | 立即执行 | 懒惰执行,需要.compute() 触发 |
.compute()
: Dask操作的“执行”按钮.compute()
方法是Dask懒惰执行模型的关键。
df_filtered = df[df['column'] > 0]
时,如果df
是一个Dask DataFrame,df_filtered
并不是一个包含结果的数据帧。df_filtered
此时只是一个“任务图”,一个“计算计划”,它描述了如何得到过滤后的结果。result = df_filtered.compute()
时,Dask才会真正开始读取数据、执行过滤操作,并返回一个Pandas DataFrame作为最终结果。.compute()
的作用:从计划到现实.compute()
。打开你的终端或Anaconda Prompt,根据你的需求选择安装命令:
bash pip install "dask[complete]"
bash pip install dask
由于我们无法分发一个TB级的文件,我们将用代码在本地生成一个大型CSV文件,模拟真实的交易数据。这个文件将远大于你电脑的内存。
我们的目标是生成一个约25MB的文件。
import pandas as pd
import numpy as np
import os
# --- 配置参数 ---
# 在真实场景中,这个文件可能是100GB或更大
n_rows = 1_000_000 # 100万行
file_path = 'large_transactions.csv'
# --- 检查文件是否存在,避免重复生成 ---
if not os.path.exists(file_path):
print(f'文件 {file_path} 不存在,开始生成...')
# 使用pandas分块生成,避免自身内存溢出
chunk_size = 100_000 # 每次处理10万行
header = True
for i in range(0, n_rows, chunk_size):
current_chunk_size = min(chunk_size, n_rows - i)
df_chunk = pd.DataFrame({
'transaction_id': np.arange(i, i + current_chunk_size),
'timestamp': pd.to_datetime(np.random.randint(1.5e9, 1.6e9, size=current_chunk_size), unit='s'),
'stock_id': np.random.randint(1, 1000, size=current_chunk_size),
'price': np.random.uniform(10, 500, size=current_chunk_size).round(2),
'volume': np.random.randint(100, 10000, size=current_chunk_size)
})
# 第一次写入表头,之后以追加模式写入
df_chunk.to_csv(file_path, mode='a', header=header, index=False)
header = False # 确保表头只写一次
print(f'已写入 {i + current_chunk_size} / {n_rows} 行...')
print('数据生成完毕!')
else:
print(f'文件 {file_path} 已存在,跳过生成。')
# --- 检查文件大小 ---
file_size_mb = os.path.getsize(file_path) / (1024**2)
print(f'生成的文件大小约为: {file_size_mb:.2f} MB')
文件 large_transactions.csv 已存在,跳过生成。
生成的文件大小约为: 40.44 MB
chunk_size
和循环。df_chunk
),然后追加到CSV文件中。现在,关键时刻来了。我们不用pandas
,而是用dask.dataframe
来读取这个大文件。
dd.read_csv
import dask.dataframe as dd
: 这是Dask DataFrame模块的惯用别名,就像import pandas as pd
一样。blocksize='64MB'
: 这是一个关键参数。它告诉Dask不要一次读取整个文件,而是将其逻辑上分割成约64MB大小的块。Dask会为每个块创建一个任务。让我们看看刚刚创建的ddf
对象到底是什么。
Dask DataFrame Structure
,包含了列名和dtype
。Dask通过智能地读取文件的一小部分来推断这些元信息。blocksize
。Dask的美妙之处在于它的计算计划(任务图)是透明的。我们可以将它可视化。
让我们定义一个简单的计算:计算交易总额,并可视化其任务图。 (注意: 可能需要安装 graphviz
才能显示图片)
import dask
# 尝试设置graphviz引擎,如果失败则忽略
try:
dask.config.set({'visualization.engine': 'graphviz'})
import graphviz
can_visualize = True
except ImportError:
print("Graphviz not found. Skipping visualization.")
can_visualize = False
# 定义一个计算:交易额 = 价格 * 数量
ddf['amount'] = ddf['price'] * ddf['volume']
# 获取总交易额
total_amount = ddf['amount'].sum()
# 可视化这个计算的任务图
if can_visualize:
try:
total_amount.visualize(filename='dask_graph.png')
from IPython.display import Image
display(Image(filename='dask_graph.png'))
except Exception as e:
print(f"Visualization failed: {e}")
else:
print("Task graph defined but not visualized due to missing dependencies.")
Graphviz not found. Skipping visualization.
Task graph defined but not visualized due to missing dependencies.
这个图展示了Dask为了计算总交易额需要执行的所有步骤。
整个计算流程从底部(读取数据)流向顶部(最终结果)。
现在,让我们通过调用.compute()
来触发这些计算。我们将计算price
列的基本描述性统计。
# 计算描述性统计
# 这个过程会花费一些时间,因为Dask现在正在读取所有数据块并进行计算
print("开始计算描述性统计...")
summary_stats = ddf['price'].describe().compute()
print("计算完成!")
print(summary_stats)
开始计算描述性统计...
计算完成!
count 1000000.000000
mean 255.179348
std 141.401938
min 10.000000
25% 132.800000
50% 255.115000
75% 377.550000
max 500.000000
Name: price, dtype: float64
.compute()
幕后发生的事describe
任务图。count
, mean
, std
, min
, max
等。Dask的groupby
操作也和Pandas几乎一样。让我们计算每只股票的平均交易量。
# 计算每只股票的平均交易量
# 同样,这只是定义了计算计划
mean_volume_by_stock = ddf.groupby('stock_id')['volume'].mean()
# 使用.compute()执行计算
# 我们只看前10个结果以节省屏幕空间
# 注意:在Dask中,head()也会触发部分计算,但为了得到Pandas对象,通常还是需要compute()
print("开始分组聚合计算...")
result = mean_volume_by_stock.head(10)
print("计算完成!")
print(result)
开始分组聚合计算...
计算完成!
stock_id
1 4951.722832
2 5243.609553
3 5261.428571
4 5137.991812
5 5016.143145
6 5053.174588
7 4856.658436
8 5126.087500
9 5171.106833
10 5086.707676
Name: volume, dtype: float64
.compute()
才执行。这让它能够处理比内存更大的数据。当数据规模进一步扩大,达到数十TB甚至PB级别,或者需要一个更复杂的、跨语言的生态系统时,Apache Spark就登场了。
Spark最著名的特点是其内存计算(In-memory Computing)能力。
与早期的大数据系统(如Hadoop MapReduce)不同,Spark可以将中间计算结果保存在集群中各台机器的内存里,大大加快了迭代算法(如机器学习)和交互式查询的速度。
这是一个常见的问题,这里有一个简单的决策指南:
特性 | Dask | Apache Spark |
---|---|---|
生态系统 | 纯Python,与Pandas/Numpy/Scikit-Learn紧密集成 | 多语言 (Scala, Java, Python, R),更庞大、更复杂的生态 |
设置复杂度 | 非常简单 (pip install dask ) |
相对复杂,通常需要配置Java环境和集群管理器 |
适用场景 | 单机并行、中等规模集群,Python原生项目 | 大型企业级集群,超大规模数据,多语言团队 |
学习曲线 | 平缓,对Pandas用户友好 | 较陡峭,需要理解Spark的架构和概念 |
简单来说:
我们之前学习的所有机器学习算法,都假设训练数据可以被读入内存。
当数据大到无法装入内存时,model.fit(X, y)
这样的代码就会直接失败。
model.fit(X, y)
会失败?最直接的方法是将大数据集缩小到内存可以容纳的大小。
scikit-learn
工具。缺点:
分布式训练的核心思想与Dask处理数据的思想一致:分而治之。
它利用多个计算单元(CPU核心或多台机器上的GPU)并行地进行模型训练,以加快训练过程并处理大型数据集。
幸运的是,我们有现成的工具来处理这些复杂性。
Dask-ML就是Dask框架中专门用于扩展机器学习工作流的模块。它让分布式训练变得像使用scikit-learn
一样简单。
StandardScaler
等工具,用于大数据预处理。回到之前生成的large_transactions.csv
数据集,尝试训练一个线性回归模型,预测交易量volume
。
我们将使用dask-ml
来处理这个无法一次性载入内存的数据。
首先,我们像之前一样用Dask读取数据,并定义我们的特征(X)和目标(y)。
import dask.dataframe as dd
from dask_ml.preprocessing import StandardScaler
from dask_ml.linear_model import LinearRegression
# 读取数据
ddf = dd.read_csv('large_transactions.csv', blocksize='64MB')
# 定义特征和目标
# 为了简化,我们只用 price 和 stock_id 作为特征
# 注意:直接使用 stock_id 是不好的做法,这里仅为演示
X = ddf[['price', 'stock_id']]
y = ddf['volume']
# Dask-ML要求输入是Dask Array,我们进行转换
X = X.to_dask_array(lengths=True)
y = y.to_dask_array(lengths=True)
print("X 和 y 已准备为 Dask Arrays.")
print(X)
X 和 y 已准备为 Dask Arrays.
dask.array<read-_to_string_dtype-getitem-values, shape=(1000000, 2), dtype=float64, chunksize=(1000000, 2), chunktype=numpy.ndarray>
我们使用dask_ml
的StandardScaler
对特征进行标准化。注意,这个操作也是懒惰的!
现在,我们可以定义并训练一个线性回归模型。Dask-ML的模型会自动处理数据分块和并行计算。
# 初始化线性回归模型
lr = LinearRegression()
# 训练模型
# 这一步会触发计算!
# Dask-ML会在后台读取数据块,进行计算
print('开始训练模型...')
lr.fit(X_scaled, y)
print('模型训练完毕!')
# 打印模型系数
print(f'模型系数 (Coefficients): {lr.coef_}')
print(f'模型截距 (Intercept): {lr.intercept_}')
开始训练模型...
模型训练完毕!
模型系数 (Coefficients): [ 4.90540524 -2.65732221]
模型截距 (Intercept): 5008.6812618078075
.fit()
.fit()
时,Dask-ML和Dask在后台协同工作。从“一次性加载”到“分块处理”: 这是从传统数据分析迈向大数据分析最根本的思维转变。
你必须放弃“所有数据都在我眼前”的假设,开始用“流水线”和“分治”的眼光看待问题。
懒惰执行的力量: 通过先规划(构建任务图)再执行(调用.compute()
)的策略,Dask等工具能够优化计算流程,并处理远超内存的数据。
这是一种更智能、更具伸缩性的计算范式。
Python生态的威力: Dask和PySpark让Python这门通用语言具备了处理工业级大数据的能力,而Dask-ML则将这种能力无缝延伸到了机器学习领域。
你所学的技能,可以直接应用于解决真实世界的大规模问题。
使用我们生成的large_transactions.csv
数据集,用Dask完成以下任务:
timestamp
列转换为年份。price
)。ddf['timestamp'] = dd.to_datetime(ddf['timestamp'])
ddf['year'] = ddf['timestamp'].dt.year
groupby()
和 agg()
假设你还有一个stocks_info.csv
文件,包含stock_id
和stock_name
两列。
请描述你将如何使用Dask将它与large_transactions.csv
合并(merge),以便分析每个stock_name
的交易情况。需要考虑stocks_info.csv
文件的大小吗?