18 大数据的处理与学习

欢迎来到大数据时代

  • 我们正处在一个数据爆炸的时代。
  • 从金融交易到社交媒体,海量数据正在重塑商业决策。
  • 但我们迄今为止所学的工具,在面对“真正的大数据”时,会遇到一堵看不见的墙。
  • 今天,我们的任务就是——翻越这堵墙

本章学习议程

我们将循序渐进,探索大数据的世界:

  1. 问题的根源:为什么传统方法会失效?
  2. 核心思想:解决大数据问题的通用策略。
  3. 关键工具(Dask):深入了解Python生态的并行计算利器。
  4. 动手实践:亲手用Dask处理一个超越内存的数据集。
  5. 高级话题:如何在海量数据上进行机器学习。

学习目标 (1): 理解挑战

在这堂课结束时,我希望你们能够理解

为什么处理海量数据是一个巨大的挑战,以及它与传统数据分析的根本区别。

海量数据 传统分析方法 内存限制 ? 无用 / 残缺的信息

学习目标 (2): 了解工具

并且,能够了解

Dask和Spark这两个业界主流的大数据处理框架的核心思想。

大任务 Dask Spark 并行处理结果

学习目标 (3): 掌握实践

最终,能够实践掌握

  • 使用Dask执行基本的数据操作,并体会其与Pandas的异同。
  • 在海量数据上进行机器学习的基本策略,特别是分布式训练。
掌握大数据实践 一个代码块被执行,将混乱的数据点转化为清晰的图表结果。 > import dask.dataframe as dd > ddf = dd.read_csv('big.csv') > result = ddf.mean().compute() 清晰的结果

问题的根源

核心问题:当数据大到内存装不下时,该怎么办?

我们之前所有的数据分析工作,都有一个隐含的前提…

那个我们从未质疑过的前提…

所有数据都能一次性加载到计算机的内存(RAM)中。

pandas 就是这样工作的。当你调用 pd.read_csv() 时,它会尝试将整个文件读入内存。

这在小数据集上表现优异,但在大数据时代,这个前提本身就是问题的根源。

内存之墙:理想与现实

我们希望用我们有限的内存,去分析无限增长的数据。

内存与数据的矛盾 一个小盒子代表的RAM面对着一个巨大的、无边无际的数据流,中间被一堵墙隔开。 RAM 有限 DATA 无限增长 内存之墙

个人电脑的内存是极其有限的

让我们来看一下现实:

  • 学生笔记本电脑:通常是 8GB16GB 内存。
  • 高端工作站:也许能达到 64GB 或 128GB,但很少超过 200GB。

这就像…

一个形象的比喻

…试图用一个水杯去装整个游泳池的水。

水杯与游泳池 一个小水杯面对一个巨大的游泳池,比喻内存和数据的关系。 16GB RAM TB级数据

真实商业场景:支付宝的交易数据

让我们把问题具体化,思考一下支付宝的月度交易数据。

  • 月活跃用户: 超过10亿
  • 假设每位用户每月交易: 50次
  • 每次交易记录的变量: 10个 (交易金额, 时间, 双方信息等)
  • 每个变量存储大小: 8字节 (float64)

这会产生多大的数据量?

触目惊心的数据量估算

让我们来做个简单的计算:

\[ \large{ \text{数据量} = \underbrace{10^9}_{\text{用户数}} \times \underbrace{50}_{\text{交易数}} \times \underbrace{10}_{\text{变量数}} \times \underbrace{8}_{\text{字节/变量}} } \]

问题的严重性:4TB

计算结果是:

\[ \large{ \text{结果} \approx 4 \times 10^{12} \text{ 字节} \approx \textbf{4 Terabytes} } \]

4TB 的数据,没有任何个人电脑能够直接将其读入内存。

传统的分析方法在这里完全失效。

解决方案的核心思想

解决方案:分而治之 (Divide and Conquer)

既然我们无法一次性处理整个数据集,那么唯一的办法就是:

将一个大任务(处理整个数据集)分解成许多可以在小块数据上执行的小任务,然后将结果汇总起来。

这正是大数据处理框架的精髓所在。

“分而治之”的可视化理解

分而治之 一个大的数据块被分解成小块,处理后结果再合并。 大数据 1. 分解 2. 并行处理 3. 汇总 最终结果

