PySpark DataFrame中的全外部連線
全外部連線是一種操作,它結合了左外部連線和右外部連線的結果。在 PySpark 中,它用於基於特定條件連線兩個 DataFrame,其中無論是否匹配,兩個 DataFrame 的所有記錄都包含在輸出中。本文將詳細解釋如何在 PySpark 中執行全外部連線,並提供一個實際示例來說明其實現。
安裝和設定
在 PySpark 中執行全外部連線之前,我們需要設定一個工作環境。首先,我們需要透過在終端執行命令 **“pip install pyspark”** 來安裝 PySpark。其次,我們需要透過執行以下命令匯入必要的模組:
from pyspark.sql import SparkSession from pyspark.sql.functions import col
語法
在 PySpark 中執行全外部連線的語法如下:
df_full = df1.join(df2, (df1.column_name == df2.column_name), 'full')
演算法
首先,我們匯入必要的模組,即 **SparkSession** 和 **col**。
我們使用 **builder()** 方法建立一個 SparkSession 物件,並指定應用程式名稱和主節點 URL。
我們從 CSV 檔案讀取資料,並使用 **read.csv()** 方法將它們轉換為 DataFrame。在本例中,我們使用的是虛擬 DataFrame。
我們使用 join() 方法執行全外部連線操作,並將條件作為引數傳遞。
我們使用 **show()** 方法顯示生成的 DataFrame。
示例
讓我們考慮兩個 DataFrame,“sales_df” 和 “customer_df”。“sales_df” 包含公司銷售資訊,“customer_df” 包含客戶購買資訊。我們希望根據 “customer_id” 列連線這兩個 DataFrame,並獲取這兩個 DataFrame 的所有記錄。
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Create a SparkSession object
spark = SparkSession.builder.appName("Full Outer Join").getOrCreate()
# Create sample dataframes
data_sales = [("S1", "Product1", 100),
("S2", "Product2", 200),
("S3", "Product3", 300),
("S4", "Product4", 400),
("S5", "Product5", 500),
("S6", "Product6", 600),
("S7", "Product7", 700),
("S8", "Product8", 800),
("S9", "Product9", 900),
("S10", "Product10", 1000)]
df_sales = spark.createDataFrame(data_sales, ["sale_id", "product", "amount"])
data_customers = [("C1", "John"),
("C2", "Jane"),
("C3", "Mike"),
("C4", "Emily"),
("C5", "Bob"),
("C6", "Alice"),
("C7", "Dave"),
("C8", "Jenny"),
("C9", "Peter"),
("C10", "Sarah")]
df_customers = spark.createDataFrame(data_customers, ["customer_id", "name"])
# Perform the full outer join operation
df_full = df_sales.join(df_customers, (df_sales.sale_id == df_customers.customer_id), 'full')
# Display the resultant dataframe
df_full.show()
輸出
sale_id product amount customer_id name S1 Product1 100 C1 John S2 Product2 200 C2 Jane S3 Product3 300 C3 Mike S4 Product4 400 C4 Emily S5 Product5 500 C5 Bob S6 Product6 600 C6 Alice S7 Product7 700 C7 Dave S8 Product8 800 C8 Jenny S9 Product9 900 C9 Peter S10 Product10 1000 C10 Sarah
這段程式碼建立了兩個 DataFrame,df_sales 和 df_customers,每個 DataFrame 包含 10 組示例資料。df_sales DataFrame 包含三個變數:銷售 ID、商品和金額。df_customers DataFrame 包含兩個變數:客戶 ID 和姓名。然後,使用 join() 方法和 full join 型別在兩個 DataFrame 之間執行全外部連線操作。連線必須滿足 df_customers 中的 customer_id 列和 df_sales 中的 sales_id 列匹配。
然後,指令碼使用 show() 方法顯示最終的 DataFrame。這樣,組合後的 DataFrame df_full 中就會顯示來自兩個 DataFrame 的列。如果一個 DataFrame 中的條目在另一個 DataFrame 中沒有對應的記錄,則缺失值將被替換為 null。
應用
在處理可能包含缺失資料或空值的大型資料庫時,全外部連線是一個有用的操作。它可以應用於各種場景,包括資料清理、組合來自不同來源的資料以及分析來自不同領域的的資料。
結論
基於預定義條件,可以使用稱為全外部連線的強大操作來組合來自兩個 DataFrame 的資料。透過將條件作為引數傳遞給 PySpark 中的 join() 方法,我們可以執行全外部連線。按照本文提供的說明,並利用其在資料分析和處理任務中的優勢,您可以輕鬆地在 PySpark 中執行全外部連線。
資料結構
網路
關係資料庫管理系統 (RDBMS)
作業系統
Java
iOS
HTML
CSS
Android
Python
C語言程式設計
C++
C#
MongoDB
MySQL
Javascript
PHP