使用反射推斷模式



此方法使用反射來生成包含特定型別物件的RDD的模式。Spark SQL 的 Scala 介面支援自動將包含 case class 的 RDD 轉換為 DataFrame。case class 定義表的模式。case class 的引數名稱使用反射讀取,併成為列的名稱。

Case class 也可以巢狀,或者包含複雜型別,例如序列或陣列。此 RDD 可以隱式轉換為 DataFrame,然後註冊為表。表可用於後續的 SQL 語句。

示例

讓我們考慮一個名為employee.txt 的文字檔案中員工記錄的示例。透過讀取文字檔案中的資料建立 RDD,並使用預設 SQL 函式將其轉換為 DataFrame。

給定資料 - 檢視名為employee.txt的檔案的以下資料,將其放置在 Spark shell 執行的當前相應目錄中。

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

以下示例說明如何使用反射生成模式。

啟動 Spark Shell

使用以下命令啟動 Spark Shell。

$ spark-shell

建立 SQLContext

使用以下命令生成 SQLContext。這裡,sc 表示 SparkContext 物件。

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

匯入 SQL 函式

使用以下命令匯入所有用於將 RDD 隱式轉換為 DataFrame 的 SQL 函式。

scala> import sqlContext.implicts._

建立 Case Class

接下來,我們必須使用 case class 為員工記錄資料定義一個模式。以下命令用於根據給定資料 (id、name、age) 宣告 case class。

scala> case class Employee(id: Int, name: String, age: Int)
defined class Employee

建立 RDD 並應用轉換

使用以下命令透過讀取employee.txt中的資料並使用 Map 函式將其轉換為 DataFrame 來生成名為empl的 RDD。

這裡定義了兩個 map 函式。一個是將文字記錄拆分為欄位 (.map(_.split(“,”))),第二個 map 函式是將各個欄位 (id、name、age) 轉換為一個 case class 物件 (.map(e(0).trim.toInt, e(1), e(2).trim.toInt))。

最後,toDF()方法用於將具有模式的 case class 物件轉換為 DataFrame。

scala> val empl=sc.textFile("employee.txt")
.map(_.split(","))
.map(e⇒ employee(e(0).trim.toInt,e(1), e(2).trim.toInt))
.toDF()

輸出

empl: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]

將 DataFrame 資料儲存到表中

使用以下命令將 DataFrame 資料儲存到名為employee的表中。執行此命令後,我們可以對其應用所有型別的 SQL 語句。

scala> empl.registerTempTable("employee")

employee 表已準備就緒。現在讓我們使用SQLContext.sql()方法對錶執行一些 sql 查詢。

對 DataFrame 執行 Select 查詢

使用以下命令從employee表中選擇所有記錄。這裡,我們使用變數allrecords來捕獲所有記錄資料。要顯示這些記錄,請在其上呼叫show()方法。

scala> val allrecords = sqlContext.sql("SELeCT * FROM employee")

要檢視allrecords DataFrame 的結果資料,請使用以下命令。

scala> allrecords.show()

輸出

+------+---------+----+
|  id  |  name   |age |
+------+---------+----+
| 1201 | satish  | 25 |
| 1202 | krishna | 28 |
| 1203 | amith   | 39 |
| 1204 | javed   | 23 |
| 1205 | prudvi  | 23 |
+------+---------+----+

對 DataFrame 使用 Where 子句 SQL 查詢

使用以下命令在表中應用where語句。這裡,變數agefilter儲存年齡在 20 到 35 之間的員工的記錄。

scala> val agefilter = sqlContext.sql("SELeCT * FROM employee WHERE ageC>=20 AND age <= 35")

要檢視agefilter DataFrame 的結果資料,請使用以下命令。

scala> agefilter.show()

輸出

<console>:25, took 0.112757 s
+------+---------+----+
|  id  |  name   |age |
+------+---------+----+
| 1201 | satish  | 25 |
| 1202 | krishna | 28 |
| 1204 | javed   | 23 |
| 1205 | prudvi  | 23 |
+------+---------+----+

前面兩個查詢針對的是整個表 DataFrame。現在讓我們嘗試透過對其應用轉換來從結果 DataFrame 中提取資料。

使用列索引從 agefilter DataFrame 中獲取 ID 值

以下語句用於使用欄位索引從agefilter RDD 結果中獲取 ID 值。

scala> agefilter.map(t=>"ID: "+t(0)).collect().foreach(println)

輸出

<console>:25, took 0.093844 s
ID: 1201
ID: 1202
ID: 1204
ID: 1205

這種基於反射的方法可以編寫更簡潔的程式碼,並且在編寫 Spark 應用程式時已經知道模式的情況下效果很好。

spark_sql_dataframes.htm
廣告
© . All rights reserved.