Python生态中的两大主流工具

在Python世界里,有两个强大的程序库专门用来实现“分而治之”的思想:

  1. Dask: 一个轻量级、原生支持Python的并行计算库。
  2. Apache Spark: 一个功能更强大、生态更成熟的分布式计算引擎。

我们今天将主要以Dask为例进行讲解,因为它对熟悉Pandas的你来说最容易上手。

工具介绍: Dask

Dask Logo Dask的蛇形标志,象征其Python原生特性。

Dask: 轻量级、Python原生、与现有库(Pandas, NumPy)无缝集成,学习曲线平缓。

工具介绍: Apache Spark

Apache Spark Logo Spark的星火标志,象征其强大的计算能力。

Apache Spark: 行业标准、功能强大、生态成熟、支持多语言(通过PySpark在Python中使用)。

深入Dask: 为Python设计的并行计算库

Dask: 熟悉的味道,更强的能力

Dask是一个开源项目,旨在将Python数据科学生态系统(Pandas, Scikit-Learn, NumPy)的能力扩展到大数据和并行计算领域。

它的设计哲学是:尽可能地模仿现有库的API,让你用最小的学习成本处理更大的数据集。

Dask的优点:与Pandas的惊人相似性

Dask最吸引人的一点在于,你不需要学习一套全新的语法。

  • Dask DataFrame: Dask的核心数据结构之一,它在内部由多个小的Pandas DataFrame组成。
  • API兼容: 基本上所有你能在Pandas中使用的功能,都可以在Dask中找到对应的实现。

这使得从Pandas迁移到Dask变得异常平滑。

Dask的五大核心特点

  1. 懒惰执行 (Lazy Execution)
  2. 任务调度 (Task Scheduling)
  3. 智能内存管理 (Smart Memory Management)
  4. 并行与分布式计算 (Parallel & Distributed)
  5. 无缝生态集成 (Ecosystem Integration)

我们将逐一深入探讨这些概念。

核心特点 (1): 懒惰执行 (Lazy Execution)

这是Dask与Pandas最本质的区别。

  • Pandas: 你写的每一行代码都会立即执行
  • Dask: 你写的代码不会立即执行

相反,Dask会构建一个任务图(Task Graph),记录下你想要执行的所有计算步骤。

什么是任务图?

任务图是Dask的“作战计划”。它是一个有向无环图(DAG),描述了:

  • 需要执行的所有任务(例如:从文件中读取一个数据块,过滤行,计算平均值)。
  • 任务之间的依赖关系(例如:必须先读取数据块,然后才能对其进行过滤)。

计算只会在你明确要求结果时(通过调用.compute()方法)才真正发生。

思想实验:懒惰执行就像是写一份菜谱

想象一下做一道复杂的菜:

[.column width=“50%”] 立即执行 (Pandas)

  • 读第一步“切洋葱”
  • 立刻切洋葱
  • 读第二步“热锅”
  • 立刻热锅
  • …一步一动… :::

::: 懒惰执行 (Dask)

  • 读完整份菜谱
  • 🧠 在脑中规划
  • “先切好所有蔬菜,再混合酱料,最后一起下锅。”
  • 直到决定“开火”时,才真正开始动手。 :::

:::

Dask的这种方式让它有机会在执行前对整个计算流程进行优化。

核心特点 (2): 任务调度 (Task Scheduling)

懒惰执行构建了任务图,而任务调度器(Task Scheduler)则是负责执行这个图的“大脑”。

Dask会将大的计算任务分解成许多小的、独立的任务,然后智能地安排它们的执行顺序,并分配到不同的CPU核心或机器上并行处理。

任务调度器的工作原理

Dask任务调度器 调度器将任务图中的任务分配给多个工作核心。 调度器 任务图 (Plan) CPU 核心 1 CPU 核心 2 CPU 核心 ...N 分配任务

核心特点 (3): 智能内存管理

Dask在处理大数据时,会非常智能地管理内存。

它只会在计算某个数据块(chunk)时才将其加载到内存中,计算完成后,如果该数据块不再被需要,Dask会立即将其从内存中释放。

这种“即用即取,用完即弃”的策略是它能够处理远超内存大小数据的关键。

内存管理的“流水线”模式

Dask内存管理 数据块按顺序流入内存进行处理,然后流出。 磁盘 内存 (RAM) 结果 处理中...

