PySpark 快速指南



PySpark – 簡介

本章我們將瞭解 Apache Spark 是什麼以及 PySpark 是如何開發的。

Spark – 概述

Apache Spark 是一個閃電般快速的即時處理框架。它進行記憶體計算以即時分析資料。它的出現是因為 **Apache Hadoop MapReduce** 僅執行批處理,缺乏即時處理功能。因此,引入了 Apache Spark,因為它可以執行即時流處理,也可以處理批處理。

除了即時和批處理之外,Apache Spark 還支援互動式查詢和迭代演算法。Apache Spark 擁有自己的叢集管理器,可以在其中託管其應用程式。它利用 Apache Hadoop 進行儲存和處理。它使用 **HDFS**(Hadoop 分散式檔案系統)進行儲存,也可以在 **YARN** 上執行 Spark 應用程式。

PySpark – 概述

Apache Spark 使用 **Scala 程式語言**編寫。為了支援 Spark 使用 Python,Apache Spark 社群釋出了一個工具 PySpark。使用 PySpark,你也可以在 Python 程式語言中使用 **RDD**。這是因為一個名為 **Py4j** 的庫,它們才能實現這一點。

PySpark 提供了 **PySpark Shell**,它將 Python API 連結到 Spark Core 並初始化 Spark Context。今天,大多數資料科學家和分析專家都使用 Python,因為它擁有豐富的庫集。將 Python 與 Spark 整合對他們來說是一個福音。

PySpark - 環境搭建

本章我們將瞭解 PySpark 的環境搭建。

**注意** − 這是假設你的計算機上已安裝 Java 和 Scala。

現在讓我們按照以下步驟下載並設定 PySpark。

**步驟 1** − 前往 Apache Spark 官方 下載 頁面並下載最新的 Apache Spark 版本。在本教程中,我們使用的是 **spark-2.1.0-bin-hadoop2.7**。

**步驟 2** − 現在,解壓下載的 Spark tar 檔案。預設情況下,它將下載到 Downloads 目錄。

# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz

它將建立一個目錄 **spark-2.1.0-bin-hadoop2.7**。在啟動 PySpark 之前,你需要設定以下環境來設定 Spark 路徑和 **Py4j 路徑**。

export SPARK_HOME = /home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH = $PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH = $SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH = $SPARK_HOME/python:$PATH

或者,要全域性設定上述環境,請將它們放入 **.bashrc 檔案**中。然後執行以下命令使環境生效。

# source .bashrc

現在我們已經設定了所有環境,讓我們進入 Spark 目錄並透過執行以下命令呼叫 PySpark shell:

# ./bin/pyspark

這將啟動你的 PySpark shell。

Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
<<<

PySpark - SparkContext

SparkContext 是任何 Spark 功能的入口點。當我們執行任何 Spark 應用程式時,會啟動一個驅動程式,其中包含主函式,你的 SparkContext 在此處啟動。然後,驅動程式在工作節點上的執行程式上執行操作。

SparkContext 使用 Py4J 啟動一個 **JVM** 並建立一個 **JavaSparkContext**。預設情況下,PySpark 提供了名為 **‘sc’** 的 SparkContext,因此建立新的 SparkContext 將不起作用。

SparkContext

以下程式碼塊包含 PySpark 類和 SparkContext 可以接受的引數的詳細資訊。

