- Apache Kafka 教程
- Apache Kafka - 首頁
- Apache Kafka - 簡介
- Apache Kafka - 基礎知識
- Apache Kafka - 叢集架構
- Apache Kafka - 工作流程
- Apache Kafka - 安裝步驟
- Apache Kafka - 基本操作
- 簡單生產者示例
- 消費者組示例
- 與 Storm 整合
- 與 Spark 整合
- 即時應用(Twitter)
- Apache Kafka - 工具
- Apache Kafka - 應用
- Apache Kafka 有用資源
- Apache Kafka - 快速指南
- Apache Kafka - 有用資源
- Apache Kafka - 討論
Apache Kafka - 簡單生產者示例
讓我們建立一個應用程式,使用 Java 客戶端釋出和消費訊息。Kafka 生產者客戶端包含以下 API。
KafkaProducer API
讓我們在本節中瞭解 Kafka 生產者 API 中最重要的部分。KafkaProducer API 的核心部分是 KafkaProducer
類。KafkaProducer 類提供了一個選項,可以在其建構函式中連線 Kafka 代理,並使用以下方法。
KafkaProducer 類提供 send 方法以非同步方式將訊息傳送到主題。send() 的簽名如下
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);
ProducerRecord - 生產者管理一個等待發送的記錄緩衝區。
Callback - 使用者提供的回撥函式,在伺服器確認記錄後執行(null 表示沒有回撥)。
KafkaProducer 類提供了一個 flush 方法,以確保所有先前傳送的訊息都已實際完成。flush 方法的語法如下 -
public void flush()
KafkaProducer 類提供 partitionFor 方法,該方法有助於獲取給定主題的分割槽元資料。這可用於自定義分割槽。此方法的簽名如下 -
public Map metrics()
它返回生產者維護的內部指標對映。
public void close() - KafkaProducer 類提供 close 方法,該方法會阻塞,直到所有先前傳送的請求都已完成。
生產者 API
生產者 API 的核心部分是 Producer
類。Producer 類提供了一個選項,可以透過以下方法在其建構函式中連線 Kafka 代理。
生產者類
生產者類提供 send 方法來傳送訊息到單個或多個主題,使用以下簽名。
public void send(KeyedMessaget<k,v> message) - sends the data to a single topic,par-titioned by key using either sync or async producer. public void send(List<KeyedMessage<k,v>>messages) - sends data to multiple topics. Properties prop = new Properties(); prop.put(producer.type,”async”) ProducerConfig config = new ProducerConfig(prop);
生產者有兩種型別:同步和非同步。
相同的 API 配置也適用於 Sync
生產者。它們之間的區別在於同步生產者直接傳送訊息,而非同步生產者在後臺傳送訊息。當您需要更高的吞吐量時,建議使用非同步生產者。在之前的版本(例如 0.8)中,非同步生產者沒有 send() 的回撥來註冊錯誤處理程式。這僅在當前 0.9 版本中可用。
public void close()
Producer 類提供close方法來關閉生產者池到所有 Kafka 代理的連線。
配置設定
為了更好地理解,生產者 API 的主要配置設定列在下表中 -
| 序號 | 配置設定及說明 |
|---|---|
| 1 |
client.id 標識生產者應用程式 |
| 2 |
producer.type 同步或非同步 |
| 3 |
acks acks 配置控制生產者請求被視為完成的標準。 |
| 4 |
retries 如果生產者請求失敗,則自動重試特定次數。 |
| 5 |
bootstrap.servers 代理引導列表。 |
| 6 |
linger.ms 如果要減少請求數量,可以將 linger.ms 設定為大於某個值。 |
| 7 |
key.serializer 序列化器介面的鍵。 |
| 8 |
value.serializer 序列化器介面的值。 |
| 9 |
batch.size 緩衝區大小。 |
| 10 |
buffer.memory 控制生產者用於緩衝的可用記憶體總量。 |
ProducerRecord API
ProducerRecord 是傳送到 Kafka 叢集的鍵值對。ProducerRecord 類建構函式用於使用以下簽名建立帶有分割槽、鍵和值對的記錄。
public ProducerRecord (string topic, int partition, k key, v value)
主題 - 將附加到記錄的使用者定義主題名稱。
分割槽 - 分割槽數量
鍵 - 將包含在記錄中的鍵。
- 值 - 記錄內容
public ProducerRecord (string topic, k key, v value)
ProducerRecord 類建構函式用於建立帶鍵值對且不帶分割槽的記錄。
主題 - 建立一個主題來分配記錄。
鍵 - 記錄的鍵。
值 - 記錄內容。
public ProducerRecord (string topic, v value)
ProducerRecord 類建立不帶分割槽和鍵的記錄。
主題 - 建立一個主題。
值 - 記錄內容。
ProducerRecord 類的方法列在下表中 -
| 序號 | 類方法及說明 |
|---|---|
| 1 |
public string topic() 將附加到記錄的主題。 |
| 2 |
public K key() 將包含在記錄中的鍵。如果沒有此類鍵,則此處將返回 null。 |
| 3 |
public V value() 記錄內容。 |
| 4 |
partition() 記錄的分割槽數量 |
SimpleProducer 應用程式
在建立應用程式之前,首先啟動 ZooKeeper 和 Kafka 代理,然後使用 create topic 命令在 Kafka 代理中建立您自己的主題。之後,建立一個名為 Sim-pleProducer.java
的 Java 類,並輸入以下程式碼。
//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named “SimpleProducer”
public class SimpleProducer {
public static void main(String[] args) throws Exception{
// Check arguments length value
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Assign topicName to string variable
String topicName = args[0].toString();
// create instance for properties to access producer configs
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully”);
producer.close();
}
}
編譯 - 可以使用以下命令編譯應用程式。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
執行 - 可以使用以下命令執行應用程式。
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>
輸出
Message sent successfully To check the above output open new terminal and type Consumer CLI command to receive messages. >> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning 1 2 3 4 5 6 7 8 9 10
簡單消費者示例
到目前為止,我們建立了一個生產者來將訊息傳送到 Kafka 叢集。現在讓我們建立一個消費者來從 Kafka 叢集消費訊息。KafkaConsumer API 用於從 Kafka 叢集消費訊息。KafkaConsumer 類建構函式定義如下。
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs - 返回消費者配置的對映。
KafkaConsumer 類具有以下重要方法,列在下表中。
| 序號 | 方法及說明 |
|---|---|
| 1 |
public java.util.Set<TopicPar-tition> assignment() 獲取消費者當前分配的分割槽集。 |
| 2 |
public string subscription() 訂閱給定的主題列表以獲取動態分配的分割槽。 |
| 3 |
public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener) 訂閱給定的主題列表以獲取動態分配的分割槽。 |
| 4 |
public void unsubscribe() 取消訂閱給定分割槽列表中的主題。 |
| 5 |
public void sub-scribe(java.util.List<java.lang.String> topics) 訂閱給定的主題列表以獲取動態分配的分割槽。如果給定的主題列表為空,則將其視為與取消訂閱相同。 |
| 6 |
public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener) 引數 pattern 指的是正則表示式格式的訂閱模式,而 listener 引數從訂閱模式獲取通知。 |
| 7 |
public void as-sign(java.util.List<TopicParti-tion> partitions) 手動將分割槽列表分配給客戶。 |
| 8 |
poll() 獲取使用其中一個 subscribe/assign API 指定的主題或分割槽的資料。如果在輪詢資料之前沒有訂閱主題,則將返回錯誤。 |
| 9 |
public void commitSync() 提交上次 poll() 返回的所有已訂閱主題和分割槽的偏移量。相同的操作應用於 commitAsyn()。 |
| 10 |
public void seek(TopicPartition partition, long offset) 獲取消費者將在下一個 poll() 方法中使用的當前偏移量值。 |
| 11 |
public void resume() 恢復已暫停的分割槽。 |
| 12 |
public void wakeup() 喚醒消費者。 |
ConsumerRecord API
ConsumerRecord API 用於接收來自 Kafka 叢集的記錄。此 API 包含主題名稱、分割槽編號(從中接收記錄)以及指向 Kafka 分割槽中記錄的偏移量。ConsumerRecord 類用於建立具有特定主題名稱、分割槽數量和 <鍵,值> 對的消費者記錄。它具有以下簽名。
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
主題 - 從 Kafka 叢集接收的消費者記錄的主題名稱。
分割槽 - 主題的分割槽。
鍵 - 記錄的鍵,如果不存在鍵,則返回 null。
值 - 記錄內容。
ConsumerRecords API
ConsumerRecords API 充當 ConsumerRecord 的容器。此 API 用於儲存特定主題每個分割槽的 ConsumerRecord 列表。其建構函式定義如下。
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List <Consumer-Record>K,V>>> records)
TopicPartition - 返回特定主題的分割槽對映。
Records - 返回 ConsumerRecord 列表。
ConsumerRecords 類定義了以下方法。
| 序號 | 方法及說明 |
|---|---|
| 1 |
public int count() 所有主題的記錄數量。 |
| 2 |
public Set partitions() 此記錄集中包含資料的分割槽集(如果未返回資料,則該集合為空)。 |
| 3 |
public Iterator iterator() 迭代器使您能夠遍歷集合,獲取或刪除元素。 |
| 4 |
public List records() 獲取給定分割槽的記錄列表。 |
配置設定
消費者客戶端 API 的主要配置設定列在下表中 -
| 序號 | 設定及說明 |
|---|---|
| 1 |
bootstrap.servers 代理引導列表。 |
| 2 |
group.id 將單個消費者分配到一個組。 |
| 3 |
enable.auto.commit 如果值為 true,則啟用偏移量的自動提交,否則不提交。 |
| 4 |
auto.commit.interval.ms 返回更新的已消費偏移量寫入 ZooKeeper 的頻率。 |
| 5 |
session.timeout.ms 指示 Kafka 在放棄並繼續消費訊息之前,將等待 ZooKeeper 對請求(讀取或寫入)做出響應的毫秒數。 |
SimpleConsumer 應用程式
生產者應用程式的步驟在此保持不變。首先,啟動您的 ZooKeeper 和 Kafka 代理。然後建立一個名為 SimpleConsumer
的 Java 類應用程式,名為 SimpleCon-sumer.java
,並輸入以下程式碼。
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Kafka consumer configuration settings
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName))
//print the topic name
System.out.println("Subscribed to topic " + topicName);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
編譯 - 可以使用以下命令編譯應用程式。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
執行 - 可以使用以下命令執行應用程式
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>
輸入 - 開啟生產者 CLI 並向主題傳送一些訊息。您可以將示例輸入設定為“Hello Consumer”。
輸出 - 以下是輸出。
Subscribed to topic Hello-Kafka offset = 3, key = null, value = Hello Consumer