Spring Cloud - 使用 Apache Kafka 的流



介紹

在分散式環境中,服務需要相互通訊。通訊可以同步發生,也可以非同步發生。在本節中,我們將瞭解服務如何透過使用**訊息代理**非同步通訊。

執行非同步通訊的兩個主要好處:

  • **生產者和消費者的速度可以不同** - 如果資料的消費者速度慢或快,則不會影響生產者的處理,反之亦然。兩者都可以以各自的速度工作,而不會相互影響。

  • **生產者不需要處理來自各種消費者的請求** - 可能有多個消費者希望從生產者讀取相同的資料集。透過在兩者之間使用訊息代理,生產者不需要處理這些消費者產生的負載。此外,生產者級別的任何中斷都不會阻止消費者讀取舊的生產者資料,因為這些資料將存在於訊息代理中。

**Apache Kafka** 和**RabbitMQ** 是兩種用於進行非同步通訊的知名訊息代理。在本教程中,我們將使用 Apache Kafka。

Kafka – 依賴項設定

讓我們使用我們之前一直在使用的餐廳案例。因此,假設我們的客戶服務和餐廳服務透過非同步通訊進行通訊。為此,我們將使用 Apache Kafka。並且我們需要在兩個服務中使用它,即客戶服務和餐廳服務。

要使用 Apache Kafka,我們將更新這兩個服務的 POM 並新增以下依賴項。

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

我們還需要執行 Kafka 例項。有多種方法可以做到這一點,但我們更傾向於使用 Docker 容器啟動 Kafka。以下是一些我們可以考慮使用的映象:

無論我們使用哪個映象,這裡需要注意的重要一點是,一旦映象啟動並執行,請確保 Kafka 叢集可在**localhost:9092**訪問。

現在我們已經在映象上運行了 Kafka 叢集,讓我們轉到核心示例。

繫結和繫結器

在 Spring Cloud Streams 中,有三個重要的概念:

  • **外部訊息系統** - 這是外部管理的元件,負責儲存應用程式生成的事件/訊息,這些事件/訊息可以被其訂閱者/消費者讀取。請注意,這不是在應用程式/Spring 中管理的。一些示例包括 Apache Kafka、RabbitMQ

  • **繫結器** - 這是提供與訊息系統整合的元件,例如,包含訊息系統的 IP 地址、身份驗證等。

  • **繫結** - 此元件使用繫結器向訊息系統傳送訊息或從特定主題/佇列中使用訊息。

所有上述屬性都在**應用程式屬性檔案**中定義。

示例

讓我們使用我們之前一直在使用的餐廳案例。因此,假設每當向客戶服務新增新服務時,我們都希望通知附近的餐廳有關他/她的客戶資訊。

為此,讓我們首先更新我們的客戶服務以包含和使用 Kafka。請注意,我們將使用客戶服務作為資料的生產者。也就是說,每當我們透過 API 新增客戶時,它也將新增到 Kafka 中。

spring:
   application:
      name: customer-service
   cloud:
      stream:
         source: customerBinding-out-0
         kafka:
            binder:
            brokers: localhost:9092
            replicationFactor: 1
      bindings:
         customerBinding-out-0:
            destination: customer
            producer:
               partitionCount: 3
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: https://:8900/eureka

**需要注意的事項**:

  • 我們已定義了一個繫結器,其中包含我們本地 Kafka 例項的地址。

  • 我們還定義了繫結“customerBinding-out-0”,它使用“customer”主題輸出訊息。

  • 我們還在**stream.source**中提到了我們的繫結,以便我們可以在程式碼中使用它。

完成此操作後,讓我們現在透過新增一個負責處理 POST 請求的新方法“addCustomer”來更新我們的控制器。然後,從**post**請求中,我們將資料傳送到 Kafka 代理。

