如何在PySpark中按值排序?
PySpark是一個分散式資料處理引擎,用於編寫API程式碼。PySpark是Apache Spark和Python的結合。Spark是一個大規模資料處理平臺,能夠處理PB級資料。在Python中,PySpark內建函式如orderBy()、sort()、sortBy()、createDataFrame()、collect()和asc_nulls_last()可用於對值進行排序。
語法
以下語法用於示例:
createDataFrame()
這是Python中的一個內建函式,它表示從PySpark模組建立DataFrame的另一種方法。
orderBy()
這是Python中的內建方法,它遵循PySpark模組來表示一列或多列的升序或降序。
sort()
此方法用於列出升序。如果我們在sort函式上設定任何條件,則允許我們以降序工作。
sortBy()
Python中的sortBy()方法遵循sparkContext,可用於按有序序列對資料進行排序。
parallelize()
此方法包含在sparkContext中,允許資料分佈在多個節點上。
collect()
這通常被稱為PySpark RDD或DataFrame collect,可用於訪問資料集中的所有專案或數字。
asc_nulls_last("column_name")
這是Python中的一個內建函式,它返回升序序列。
所需安裝:
pip install pyspark
此命令有助於基於PySpark執行程式。
示例1
在以下示例中,我們將展示**如何按單個列對DataFrame進行排序**。首先,開始匯入名為pyspark.sql.SparkSession的模組。然後建立一個名為SparkSession的物件。然後將元組列表儲存在變數spark中。接下來,使用spark.createDataFrame()建立一個DataFrame,並提供資料和列名。接下來,在DataFrame上使用orderBy()函式按所需的列(在本例中為“Age”)進行排序。最後,使用show方法使用排序後的DataFrame來獲得最終結果。
from pyspark.sql
import SparkSession
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()
# Creation of DataFrame
stu_data = [("Akash", 25), ("Bhuvan", 23), ("Peter", 18), ("Mohan", 26)]
df = spark.createDataFrame(stu_data, ["Name", "Age"])
# Sorting of Dataframe column(Age) in ascending order
sorted_df = df.orderBy("Age")
# sorted DataFrame
sorted_df.show()
輸出
+------+---+ | Name|Age| +------+---+ | Peter| 18| |Bhuvan| 23| | Akash| 25| | Mohan| 26| +------+---+
示例2
在以下示例中,我們將展示如何按多列對DataFrame進行排序。這裡它使用orderBy方法,該方法接受兩個引數- list(設定列名)和ascending(將值設定為true,允許排序序列),並將其儲存在變數sorted_df中。然後使用show()方法和sorted_df獲得結果。
from pyspark.sql
import SparkSession
# Create SparkSession
spark = SparkSession.builder.getOrCreate()
# Create a DataFrame
data = [("Umbrella", 125), ("Bottle", 20), ("Colgate", 118)]
df = spark.createDataFrame(data, ["Product", "Price"])
# Sort DataFrame by product and price in ascending order
sorted_df = df.orderBy(["Price", "Product"], ascending=[True, True])
# Show the sorted DataFrame
sorted_df.show()
輸出
+--------+-----+ | Product|Price| +--------+-----+ | Bottle| 20| | Colgate| 118| |Umbrella| 125| +--------+-----+
示例3
在以下示例中,我們將展示如何以降序對DataFrame進行排序。這裡它使用內建函式createDataframe手動將DataFrame設定為二維結構。然後初始化名為sorted_df的變數,該變數使用名為sort()的方法儲存值[sort函式的引數是內建函式desc(“Age”),它將遵循降序序列]。最後,我們使用sorted_df.show()列印結果。
from pyspark.sql
import SparkSession
from pyspark.sql.functions
import desc
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()
# Creation of DataFrame
Emp_data = [("Abhinav", 25, "Male"), ("Meera", 32, "Female"), ("Riya", 18, "Female"), ("Deepak", 33, "Male"), ("Elon", 50, "Male")]
df = spark.createDataFrame(Emp_data, ["Name", "Age", "Gender"])
# Sort DataFrame by Age in descending order
sorted_df = df.sort(desc("Age"))
# Show the sorted DataFrame
sorted_df.show()
輸出
+-------+---+------+ | Name|Age|Gender| +-------+---+------+ | Elon| 50| Male| | Deepak| 33| Male| | Meera| 32|Female| |Abhinav| 25| Male| | Riya| 18|Female| +-------+---+------+
示例4
在以下示例中,**我們將展示如何按值對RDD進行排序**。這裡我們從元組列表建立一個RDD。然後在RDD上使用sortBy()函式,並提供一個lambda函式,該函式提取要按每個元組的第二個元素排序的值。接下來,收集並迭代排序後的RDD以訪問排序後的記錄。
# Sorting RDD by Value
from pyspark.sql
import SparkSession
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()
# Creation of RDD
data = [("X", 25), ("Y", 32), ("Z", 18)]
rdd = spark.sparkContext.parallelize(data)
# Sort RDD by value in ascending order
sorted_rdd = rdd.sortBy(lambda x: x[1])
# Print the sorted RDD
for record in sorted_rdd.collect():
print(record)
輸出
('Z', 18)
('X', 25)
('Y', 32)
示例5
在以下示例中,我們將展示**如何對包含空值的DataFrame進行排序**。這裡,它使用DataFrame上的sort()函式按“Price”列升序排序,其中包含空值,即none。接下來,它將asc_nulls_last(“Price”)作為引數傳遞給sort(),然後指定升序到降序的排序順序。然後將排序後的DataFrame分配給變數sorted_df,然後使用show()方法和相同的變數來獲得結果。
# Sorting DataFrame with Null Values
from pyspark.sql
import SparkSession
from pyspark.sql.functions
import asc_nulls_last
# Creation of SparkSession
spark = SparkSession.builder.getOrCreate()
# Creation of DataFrame with null values
data = [("Charger", None), ("Mouse", 320), ("PEN", 18), ("Bag", 1000), ("Notebook", None)] # None = null
df = spark.createDataFrame(data, ["Product", "Price"])
# Sorting of DataFrame column(Price) in ascending order with null values last
sorted_df = df.sort(asc_nulls_last("Price"))
# Show the sorted DataFrame
sorted_df.show()
輸出
+--------+-----+ | Product|Price| +--------+-----+ | PEN| 18| | Mouse| 320| | Bag| 1000| | Charger| null| |Notebook| null| +--------+-----+
結論
我們討論了在PuSpark中對值進行排序的不同方法。我們使用了一些內建函式,如orderBy()、sort()、asc_nulls_last()等。排序的目的是獲得升序或降序的序列順序。PySpark的各種應用,例如即時庫、大規模資料處理和構建API。
資料結構
網路
關係資料庫管理系統 (RDBMS)
作業系統
Java
iOS
HTML
CSS
Android
Python
C語言程式設計
C++
C#
MongoDB
MySQL
Javascript
PHP