class pyspark.SparkContext (
   master = None,
   appName = None, 
   sparkHome = None, 
   pyFiles = None, 
   environment = None, 
   batchSize = 0, 
   serializer = PickleSerializer(), 
   conf = None, 
   gateway = None, 
   jsc = None, 
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

引數

以下是 SparkContext 的引數。

  • **Master** − 它是要連線到的叢集的 URL。

  • **appName** − 你的作業的名稱。

  • **sparkHome** − Spark 安裝目錄。

  • **pyFiles** − 要傳送到叢集並新增到 PYTHONPATH 的 .zip 或 .py 檔案。

  • **Environment** − 工作節點的環境變數。

  • **batchSize** − 表示單個 Java 物件的 Python 物件的數量。設定為 1 可停用批處理,設定為 0 可根據物件大小自動選擇批處理大小,設定為 -1 可使用無限批處理大小。

  • **Serializer** − RDD 序列化器。

  • **Conf** − 一個 L{SparkConf} 物件,用於設定所有 Spark 屬性。

  • **Gateway** − 使用現有閘道器和 JVM,否則初始化新的 JVM。

  • **JSC** − JavaSparkContext 例項。

  • **profiler_cls** − 用於執行分析的自定義 Profiler 類(預設為 pyspark.profiler.BasicProfiler)。

在上述引數中,**master** 和 **appname** 最常用。任何 PySpark 程式的前兩行如下所示:

from pyspark import SparkContext
sc = SparkContext("local", "First App")

SparkContext 示例 – PySpark Shell

現在你已經足夠了解 SparkContext,讓我們在 PySpark shell 上執行一個簡單的示例。在這個示例中,我們將計算 **README.md** 檔案中包含字元 'a' 或 'b' 的行數。例如,如果檔案中共有 5 行,其中 3 行包含字元 'a',則輸出將為 → **包含 a 的行:3**。字元 ‘b’ 也將進行同樣的操作。

**注意** − 在下面的示例中,我們沒有建立任何 SparkContext 物件,因為預設情況下,當 PySpark shell 啟動時,Spark 會自動建立名為 sc 的 SparkContext 物件。如果你嘗試建立另一個 SparkContext 物件,你將收到以下錯誤 – **“ValueError: Cannot run multiple SparkContexts at once”。**

PySpark Shell

<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30

SparkContext 示例 - Python 程式

讓我們使用 Python 程式執行相同的示例。建立一個名為 **firstapp.py** 的 Python 檔案,並在該檔案中輸入以下程式碼。

----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"  
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------

然後,我們將在終端中執行以下命令來執行此 Python 檔案。我們將獲得與上面相同的輸出。

$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30

PySpark - RDD

現在我們已經在系統上安裝並配置了 PySpark,我們可以在 Apache Spark 上使用 Python 進行程式設計。但是,在此之前,讓我們瞭解 Spark 中的一個基本概念 - RDD。

RDD 代表 **彈性分散式資料集**,這些是在多個節點上執行和操作以對叢集進行並行處理的元素。RDD 是不可變的元素,這意味著一旦你建立了 RDD,就無法更改它。RDD 也是容錯的,因此在發生任何故障的情況下,它們會自動恢復。你可以在這些 RDD 上應用多個操作來完成特定任務。

要對這些 RDD 應用操作,有兩種方法:

  • 轉換和
  • 行動

讓我們詳細瞭解這兩種方法。

**轉換** − 這些是在 RDD 上應用以建立新 RDD 的操作。Filter、groupBy 和 map 是轉換的示例。

**行動** − 這些是在 RDD 上應用的操作,它指示 Spark 執行計算並將結果傳送回驅動程式。

要在 PySpark 中應用任何操作,我們首先需要建立一個 **PySpark RDD**。以下程式碼塊包含 PySpark RDD 類的詳細資訊:

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

讓我們看看如何使用 PySpark 執行一些基本操作。Python 檔案中的以下程式碼建立了 RDD words,它儲存一組提到的單詞。

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

現在,我們將對 words 執行一些操作。

count()

返回 RDD 中的元素數量。

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

**命令** − count() 的命令為:

$SPARK_HOME/bin/spark-submit count.py

**輸出** − 上述命令的輸出為:

Number of elements in RDD → 8

collect()

返回 RDD 中的所有元素。

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

**命令** − collect() 的命令為:

$SPARK_HOME/bin/spark-submit collect.py

**輸出** − 上述命令的輸出為:

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach(f)

僅返回滿足 foreach 內函式條件的元素。在下面的示例中,我們在 foreach 中呼叫列印函式,該函式列印 RDD 中的所有元素。

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

**命令** − foreach(f) 的命令為:

$SPARK_HOME/bin/spark-submit foreach.py

**輸出** − 上述命令的輸出為:

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filter(f)

返回一個新的 RDD,其中包含滿足 filter 內函式的元素。在下面的示例中,我們過濾掉包含“spark”的字串。

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

**命令** − filter(f) 的命令為:

$SPARK_HOME/bin/spark-submit filter.py

**輸出** − 上述命令的輸出為:

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

map(f, preservesPartitioning = False)

透過對 RDD 中的每個元素應用函式來返回一個新的 RDD。在下面的示例中,我們形成一個鍵值對並將每個字串對映到值為 1。

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

**命令** − map(f, preservesPartitioning=False) 的命令為:

$SPARK_HOME/bin/spark-submit map.py

**輸出** − 上述命令的輸出為:

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

reduce(f)

執行指定的交換和關聯二元運算後,返回 RDD 中的元素。在下面的示例中,我們從 operator 匯入 add 包並將其應用於 'num' 以執行簡單的加法運算。

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

**命令** − reduce(f) 的命令為:

$SPARK_HOME/bin/spark-submit reduce.py

**輸出** − 上述命令的輸出為:

Adding all the elements -> 15

join(other, numPartitions = None)

它返回一個 RDD,其中包含具有匹配鍵的元素對以及該特定鍵的所有值。在下面的示例中,兩個不同的 RDD 中有兩個元素對。連線這兩個 RDD 後,我們得到一個 RDD,其中包含具有匹配鍵及其值的元素。

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

**命令** − join(other, numPartitions = None) 的命令為:

$SPARK_HOME/bin/spark-submit join.py

**輸出** − 上述命令的輸出為:

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

cache()

使用預設儲存級別 (MEMORY_ONLY) 持久化此 RDD。你還可以檢查 RDD 是否已快取。

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

**命令** − cache() 的命令為:

$SPARK_HOME/bin/spark-submit cache.py

**輸出** − 上述程式的輸出為:

Words got cached -> True

這些是在 PySpark RDD 上執行的一些最重要的操作。

PySpark - 廣播變數 & 累加器

對於並行處理,Apache Spark 使用共享變數。當驅動程式將任務傳送到叢集上的執行程式時,共享變數的副本會進入叢集的每個節點,以便它可以用於執行任務。

Apache Spark 支援兩種型別的共享變數:

  • 廣播變數
  • 累加器

讓我們詳細瞭解它們。

廣播變數

廣播變數用於在所有節點上儲存資料的副本。此變數快取在所有機器上,不會發送到具有任務的機器上。以下程式碼塊包含 PySpark 廣播類的詳細資訊。

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

以下示例顯示瞭如何使用廣播變數。廣播變數具有一個名為 value 的屬性,該屬性儲存資料並用於返回廣播的值。

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

**命令** − 廣播變數的命令如下:

$SPARK_HOME/bin/spark-submit broadcast.py

**輸出** − 以下命令的輸出如下所示。

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

累加器

累加器變數用於透過關聯和交換運算聚合資訊。例如,你可以將累加器用於求和運算或計數器(在 MapReduce 中)。以下程式碼塊包含 PySpark 累加器類的詳細資訊。

class pyspark.Accumulator(aid, value, accum_param)

以下示例顯示瞭如何使用累加器變數。累加器變數具有一個名為 value 的屬性,類似於廣播變數。它儲存資料並用於返回累加器的值,但僅可在驅動程式程式中使用。

在此示例中,累加器變數由多個工作程式使用並返回累積值。

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

**命令** − 累加器變數的命令如下:

$SPARK_HOME/bin/spark-submit accumulator.py

**輸出** − 上述命令的輸出如下所示。

Accumulated value is -> 150

PySpark - SparkConf

要在本地/叢集上執行 Spark 應用程式,你需要設定一些配置和引數,這就是 SparkConf 提供幫助的地方。它提供執行 Spark 應用程式的配置。以下程式碼塊包含 PySpark SparkConf 類的詳細資訊。

class pyspark.SparkConf (
   loadDefaults = True, 
   _jvm = None, 
   _jconf = None
)

首先,我們將使用 SparkConf() 建立一個 SparkConf 物件,它還將載入來自 **spark.*** Java 系統屬性的值。現在你可以使用 SparkConf 物件設定不同的引數,它們的優先順序將高於系統屬性。

在 SparkConf 類中,存在支援鏈式呼叫的 setter 方法。例如,您可以編寫 **conf.setAppName("PySpark App").setMaster("local")**。一旦我們將 SparkConf 物件傳遞給 Apache Spark,任何使用者都無法修改它。

以下是 SparkConf 的一些最常用的屬性:

  • **set(key, value)** - 設定配置屬性。

  • **setMaster(value)** - 設定主 URL。

  • **setAppName(value)** - 設定應用程式名稱。

  • **get(key, defaultValue=None)** - 獲取鍵的配置值。

  • **setSparkHome(value)** - 設定工作節點上的 Spark 安裝路徑。

讓我們考慮一下在 PySpark 程式中使用 SparkConf 的以下示例。在這個示例中,我們將 spark 應用程式名稱設定為 **PySpark App**,並將 spark 應用程式的主 URL 設定為 → **spark://master:7077**。

以下程式碼塊包含這些行,當它們新增到 Python 檔案中時,它將設定執行 PySpark 應用程式的基本配置。

---------------------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)
---------------------------------------------------------------------------------------

