
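gRPC - Timeouts & Cancellation
gRPC supports assigning timeouts to requests, which is one way to cancel a request. This helps avoid spending resources, on both the client and the server, on an operation whose result would no longer be useful to the client.
Request Timeout
gRPC supports specifying a timeout on both the client and the server side.
The client can specify at runtime how long it is willing to wait before cancelling the request.
The server can also check on its end whether a request still needs to be handled or whether the client has already given up on it.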
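The two roles described above can be illustrated without any gRPC dependency. The following is a minimal sketch that uses only java.util.concurrent as a stand-in: the caller waits for a bounded time and then cancels, and the worker stops as soon as it notices the cancellation. The names DeadlineSketch, slowLookup, callWithDeadline and workerStoppedEarly are hypothetical and exist only for this illustration; this is not the gRPC API.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class DeadlineSketch {
    /** Set when the worker notices that the caller has given up. */
    static final AtomicBoolean workerStoppedEarly = new AtomicBoolean(false);

    /** Simulated slow handler: works in 100 ms steps and stops if interrupted. */
    static String slowLookup(String title, int steps) {
        for (int i = 0; i < steps; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                workerStoppedEarly.set(true); // the caller cancelled; stop working
                return null;
            }
        }
        return "Found: " + title;
    }

    /** Waits at most timeoutMillis for the lookup, then cancels it. */
    static String callWithDeadline(String title, int steps, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(() -> slowLookup(title, steps));
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker thread
            return "DEADLINE_EXCEEDED";
        } catch (InterruptedException | ExecutionException e) {
            return "FAILED: " + e;
        } finally {
            pool.shutdown();
            try {
                pool.awaitTermination(1, TimeUnit.SECONDS); // let the worker wind down
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        // Fast lookup (about 100 ms) with a generous 2-second limit
        System.out.println(callWithDeadline("Great", 1, 2000));
        // Slow lookup (about 5 s) with a 200 ms limit
        System.out.println(callWithDeadline("Great", 50, 200));
        System.out.println("worker stopped early: " + workerStoppedEarly.get());
    }
}
```

In this sketch the worker learns about the cancellation through a thread interrupt. In gRPC the equivalent signal travels over the network, but the division of work is the same: the caller decides how long to wait, and the callee decides when to stop.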
Let us take an example where the client expects a response within 2 seconds, but the server takes longer. Here is our server code.
Example
package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerUnaryTimeout {
   private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName());

   // In-memory catalogue, keyed by book title
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald").setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee").setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster").setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald").setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee").setPrice(700).build());
   }

   private Server server;

   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
      logger.info("Server started, listening on " + port);

      // Shut the gRPC server down cleanly when the JVM exits
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }

   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerUnaryTimeout greetServer = new BookeStoreServerUnaryTimeout();
      greetServer.start();
      greetServer.server.awaitTermination();
   }

   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " + searchQuery.getName());
         logger.info("This may take more time...");
         try {
            // Dummy delay so that the client's 2-second deadline expires first
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }

         List<String> matchingBookTitles = bookMap.keySet().stream()
            .filter(title -> title.startsWith(searchQuery.getName().trim()))
            .collect(Collectors.toList());

         if (matchingBookTitles.isEmpty()) {
            // A null message cannot be sent, so report a missing book as NOT_FOUND
            responseObserver.onError(Status.NOT_FOUND
               .withDescription("No book matches: " + searchQuery.getName())
               .asRuntimeException());
            return;
         }
         responseObserver.onNext(bookMap.get(matchingBookTitles.get(0)));
         responseObserver.onCompleted();
      }
   }
}
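In the above code, the server searches for the book whose title the client has provided. We added a dummy sleep so that we can see the request time out.
And here is our client code.
Example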
package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlockingTimeout {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingTimeout.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;

   public BookStoreClientUnaryBlockingTimeout(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }

   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();

      Book response;
      try {
         // Give this single call at most 2 seconds to complete
         response = blockingStub.withDeadlineAfter(2, TimeUnit.SECONDS).first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   }

   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";

      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext().build();
      try {
         BookStoreClientUnaryBlockingTimeout client = new BookStoreClientUnaryBlockingTimeout(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}
The above code calls the server with a title to search for. More importantly, it sets a deadline of 2 seconds on the gRPC call.
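A detail worth keeping in mind is that withDeadlineAfter turns the 2-second duration into a fixed point in time, measured from the moment it is called, which is why the client above calls it right before each request rather than once up front. The sketch below illustrates that arithmetic with plain JDK code. SimpleDeadline is a hypothetical helper written for this illustration (it is not gRPC's own class), and the clock readings are passed in by hand so that the numbers are reproducible.

```java
import java.util.concurrent.TimeUnit;

public class DeadlineVsTimeout {
    /** A fixed point in time, computed once as "now + duration". */
    static final class SimpleDeadline {
        private final long deadlineNanos;

        SimpleDeadline(long duration, TimeUnit unit, long nowNanos) {
            this.deadlineNanos = nowNanos + unit.toNanos(duration);
        }

        /** Time left at the given instant, in milliseconds; negative once expired. */
        long remainingMillis(long nowNanos) {
            return TimeUnit.NANOSECONDS.toMillis(deadlineNanos - nowNanos);
        }

        /** True once the given instant has reached or passed the deadline. */
        boolean isExpired(long nowNanos) {
            return deadlineNanos - nowNanos <= 0;
        }
    }

    public static void main(String[] args) {
        long start = 0L; // pretend clock reading, in nanoseconds
        SimpleDeadline deadline = new SimpleDeadline(2, TimeUnit.SECONDS, start);

        long after1500ms = start + TimeUnit.MILLISECONDS.toNanos(1500);
        long after2500ms = start + TimeUnit.MILLISECONDS.toNanos(2500);

        System.out.println(deadline.remainingMillis(start));       // 2000
        System.out.println(deadline.remainingMillis(after1500ms)); // 500
        System.out.println(deadline.isExpired(after2500ms));       // true
    }
}
```

The same reasoning suggests that a stub returned by withDeadlineAfter should not be stored and reused for later calls, because the time budget keeps shrinking from the moment the deadline was created.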
Now, let us see the timeout example in action. To run the code, fire up two shells. Start the server in the first shell by executing the following command:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnaryTimeout
We would see the following output:
Output
Jul 31, 2021 12:29:31 PM com.tp.bookstore.BookeStoreServerUnaryTimeout start
INFO: Server started, listening on 50051
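The above output shows that the server has started. Once the client (started in the next step) sends its request, the server additionally logs the following: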
Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first
INFO: Searching for book with title: Great
Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first
INFO: This may take more time...
Now, let us start the client.
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingTimeout Great
We would get the following output:
Output
Jul 31, 2021 12:29:34 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
INFO: Querying for book with title: Great
Jul 31, 2021 12:29:36 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED, description=deadline exceeded after 1.970455800s. [buffered_nanos=816522700, remote_addr=localhost/127.0.0.1:50051], cause=null}
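So, as we can see, the client did not receive a response within 2 seconds, so it cancelled the request and reported it as a timeout, i.e., DEADLINE_EXCEEDED.
Request Cancellation
gRPC supports cancelling requests from both the client and the server side. The client can specify at runtime how long it is willing to wait before cancelling the request. The server can also check on its end whether a request still needs to be handled or whether the client has already given up on it.
Let us look at a client streaming example in which the client invokes cancellation. Here is our server code.
Example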
package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerClientStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());

   // In-memory catalogue, keyed by book title
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald").setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee").setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster").setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald").setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee").setPrice(700).build());
   }

   private Server server;

   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
      logger.info("Server started, listening on " + port);

      // Shut the gRPC server down cleanly when the JVM exits
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }

   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerClientStreaming greetServer = new BookeStoreServerClientStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }

   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            // Books collected so far for this client stream
            ArrayList<Book> bookCart = new ArrayList<Book>();

            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with title starting with: " + book.getName());
               for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
                  if (bookEntry.getValue().getName().startsWith(book.getName())) {
                     logger.info("Found book, adding to cart:....");
                     bookCart.add(bookEntry.getValue());
                  }
               }
            }

            @Override
            public void onError(Throwable t) {
               // Called, among other cases, when the client cancels the stream
               logger.info("Error while reading book stream: " + t);
            }

            @Override
            public void onCompleted() {
               int cartValue = 0;
               for (Book book : bookCart) {
                  cartValue += book.getPrice();
               }
               responseObserver.onNext(Cart.newBuilder()
                  .setPrice(cartValue)
                  .setBooks(bookCart.size()).build());
               responseObserver.onCompleted();
            }
         };
      }
   }
}
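This server code is a simple example of client-side streaming. The server simply keeps track of the books the client wants and, at the end, returns the total cart value of the order.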
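The bookkeeping described above can be looked at in isolation, without any gRPC types. The following is a minimal sketch under that assumption: CartTotalSketch, its PRICES map and its Cart class are hypothetical stand-ins for the generated Book and Cart messages, and the prices mirror the sample catalogue. Each streamed title prefix adds every matching book, and the totals are computed once at the end.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CartTotalSketch {
    /** Hypothetical price list keyed by title; values mirror the sample data. */
    static final Map<String, Integer> PRICES = new HashMap<>();
    static {
        PRICES.put("Great Gatsby", 300);
        PRICES.put("To Kill MockingBird", 400);
        PRICES.put("Passage to India", 500);
        PRICES.put("The Side of Paradise", 600);
        PRICES.put("Go Set a Watchman", 700);
    }

    /** Accumulates matches per streamed prefix and reports totals on demand. */
    static final class Cart {
        private final List<Integer> prices = new ArrayList<>();

        /** Called once per streamed message: adds every title starting with the prefix. */
        void onNext(String prefix) {
            for (Map.Entry<String, Integer> entry : PRICES.entrySet()) {
                if (entry.getKey().startsWith(prefix)) {
                    prices.add(entry.getValue());
                }
            }
        }

        /** Number of books collected so far. */
        int bookCount() {
            return prices.size();
        }

        /** Sum of the prices of all collected books. */
        int totalPrice() {
            int total = 0;
            for (int price : prices) {
                total += price;
            }
            return total;
        }
    }

    public static void main(String[] args) {
        Cart cart = new Cart();
        cart.onNext("Great");
        cart.onNext("Passage");
        // prints "2 books, total 800"
        System.out.println(cart.bookCount() + " books, total " + cart.totalPrice());
    }
}
```

Because the match is a prefix match, a short prefix such as "T" would add more than one book in a single message.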
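However, the server code contains nothing specific to request cancellation, since cancellation is something the client invokes. So, let us look at the client code.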
package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookStoreClientStreamingClientCancelation {
   private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClientCancelation.class.getName());
   private final BookStoreStub stub;
   StreamObserver<Book> streamClientSender;
   private CancellableContext withCancellation;

   public BookStoreClientStreamingClientCancelation(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }

   // Observer that receives the single Cart response from the server
   public StreamObserver<Cart> getServerResponseObserver() {
      StreamObserver<Cart> observer = new StreamObserver<Cart>() {
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:"
               + "\nTotal number of Books: " + cart.getBooks()
               + "\nTotal Order Value: " + cart.getPrice());
         }

         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response from Server: " + t);
         }

         @Override
         public void onCompleted() {
         }
      };
      return observer;
   }

   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();

      if (streamClientSender == null) {
         // Create a cancellable context for this order. Note: as written, the call
         // below is not started inside this context (e.g. via withCancellation.run(...)).
         withCancellation = Context.current().withCancellation();
         streamClientSender = stub.totalCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }

   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      if (streamClientSender != null) {
         streamClientSender.onCompleted();
      }
   }

   public void cancelOrder() {
      withCancellation.cancel(null);
   }

   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext().build();
      try {
         BookStoreClientStreamingClientCancelation client = new BookStoreClientStreamingClientCancelation(channel);
         String bookName = "";

         // Read titles until the user types EXIT (complete) or CANCEL (abort)
         while (true) {
            System.out.println("Type book name to be added to the cart....");
            bookName = System.console().readLine();
            if (bookName.equals("EXIT")) {
               client.completeOrder();
               break;
            }
            if (bookName.equals("CANCEL")) {
               client.cancelOrder();
               break;
            }
            client.addBook(bookName);
         }
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}
If we look at the above code, the following line defines a context that has cancellation enabled.
withCancellation = Context.current().withCancellation();
And here is the method that is called when the user types CANCEL. This cancels the order and lets the server know about it.
public void cancelOrder() {
   withCancellation.cancel(null);
}
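The idea behind a cancellable context is that cancelling it notifies whatever work was started inside it, and only that work. The sketch below models this with plain JDK code. CancelToken and FakeCall are hypothetical stand-ins invented for this illustration and are not part of gRPC; they only show why a call has to be registered with the context for the cancellation to reach it.

```java
import java.util.ArrayList;
import java.util.List;

public class CancelTokenSketch {
    /** Hypothetical stand-in for a cancellable context: a flag plus listeners. */
    static final class CancelToken {
        private final List<Runnable> listeners = new ArrayList<>();
        private boolean cancelled;

        /** Registers a listener; runs it immediately if already cancelled. */
        synchronized void addListener(Runnable listener) {
            if (cancelled) {
                listener.run();
            } else {
                listeners.add(listener);
            }
        }

        /** Marks the token as cancelled and notifies each listener once. */
        void cancel() {
            List<Runnable> toNotify;
            synchronized (this) {
                if (cancelled) {
                    return;
                }
                cancelled = true;
                toNotify = new ArrayList<>(listeners);
                listeners.clear();
            }
            for (Runnable listener : toNotify) {
                listener.run();
            }
        }

        synchronized boolean isCancelled() {
            return cancelled;
        }
    }

    /** Hypothetical stand-in for an in-flight streaming call. */
    static final class FakeCall {
        final List<String> sent = new ArrayList<>();
        String status = "OPEN";

        /** Starting the call "inside" a token means registering for its cancellation. */
        static FakeCall startWithin(CancelToken token) {
            FakeCall call = new FakeCall();
            token.addListener(() -> call.status = "CANCELLED");
            return call;
        }

        /** Sends a message only while the call is still open. */
        void send(String title) {
            if (status.equals("OPEN")) {
                sent.add(title);
            }
        }
    }

    public static void main(String[] args) {
        CancelToken token = new CancelToken();
        FakeCall attached = FakeCall.startWithin(token);
        FakeCall detached = new FakeCall(); // never registered with the token

        attached.send("Great");
        token.cancel();
        attached.send("Passage"); // ignored, the call is already cancelled

        System.out.println(attached.status + " " + attached.sent); // CANCELLED [Great]
        System.out.println(detached.status);                       // OPEN
    }
}
```

The detached call stays open after the token is cancelled, which is the behaviour to watch for when a call is created outside the context that is later cancelled.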
Now, let us see the streaming example in action. To run the code, fire up two shells. Start the server in the first shell by executing the following command:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
We would see the following output:
Output
Jul 31, 2021 3:29:58 PM com.tp.bookstore.BookeStoreServerClientStreaming start
INFO: Server started, listening on 50051
The above output shows that the server has started.
Now, let us start the client.
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientStreamingClientCancelation
We would get the following output:
Output
Type book name to be added to the cart....
Great
Jul 31, 2021 3:30:55 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation addBook
INFO: Adding book with title starting with: Great
Type book name to be added to the cart....
CANCEL
Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation$1 onError
INFO: Error while reading response from Server: io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked
And we will get the following in the server logs:
INFO: Searching for book with title starting with: Great
Jul 31, 2021 3:30:56 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImpl$1 onNext
INFO: Found book, adding to cart:....
Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImpl$1 onError
INFO: Error while reading book stream: io.grpc.StatusRuntimeException: CANCELLED: client cancelled
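So, as we can see, the client initiated the cancellation of the request it had made to the server, and the server was notified of the cancellation. Note that the status reported on the client side is UNAVAILABLE: Channel shutdownNow invoked, which suggests that in this run the call was ended by the channel.shutdownNow() in main(). For withCancellation.cancel() on its own to cancel the RPC, the call would have to be started inside that context, for example through withCancellation.run(...).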