- Apache Spark 教程
- Apache Spark - 首頁
- Apache Spark - 簡介
- Apache Spark - RDD
- Apache Spark - 安裝
- Apache Spark - 核心程式設計
- Apache Spark - 部署
- 高階 Spark 程式設計
- Apache Spark 有用資源
- Apache Spark 快速指南
- Apache Spark - 有用資源
- Apache Spark - 討論
Apache Spark 快速指南
Apache Spark - 簡介
許多行業廣泛使用 Hadoop 來分析其資料集。原因是 Hadoop 框架基於簡單的程式設計模型(MapReduce),它能夠提供可擴充套件、靈活、容錯且經濟高效的計算解決方案。這裡,主要關注的是在處理大型資料集時保持速度,包括查詢之間的等待時間和程式執行的等待時間。
Apache 軟體基金會推出了 Spark,以加快 Hadoop 計算軟體的處理速度。
與普遍看法相反,Spark 不是 Hadoop 的修改版本,實際上也不依賴於 Hadoop,因為它有自己的叢集管理。Hadoop 只是實現 Spark 的方法之一。
Spark 以兩種方式使用 Hadoop——一種是儲存,另一種是處理。由於 Spark 擁有自己的叢集管理計算,它僅將 Hadoop 用於儲存目的。
Apache Spark
Apache Spark 是一種閃電般快速的叢集計算技術,專為快速計算而設計。它基於 Hadoop MapReduce,並擴充套件了 MapReduce 模型,以便高效地將其用於更多型別的計算,包括互動式查詢和流處理。Spark 的主要特點是其記憶體中叢集計算,這提高了應用程式的處理速度。
Spark 設計用於涵蓋各種工作負載,例如批處理應用程式、迭代演算法、互動式查詢和流處理。除了在各自的系統中支援所有這些工作負載外,它還減少了維護單獨工具的管理負擔。
Apache Spark 的發展
Spark 是 Hadoop 的一個子專案,2009 年由 Matei Zaharia 在加州大學伯克利分校的 AMPLab 開發。它於 2010 年在 BSD 許可下開源。2013 年捐贈給 Apache 軟體基金會,現在 Apache Spark 已成為從 2014 年 2 月開始的頂級 Apache 專案。
Apache Spark 的特點
Apache Spark 具有以下特點:
速度 - Spark 可以幫助在 Hadoop 叢集中執行應用程式,在記憶體中快 100 倍,在磁碟上執行時快 10 倍。這是透過減少對磁碟的讀/寫操作次數來實現的。它將中間處理資料儲存在記憶體中。
支援多種語言 - Spark 提供了 Java、Scala 或 Python 的內建 API。因此,您可以使用不同的語言編寫應用程式。Spark 提供了 80 多個高階運算子用於互動式查詢。
高階分析 - Spark 不僅支援“Map”和“reduce”。它還支援 SQL 查詢、流資料、機器學習 (ML) 和圖演算法。
Spark 基於 Hadoop
下圖顯示了 Spark 如何使用 Hadoop 元件的三種方式。
Spark 的部署方式如下所示。
獨立模式 - Spark 獨立模式部署意味著 Spark 位於 HDFS(Hadoop 分散式檔案系統)之上,併為 HDFS 顯式分配空間。在這裡,Spark 和 MapReduce 將並排執行,以涵蓋叢集上的所有 Spark 作業。
Hadoop Yarn - Hadoop Yarn 部署意味著 Spark 僅在 Yarn 上執行,無需任何預安裝或 root 訪問許可權。它有助於將 Spark 整合到 Hadoop 生態系統或 Hadoop 堆疊中。它允許其他元件在堆疊頂部執行。
MapReduce 中的 Spark (SIMR) - MapReduce 中的 Spark 用於除了獨立部署之外啟動 Spark 作業。使用 SIMR,使用者可以啟動 Spark 並使用其 shell,無需任何管理許可權。
Spark 的元件
下圖描述了 Spark 的不同元件。
Apache Spark Core
Spark Core 是 Spark 平臺的基礎通用執行引擎,所有其他功能都是基於它構建的。它提供記憶體計算和引用外部儲存系統中的資料集。
Spark SQL
Spark SQL 是 Spark Core 之上的一個元件,它引入了一種名為 SchemaRDD 的新的資料抽象,它提供對結構化和半結構化資料的支援。
Spark Streaming
Spark Streaming 利用 Spark Core 的快速排程功能來執行流分析。它以小批次的形式攝取資料,並在這些小批次資料上執行 RDD(彈性分散式資料集)轉換。
MLlib(機器學習庫)
MLlib 是 Spark 之上的一個分散式機器學習框架,因為它基於分散式記憶體的 Spark 架構。根據 MLlib 開發人員與交替最小二乘法 (ALS) 實現進行的基準測試,Spark MLlib 的速度是 Hadoop 基於磁碟的Apache Mahout版本(在 Mahout 獲得 Spark 介面之前)的九倍。
GraphX
GraphX 是 Spark 之上的一個分散式圖處理框架。它提供了一個 API 用於表達圖計算,可以使用 Pregel 抽象 API 對使用者定義的圖進行建模。它還為此抽象提供了最佳化的執行時。
Apache Spark - RDD
彈性分散式資料集
彈性分散式資料集 (RDD) 是 Spark 的基本資料結構。它是一個不可變的物件分散式集合。RDD 中的每個資料集都細分為邏輯分割槽,這些分割槽可以在叢集的不同節點上計算。RDD 可以包含任何型別的 Python、Java 或 Scala 物件,包括使用者定義的類。
正式地說,RDD 是一個只讀的分割槽記錄集合。RDD 可以透過對穩定儲存上的資料或其他 RDD 執行確定性操作來建立。RDD 是一個容錯的元素集合,可以並行操作。
建立 RDD 有兩種方法 - 將驅動程式程式中現有的集合並行化,或引用外部儲存系統(例如共享檔案系統、HDFS、HBase 或任何提供 Hadoop 輸入格式的資料來源)中的資料集。
Spark 利用 RDD 的概念來實現更快、更高效的 MapReduce 操作。讓我們首先討論 MapReduce 操作是如何進行的以及為什麼它們效率不高。
MapReduce 中的資料共享速度慢
MapReduce 被廣泛用於使用叢集上的並行分散式演算法處理和生成大型資料集。它允許使用者使用一組高階運算子編寫平行計算,而無需擔心工作分配和容錯。
不幸的是,在大多數當前框架中,在計算之間(例如,在兩個 MapReduce 作業之間)重用資料的唯一方法是將其寫入外部穩定儲存系統(例如 HDFS)。儘管此框架提供了許多用於訪問叢集計算資源的抽象,但使用者仍然需要更多。
迭代和互動式應用程式都需要在並行作業之間進行更快的資料共享。由於複製、序列化和磁碟 I/O,MapReduce 中的資料共享速度很慢。關於儲存系統,大多數 Hadoop 應用程式花費超過 90% 的時間進行 HDFS 讀寫操作。
MapReduce 上的迭代操作
在多階段應用程式中跨多個計算重用中間結果。下圖說明了當前框架在 MapReduce 上執行迭代操作時的工作方式。由於資料複製、磁碟 I/O 和序列化,這會產生大量的開銷,從而使系統變慢。
MapReduce 上的互動式操作
使用者對同一資料集的子集執行 ad-hoc 查詢。每個查詢都將對穩定儲存執行磁碟 I/O,這可能會主導應用程式執行時間。
下圖說明了當前框架在 MapReduce 上執行互動式查詢時的工作方式。
使用 Spark RDD 進行資料共享
由於複製、序列化和磁碟 I/O,MapReduce 中的資料共享速度很慢。大多數 Hadoop 應用程式花費超過 90% 的時間進行 HDFS 讀寫操作。
研究人員認識到這個問題,開發了一個名為 Apache Spark 的專用框架。Spark 的關鍵思想是彈性分散式資料集 (RDD);它支援記憶體處理計算。這意味著它將記憶體狀態作為物件跨作業儲存,並且該物件可在這些作業之間共享。記憶體中的資料共享速度比網路和磁碟快 10 到 100 倍。
現在讓我們嘗試找出 Spark RDD 中迭代和互動操作是如何進行的。
Spark RDD 上的迭代操作
下圖顯示了 Spark RDD 上的迭代操作。它將中間結果儲存在分散式記憶體中而不是穩定儲存(磁碟)中,從而使系統更快。
注意 - 如果分散式記憶體 (RAM) 足以儲存中間結果(作業狀態),則它將這些結果儲存在磁碟上。
Spark RDD 上的互動式操作
此圖顯示了 Spark RDD 上的互動式操作。如果對同一資料集重複執行不同的查詢,則可以將此特定資料儲存在記憶體中以獲得更好的執行時間。
預設情況下,每次對轉換後的 RDD 執行操作時,都可能會重新計算它。但是,您也可以將 RDD 儲存在記憶體中,在這種情況下,Spark 將在叢集上保留這些元素,以便下次查詢時可以更快地訪問它們。還支援將 RDD 儲存在磁碟上或跨多個節點複製。
Apache Spark - 安裝
Spark 是 Hadoop 的子專案。因此,最好將 Spark 安裝到基於 Linux 的系統中。以下步驟顯示如何安裝 Apache Spark。
步驟 1:驗證 Java 安裝
Java 安裝是安裝 Spark 的一項必備工作。嘗試以下命令來驗證 JAVA 版本。
$java -version
如果 Java 已安裝在您的系統上,您將看到以下響應:
java version "1.7.0_71" Java(TM) SE Runtime Environment (build 1.7.0_71-b13) Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
如果您系統上沒有安裝 Java,請先安裝 Java,然後再進行下一步。
步驟 2:驗證 Scala 安裝
您應該使用 Scala 語言來實現 Spark。因此,讓我們使用以下命令驗證 Scala 安裝。
$scala -version
如果您的系統上已經安裝了 Scala,您將看到以下響應:
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
如果您系統上沒有安裝 Scala,請繼續下一步進行 Scala 安裝。
步驟 3:下載 Scala
訪問以下連結下載最新版本的 Scala 下載 Scala。本教程使用 scala-2.11.6 版本。下載後,您將在下載資料夾中找到 Scala 的 tar 檔案。
步驟 4:安裝 Scala
請按照以下步驟安裝 Scala。
解壓 Scala tar 檔案
輸入以下命令來解壓 Scala tar 檔案:
$ tar xvf scala-2.11.6.tgz
移動 Scala 軟體檔案
使用以下命令將 Scala 軟體檔案移動到相應的目錄 **(/usr/local/scala)**。
$ su – Password: # cd /home/Hadoop/Downloads/ # mv scala-2.11.6 /usr/local/scala # exit
設定 Scala 的 PATH
使用以下命令設定 Scala 的 PATH。
$ export PATH = $PATH:/usr/local/scala/bin
驗證 Scala 安裝
安裝完成後,最好驗證一下。使用以下命令驗證 Scala 安裝。
$scala -version
如果您的系統上已經安裝了 Scala,您將看到以下響應:
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
步驟 5:下載 Apache Spark
訪問以下連結下載最新版本的 Spark 下載 Spark。本教程使用 **spark-1.3.1-bin-hadoop2.6** 版本。下載後,您將在下載資料夾中找到 Spark 的 tar 檔案。
步驟 6:安裝 Spark
請按照以下步驟安裝 Spark。
解壓 Spark tar 檔案
使用以下命令解壓 Spark tar 檔案:
$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz
移動 Spark 軟體檔案
使用以下命令將 Spark 軟體檔案移動到相應的目錄 **(/usr/local/spark)**。
$ su – Password: # cd /home/Hadoop/Downloads/ # mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark # exit
設定 Spark 的環境
將以下行新增到 ~**/.bashrc** 檔案中。這意味著將 Spark 軟體檔案所在的路徑新增到 PATH 變數中。
export PATH=$PATH:/usr/local/spark/bin
使用以下命令載入 ~/.bashrc 檔案。
$ source ~/.bashrc
步驟 7:驗證 Spark 安裝
使用以下命令開啟 Spark shell。
$spark-shell
如果 Spark 安裝成功,您將看到以下輸出。
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Apache Spark - 核心程式設計
Spark Core 是整個專案的基石。它提供分散式任務排程和基本的 I/O 功能。Spark 使用一種稱為 RDD(彈性分散式資料集)的特殊基本資料結構,它是在機器之間劃分的資料的邏輯集合。RDD 可以透過兩種方式建立;一種是引用外部儲存系統中的資料集,另一種是對現有 RDD 應用轉換(例如 map、filter、reducer、join)。
RDD 抽象透過語言整合 API 公開。這簡化了程式設計的複雜性,因為應用程式操作 RDD 的方式類似於操作本地資料集合。
Spark Shell
Spark 提供了一個互動式 shell——一個強大的工具,用於互動式地分析資料。它可以使用 Scala 或 Python 語言。Spark 的主要抽象是稱為彈性分散式資料集 (RDD) 的專案的分散式集合。RDD 可以從 Hadoop 輸入格式(例如 HDFS 檔案)建立,也可以透過轉換其他 RDD 來建立。
開啟 Spark Shell
使用以下命令開啟 Spark shell。
$ spark-shell
建立簡單的 RDD
讓我們從文字檔案建立一個簡單的 RDD。使用以下命令建立一個簡單的 RDD。
scala> val inputfile = sc.textFile(“input.txt”)
以上命令的輸出為:
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
Spark RDD API 引入了一些**轉換**和一些**動作**來操作 RDD。
RDD 轉換
RDD 轉換返回指向新 RDD 的指標,並允許您在 RDD 之間建立依賴關係。依賴鏈(依賴字串)中的每個 RDD 都有一個計算其資料的函式,並有一個指向其父 RDD 的指標(依賴)。
Spark 是惰性的,所以除非您呼叫一些轉換或動作來觸發作業建立和執行,否則不會執行任何操作。請看以下單詞計數示例程式碼片段。
因此,RDD 轉換不是一組資料,而是在程式中的一步(可能是唯一的一步),告訴 Spark 如何獲取資料以及如何處理它。
| 序號 | 轉換及含義 |
|---|---|
| 1 |
map(func) 返回一個新的分散式資料集,該資料集透過函式 **func** 傳遞源的每個元素而形成。 |
| 2 |
filter(func) 返回一個新的資料集,該資料集透過選擇源中 **func** 返回 true 的那些元素而形成。 |
| 3 |
flatMap(func) 類似於 map,但每個輸入項可以對映到 0 個或多個輸出項(因此 *func* 應該返回一個 Seq 而不是單個項)。 |
| 4 |
mapPartitions(func) 類似於 map,但在 RDD 的每個分割槽(塊)上單獨執行,因此當在型別為 T 的 RDD 上執行時,**func** 必須是 Iterator<T> ⇒ Iterator<U> 型別。 |
| 5 |
mapPartitionsWithIndex(func) 類似於 mapPartitions,但還為 **func** 提供一個整數,表示分割槽的索引,因此當在型別為 T 的 RDD 上執行時,**func** 必須是 (Int, Iterator<T>) ⇒ Iterator<U> 型別。 |
| 6 |
sample(withReplacement, fraction, seed) 對資料進行 **fraction** 抽樣,是否放回,使用給定的隨機數生成器種子。 |
| 7 |
union(otherDataset) 返回一個新的資料集,其中包含源資料集和引數中的元素的並集。 |
| 8 |
intersection(otherDataset) 返回一個新的 RDD,其中包含源資料集和引數中元素的交集。 |
| 9 |
distinct([numTasks]) 返回一個新的資料集,其中包含源資料集的唯一元素。 |
| 10 |
groupByKey([numTasks]) 當在一個 (K, V) 對的資料集上呼叫時,返回一個 (K, Iterable<V>) 對的資料集。 **注意** - 如果您是為了對每個鍵進行聚合(例如求和或平均值)而進行分組,則使用 reduceByKey 或 aggregateByKey 將產生更好的效能。 |
| 11 |
reduceByKey(func, [numTasks]) 當在一個 (K, V) 對的資料集上呼叫時,返回一個 (K, V) 對的資料集,其中每個鍵的值使用給定的 reduce 函式 *func* 進行聚合,該函式必須是 (V, V) ⇒ V 型別。與 groupByKey 一樣,reduce 任務的數量可以透過可選的第二個引數進行配置。 |
| 12 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 當在一個 (K, V) 對的資料集上呼叫時,返回一個 (K, U) 對的資料集,其中每個鍵的值使用給定的組合函式和中性“零”值進行聚合。允許聚合的值型別與輸入值型別不同,同時避免不必要的分配。與 groupByKey 一樣,reduce 任務的數量可以透過可選的第二個引數進行配置。 |
| 13 |
sortByKey([ascending], [numTasks]) 當在一個 (K, V) 對的資料集上呼叫時,其中 K 實現 Ordered,返回一個按鍵的升序或降序排序的 (K, V) 對的資料集,如布林型 ascending 引數中指定。 |
| 14 |
join(otherDataset, [numTasks]) 當在型別為 (K, V) 和 (K, W) 的資料集上呼叫時,返回一個 (K, (V, W)) 對的資料集,其中包含每個鍵的所有元素對。透過 leftOuterJoin、rightOuterJoin 和 fullOuterJoin 支援外部連線。 |
| 15 |
cogroup(otherDataset, [numTasks]) 當在型別為 (K, V) 和 (K, W) 的資料集上呼叫時,返回一個 (K, (Iterable<V>, Iterable<W>)) 元組的資料集。此操作也稱為 groupWith。 |
| 16 |
cartesian(otherDataset) 當在型別為 T 和 U 的資料集上呼叫時,返回一個 (T, U) 對的資料集(所有元素對)。 |
| 17 |
pipe(command, [envVars]) 透過 shell 命令(例如 Perl 或 bash 指令碼)傳遞 RDD 的每個分割槽。RDD 元素寫入程序的 stdin,並將其 stdout 輸出的行作為字串的 RDD 返回。 |
| 18 |
coalesce(numPartitions) 減少 RDD 中的分割槽數量到 numPartitions。在過濾掉大型資料集後,有助於更有效地執行操作。 |
| 19 |
repartition(numPartitions) 隨機重新調整 RDD 中的資料,以建立更多或更少的分割槽,並在它們之間進行平衡。這總是會在網路上打亂所有資料。 |
| 20 |
repartitionAndSortWithinPartitions(partitioner) 根據給定的分割槽器重新分割槽 RDD,並在每個結果分割槽內按其鍵對記錄進行排序。這比呼叫 repartition 然後在每個分割槽內排序更有效,因為它可以將排序推送到 shuffle 機制中。 |
動作
| 序號 | Action 及含義 |
|---|---|
| 1 |
reduce(func) 使用函式 **func**(它接受兩個引數並返回一個引數)聚合資料集的元素。該函式應該是可交換的和關聯的,以便可以並行正確地計算它。 |
| 2 |
collect() 將資料集的所有元素作為陣列返回到驅動程式程式。這通常在篩選或其他返回足夠小的資料子集的操作之後很有用。 |
| 3 |
count() 返回資料集中元素的數量。 |
| 4 |
first() 返回資料集的第一個元素(類似於 take(1))。 |
| 5 |
take(n) 返回一個數組,其中包含資料集的前 **n** 個元素。 |
| 6 |
takeSample(withReplacement, num, [seed]) 返回一個數組,其中包含資料集的 **num** 個元素的隨機樣本,是否放回,可以選擇預先指定隨機數生成器種子。 |
| 7 |
takeOrdered(n, [ordering]) 使用它們的自然順序或自定義比較器返回 RDD 的前 **n** 個元素。 |
| 8 |
saveAsTextFile(path) 將資料集的元素作為文字檔案(或一組文字檔案)寫入本地檔案系統、HDFS 或任何其他 Hadoop 支援的檔案系統中的給定目錄中。Spark 呼叫 toString 來將每個元素轉換為檔案中的文字行。 |
| 9 |
saveAsSequenceFile(path)(Java 和 Scala) 將資料集的元素作為 Hadoop SequenceFile 寫入本地檔案系統、HDFS 或任何其他 Hadoop 支援的檔案系統中的給定路徑中。這適用於實現 Hadoop 的 Writable 介面的鍵值對的 RDD。在 Scala 中,它也適用於可以隱式轉換為 Writable 的型別(Spark 包含對 Int、Double、String 等基本型別的轉換)。 |
| 10 |
saveAsObjectFile(path)(Java 和 Scala) 使用 Java 序列化以簡單的格式寫入資料集的元素,然後可以使用 SparkContext.objectFile() 載入。 |
| 11 |
countByKey() 僅適用於型別為 (K, V) 的 RDD。返回一個 (K, Int) 對的雜湊對映,其中包含每個鍵的計數。 |
| 12 |
foreach(func) 在資料集的每個元素上執行函式 **func**。這通常是為了產生副作用,例如更新累加器或與外部儲存系統互動。 **注意** - 修改 foreach() 外的累加器以外的變數可能會導致未定義的行為。有關詳細資訊,請參閱理解閉包。 |
使用RDD程式設計
讓我們透過一個例子來看看RDD程式設計中一些RDD轉換和操作的實現。
示例
考慮一個單詞計數示例——它計算文件中出現的每個單詞。將以下文字作為輸入,並將其儲存為input.txt檔案到主目錄。
input.txt - 輸入檔案。
people are not as beautiful as they look, as they walk or as they talk. they are only as beautiful as they love, as they care as they share.
按照以下步驟執行給定示例。
開啟Spark-Shell
使用以下命令開啟spark shell。通常,spark是用Scala構建的。因此,Spark程式在Scala環境中執行。
$ spark-shell
如果Spark shell成功開啟,您將看到以下輸出。“Spark context available as sc”的最後一行表示Spark容器自動建立名為sc的Spark context物件。在開始程式的第一步之前,應該建立SparkContext物件。
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
建立RDD
首先,我們必須使用Spark-Scala API讀取輸入檔案並建立一個RDD。
以下命令用於從給定位置讀取檔案。這裡,使用inputfile的名稱建立一個新的RDD。在textFile("")方法中作為引數給出的字串是輸入檔名的絕對路徑。但是,如果只給出檔名,則表示輸入檔案位於當前位置。
scala> val inputfile = sc.textFile("input.txt")
執行單詞計數轉換
我們的目標是計算檔案中單詞的數量。建立一個flat map,將每一行分割成單詞 (flatMap(line ⇒ line.split(" ")))。
接下來,使用map函式將每個單詞作為鍵值對,值為'1'(<鍵, 值> = <單詞,1>)(map(word ⇒ (word, 1)))。
最後,透過將相似鍵的值相加來減少這些鍵 (reduceByKey(_+_))。
以下命令用於執行單詞計數邏輯。執行此操作後,您不會發現任何輸出,因為這不是一個action(動作),而是一個transformation(轉換);指向一個新的RDD或告訴spark如何處理給定資料。
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
當前RDD
在使用RDD時,如果想了解當前RDD,請使用以下命令。它將顯示有關當前RDD及其依賴項的資訊,用於除錯。
scala> counts.toDebugString
快取轉換
可以使用persist()或cache()方法標記要持久化的RDD。第一次在action中計算它時,它將儲存在節點的記憶體中。使用以下命令將中間轉換儲存在記憶體中。
scala> counts.cache()
應用Action
應用一個action,例如將所有轉換結果儲存到文字檔案中。saveAsTextFile(" ")方法的字串引數是輸出資料夾的絕對路徑。嘗試以下命令將輸出儲存到文字檔案。在下面的示例中,“output”資料夾位於當前位置。
scala> counts.saveAsTextFile("output")
檢查輸出
開啟另一個終端以轉到主目錄(在另一個終端中執行spark)。使用以下命令檢查輸出目錄。
[hadoop@localhost ~]$ cd output/ [hadoop@localhost output]$ ls -1 part-00000 part-00001 _SUCCESS
以下命令用於檢視Part-00000檔案的輸出。
[hadoop@localhost output]$ cat part-00000
輸出
(people,1) (are,2) (not,1) (as,8) (beautiful,2) (they, 7) (look,1)
以下命令用於檢視Part-00001檔案的輸出。
[hadoop@localhost output]$ cat part-00001
輸出
(walk, 1) (or, 1) (talk, 1) (only, 1) (love, 1) (care, 1) (share, 1)
取消持久化儲存
在取消持久化之前,如果要檢視此應用程式使用的儲存空間,請在瀏覽器中使用以下URL。
https://:4040
您將看到以下螢幕,其中顯示了正在Spark shell上執行的應用程式使用的儲存空間。
如果要取消持久化特定RDD的儲存空間,請使用以下命令。
Scala> counts.unpersist()
您將看到如下輸出:
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
要驗證瀏覽器中的儲存空間,請使用以下URL。
https://:4040/
您將看到以下螢幕。它顯示了正在Spark shell上執行的應用程式使用的儲存空間。
Apache Spark - 部署
使用spark-submit的Spark應用程式是一個shell命令,用於將Spark應用程式部署到叢集。它透過統一介面使用所有相應的叢集管理器。因此,您不必為每個管理器配置應用程式。
示例
讓我們以之前使用的單詞計數為例,使用shell命令。在這裡,我們將相同的示例視為一個Spark應用程式。
示例輸入
以下文字是輸入資料,檔名是in.txt。
people are not as beautiful as they look, as they walk or as they talk. they are only as beautiful as they love, as they care as they share.
檢視以下程式:
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */
val count = input.flatMap(line ⇒ line.split(" "))
.map(word ⇒ (word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}
將上述程式儲存到名為SparkWordCount.scala的檔案中,並將其放在名為spark-application的使用者定義目錄中。
注意 - 在將inputRDD轉換為countRDD時,我們使用flatMap()將文字檔案中的行標記為單詞,使用map()方法計算單詞頻率,使用reduceByKey()方法計算每個單詞的重複次數。
使用以下步驟提交此應用程式。透過終端在spark-application目錄中執行所有步驟。
步驟1:下載Spark Jar
編譯需要Spark core jar,因此,從以下連結下載spark-core_2.10-1.3.0.jar Spark core jar 並將jar檔案從下載目錄移動到spark-application目錄。
步驟2:編譯程式
使用以下命令編譯上述程式。此命令應從spark-application目錄執行。這裡,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar是從Spark庫中獲取的Hadoop支援jar。
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
步驟3:建立JAR
使用以下命令建立Spark應用程式的jar檔案。這裡,wordcount是jar檔案的檔名。
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
步驟4:提交Spark應用程式
使用以下命令提交Spark應用程式:
spark-submit --class SparkWordCount --master local wordcount.jar
如果成功執行,您將找到以下輸出。以下輸出中的OK用於使用者識別,它是程式的最後一行。如果您仔細閱讀以下輸出,您會發現不同的內容,例如:
- successfully started service 'sparkDriver' on port 42954
- MemoryStore started with capacity 267.3 MB
- Started SparkUI at http://192.168.1.217:4040
- Added JAR file:/home/hadoop/piapplication/count.jar
- ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
- Stopped Spark web UI at http://192.168.1.217:4040
- MemoryStore cleared
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954] 15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s OK 15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 15/07/08 13:56:14 INFO Utils: Shutdown hook called 15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
步驟5:檢查輸出
程式成功執行後,您將在spark-application目錄中找到名為outfile的目錄。
以下命令用於開啟和檢查outfile目錄中的檔案列表。
$ cd outfile $ ls Part-00000 part-00001 _SUCCESS
檢查part-00000檔案的命令是:
$ cat part-00000 (people,1) (are,2) (not,1) (as,8) (beautiful,2) (they, 7) (look,1)
檢查part-00001檔案的命令是:
$ cat part-00001 (walk, 1) (or, 1) (talk, 1) (only, 1) (love, 1) (care, 1) (share, 1)
閱讀以下部分以瞭解有關“spark-submit”命令的更多資訊。
Spark-submit語法
spark-submit [options] <app jar | python file> [app arguments]
選項
| 序號 | 選項 | 描述 |
|---|---|---|
| 1 | --master | spark://host:port, mesos://host:port, yarn, 或 local。 |
| 2 | --deploy-mode | 是在本地 ("client") 啟動驅動程式,還是在叢集內的某個工作機器上啟動 ("cluster") (預設值:client)。 |
| 3 | --class | 應用程式的主類 (對於 Java/Scala 應用程式)。 |
| 4 | --name | 應用程式的名稱。 |
| 5 | --jars | 要在驅動程式和執行程式類路徑中包含的本地 jar 的逗號分隔列表。 |
| 6 | --packages | 要在驅動程式和執行程式類路徑中包含的 jar 的 maven 座標的逗號分隔列表。 |
| 7 | --repositories | 要搜尋 --packages 給出的 maven 座標的其他遠端儲存庫的逗號分隔列表。 |
| 8 | --py-files | 要在 Python 應用程式的 PYTHON PATH 上放置的 .zip、.egg 或 .py 檔案的逗號分隔列表。 |
| 9 | --files | 要放在每個執行程式的工作目錄中的檔案的逗號分隔列表。 |
| 10 | --conf (prop=val) | 任意的 Spark 配置屬性。 |
| 11 | --properties-file | 要從中載入額外屬性的檔案的路徑。如果未指定,它將查詢 conf/spark-defaults。 |
| 12 | --driver-memory | 驅動程式的記憶體 (例如 1000M、2G) (預設值:512M)。 |
| 13 | --driver-java-options | 要傳遞給驅動程式的額外 Java 選項。 |
| 14 | --driver-library-path | 要傳遞給驅動程式的額外庫路徑條目。 |
| 15 | --driver-class-path | 要傳遞給驅動程式的額外類路徑條目。 請注意,使用 --jars 新增的 jar 會自動包含在類路徑中。 |
| 16 | --executor-memory | 每個執行程式的記憶體 (例如 1000M、2G) (預設值:1G)。 |
| 17 | --proxy-user | 提交應用程式時要模擬的使用者。 |
| 18 | --help, -h | 顯示此幫助訊息並退出。 |
| 19 | --verbose, -v | 列印額外的除錯輸出。 |
| 20 | --version | 列印當前 Spark 的版本。 |
| 21 | --driver-cores NUM | 驅動程式的核心數 (預設值:1)。 |
| 22 | --supervise | 如果給出,則在發生故障時重新啟動驅動程式。 |
| 23 | --kill | 如果給出,則終止指定的驅動程式。 |
| 24 | --status | 如果給出,則請求指定的驅動程式的狀態。 |
| 25 | --total-executor-cores | 所有執行程式的總核心數。 |
| 26 | --executor-cores | 每個執行程式的核心數。(預設值:在 YARN 模式下為 1,或在獨立模式下為工作程式上的所有可用核心)。 |
高階 Spark 程式設計
Spark包含兩種不同型別的共享變數——一個是廣播變數,另一個是累加器。
廣播變數 - 用於高效地分發大值。
累加器 - 用於聚合特定集合的資訊。
廣播變數
廣播變數允許程式設計師將只讀變數快取在每臺機器上,而不是將它的副本與任務一起傳送。例如,它們可以用於以高效的方式為每個節點提供大型輸入資料集的副本。Spark 還嘗試使用高效的廣播演算法來分發廣播變數,以降低通訊成本。
Spark 操作透過一系列階段執行,這些階段由分散式“混洗”操作分隔。Spark 自動廣播每個階段中任務所需公共資料。
以這種方式廣播的資料以序列化形式快取,並在執行每個任務之前反序列化。這意味著顯式建立廣播變數僅在多個階段的任務需要相同資料或在反序列化形式快取資料很重要時才有用。
透過呼叫SparkContext.broadcast(v)從變數v建立廣播變數。廣播變數是v的包裝器,其值可以透過呼叫value方法來訪問。以下程式碼顯示了這一點:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
輸出:
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
建立廣播變數後,應在叢集上執行的任何函式中使用它,而不是使用值v,這樣v就不會多次傳送到節點。此外,在廣播物件v之後,不應修改物件v,以確保所有節點獲得廣播變數的相同值。
累加器
累加器是隻能透過關聯操作“新增”的變數,因此可以在並行環境中有效地支援它們。它們可以用來實現計數器(如在MapReduce中)或求和。Spark 本地支援數值型別的累加器,程式設計師可以新增對新型別的支援。如果累加器是用名稱建立的,它們將顯示在Spark 的 UI中。這對於瞭解正在執行的階段的進度很有用(注意:Python 中尚不支援此功能)。
累加器 (Accumulator) 透過呼叫 **SparkContext.accumulator(v)** 從初始值 **v** 建立。叢集上執行的任務可以使用 **add** 方法或 += 運算子(在 Scala 和 Python 中)向其新增值。但是,它們無法讀取累加器的值。只有驅動程式 (driver program) 可以使用其 **value** 方法讀取累加器的值。
下面的程式碼展示瞭如何使用累加器對陣列元素求和:
scala> val accum = sc.accumulator(0) scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
如果您想檢視上述程式碼的輸出,請使用以下命令:
scala> accum.value
輸出
res2: Int = 10
數值 RDD 操作
Spark 允許您使用預定義的 API 方法對數值資料執行不同的操作。Spark 的數值操作使用流式演算法實現,允許一次構建一個元素的模型。
這些操作透過呼叫 **status()** 方法計算並作為 **StatusCounter** 物件返回。
| 序號 | 方法及含義 |
|---|---|
| 1 | count() count() |
| 2 | RDD 中元素的數量。 mean() |
| 3 | RDD 中元素的平均值。 sum() |
| 4 | RDD 中元素的總和。 max() |
| 5 | RDD 中所有元素中的最大值。 min() |
| 6 | RDD 中所有元素中的最小值。 variance() |
| 7 | 元素的方差。 stdev() |
標準差。