核心特点 (4): 并行与分布式计算

Dask的强大之处在于它的可伸缩性:

并行计算 (Parallel)

  • 单机多核
  • 在你的笔记本电脑上,Dask可以利用所有的CPU核心,将计算速度提升数倍。 :::

::: 分布式计算 (Distributed)

  • 多机集群
  • 对于TB级别的数据,Dask可以协调一个由多台计算机组成的集群,共同完成计算任务。 :::

:::

并行 vs. 分布式

并行计算与分布式计算 左侧显示单台机器内的多核并行,右侧显示多台机器的分布式集群。 并行计算 (单机) Core1 Core2 Core3 ... ... CoreN 分布式计算 (集群) 机器1 机器2 ...N 网络

核心特点 (5): 与现有生态系统无缝集成

Dask不仅仅是模仿了Pandas,它与整个Python科学计算栈都深度集成。

这意味着你可以用很小的代码改动,就将现有分析流程从处理小数据升级为处理大数据。

Dask 与 Python科学计算全家桶

Dask生态系统集成 Dask作为中心,与Pandas, NumPy, Scikit-Learn等库协同工作。 Dask Pandas NumPy Scikit Learn ...and more

Dask 与 Pandas 的操作对比

下表直观地展示了Dask与Pandas在基本操作上的相似与不同之处。

操作 Pandas Dask
数据结构 DataFrame, Series Dask DataFrame, Dask Series
导入库 import pandas as pd import dask.dataframe as dd
读取数据 pd.read_csv(...) dd.read_csv(...)
查看数据 df.head(), df.tail() df.head(), df.tail()
数据过滤 df[df['col'] > 0] df[df['col'] > 0]
分组聚合 df.groupby('col').sum() df.groupby('col').sum()
数据合并 pd.concat(), df.merge() dd.concat(), df.merge()
计算执行 立即执行 懒惰执行,需要.compute()触发

理解.compute(): Dask操作的“执行”按钮

.compute()方法是Dask懒惰执行模型的关键。

  • 当你写下 df_filtered = df[df['column'] > 0] 时,如果df是一个Dask DataFrame,df_filtered不是一个包含结果的数据帧。
  • df_filtered此时只是一个“任务图”,一个“计算计划”,它描述了如何得到过滤后的结果。
  • 只有当你执行 result = df_filtered.compute() 时,Dask才会真正开始读取数据、执行过滤操作,并返回一个Pandas DataFrame作为最终结果。

.compute() 的作用:从计划到现实

.compute()的作用 Dask对象通过.compute()方法转换成Pandas对象。 Dask DataFrame (计算计划) Pandas DataFrame (真实结果) .compute()

让我们进入实践:用Dask分析大型数据集

实践环节目标

  1. 安装 Dask 库。
  2. 在本地生成一个超越内存的模拟数据集。
  3. 使用Dask读取并分析该数据集。
  4. 亲身体验懒惰执行.compute()

步骤1: 安装 Dask 库

打开你的终端或Anaconda Prompt,根据你的需求选择安装命令:

  • 安装完整版 (推荐): 包含所有依赖项,如分布式调度、可视化等。 bash pip install "dask[complete]"
  • 仅安装核心库: bash pip install dask

步骤2: 生成一个模拟的大型数据集

由于我们无法分发一个TB级的文件,我们将用代码在本地生成一个大型CSV文件,模拟真实的交易数据。这个文件将远大于你电脑的内存。

我们的目标是生成一个约25MB的文件。

生成数据的代码

import pandas as pd
import numpy as np
import os

# --- 配置参数 ---
# 在真实场景中,这个文件可能是100GB或更大
n_rows = 1_000_000 # 100万行
file_path = 'large_transactions.csv'

