如何在 PySpark 中選擇 DataFrame 中的一段行?
PySpark 中的 DataFrame 由一個共享的資料集合定義,該集合可用於在計算機上執行並以行和列格式對資料進行結構化。行範圍定義了資料集中的一條水平線(根據條件設定多個值)。通常,範圍設定最低值和最高值。在 Python 中,我們有一些內建函式,如 filter()、where() 和 collect(),用於從 PySpark 中的 DataFrame 中選擇一段行。
語法
以下語法在示例中使用:
createDataFrame()
這是 Python 中的一個內建方法,它接受 schema 引數來定義 DataFrame 的模式。
filter()
filter() 是 Python 中的一個內建函式,它定義並允許根據給定條件迭代特定的行或列。
where()
這是 Python 中的一個內建函式,用於根據行或列設定特定條件,並由特定元素返回。
collect()
Pyspark collect 用於訪問給定資料集中的所有元素,並在迴圈中使用。
show()
show() 是 Python 中的內建方法,在程式末尾使用以獲取結果。
安裝需求:
pip install pyspark
使用 CreateDataframe()
createDataframe 遵循 PySpark 模組,它接受兩個引數 - data_name(設定每行中每列的值)和 data_Columns(透過定義所有列來設定值)。
示例
在以下示例中,我們將透過構建 SparkSession.builder 並使用 appName() 設定資料庫名稱來啟動程式,並使用 getOrCreate() 函式建立會話並將其儲存在變數 spark 中。然後在變數 customer_Data 中設定客戶資料庫的資料值。接下來,在變數 data_Columns 中設定所有值。現在使用 createDataFrame() 方法建立 DataFrame,該方法接受兩個引數 - customer_Data 和 data_Columns,它們將以表格形式對格式進行結構化並連線到 spark,並將其儲存在變數 customer_DF 中。表格形式顯示行和列中的值。最後,我們使用名為 show() 的方法以及 customer_DF 來獲取每列的整體資料。
# Creation of SparkSession spark = SparkSession.builder \ .appName('CUSTOMER') \ .getOrCreate() # customer DataFrame customer_Data = [("PREM KUMAR", 1281, "AC", 40000,4000), ("RATAN SINGH", 1289, "HOME THEATER", 35000, 5000), ("DAVID K", 1221, "NIKON CAMERA", 88000, 10000), ("JONATHAN REDDY", 1743, "GEYSER", 15000, 500), ("JASPREET BRAR", 1234, "HP LAPTOP", 78000, 3564), ("NEIL KAMANT", 1222, "WASHING MACHINE", 25000, 2000) ] data_Columns = ["CUSTOMER NAME","PRODUCT ID","PRODUCT NAME", "Actual Price","EMI PER MONTH"] customer_DF = spark.createDataFrame(customer_Data, data_Columns) customer_DF.show()
輸出
+--------------+----------+---------------+------------+-------------+ | CUSTOMER NAME|PRODUCT ID| PRODUCT NAME|Actual Price|EMI PER MONTH| +--------------+----------+---------------+------------+-------------+ | PREM KUMAR| 1281| AC| 40000| 4000| | RATAN SINGH| 1289| HOME THEATER| 35000| 5000| | DAVID K| 1221| NIKON CAMERA| 88000| 10000| |JONATHAN REDDY| 1743| GEYSER| 15000| 500| | JASPREET BRAR| 1234| HP LAPTOP| 78000| 3564| | NEIL KAMANT| 1222|WASHING MACHINE| 25000| 2000| +--------------+----------+---------------+------------+-------------+
使用 Filter 方法
此方法遵循上述程式,使用它允許我們設定從 DataFrame 中選擇行範圍的條件。
示例
在以下示例中,我們遵循前面的示例程式碼,並且它允許我們使用 DataFrame(即 DF)設定兩個條件,該條件接受引數值作為 Actual Price,它設定價格在 25000 到 40000 之間的行範圍條件,並且它將從行中找到特定範圍。最後,我們使用 show() 方法獲取結果。
DF.filter((DF['Actual Price'] >= 25000) & (DF['Actual Price'] <= 40000)).show()
輸出
+-------------+----------+---------------+------------+-------------+ |CUSTOMER NAME|PRODUCT ID| PRODUCT NAME|Actual Price|EMI PER MONTH| +-------------+----------+---------------+------------+-------------+ | PREM KUMAR| 1281| AC| 40000| 4000| | RATAN SINGH| 1289| HOME THEATER| 35000| 5000| | NEIL KAMANT| 1222|WASHING MACHINE| 25000| 2000| +-------------+----------+---------------+------------+-------------+
使用 Where() 方法
此方法遵循本文中的第一個示例,它使用 where() 方法來設定 PySpark 中 DataFrame 中的行範圍。
示例
在以下示例中,我們將使用內建的 where() 方法,該方法使用 and(&) 運算子接受兩個條件以獲取行範圍。接下來,where() 方法與 show() 方法一起使用以獲取結果。
DF.where((DF['EMI PER MONTH'] >= 10000) & (DF['EMI PER MONTH'] <= 38000)).show()
輸出
+-------------+----------+------------+------------+-------------+ |CUSTOMER NAME|PRODUCT ID|PRODUCT NAME|Actual Price|EMI PER MONTH| +-------------+----------+------------+------------+-------------+ | DAVID K| 1221|NIKON CAMERA| 88000| 10000| +-------------+----------+------------+------------+-------------+
使用 Collect() 方法
此方法遵循第一個示例,允許它迭代到特定列以從 PySpark 中的 DataFrame 中獲取行範圍。
示例
在以下示例中,我們將使用 for 迴圈,其中變數 row 透過帶有 collect() 方法的 DataFrame 進行迭代,它將迭代給定 DataFrame 中的所有元素。現在它使用 if 語句設定條件,如果 Actual price 在 30000 到 50000 之間,則返回特定範圍的行。接下來,它使用接受 row 作為引數的 print() 方法返回結果。
for row in DF.collect(): if 30000 <= row['Actual Price'] <= 50000: print(row)
輸出
Row(CUSTOMER NAME='PREM KUMAR', PRODUCT ID=1281, PRODUCT NAME='AC', Actual Price=40000, EMI PER MONTH=4000) Row(CUSTOMER NAME='RATAN SINGH', PRODUCT ID=1289, PRODUCT NAME='HOME THEATER', Actual Price=35000, EMI PER MONTH=5000)
結論
我們討論了從 PySpark 中的 DataFrame 中獲取行範圍的各種方法。所有方法都遵循第一個示例,因為該示例設定了行和列資料的完整輸入結構。PySpark 已被亞馬遜、沃爾瑪、途易等頂級跨國公司使用,以及更多。