Spark 中的 RDD 共享變數


RDD的全稱是彈性分散式資料集。Spark 的效能基於這個容錯的資料集,使其能夠有效地處理各種大資料處理任務,包括 MapReduce、流處理、SQL、機器學習、圖計算等。

Spark 支援多種程式語言,包括 Scala、Python 和 R。RDD 也支援這些語言的持久化。

如何建立 RDD

Spark 在許多地方支援 RDD 架構,包括本地檔案系統、HDFS 檔案系統、記憶體和 HBase。

對於本地檔案系統,我們可以透過以下方式建立 RDD:

val distFile = sc.textFile("file:///user/root/rddData.txt")

預設情況下,Spark 從 HDFS 檔案系統獲取資料。因此,在 HDFS 檔案系統中建立 RDD 的方法如下:

val distFile = sc.textFile("/user/root/rddData.txt")

使用者也可以透過以下方式指定 HDFS URL:

val distFile = sc.textFile("hdfs://:4440/user/rddData.txt")

RDD 共享變數

如果任何函式在 Spark 中傳播到執行函式,它都會應用於叢集節點。Spark 使用計算中使用的每個變數的不同副本。這些更改會複製到每臺機器,並且不會將對遠端裝置的動態更新恢復到驅動程式系統。

如果遠端節點執行 Spark 的執行函式來工作,系統會將所有函式變數複製到該節點。如果這些變數在其他節點上更新,系統將不會更新當前節點變數,直到它被恢復到驅動程式系統。通常,在所有活動中靈活的讀寫能力效果不佳。Spark 使用兩種型別的共享變數:

累加器

類似於 C 語言中的全域性變數,Spark 支援多個函式更新同一個變數。這允許多個任務順序地更新同一個變數。

建立累加器時,可以使用 SparkContext.longAccumulator() 或 SparkContext.doubleAccumulator() 建立長整型和雙精度型兩種型別的累加器。任務可以使用 add 方法向累加器新增值。執行器無法讀取累加器變數的值,只有驅動程式才能讀取其最終值。

scala> val accum = sc.longAccumulator("Accumulator Data")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(Accumulator Data), value: 0)
scala> sc.parallelize(Array(6, 7, 8, 9)).foreach(x => accum.add(x))
22/02/09 01:37:51 INFO SparkContext: Tasks finished in 0.274529s

廣播變數

廣播變數允許開發者在每個位置儲存一個只讀的只讀副本,而無需將其複製到每個任務。這使得系統可以有效地將大型資料集複製到每個節點。Spark 透過使用廣播變數來減少網路傳輸成本。

只有當任務有多個階段,需要相同的資料,或者快取的資料被反序列化時,廣播變數才有效。要建立廣播變數,可以使用以下命令:

scala> val broadcastVar = sc.broadcast(Array(6, 7, 8))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(6, 7, 8)

找到值欄位中的廣播變數。顧名思義,廣播變數是單向從驅動程式傳送到任務的。系統無法更新廣播變數,驅動程式也無法更新它們。確保所有節點都接收相同的資料。

呼叫 unpersist() 方法釋放廣播變數使用的資源。如果應用程式再次使用該變數,系統會重新建立它。如果要永久刪除廣播變數,可以呼叫 destroy()。

結論

因此,在本文中,我們解釋了 Spark 中的 RDD 共享變數。廣播變數用於只讀資料,可以在每個位置的第一次使用之前複製,然後用於進一步計算。

然後,我們瞭解了累加器如何幫助管理共享狀態。希望透過這篇文章,您已經理解了共享變數的概念。

更新於:2022年8月25日

瀏覽量:513

開啟您的職業生涯

完成課程獲得認證

開始學習
廣告