# --- 检查文件是否存在,避免重复生成 ---
if not os.path.exists(file_path):
    print(f'文件 {file_path} 不存在,开始生成...')
    # 使用pandas分块生成,避免自身内存溢出
    chunk_size = 100_000 # 每次处理10万行
    header = True
    for i in range(0, n_rows, chunk_size):
        current_chunk_size = min(chunk_size, n_rows - i)
        df_chunk = pd.DataFrame({
            'transaction_id': np.arange(i, i + current_chunk_size),
            'timestamp': pd.to_datetime(np.random.randint(1.5e9, 1.6e9, size=current_chunk_size), unit='s'),
            'stock_id': np.random.randint(1, 1000, size=current_chunk_size),
            'price': np.random.uniform(10, 500, size=current_chunk_size).round(2),
            'volume': np.random.randint(100, 10000, size=current_chunk_size)
        })
        # 第一次写入表头,之后以追加模式写入
        df_chunk.to_csv(file_path, mode='a', header=header, index=False)
        header = False # 确保表头只写一次
        print(f'已写入 {i + current_chunk_size} / {n_rows} 行...')
    print('数据生成完毕!')
else:
    print(f'文件 {file_path} 已存在,跳过生成。')

# --- 检查文件大小 ---
file_size_mb = os.path.getsize(file_path) / (1024**2)
print(f'生成的文件大小约为: {file_size_mb:.2f} MB')
文件 large_transactions.csv 已存在,跳过生成。
生成的文件大小约为: 40.44 MB
Figure 1

代码解读:为什么要分块生成?

  • 注意代码中的 chunk_size 和循环。
  • 我们没有一次性在内存中创建100万行的数据,那会导致内存溢出。
  • 相反,我们循环创建小的Pandas DataFrame (df_chunk),然后追加到CSV文件中。
  • 这个生成过程本身就是“分而治之”思想的一个体现!

步骤3: 使用 Dask 读取数据

现在,关键时刻来了。我们不用pandas,而是用dask.dataframe来读取这个大文件。

import dask.dataframe as dd

# 使用Dask读取CSV
# blocksize参数告诉Dask将文件分成大约64MB的块进行处理
ddf = dd.read_csv('large_transactions.csv', blocksize='64MB')

# 注意,这行代码瞬间完成!因为Dask还没有真正读取任何数据。
print('Dask DataFrame已创建!')
Dask DataFrame已创建!

解读 dd.read_csv

  • import dask.dataframe as dd: 这是Dask DataFrame模块的惯用别名,就像import pandas as pd一样。
  • blocksize='64MB': 这是一个关键参数。它告诉Dask不要一次读取整个文件,而是将其逻辑上分割成约64MB大小的块。Dask会为每个块创建一个任务。

审视Dask DataFrame对象

让我们看看刚刚创建的ddf对象到底是什么。

print(ddf)
Dask DataFrame Structure:
              transaction_id timestamp stock_id    price volume
npartitions=1                                                  
                       int64    string    int64  float64  int64
                         ...       ...      ...      ...    ...
Dask Name: to_string_dtype, 2 expressions
Expr=ArrowStringConversion(frame=FromMapProjectable(4c362d6))

解读输出结果

  • 没有数据! 和Pandas不同,它不会打印出数据内容。
  • 列名和类型: 它显示了Dask DataFrame Structure,包含了列名和dtype。Dask通过智能地读取文件的一小部分来推断这些元信息。
  • 分区 (npartitions): 这是核心。它告诉你这个Dask DataFrame在逻辑上被分成了多少块(分区)。这个数量取决于文件总大小和blocksize

可视化任务图:揭示Dask的计算计划

Dask的美妙之处在于它的计算计划(任务图)是透明的。我们可以将它可视化。

让我们定义一个简单的计算:计算交易总额,并可视化其任务图。 (注意: 可能需要安装 graphviz 才能显示图片)

Code
import dask
# 尝试设置graphviz引擎,如果失败则忽略
try:
    dask.config.set({'visualization.engine': 'graphviz'})
    import graphviz
    can_visualize = True
except ImportError:
    print("Graphviz not found. Skipping visualization.")
    can_visualize = False

# 定义一个计算:交易额 = 价格 * 数量
ddf['amount'] = ddf['price'] * ddf['volume']

# 获取总交易额
total_amount = ddf['amount'].sum()

# 可视化这个计算的任务图
if can_visualize:
    try:
        total_amount.visualize(filename='dask_graph.png')
        from IPython.display import Image
        display(Image(filename='dask_graph.png'))
    except Exception as e:
        print(f"Visualization failed: {e}")
else:
    print("Task graph defined but not visualized due to missing dependencies.")
Graphviz not found. Skipping visualization.
Task graph defined but not visualized due to missing dependencies.
Figure 2

如何解读任务图

