使用 Apache Spark 在 Python 中清洗資料


在當今時代,隨著大量資料的高速流動,Apache Spark 作為一個開源的大資料處理框架,成為了一個普遍的選擇,因為它允許對資料進行並行和分散式處理。對這些資料的清洗是一個重要的步驟,Apache Spark 為我們提供了各種工具和方法來清洗資料。在本教程中,我們將瞭解如何在 Python 中使用 Apache Spark 清洗資料,具體步驟如下:

  • 將資料載入到 Spark DataFrame 中 - SparkSession.read 方法允許我們從各種來源讀取資料,例如 CSV、JSON、Parquet 等。

  • 處理缺失值 - DataFrame.dropna 或 DataFrame.fillna 方法分別允許我們刪除包含任何缺失值的資料或用特定值填充缺失值。

  • 處理重複項 - 資料中經常包含重複條目。為了處理這種情況,DataFrame.dropDuplicates 方法允許我們從 DataFrame 中刪除重複項。

  • 處理異常值 - DataFrame.filter 方法允許我們刪除所有包含異常值的行。

  • 處理資料型別轉換 - 為了轉換列的資料型別,我們有一個名為 DataFrame.cast 的方法。

在繼續學習如何使用 PySpark 清洗資料之前,我們必須安裝 PySpark 庫。為此,我們必須在終端中執行以下命令:

pip install pyspark

處理缺失值

在 Apache Spark 中處理缺失值涉及到識別和處理儲存在 Spark DataFrame 中的資料集中缺失或不完整的資料。在 Spark 中處理缺失值有幾種方法,包括:

  • 刪除包含缺失值的記錄 - 這涉及到從 DataFrame 中刪除包含缺失值的記錄。

  • 估算缺失值 - 這涉及到用計算出的值(例如列中資料的平均值、中位數或眾數)替換缺失值。

  • 填充缺失值 - 這涉及到用特定值(例如零或預設值)替換缺失值。

  • 插值缺失值 - 這涉及到使用數學方法(例如線性插值或樣條插值)來估計缺失值。

處理缺失值的方法取決於資料分析的具體需求和目標。以一致且可重複的方式處理缺失值非常重要,以確保資料的完整性和結果的準確性。

在 Apache Spark 中,pyspark.sql.DataFrame 和 pyspark.sql.DataFrameNaFunctions 模組提供的函式可用於處理缺失值。這些函式包括 dropna、fillna 和 interpolate。

示例

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MissingData").getOrCreate()

# Create a sample data frame
data = [("John", 25, None), ("Jane", 30, 35.5), ("Jim", None, 40.0), ("Joan", 32, None)]
columns = ["Name", "Age", "Salary"]
df = spark.createDataFrame(data, columns)

# Display the original data frame
print("Original Data Frame:")
df.show()

# Replacing Missing Values with Mean
from pyspark.sql.functions import mean
mean_age = df.agg(mean(df["Age"])).first()[0]
mean_salary = df.agg(mean(df["Salary"])).first()[0]
df = df.fillna({"Age": mean_age, "Salary": mean_salary})

# Display the cleaned data frame
print("Cleaned Data Frame:")
df.show()

spark.stop()

輸出

Original Data Frame:
+----+----+------+
|Name| Age|Salary|
+----+----+------+
|John|  25|  null|
|Jane|  30|  35.5|
| Jim|null|  40.0|
|Joan|  32|  null|
+----+----+------+

Cleaned Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|John| 25| 37.75|
|Jane| 30|  35.5|
| Jim| 29|  40.0|
|Joan| 32| 37.75|
+----+---+------+

處理重複項

在 Apache Spark 中處理重複項涉及到識別和處理儲存在 Spark DataFrame 中的資料集中重複的記錄。在 Spark 中處理重複項有幾種方法,包括:

  • 刪除重複項 - 這涉及到識別並從 DataFrame 中刪除重複的記錄。dropDuplicates 函式可用於在 Spark 中刪除重複記錄。

  • 保留重複項 - 這涉及到保留 DataFrame 中重複記錄的所有例項,通常是為每個記錄新增唯一的識別符號或索引。

  • 標記重複項 - 這涉及到標記 DataFrame 中的重複記錄,但不刪除它們,以便可以進一步分析或處理它們。

處理重複項的方法取決於資料分析的具體需求和目標。以一致且可重複的方式處理重複項非常重要,以確保資料的完整性和結果的準確性。

在 Apache Spark 中,dropDuplicates 函式可用於刪除 DataFrame 中的重複記錄。該函式以一個或多個列作為輸入,並刪除指定列中的值完全相同的記錄。dropDuplicates 函式返回一個新的 DataFrame,其中已刪除重複記錄。

示例

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DuplicateData").getOrCreate()

# Create a sample data frame
data = [("John", 25, 90.0), ("Jane", 30, 35.5), ("Jim", 20, 200.0), ("Joan", 32, 50.0),
   ("John", 25, 90.0), ("Jim", 20, 200.0)]
columns = ["Name", "Age", "Salary"]
df = spark.createDataFrame(data, columns)

# Display the original data frame
print("Original Data Frame:")
df.show()

# Remove duplicates
df = df.dropDuplicates()

# Display the cleaned data frame
print("Cleaned Data Frame:")
df.show()

