在 PySpark DataFrame 中刪除重複行


PySpark 是由Apache Spark社群設計的一個工具,用於即時處理資料並在本地 Python 環境中分析結果。Spark DataFrame與其他 DataFrame不同,因為它會分發資訊並遵循模式。

Spark 可以處理流處理和批處理,這就是它們受歡迎的原因。PySpark DataFrame需要一個會話才能生成入口點,它在系統上執行資料處理(RAM)。您可以使用以下命令在 Windows 上安裝 PySpark 模組 -

pip install pyspark

在本文中,我們將建立一個 PySpark DataFrame 並討論從該 DataFrame 中刪除重複行的不同方法。讓我們瞭解 PySpark DataFrame 的概念。

建立和理解 PySpark DataFrame

與任何其他 DataFrame 一樣,PySpark 以表格方式儲存資料。它允許程式設計師處理結構化和半結構化資料,並提供高階 API(Python、Java)來處理複雜的資料集。它可以非常快速地分析資料,因此它在流處理和批處理中都非常有用。

既然我們已經討論了 PySpark DataFrame 的基礎知識,讓我們使用 Python 程式碼建立一個。我們將建立一個包含與不同賽車手相關的資訊的 PySpark DataFrame。

示例

  • 我們匯入了必要的庫,包括“pandas”和“pyspark”。我們還匯入了一個名為“SparkSession”的統一介面。

  • 此介面確保 Spark 框架正常執行。它充當 Spark API 的“入口點”,從而提高資料處理效率。簡而言之,我們建立 SparkSession 以設定所需的配置。

  • 在此之後,我們使用“builder”API 建立了此 SparkSession 的例項。我們還使用了“getorCreate()”方法來鎖定現有會話或將其替換為新的會話。

  • 完成配置部分後,我們準備了一個包含不同汽車特徵的資料集字典。我們使用此資料集生成了一個 Pandas DataFrame。

  • 生成的 5X3 DataFrame 儲存在“dataframe_pd”變數中。此變數作為“SparkSession”的“createDataFrame()”方法的引數傳遞,以建立 PySpark DataFrame。

  • 我們使用 Pandas DataFrame 生成了 PySpark DataFrame,但這並非強制步驟。我們可以直接使用元組列表來建立資料集,然後將其傳遞給“createDataFrame()”方法。

  • 最後,我們使用“dataframe_spk.show()”方法顯示了 DataFrame。

示例

import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.getOrCreate()
dataset = {"Carname":["Audi", "Mercedes", "BMW", "Audi", "Audi"], "Max Speed": ["300 KPH", "250 KPH", "220 KPH", "300 KPH", "300 KPH"], "Car number":["MS321", "QR345", "WX281", "MS321", "MS321"]}
dataframe_pd = pd.DataFrame(dataset, index= ["Racer1", "Racer2", "Racer3", "Racer1", "Racer1"])
dataframe_spk = spark.createDataFrame(dataframe_pd)
print("The original data frame is like: -")
dataframe_spk.show()

輸出

The original data frame is like: -
    Carname Max Speed Car number
      Audi   300 KPH      MS321
  Mercedes   250 KPH      QR345
       BMW   220 KPH      WX281
      Audi   300 KPH      MS321
      Audi   300 KPH      MS321

既然我們建立了一個 PySpark DataFrame,讓我們討論從該 DataFrame 中刪除行的不同方法。

使用 Distinct() 函式刪除行

此函式返回一個包含不同或唯一行的新的 DataFrame。它消除了 DataFrame 中的所有重複行。

示例

我們不為該函式傳遞任何引數。讓我們看看它的實現。

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.getOrCreate()

dataset = {"Carname":["Audi", "Mercedes", "BMW", "Audi", "Audi"], "Max Speed": ["300 KPH", "250 KPH", "220 KPH", "300 KPH", "300 KPH"], "Car number":["MS321", "QR345", "WX281", "MS321", "MS321"]}
dataframe_pd = pd.DataFrame(dataset)
dataframe_spk = spark.createDataFrame(dataframe_pd)
print("The original data frame is like: -")
dataframe_spk.show()
print("After dropping the duplicate rows we get: -")
dataframe_spk.distinct().show()

輸出

The original data frame is like: -
    Carname Max Speed Car number
      Audi   300 KPH      MS321
  Mercedes   250 KPH      QR345
       BMW   220 KPH      WX281
      Audi   300 KPH      MS321
      Audi   300 KPH      MS321
After dropping the duplicate rows we get: -

    Carname Max Speed Car number
  Mercedes  250 KPH      QR345
      BMW   220 KPH      WX281
      Audi  300 KPH      MS321

建立 PySpark DataFrame 後,我們使用distinct()函式來定位唯一行並將其從 DataFrame 中刪除。

使用 dropDuplicate() 函式

這是一種替代方法,其工作方式與distinct()函式相同。我們可以定位列並相應地刪除行。讓我們構建程式碼。

示例

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.getOrCreate()

dataset = {"Carname":["Audi", "Mercedes", "BMW", "Audi", "Audi"], "Max Speed": ["300 KPH", "250 KPH", "220 KPH", "300 KPH", "300 KPH"], "Car number":["MS321", "QR345", "WX281", "MS321", "MS321"]}
dataframe_pd = pd.DataFrame(dataset)
dataframe_spk = spark.createDataFrame(dataframe_pd)
print("The original data frame is like: -")
dataframe_spk.show()

print("After dropping the duplicate rows we get: -")
dataframe_spk.dropDuplicates().show()

輸出

The original data frame is like: -
+--------+---------+----------+
| Carname|Max Speed|Car number|
+--------+---------+----------+
|    Audi|  300 KPH|     MS321|
|Mercedes|  250 KPH|     QR345|
|     BMW|  220 KPH|     WX281|
|    Audi|  300 KPH|     MS321|
|    Audi|  300 KPH|     MS321|
+--------+---------+----------+
After dropping the duplicate rows we get: -
+--------+---------+----------+
| Carname|Max Speed|Car number|
+--------+---------+----------+
|Mercedes|  250 KPH|     QR345|
|    Audi|  300 KPH|     MS321|
|     BMW|  220 KPH|     WX281|
+--------+---------+----------+

定位特定列

我們可以藉助“select()”函式檢查特定列的重複值。我們將對選定的列使用 dropDuplicate() 函式。

示例

dataframe_spk.select(["Carname"]).dropDuplicates().show()

輸出

+--------+
| Carname|
+--------+
|Mercedes|
|     BMW|
|    Audi|
+--------+ 

結論

本文解釋了刪除“列”值包含任何重複或重複資料的行的基本操作。我們討論了涉及的不同函式,包括“dropDuplicate()”、“Distinct()”和“select()”。我們建立了一個參考 DataFrame 並從中刪除了重複值。

更新於: 2023年5月5日

398 次檢視

開啟你的 職業生涯

透過完成課程獲得認證

開始
廣告

© . All rights reserved.