PySpark - RDD



現在我們已經在系統上安裝並配置了 PySpark,我們就可以在 Apache Spark 上使用 Python 進行程式設計了。但在開始之前,讓我們先了解 Spark 中一個基本的概念 - RDD。

RDD 代表 **彈性分散式資料集**,它們是在多個節點上執行和操作的元素,用於在叢集上進行並行處理。RDD 是不可變的元素,這意味著一旦建立了 RDD 就無法更改它。RDD 也是容錯的,因此在發生任何故障時,它們會自動恢復。您可以對這些 RDD 應用多個操作來完成特定任務。

要對這些 RDD 應用操作,有兩種方法:

  • 轉換和
  • 行動

讓我們詳細瞭解這兩種方法。

**轉換** - 這些是在 RDD 上應用的操作,用於建立新的 RDD。Filter、groupBy 和 map 是轉換的示例。

**行動** - 這些是在 RDD 上應用的操作,指示 Spark 執行計算並將結果傳送回驅動程式。

要在 PySpark 中應用任何操作,我們首先需要建立一個 **PySpark RDD**。以下程式碼塊詳細介紹了 PySpark RDD 類:

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

讓我們看看如何使用 PySpark 執行一些基本操作。以下 Python 檔案中的程式碼建立了 RDD words,它儲存了一組提到的單詞。

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

我們現在將在 words 上執行一些操作。

count()

返回 RDD 中元素的數量。

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

**命令** - count() 的命令如下:

$SPARK_HOME/bin/spark-submit count.py

**輸出** - 上述命令的輸出為:

Number of elements in RDD → 8

collect()

返回 RDD 中的所有元素。

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

**命令** - collect() 的命令如下:

$SPARK_HOME/bin/spark-submit collect.py

**輸出** - 上述命令的輸出為:

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach(f)

僅返回滿足 foreach 內部函式條件的元素。在以下示例中,我們在 foreach 中呼叫 print 函式,該函式列印 RDD 中的所有元素。

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

**命令** - foreach(f) 的命令如下:

$SPARK_HOME/bin/spark-submit foreach.py

**輸出** - 上述命令的輸出為:

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filter(f)

返回一個新的 RDD,其中包含滿足 filter 內部函式的元素。在以下示例中,我們過濾掉包含“spark”的字串。

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

**命令** - filter(f) 的命令如下:

$SPARK_HOME/bin/spark-submit filter.py

**輸出** - 上述命令的輸出為:

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

map(f, preservesPartitioning = False)

透過將函式應用於 RDD 中的每個元素來返回一個新的 RDD。在以下示例中,我們形成了一個鍵值對,並將每個字串對映到值為 1。

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

**命令** - map(f, preservesPartitioning=False) 的命令如下:

$SPARK_HOME/bin/spark-submit map.py

**輸出** - 上述命令的輸出為:

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

reduce(f)

在執行指定的交換和關聯二元運算後,返回 RDD 中的元素。在以下示例中,我們從 operator 中匯入 add 包,並將其應用於 'num' 以執行簡單的加法運算。

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

**命令** - reduce(f) 的命令如下:

$SPARK_HOME/bin/spark-submit reduce.py

**輸出** - 上述命令的輸出為:

Adding all the elements -> 15

join(other, numPartitions = None)

它返回一個 RDD,其中包含具有匹配鍵的元素對以及該特定鍵的所有值。在以下示例中,兩個不同的 RDD 中有兩個元素對。連線這兩個 RDD 後,我們得到一個 RDD,其中包含具有匹配鍵及其值的元素。

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

**命令** - join(other, numPartitions = None) 的命令如下:

$SPARK_HOME/bin/spark-submit join.py

**輸出** - 上述命令的輸出為:

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

cache()

使用預設儲存級別 (MEMORY_ONLY) 持久化此 RDD。您還可以檢查 RDD 是否已快取。

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

**命令** - cache() 的命令如下:

$SPARK_HOME/bin/spark-submit cache.py

**輸出** - 上述程式的輸出為:

Words got cached -> True

這些是在 PySpark RDD 上執行的一些最重要的操作。

廣告