使用 Python PySpark 處理大型資料集
在本教程中,我們將探索 Python 和 PySpark 的強大組合,用於處理大型資料集。PySpark 是一個 Python 庫,它為 Apache Spark 提供了一個介面,Apache Spark 是一個快速且通用的叢集計算系統。透過利用 PySpark,我們可以有效地將資料分佈和處理到多個機器的叢集中,使我們能夠輕鬆處理大規模資料集。
在本文中,我們將深入探討 PySpark 的基礎知識,並演示如何在大型資料集上執行各種資料處理任務。我們將涵蓋關鍵概念,例如 RDD(彈性分散式資料集)和 DataFrame,並透過分步示例展示它們的實際應用。在本教程結束時,您將對如何利用 PySpark 有效地處理和分析海量資料集有一個紮實的瞭解。
第 1 部分:PySpark 入門
在本節中,我們將設定我們的開發環境,並熟悉 PySpark 的基本概念。我們將介紹如何安裝 PySpark、初始化 SparkSession 以及將資料載入到 RDD 和 DataFrame 中。讓我們從安裝 PySpark 開始
# Install PySpark !pip install pyspark
輸出
Collecting pyspark ... Successfully installed pyspark-3.1.2
安裝 PySpark 後,我們可以初始化一個 SparkSession 以連線到我們的 Spark 叢集
from pyspark.sql import SparkSession # Create a SparkSession spark = SparkSession.builder.appName("LargeDatasetProcessing").getOrCreate()
準備好 SparkSession 後,我們現在可以將資料載入到 RDD 或 DataFrame 中。RDD 是 PySpark 中的基本資料結構,並提供了一個分散式元素集合。另一方面,DataFrame 將資料組織成命名列,類似於關係資料庫中的表。讓我們將 CSV 檔案載入為 DataFrame
# Load a CSV file as a DataFrame df = spark.read.csv("large_dataset.csv", header=True, inferSchema=True)
輸出
+---+------+--------+ |id |name |age | +---+------+--------+ |1 |John |32 | |2 |Alice |28 | |3 |Bob |35 | +---+------+--------+
從上面的程式碼片段中可以看到,我們使用 `read.csv()` 方法將 CSV 檔案讀取到資料框中。`header=True` 引數表示第一行包含列名,`inferSchema=True` 自動推斷每列的資料型別。
第 2 部分:資料轉換和分析
在本節中,我們將探索使用 PySpark 的各種資料轉換和分析技術。我們將介紹諸如過濾、聚合和連線資料集等操作。讓我們從根據特定條件過濾資料開始
# Filter data filtered_data = df.filter(df["age"] > 30)
輸出
+---+----+---+ |id |name|age| +---+----+---+ |1 |John|32 | |3 |Bob |35 | +---+----+---+
在上面的程式碼片段中,我們使用 `filter()` 方法選擇“age”列大於 30 的行。此操作允許我們從大型資料集中提取相關的資料子集。
接下來,讓我們使用 `groupBy()` 和 `agg()` 方法對資料集執行聚合
# Aggregate data aggregated_data = df.groupBy("gender").agg({"salary": "mean", "age": "max"})
輸出
+------+-----------+--------+ |gender|avg(salary)|max(age)| +------+-----------+--------+ |Male |2500 |32 | |Female|3000 |35 | +------+-----------+--------+
在這裡,我們按“gender”列對資料進行分組,並計算每個組的平均工資和最大年齡。生成的 `aggregated_data` DataFrame 為我們提供了對資料集的有價值的見解。
除了過濾和聚合之外,PySpark 還使我們能夠有效地連線多個數據集。讓我們考慮一個我們有兩個 DataFrame 的示例:`df1` 和 `df2`。我們可以根據公共列連線它們
# Join two DataFrames joined_data = df1.join(df2, on="id", how="inner")
輸出
+---+----+---------+------+ |id |name|department|salary| +---+----+---------+------+ |1 |John|HR |2500 | |2 |Alice|IT |3000 | |3 |Bob |Sales |2000 | +---+----+---------+------+
`join()` 方法允許我們根據由 `on` 引數指定的公共列組合 DataFrame。我們可以根據需要選擇不同的連線型別,例如“inner”、“outer”、“left”或“right”。
第 3 部分:高階 PySpark 技術
在本節中,我們將探索高階 PySpark 技術以進一步增強我們的資料處理能力。我們將介紹使用者定義函式 (UDF)、視窗函式和快取等主題。讓我們從定義和使用 UDF 開始
from pyspark.sql.functions import udf # Define a UDF def square(x): return x ** 2 # Register the UDF square_udf = udf(square) # Apply the UDF to a column df = df.withColumn("age_squared", square_udf(df["age"]))
輸出
+---+------+---+------------+ |id |name |age|age_squared | +---+------+---+------------+ |1 |John |32 |1024 | |2 |Alice |28 |784 | |3 |Bob |35 |1225 | +---+------+---+------------+
在上面的程式碼片段中,我們定義了一個名為 `square()` 的簡單 UDF,它對給定的輸入進行平方。然後,我們使用 `udf()` 函式註冊 UDF 並將其應用於“age”列,在我們的 DataFrame 中建立一個名為“age_squared”的新列。
PySpark 還提供了強大的視窗函式,使我們能夠在特定視窗範圍內執行計算。讓我們計算每個員工的平均工資,同時考慮前一行和下一行
from pyspark.sql.window import Window from pyspark.sql.functions import lag, lead, avg # Define the window window = Window.orderBy("id") # Calculate average salary with lag and lead df = df.withColumn("avg_salary", (lag(df["salary"]).over(window) + lead(df["salary"]).over(window) + df["salary"]) / 3)
輸出
+---+----+---------+------+----------+ |id |name|department|salary|avg_salary| +---+----+---------+------+----------+ |1 |John|HR |2500 |2666.6667 | |2 |Alice| IT |3000 |2833.3333 | |3 |Bob |Sales |2000 |2500 | +---+----+---------+------+----------+
在上面的程式碼片段中,我們使用 `Window.orderBy()` 方法定義了一個視窗,指定基於“id”列的行排序。然後,我們分別使用 `lag()` 和 `lead()` 函式訪問前一行和下一行。最後,我們透過考慮當前行及其鄰居來計算平均工資。
最後,快取是 PySpark 中一項重要的技術,用於提高迭代演算法或重複計算的效能。我們可以使用 `cache()` 方法將 DataFrame 或 RDD 快取到記憶體中
# Cache a DataFrame df.cache()
快取不會顯示任何輸出,但依賴於快取 DataFrame 的後續操作將更快,因為資料儲存在記憶體中。
結論
在本教程中,我們探索了 PySpark 在 Python 中處理大型資料集的強大功能。我們首先設定了我們的開發環境並將資料載入到 RDD 和 DataFrame 中。然後,我們深入研究了資料轉換和分析技術,包括過濾、聚合和連線資料集。最後,我們討論了高階 PySpark 技術,例如使用者定義函式、視窗函式和快取。