如何驗證 PySpark 資料框列型別?
PySpark 是 Apache Spark 的 Python API,提供了一個強大且可擴充套件的大資料處理和分析框架。在使用 PySpark DataFrame 時,瞭解並驗證每個列的資料型別至關重要。準確的列型別驗證可確保資料完整性,並使您能夠準確地執行操作和轉換。在本文中,我們將探討驗證 PySpark DataFrame 列型別的各種方法,並提供示例以幫助更好地理解。
PySpark DataFrame 列型別的概述
在 PySpark 中,DataFrame 表示一個組織成命名列的分散式資料集合。每列都有一個特定的資料型別,可以是任何有效的 PySpark 資料型別,例如 IntegerType、StringType、BooleanType 等。瞭解列型別至關重要,因為它允許您根據預期的資料型別執行操作。
使用 printSchema() 方法
printSchema() 方法提供了一個簡潔且結構化的 DataFrame 模式表示,包括列名及其對應的資料型別。它是驗證列型別最簡單的方法之一。
語法
df.printSchema()
此處,df.printSchema() 語法用於顯示 PySpark DataFrame 的模式。它列印列名及其相應的資料型別,以及它們是否允許空值。
示例
在下面的示例中,我們建立一個 SparkSession 併為 PySpark DataFrame 定義一個模式。然後使用樣本資料建立 DataFrame,其中包含名為“col1”、“col2”和“col3”的列,分別具有 IntegerType、StringType 和 DoubleType 的對應資料型別。最後,使用 printSchema() 方法列印 DataFrame 的模式,該方法顯示列名及其資料型別。
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, DoubleType
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Sample data
data = [
(1, "John", 3.14),
(2, "Jane", 2.71),
(3, "Alice", 1.23)
]
# Define the schema
schema = [
("col1", IntegerType(), True),
("col2", StringType(), True),
("col3", DoubleType(), True)
]
# Create a DataFrame with the provided data and schema
df = spark.createDataFrame(data, schema)
# Print the DataFrame schema
df.printSchema()
輸出
root |-- col1: integer (nullable = true) |-- col2: string (nullable = true) |-- col3: double (nullable = true)
使用 dtypes 檢查列型別
dtypes 屬性返回一個元組列表,其中每個元組包含列名及其對應的資料型別。此方法允許以程式設計方式訪問列型別。
語法
column_types = df.dtypes
for column_name, data_type in column_types:
print(f"Column '{column_name}' has data type: {data_type}")
此處,df.dtypes 從 PySpark DataFrame 中檢索列名及其對應的資料型別作為元組列表。for 迴圈遍歷每個元組,提取列名和資料型別,然後使用 f-字串格式列印它們。
示例
在下面的示例中,我們使用 SparkSession 建立一個 PySpark DataFrame。它將樣本資料定義為元組列表,並建立一個名為 df 的 DataFrame,其中包含“col1”、“col2”和“col3”列。df.dtypes 屬性檢索列名及其對應的資料型別作為元組列表。for 迴圈遍歷每個元組,提取列名和資料型別,然後使用 f-字串格式列印它們。
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Sample data
data = [
(1, "John", 3.14),
(2, "Jane", 2.71),
(3, "Alice", 1.23)
]
# Create a DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])
# Get the column types
column_types = df.dtypes
# Display the column types
for column_name, data_type in column_types:
print(f"Column '{column_name}' has data type: {data_type}")
輸出
輸出顯示列名(col1、col2、col3)及其對應的資料型別(int、string、double)。此資訊是使用 DataFrame 的 dtypes 屬性獲得的,該屬性返回一個元組列表,其中每個元組包含列名及其資料型別。
Column 'col1' has data type: int Column 'col2' has data type: string Column 'col3' has data type: double
使用 selectExpr() 驗證列型別
selectExpr() 方法允許我們選擇列並在其上應用表示式或轉換。將其與 typeof() 函式結合使用,您可以直接檢查特定列的資料型別。
語法
from pyspark.sql.functions import expr
column_names = ["col1", "col2", "col3"]
exprs = [expr(f"typeof({col}) as {col}_type") for col in column_names]
df.selectExpr(*exprs).show()
此處,**typeof()** 函式檢索每列的資料型別,並將其與包含“_type”的新列名關聯。然後,**df.selectExpr(*exprs).show()** 將這些表示式應用於 DataFrame,選擇動態建立的列並顯示其結果。
示例
在下面的示例中,我們建立一個 SparkSession 並定義一個名為 df 的 PySpark DataFrame,其中包含三列:“col1”、“col2”和“col3”。為了驗證列型別,程式碼在 DataFrame 上使用 **selectExpr()** 方法。它使用列表推導式建立一個表示式列表,其中每個表示式都使用 **typeof()** 函式確定列的資料型別,並將其與包含“_type”的新列名關聯。最後,df.selectExpr(*exprs).show() 將這些表示式應用於 DataFrame,選擇動態建立的列及其列名和相應的資料型別。show() 方法顯示結果 DataFrame。
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Sample data
data = [
(1, "John", 3.14),
(2, "Jane", 2.71),
(3, "Alice", 1.23)
]
# Create a DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])
# Verify column types using selectExpr()
column_names = ["col1", "col2", "col3"]
exprs = [f"typeof({col}) as {col}_type" for col in column_names]
df.selectExpr(*exprs).show()
輸出
+---------+---------+---------+ |col1_type|col2_type|col3_type| +---------+---------+---------+ | integer| string| double| | integer| string| double| | integer| string| double| +---------+---------+---------+
使用 cast() 檢查列型別
cast() 函式允許我們顯式地將列轉換為不同的資料型別。透過比較原始列和轉換後的列,您可以驗證轉換是否成功,這表明原始列具有預期的資料型別。
示例
在下面的示例中,我們建立一個 SparkSession 並定義一個名為 df 的 PySpark DataFrame,其中包含三列:“col1”、“col2”和“col3”,以及樣本資料。程式碼定義了一個字典 expected_data_types,該字典指定每列的預期資料型別。for 迴圈遍歷 expected_data_types 字典中的每個專案。在迴圈內,程式碼使用 cast() 函式嘗試將列轉換為預期的資料型別。它使用轉換後的值建立一個新列,並將其與原始列進行比較,以識別轉換成功的行。
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Sample data
data = [
(1, "John", 3.14),
(2, "Jane", 2.71),
(3, "Alice", 1.23)
]
# Create a DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])
# Define the expected data types
expected_data_types = {
"col1": "integer",
"col2": "string",
"col3": "double"
}
# Check column types using cast()
for column_name, expected_type in expected_data_types.items():
cast_column = df.select(col(column_name).cast(expected_type).alias(column_name))
matched_rows = df.filter(col(column_name) == cast_column[column_name])
print(f"Column '{column_name}' has the expected data type: {expected_type}?")
matched_rows.show()
輸出
輸出透過嘗試使用 cast() 函式將其轉換為預期型別來驗證每列的資料型別。原始 DataFrame 根據轉換後匹配的行進行過濾,如果所有行都匹配,則表明該列具有預期的資料型別。
Column 'col1' has the expected data type: integer? +----+-----+----+ |col1|col2 |col3| +----+-----+----+ | 1| John|3.14| | 2| Jane|2.71| | 3|Alice|1.23| +----+-----+----+ Column 'col2' has the expected data type: string? +----+-----+----+ |col1|col2 |col3| +----+-----+----+ | 1| John|3.14| | 2| Jane|2.71| | 3|Alice|1.23| +----+-----+----+ Column 'col3' has the expected data type: double? +----+-----+----+ |col1|col2 |col3| +----+-----+----+ | 1| John|3.14| | 2| Jane|2.71| | 3|Alice|1.23| +----+-----+----+
結論
在本文中,我們討論瞭如何驗證 Pyspark 資料框列型別。驗證 PySpark DataFrame 列型別對於確保資料準確性和執行有意義的操作至關重要。在本文中,我們探討了幾種驗證列型別的方法,包括使用 printSchema()、dtypes、selectExpr()、cast()。
資料結構
網路
關係資料庫管理系統 (RDBMS)
作業系統
Java
iOS
HTML
CSS
Android
Python
C 程式設計
C++
C#
MongoDB
MySQL
Javascript
PHP