package com.tutorialspoint;
import java.util.HashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
class RestaurantCustomerInstancesController {
   @Autowired
   private StreamBridge streamBridge;
   static HashMap<Long, Customer> mockCustomerData = new HashMap();
   static{
      mockCustomerData.put(1L, new Customer(1, "Jane", "DC"));
      mockCustomerData.put(2L, new Customer(2, "John", "SFO"));
      mockCustomerData.put(3L, new Customer(3, "Kate", "NY"));
   }
   @RequestMapping("/customer/{id}")
   public Customer getCustomerInfo(@PathVariable("id") Long id) {
      System.out.println("Querying customer for id with: " + id);
      return mockCustomerData.get(id);
   }
   @RequestMapping(path = "/customer/{id}", method = RequestMethod.POST)
   public Customer addCustomer(@PathVariable("id") Long id) {
      // add default name
      Customer defaultCustomer = new Customer(id, "Dwayne", "NY");
      streamBridge.send("customerBinding-out-0", defaultCustomer);
      return defaultCustomer;
   }
}

需要注意的事項

  • 我們正在自動裝配 StreamBridge,這就是我們將用於傳送訊息的工具。

  • 我們在“send”方法中使用的引數也指定了我們要用來發送資料的繫結。

現在讓我們更新我們的餐廳服務以包含並訂閱“customer”主題。請注意,我們將使用餐廳服務作為資料的消費者。也就是說,每當我們透過 API 新增客戶時,餐廳服務都會透過 Kafka 瞭解它。

首先,讓我們更新**application.properties**檔案。

spring:
   application:
      name: restaurant-service
   cloud:
      function:
         definition: customerBinding
      stream:
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
            bindings:
               customerBinding-in-0:
               destination: customer
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: https://:8900/eureka

完成此操作後,讓我們現在透過新增一個負責獲取請求並提供一個函式的新方法“customerBinding”來更新我們的控制器,該函式將列印請求及其元資料詳細資訊。

package com.tutorialspoint;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
class RestaurantController {
   @Autowired
   CustomerService customerService;
   @Autowired
   private StreamBridge streamBridge;
   static HashMap<Long, Restaurant> mockRestaurantData = new HashMap();
   static{
      mockRestaurantData.put(1L, new Restaurant(1, "Pandas", "DC"));
      mockRestaurantData.put(2L, new Restaurant(2, "Indies", "SFO"));
      mockRestaurantData.put(3L, new Restaurant(3, "Little Italy", "DC"));
      mockRestaurantData.put(4L, new Restaurant(4, "Pizeeria", "NY"));
   }
   @RequestMapping("/restaurant/customer/{id}")
   public List<Restaurant> getRestaurantForCustomer(@PathVariable("id") Long id) {
      System.out.println("Got request for customer with id: " + id);
      String customerCity = customerService.getCustomerById(id).getCity();
      return mockRestaurantData.entrySet().stream().filter(
entry -> entry.getValue().getCity().equals(customerCity))
.map(entry -> entry.getValue())
.collect(Collectors.toList());
   }
   @RequestMapping("/restaurant/cust/{id}")
   public void getRestaurantForCust(@PathVariable("id") Long id) {
      streamBridge.send("ordersBinding-out-0", id);
   }
   @Bean
   public Consumer<Message<Customer>> customerBinding() {
      return msg -> {
         System.out.println(msg);
      };
   }
}

**需要注意的事項**:

  • 我們正在使用“customerBinding”,它應該傳遞當此繫結收到訊息時將呼叫的函式。

  • 我們為此函式/bean使用的名稱也需要在建立捆綁和指定主題時在 YAML 檔案中使用。

現在,讓我們像往常一樣執行上述程式碼,啟動 Eureka Server。請注意,這不是硬性要求,這裡是為了完整性而存在的。

然後,讓我們編譯並開始使用以下命令更新客戶服務:

mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient-
1.0.jar --spring.config.location=classpath:application-kafka.yml

然後,讓我們編譯並開始使用以下命令更新餐廳服務:

mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client-
1.0.jar --spring.config.location=classpath:application-kafka.yml

我們已設定好了,現在讓我們透過訪問 API 來測試我們的程式碼段:

curl -X POST https://:8083/customer/1

這是我們將為此 API 獲得的輸出:

{
   "id": 1,
   "name": "Dwayne",
   "city": "NY"
}

現在,讓我們檢查餐廳服務的日誌:

