PySpark DataFrame中的全外部連線


全外部連線是一種操作,它結合了左外部連線和右外部連線的結果。在 PySpark 中,它用於基於特定條件連線兩個 DataFrame,其中無論是否匹配,兩個 DataFrame 的所有記錄都包含在輸出中。本文將詳細解釋如何在 PySpark 中執行全外部連線,並提供一個實際示例來說明其實現。

安裝和設定

在 PySpark 中執行全外部連線之前,我們需要設定一個工作環境。首先,我們需要透過在終端執行命令 **“pip install pyspark”** 來安裝 PySpark。其次,我們需要透過執行以下命令匯入必要的模組:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

語法

在 PySpark 中執行全外部連線的語法如下:

df_full = df1.join(df2, (df1.column_name == df2.column_name), 'full')

演算法

  • 首先,我們匯入必要的模組,即 **SparkSession** 和 **col**。

  • 我們使用 **builder()** 方法建立一個 SparkSession 物件,並指定應用程式名稱和主節點 URL。

  • 我們從 CSV 檔案讀取資料,並使用 **read.csv()** 方法將它們轉換為 DataFrame。在本例中,我們使用的是虛擬 DataFrame。

  • 我們使用 join() 方法執行全外部連線操作,並將條件作為引數傳遞。

  • 我們使用 **show()** 方法顯示生成的 DataFrame。

示例

讓我們考慮兩個 DataFrame,“sales_df” 和 “customer_df”。“sales_df” 包含公司銷售資訊,“customer_df” 包含客戶購買資訊。我們希望根據 “customer_id” 列連線這兩個 DataFrame,並獲取這兩個 DataFrame 的所有記錄。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a SparkSession object
spark = SparkSession.builder.appName("Full Outer Join").getOrCreate()

# Create sample dataframes
data_sales = [("S1", "Product1", 100), 
              ("S2", "Product2", 200), 
              ("S3", "Product3", 300),
              ("S4", "Product4", 400),
              ("S5", "Product5", 500),
              ("S6", "Product6", 600),
              ("S7", "Product7", 700),
              ("S8", "Product8", 800),
              ("S9", "Product9", 900),
              ("S10", "Product10", 1000)]
df_sales = spark.createDataFrame(data_sales, ["sale_id", "product", "amount"])

data_customers = [("C1", "John"), 
                  ("C2", "Jane"), 
                  ("C3", "Mike"), 
                  ("C4", "Emily"), 
                  ("C5", "Bob"), 
                  ("C6", "Alice"),
                  ("C7", "Dave"), 
                  ("C8", "Jenny"), 
                  ("C9", "Peter"), 
                  ("C10", "Sarah")]
df_customers = spark.createDataFrame(data_customers, ["customer_id", "name"])

# Perform the full outer join operation
df_full = df_sales.join(df_customers, (df_sales.sale_id == df_customers.customer_id), 'full')

# Display the resultant dataframe
df_full.show()

輸出

sale_id	product	amount	customer_id	name
S1	      Product1	 100	       C1	   John
S2	      Product2	 200	       C2	   Jane
S3	      Product3	 300	       C3	   Mike
S4	      Product4	 400	       C4	   Emily
S5	      Product5	 500	       C5	   Bob
S6	      Product6	 600	       C6	   Alice
S7	      Product7	 700	       C7	   Dave
S8	      Product8	 800	       C8	   Jenny
S9	      Product9	 900	       C9	   Peter
S10	      Product10 1000	       C10	   Sarah

這段程式碼建立了兩個 DataFrame,df_sales 和 df_customers,每個 DataFrame 包含 10 組示例資料。df_sales DataFrame 包含三個變數:銷售 ID、商品和金額。df_customers DataFrame 包含兩個變數:客戶 ID 和姓名。然後,使用 join() 方法和 full join 型別在兩個 DataFrame 之間執行全外部連線操作。連線必須滿足 df_customers 中的 customer_id 列和 df_sales 中的 sales_id 列匹配。

然後,指令碼使用 show() 方法顯示最終的 DataFrame。這樣,組合後的 DataFrame df_full 中就會顯示來自兩個 DataFrame 的列。如果一個 DataFrame 中的條目在另一個 DataFrame 中沒有對應的記錄,則缺失值將被替換為 null。

應用

在處理可能包含缺失資料或空值的大型資料庫時,全外部連線是一個有用的操作。它可以應用於各種場景,包括資料清理、組合來自不同來源的資料以及分析來自不同領域的的資料。

結論

基於預定義條件,可以使用稱為全外部連線的強大操作來組合來自兩個 DataFrame 的資料。透過將條件作為引數傳遞給 PySpark 中的 join() 方法,我們可以執行全外部連線。按照本文提供的說明,並利用其在資料分析和處理任務中的優勢,您可以輕鬆地在 PySpark 中執行全外部連線。

更新於:2023年5月8日

2K+ 次瀏覽

啟動您的 職業生涯

透過完成課程獲得認證

開始
廣告
© . All rights reserved.