这个图展示了Dask为了计算总交易额需要执行的所有步骤。

  • 圆形节点: 代表函数调用(例如,读取一个CSV块,做乘法,求和)。
  • 方形节点: 代表中间结果。
  • 箭头: 表示依赖关系。

整个计算流程从底部(读取数据)流向顶部(最终结果)。

步骤4: 执行计算并获取结果

现在,让我们通过调用.compute()来触发这些计算。我们将计算price列的基本描述性统计。

# 计算描述性统计
# 这个过程会花费一些时间,因为Dask现在正在读取所有数据块并进行计算
print("开始计算描述性统计...")
summary_stats = ddf['price'].describe().compute()
print("计算完成!")
print(summary_stats)
开始计算描述性统计...
计算完成!
count    1000000.000000
mean         255.179348
std          141.401938
min           10.000000
25%          132.800000
50%          255.115000
75%          377.550000
max          500.000000
Name: price, dtype: float64

.compute() 幕后发生的事

  1. Dask Scheduler接收到 describe 任务图。
  2. 它将任务(为每个分区计算统计量)分配给可用的CPU核心。
  3. 每个核心读取一个数据块,计算 count, mean, std, min, max 等。
  4. 所有分区的中间结果被汇总起来,计算出最终的全局统计量。
  5. 整个过程中,内存占用被严格控制在小范围内。

Dask的Pandas-like操作:分组聚合

Dask的groupby操作也和Pandas几乎一样。让我们计算每只股票的平均交易量。

# 计算每只股票的平均交易量
# 同样,这只是定义了计算计划
mean_volume_by_stock = ddf.groupby('stock_id')['volume'].mean()

# 使用.compute()执行计算
# 我们只看前10个结果以节省屏幕空间
# 注意:在Dask中,head()也会触发部分计算,但为了得到Pandas对象,通常还是需要compute()
print("开始分组聚合计算...")
result = mean_volume_by_stock.head(10) 
print("计算完成!")
print(result)
开始分组聚合计算...
计算完成!
stock_id
1     4951.722832
2     5243.609553
3     5261.428571
4     5137.991812
5     5016.143145
6     5053.174588
7     4856.658436
8     5126.087500
9     5171.106833
10    5086.707676
Name: volume, dtype: float64

简单小结:Dask vs. Pandas

  • 相似性: API非常接近,如果你会Pandas,你几乎就会用Dask DataFrame。
  • 核心差异: Dask是懒惰的。它构建任务图,直到你调用.compute()才执行。这让它能够处理比内存更大的数据。
  • 适用场景: 当你的数据大小在GB到TB级别,并且可以在一台强大的机器上处理时,Dask是一个绝佳的选择。

超越单机:Apache Spark简介

简介:Apache Spark 与 PySpark

当数据规模进一步扩大,达到数十TB甚至PB级别,或者需要一个更复杂的、跨语言的生态系统时,Apache Spark就登场了。

  • Apache Spark: 是一个为大规模数据处理而设计的统一分析引擎。它最初用Scala编写,拥有卓越的性能和可伸缩性。
  • PySpark: 是Spark为Python提供的官方API,让我们可以在Python中利用Spark的强大功能。

Spark的核心特点:内存集群计算

Spark最著名的特点是其内存计算(In-memory Computing)能力。

与早期的大数据系统(如Hadoop MapReduce)不同,Spark可以将中间计算结果保存在集群中各台机器的内存里,大大加快了迭代算法(如机器学习)和交互式查询的速度。

Spark的集群计算模型

Spark集群计算模型 一个主节点协调多个工作节点,每个节点都有自己的CPU和内存。 主节点 (Driver) 工作节点 1 工作节点 2 工作节点 N

Dask 与 Spark: 何时选择哪个?

这是一个常见的问题,这里有一个简单的决策指南:

特性 Dask Apache Spark
生态系统 纯Python,与Pandas/Numpy/Scikit-Learn紧密集成 多语言 (Scala, Java, Python, R),更庞大、更复杂的生态
设置复杂度 非常简单 (pip install dask) 相对复杂,通常需要配置Java环境和集群管理器
适用场景 单机并行、中等规模集群,Python原生项目 大型企业级集群,超大规模数据,多语言团队
学习曲线 平缓,对Pandas用户友好 较陡峭,需要理解Spark的架构和概念

总结:如何选择

