
Spring Cloud - 使用 Apache Kafka 的流
介紹
在分散式環境中,服務需要相互通訊。通訊可以同步發生,也可以非同步發生。在本節中,我們將瞭解服務如何透過使用**訊息代理**非同步通訊。
執行非同步通訊的兩個主要好處:
**生產者和消費者的速度可以不同** - 如果資料的消費者速度慢或快,則不會影響生產者的處理,反之亦然。兩者都可以以各自的速度工作,而不會相互影響。
**生產者不需要處理來自各種消費者的請求** - 可能有多個消費者希望從生產者讀取相同的資料集。透過在兩者之間使用訊息代理,生產者不需要處理這些消費者產生的負載。此外,生產者級別的任何中斷都不會阻止消費者讀取舊的生產者資料,因為這些資料將存在於訊息代理中。
**Apache Kafka** 和**RabbitMQ** 是兩種用於進行非同步通訊的知名訊息代理。在本教程中,我們將使用 Apache Kafka。
Kafka – 依賴項設定
讓我們使用我們之前一直在使用的餐廳案例。因此,假設我們的客戶服務和餐廳服務透過非同步通訊進行通訊。為此,我們將使用 Apache Kafka。並且我們需要在兩個服務中使用它,即客戶服務和餐廳服務。
要使用 Apache Kafka,我們將更新這兩個服務的 POM 並新增以下依賴項。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
我們還需要執行 Kafka 例項。有多種方法可以做到這一點,但我們更傾向於使用 Docker 容器啟動 Kafka。以下是一些我們可以考慮使用的映象:
無論我們使用哪個映象,這裡需要注意的重要一點是,一旦映象啟動並執行,請確保 Kafka 叢集可在**localhost:9092**訪問。
現在我們已經在映象上運行了 Kafka 叢集,讓我們轉到核心示例。
繫結和繫結器
在 Spring Cloud Streams 中,有三個重要的概念:
**外部訊息系統** - 這是外部管理的元件,負責儲存應用程式生成的事件/訊息,這些事件/訊息可以被其訂閱者/消費者讀取。請注意,這不是在應用程式/Spring 中管理的。一些示例包括 Apache Kafka、RabbitMQ
**繫結器** - 這是提供與訊息系統整合的元件,例如,包含訊息系統的 IP 地址、身份驗證等。
**繫結** - 此元件使用繫結器向訊息系統傳送訊息或從特定主題/佇列中使用訊息。
所有上述屬性都在**應用程式屬性檔案**中定義。
示例
讓我們使用我們之前一直在使用的餐廳案例。因此,假設每當向客戶服務新增新服務時,我們都希望通知附近的餐廳有關他/她的客戶資訊。
為此,讓我們首先更新我們的客戶服務以包含和使用 Kafka。請注意,我們將使用客戶服務作為資料的生產者。也就是說,每當我們透過 API 新增客戶時,它也將新增到 Kafka 中。
spring: application: name: customer-service cloud: stream: source: customerBinding-out-0 kafka: binder: brokers: localhost:9092 replicationFactor: 1 bindings: customerBinding-out-0: destination: customer producer: partitionCount: 3 server: port: ${app_port} eureka: client: serviceURL: defaultZone: https://:8900/eureka
**需要注意的事項**:
我們已定義了一個繫結器,其中包含我們本地 Kafka 例項的地址。
我們還定義了繫結“customerBinding-out-0”,它使用“customer”主題輸出訊息。
我們還在**stream.source**中提到了我們的繫結,以便我們可以在程式碼中使用它。
完成此操作後,讓我們現在透過新增一個負責處理 POST 請求的新方法“addCustomer”來更新我們的控制器。然後,從**post**請求中,我們將資料傳送到 Kafka 代理。
package com.tutorialspoint; import java.util.HashMap; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; @RestController class RestaurantCustomerInstancesController { @Autowired private StreamBridge streamBridge; static HashMap<Long, Customer> mockCustomerData = new HashMap(); static{ mockCustomerData.put(1L, new Customer(1, "Jane", "DC")); mockCustomerData.put(2L, new Customer(2, "John", "SFO")); mockCustomerData.put(3L, new Customer(3, "Kate", "NY")); } @RequestMapping("/customer/{id}") public Customer getCustomerInfo(@PathVariable("id") Long id) { System.out.println("Querying customer for id with: " + id); return mockCustomerData.get(id); } @RequestMapping(path = "/customer/{id}", method = RequestMethod.POST) public Customer addCustomer(@PathVariable("id") Long id) { // add default name Customer defaultCustomer = new Customer(id, "Dwayne", "NY"); streamBridge.send("customerBinding-out-0", defaultCustomer); return defaultCustomer; } }
需要注意的事項
我們正在自動裝配 StreamBridge,這就是我們將用於傳送訊息的工具。
我們在“send”方法中使用的引數也指定了我們要用來發送資料的繫結。
現在讓我們更新我們的餐廳服務以包含並訂閱“customer”主題。請注意,我們將使用餐廳服務作為資料的消費者。也就是說,每當我們透過 API 新增客戶時,餐廳服務都會透過 Kafka 瞭解它。
首先,讓我們更新**application.properties**檔案。
spring: application: name: restaurant-service cloud: function: definition: customerBinding stream: kafka: binder: brokers: localhost:9092 replicationFactor: 1 bindings: customerBinding-in-0: destination: customer server: port: ${app_port} eureka: client: serviceURL: defaultZone: https://:8900/eureka
完成此操作後,讓我們現在透過新增一個負責獲取請求並提供一個函式的新方法“customerBinding”來更新我們的控制器,該函式將列印請求及其元資料詳細資訊。
package com.tutorialspoint; import java.util.HashMap; import java.util.List; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.context.annotation.Bean; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController class RestaurantController { @Autowired CustomerService customerService; @Autowired private StreamBridge streamBridge; static HashMap<Long, Restaurant> mockRestaurantData = new HashMap(); static{ mockRestaurantData.put(1L, new Restaurant(1, "Pandas", "DC")); mockRestaurantData.put(2L, new Restaurant(2, "Indies", "SFO")); mockRestaurantData.put(3L, new Restaurant(3, "Little Italy", "DC")); mockRestaurantData.put(4L, new Restaurant(4, "Pizeeria", "NY")); } @RequestMapping("/restaurant/customer/{id}") public List<Restaurant> getRestaurantForCustomer(@PathVariable("id") Long id) { System.out.println("Got request for customer with id: " + id); String customerCity = customerService.getCustomerById(id).getCity(); return mockRestaurantData.entrySet().stream().filter( entry -> entry.getValue().getCity().equals(customerCity)) .map(entry -> entry.getValue()) .collect(Collectors.toList()); } @RequestMapping("/restaurant/cust/{id}") public void getRestaurantForCust(@PathVariable("id") Long id) { streamBridge.send("ordersBinding-out-0", id); } @Bean public Consumer<Message<Customer>> customerBinding() { return msg -> { System.out.println(msg); }; } }
**需要注意的事項**:
我們正在使用“customerBinding”,它應該傳遞當此繫結收到訊息時將呼叫的函式。
我們為此函式/bean使用的名稱也需要在建立捆綁和指定主題時在 YAML 檔案中使用。
現在,讓我們像往常一樣執行上述程式碼,啟動 Eureka Server。請注意,這不是硬性要求,這裡是為了完整性而存在的。
然後,讓我們編譯並開始使用以下命令更新客戶服務:
mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient- 1.0.jar --spring.config.location=classpath:application-kafka.yml
然後,讓我們編譯並開始使用以下命令更新餐廳服務:
mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client- 1.0.jar --spring.config.location=classpath:application-kafka.yml
我們已設定好了,現在讓我們透過訪問 API 來測試我們的程式碼段:
curl -X POST https://:8083/customer/1
這是我們將為此 API 獲得的輸出:
{ "id": 1, "name": "Dwayne", "city": "NY" }
現在,讓我們檢查餐廳服務的日誌:
GenericMessage [payload=Customer [id=1, name=Dwayne, city=NY], headers={kafka_offset=1,...
因此,實際上,您可以看到,使用 Kafka 代理,餐廳服務已收到有關新新增的客戶的通知。
分割槽和消費者組
分割槽和消費者組是您在使用 Spring Cloud Streams 時應該瞭解的兩個重要概念。
**分割槽** - 用於對資料進行分割槽,以便我們可以在多個消費者之間分配工作。
讓我們看看如何在 Spring Cloud 中對資料進行分割槽。假設,我們希望根據客戶 ID 對資料進行分割槽。因此,讓我們為此更新我們的客戶服務。為此,我們需要告訴
讓我們更新我們的客戶服務應用程式屬性以指定我們資料的鍵。
spring: application: name: customer-service cloud: function: definition: ordersBinding stream: source: customerBinding-out-0 kafka: binder: brokers: localhost:9092 replicationFactor: 1 bindings: customerBinding-out-0: destination: customer producer: partitionKeyExpression: 'getPayload().getId()' partitionCount: 3 server: port: ${app_port} eureka: client: serviceURL: defaultZone: https://:8900/eureka
為了指定鍵,即“partitionKeyExpression”,我們提供了 Spring 表示式語言。該表示式假設型別為 GenericMessage
現在,讓我們也更新我們的消費者,即餐廳服務,以便在使用請求時記錄更多資訊。
現在,讓我們像往常一樣執行上述程式碼,啟動 Eureka Server。請注意,這不是硬性要求,這裡是為了完整性而存在的。
然後,讓我們編譯並開始使用以下命令更新客戶服務:
mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient- 1.0.jar --spring.config.location=classpath:application-kafka.yml
然後,讓我們編譯並開始使用以下命令更新餐廳服務:
mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client- 1.0.jar --spring.config.location=classpath:application-kafka.yml
我們已設定好了,現在讓我們測試我們的程式碼段。作為測試的一部分,我們將執行以下操作:
插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/1
插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/1
插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/5
插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/3
插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/1
我們不太關心 API 的輸出。相反,我們更關心資料傳送到的分割槽。由於我們使用客戶 ID 作為鍵,因此我們預計具有相同 ID 的客戶將最終位於同一分割槽中。
現在,讓我們檢查餐廳服務的日誌:
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY] Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY] Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 2 Customer: Customer [id=5, name=Dwayne, city=NY] Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 0 Customer: Customer [id=3, name=Dwayne, city=NY] Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY]
因此,正如我們所看到的,ID 為 1 的客戶每次都最終位於同一分割槽中,即分割槽 1。
**消費者組** - 消費者組是讀取同一主題以實現相同目的的消費者的邏輯分組。主題中的資料在消費者組中的消費者之間進行分割槽,以便給定消費者組中的只有一個消費者可以讀取主題的一個分割槽。
要定義消費者組,我們只需在我們使用 Kafka 主題名稱的繫結中定義一個組即可。例如,讓我們在我們的應用程式檔案中為我們的控制器定義消費者組名稱。
spring: application: name: restaurant-service cloud: function: definition: customerBinding stream: kafka: binder: brokers: localhost:9092 replicationFactor: 1 bindings: customerBinding-in-0: destination: customer group: restController server: port: ${app_port} eureka: client: serviceURL: defaultZone: https://:8900/eureka
讓我們重新編譯並啟動餐廳服務。現在,讓我們透過訪問客戶服務的 POST API 生成事件:
插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/1
現在,如果我們檢查餐廳服務的日誌,我們將看到以下內容:
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: restContoller Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY]
因此,正如我們從輸出中看到的,我們建立了一個名為“rest-contoller”的消費者組,其消費者負責讀取主題。在上述情況下,我們只有一個服務例項正在執行,因此“customer”主題的所有分割槽都被分配給同一個例項。但是,如果我們有多個分割槽,我們將有多個分割槽分佈在工作程式之間。