Apache Kafka - 簡單生產者示例



讓我們建立一個應用程式,使用 Java 客戶端釋出和消費訊息。Kafka 生產者客戶端包含以下 API。

KafkaProducer API

讓我們在本節中瞭解 Kafka 生產者 API 中最重要的部分。KafkaProducer API 的核心部分是 KafkaProducer 類。KafkaProducer 類提供了一個選項,可以在其建構函式中連線 Kafka 代理,並使用以下方法。

  • KafkaProducer 類提供 send 方法以非同步方式將訊息傳送到主題。send() 的簽名如下

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - 生產者管理一個等待發送的記錄緩衝區。

  • Callback - 使用者提供的回撥函式,在伺服器確認記錄後執行(null 表示沒有回撥)。

  • KafkaProducer 類提供了一個 flush 方法,以確保所有先前傳送的訊息都已實際完成。flush 方法的語法如下 -

public void flush()
  • KafkaProducer 類提供 partitionFor 方法,該方法有助於獲取給定主題的分割槽元資料。這可用於自定義分割槽。此方法的簽名如下 -

public Map metrics()

它返回生產者維護的內部指標對映。

  • public void close() - KafkaProducer 類提供 close 方法,該方法會阻塞,直到所有先前傳送的請求都已完成。

生產者 API

生產者 API 的核心部分是 Producer 類。Producer 類提供了一個選項,可以透過以下方法在其建構函式中連線 Kafka 代理。

生產者類

生產者類提供 send 方法來傳送訊息到單個或多個主題,使用以下簽名。


public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

生產者有兩種型別:同步非同步

相同的 API 配置也適用於 Sync 生產者。它們之間的區別在於同步生產者直接傳送訊息,而非同步生產者在後臺傳送訊息。當您需要更高的吞吐量時,建議使用非同步生產者。在之前的版本(例如 0.8)中,非同步生產者沒有 send() 的回撥來註冊錯誤處理程式。這僅在當前 0.9 版本中可用。

public void close()

Producer 類提供close方法來關閉生產者池到所有 Kafka 代理的連線。

配置設定

為了更好地理解,生產者 API 的主要配置設定列在下表中 -

序號 配置設定及說明
1

client.id

標識生產者應用程式

2

producer.type

同步或非同步

3

acks

acks 配置控制生產者請求被視為完成的標準。

4

retries

如果生產者請求失敗,則自動重試特定次數。

5

bootstrap.servers

代理引導列表。

6

linger.ms

如果要減少請求數量,可以將 linger.ms 設定為大於某個值。

7

key.serializer

序列化器介面的鍵。

8

value.serializer

序列化器介面的值。

9

batch.size

緩衝區大小。

10

buffer.memory

控制生產者用於緩衝的可用記憶體總量。

ProducerRecord API

ProducerRecord 是傳送到 Kafka 叢集的鍵值對。ProducerRecord 類建構函式用於使用以下簽名建立帶有分割槽、鍵和值對的記錄。

public ProducerRecord (string topic, int partition, k key, v value)
  • 主題 - 將附加到記錄的使用者定義主題名稱。

  • 分割槽 - 分割槽數量

  • - 將包含在記錄中的鍵。

  • - 記錄內容
public ProducerRecord (string topic, k key, v value)

ProducerRecord 類建構函式用於建立帶鍵值對且不帶分割槽的記錄。

  • 主題 - 建立一個主題來分配記錄。

  • - 記錄的鍵。

  • - 記錄內容。

public ProducerRecord (string topic, v value)

ProducerRecord 類建立不帶分割槽和鍵的記錄。

  • 主題 - 建立一個主題。

  • - 記錄內容。

ProducerRecord 類的方法列在下表中 -

序號 類方法及說明
1

public string topic()

將附加到記錄的主題。

2

public K key()

將包含在記錄中的鍵。如果沒有此類鍵,則此處將返回 null。

3

public V value()

記錄內容。

4

partition()

記錄的分割槽數量

SimpleProducer 應用程式

在建立應用程式之前,首先啟動 ZooKeeper 和 Kafka 代理,然後使用 create topic 命令在 Kafka 代理中建立您自己的主題。之後,建立一個名為 Sim-pleProducer.java 的 Java 類,並輸入以下程式碼。

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

編譯 - 可以使用以下命令編譯應用程式。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