PySpark - SparkFiles

在 Apache Spark 中,您可以使用 **sc.addFile**(sc 是您的預設 SparkContext)上傳檔案,並使用 **SparkFiles.get** 獲取工作節點上的路徑。因此,SparkFiles 將路徑解析為透過 **SparkContext.addFile()** 新增的檔案。

SparkFiles 包含以下類方法:

  • get(filename)
  • getrootdirectory()

讓我們詳細瞭解它們。

get(filename)

它指定透過 SparkContext.addFile() 新增的檔案的路徑。

getrootdirectory()

它指定包含透過 SparkContext.addFile() 新增的檔案的根目錄的路徑。

----------------------------------------sparkfile.py------------------------------------
from pyspark import SparkContext
from pyspark import SparkFiles
finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"
sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)
print "Absolute Path -> %s" % SparkFiles.get(finddistancename)
----------------------------------------sparkfile.py------------------------------------

**命令** - 命令如下:

$SPARK_HOME/bin/spark-submit sparkfiles.py

**輸出** − 上述命令的輸出為:

Absolute Path -> 
   /tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R

PySpark - StorageLevel

StorageLevel 決定了 RDD 如何儲存。在 Apache Spark 中,StorageLevel 決定 RDD 是否應儲存在記憶體中,還是應儲存在磁碟上,或者兩者兼而有之。它還決定是否序列化 RDD 以及是否複製 RDD 分割槽。