简单来说

  • 如果你的团队主要使用Python,数据在几TB以内,优先考虑Dask
  • 如果你的公司已经有了一个大数据平台,或者数据量极其庞大,PySpark是更好的选择

大数据时代的机器学习

大数据机器学习的挑战

我们之前学习的所有机器学习算法,都假设训练数据可以被读入内存。

当数据大到无法装入内存时,model.fit(X, y)这样的代码就会直接失败。

为什么 model.fit(X, y) 会失败?

模型拟合失败 一个算法试图将巨大的数据块装入小内存中,导致内存溢出。 X_train (500GB) RAM (16GB) model.fit() MemoryError!

对策1 (简单但有缺陷): 随机抽样

最直接的方法是将大数据集缩小到内存可以容纳的大小。

  • 做法: 从海量数据中随机抽取一部分样本(例如1%)。
  • 优点: 简单易行,可以使用所有标准的scikit-learn工具。

随机抽样的代价

缺点:

  1. 信息丢失: 丢弃了99%的数据,可能会错过重要的模式,尤其是在处理稀有事件(如金融欺诈)时。
  2. 模型性能: 对于需要大量数据才能学习的复杂模型(如深度学习),抽样后的数据可能不足以训练一个高性能的模型。
随机抽样的信息丢失 大部分数据被丢弃,只有一个小样本被保留。 全部数据 99% 被丢弃! 保留 1% 样本

对策2 (更优越): 分布式训练

分布式训练的核心思想与Dask处理数据的思想一致:分而治之

它利用多个计算单元(CPU核心或多台机器上的GPU)并行地进行模型训练,以加快训练过程并处理大型数据集。

分布式训练的基本流程 (1/4): 分区与分发

  1. 准备环境: 配置一个包含多个计算节点(worker)的集群。
  2. 数据分区: 将大型数据集分割成许多小批次(batches)。
  3. 数据分配: 将这些数据批次均匀地分配给每个节点。
数据分区与分发 大数据被切分并发送到不同的工作节点。 大数据 Worker 1 Worker 2 Worker 3

分布式训练的基本流程 (2/4): 模型复制

  1. 模型复制: 在每个节点上初始化一个完全相同的模型副本(拥有相同的初始权重)。
模型复制 一个模型模板被复制到所有工作节点。 模型

分布式训练的基本流程 (3/4): 并行计算

  1. 并行计算: 每个节点使用自己分配到的数据批次,独立地对模型进行训练,并计算出模型参数的更新量(梯度)。
并行计算梯度 每个工作节点独立计算梯度。 + 计算 梯度 G1 梯度 G2 梯度 G3

分布式训练的基本流程 (4/4): 聚合与更新

  1. 梯度聚合: 所有节点计算出的梯度被汇总起来。
  2. 模型更新: 使用聚合后的梯度来更新所有节点上的模型参数。
  3. 迭代: 重复并行计算和更新的步骤。
聚合与更新 所有梯度被发送到中心进行聚合,然后更新所有模型。 聚合梯度 已更新

分布式训练的优化与工具:Dask-ML

幸运的是,我们有现成的工具来处理这些复杂性。

Dask-ML就是Dask框架中专门用于扩展机器学习工作流的模块。它让分布式训练变得像使用scikit-learn一样简单。

Dask-ML 的三大应用方式

  1. 扩展 Scikit-Learn
    • 提供了许多与Scikit-Learn API兼容的并行算法。
  2. 增量学习 (核外学习)
    • 支持在无法一次性加载的数据上逐步训练模型。
  3. 可扩展的预处理
    • 提供了并行的StandardScaler等工具,用于大数据预处理。

实践:使用Dask-ML进行核外线性回归

Dask-ML实践: 目标

回到之前生成的large_transactions.csv数据集,尝试训练一个线性回归模型,预测交易量volume

我们将使用dask-ml来处理这个无法一次性载入内存的数据。

步骤1 - 准备数据

首先,我们像之前一样用Dask读取数据,并定义我们的特征(X)和目标(y)。

import dask.dataframe as dd
from dask_ml.preprocessing import StandardScaler
from dask_ml.linear_model import LinearRegression

# 读取数据
ddf = dd.read_csv('large_transactions.csv', blocksize='64MB')

