如何在PySpark中按值排序?


PySpark是一個分散式資料處理引擎,用於編寫API程式碼。PySpark是Apache Spark和Python的結合。Spark是一個大規模資料處理平臺,能夠處理PB級資料。在Python中,PySpark內建函式如orderBy()、sort()、sortBy()、createDataFrame()、collect()和asc_nulls_last()可用於對值進行排序。

語法

以下語法用於示例:

createDataFrame()

這是Python中的一個內建函式,它表示從PySpark模組建立DataFrame的另一種方法。

orderBy()

這是Python中的內建方法,它遵循PySpark模組來表示一列或多列的升序或降序。

sort()

此方法用於列出升序。如果我們在sort函式上設定任何條件,則允許我們以降序工作。

sortBy()

Python中的sortBy()方法遵循sparkContext,可用於按有序序列對資料進行排序。

parallelize() 

此方法包含在sparkContext中,允許資料分佈在多個節點上。

collect()

這通常被稱為PySpark RDD或DataFrame collect,可用於訪問資料集中的所有專案或數字。

asc_nulls_last("column_name")

這是Python中的一個內建函式,它返回升序序列。

所需安裝:

pip install pyspark

此命令有助於基於PySpark執行程式。

示例1

在以下示例中,我們將展示**如何按單個列對DataFrame進行排序**。首先,開始匯入名為pyspark.sql.SparkSession的模組。然後建立一個名為SparkSession的物件。然後將元組列表儲存在變數spark中。接下來,使用spark.createDataFrame()建立一個DataFrame,並提供資料和列名。接下來,在DataFrame上使用orderBy()函式按所需的列(在本例中為“Age”)進行排序。最後,使用show方法使用排序後的DataFrame來獲得最終結果。

from pyspark.sql 
import SparkSession
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()

# Creation of DataFrame
stu_data = [("Akash", 25), ("Bhuvan", 23), ("Peter", 18), ("Mohan", 26)]
df = spark.createDataFrame(stu_data, ["Name", "Age"])

# Sorting of Dataframe column(Age) in ascending order
sorted_df = df.orderBy("Age")

# sorted DataFrame
sorted_df.show()

輸出

+------+---+
|  Name|Age|
+------+---+
| Peter| 18|
|Bhuvan| 23|
| Akash| 25|
| Mohan| 26|
+------+---+

示例2

在以下示例中,我們將展示如何按多列對DataFrame進行排序。這裡它使用orderBy方法,該方法接受兩個引數- list(設定列名)和ascending(將值設定為true,允許排序序列),並將其儲存在變數sorted_df中。然後使用show()方法和sorted_df獲得結果。

from pyspark.sql 
import SparkSession
# Create SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame
data = [("Umbrella", 125), ("Bottle", 20), ("Colgate", 118)]
df = spark.createDataFrame(data, ["Product", "Price"])

# Sort DataFrame by product and price in ascending order
sorted_df = df.orderBy(["Price", "Product"], ascending=[True, True])

# Show the sorted DataFrame
sorted_df.show()

輸出

+--------+-----+
| Product|Price|
+--------+-----+
|  Bottle|   20|
| Colgate|  118|
|Umbrella|  125|
+--------+-----+

示例3

在以下示例中,我們將展示如何以降序對DataFrame進行排序。這裡它使用內建函式createDataframe手動將DataFrame設定為二維結構。然後初始化名為sorted_df的變數,該變數使用名為sort()的方法儲存值[sort函式的引數是內建函式desc(“Age”),它將遵循降序序列]。最後,我們使用sorted_df.show()列印結果。

from pyspark.sql 
import SparkSession
from pyspark.sql.functions 
import desc
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()

# Creation of DataFrame
Emp_data = [("Abhinav", 25, "Male"), ("Meera", 32, "Female"), ("Riya", 18, "Female"), ("Deepak", 33, "Male"), ("Elon", 50, "Male")]
df = spark.createDataFrame(Emp_data, ["Name", "Age", "Gender"])

# Sort DataFrame by Age in descending order
sorted_df = df.sort(desc("Age"))

# Show the sorted DataFrame
sorted_df.show()

輸出

+-------+---+------+
|   Name|Age|Gender|
+-------+---+------+
|   Elon| 50|  Male|
| Deepak| 33|  Male|
|  Meera| 32|Female|
|Abhinav| 25|  Male|
|   Riya| 18|Female|
+-------+---+------+

示例4

在以下示例中,**我們將展示如何按值對RDD進行排序**。這裡我們從元組列表建立一個RDD。然後在RDD上使用sortBy()函式,並提供一個lambda函式,該函式提取要按每個元組的第二個元素排序的值。接下來,收集並迭代排序後的RDD以訪問排序後的記錄。

# Sorting RDD by Value
from pyspark.sql 
import SparkSession

# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()

# Creation of RDD
data = [("X", 25), ("Y", 32), ("Z", 18)]
rdd = spark.sparkContext.parallelize(data)

# Sort RDD by value in ascending order
sorted_rdd = rdd.sortBy(lambda x: x[1])

# Print the sorted RDD
for record in sorted_rdd.collect():
   print(record)

輸出

('Z', 18)
('X', 25)
('Y', 32)

示例5

在以下示例中,我們將展示**如何對包含空值的DataFrame進行排序**。這裡,它使用DataFrame上的sort()函式按“Price”列升序排序,其中包含空值,即none。接下來,它將asc_nulls_last(“Price”)作為引數傳遞給sort(),然後指定升序到降序的排序順序。然後將排序後的DataFrame分配給變數sorted_df,然後使用show()方法和相同的變數來獲得結果。

# Sorting DataFrame with Null Values
from pyspark.sql 
import SparkSession
from pyspark.sql.functions 
import asc_nulls_last

# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()

# Creation of DataFrame with null values
data = [("Charger", None), ("Mouse", 320), ("PEN", 18), ("Bag", 1000), ("Notebook", None)] # None = null
df = spark.createDataFrame(data, ["Product", "Price"])

# Sorting of DataFrame column(Price) in ascending order with null values last
sorted_df = df.sort(asc_nulls_last("Price"))

# Show the sorted DataFrame
sorted_df.show()

輸出

+--------+-----+
| Product|Price|
+--------+-----+
|     PEN|   18|
|   Mouse|  320|
|     Bag| 1000|
| Charger| null|
|Notebook| null|
+--------+-----+

結論

我們討論了在PuSpark中對值進行排序的不同方法。我們使用了一些內建函式,如orderBy()、sort()、asc_nulls_last()等。排序的目的是獲得升序或降序的序列順序。PySpark的各種應用,例如即時庫、大規模資料處理和構建API。

更新於:2023年7月17日

631 次瀏覽

開啟你的職業生涯

透過完成課程獲得認證

開始學習
廣告
© . All rights reserved.