以下程式碼塊包含 StorageLevel 的類定義:

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)

現在,要決定 RDD 的儲存方式,有不同的儲存級別,如下所示:

  • **DISK_ONLY** = StorageLevel(True, False, False, False, 1)

  • **DISK_ONLY_2** = StorageLevel(True, False, False, False, 2)

  • **MEMORY_AND_DISK** = StorageLevel(True, True, False, False, 1)

  • **MEMORY_AND_DISK_2** = StorageLevel(True, True, False, False, 2)

  • **MEMORY_AND_DISK_SER** = StorageLevel(True, True, False, False, 1)

  • **MEMORY_AND_DISK_SER_2** = StorageLevel(True, True, False, False, 2)

  • **MEMORY_ONLY** = StorageLevel(False, True, False, False, 1)

  • **MEMORY_ONLY_2** = StorageLevel(False, True, False, False, 2)

  • **MEMORY_ONLY_SER** = StorageLevel(False, True, False, False, 1)

  • **MEMORY_ONLY_SER_2** = StorageLevel(False, True, False, False, 2)

  • **OFF_HEAP** = StorageLevel(True, True, True, False, 1)

讓我們考慮一下 StorageLevel 的以下示例,其中我們使用儲存級別 **MEMORY_AND_DISK_2**,這意味著 RDD 分割槽將具有 2 個副本。

