從 PySpark 資料框中獲取特定行
PySpark 是一個強大的資料處理和分析工具。在使用 PySpark DataFrame 處理資料時,有時需要從資料框中獲取特定行。它幫助使用者以分散式和並行的方式輕鬆操作和訪問資料,使其成為大資料應用的理想選擇。在本文中,我們將探討如何使用 PySpark 中的各種方法從 PySpark 資料框中獲取特定行。我們將介紹使用 PySpark 的 DataFrame API 進行函數語言程式設計的方法。
在繼續之前,讓我們建立一個示例資料框,從中獲取行。
from colorama import Fore
from pyspark.sql import SparkSession
# Building a SparkSession named "column_sum"
spark = SparkSession.builder.appName("column_sum").getOrCreate()
# Creating the Spark DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3),
('Row2', 4, 5, 6),
('Row3', 7, 8, 9)],
['__', 'Col1', 'Col2', 'Col3'])
# Printing the schema of the DataFrame
df.printSchema()
# Showing the DataFrame
df.show()
輸出
此 Python 指令碼將首先列印我們建立的資料框的模式,然後列印資料框本身。
root |-- __: string (nullable = true) |-- Col1: long (nullable = true) |-- Col2: long (nullable = true) |-- Col3: long (nullable = true) +----+----+----+----+ | __|Col1|Col2|Col3| +----+----+----+----+ |Row1| 1| 2| 3| |Row2| 4| 5| 6| |Row3| 7| 8| 9| +----+----+----+----+
下面提到了可以用來完成任務的方法
方法
使用 collect()
使用 first()
使用 show()
使用 head()
使用 tail()
使用 select() 和 collect()
使用 filter() 和 collect()
使用 where() 和 collect()
使用 take()
現在讓我們討論每種方法以及如何使用它們來新增列。
方法 1:使用 collect()
在 PySpark 中,collect() 方法可用於從 PySpark DataFrame 中檢索所有資料並將其作為列表返回。此函式通常用於檢視或操作資料框中的資料。以下是使用的語法
dataframe.collect()[index]
這裡
dataframe 是我們應用此方法的資料框
Index 是我們要獲取的行。
將資料框以列表的形式獲取後,我們可以將表示所需行的索引傳遞給列表。
演算法
首先,使用上面的程式碼建立一個數據框。
使用 collect() 函式從 DataFrame 中檢索所需的行,並將每一行儲存在單獨的變數中。
將包含所需行的變數的值列印到控制檯。
示例
# Retrieving the first row of the DataFrame using collect() function Row1 = df.collect()[0] print(Row1) # Retrieving the last row of the DataFrame using collect() function Row2 = df.collect()[-1] print(Row2) # Retrieving the second row of the DataFrame using collect() function Row3 = df.collect()[1] print(Row3)
輸出
Row(__='Row1', Col1=1, Col2=2, Col3=3) Row(__='Row3', Col1=7, Col2=8, Col3=9) Row(__='Row2', Col1=4, Col2=5, Col3=6)
方法 2:使用 first()
PySpark 中的 first() 函式返回資料框或 RDD 的第一個元素。我們可以使用此函式從資料框中提取特定行。此函式通常用於檢視資料框中的資料。以下是使用的語法
dataframe.first()
這裡
dataframe 是我們應用此方法的資料框
演算法
匯入必要的庫
建立 SparkSession
建立 DataFrame
使用 first() 函式檢索 DataFrame 的第一行
將第一行列印到控制檯
示例
# Import necessary libraries
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()
# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])
# Retrieve the first row
Row1 = df.first()
print(Row1)
輸出
Row(Row1, 1, 2, 3)
方法 3:使用 show()
在 PySpark 中,show() 函式用於顯示 Python 資料框中前 n 行。此函式的返回值是由前 n 行組成的小型資料框。以下是使用的語法
dataframe.show(n)
這裡
dataframe 是我們應用此方法的資料框
n 是行數
演算法
匯入必要的庫
建立 SparkSession
建立 DataFrame
透過將行引數傳遞為 1,使用 show() 函式檢索 DataFrame 的第一行
將第一行列印到控制檯
透過將行引數傳遞為 2,使用 show() 函式檢索 DataFrame 的前兩行
將前兩行列印到控制檯
透過將行引數傳遞為 3,使用 show() 函式檢索 DataFrame 的前三行
將前三行列印到控制檯
示例
# Import necessary libraries
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()
# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])
# Retrieve the first row
df1= df.show(1)
print(df1)
# Retrieve the first two rows
df2= df.show(2)
print(df2)
# Retrieve the first three rows
df3= df.show(3)
print(df3)
輸出
+----+----+----+----+ |__ |Col1|Col2|Col3| +----+----+----+----+ |Row1| 1| 2| 3| +----+----+----+----+ +----+----+----+----+ |__ |Col1|Col2|Col3| +----+----+----+----+ |Row1| 1| 2| 3| |Row2| 4| 5| 6| +----+----+----+----+ +----+----+----+----+ |__ |Col1|Col2|Col3| +----+----+----+----+ |Row1| 1| 2| 3| |Row2| 4| 5| 6| |Row3| 7| 8| 9| +----+----+----+----+
方法 4:使用 head()
在 PySpark 中,head() 函式用於顯示 Python 資料框中前 n 行。此函式的返回值是由前 n 行組成的小型資料框。以下是使用的語法
dataframe.head(n)
這裡
dataframe 是我們應用此方法的資料框
n 是行數
演算法
匯入必要的庫
建立 SparkSession
建立 DataFrame
透過將行引數傳遞為 1,使用 head() 函式檢索 DataFrame 的第一行
將第一行列印到控制檯
透過將行引數傳遞為 2,使用 head() 函式檢索 DataFrame 的前兩行
將前兩行列印到控制檯
透過將行引數傳遞為 3,使用 head() 函式檢索 DataFrame 的前三行
將前三行列印到控制檯
示例
# Import necessary libraries
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()
# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])
# Retrieve the first row
df1= df.head(1)
print(df1)
# Retrieve the first two rows
df2= df.head(2)
print(df2)
# Retrieve the first three rows
df3= df.head(3)
print(df3)
輸出
[Row(__='Row1', Col1=1, Col2=2, Col3=3)] [Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6)] [Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]
方法 5:使用 tail()
在 PySpark 中,tail() 函式用於顯示 Python 資料框中最後 n 行。此函式的返回值是由最後 n 行組成的小型資料框。以下是使用的語法
dataframe.tail(n)
這裡
dataframe 是我們應用此方法的資料框
n 是行數
演算法
匯入必要的庫
建立 SparkSession
建立 DataFrame
透過將行引數傳遞為 1,使用 tail() 函式檢索 DataFrame 的最後一行
將最後一行列印到控制檯
透過將行引數傳遞為 2,使用 tail() 函式檢索 DataFrame 的最後兩行
將最後兩行列印到控制檯
透過將行引數傳遞為 3,使用 tail() 函式檢索 DataFrame 的最後三行
將最後三行列印到控制檯
示例
# Create a SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()
# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])
# Retrieve the last row
df1= df.tail(1)
print(df1)
# Retrieve the last two rows
df2= df.tail(2)
print(df2)
# Retrieve the last three rows
df3= df.tail(3)
print(df3)
輸出
[Row(__='Row3', Col1=7, Col2=8, Col3=9)] [Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)] [Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]
方法 6:使用 select() 和 collect()
我們可以結合使用 select() 函式和 collect() 方法來顯示 Pyspark 資料框中的特定行。以下是使用的語法
dataframe.select([columns]).collect()[index]
這裡
dataframe 是我們應用此方法的資料框
columns 是我們想要在輸出中包含的列的列表。
Index 是我們想要在輸出中包含的行號。
演算法
匯入必要的庫
建立 SparkSession
建立 DataFrame
結合使用 select() 函式和 collect() 函式從 DataFrame 中檢索所需的行,並將每一行儲存在單獨的變數中。
將包含所需行的變數的值列印到控制檯。
示例
# Import necessary libraries
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()
# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])
# Retrieve the last row
df1= df.select(['Col1', 'Col2', 'Col3']).collect(0)
print(df1)
# Retrieve the last two rows
df2= df.select(['Col1', 'Col2', 'Col3']).collect(-1)
print(df2)
# Retrieve the last three rows
df3= df.select(['Col1', 'Col2', 'Col3']).collect(1)
print(df3)
輸出
[Row(__='Row3', Col1=7, Col2=8, Col3=9)] [Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)] [Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]
方法 7:使用 filter() 和 collect()
我們可以結合使用 filter() 函式和 collect() 方法來顯示 Pyspark 資料框中的特定行。以下是使用的語法
dataframe.filter(condition).collect()[index]
這裡
dataframe 是我們應用此方法的資料框
Condition 是根據其過濾資料框行的條件。
Index 是我們想要在輸出中包含的行號。
演算法
匯入必要的庫
建立 SparkSession
建立 DataFrame
結合使用 filter() 函式和 collect() 函式從 DataFrame 中檢索所需的行,並將每一行儲存在單獨的變數中。
將包含所需行的變數的值列印到控制檯。
示例
# Import necessary libraries
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("filter_collect_example").getOrCreate()
# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])
# Filter the DataFrame
df1 = df.filter(df.Col1 > 1).collect()[0]
# Print the collected data
print(df1)
# Filter the DataFrame
df2 = df.filter(df.Col1 > 1).collect()[1]
# Print the collected data
print(df2)
# Filter the DataFrame
df3 = df.filter(df.Col1 > 1).collect()[-1]
# Print the collected data
print(df3)
輸出
Row(Col1=4, Col2=5, Col3=6) Row(Col1=7, Col2=8, Col3=9) Row(Col1=7, Col2=8, Col3=9)
方法 8:使用 where() 和 collect()
我們可以結合使用 where() 函式和 collect() 方法來顯示 Pyspark 資料框中的特定行。使用 where() 方法,我們可以根據方法中傳遞的條件過濾特定行,然後我們可以應用 collect() 方法將結果儲存在變數中。以下是使用的語法
dataframe.where(condition).collect()[index]
這裡
dataframe 是我們應用此方法的資料框
Condition 是根據其過濾資料框行的條件。
Index 是我們想要在輸出中包含的行號。
演算法
匯入必要的庫
建立 SparkSession
建立 DataFrame
結合使用 where() 函式和 collect() 函式從 DataFrame 中檢索所需的行,並將每一行儲存在單獨的變數中。
將包含所需行的變數的值列印到控制檯。
示例
# Import necessary libraries
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("filter_collect_example").getOrCreate()
# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])
# Filter the DataFrame
df1 = df.where(df.Col1 > 1).collect()[0]
# Print the collected data
print(df1)
# Filter the DataFrame
df2 = df.where(df.Col1 > 1).collect()[1]
# Print the collected data
print(df2)
# Filter the DataFrame
df3 = df.where(df.Col1 > 1).collect()[-1]
# Print the collected data
print(df3)
輸出
Row(Col1=4, Col2=5, Col3=6) Row(Col1=7, Col2=8, Col3=9) Row(Col1=7, Col2=8, Col3=9)
方法 9:使用 take()
在 PySpark 中,take() 函式也用於顯示 Python 資料框中前 n 行。此函式的返回值是由前 n 行組成的小型資料框。以下是使用的語法
dataframe.take(n)
這裡
dataframe 是我們應用此方法的資料框
n 是行數
演算法
匯入必要的庫
建立 SparkSession
建立 DataFrame
透過將行引數傳遞為 1,使用 take() 函式檢索 DataFrame 的第一行
將第一行列印到控制檯
透過將行引數傳遞為 2,使用 take() 函式檢索 DataFrame 的前兩行
將前兩行列印到控制檯
透過將行引數傳遞為 3,使用 take() 函式檢索 DataFrame 的前三行
將前三行列印到控制檯
示例
# Import necessary libraries
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()
# Create the DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])
# Retrieve the first row
df1= df.take(1)
print(df1)
# Retrieve the first two rows
df2= df.take(2)
print(df2)
# Retrieve the first three rows
df3= df.take(3)
print(df3)
輸出
[Row(__='Row1', Col1=1, Col2=2, Col3=3)] [Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6)] [Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]
結論
根據用例的不同,每種方法的效率可能高於或低於其他方法,並且每種方法都有其自身的優點或缺點。為特定任務選擇最佳方法最為重要。由於這些方法的效率高,因此也可以將其應用於大型資料集。
資料結構
網路
關係資料庫管理系統 (RDBMS)
作業系統
Java
iOS
HTML
CSS
Android
Python
C 程式設計
C++
C#
MongoDB
MySQL
Javascript
PHP