# 定义特征和目标
# 为了简化,我们只用 price 和 stock_id 作为特征
# 注意:直接使用 stock_id 是不好的做法,这里仅为演示
X = ddf[['price', 'stock_id']]
y = ddf['volume']

# Dask-ML要求输入是Dask Array,我们进行转换
X = X.to_dask_array(lengths=True)
y = y.to_dask_array(lengths=True)

print("X 和 y 已准备为 Dask Arrays.")
print(X)
X 和 y 已准备为 Dask Arrays.
dask.array<read-_to_string_dtype-getitem-values, shape=(1000000, 2), dtype=float64, chunksize=(1000000, 2), chunktype=numpy.ndarray>

步骤2 - 数据标准化

我们使用dask_mlStandardScaler对特征进行标准化。注意,这个操作也是懒惰的!

# 初始化标准化工具
scaler = StandardScaler()

# 对特征X进行拟合和转换
# 同样,这些操作是懒惰的,只构建任务图
X_scaled = scaler.fit_transform(X)

print('数据标准化步骤已定义!')
print(X_scaled)
数据标准化步骤已定义!
dask.array<truediv, shape=(1000000, 2), dtype=float64, chunksize=(1000000, 2), chunktype=numpy.ndarray>

步骤3 - 训练模型

现在,我们可以定义并训练一个线性回归模型。Dask-ML的模型会自动处理数据分块和并行计算。

# 初始化线性回归模型
lr = LinearRegression()

# 训练模型
# 这一步会触发计算!
# Dask-ML会在后台读取数据块,进行计算
print('开始训练模型...')
lr.fit(X_scaled, y)
print('模型训练完毕!')

# 打印模型系数
print(f'模型系数 (Coefficients): {lr.coef_}')
print(f'模型截距 (Intercept): {lr.intercept_}')
开始训练模型...
模型训练完毕!
模型系数 (Coefficients): [ 4.90540524 -2.65732221]
模型截距 (Intercept): 5008.6812618078075

解读 Dask-ML 的 .fit()

  • 当我们调用.fit()时,Dask-ML和Dask在后台协同工作。
  • Dask负责高效地从磁盘读取数据块(partitions)。
  • Dask-ML则负责在这些数据块上执行并行的机器学习算法(例如,计算每个块的梯度)。
  • 最后,Dask-ML负责聚合所有块的结果,得到一个统一的最终模型。
  • 整个过程对用户来说是透明的,API与Scikit-Learn高度一致。

结论:关键思想的转变

本章总结 (1): 思维的转变

从“一次性加载”到“分块处理”: 这是从传统数据分析迈向大数据分析最根本的思维转变。

你必须放弃“所有数据都在我眼前”的假设,开始用“流水线”和“分治”的眼光看待问题。

本章总结 (2): 工具的力量

懒惰执行的力量: 通过先规划(构建任务图)再执行(调用.compute())的策略,Dask等工具能够优化计算流程,并处理远超内存的数据。

这是一种更智能、更具伸缩性的计算范式。

本章总结 (3): 生态的威力

Python生态的威力: Dask和PySpark让Python这门通用语言具备了处理工业级大数据的能力,而Dask-ML则将这种能力无缝延伸到了机器学习领域。

你所学的技能,可以直接应用于解决真实世界的大规模问题。

课后练习

习题 1 & 2: 场景分析

  1. 场景一: 你需要处理一个2GB的CSV文件,并且你的电脑有16GB内存。你会选择哪个库(Pandas还是Dask)?为什么?
  2. 场景二: 你需要处理一个5TB的用户行为日志数据集,并构建一个复杂的推荐系统,你的公司有一个大型计算集群。你应该考虑使用哪个库(Dask还是PySpark)?为什么?

习题 3: Dask实践

使用我们生成的large_transactions.csv数据集,用Dask完成以下任务:

  • timestamp列转换为年份。
  • 计算每年最高最低交易价格(price)。
  • 提示:
    • ddf['timestamp'] = dd.to_datetime(ddf['timestamp'])
    • ddf['year'] = ddf['timestamp'].dt.year
    • 使用 groupby()agg()

习题 4: Dask思维

假设你还有一个stocks_info.csv文件,包含stock_idstock_name两列。

描述你将如何使用Dask将它与large_transactions.csv合并(merge),以便分析每个stock_name的交易情况。需要考虑stocks_info.csv文件的大小吗?