------------------------------------storagelevel.py-------------------------------------
from pyspark import SparkContext
import pyspark
sc = SparkContext (
   "local", 
   "storagelevel app"
)
rdd1 = sc.parallelize([1,2])
rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 )
rdd1.getStorageLevel()
print(rdd1.getStorageLevel())
------------------------------------storagelevel.py-------------------------------------

**命令** - 命令如下:

$SPARK_HOME/bin/spark-submit storagelevel.py

**輸出** - 以上命令的輸出如下所示:

Disk Memory Serialized 2x Replicated

PySpark - MLlib

Apache Spark 提供了一個名為 **MLlib** 的機器學習 API。PySpark 也在 Python 中提供了這個機器學習 API。它支援不同型別的演算法,如下所述:

  • **mllib.classification** - **spark.mllib** 包支援各種二元分類、多類分類和迴歸分析方法。分類中一些最流行的演算法是 **隨機森林、樸素貝葉斯、決策樹** 等。

  • **mllib.clustering** - 聚類是一個無監督學習問題,您旨在根據某種相似性概念將實體的子集相互分組。

  • **mllib.fpm** - 頻繁模式匹配是挖掘頻繁項、項集、子序列或其他子結構,這些通常是分析大規模資料集的第一步。這多年來一直是資料探勘領域的一個活躍研究課題。

  • **mllib.linalg** - MLlib 用於線性代數的實用程式。

  • **mllib.recommendation** - 協同過濾通常用於推薦系統。這些技術旨在填充使用者專案關聯矩陣中的缺失條目。

  • **spark.mllib** - 它目前支援基於模型的協同過濾,其中使用者和產品由一小組潛在因素來描述,這些因素可用於預測缺失的條目。spark.mllib 使用交替最小二乘法 (ALS) 演算法來學習這些潛在因素。

  • **mllib.regression** - 線性迴歸屬於迴歸演算法家族。迴歸的目標是找到變數之間的關係和依賴性。用於處理線性迴歸模型和模型摘要的介面與邏輯迴歸的情況類似。

mllib 包中還有其他演算法、類和函式。目前,讓我們瞭解一下 **pyspark.mllib** 的演示。

以下示例使用 ALS 演算法進行協同過濾,以構建推薦模型並在訓練資料上對其進行評估。

**使用的資料集** - test.data

1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0

--------------------------------------recommend.py----------------------------------------
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
if __name__ == "__main__":
   sc = SparkContext(appName="Pspark mllib Example")
   data = sc.textFile("test.data")
   ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
   
   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)
   
   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))
   
   # Save and load model
   model.save(sc, "target/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
--------------------------------------recommend.py----------------------------------------

**命令** - 命令如下:

$SPARK_HOME/bin/spark-submit recommend.py

**輸出** - 以上命令的輸出將為:

Mean Squared Error = 1.20536041839e-05

PySpark - 序列化器

序列化用於 Apache Spark 的效能調優。所有透過網路傳送、寫入磁碟或持久化到記憶體中的資料都應被序列化。序列化在代價高昂的操作中扮演著重要角色。

PySpark 支援自定義序列化器以進行效能調整。PySpark 支援以下兩個序列化器:

MarshalSerializer

使用 Python 的 Marshal 序列化器序列化物件。此序列化器比 PickleSerializer 更快,但支援的資料型別更少。

class pyspark.MarshalSerializer

PickleSerializer

使用 Python 的 Pickle 序列化器序列化物件。此序列化器幾乎支援任何 Python 物件,但可能不如更專業的序列化器快。

class pyspark.PickleSerializer

讓我們看看 PySpark 序列化的一個示例。在這裡,我們使用 MarshalSerializer 序列化資料。

--------------------------------------serializing.py-------------------------------------
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()
--------------------------------------serializing.py-------------------------------------

**命令** - 命令如下:

$SPARK_HOME/bin/spark-submit serializing.py

**輸出** − 上述命令的輸出為:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
廣告