Spring Cloud - 使用 Apache Kafka 的流
介紹
在分散式環境中,服務需要相互通訊。通訊可以同步發生,也可以非同步發生。在本節中,我們將瞭解服務如何透過使用**訊息代理**非同步通訊。
執行非同步通訊的兩個主要好處:
**生產者和消費者的速度可以不同** - 如果資料的消費者速度慢或快,則不會影響生產者的處理,反之亦然。兩者都可以以各自的速度工作,而不會相互影響。
**生產者不需要處理來自各種消費者的請求** - 可能有多個消費者希望從生產者讀取相同的資料集。透過在兩者之間使用訊息代理,生產者不需要處理這些消費者產生的負載。此外,生產者級別的任何中斷都不會阻止消費者讀取舊的生產者資料,因為這些資料將存在於訊息代理中。
**Apache Kafka** 和**RabbitMQ** 是兩種用於進行非同步通訊的知名訊息代理。在本教程中,我們將使用 Apache Kafka。
Kafka – 依賴項設定
讓我們使用我們之前一直在使用的餐廳案例。因此,假設我們的客戶服務和餐廳服務透過非同步通訊進行通訊。為此,我們將使用 Apache Kafka。並且我們需要在兩個服務中使用它,即客戶服務和餐廳服務。
要使用 Apache Kafka,我們將更新這兩個服務的 POM 並新增以下依賴項。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
我們還需要執行 Kafka 例項。有多種方法可以做到這一點,但我們更傾向於使用 Docker 容器啟動 Kafka。以下是一些我們可以考慮使用的映象:
無論我們使用哪個映象,這裡需要注意的重要一點是,一旦映象啟動並執行,請確保 Kafka 叢集可在**localhost:9092**訪問。
現在我們已經在映象上運行了 Kafka 叢集,讓我們轉到核心示例。
繫結和繫結器
在 Spring Cloud Streams 中,有三個重要的概念:
**外部訊息系統** - 這是外部管理的元件,負責儲存應用程式生成的事件/訊息,這些事件/訊息可以被其訂閱者/消費者讀取。請注意,這不是在應用程式/Spring 中管理的。一些示例包括 Apache Kafka、RabbitMQ
**繫結器** - 這是提供與訊息系統整合的元件,例如,包含訊息系統的 IP 地址、身份驗證等。
**繫結** - 此元件使用繫結器向訊息系統傳送訊息或從特定主題/佇列中使用訊息。
所有上述屬性都在**應用程式屬性檔案**中定義。
示例
讓我們使用我們之前一直在使用的餐廳案例。因此,假設每當向客戶服務新增新服務時,我們都希望通知附近的餐廳有關他/她的客戶資訊。
為此,讓我們首先更新我們的客戶服務以包含和使用 Kafka。請注意,我們將使用客戶服務作為資料的生產者。也就是說,每當我們透過 API 新增客戶時,它也將新增到 Kafka 中。
spring:
application:
name: customer-service
cloud:
stream:
source: customerBinding-out-0
kafka:
binder:
brokers: localhost:9092
replicationFactor: 1
bindings:
customerBinding-out-0:
destination: customer
producer:
partitionCount: 3
server:
port: ${app_port}
eureka:
client:
serviceURL:
defaultZone: https://:8900/eureka
**需要注意的事項**:
我們已定義了一個繫結器,其中包含我們本地 Kafka 例項的地址。
我們還定義了繫結“customerBinding-out-0”,它使用“customer”主題輸出訊息。
我們還在**stream.source**中提到了我們的繫結,以便我們可以在程式碼中使用它。
完成此操作後,讓我們現在透過新增一個負責處理 POST 請求的新方法“addCustomer”來更新我們的控制器。然後,從**post**請求中,我們將資料傳送到 Kafka 代理。
package com.tutorialspoint;
import java.util.HashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
class RestaurantCustomerInstancesController {
@Autowired
private StreamBridge streamBridge;
static HashMap<Long, Customer> mockCustomerData = new HashMap();
static{
mockCustomerData.put(1L, new Customer(1, "Jane", "DC"));
mockCustomerData.put(2L, new Customer(2, "John", "SFO"));
mockCustomerData.put(3L, new Customer(3, "Kate", "NY"));
}
@RequestMapping("/customer/{id}")
public Customer getCustomerInfo(@PathVariable("id") Long id) {
System.out.println("Querying customer for id with: " + id);
return mockCustomerData.get(id);
}
@RequestMapping(path = "/customer/{id}", method = RequestMethod.POST)
public Customer addCustomer(@PathVariable("id") Long id) {
// add default name
Customer defaultCustomer = new Customer(id, "Dwayne", "NY");
streamBridge.send("customerBinding-out-0", defaultCustomer);
return defaultCustomer;
}
}
需要注意的事項
我們正在自動裝配 StreamBridge,這就是我們將用於傳送訊息的工具。
我們在“send”方法中使用的引數也指定了我們要用來發送資料的繫結。
現在讓我們更新我們的餐廳服務以包含並訂閱“customer”主題。請注意,我們將使用餐廳服務作為資料的消費者。也就是說,每當我們透過 API 新增客戶時,餐廳服務都會透過 Kafka 瞭解它。
首先,讓我們更新**application.properties**檔案。
spring:
application:
name: restaurant-service
cloud:
function:
definition: customerBinding
stream:
kafka:
binder:
brokers: localhost:9092
replicationFactor: 1
bindings:
customerBinding-in-0:
destination: customer
server:
port: ${app_port}
eureka:
client:
serviceURL:
defaultZone: https://:8900/eureka
完成此操作後,讓我們現在透過新增一個負責獲取請求並提供一個函式的新方法“customerBinding”來更新我們的控制器,該函式將列印請求及其元資料詳細資訊。
package com.tutorialspoint;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
class RestaurantController {
@Autowired
CustomerService customerService;
@Autowired
private StreamBridge streamBridge;
static HashMap<Long, Restaurant> mockRestaurantData = new HashMap();
static{
mockRestaurantData.put(1L, new Restaurant(1, "Pandas", "DC"));
mockRestaurantData.put(2L, new Restaurant(2, "Indies", "SFO"));
mockRestaurantData.put(3L, new Restaurant(3, "Little Italy", "DC"));
mockRestaurantData.put(4L, new Restaurant(4, "Pizeeria", "NY"));
}
@RequestMapping("/restaurant/customer/{id}")
public List<Restaurant> getRestaurantForCustomer(@PathVariable("id") Long id) {
System.out.println("Got request for customer with id: " + id);
String customerCity = customerService.getCustomerById(id).getCity();
return mockRestaurantData.entrySet().stream().filter(
entry -> entry.getValue().getCity().equals(customerCity))
.map(entry -> entry.getValue())
.collect(Collectors.toList());
}
@RequestMapping("/restaurant/cust/{id}")
public void getRestaurantForCust(@PathVariable("id") Long id) {
streamBridge.send("ordersBinding-out-0", id);
}
@Bean
public Consumer<Message<Customer>> customerBinding() {
return msg -> {
System.out.println(msg);
};
}
}
**需要注意的事項**:
我們正在使用“customerBinding”,它應該傳遞當此繫結收到訊息時將呼叫的函式。
我們為此函式/bean使用的名稱也需要在建立捆綁和指定主題時在 YAML 檔案中使用。
現在,讓我們像往常一樣執行上述程式碼,啟動 Eureka Server。請注意,這不是硬性要求,這裡是為了完整性而存在的。
然後,讓我們編譯並開始使用以下命令更新客戶服務:
mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient- 1.0.jar --spring.config.location=classpath:application-kafka.yml
然後,讓我們編譯並開始使用以下命令更新餐廳服務:
mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client- 1.0.jar --spring.config.location=classpath:application-kafka.yml
我們已設定好了,現在讓我們透過訪問 API 來測試我們的程式碼段:
curl -X POST https://:8083/customer/1
這是我們將為此 API 獲得的輸出:
{
"id": 1,
"name": "Dwayne",
"city": "NY"
}
現在,讓我們檢查餐廳服務的日誌:
GenericMessage [payload=Customer [id=1, name=Dwayne, city=NY],
headers={kafka_offset=1,...
因此,實際上,您可以看到,使用 Kafka 代理,餐廳服務已收到有關新新增的客戶的通知。
分割槽和消費者組
分割槽和消費者組是您在使用 Spring Cloud Streams 時應該瞭解的兩個重要概念。
**分割槽** - 用於對資料進行分割槽,以便我們可以在多個消費者之間分配工作。
讓我們看看如何在 Spring Cloud 中對資料進行分割槽。假設,我們希望根據客戶 ID 對資料進行分割槽。因此,讓我們為此更新我們的客戶服務。為此,我們需要告訴
讓我們更新我們的客戶服務應用程式屬性以指定我們資料的鍵。
spring:
application:
name: customer-service
cloud:
function:
definition: ordersBinding
stream:
source: customerBinding-out-0
kafka:
binder:
brokers: localhost:9092
replicationFactor: 1
bindings:
customerBinding-out-0:
destination: customer
producer:
partitionKeyExpression: 'getPayload().getId()'
partitionCount: 3
server:
port: ${app_port}
eureka:
client:
serviceURL:
defaultZone: https://:8900/eureka
為了指定鍵,即“partitionKeyExpression”,我們提供了 Spring 表示式語言。該表示式假設型別為 GenericMessage
現在,讓我們也更新我們的消費者,即餐廳服務,以便在使用請求時記錄更多資訊。
現在,讓我們像往常一樣執行上述程式碼,啟動 Eureka Server。請注意,這不是硬性要求,這裡是為了完整性而存在的。
然後,讓我們編譯並開始使用以下命令更新客戶服務:
mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient- 1.0.jar --spring.config.location=classpath:application-kafka.yml
然後,讓我們編譯並開始使用以下命令更新餐廳服務:
mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client- 1.0.jar --spring.config.location=classpath:application-kafka.yml
我們已設定好了,現在讓我們測試我們的程式碼段。作為測試的一部分,我們將執行以下操作:
插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/1
插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/1
插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/5
插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/3
插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/1
我們不太關心 API 的輸出。相反,我們更關心資料傳送到的分割槽。由於我們使用客戶 ID 作為鍵,因此我們預計具有相同 ID 的客戶將最終位於同一分割槽中。
現在,讓我們檢查餐廳服務的日誌:
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY] Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY] Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 2 Customer: Customer [id=5, name=Dwayne, city=NY] Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 0 Customer: Customer [id=3, name=Dwayne, city=NY] Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY]
因此,正如我們所看到的,ID 為 1 的客戶每次都最終位於同一分割槽中,即分割槽 1。
**消費者組** - 消費者組是讀取同一主題以實現相同目的的消費者的邏輯分組。主題中的資料在消費者組中的消費者之間進行分割槽,以便給定消費者組中的只有一個消費者可以讀取主題的一個分割槽。
要定義消費者組,我們只需在我們使用 Kafka 主題名稱的繫結中定義一個組即可。例如,讓我們在我們的應用程式檔案中為我們的控制器定義消費者組名稱。
spring:
application:
name: restaurant-service
cloud:
function:
definition: customerBinding
stream:
kafka:
binder:
brokers: localhost:9092
replicationFactor: 1
bindings:
customerBinding-in-0:
destination: customer
group: restController
server:
port: ${app_port}
eureka:
client:
serviceURL:
defaultZone: https://:8900/eureka
讓我們重新編譯並啟動餐廳服務。現在,讓我們透過訪問客戶服務的 POST API 生成事件:
插入 ID 為 1 的客戶:curl -X POST https://:8083/customer/1
現在,如果我們檢查餐廳服務的日誌,我們將看到以下內容:
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: restContoller Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY]
因此,正如我們從輸出中看到的,我們建立了一個名為“rest-contoller”的消費者組,其消費者負責讀取主題。在上述情況下,我們只有一個服務例項正在執行,因此“customer”主題的所有分割槽都被分配給同一個例項。但是,如果我們有多個分割槽,我們將有多個分割槽分佈在工作程式之間。