執行 - 可以使用以下命令執行應用程式。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

輸出

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

簡單消費者示例

到目前為止,我們建立了一個生產者來將訊息傳送到 Kafka 叢集。現在讓我們建立一個消費者來從 Kafka 叢集消費訊息。KafkaConsumer API 用於從 Kafka 叢集消費訊息。KafkaConsumer 類建構函式定義如下。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - 返回消費者配置的對映。

KafkaConsumer 類具有以下重要方法,列在下表中。

序號 方法及說明
1

public java.util.Set<TopicPar-tition> assignment()

獲取消費者當前分配的分割槽集。

2

public string subscription()

訂閱給定的主題列表以獲取動態分配的分割槽。

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

訂閱給定的主題列表以獲取動態分配的分割槽。

4

public void unsubscribe()

取消訂閱給定分割槽列表中的主題。

5

public void sub-scribe(java.util.List<java.lang.String> topics)

訂閱給定的主題列表以獲取動態分配的分割槽。如果給定的主題列表為空,則將其視為與取消訂閱相同。

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

引數 pattern 指的是正則表示式格式的訂閱模式,而 listener 引數從訂閱模式獲取通知。

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

手動將分割槽列表分配給客戶。

8

poll()

獲取使用其中一個 subscribe/assign API 指定的主題或分割槽的資料。如果在輪詢資料之前沒有訂閱主題,則將返回錯誤。

9

public void commitSync()

提交上次 poll() 返回的所有已訂閱主題和分割槽的偏移量。相同的操作應用於 commitAsyn()。

10

public void seek(TopicPartition partition, long offset)

獲取消費者將在下一個 poll() 方法中使用的當前偏移量值。

11

public void resume()

恢復已暫停的分割槽。

12

public void wakeup()

喚醒消費者。

ConsumerRecord API

ConsumerRecord API 用於接收來自 Kafka 叢集的記錄。此 API 包含主題名稱、分割槽編號(從中接收記錄)以及指向 Kafka 分割槽中記錄的偏移量。ConsumerRecord 類用於建立具有特定主題名稱、分割槽數量和 <鍵,值> 對的消費者記錄。它具有以下簽名。

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • 主題 - 從 Kafka 叢集接收的消費者記錄的主題名稱。

  • 分割槽 - 主題的分割槽。

  • - 記錄的鍵,如果不存在鍵,則返回 null。

  • - 記錄內容。

ConsumerRecords API

ConsumerRecords API 充當 ConsumerRecord 的容器。此 API 用於儲存特定主題每個分割槽的 ConsumerRecord 列表。其建構函式定義如下。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - 返回特定主題的分割槽對映。

  • Records - 返回 ConsumerRecord 列表。

ConsumerRecords 類定義了以下方法。

序號 方法及說明
1

public int count()

所有主題的記錄數量。

2

public Set partitions()

此記錄集中包含資料的分割槽集(如果未返回資料,則該集合為空)。

3

public Iterator iterator()

迭代器使您能夠遍歷集合,獲取或刪除元素。

4

public List records()

獲取給定分割槽的記錄列表。

配置設定

消費者客戶端 API 的主要配置設定列在下表中 -

序號 設定及說明
1

bootstrap.servers

代理引導列表。

2

group.id

將單個消費者分配到一個組。

3

enable.auto.commit

如果值為 true,則啟用偏移量的自動提交,否則不提交。

4

auto.commit.interval.ms

返回更新的已消費偏移量寫入 ZooKeeper 的頻率。

5

session.timeout.ms

指示 Kafka 在放棄並繼續消費訊息之前,將等待 ZooKeeper 對請求(讀取或寫入)做出響應的毫秒數。

SimpleConsumer 應用程式

生產者應用程式的步驟在此保持不變。首先,啟動您的 ZooKeeper 和 Kafka 代理。然後建立一個名為 SimpleConsumer 的 Java 類應用程式,名為 SimpleCon-sumer.java,並輸入以下程式碼。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

編譯 - 可以使用以下命令編譯應用程式。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

執行 - 可以使用以下命令執行應用程式

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

輸入 - 開啟生產者 CLI 並向主題傳送一些訊息。您可以將示例輸入設定為“Hello Consumer”。

輸出 - 以下是輸出。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer
廣告

© . All rights reserved.