如何將PySpark資料框按行分成兩個資料框?


PySpark 資料框定義為可在不同機器上使用的分散式資料集合,並將結構化資料生成到命名列中。“切片”一詞通常用於表示資料的劃分。在 Python 中,我們有一些內建函式,如 limit()、collect()、exceptAll() 等,可用於將 PySpark 資料框按行分成兩個資料框。

語法

以下語法在示例中使用:

limit()

這是 Python 中的內建方法,可用於透過指定整數值來設定行的範圍。

subtract()

subtract() 方法返回行結果,形成一個新資料框,其中不包含另一個數據框中的資料。

collect()

Pyspark collect 用於檢索給定資料集中的所有元素,它透過迴圈和變數使用。

createDataFrame()

這是 Python 中的內建方法,它採用 schema 引數來定義資料框的模式。

[: before_slicing] [after_slicing :]

以上表示法稱為列表切片,它將用於將資料框分成兩行。

head()

通常,Python 中的 head() 方法表示資料表中的前 5 行,但此處它接受某些引數並根據給定條件返回結果。

exceptAll()

這是 Python 中的內建函式,它遵循 PySpark 模組,返回包含 DataFrame 中的行但不在另一個 DataFrame 中的新 DataFrame,同時保留重複項。

count()

這是 Python 中的內建函式,它將用於返回指定數量的行作為結果。

drop()

drop 方法消除特定行或列。

Window.orderBy()

PySpark 視窗函式透過計算結果(例如行號或排名)來定義。orderBy() 是對資料進行分割槽的唯一方法。

安裝要求

pip install pyspark

此必要的命令用於安裝有助於執行 PySpark 程式的工具。

使用 Limit() 和 Subtract() 方法

limit() 和 subtract 方法用於將單個數據轉換為兩個行資料框。limit() 用於透過為其分配整數值來設定特定數量的行,而 subtract 方法可用於包含另一個 DataFrame 中不存在的唯一行。

示例

在以下示例中,我們將首先匯入 pyspark 和 SparkSession 模組,這將建立資料框的會話。然後在變數 rows 中設定值作為行資料。接下來,在變數 cols 中設定資料的列值。現在使用名為 createDataFrame() 的方法和 SparkSession 模組來設定行和列,這定義了資料框的兩個不同模式,並將其儲存在變數 df_first 中。然後初始化變數 df_second,它將值設定為名為 subtract() 的內建函式,該函式接受名為變數 df_first 的引數,這將導致返回新的資料框。最後,我們對這兩個變數 df_first 和 df_second 使用 show() 方法來獲取結果。

# Import the PySpark module
import pyspark
from pyspark.sql 
import SparkSession
# Create the session
Spark_Session = SparkSession.builder.appName(
   'EMPLOYEE DATA'
).getOrCreate()
# rows of Dataframe
rows = [['1', 'RAHUL', 'INDIA','1243'],
   ['2','PETER', 'SRI LANKA','5461'],
   [ '3',' JOHN', 'SOUTH KOREA','2224'],
   [ '4', 'MARK', 'NEWYORK','9985'],
   [ '5', 'SUNNY', 'BANGLADESH','8912']
]
# Columns of DataFrame
cols = ['S.N', 'EMPLOYEE NAME', 'COUNTRY', 'EMP_ID']
# DataFrame creation for rows and columns
df = Spark_Session.createDataFrame(rows, cols)
# Getting the first two slicing of rows
df_first = df.limit(2)
# Getting the second slicing by removing the variable df1
df_second = df.subtract(df_first)
# first slice with 2 rows with columns names
df_first.show()
# Second slice with 3 rows with columns names
df_second.show()

輸出

+---+-------------+---------+------+
|S.N|EMPLOYEE NAME|  COUNTRY|EMP_ID|
+---+-------------+---------+------+
|  1|        RAHUL|    INDIA|  1243|
|  2|        PETER|SRI LANKA|  5461|
+---+-------------+---------+------+

+---+-------------+-----------+------+
|S.N|EMPLOYEE NAME|    COUNTRY|EMP_ID|
+---+-------------+-----------+------+
|  3|         JOHN|SOUTH KOREA|  2224|
|  5|        SUNNY| BANGLADESH|  8912|
|  4|         MARK|    NEWYORK|  9985|
+---+-------------+-----------+------+

使用 Collect() 和 CreateDataFrame() 方法

collect 方法用於從給定資料中檢索所有元素,而 createDataFrame() 將資料框的兩個模式分開。

請注意,模式由表的結構定義。

示例

在以下示例中,首先使用 SparkSession 建立會話。然後初始化變數 data,它將以列表格式設定行資料。然後使用帶有 spark 的 createDataframe() 方法,該方法接受引數 - data(給定的行)和 ["Name", "Age"](設定列的名稱)。為了獲取行列表,它將使用 collect() 方法作為變數 df 的物件引用,並將其儲存在變數 rows 中。接下來,我們在變數 rows1 和 rows2 中分別使用兩個列表切片,即之前和之後。繼續使用內建方法 createDataframe(),該方法接受兩個引數 - name_of_rows(rows1 和 rows2)和 df.schema(設定資料框的模式),並將其分別儲存在變數 df1 和 df2 中。最後,它將對這兩個變數 df1 和 df2 使用 show 函式來獲取結果。

