gRPC - 超時 & 取消
gRPC 支援為請求分配超時時間。這是一種執行請求取消的方式。它有助於避免為客戶端和伺服器使用資源,因為這些資源對於客戶端來說其結果將毫無用處。
請求超時
gRPC 支援為客戶端和伺服器指定超時時間。
客戶端可以在執行時指定它希望在取消請求之前等待的時間。
伺服器也可以在其端檢查是否需要處理請求,或者客戶端是否已放棄該請求。
讓我們舉一個例子,客戶端期望在 2 秒內收到響應,但伺服器需要更長時間。所以,這是我們的伺服器程式碼。
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerUnaryTimeout {
private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby",Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird",Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India",Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise",Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException,InterruptedException {
final BookeStoreServerUnaryTimeout greetServer = new
BookeStoreServerUnaryTimeout();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends
BookStoreGrpc.BookStoreImplBase {
@Override
public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
logger.info("Searching for book with title: " +searchQuery.getName());
logger.info("This may take more time...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<String> matchingBookTitles = bookMap.keySet().stream()
.filter(title ->
title.startsWith(searchQuery.getName().trim()))
.collect(Collectors.toList());
Book foundBook = null;
if(matchingBookTitles.size() > 0) {
foundBook = bookMap.get(matchingBookTitles.get(0));
}
responseObserver.onNext(foundBook);
responseObserver.onCompleted();
}
}
}
在上面的程式碼中,伺服器搜尋客戶端提供的標題對應的書籍。我們添加了一個虛擬休眠,以便我們可以看到請求超時。
這是我們的客戶端程式碼
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientUnaryBlockingTimeout {
private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingTimeout.class.getName());
private final BookStoreGrpc.BookStoreBlockingStubblockingStub;
public BookStoreClientUnaryBlockingTimeout(Channel channel){
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
public void getBook(String bookName) {
logger.info("Querying for book with title: " + bookName);
BookSearch request =BookSearch.newBuilder().setName(bookName).build();
Book response;
try {
response = blockingStub.withDeadlineAfter(2,TimeUnit.SECONDS).first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());
return;
}
logger.info("Got following book from server: " +response);
}
public static void main(String[] args) throws Exception {
String bookName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
try {
BookStoreClientUnaryBlockingTimeout client = newBookStoreClientUnaryBlockingTimeout(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5,
TimeUnit.SECONDS);
}
}
}
上面的程式碼使用標題呼叫伺服器進行搜尋。但更重要的是,它為 gRPC 呼叫提供了2 秒的超時時間。
現在讓我們看看它的實際操作。要執行程式碼,請啟動兩個 shell。在第一個 shell 上啟動伺服器,執行以下命令:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnaryTimeout
我們會看到以下輸出:
輸出
Jul 31, 2021 12:29:31 PM com.tp.bookstore.BookeStoreServerUnaryTimeout start INFO: Server started, listening on 50051
以上輸出表明伺服器已啟動。
Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first INFO: Searching for book with title: Great Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first INFO: This may take more time...
現在,讓我們啟動客戶端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingTimeout Great
我們會得到以下輸出:
輸出
Jul 31, 2021 12:29:34 PM
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
INFO: Querying for book with title: Great
Jul 31, 2021 12:29:36 PM
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED,
description=deadline exceeded after 1.970455800s.
[buffered_nanos=816522700,
remote_addr=localhost/127.0.0.1:50051], cause=null}
因此,正如我們所看到的,客戶端在 2 秒內沒有收到響應,因此它取消了請求並將其稱為超時,即DEADLINE_EXCEEDED
請求取消
gRPC 支援從客戶端和伺服器端取消請求。客戶端可以在執行時指定它希望在取消請求之前等待的時間。伺服器也可以在其端檢查是否需要處理請求,或者客戶端是否已放棄該請求。
讓我們看一個客戶端流式傳輸的例子,其中客戶端呼叫取消。所以,這是我們的伺服器程式碼
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookeStoreServerClientStreaming {
private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird").setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India").setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise").setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman").setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException,InterruptedException {
final BookeStoreServerClientStreaming greetServer = newBookeStoreServerClientStreaming();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase {
@Override
public StreamObserver<Book>
totalCartValue(StreamObserver<Cart> responseObserver) {
return new StreamObserver<Book>() {
ArrayList<Book> bookCart = new ArrayList<Book>();
@Override
public void onNext(Book book) {
logger.info("Searching for book with titlestarting with: " + book.getName());
for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
if(bookEntry.getValue().getName().startsWith(book.getName())){
logger.info("Found book, adding to cart:....");
bookCart.add(bookEntry.getValue());
}
}
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading book stream:" + t);
}
@Override
public void onCompleted() {
int cartValue = 0;
for (Book book : bookCart) {
cartValue += book.getPrice();
}
responseObserver.onNext(Cart.newBuilder().setPrice(cartValue).setBooks(bookCart.size()).build());
responseObserver.onCompleted();
}
};
}
}
}
此伺服器程式碼是一個客戶端端流式傳輸的簡單示例。伺服器只是跟蹤客戶端想要的書籍,最後它提供訂單的總購物車價值。
但是這裡在請求取消方面沒有什麼特別的,因為這是客戶端將呼叫的操作。所以,讓我們看看客戶端程式碼。
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookStoreClientStreamingClientCancelation {
private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClientCancelation.class.getName());
private final BookStoreStub stub;
StreamObserver<Book> streamClientSender;
private CancellableContext withCancellation;
public BookStoreClientStreamingClientCancelation(Channel channel) {
stub = BookStoreGrpc.newStub(channel);
}
public StreamObserver<Cart> getServerResponseObserver(){
StreamObserver<Cart> observer = new StreamObserver>Cart<(){
@Override
public void onNext(Cart cart) {
logger.info("Order summary:"
+ "\nTotal number of Books: "
+ cart.getBooks()
+ "\nTotal Order Value: "
+ cart.getPrice());
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading response from Server: " + t);
}
@Override
public void onCompleted() {
}
};
return observer;
}
public void addBook(String book) {
logger.info("Adding book with title starting with: " + book);
Book request = Book.newBuilder().setName(book).build();
if(streamClientSender == null) {
withCancellation = Context.current().withCancellation();
streamClientSender = stub.totalCartValue(getServerResponseObserver());
}
try {
streamClientSender.onNext(request);
}
catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
}
public void completeOrder() {
logger.info("Done, waiting for server to create order summary...");
if(streamClientSender != null);
streamClientSender.onCompleted();
}
public void cancelOrder() {
withCancellation.cancel(null);
}
public static void main(String[] args) throws Exception {
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
try {
BookStoreClientStreamingClientCancelation client = new BookStoreClientStreamingClientCancelation(channel); String bookName = "";
while(true) {
System.out.println("Type book name to be added to the cart....");
bookName = System.console().readLine();
if(bookName.equals("EXIT")) {
client.completeOrder();
break;
}
if(bookName.equals("CANCEL")) {
client.cancelOrder();
break;
}
client.addBook(bookName);
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
因此,如果我們檢視上面的程式碼,以下行定義了一個啟用了取消功能的上下文。
withCancellation = Context.current().withCancellation();
這是使用者鍵入 CANCEL 時將呼叫的方法。這將取消訂單並讓伺服器知道。
public void cancelOrder() {
withCancellation.cancel(null);
}
現在讓我們看看它的實際操作。要執行程式碼,請啟動兩個 shell。在第一個 shell 上啟動伺服器,執行以下命令。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我們會看到以下輸出:
輸出
Jul 31, 2021 3:29:58 PM com.tp.bookstore.BookeStoreServerClientStreaming start INFO: Server started, listening on 50051
以上輸出表示伺服器已啟動。
現在,讓我們啟動客戶端
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientStreamingClientCancelation
我們會得到以下輸出:
輸出
Type book name to be added to the cart.... Great Jul 31, 2021 3:30:55 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation addBook INFO: Adding book with title starting with: Great Type book name to be added to the cart.... CANCEL Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation$1 onError INFO: Error while reading response from Server: io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked
並且我們將在伺服器日誌中獲得以下資料:
INFO: Searching for book with title starting with: Great Jul 31, 2021 3:30:56 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp l$1 onNext INFO: Found book, adding to cart:.... Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp l$1 onError INFO: Error while reading book stream: io.grpc.StatusRuntimeException: CANCELLED: client cancelled
因此,正如我們所看到的,客戶端啟動了對其向伺服器發出的請求的取消操作。伺服器也收到了關於取消的通知。