使用 Python PySpark 處理大型資料集


在本教程中,我們將探索 Python 和 PySpark 的強大組合,用於處理大型資料集。PySpark 是一個 Python 庫,它為 Apache Spark 提供了一個介面,Apache Spark 是一個快速且通用的叢集計算系統。透過利用 PySpark,我們可以有效地將資料分佈和處理到多個機器的叢集中,使我們能夠輕鬆處理大規模資料集。

在本文中,我們將深入探討 PySpark 的基礎知識,並演示如何在大型資料集上執行各種資料處理任務。我們將涵蓋關鍵概念,例如 RDD(彈性分散式資料集)和 DataFrame,並透過分步示例展示它們的實際應用。在本教程結束時,您將對如何利用 PySpark 有效地處理和分析海量資料集有一個紮實的瞭解。

第 1 部分:PySpark 入門

在本節中,我們將設定我們的開發環境,並熟悉 PySpark 的基本概念。我們將介紹如何安裝 PySpark、初始化 SparkSession 以及將資料載入到 RDD 和 DataFrame 中。讓我們從安裝 PySpark 開始

# Install PySpark
!pip install pyspark

輸出

Collecting pyspark
...
Successfully installed pyspark-3.1.2

安裝 PySpark 後,我們可以初始化一個 SparkSession 以連線到我們的 Spark 叢集

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("LargeDatasetProcessing").getOrCreate()

準備好 SparkSession 後,我們現在可以將資料載入到 RDD 或 DataFrame 中。RDD 是 PySpark 中的基本資料結構,並提供了一個分散式元素集合。另一方面,DataFrame 將資料組織成命名列,類似於關係資料庫中的表。讓我們將 CSV 檔案載入為 DataFrame

# Load a CSV file as a DataFrame
df = spark.read.csv("large_dataset.csv", header=True, inferSchema=True)

輸出

+---+------+--------+
|id |name  |age     |
+---+------+--------+
|1  |John  |32      |
|2  |Alice |28      |
|3  |Bob   |35      |
+---+------+--------+

從上面的程式碼片段中可以看到,我們使用 `read.csv()` 方法將 CSV 檔案讀取到資料框中。`header=True` 引數表示第一行包含列名,`inferSchema=True` 自動推斷每列的資料型別。

第 2 部分:資料轉換和分析

在本節中,我們將探索使用 PySpark 的各種資料轉換和分析技術。我們將介紹諸如過濾、聚合和連線資料集等操作。讓我們從根據特定條件過濾資料開始

# Filter data
filtered_data = df.filter(df["age"] > 30)

輸出

+---+----+---+
|id |name|age|
+---+----+---+
|1  |John|32 |
|3  |Bob |35 |
+---+----+---+

在上面的程式碼片段中,我們使用 `filter()` 方法選擇“age”列大於 30 的行。此操作允許我們從大型資料集中提取相關的資料子集。

接下來,讓我們使用 `groupBy()` 和 `agg()` 方法對資料集執行聚合

# Aggregate data
aggregated_data = df.groupBy("gender").agg({"salary": "mean", "age": "max"})

輸出

+------+-----------+--------+
|gender|avg(salary)|max(age)|
+------+-----------+--------+
|Male  |2500       |32      |
|Female|3000       |35      |
+------+-----------+--------+

在這裡,我們按“gender”列對資料進行分組,並計算每個組的平均工資和最大年齡。生成的 `aggregated_data` DataFrame 為我們提供了對資料集的有價值的見解。

除了過濾和聚合之外,PySpark 還使我們能夠有效地連線多個數據集。讓我們考慮一個我們有兩個 DataFrame 的示例:`df1` 和 `df2`。我們可以根據公共列連線它們

# Join two DataFrames
joined_data = df1.join(df2, on="id", how="inner")

輸出

+---+----+---------+------+
|id |name|department|salary|
+---+----+---------+------+
|1  |John|HR       |2500  |
|2  |Alice|IT      |3000  |
|3  |Bob |Sales    |2000  |
+---+----+---------+------+

`join()` 方法允許我們根據由 `on` 引數指定的公共列組合 DataFrame。我們可以根據需要選擇不同的連線型別,例如“inner”、“outer”、“left”或“right”。

第 3 部分:高階 PySpark 技術

在本節中,我們將探索高階 PySpark 技術以進一步增強我們的資料處理能力。我們將介紹使用者定義函式 (UDF)、視窗函式和快取等主題。讓我們從定義和使用 UDF 開始

from pyspark.sql.functions import udf

# Define a UDF
def square(x):
    return x ** 2

# Register the UDF
square_udf = udf(square)

# Apply the UDF to a column
df = df.withColumn("age_squared", square_udf(df["age"]))

輸出

+---+------+---+------------+
|id |name  |age|age_squared |
+---+------+---+------------+
|1  |John  |32 |1024        |
|2  |Alice |28 |784         |
|3  |Bob   |35 |1225        |
+---+------+---+------------+

在上面的程式碼片段中,我們定義了一個名為 `square()` 的簡單 UDF,它對給定的輸入進行平方。然後,我們使用 `udf()` 函式註冊 UDF 並將其應用於“age”列,在我們的 DataFrame 中建立一個名為“age_squared”的新列。

PySpark 還提供了強大的視窗函式,使我們能夠在特定視窗範圍內執行計算。讓我們計算每個員工的平均工資,同時考慮前一行和下一行

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, avg

# Define the window
window = Window.orderBy("id")

# Calculate average salary with lag and lead
df = df.withColumn("avg_salary", (lag(df["salary"]).over(window) + lead(df["salary"]).over(window) + df["salary"]) / 3)

輸出

+---+----+---------+------+----------+
|id |name|department|salary|avg_salary|
+---+----+---------+------+----------+
|1  |John|HR       |2500  |2666.6667 |
|2  |Alice|

IT      |3000  |2833.3333 |
|3  |Bob |Sales    |2000  |2500      |
+---+----+---------+------+----------+

在上面的程式碼片段中,我們使用 `Window.orderBy()` 方法定義了一個視窗,指定基於“id”列的行排序。然後,我們分別使用 `lag()` 和 `lead()` 函式訪問前一行和下一行。最後,我們透過考慮當前行及其鄰居來計算平均工資。

最後,快取是 PySpark 中一項重要的技術,用於提高迭代演算法或重複計算的效能。我們可以使用 `cache()` 方法將 DataFrame 或 RDD 快取到記憶體中

# Cache a DataFrame
df.cache()

快取不會顯示任何輸出,但依賴於快取 DataFrame 的後續操作將更快,因為資料儲存在記憶體中。

結論

在本教程中,我們探索了 PySpark 在 Python 中處理大型資料集的強大功能。我們首先設定了我們的開發環境並將資料載入到 RDD 和 DataFrame 中。然後,我們深入研究了資料轉換和分析技術,包括過濾、聚合和連線資料集。最後,我們討論了高階 PySpark 技術,例如使用者定義函式、視窗函式和快取。

更新於:2023 年 7 月 25 日

1K+ 次檢視

啟動你的 職業生涯

透過完成課程獲得認證

開始
廣告