如何驗證 PySpark 資料框列型別?


PySpark 是 Apache Spark 的 Python API,提供了一個強大且可擴充套件的大資料處理和分析框架。在使用 PySpark DataFrame 時,瞭解並驗證每個列的資料型別至關重要。準確的列型別驗證可確保資料完整性,並使您能夠準確地執行操作和轉換。在本文中,我們將探討驗證 PySpark DataFrame 列型別的各種方法,並提供示例以幫助更好地理解。

PySpark DataFrame 列型別的概述

在 PySpark 中,DataFrame 表示一個組織成命名列的分散式資料集合。每列都有一個特定的資料型別,可以是任何有效的 PySpark 資料型別,例如 IntegerType、StringType、BooleanType 等。瞭解列型別至關重要,因為它允許您根據預期的資料型別執行操作。

使用 printSchema() 方法

printSchema() 方法提供了一個簡潔且結構化的 DataFrame 模式表示,包括列名及其對應的資料型別。它是驗證列型別最簡單的方法之一。

語法

df.printSchema()

此處,df.printSchema() 語法用於顯示 PySpark DataFrame 的模式。它列印列名及其相應的資料型別,以及它們是否允許空值。

示例

在下面的示例中,我們建立一個 SparkSession 併為 PySpark DataFrame 定義一個模式。然後使用樣本資料建立 DataFrame,其中包含名為“col1”、“col2”和“col3”的列,分別具有 IntegerType、StringType 和 DoubleType 的對應資料型別。最後,使用 printSchema() 方法列印 DataFrame 的模式,該方法顯示列名及其資料型別。

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, DoubleType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    (1, "John", 3.14),
    (2, "Jane", 2.71),
    (3, "Alice", 1.23)
]

# Define the schema
schema = [
    ("col1", IntegerType(), True),
    ("col2", StringType(), True),
    ("col3", DoubleType(), True)
]

# Create a DataFrame with the provided data and schema
df = spark.createDataFrame(data, schema)

# Print the DataFrame schema
df.printSchema()

輸出

root
 |-- col1: integer (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = true)

使用 dtypes 檢查列型別

dtypes 屬性返回一個元組列表,其中每個元組包含列名及其對應的資料型別。此方法允許以程式設計方式訪問列型別。

語法

column_types = df.dtypes
for column_name, data_type in column_types:
    print(f"Column '{column_name}' has data type: {data_type}")

此處,df.dtypes 從 PySpark DataFrame 中檢索列名及其對應的資料型別作為元組列表。for 迴圈遍歷每個元組,提取列名和資料型別,然後使用 f-字串格式列印它們。

示例

在下面的示例中,我們使用 SparkSession 建立一個 PySpark DataFrame。它將樣本資料定義為元組列表,並建立一個名為 df 的 DataFrame,其中包含“col1”、“col2”和“col3”列。df.dtypes 屬性檢索列名及其對應的資料型別作為元組列表。for 迴圈遍歷每個元組,提取列名和資料型別,然後使用 f-字串格式列印它們。

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    (1, "John", 3.14),
    (2, "Jane", 2.71),
    (3, "Alice", 1.23)
]

# Create a DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

# Get the column types
column_types = df.dtypes

# Display the column types
for column_name, data_type in column_types:
    print(f"Column '{column_name}' has data type: {data_type}")

輸出

輸出顯示列名(col1、col2、col3)及其對應的資料型別(int、string、double)。此資訊是使用 DataFrame 的 dtypes 屬性獲得的,該屬性返回一個元組列表,其中每個元組包含列名及其資料型別。

Column 'col1' has data type: int
Column 'col2' has data type: string
Column 'col3' has data type: double

使用 selectExpr() 驗證列型別

selectExpr() 方法允許我們選擇列並在其上應用表示式或轉換。將其與 typeof() 函式結合使用,您可以直接檢查特定列的資料型別。

語法

from pyspark.sql.functions import expr

column_names = ["col1", "col2", "col3"]
exprs = [expr(f"typeof({col}) as {col}_type") for col in column_names]
df.selectExpr(*exprs).show()

