- Protocol Buffers 教程
- Protocol Buffers - 首頁
- Protocol Buffers - 簡介
- Protocol Buffers - 基本應用
- Protocol Buffers - 結構
- Protocol Buffers - 訊息
- Protocol Buffers - 字串
- Protocol Buffers - 數字
- Protocol Buffers - 布林值
- Protocol Buffers - 列舉
- Protocol Buffers - 重複欄位
- Protocol Buffers - 對映
- Protocol Buffers - 巢狀類
- Protocol Buffers - 可選欄位和預設值
- Protocol Buffers - 語言無關性
- Protocol Buffers - 複合資料型別
- Protocol Buffers - 命令列使用
- Protocol Buffers - 更新定義規則
- Protocol Buffers - 與 Kafka 整合
- Protocol Buffers - 在其他語言中的使用
- Protocol Buffers 有用資源
- Protocol Buffers - 快速指南
- Protocol Buffers - 有用資源
- Protocol Buffers - 討論
Protocol Buffers - Kafka 整合
我們已經涵蓋了相當多的 Protocol Buffers 及其資料型別的示例。在本章中,讓我們再舉一個例子,看看 Protocol Buffers 如何與 Kafka 使用的 Schema Registry 整合。讓我們首先了解什麼是“schema registry”。
Schema Registry
Kafka 是廣泛使用的訊息佇列之一。它用於大規模應用釋出-訂閱模型。有關 Kafka 的更多資訊,請訪問此處 - https://tutorialspoint.tw/apache_kafka/index.htm
然而,在基本層面上,Kafka **生產者**應該傳送一條訊息,即 Kafka **消費者**可以讀取的資訊片段。而這種訊息的傳送和消費是我們需要 schema 的地方。在大型組織中,有多個團隊讀取/寫入 Kafka 主題時,尤其需要 schema。Kafka 提供了一種方法將此 schema 儲存在 *schema registry* 中,然後在生產者/消費者建立/消費訊息時建立/使用這些 schema。
維護 schema 有兩個主要好處:
**相容性** - 在大型組織中,必須確保生產訊息的團隊不會破壞消費這些訊息的下游工具。Schema registry 確保更改向後相容。
**高效編碼** - 在每條訊息中傳送欄位名稱及其型別會佔用空間和計算資源。使用 schema,我們不需要在每條訊息中傳送此資訊。
Schema registry 支援 **Avro、Google Protocol Buffers** 和 **JSON** Schema 作為 schema 語言。這些語言中的 schema 可以儲存在 schema registry 中。在本教程中,我們需要 Kafka 設定和 Schema registry 設定。
要安裝 Kafka,您可以檢視以下連結:
安裝 Kafka 後,您可以透過更新 ** /etc/schema-registry/schema-registry.properties ** 檔案來設定 Schema Registry。
# where should schema registry listen on listeners=http://0.0.0.0:8081 # Schema registry uses Kafka beneath it, so we need to tell where are the Kafka brokers available kafkastore.bootstrap.servers=PLAINTEXT://hostname:9092,SSL://hostname2:9092 Once done, you can then run: sudo systemctl start confluent-schema-registry
設定完成後,讓我們開始將 Google Protocol Buffers 與 Schema Registry 一起使用。
使用 Protocol Buffers Schema 的 Kafka 生產者
讓我們繼續我們的 **theater** 示例。我們將使用以下 Protocol Buffers schema:
theater.proto
syntax = "proto3";
package theater;
option java_package = "com.tutorialspoint.theater";
message Theater {
string name = 1;
string address = 2;
int32 total_capcity = 3;
int64 mobile = 4;
float base_ticket_price = 5;
bool drive_in = 6;
enum PAYMENT_SYSTEM{
CASH = 0;
CREDIT_CARD = 1;
DEBIT_CARD = 2;
APP = 3;
}
PAYMENT_SYSTEM payment = 7;
repeated string snacks = 8;
map<string, int32> movieTicketPrice = 9;
}
現在,讓我們建立一個簡單的 Kafka **寫入器**,它將以這種格式編碼的訊息寫入 Kafka 主題。但為此,首先我們需要在 Maven POM 中新增一些依賴項:
Kafka 客戶端,用於使用 Kafka 生產者和消費者
Kafka Protocol Buffers 序列化器,用於序列化和反序列化訊息
**Slf4j** simple,確保我們獲得 Kafka 的日誌
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version> </dependency> <!-- https://mvnrepository.com/artifact/io.confluent/kafka-protobuf-serializer --> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-protobuf-serializer</artifactId> <version>5.5.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.30</version> </dependency>
完成此操作後,讓我們現在建立一個 Kafka **生產者**。此生產者將建立併發送一條訊息,其中將包含 **theater** 物件。
KafkaProtbufProducer.java
package com.tutorialspoint.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.tutorialspoint.theater.TheaterOuterClass.Theater;
import com.tutorialspoint.theater.TheaterOuterClass.Theater.PAYMENT_SYSTEM;
public class KafkaProtbufProducer {
public static void main(String[] args) throws Exception{
String topicName = "testy1";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("clientid", "foo");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtocol BuffersSerializer");
props.put("schema.registry.url", "https://:8081");
props.put("auto.register.schemas", "true");
Producer<String, Theater> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, Theater>(topicName, "SilverScreen", getTheater())).get();
System.out.println("Sent to Kafka: \n" + getTheater());
producer.flush();
producer.close();
}
public static Theater getTheater() {
List<String> snacks = new ArrayList<>();
snacks.add("Popcorn");
snacks.add("Coke");
snacks.add("Chips");
snacks.add("Soda");
Map<String, Integer> ticketPrice = new HashMap<>();
ticketPrice.put("Avengers Endgame", 700);
ticketPrice.put("Captain America", 200);
ticketPrice.put("Wonder Woman 1984", 400);
Theater theater = Theater.newBuilder()
.setName("Silver Screener")
.setAddress("212, Maple Street, LA, California")
.setDriveIn(true)
.setTotalCapacity(320)
.setMobile(98234567189L)
.setBaseTicketPrice(22.45f)
.setPayment(PAYMENT_SYSTEM.CREDIT_CARD)
.putAllMovieTicketPrice(ticketPrice)
.addAllSnacks(snacks)
.build();
return theater;
}
}
以下列出了我們需要了解的一些要點:
我們需要將 Schema Registry URL 傳遞給生產者。
我們還需要傳遞正確的 Protocol Buffers 序列化器,該序列化器特定於 Schema Registry。
Schema registry 會在我們傳送完成後自動儲存 **theater** 物件的 schema。
最後,我們從自動生成的 Java 程式碼建立了一個 **theater** 物件,這就是我們將要傳送的內容。
輸出
讓我們現在編譯並執行程式碼:
mvn clean install ; java -cp .\target\protobuf-tutorial-1.0.jar com.tutorialspoint.kafka.KafkaProtbufProducer
我們將看到以下輸出:
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 66563e712b0b9f84 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1621692205607 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: 7kwQVXjYSz--bE47MiXmjw
已傳送到 Kafka
name: "Silver Screener"
address: "212, Maple Street, LA, California"
total_capacity: 320
mobile: 98234567189
base_ticket_price: 22.45
drive_in: true
payment: CREDIT_CARD
snacks: "Popcorn"
snacks: "Coke"
snacks: "Chips"
snacks: "Soda"
movieTicketPrice {
key: "Avengers Endgame"
value: 700
}
movieTicketPrice {
key: "Captain America"
value: 200
}
movieTicketPrice {
key: "Wonder Woman 1984"
value: 400
}
[main] INFO org.apache.kafka.clients.producer.KafkaProducer -
[Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
這意味著我們的訊息已傳送。
現在,讓我們確認 schema 是否已儲存在 Schema Registry 中。
curl -X GET https://:8081/subjects | jq
顯示的輸出為 **"topicName" + "key/value"**
[ "testy1-value" ]
我們還可以檢視 registry 儲存的 schema:
curl -X GET https://:8081/schemas/ids/1 | jq {
"schemaType": "PROTOBUF",
"schema": "syntax = \"proto3\";\npackage theater;\n\noption java_package = \"com.tutorialspoint.theater\";\n\nmessage Theater {
\n string name = 1;\n string address = 2;\n int64 total_capacity = 3;\n
int64 mobile = 4;\n float base_ticket_price = 5;\n bool drive_in = 6;\n
.theater.Theater.PAYMENT_SYSTEM payment = 7;\n repeated string snacks = 8;\n
repeated .theater.Theater.MovieTicketPriceEntry movieTicketPrice = 9;\n\n
message MovieTicketPriceEntry {\n option map_entry = true;\n \n
string key = 1;\n int32 value = 2;\n }\n enum PAYMENT_SYSTEM {
\n CASH = 0;\n CREDIT_CARD = 1;\n DEBIT_CARD = 2;\n APP = 3;\n
}\n
}\n"
}
使用 Protocol Buffers Schema 的 Kafka 消費者
讓我們現在建立一個 Kafka **消費者**。此消費者將消費包含 **theater** 物件的訊息。
KafkaProtbufConsumer.java
package com.tutorialspoint.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.tutorialspoint.theater.TheaterOuterClass.Theater;
import com.tutorialspoint.theater.TheaterOuterClass.Theater.PAYMENT_SYSTEM;
public class KafkaProtbufConsumer {
public static void main(String[] args) throws Exception{
String topicName = "testy1";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("clientid", "foo");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtocol BuffersSerializer");
props.put("schema.registry.url", "https://:8081");
props.put("auto.register.schemas", "true");
Producer<String, Theater> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, Theater>(topicName, "SilverScreen", getTheater())).get();
System.out.println("Sent to Kafka: \n" + getTheater());
producer.flush();
producer.close();
}
public static Theater getTheater() {
List<String> snacks = new ArrayList<>();
snacks.add("Popcorn");
snacks.add("Coke");
snacks.add("Chips");
snacks.add("Soda");
Map<String, Integer> ticketPrice = new HashMap<>();
ticketPrice.put("Avengers Endgame", 700);
ticketPrice.put("Captain America", 200);
ticketPrice.put("Wonder Woman 1984", 400);
Theater theater = Theater.newBuilder()
.setName("Silver Screener")
.setAddress("212, Maple Street, LA, California")
.setDriveIn(true)
.setTotalCapacity(320)
.setMobile(98234567189L)
.setBaseTicketPrice(22.45f)
.setPayment(PAYMENT_SYSTEM.CREDIT_CARD)
.putAllMovieTicketPrice(ticketPrice)
.addAllSnacks(snacks)
.build();
return theater;
}
}
以下列出了我們需要了解的一些要點:
我們需要將 Schema Registry URL 傳遞給消費者。
我們還需要傳遞正確的 Protocol Buffers 反序列化器,該反序列化器特定於 Schema Registry。
Schema Registry 會在我們消費完成後自動讀取儲存的 **theater** 物件的 schema。
最後,我們從自動生成的 Java 程式碼建立了一個 **theater** 物件,這就是我們將要傳送的內容。
輸出
讓我們現在編譯並執行程式碼:
mvn clean install ; java -cp .\target\protobuf-tutorial-1.0.jar com.tutorialspoint.kafka.KafkaProtbufConsumer
offset = 0, key = SilverScreen, value = May 22, 2021 7:50:15 PM com.google.protobuf.TextFormat$Printer$MapEntryAdapter compareTo
May 22, 2021 7:50:15 PM com.google.protobuf.TextFormat$Printer$MapEntryAdapter compareTo
name: "Silver Screener"
address: "212, Maple Street, LA, California"
total_capacity: 320
mobile: 98234567189
base_ticket_price: 22.45
drive_in: true
payment: CREDIT_CARD
snacks: "Popcorn"
snacks: "Coke"
snacks: "Chips"
snacks: "Soda"
movieTicketPrice {
key: "Captain America"
value: 200
}
movieTicketPrice {
key: "Wonder Woman 1984"
value: 400
}
movieTicketPrice {
key: "Avengers Endgame"
value: 700
}
因此,正如我們所看到的,寫入 Kafka 的訊息被消費者正確地消費了。此外,Registry 儲存了 schema,也可以透過 REST API 訪問。