- Spark SQL 教程
- Spark SQL - 首頁
- Spark - 簡介
- Spark - RDD
- Spark - 安裝
- Spark SQL - 簡介
- Spark SQL - DataFrame
- Spark SQL - 資料來源
- Spark SQL 有用資源
- Spark SQL - 快速指南
- Spark SQL - 有用資源
- Spark SQL - 討論
使用反射推斷模式
此方法使用反射來生成包含特定型別物件的RDD的模式。Spark SQL 的 Scala 介面支援自動將包含 case class 的 RDD 轉換為 DataFrame。case class 定義表的模式。case class 的引數名稱使用反射讀取,併成為列的名稱。
Case class 也可以巢狀,或者包含複雜型別,例如序列或陣列。此 RDD 可以隱式轉換為 DataFrame,然後註冊為表。表可用於後續的 SQL 語句。
示例
讓我們考慮一個名為employee.txt 的文字檔案中員工記錄的示例。透過讀取文字檔案中的資料建立 RDD,並使用預設 SQL 函式將其轉換為 DataFrame。
給定資料 - 檢視名為employee.txt的檔案的以下資料,將其放置在 Spark shell 執行的當前相應目錄中。
1201, satish, 25 1202, krishna, 28 1203, amith, 39 1204, javed, 23 1205, prudvi, 23
以下示例說明如何使用反射生成模式。
啟動 Spark Shell
使用以下命令啟動 Spark Shell。
$ spark-shell
建立 SQLContext
使用以下命令生成 SQLContext。這裡,sc 表示 SparkContext 物件。
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
匯入 SQL 函式
使用以下命令匯入所有用於將 RDD 隱式轉換為 DataFrame 的 SQL 函式。
scala> import sqlContext.implicts._
建立 Case Class
接下來,我們必須使用 case class 為員工記錄資料定義一個模式。以下命令用於根據給定資料 (id、name、age) 宣告 case class。
scala> case class Employee(id: Int, name: String, age: Int) defined class Employee
建立 RDD 並應用轉換
使用以下命令透過讀取employee.txt中的資料並使用 Map 函式將其轉換為 DataFrame 來生成名為empl的 RDD。
這裡定義了兩個 map 函式。一個是將文字記錄拆分為欄位 (.map(_.split(“,”))),第二個 map 函式是將各個欄位 (id、name、age) 轉換為一個 case class 物件 (.map(e(0).trim.toInt, e(1), e(2).trim.toInt))。
最後,toDF()方法用於將具有模式的 case class 物件轉換為 DataFrame。
scala> val empl=sc.textFile("employee.txt")
.map(_.split(","))
.map(e⇒ employee(e(0).trim.toInt,e(1), e(2).trim.toInt))
.toDF()
輸出
empl: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]
將 DataFrame 資料儲存到表中
使用以下命令將 DataFrame 資料儲存到名為employee的表中。執行此命令後,我們可以對其應用所有型別的 SQL 語句。
scala> empl.registerTempTable("employee")
employee 表已準備就緒。現在讓我們使用SQLContext.sql()方法對錶執行一些 sql 查詢。
對 DataFrame 執行 Select 查詢
使用以下命令從employee表中選擇所有記錄。這裡,我們使用變數allrecords來捕獲所有記錄資料。要顯示這些記錄,請在其上呼叫show()方法。
scala> val allrecords = sqlContext.sql("SELeCT * FROM employee")
要檢視allrecords DataFrame 的結果資料,請使用以下命令。
scala> allrecords.show()
輸出
+------+---------+----+ | id | name |age | +------+---------+----+ | 1201 | satish | 25 | | 1202 | krishna | 28 | | 1203 | amith | 39 | | 1204 | javed | 23 | | 1205 | prudvi | 23 | +------+---------+----+
對 DataFrame 使用 Where 子句 SQL 查詢
使用以下命令在表中應用where語句。這裡,變數agefilter儲存年齡在 20 到 35 之間的員工的記錄。
scala> val agefilter = sqlContext.sql("SELeCT * FROM employee WHERE ageC>=20 AND age <= 35")
要檢視agefilter DataFrame 的結果資料,請使用以下命令。
scala> agefilter.show()
輸出
<console>:25, took 0.112757 s +------+---------+----+ | id | name |age | +------+---------+----+ | 1201 | satish | 25 | | 1202 | krishna | 28 | | 1204 | javed | 23 | | 1205 | prudvi | 23 | +------+---------+----+
前面兩個查詢針對的是整個表 DataFrame。現在讓我們嘗試透過對其應用轉換來從結果 DataFrame 中提取資料。
使用列索引從 agefilter DataFrame 中獲取 ID 值
以下語句用於使用欄位索引從agefilter RDD 結果中獲取 ID 值。
scala> agefilter.map(t=>"ID: "+t(0)).collect().foreach(println)
輸出
<console>:25, took 0.093844 s ID: 1201 ID: 1202 ID: 1204 ID: 1205
這種基於反射的方法可以編寫更簡潔的程式碼,並且在編寫 Spark 應用程式時已經知道模式的情況下效果很好。