
- Apache Kafka 教程
- Apache Kafka - 首頁
- Apache Kafka - 簡介
- Apache Kafka - 基礎知識
- Apache Kafka - 叢集架構
- Apache Kafka - 工作流程
- Apache Kafka - 安裝步驟
- Apache Kafka - 基本操作
- 簡單的生產者示例
- 消費者組示例
- 與 Storm 整合
- 與 Spark 整合
- 即時應用(Twitter)
- Apache Kafka - 工具
- Apache Kafka - 應用
- Apache Kafka 有用資源
- Apache Kafka - 快速指南
- Apache Kafka - 有用資源
- Apache Kafka - 討論
Apache Kafka - 基本操作
首先,讓我們開始實現`單節點-單代理
`配置,然後我們將設定遷移到單節點-多代理配置。
希望你現在已經在你的機器上安裝了 Java、ZooKeeper 和 Kafka。在進入 Kafka 叢集設定之前,你需要先啟動你的 ZooKeeper,因為 Kafka 叢集使用 ZooKeeper。
啟動 ZooKeeper
開啟一個新的終端並輸入以下命令:
bin/zookeeper-server-start.sh config/zookeeper.properties
要啟動 Kafka 代理,請輸入以下命令:
bin/kafka-server-start.sh config/server.properties
啟動 Kafka 代理後,在 ZooKeeper 終端輸入命令`jps
`,你將看到以下響應:
821 QuorumPeerMain 928 Kafka 931 Jps
現在你可以看到終端上執行著兩個守護程序,其中 QuorumPeerMain 是 ZooKeeper 守護程序,另一個是 Kafka 守護程序。
單節點-單代理配置
在此配置中,你只有一個 ZooKeeper 和代理 ID 例項。以下是配置步驟:
**建立 Kafka 主題** - Kafka 提供了一個名為`kafka-topics.sh
`的命令列實用程式來在伺服器上建立主題。開啟新的終端並輸入以下示例。
語法
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
示例
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
我們剛剛建立了一個名為`Hello-Kafka
`的主題,它只有一個分割槽和一個副本因子。上面建立的輸出將類似於以下輸出:
**輸出** - 建立主題`Hello-Kafka
`
建立主題後,你可以在 Kafka 代理終端視窗中收到通知,並在 `config/server.properties` 檔案中指定的“/tmp/kafka-logs/”中檢視已建立主題的日誌。
主題列表
要獲取 Kafka 伺服器中的主題列表,可以使用以下命令:
語法
bin/kafka-topics.sh --list --zookeeper localhost:2181
輸出
Hello-Kafka
由於我們已經建立了一個主題,它只會列出`Hello-Kafka
`。假設你建立了多個主題,你將在輸出中獲得主題名稱。
啟動生產者傳送訊息
語法
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
從上面的語法來看,生產者命令列客戶端需要兩個主要引數:
**代理列表** - 我們要向其傳送訊息的代理列表。在本例中,我們只有一個代理。`Config/server.properties` 檔案包含代理埠 ID,因為我們知道我們的代理正在埠 9092 上監聽,所以你可以直接指定它。
主題名稱 - 這是一個主題名稱的示例。
示例
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
生產者將等待來自 stdin 的輸入併發布到 Kafka 叢集。預設情況下,每一行都作為一條新訊息釋出,然後預設的生產者屬性在`config/producer.properties
`檔案中指定。現在你可以在終端輸入幾行訊息,如下所示。
輸出
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka[2016-01-16 13:50:45,931] WARN property topic is not valid (kafka.utils.Verifia-bleProperties) Hello My first message
My second message
啟動消費者接收訊息
與生產者類似,預設的消費者屬性在`config/consumer.properties
`檔案中指定。開啟一個新的終端並輸入以下語法來消費訊息。
語法
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name --from-beginning
示例
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka --from-beginning
輸出
Hello My first message My second message
最後,你能夠從生產者的終端輸入訊息,並看到它們出現在消費者的終端。目前,你對具有單個代理的單節點叢集有了很好的理解。現在讓我們繼續討論多個代理的配置。
單節點-多代理配置
在繼續進行多個代理叢集設定之前,首先啟動你的 ZooKeeper 伺服器。
**建立多個 Kafka 代理** - 我們已經在 `config/server.properties` 中已經有了一個 Kafka 代理例項。現在我們需要多個代理例項,所以將現有的 `server.properties` 檔案複製到兩個新的配置檔案中,並將其重新命名為 `server-one.properties` 和 `server-two.properties`。然後編輯這兩個新檔案並進行以下更改:
config/server-one.properties
# The id of the broker. This must be set to a unique integer for each broker. broker.id=1 # The port the socket server listens on port=9093 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-1
config/server-two.properties
# The id of the broker. This must be set to a unique integer for each broker. broker.id=2 # The port the socket server listens on port=9094 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-2
**啟動多個代理** - 在對三個伺服器進行所有更改後,開啟三個新的終端來一個一個地啟動每個代理。
Broker1 bin/kafka-server-start.sh config/server.properties Broker2 bin/kafka-server-start.sh config/server-one.properties Broker3 bin/kafka-server-start.sh config/server-two.properties
現在我們有三臺不同的代理在機器上執行。自己嘗試一下,透過在 ZooKeeper 終端輸入 **jps** 來檢查所有守護程序,然後你將看到響應。
建立主題
讓我們為此主題分配副本因子值為 3,因為我們有三個不同的代理正在執行。如果你有兩個代理,則分配的副本值為 2。
語法
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic topic-name
示例
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic Multibrokerapplication
輸出
created topic “Multibrokerapplication”
`Describe` 命令用於檢查哪個代理正在監聽當前建立的主題,如下所示:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation
輸出
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation Topic:Multibrokerapplication PartitionCount:1 ReplicationFactor:3 Configs: Topic:Multibrokerapplication Partition:0 Leader:0 Replicas:0,2,1 Isr:0,2,1
從上面的輸出中,我們可以得出結論:第一行總結了所有分割槽,顯示了主題名稱、分割槽計數和我們已經選擇的副本因子。在第二行中,每個節點將成為隨機選擇的一部分分割槽的領導者。
在我們的例子中,我們看到我們的第一個代理(broker.id 為 0)是領導者。然後 Replicas:0,2,1 表示所有代理最終複製主題,`Isr
` 是`in-sync
`副本的集合。嗯,這是當前處於活動狀態並與領導者保持同步的副本子集。
啟動生產者傳送訊息
此過程與單代理設定中的過程相同。
示例
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
輸出
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication [2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties) This is single node-multi broker demo This is the second message
啟動消費者接收訊息
此過程與單代理設定中顯示的過程相同。
示例
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Multibrokerapplica-tion --from-beginning
輸出
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Multibrokerapplica-tion —from-beginning This is single node-multi broker demo This is the second message
基本主題操作
在本章中,我們將討論各種基本主題操作。
修改主題
正如你已經瞭解瞭如何在 Kafka 叢集中建立主題一樣。現在讓我們使用以下命令修改已建立的主題
語法
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name --parti-tions count
示例
We have already created a topic “Hello-Kafka” with single partition count and one replica factor. Now using “alter” command we have changed the partition count. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic Hello-kafka --parti-tions 2
輸出
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!
刪除主題
要刪除主題,可以使用以下語法。
語法
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
示例
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
輸出
> Topic Hello-kafka marked for deletion
**注意 -** 如果未將 **delete.topic.enable** 設定為 true,則此操作無效