GenericMessage [payload=Customer [id=1, name=Dwayne, city=NY],
headers={kafka_offset=1,...

因此,實際上,您可以看到,使用 Kafka 代理,餐廳服務已收到有關新新增的客戶的通知。

分割槽和消費者組

分割槽和消費者組是您在使用 Spring Cloud Streams 時應該瞭解的兩個重要概念。

**分割槽** - 用於對資料進行分割槽,以便我們可以在多個消費者之間分配工作。

讓我們看看如何在 Spring Cloud 中對資料進行分割槽。假設,我們希望根據客戶 ID 對資料進行分割槽。因此,讓我們為此更新我們的客戶服務。為此,我們需要告訴

讓我們更新我們的客戶服務應用程式屬性以指定我們資料的鍵。

spring:
   application:
      name: customer-service
   cloud:
      function:
         definition: ordersBinding
      stream:
         source: customerBinding-out-0
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
         bindings:
            customerBinding-out-0:
               destination: customer
               producer:
                  partitionKeyExpression: 'getPayload().getId()'
                  partitionCount: 3
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: https://:8900/eureka

為了指定鍵,即“partitionKeyExpression”,我們提供了 Spring 表示式語言。該表示式假設型別為 GenericMessage,因為我們正在訊息中傳送 Customer 資料。請注意,GenericMessage 是 Spring 框架用於將有效負載和標頭包裝到單個物件中的類。因此,我們從此訊息中獲取有效負載,該有效負載的型別為 Customer,然後我們呼叫客戶上的**getId()**方法。

現在,讓我們也更新我們的消費者,即餐廳服務,以便在使用請求時記錄更多資訊。

現在,讓我們像往常一樣執行上述程式碼,啟動 Eureka Server。請注意,這不是硬性要求,這裡是為了完整性而存在的。

然後,讓我們編譯並開始使用以下命令更新客戶服務:

mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient-
1.0.jar --spring.config.location=classpath:application-kafka.yml

然後,讓我們編譯並開始使用以下命令更新餐廳服務:

mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client-
1.0.jar --spring.config.location=classpath:application-kafka.yml

我們已設定好了,現在讓我們測試我們的程式碼段。作為測試的一部分,我們將執行以下操作:

  • 插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/1

  • 插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/1

  • 插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/5

  • 插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/3

  • 插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/1

我們不太關心 API 的輸出。相反,我們更關心資料傳送到的分割槽。由於我們使用客戶 ID 作為鍵,因此我們預計具有相同 ID 的客戶將最終位於同一分割槽中。

現在,讓我們檢查餐廳服務的日誌:

Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 1
Customer: Customer [id=1, name=Dwayne, city=NY]
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 1
Customer: Customer [id=1, name=Dwayne, city=NY]
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 2
Customer: Customer [id=5, name=Dwayne, city=NY]
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 0
Customer: Customer [id=3, name=Dwayne, city=NY]
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 1
Customer: Customer [id=1, name=Dwayne, city=NY]

因此,正如我們所看到的,ID 為 1 的客戶每次都最終位於同一分割槽中,即分割槽 1。

**消費者組** - 消費者組是讀取同一主題以實現相同目的的消費者的邏輯分組。主題中的資料在消費者組中的消費者之間進行分割槽,以便給定消費者組中的只有一個消費者可以讀取主題的一個分割槽。

要定義消費者組,我們只需在我們使用 Kafka 主題名稱的繫結中定義一個組即可。例如,讓我們在我們的應用程式檔案中為我們的控制器定義消費者組名稱。

spring:
   application:
      name: restaurant-service
   cloud:
      function:
         definition: customerBinding
      stream:
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
            bindings:
               customerBinding-in-0:
               destination: customer
               group: restController
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: https://:8900/eureka

讓我們重新編譯並啟動餐廳服務。現在,讓我們透過訪問客戶服務的 POST API 生成事件:

插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/1

現在,如果我們檢查餐廳服務的日誌,我們將看到以下內容:

Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: restContoller
Partition Id: 1
Customer: Customer [id=1, name=Dwayne, city=NY]

因此,正如我們從輸出中看到的,我們建立了一個名為“rest-contoller”的消費者組,其消費者負責讀取主題。在上述情況下,我們只有一個服務例項正在執行,因此“customer”主題的所有分割槽都被分配給同一個例項。但是,如果我們有多個分割槽,我們將有多個分割槽分佈在工作程式之間。

廣告