高階 Spark 程式設計



Spark 包含兩種不同型別的共享變數——一種是**廣播變數**,另一種是**累加器**。

  • **廣播變數** - 用於高效地分發大型值。

  • **累加器** - 用於聚合特定集合的資訊。

廣播變數

廣播變數允許程式設計師將只讀變數快取在每臺機器上,而不是隨任務一起傳送它的副本。例如,它們可以用於以有效的方式為每個節點提供大型輸入資料集的副本。Spark 還嘗試使用高效的廣播演算法來分發廣播變數,以降低通訊成本。

Spark 操作透過一系列階段執行,這些階段由分散式“洗牌”操作分隔。Spark 自動廣播每個階段內任務所需的公共資料。

以這種方式廣播的資料以序列化形式快取,並在執行每個任務之前進行反序列化。這意味著,僅當跨多個階段的任務需要相同的資料或當以反序列化形式快取資料很重要時,顯式建立廣播變數才有用。

透過呼叫**SparkContext.broadcast(v)**從變數**v**建立廣播變數。廣播變數是**v**的包裝器,其值可以透過呼叫**value**方法來訪問。下面給出的程式碼顯示了這一點 -

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

**輸出** -

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

建立廣播變數後,應在叢集上執行的任何函式中使用它,而不是使用值**v**,以確保**v**不會多次傳送到節點。此外,在廣播後不應修改物件**v**,以確保所有節點獲得廣播變數的相同值。

累加器

累加器是僅透過關聯操作“新增”的變數,因此可以在並行環境中得到有效支援。它們可以用於實現計數器(如 MapReduce 中)或總和。Spark 原生支援數值型別的累加器,程式設計師可以新增對新型別的支援。如果使用名稱建立累加器,它們將顯示在**Spark 的 UI** 中。這對於瞭解正在執行的階段的進度很有用(注意 - Python 中尚不支援此功能)。

透過呼叫**SparkContext.accumulator(v)**從初始值**v**建立累加器。然後,在叢集上執行的任務可以使用**add**方法或+=運算子(在 Scala 和 Python 中)新增到它。但是,它們無法讀取其值。只有驅動程式程式可以使用其**value**方法讀取累加器的值。

下面給出的程式碼顯示了一個用於將陣列元素加起來的累加器 -

scala> val accum = sc.accumulator(0) 
 
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

如果要檢視上述程式碼的輸出,請使用以下命令 -

scala> accum.value 

輸出

res2: Int = 10 

數值 RDD 操作

Spark 允許您使用預定義的 API 方法之一對數值資料執行不同的操作。Spark 的數值操作使用流演算法實現,該演算法允許一次構建一個元素的模型。

這些操作透過呼叫**status()**方法計算並作為**StatusCounter**物件返回。

以下是**StatusCounter**中可用的數值方法列表。

序號 方法及含義
1

count()

RDD 中元素的數量。

2

Mean()

RDD 中元素的平均值。

3

Sum()

RDD 中元素的總值。

4

Max()

RDD 中所有元素中的最大值。

5

Min()

RDD 中所有元素中的最小值。

6

Variance()

元素的方差。

7

Stdev()

標準差。

如果只想使用其中一種方法,可以直接在 RDD 上呼叫相應的方法。

廣告

© . All rights reserved.