Apache Kafka - 基本操作



首先,讓我們開始實現`單節點-單代理`配置,然後我們將設定遷移到單節點-多代理配置。

希望你現在已經在你的機器上安裝了 Java、ZooKeeper 和 Kafka。在進入 Kafka 叢集設定之前,你需要先啟動你的 ZooKeeper,因為 Kafka 叢集使用 ZooKeeper。

啟動 ZooKeeper

開啟一個新的終端並輸入以下命令:

bin/zookeeper-server-start.sh config/zookeeper.properties

要啟動 Kafka 代理,請輸入以下命令:

bin/kafka-server-start.sh config/server.properties

啟動 Kafka 代理後,在 ZooKeeper 終端輸入命令`jps`,你將看到以下響應:

821 QuorumPeerMain
928 Kafka
931 Jps

現在你可以看到終端上執行著兩個守護程序,其中 QuorumPeerMain 是 ZooKeeper 守護程序,另一個是 Kafka 守護程序。

單節點-單代理配置

在此配置中,你只有一個 ZooKeeper 和代理 ID 例項。以下是配置步驟:

**建立 Kafka 主題** - Kafka 提供了一個名為`kafka-topics.sh`的命令列實用程式來在伺服器上建立主題。開啟新的終端並輸入以下示例。

語法

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

示例

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

我們剛剛建立了一個名為`Hello-Kafka`的主題,它只有一個分割槽和一個副本因子。上面建立的輸出將類似於以下輸出:

**輸出** - 建立主題`Hello-Kafka`

建立主題後,你可以在 Kafka 代理終端視窗中收到通知,並在 `config/server.properties` 檔案中指定的“/tmp/kafka-logs/”中檢視已建立主題的日誌。

主題列表

要獲取 Kafka 伺服器中的主題列表,可以使用以下命令:

語法

bin/kafka-topics.sh --list --zookeeper localhost:2181

輸出

Hello-Kafka

由於我們已經建立了一個主題,它只會列出`Hello-Kafka`。假設你建立了多個主題,你將在輸出中獲得主題名稱。

啟動生產者傳送訊息

語法

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

從上面的語法來看,生產者命令列客戶端需要兩個主要引數:

**代理列表** - 我們要向其傳送訊息的代理列表。在本例中,我們只有一個代理。`Config/server.properties` 檔案包含代理埠 ID,因為我們知道我們的代理正在埠 9092 上監聽,所以你可以直接指定它。

主題名稱 - 這是一個主題名稱的示例。

示例

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

生產者將等待來自 stdin 的輸入併發布到 Kafka 叢集。預設情況下,每一行都作為一條新訊息釋出,然後預設的生產者屬性在`config/producer.properties`檔案中指定。現在你可以在終端輸入幾行訊息,如下所示。

輸出

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

啟動消費者接收訊息

與生產者類似,預設的消費者屬性在`config/consumer.properties`檔案中指定。開啟一個新的終端並輸入以下語法來消費訊息。

語法

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

示例

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

輸出

Hello
My first message
My second message

最後,你能夠從生產者的終端輸入訊息,並看到它們出現在消費者的終端。目前,你對具有單個代理的單節點叢集有了很好的理解。現在讓我們繼續討論多個代理的配置。

單節點-多代理配置

在繼續進行多個代理叢集設定之前,首先啟動你的 ZooKeeper 伺服器。

**建立多個 Kafka 代理** - 我們已經在 `config/server.properties` 中已經有了一個 Kafka 代理例項。現在我們需要多個代理例項,所以將現有的 `server.properties` 檔案複製到兩個新的配置檔案中,並將其重新命名為 `server-one.properties` 和 `server-two.properties`。然後編輯這兩個新檔案並進行以下更改:

config/server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config/server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

**啟動多個代理** - 在對三個伺服器進行所有更改後,開啟三個新的終端來一個一個地啟動每個代理。

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

現在我們有三臺不同的代理在機器上執行。自己嘗試一下,透過在 ZooKeeper 終端輸入 **jps** 來檢查所有守護程序,然後你將看到響應。

建立主題

讓我們為此主題分配副本因子值為 3,因為我們有三個不同的代理正在執行。如果你有兩個代理,則分配的副本值為 2。

語法

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

示例

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

輸出

created topic “Multibrokerapplication”

`Describe` 命令用於檢查哪個代理正在監聽當前建立的主題,如下所示:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

輸出

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

從上面的輸出中,我們可以得出結論:第一行總結了所有分割槽,顯示了主題名稱、分割槽計數和我們已經選擇的副本因子。在第二行中,每個節點將成為隨機選擇的一部分分割槽的領導者。

在我們的例子中,我們看到我們的第一個代理(broker.id 為 0)是領導者。然後 Replicas:0,2,1 表示所有代理最終複製主題,`Isr` 是`in-sync`副本的集合。嗯,這是當前處於活動狀態並與領導者保持同步的副本子集。

啟動生產者傳送訊息

此過程與單代理設定中的過程相同。

示例

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

輸出

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

啟動消費者接收訊息

此過程與單代理設定中顯示的過程相同。

示例

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginning

輸出

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

基本主題操作

在本章中,我們將討論各種基本主題操作。

修改主題

正如你已經瞭解瞭如何在 Kafka 叢集中建立主題一樣。現在讓我們使用以下命令修改已建立的主題

語法

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

示例

We have already created a topic “Hello-Kafka” with single partition count and one replica factor. 
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

輸出

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

刪除主題

要刪除主題,可以使用以下語法。

語法

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

示例

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

輸出

> Topic Hello-kafka marked for deletion

**注意 -** 如果未將 **delete.topic.enable** 設定為 true,則此操作無效

廣告