from pyspark.sql 
import SparkSession
# Create the Spark session
spark = SparkSession.builder.appName("EMPLOYEE DATA").getOrCreate()
# Create the sample dataframe
data = [("Vivek", 31), ("Aman", 20), ("Sohan", 13), ("David", 24)]
df = spark.createDataFrame(data, ["Name", "Age"])
# Getting the list of row objects using the collect function
rows = df.collect()
# Getting two rows of the list by using slicing
rows1 = rows[:2]
rows2 = rows[2:]
# Convert the lists of Rows to PySpark DataFrames
df1 = spark.createDataFrame(rows1, df.schema)
df2 = spark.createDataFrame(rows2, df.schema)
# result
df1.show()
df2.show()

輸出

+-----+---+

| Name|Age|
+-----+---+
|Vivek| 31|
| Aman| 20|
+-----+---+

+-----+---+
| Name|Age|
+-----+---+
|Sohan| 13|
|David| 24|
+-----+---+

使用 Count()、Filter() 和 Drop() 方法

在此程式中,將資料框分成兩個行資料框需要 count() 和 filter() 方法,它們將劃分特定唯一行。count() 返回總行數,而 filter() 用於劃分 DataFrame 的兩行。然後,Drop() 方法刪除表示資料框劃分的行。

示例

在以下示例中,首先構建 spark 會話,然後在名為 data 的變數中設定行資料。然後使用帶有 spark 的 createDataFrame() 設定列名稱,該方法接受資料框的兩個引數 - data(設定行)和列表(設定列名),並將其儲存在變數 df 中。然後在變數 total_rows 中使用 df.count() 來查詢總行數。接下來,在變數 n_rows_first_df 中定義第一個資料框的行數。然後,我們使用內建方法 row_number()、over() 和 Window.orderBy() 將行號列新增到資料框中。現在使用內建方法 filter() 將資料框分成兩個不同的行,並將其儲存在其各自的變數中。最後,它將對兩個不同的變數使用兩個 show() 方法,以兩個行資料框的形式獲取結果。

from pyspark.sql 
import SparkSession, Window
from pyspark.sql.functions import row_number
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Create the original DataFrame
data = [("Rabina", 35), ("Stephen", 31), ("Raman", 33), ("Salman", 44),("Meera",37)]
df = spark.createDataFrame(data, ["Name", "Age"])
# Get the total number of rows
total_rows = df.count()
# Define the number of rows for the first DataFrame
n_rows_first_df = 2
# Add a row number column to the DataFrame
df_with_row_number = df.withColumn("row_number", row_number().over(Window.orderBy("Name")))
# Slice the DataFrame into two using filter()
first_df = df_with_row_number.filter(df_with_row_number.row_number <= n_rows_first_df).drop("row_number")
second_df = df_with_row_number.filter(df_with_row_number.row_number > n_rows_first_df).drop("row_number")
# Show the resulting DataFrames
first_df.show()
second_df.show()

輸出

+------+---+
|  Name|Age|
+------+---+
| Meera| 37|
|Rabina| 35|
+------+---+

+-------+---+
|   Name|Age|
+-------+---+
|  Raman| 33|
| Salman| 44|
|Stephen| 31|
+-------+---+

使用 Head() 和 ExceptAll() 方法

將資料框分成兩個行資料框,它使用 head() 和 exceptAll() 這兩種方法,這些方法將用於分離具有唯一資料行的兩個資料框。

示例

在以下示例中,它使用內建方法 count 獲取總行數。然後它在變數 n_rows_first_df 中分配第一個 DataFrame 的行數。為了建立兩個資料框,它將使用三個不同的內建函式,如 head()、createDataFrame() 和 exceptAll(),並將它們儲存在其各自的變數中。最後,它將使用兩個 show() 函式來獲取兩個行資料框。

from pyspark.sql 
import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create the original DataFrame
data = [("karisma", 25), ("Bobby", 30), ("Champak", 35), ("Mark", 40)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Get the total number of rows
total_rows = df.count()

# Define the number of rows for the first DataFrame
n_rows_first_df = 2

# Slice the DataFrame into two using head() and exceptAll()
first_rows = df.head(n_rows_first_df)
first_df = spark.createDataFrame(first_rows, df.schema)
second_df = df.exceptAll(first_df)

# Show the resulting DataFrames
first_df.show()
second_df.show()

輸出

+-------+---+
|   Name|Age|
+-------+---+
|karisma| 25|
|  Bobby| 30|
+-------+---+

+-------+---+
|   Name|Age|
+-------+---+
|Champak| 35|
|   Mark| 40|
+-------+---+

結論

我們討論了四種獨特的方法來將 PySpark 資料框按行分成兩個資料框。所有這些方法都具有表示資料框劃分的獨特方式。PySpark 資料框是高階互動式資料,可供資料工程師和資料科學家使用。Spark 和 ML 的 Python API 是視覺化 PySpark 資料框的常見示例。

更新於:2023年7月17日

693 次瀏覽

啟動您的職業生涯

完成課程獲得認證

開始
廣告
© . All rights reserved.