gRPC - 客戶端流式RPC
現在讓我們看看使用 gRPC 通訊時客戶端流是如何工作的。在這種情況下,客戶端將搜尋並將圖書新增到購物車。一旦客戶端完成新增所有圖書,伺服器將向客戶端提供結賬購物車價值。
.proto 檔案
首先,讓我們在 **common_proto_files** 中定義 **bookstore.proto** 檔案:
syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
rpc totalCartValue (stream Book) returns (Cart) {}
}
message BookSearch {
string name = 1;
string author = 2;
int32 price = 3;
}
message Cart {
int32 books = 1;
int32 price = 2;
}
這裡,以下程式碼塊表示服務的名稱 **"BookStore"** 和可以呼叫的函式名 **"totalCartValue"**。“totalCartValue”函式接收型別為 **"Book"** 的輸入,這是一個流。該函式返回型別為 "Cart" 的物件。因此,實際上,我們允許客戶端以流式方式新增圖書,一旦客戶端完成,伺服器就會向客戶端提供總購物車價值。
service BookStore {
rpc totalCartValue (stream Book) returns (Cart) {}
}
現在讓我們看看這些型別。
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
客戶端將傳送它想要購買的 **"Book"**。它可能不是完整的圖書資訊;它可以只是圖書的標題。
message Cart {
int32 books = 1;
int32 price = 2;
}
伺服器在獲取圖書列表後,將返回 **"Cart"** 物件,它只是客戶端購買的圖書總數和總價格。
請注意,我們已經完成了 Maven 設定,用於自動生成我們的類檔案以及我們的 RPC 程式碼。因此,現在我們可以簡單地編譯我們的專案:
mvn clean install
這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在:
Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore
設定 gRPC 伺服器
現在我們已經定義了包含函式定義的 **proto** 檔案,讓我們設定一個可以服務於這些函式呼叫的伺服器。
讓我們編寫我們的伺服器程式碼來服務上述函式,並將其儲存在 **com.tp.bookstore.BookeStoreServerClientStreaming.java** 中:
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookeStoreServerClientStreaming {
private static final Logger logger = Logger.getLoggerr(BookeStoreServerClientStreaming.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException, InterruptedException {
final BookeStoreServerClientStreaming greetServer = new BookeStoreServerClientStreaming();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
@Override
public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
return new StreamObserver<Book>() {
ArrayList<Book> bookCart = new ArrayList<Book>();
@Override
public void onNext(Book book)
logger.info("Searching for book with title starting with: " + book.getName());
for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
if(bookEntry.getValue().getName().startsWith(book.getName())){
logger.info("Found book, adding to cart:....");
bookCart.add(bookEntry.getValue());
}
}
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading book stream: " + t);
}
@Override
public void onCompleted() {
int cartValue = 0;
for (Book book : bookCart) {
cartValue += book.getPrice();
}
responseObserver.onNext(Cart.newBuilder()
.setPrice(cartValue)
.setBooks(bookCart.size()).build());
responseObserver.onCompleted();
}
};
}
}
上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 **proto** 檔案中編寫的函式和服務。讓我們一起瀏覽上面的程式碼:
從 **main** 方法開始,我們在指定的埠建立一個 gRPC 伺服器。
但在啟動伺服器之前,我們將要執行的服務分配給伺服器,即在我們的例子中,是 **BookStore** 服務。
為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立一個服務例項,即在我們的例子中,是 **BookStoreImpl**。
服務例項需要提供 **.proto 檔案** 中存在的 method/function 的實現,即在我們的例子中,是 **totalCartValue** 方法。
現在,鑑於這是客戶端流的情況,當客戶端新增圖書時,伺服器將獲得一個 Book 列表(在 **proto** 檔案中定義)。因此,伺服器返回一個 **自定義流觀察者**。此流觀察者實現了當找到新書時會發生什麼以及流關閉時會發生什麼。
當客戶端新增一本 Book 時,gRPC 框架將呼叫 **onNext()** 方法。此時,伺服器將其新增到購物車。在流式傳輸的情況下,伺服器不會等待所有可用的圖書。
當客戶端完成新增圖書時,將呼叫流觀察者的 **onCompleted()** 方法。此方法實現了伺服器在客戶端完成新增圖書時想要傳送的內容,即它將 Cart 物件返回給客戶端。
最後,我們還有一個關閉鉤子,以確保在我們完成程式碼執行時伺服器能夠乾淨地關閉。
設定 gRPC 客戶端
現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。
讓我們編寫我們的客戶端程式碼來呼叫上述函式,並將其儲存在 **com.tp.bookstore.BookStoreClientServerStreamingBlocking.java** 中:
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientStreamingClient {
private static final Logger logger = Logger.getLogger(BookStoreClientStreaming.class.getName());
private final BookStoreStub stub;
private boolean serverResponseCompleted = false;
StreamObserver<Book> streamClientSender;
public BookStoreClientStreamingClient(Channel channel) {
stub = BookStoreGrpc.newStub(channel);
}
public StreamObserver<Cart> getServerResponseObserver(){
StreamObserver<Cart> observer = new StreamObserver<Cart>(){
@Override
public void onNext(Cart cart) {
logger.info("Order summary:" + "\nTotal number of Books:" + cart.getBooks() +
"\nTotal Order Value:" + cart.getPrice());
}
@Override
public void onCompleted() {
//logger.info("Server: Done reading orderreading cart");
serverResponseCompleted = true;
}
};
return observer;
}
public void addBook(String book) {
logger.info("Adding book with title starting with: " + book);
Book request = Book.newBuilder().setName(book).build();
if(streamClientSender == null) {
streamClientSender = stub.totalCartValue(getServerResponseObserver());
}
try {
streamClientSender.onNext(request);
}
catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
}
public void completeOrder() {
logger.info("Done, waiting for server to create order summary...");
if(streamClientSender != null);
streamClientSender.onCompleted();
}
public static void main(String[] args) throws Exception {
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientStreamingClient client = new BookStoreClientStreamingClient(channel);
String bookName = "";
while(true) {
System.out.println("Type book name to be added to the cart....");
bookName = System.console().readLine();
if(bookName.equals("EXIT")) {
client.completeOrder();
break;
}
client.addBook(bookName);
}
while(client.serverResponseCompleted == false) {
Thread.sleep(2000);
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們一起瀏覽上面的程式碼:
從 **main** 方法開始,我們接受一個引數,即我們要搜尋的圖書標題。
我們為與伺服器的 gRPC 通訊設定一個 Channel。
接下來,我們使用建立的 channel 建立一個 **非阻塞存根**。在這裡,我們選擇我們要呼叫其函式的服務 **"BookStore"**。
然後,我們簡單地建立 **.proto** 檔案中定義的預期輸入,即在我們的例子中是 **Book**,並且我們添加了我們希望伺服器新增的標題。
但是鑑於這是客戶端流的情況,我們首先為伺服器建立一個流觀察者。此伺服器流觀察者列出了伺服器響應時需要執行的操作的行為,即 **onNext()** 和 **onCompleted()**。
並且使用存根,我們還獲得了客戶端流觀察者。我們使用此流觀察者將資料(即 Book)傳送到新增到購物車。最終,我們進行呼叫並在有效的圖書上獲得迭代器。當我們迭代時,我們得到伺服器提供的相應圖書。
一旦我們的訂單完成,我們確保客戶端流觀察者關閉。它告訴伺服器計算購物車價值並將其作為輸出提供。
最後,我們關閉 channel 以避免任何資源洩漏。
所以,這就是我們的客戶端程式碼。
客戶端伺服器呼叫
總而言之,我們要做的是:
啟動 gRPC 伺服器。
客戶端透過向伺服器通知它們來新增一系列圖書。
伺服器在其商店中搜索圖書並將它們新增到購物車。
當客戶端完成訂購時,伺服器會響應客戶端的總購物車價值。
現在,我們已經定義了我們的 **proto** 檔案,編寫了我們的伺服器和客戶端程式碼,讓我們繼續執行此程式碼並檢視實際情況。
要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我們將看到以下輸出:
輸出
Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start INFO: Server started, listening on 50051
以上輸出表示伺服器已啟動。
現在,讓我們啟動客戶端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientServerStreamingClient
讓我們向我們的客戶端新增一些圖書。
Type book name to be added to the cart.... Gr Jul 24, 2021 5:53:07 PM com.tp.bookstore.BookStoreClientStreamingClient addBook INFO: Adding book with title starting with: Great Type book name to be added to the cart.... Pa Jul 24, 2021 5:53:20 PM com.tp.bookstore.BookStoreClientStreamingClient addBook INFO: Adding book with title starting with: Passage Type book name to be added to the cart....
一旦我們添加了圖書並且輸入“EXIT”,伺服器然後計算購物車價值,以下是我們得到的輸出:
輸出
EXIT Jul 24, 2021 5:53:33 PM com.tp.bookstore.BookStoreClientStreamingClient completeOrder INFO: Done, waiting for server to create order summary... Jul 24, 2021 5:53:33 PM com.tp.bookstore.BookStoreClientStreamingClient$1 onNext INFO: Order summary: Total number of Books: 2 Total Order Value: 800
因此,正如我們所看到的,客戶端能夠新增圖書。一旦所有圖書都被新增,伺服器將響應圖書總數和總價格。