spark.stop()

輸出

Original Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|John| 25|  90.0|
|Jane| 30|  35.5|
| Jim| 20| 200.0|
|Joan| 32|  50.0|
|John| 25|  90.0|
| Jim| 20| 200.0|
+----+---+------+

Cleaned Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|Jane| 30|  35.5|
|John| 25|  90.0|
| Jim| 20| 200.0|
|Joan| 32|  50.0|
+----+---+------+

處理異常值

在 Apache Spark 中處理異常值是指識別並刪除或轉換資料集中被認為是極端值或超出正常值範圍的值的過程。異常值會對統計分析的結果產生重大影響,因此通常需要以某種方式處理它們。

在 Apache Spark 中處理異常值有幾種常見的方法,包括:

刪除包含異常值的記錄:這涉及到過濾掉特定列的值超出指定範圍或超出平均值一定標準差的記錄。

  • 用平均值或中位數替換異常值 - 這涉及到用列中剩餘值的平均值或中位數替換被認為是異常值的那些值。

  • Winsorize 異常值 - 這涉及到用指定的百分位數值(例如第 5 或 95 百分位數)替換異常值。

  • 裁剪異常值 - 這涉及到用指定的最大值或最小值替換異常值。

為了在 Apache Spark 中處理異常值,您可以使用 pyspark.sql.functions 模組中提供的內建函式來計算平均值和標準差等統計資料,然後使用 filter 或 withColumn 方法根據需要刪除或替換異常值。

示例

from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, stddev, abs

spark = SparkSession.builder.appName("OutlierHandlingExample").getOrCreate()

# Create a sample data frame
data = [("John", 25, 90.0), ("Jane", 30, 35.5), ("Jim", 20, 200.0), ("Joan", 32, 50.0)]
columns = ["Name", "Age", "Salary"]
df = spark.createDataFrame(data, columns)

# Display the original data frame
print("Original Data Frame:")
df.show()

# Calculate mean and standard deviation
mean_salary = df.agg(mean(df["Salary"])).first()[0]
stddev_salary = df.agg(stddev(df["Salary"])).first()[0]

# Identify and filter out outliers
df = df.filter(abs(df["Salary"] - mean_salary) < stddev_salary)

# Display the cleaned data frame
print("Cleaned Data Frame:")
df.show()

spark.stop()

輸出

Original Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|John| 25|  90.0|
|Jane| 30|  35.5|
| Jim| 20| 200.0|
|Joan| 32|  50.0|
+----+---+------+

Cleaned Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|John| 25|  90.0|
|Jane| 30|  35.5|
|Joan| 32|  50.0|
+----+---+------+

轉換資料型別

轉換資料型別是指將資料的表示形式從一種資料型別更改為另一種資料型別的過程。在資料處理和分析中,通常會遇到不同格式的資料,這些資料不適合所需的分析。在這種情況下,需要將資料型別轉換為合適的格式才能正確執行分析。

例如,在 DataFrame 中,某列可能具有字串資料型別,但該列中的值是數字。在這種情況下,需要將該列的資料型別更改為整數或浮點數,具體取決於分析的要求。類似地,某列可能具有整數資料型別,但該列中的值是日期字串。在這種情況下,需要將該列的資料型別更改為日期型別。

轉換資料型別是資料清洗和預處理中的一個重要步驟,因為它確保資料以正確的格式進行分析。

示例

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType

spark = SparkSession.builder.appName("DataTypeConversion").getOrCreate()

# Create a sample data frame
data = [("John", "25", "90"), ("Jane", "30", "35"), ("Jim", "20", "200"), ("Joan", "32", "50")]
columns = ["Name", "Age", "Salary"]
df = spark.createDataFrame(data, columns)

# Display the original data frame
print("Original Data Frame:")
df.show()

# Convert the data type of the 'Age' column to integer
df = df.withColumn("Age", df["Age"].cast(IntegerType()))

# Convert the data type of the 'Salary' column to float
df = df.withColumn("Salary", df["Salary"].cast(FloatType()))

# Display the converted data frame
print("Converted Data Frame:")
df.show()

spark.stop()

輸出

Original Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|John| 25|   90|
|Jane| 30|   35|
| Jim| 20|   200|
|Joan| 32|   50|
+----+---+------+

Converted Data Frame:
+----+---+------+
|Name|Age|Salary|
+----+---+------+
|John| 25|  90.0|
|Jane| 30|  35.0|
| Jim| 20| 200.0|
|Joan| 32|  50.0|
+----+---+------+

結論

在 Apache Spark 中清洗資料是資料準備過程中的一個重要部分。Apache Spark 為我們提供了一個強大且高效的平臺來處理大型資料集,並幫助我們同時執行各種資料清洗任務,例如處理缺失值、重複項等。pyspark.sql.functions 模組為我們提供了大量函式,這些函式與在分散式環境中執行復雜操作的能力相結合,使 Apache Spark 成為資料清洗和準備的完美選擇。透過使用正確的工具和技術,我們可以為分析、機器學習或任何其他型別的資料驅動應用程式準備資料,從而提高結果的準確性。

更新於: 2023年10月4日

728 次檢視

開啟您的 職業生涯

透過完成課程獲得認證

開始學習
廣告