Dask DataFrame已创建!
我们将循序渐进,探索大数据的世界:
在这堂课结束时,我希望你们能够理解:
为什么处理海量数据是一个巨大的挑战,以及它与传统数据分析的根本区别。
并且,能够了解:
Dask和Spark这两个业界主流的大数据处理框架的核心思想。
最终,能够实践并掌握:
我们之前所有的数据分析工作,都有一个隐含的前提…
所有数据都能一次性加载到计算机的内存(RAM)中。
pandas 就是这样工作的。当你调用 pd.read_csv() 时,它会尝试将整个文件读入内存。
这在小数据集上表现优异,但在大数据时代,这个前提本身就是问题的根源。
我们希望用我们有限的内存,去分析无限增长的数据。
让我们来看一下现实:
这就像…
…试图用一个水杯去装整个游泳池的水。
让我们把问题具体化,思考一下支付宝的月度交易数据。
这会产生多大的数据量?
让我们来做个简单的计算:
\[ \large{ \text{数据量} = \underbrace{10^9}_{\text{用户数}} \times \underbrace{50}_{\text{交易数}} \times \underbrace{10}_{\text{变量数}} \times \underbrace{8}_{\text{字节/变量}} } \]
计算结果是:
\[ \large{ \text{结果} \approx 4 \times 10^{12} \text{ 字节} \approx \textbf{4 Terabytes} } \]
4TB 的数据,没有任何个人电脑能够直接将其读入内存。
传统的分析方法在这里完全失效。
既然我们无法一次性处理整个数据集,那么唯一的办法就是:
将一个大任务(处理整个数据集)分解成许多可以在小块数据上执行的小任务,然后将结果汇总起来。
这正是大数据处理框架的精髓所在。
在Python世界里,有两个强大的程序库专门用来实现“分而治之”的思想:
我们今天将主要以Dask为例进行讲解,因为它对熟悉Pandas的你来说最容易上手。
Dask: 轻量级、Python原生、与现有库(Pandas, NumPy)无缝集成,学习曲线平缓。
Apache Spark: 行业标准、功能强大、生态成熟、支持多语言(通过PySpark在Python中使用)。
Dask是一个开源项目,旨在将Python数据科学生态系统(Pandas, Scikit-Learn, NumPy)的能力扩展到大数据和并行计算领域。
它的设计哲学是:尽可能地模仿现有库的API,让你用最小的学习成本处理更大的数据集。
Dask最吸引人的一点在于,你不需要学习一套全新的语法。
这使得从Pandas迁移到Dask变得异常平滑。
我们将逐一深入探讨这些概念。
这是Dask与Pandas最本质的区别。
相反,Dask会构建一个任务图(Task Graph),记录下你想要执行的所有计算步骤。
任务图是Dask的“作战计划”。它是一个有向无环图(DAG),描述了:
计算只会在你明确要求结果时(通过调用.compute()方法)才真正发生。
想象一下做一道复杂的菜:
[.column width=“50%”] 立即执行 (Pandas)
::: 懒惰执行 (Dask)
:::
Dask的这种方式让它有机会在执行前对整个计算流程进行优化。
懒惰执行构建了任务图,而任务调度器(Task Scheduler)则是负责执行这个图的“大脑”。
Dask会将大的计算任务分解成许多小的、独立的任务,然后智能地安排它们的执行顺序,并分配到不同的CPU核心或机器上并行处理。
Dask在处理大数据时,会非常智能地管理内存。
它只会在计算某个数据块(chunk)时才将其加载到内存中,计算完成后,如果该数据块不再被需要,Dask会立即将其从内存中释放。
这种“即用即取,用完即弃”的策略是它能够处理远超内存大小数据的关键。
Dask的强大之处在于它的可伸缩性:
并行计算 (Parallel)
::: 分布式计算 (Distributed)
:::
Dask不仅仅是模仿了Pandas,它与整个Python科学计算栈都深度集成。
这意味着你可以用很小的代码改动,就将现有分析流程从处理小数据升级为处理大数据。
下表直观地展示了Dask与Pandas在基本操作上的相似与不同之处。
| 操作 | Pandas | Dask |
|---|---|---|
| 数据结构 | DataFrame, Series | Dask DataFrame, Dask Series |
| 导入库 | import pandas as pd |
import dask.dataframe as dd |
| 读取数据 | pd.read_csv(...) |
dd.read_csv(...) |
| 查看数据 | df.head(), df.tail() |
df.head(), df.tail() |
| 数据过滤 | df[df['col'] > 0] |
df[df['col'] > 0] |
| 分组聚合 | df.groupby('col').sum() |
df.groupby('col').sum() |
| 数据合并 | pd.concat(), df.merge() |
dd.concat(), df.merge() |
| 计算执行 | 立即执行 | 懒惰执行,需要.compute()触发 |
.compute(): Dask操作的“执行”按钮.compute()方法是Dask懒惰执行模型的关键。
df_filtered = df[df['column'] > 0] 时,如果df是一个Dask DataFrame,df_filtered并不是一个包含结果的数据帧。df_filtered此时只是一个“任务图”,一个“计算计划”,它描述了如何得到过滤后的结果。result = df_filtered.compute() 时,Dask才会真正开始读取数据、执行过滤操作,并返回一个Pandas DataFrame作为最终结果。.compute() 的作用:从计划到现实.compute()。打开你的终端或Anaconda Prompt,根据你的需求选择安装命令:
bash pip install "dask[complete]"bash pip install dask由于我们无法分发一个TB级的文件,我们将用代码在本地生成一个大型CSV文件,模拟真实的交易数据。这个文件将远大于你电脑的内存。
我们的目标是生成一个约25MB的文件。
import pandas as pd
import numpy as np
import os
# --- 配置参数 ---
# 在真实场景中,这个文件可能是100GB或更大
n_rows = 1_000_000 # 100万行
file_path = 'large_transactions.csv'
# --- 检查文件是否存在,避免重复生成 ---
if not os.path.exists(file_path):
print(f'文件 {file_path} 不存在,开始生成...')
# 使用pandas分块生成,避免自身内存溢出
chunk_size = 100_000 # 每次处理10万行
header = True
for i in range(0, n_rows, chunk_size):
current_chunk_size = min(chunk_size, n_rows - i)
df_chunk = pd.DataFrame({
'transaction_id': np.arange(i, i + current_chunk_size),
'timestamp': pd.to_datetime(np.random.randint(1.5e9, 1.6e9, size=current_chunk_size), unit='s'),
'stock_id': np.random.randint(1, 1000, size=current_chunk_size),
'price': np.random.uniform(10, 500, size=current_chunk_size).round(2),
'volume': np.random.randint(100, 10000, size=current_chunk_size)
})
# 第一次写入表头,之后以追加模式写入
df_chunk.to_csv(file_path, mode='a', header=header, index=False)
header = False # 确保表头只写一次
print(f'已写入 {i + current_chunk_size} / {n_rows} 行...')
print('数据生成完毕!')
else:
print(f'文件 {file_path} 已存在,跳过生成。')
# --- 检查文件大小 ---
file_size_mb = os.path.getsize(file_path) / (1024**2)
print(f'生成的文件大小约为: {file_size_mb:.2f} MB')文件 large_transactions.csv 已存在,跳过生成。
生成的文件大小约为: 40.44 MB
chunk_size 和循环。df_chunk),然后追加到CSV文件中。现在,关键时刻来了。我们不用pandas,而是用dask.dataframe来读取这个大文件。
dd.read_csvimport dask.dataframe as dd: 这是Dask DataFrame模块的惯用别名,就像import pandas as pd一样。blocksize='64MB': 这是一个关键参数。它告诉Dask不要一次读取整个文件,而是将其逻辑上分割成约64MB大小的块。Dask会为每个块创建一个任务。让我们看看刚刚创建的ddf对象到底是什么。
Dask DataFrame Structure,包含了列名和dtype。Dask通过智能地读取文件的一小部分来推断这些元信息。blocksize。Dask的美妙之处在于它的计算计划(任务图)是透明的。我们可以将它可视化。
让我们定义一个简单的计算:计算交易总额,并可视化其任务图。 (注意: 可能需要安装 graphviz 才能显示图片)
import dask
# 尝试设置graphviz引擎,如果失败则忽略
try:
dask.config.set({'visualization.engine': 'graphviz'})
import graphviz
can_visualize = True
except ImportError:
print("Graphviz not found. Skipping visualization.")
can_visualize = False
# 定义一个计算:交易额 = 价格 * 数量
ddf['amount'] = ddf['price'] * ddf['volume']
# 获取总交易额
total_amount = ddf['amount'].sum()
# 可视化这个计算的任务图
if can_visualize:
try:
total_amount.visualize(filename='dask_graph.png')
from IPython.display import Image
display(Image(filename='dask_graph.png'))
except Exception as e:
print(f"Visualization failed: {e}")
else:
print("Task graph defined but not visualized due to missing dependencies.")Graphviz not found. Skipping visualization.
Task graph defined but not visualized due to missing dependencies.
这个图展示了Dask为了计算总交易额需要执行的所有步骤。
整个计算流程从底部(读取数据)流向顶部(最终结果)。
现在,让我们通过调用.compute()来触发这些计算。我们将计算price列的基本描述性统计。
开始计算描述性统计...
计算完成!
count 1000000.000000
mean 255.179348
std 141.401938
min 10.000000
25% 132.800000
50% 255.115000
75% 377.550000
max 500.000000
Name: price, dtype: float64
.compute() 幕后发生的事describe 任务图。count, mean, std, min, max 等。Dask的groupby操作也和Pandas几乎一样。让我们计算每只股票的平均交易量。
开始分组聚合计算...
计算完成!
stock_id
1 4951.722832
2 5243.609553
3 5261.428571
4 5137.991812
5 5016.143145
6 5053.174588
7 4856.658436
8 5126.087500
9 5171.106833
10 5086.707676
Name: volume, dtype: float64
.compute()才执行。这让它能够处理比内存更大的数据。当数据规模进一步扩大,达到数十TB甚至PB级别,或者需要一个更复杂的、跨语言的生态系统时,Apache Spark就登场了。
Spark最著名的特点是其内存计算(In-memory Computing)能力。
与早期的大数据系统(如Hadoop MapReduce)不同,Spark可以将中间计算结果保存在集群中各台机器的内存里,大大加快了迭代算法(如机器学习)和交互式查询的速度。
这是一个常见的问题,这里有一个简单的决策指南:
| 特性 | Dask | Apache Spark |
|---|---|---|
| 生态系统 | 纯Python,与Pandas/Numpy/Scikit-Learn紧密集成 | 多语言 (Scala, Java, Python, R),更庞大、更复杂的生态 |
| 设置复杂度 | 非常简单 (pip install dask) |
相对复杂,通常需要配置Java环境和集群管理器 |
| 适用场景 | 单机并行、中等规模集群,Python原生项目 | 大型企业级集群,超大规模数据,多语言团队 |
| 学习曲线 | 平缓,对Pandas用户友好 | 较陡峭,需要理解Spark的架构和概念 |
简单来说:
我们之前学习的所有机器学习算法,都假设训练数据可以被读入内存。
当数据大到无法装入内存时,model.fit(X, y)这样的代码就会直接失败。
model.fit(X, y) 会失败?最直接的方法是将大数据集缩小到内存可以容纳的大小。
scikit-learn工具。缺点:
分布式训练的核心思想与Dask处理数据的思想一致:分而治之。
它利用多个计算单元(CPU核心或多台机器上的GPU)并行地进行模型训练,以加快训练过程并处理大型数据集。
幸运的是,我们有现成的工具来处理这些复杂性。
Dask-ML就是Dask框架中专门用于扩展机器学习工作流的模块。它让分布式训练变得像使用scikit-learn一样简单。
StandardScaler等工具,用于大数据预处理。回到之前生成的large_transactions.csv数据集,尝试训练一个线性回归模型,预测交易量volume。
我们将使用dask-ml来处理这个无法一次性载入内存的数据。
首先,我们像之前一样用Dask读取数据,并定义我们的特征(X)和目标(y)。
import dask.dataframe as dd
from dask_ml.preprocessing import StandardScaler
from dask_ml.linear_model import LinearRegression
# 读取数据
ddf = dd.read_csv('large_transactions.csv', blocksize='64MB')
# 定义特征和目标
# 为了简化,我们只用 price 和 stock_id 作为特征
# 注意:直接使用 stock_id 是不好的做法,这里仅为演示
X = ddf[['price', 'stock_id']]
y = ddf['volume']
# Dask-ML要求输入是Dask Array,我们进行转换
X = X.to_dask_array(lengths=True)
y = y.to_dask_array(lengths=True)
print("X 和 y 已准备为 Dask Arrays.")
print(X)X 和 y 已准备为 Dask Arrays.
dask.array<read-_to_string_dtype-getitem-values, shape=(1000000, 2), dtype=float64, chunksize=(1000000, 2), chunktype=numpy.ndarray>
我们使用dask_ml的StandardScaler对特征进行标准化。注意,这个操作也是懒惰的!
现在,我们可以定义并训练一个线性回归模型。Dask-ML的模型会自动处理数据分块和并行计算。
开始训练模型...
模型训练完毕!
模型系数 (Coefficients): [ 4.90540492 -2.65732191]
模型截距 (Intercept): 5008.681261808293
.fit().fit()时,Dask-ML和Dask在后台协同工作。从“一次性加载”到“分块处理”: 这是从传统数据分析迈向大数据分析最根本的思维转变。
你必须放弃“所有数据都在我眼前”的假设,开始用“流水线”和“分治”的眼光看待问题。
懒惰执行的力量: 通过先规划(构建任务图)再执行(调用.compute())的策略,Dask等工具能够优化计算流程,并处理远超内存的数据。
这是一种更智能、更具伸缩性的计算范式。
Python生态的威力: Dask和PySpark让Python这门通用语言具备了处理工业级大数据的能力,而Dask-ML则将这种能力无缝延伸到了机器学习领域。
你所学的技能,可以直接应用于解决真实世界的大规模问题。
使用我们生成的large_transactions.csv数据集,用Dask完成以下任务:
timestamp列转换为年份。price)。ddf['timestamp'] = dd.to_datetime(ddf['timestamp'])ddf['year'] = ddf['timestamp'].dt.yeargroupby() 和 agg()假设你还有一个stocks_info.csv文件,包含stock_id和stock_name两列。
请描述你将如何使用Dask将它与large_transactions.csv合并(merge),以便分析每个stock_name的交易情况。需要考虑stocks_info.csv文件的大小吗?