此處,**typeof()** 函式檢索每列的資料型別,並將其與包含“_type”的新列名關聯。然後,**df.selectExpr(*exprs).show()** 將這些表示式應用於 DataFrame,選擇動態建立的列並顯示其結果。

示例

在下面的示例中,我們建立一個 SparkSession 並定義一個名為 df 的 PySpark DataFrame,其中包含三列:“col1”、“col2”和“col3”。為了驗證列型別,程式碼在 DataFrame 上使用 **selectExpr()** 方法。它使用列表推導式建立一個表示式列表,其中每個表示式都使用 **typeof()** 函式確定列的資料型別,並將其與包含“_type”的新列名關聯。最後,df.selectExpr(*exprs).show() 將這些表示式應用於 DataFrame,選擇動態建立的列及其列名和相應的資料型別。show() 方法顯示結果 DataFrame。

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    (1, "John", 3.14),
    (2, "Jane", 2.71),
    (3, "Alice", 1.23)
]

# Create a DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

# Verify column types using selectExpr()
column_names = ["col1", "col2", "col3"]
exprs = [f"typeof({col}) as {col}_type" for col in column_names]
df.selectExpr(*exprs).show()

輸出

+---------+---------+---------+
|col1_type|col2_type|col3_type|
+---------+---------+---------+
|  integer|   string|   double|
|  integer|   string|   double|
|  integer|   string|   double|
+---------+---------+---------+

使用 cast() 檢查列型別

cast() 函式允許我們顯式地將列轉換為不同的資料型別。透過比較原始列和轉換後的列,您可以驗證轉換是否成功,這表明原始列具有預期的資料型別。

示例

在下面的示例中,我們建立一個 SparkSession 並定義一個名為 df 的 PySpark DataFrame,其中包含三列:“col1”、“col2”和“col3”,以及樣本資料。程式碼定義了一個字典 expected_data_types,該字典指定每列的預期資料型別。for 迴圈遍歷 expected_data_types 字典中的每個專案。在迴圈內,程式碼使用 cast() 函式嘗試將列轉換為預期的資料型別。它使用轉換後的值建立一個新列,並將其與原始列進行比較,以識別轉換成功的行。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    (1, "John", 3.14),
    (2, "Jane", 2.71),
    (3, "Alice", 1.23)
]

# Create a DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

# Define the expected data types
expected_data_types = {
    "col1": "integer",
    "col2": "string",
    "col3": "double"
}

# Check column types using cast()
for column_name, expected_type in expected_data_types.items():
    cast_column = df.select(col(column_name).cast(expected_type).alias(column_name))
    matched_rows = df.filter(col(column_name) == cast_column[column_name])
    print(f"Column '{column_name}' has the expected data type: {expected_type}?")
    matched_rows.show()

輸出

輸出透過嘗試使用 cast() 函式將其轉換為預期型別來驗證每列的資料型別。原始 DataFrame 根據轉換後匹配的行進行過濾,如果所有行都匹配,則表明該列具有預期的資料型別。

Column 'col1' has the expected data type: integer?
+----+-----+----+
|col1|col2 |col3|
+----+-----+----+
|   1| John|3.14|
|   2| Jane|2.71|
|   3|Alice|1.23|
+----+-----+----+

Column 'col2' has the expected data type: string?
+----+-----+----+
|col1|col2 |col3|
+----+-----+----+
|   1| John|3.14|
|   2| Jane|2.71|
|   3|Alice|1.23|
+----+-----+----+

Column 'col3' has the expected data type: double?
+----+-----+----+
|col1|col2 |col3|
+----+-----+----+
|   1| John|3.14|
|   2| Jane|2.71|
|   3|Alice|1.23|
+----+-----+----+

結論

在本文中,我們討論瞭如何驗證 Pyspark 資料框列型別。驗證 PySpark DataFrame 列型別對於確保資料準確性和執行有意義的操作至關重要。在本文中,我們探討了幾種驗證列型別的方法,包括使用 printSchema()、dtypes、selectExpr()、cast()。

更新於: 2023年10月16日

1K+ 閱讀量

開啟你的 職業生涯

透過完成課程獲得認證

立即開始
廣告

© . All rights reserved.