Spark SQL - DataFrame



DataFrame 是一個分散式的資料集合,組織成命名列。從概念上講,它等同於具有良好最佳化技術的關聯表。

DataFrame 可以從各種不同的來源構建,例如 Hive 表、結構化資料檔案、外部資料庫或現有的 RDD。這個 API 是為現代大資料和資料科學應用程式設計的,其靈感來自R 程式設計中的 DataFramePython 中的 Pandas

DataFrame 的特性

以下是 DataFrame 的一些特徵:

  • 能夠在單節點叢集到大型叢集上處理從千位元組到拍位元組大小的資料。

  • 支援不同的資料格式(Avro、csv、Elasticsearch 和 Cassandra)和儲存系統(HDFS、Hive 表、MySQL 等)。

  • 透過 Spark SQL Catalyst 最佳化器(樹轉換框架)進行最先進的最佳化和程式碼生成。

  • 可以透過 Spark-Core 輕鬆整合所有大資料工具和框架。

  • 提供 Python、Java、Scala 和 R 程式設計的 API。

SQLContext

SQLContext 是一個類,用於初始化 Spark SQL 的功能。初始化 SQLContext 類物件需要 SparkContext 類物件 (sc)。

以下命令用於透過 spark-shell 初始化 SparkContext。

$ spark-shell

預設情況下,當 spark-shell 啟動時,SparkContext 物件以名稱sc初始化。

使用以下命令建立 SQLContext。

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

示例

讓我們考慮一個名為employee.json 的 JSON 檔案中的員工記錄示例。使用以下命令建立一個 DataFrame (df) 並讀取名為employee.json 的 JSON 文件,其內容如下所示。

employee.json - 將此檔案放在當前scala> 指標所在的目錄中。

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

DataFrame 操作

DataFrame 提供了一種用於結構化資料操作的特定領域語言。在這裡,我們包含一些使用 DataFrame 進行結構化資料處理的基本示例。

按照以下步驟執行 DataFrame 操作:

讀取 JSON 文件

首先,我們必須讀取 JSON 文件。基於此,生成一個名為 (dfs) 的 DataFrame。

使用以下命令讀取名為employee.json 的 JSON 文件。資料顯示為一個表,欄位為:id、name 和 age。

scala> val dfs = sqlContext.read.json("employee.json")

輸出 - 欄位名稱自動從employee.json 獲取。

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

顯示資料

如果要檢視 DataFrame 中的資料,請使用以下命令。

scala> dfs.show()

輸出 - 你可以在表格格式中看到員工資料。

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

使用 printSchema 方法

如果要檢視 DataFrame 的結構(模式),請使用以下命令。

scala> dfs.printSchema()

輸出

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

使用 Select 方法

使用以下命令從 DataFrame 中獲取三個列中的name列。

scala> dfs.select("name").show()

輸出 - 你可以看到name列的值。

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

使用 Age 過濾器

使用以下命令查詢年齡大於 23 (age > 23) 的員工。

scala> dfs.filter(dfs("age") > 23).show()

輸出

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

使用 groupBy 方法

使用以下命令計算相同年齡的員工人數。

scala> dfs.groupBy("age").count().show()

輸出 - 兩名員工的年齡為 23。

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

以程式設計方式執行 SQL 查詢

SQLContext 使應用程式能夠在執行 SQL 函式的同時以程式設計方式執行 SQL 查詢,並將結果作為 DataFrame 返回。

通常,在後臺,SparkSQL 支援兩種不同的方法將現有的 RDD 轉換為 DataFrame:

序號 方法和描述
1 使用反射推斷模式

此方法使用反射來生成包含特定型別物件的 RDD 的模式。

2 以程式設計方式指定模式

建立 DataFrame 的第二種方法是透過程式設計介面,允許你構建模式,然後將其應用於現有的 RDD。

廣告
© . All rights reserved.