PySpark - SparkContext



SparkContext 是任何 Spark 功能的入口點。當我們執行任何 Spark 應用程式時,會啟動一個驅動程式,其中包含主函式,並且在此處初始化 SparkContext。然後,驅動程式在工作節點上的執行器上執行操作。

SparkContext 使用 Py4J 啟動一個JVM並建立一個JavaSparkContext。預設情況下,PySpark 提供了名為‘sc’的 SparkContext,因此建立新的 SparkContext 將不起作用。

SparkContext

以下程式碼塊包含 PySpark 類和 SparkContext 可以接受的引數的詳細資訊。

class pyspark.SparkContext (
   master = None,
   appName = None, 
   sparkHome = None, 
   pyFiles = None, 
   environment = None, 
   batchSize = 0, 
   serializer = PickleSerializer(), 
   conf = None, 
   gateway = None, 
   jsc = None, 
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

引數

以下是 SparkContext 的引數。

  • Master − 它是要連線到的叢集的 URL。

  • appName − 作業的名稱。

  • sparkHome − Spark 安裝目錄。

  • pyFiles − 要傳送到叢集並新增到 PYTHONPATH 的 .zip 或 .py 檔案。

  • Environment − 工作節點的環境變數。

  • batchSize − 表示為單個 Java 物件的 Python 物件數量。設定為 1 可停用批處理,設定為 0 可根據物件大小自動選擇批處理大小,設定為 -1 可使用無限批處理大小。

  • Serializer − RDD 序列化器。

  • Conf − L{SparkConf} 物件,用於設定所有 Spark 屬性。

  • Gateway − 使用現有的閘道器和 JVM,否則初始化新的 JVM。

  • JSC − JavaSparkContext 例項。

  • profiler_cls − 用於執行分析的自定義 Profiler 類(預設為 pyspark.profiler.BasicProfiler)。

在以上引數中,masterappname 最常使用。任何 PySpark 程式的前兩行如下所示:

from pyspark import SparkContext
sc = SparkContext("local", "First App")

SparkContext 示例 – PySpark Shell

現在您已經瞭解了足夠的 SparkContext 知識,讓我們在 PySpark shell 上執行一個簡單的示例。在這個示例中,我們將計算README.md檔案中包含字元“a”或“b”的行數。例如,如果檔案中共有 5 行,其中 3 行包含字元“a”,則輸出將為 → 包含 a 的行數:3。字元“b”的處理方式相同。

注意 − 在以下示例中,我們沒有建立任何 SparkContext 物件,因為預設情況下,當 PySpark shell 啟動時,Spark 會自動建立名為 sc 的 SparkContext 物件。如果您嘗試建立另一個 SparkContext 物件,將會收到以下錯誤 – “ValueError: Cannot run multiple SparkContexts at once”。

PySpark Shell

<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30

SparkContext 示例 - Python 程式

讓我們使用 Python 程式執行相同的示例。建立一個名為firstapp.py的 Python 檔案,並在該檔案中輸入以下程式碼。

----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"  
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------

然後,我們將在終端中執行以下命令來執行此 Python 檔案。我們將得到與上面相同的輸出。

$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30
廣告