
- Apache Kafka 教程
- Apache Kafka - 首頁
- Apache Kafka - 簡介
- Apache Kafka - 基礎知識
- Apache Kafka - 叢集架構
- Apache Kafka - 工作流程
- Apache Kafka - 安裝步驟
- Apache Kafka - 基本操作
- 簡單的生產者示例
- 消費者組示例
- 與Storm整合
- 與Spark整合
- 即時應用(Twitter)
- Apache Kafka - 工具
- Apache Kafka - 應用
- Apache Kafka 有用資源
- Apache Kafka - 快速指南
- Apache Kafka - 有用資源
- Apache Kafka - 討論
Apache Kafka - 與Spark整合
本章將討論如何將Apache Kafka與Spark Streaming API整合。
關於Spark
Spark Streaming API 能夠對即時資料流進行可擴充套件、高吞吐量、容錯的流處理。資料可以從許多來源匯入,例如Kafka、Flume、Twitter等,並可以使用複雜的演算法(例如map、reduce、join和window等高階函式)進行處理。最後,處理後的資料可以推送到檔案系統、資料庫和即時儀表板。彈性分散式資料集 (RDD) 是Spark的基本資料結構。它是一個不可變的分散式物件集合。RDD中的每個資料集都劃分為邏輯分割槽,這些分割槽可以在叢集的不同節點上計算。
與Spark整合
Kafka是一個潛在的用於Spark流處理的訊息傳遞和整合平臺。Kafka充當即時資料流的中心樞紐,並使用Spark Streaming中的複雜演算法進行處理。資料處理完成後,Spark Streaming可以將結果釋出到另一個Kafka主題,或者儲存在HDFS、資料庫或儀表板中。下圖描述了概念流程。

現在,讓我們詳細瞭解Kafka-Spark API。
SparkConf API
它表示Spark應用程式的配置。用於將各種Spark引數設定為鍵值對。
SparkConf
類具有以下方法:
set(string key, string value) − 設定配置變數。
remove(string key) − 從配置中刪除鍵。
setAppName(string name) − 為您的應用程式設定應用程式名稱。
get(string key) − 獲取鍵
StreamingContext API
這是Spark功能的主要入口點。SparkContext表示與Spark叢集的連線,可用於在叢集上建立RDD、累加器和廣播變數。簽名定義如下所示。
public StreamingContext(String master, String appName, Duration batchDuration, String sparkHome, scala.collection.Seq<String> jars, scala.collection.Map<String,String> environment)
master − 要連線到的叢集URL(例如mesos://host:port,spark://host:port,local[4])。
appName − 您的作業的名稱,用於在叢集Web UI上顯示
batchDuration − 流資料將被分成批次的時間間隔
public StreamingContext(SparkConf conf, Duration batchDuration)
透過提供新的SparkContext所需的配置來建立StreamingContext。
conf − Spark引數
batchDuration − 流資料將被分成批次的時間間隔
KafkaUtils API
KafkaUtils API 用於將Kafka叢集連線到Spark Streaming。此API具有重要的createStream
方法,簽名如下所示。
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream( StreamingContext ssc, String zkQuorum, String groupId, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
上述方法用於建立一個輸入流,該輸入流從Kafka Broker拉取訊息。
ssc − StreamingContext物件。
zkQuorum − Zookeeper叢集地址。
groupId − 此消費者的組ID。
topics − 返回要消費的主題對映。
storageLevel − 用於儲存接收到的物件的儲存級別。
KafkaUtils API 還有另一個方法createDirectStream,它用於建立一個輸入流,該輸入流直接從Kafka Broker拉取訊息,而無需使用任何接收器。此流可以保證Kafka中的每條訊息都只包含在轉換中一次。
示例應用程式是用Scala編寫的。要編譯應用程式,請下載並安裝sbt
,Scala構建工具(類似於maven)。主要應用程式程式碼如下所示。
import java.util.HashMap import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord} import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ object KafkaWordCount { def main(args: Array[String]) { if (args.length < 4) { System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>") System.exit(1) } val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName("KafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) wordCounts.print() ssc.start() ssc.awaitTermination() } }
構建指令碼
spark-kafka整合依賴於spark、spark streaming和spark Kafka整合jar包。建立一個新檔案build.sbt
並指定應用程式詳細資訊及其依賴項。sbt
將在編譯和打包應用程式時下載必要的jar包。
name := "Spark Kafka Project" version := "1.0" scalaVersion := "2.10.5" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
編譯/打包
執行以下命令來編譯和打包應用程式的jar檔案。我們需要將jar檔案提交到spark控制檯才能執行應用程式。
sbt package
提交到Spark
啟動Kafka Producer CLI(在上一章中解釋),建立一個名為my-first-topic
的新主題,並提供一些示例訊息,如下所示。
Another spark test message
執行以下命令將應用程式提交到spark控制檯。
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming -kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark -kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
此應用程式的示例輸出如下所示。
spark console messages .. (Test,1) (spark,1) (another,1) (message,1) spark console message ..