以程式設計方式指定模式



建立 DataFrame 的第二種方法是透過程式設計介面,它允許您構建模式,然後將其應用於現有的 RDD。我們可以使用以下三個步驟以程式設計方式建立 DataFrame。

  • 從原始 RDD 建立行 RDD。

  • 建立由 StructType 表示的模式,該模式與步驟 1 中建立的 RDD 中行的結構匹配。

  • 透過 SQLContext 提供的 createDataFrame 方法將模式應用於行 RDD。

示例

讓我們考慮一個名為 employee.txt 的文字檔案中員工記錄的示例。透過直接從文字檔案讀取資料來使用 DataFrame 建立模式。

給定資料 - 檢視名為 employee.txt 的檔案資料,該檔案位於 Spark shell 執行的當前相應目錄中。

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

按照以下步驟以程式設計方式生成模式。

開啟 Spark Shell

使用以下示例啟動 Spark shell。

$ spark-shell

建立 SQLContext 物件

使用以下命令生成 SQLContext。這裡,sc 表示 SparkContext 物件。

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

從文字檔案讀取輸入

透過使用以下命令讀取名為 employee.txt 的文字檔案中的資料來建立 RDD DataFrame。

scala> val employee = sc.textFile("employee.txt")

以字串格式建立編碼模式

使用以下命令以字串格式建立編碼模式。這意味著,假設表的欄位結構並使用一些分隔符傳遞欄位名稱。

scala> val schemaString = "id name age"

輸出

schemaString: String = id name age

匯入相應 API

使用以下命令匯入 Row 功能和 SQL 資料型別。

scala> import org.apache.spark.sql.Row;
scala> import org.apache.spark.sql.types.{StructType, StructField, StringType};

生成模式

以下命令用於透過讀取 schemaString 變數來生成模式。這意味著您需要透過以空格作為分隔符拆分整個字串來讀取每個欄位,並預設將每個欄位型別都視為字串型別。

scala> val schema = StructType(schemaString.split(" ").map(fieldName ⇒ StructField(fieldName, StringType, true)))

應用從文字檔案讀取資料的轉換

使用以下命令將 RDD (employee) 轉換為行。這意味著,這裡我們指定了讀取 RDD 資料並將其儲存到 rowRDD 中的邏輯。這裡我們使用了兩個 map 函式:一個是用於拆分記錄字串的分隔符(.map(_.split(","))),第二個 map 函式用於使用欄位索引值定義行(.map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt)))。

scala> val rowRDD = employee.map(_.split(",")).map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt))

根據模式將 RowRDD 應用於行資料

使用以下語句使用 rowRDD 資料和 schema (SCHEMA) 變數建立 DataFrame。

scala> val employeeDF = sqlContext.createDataFrame(rowRDD, schema)

輸出

employeeDF: org.apache.spark.sql.DataFrame = [id: string, name: string, age: string]

將 DataFrame 資料儲存到表中

使用以下命令將 DataFrame 儲存到名為 employee 的表中。

scala> employeeDF.registerTempTable("employee")

employee 表現已準備就緒。讓我們使用 SQLContext.sql() 方法向表中傳遞一些 SQL 查詢。

在 DataFrame 上執行選擇查詢

使用以下語句從 employee 表中選擇所有記錄。這裡我們使用變數 allrecords 來捕獲所有記錄資料。要顯示這些記錄,請在其上呼叫 show() 方法。

scala> val allrecords = sqlContext.sql("SELECT * FROM employee")

要檢視 allrecords DataFrame 的結果資料,請使用以下命令。

scala> allrecords.show()

輸出

+------+--------+----+
|  id  | name   |age |
+------+--------+----+
| 1201 | satish | 25 |
| 1202 | krishna| 28 |
| 1203 | amith  | 39 |
| 1204 | javed  | 23 |
| 1205 | prudvi | 23 |
+------+--------+----+

sqlContext.sql 方法允許您在列及其型別在執行時未知時構建 DataFrame。現在您可以對其執行不同的 SQL 查詢。

spark_sql_dataframes.htm
廣告

© . All rights reserved.