gRPC - 雙向RPC



現在讓我們看看在使用 gRPC 通訊時客戶端-伺服器流式是如何工作的。在這種情況下,客戶端將搜尋書籍並新增到購物車。每次新增書籍時,伺服器都會以即時購物車值進行響應。

.proto 檔案

首先,讓我們在 **common_proto_files** 中定義 **bookstore.proto** 檔案 -

syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc liveCartValue (stream Book) returns (stream Cart) {}
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}
message Cart {
   int32 books = 1;
   int32 price = 2;
}

以下程式碼塊表示服務名稱 **"BookStore"** 和可呼叫的函式名稱 "liveCartValue"。**"liveCartValue"** 函式接收型別為 **"Book"** 的輸入,該輸入是一個流。該函式返回一個型別為 **"Cart"** 的物件流。因此,實際上,我們允許客戶端以流式方式新增書籍,並且每當新增新書籍時,伺服器都會將當前購物車值響應給客戶端。

service BookStore {
   rpc liveCartValue (stream Book) returns (stream Cart) {}
}

現在讓我們看看這些型別。

message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

客戶端將傳送它想要購買的 **"Book"**。它不需要是完整的書籍資訊;它可以僅僅是書籍的標題。

message Cart {
   int32 books = 1;
   int32 price = 2;
}

伺服器在獲取書籍列表後,將返回 **"Cart"** 物件,它僅僅是客戶端已購買的書籍總數和總價。

請注意,我們已經完成了 Maven 設定,以便自動生成我們的類檔案以及我們的 RPC 程式碼。因此,現在我們可以簡單地編譯我們的專案。

mvn clean install

這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在

Protobuf class code: target/generatedsources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpcjava/com.tp.bookstore

設定 gRPC 伺服器

現在我們已經定義了包含函式定義的 proto 檔案,讓我們設定一個可以呼叫這些函式的伺服器。

讓我們編寫我們的伺服器程式碼來服務上述函式,並將其儲存在 **com.tp.bookstore.BookeStoreServerBothStreaming.java** 中 -

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerBothStreaming {
   private static final Logger logger =Logger.getLogger(BookeStoreServerBothStreaming.class.getName());

   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
               } catch (InterruptedException e) {
                  e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerBothStreaming greetServer = newBookeStoreServerBothStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book>liveCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            int cartValue = 0;
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with titlestarting with: " + book.getName());
               for (Entry<String, Book> bookEntry :bookMap.entrySet()) {
                  if(bookEntry.getValue().getName().startsWith(book.getName())){
                     logger.info("Found book, adding tocart:....");
                     bookCart.add(bookEntry.getValue());
                     cartValue +=bookEntry.getValue().getPrice();
                  }
               }
               logger.info("Updating cart value...");

               responseObserver.onNext(Cart.newBuilder()
                  .setPrice(cartValue)
                  .setBooks(bookCart.size()).build());
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream: " + t);
            }
            @Override
            public void onCompleted() {
               logger.info("Order completed");
               responseObserver.onCompleted();
            }
         };
      }
   }
}

以上程式碼在指定埠啟動一個 gRPC 伺服器,並服務於我們在 **proto** 檔案中編寫的函式和服務。讓我們逐步瞭解以上程式碼 -

  • 從 **main** 方法開始,我們在指定埠建立一個 gRPC 伺服器。

  • 但在啟動伺服器之前,我們將要執行的服務分配給伺服器,即在我們的例子中,是 **BookStore** 服務。

  • 為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立服務例項,即在我們的例子中是 **BookStoreImpl**

  • 服務例項需要提供 **proto** 檔案中存在的 method/function 的實現,即在我們的例子中是 **totalCartValue** 方法。

  • 現在,鑑於這是伺服器和客戶端流式的情況,伺服器將獲得客戶端新增的 **Books** 列表(在 **proto** 檔案中定義)。因此,伺服器返回一個自定義流觀察者。此流觀察者實現了在找到新 Book 時會發生什麼以及在流關閉時會發生什麼。

  • 當客戶端新增一個 Book 時,gRPC 框架將呼叫 **onNext()** 方法。此時,伺服器將其新增到購物車並使用響應觀察者返回購物車值。在流式傳輸的情況下,伺服器不會等待所有有效書籍都可用。

  • 當客戶端完成新增書籍後,將呼叫流觀察者的 **onCompleted()** 方法。此方法實現了伺服器在客戶端完成新增 **Books** 後想要執行的操作,即宣告它已完成接收客戶端訂單。

  • 最後,我們還有一個關閉鉤子,以確保在完成執行程式碼後乾淨地關閉伺服器。

設定 gRPC 客戶端

現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。

讓我們編寫我們的客戶端程式碼來呼叫上述函式,並將其儲存在 **com.tp.bookstore.BookStoreClientBothStreaming.java** 中 -

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientBothStreaming {
   private static final Logger logger = Logger.getLogger(BookStoreClientBothStreaming.class.getName());
   private final BookStoreStub stub;
   private boolean serverIntermediateResponseCompleted = true;
   private boolean serverResponseCompleted = false;

   StreamObserver<Book> streamClientSender;
   
   public BookStoreClientBothStreaming(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver>Cart< getServerResponseObserver(){
      StreamObserver>Cart< observer = new StreamObserver<Cart>(){
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" + 
               "\nTotal number of Books:" + cart.getBooks()+ 
               "\nTotal Order Value:" cart.getPrice());

            serverIntermediateResponseCompleted = true;
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response fromServer: " + t);
         }
         @Override
         public void onCompleted() {
            //logger.info("Server: Done reading orderreading cart");
            serverResponseCompleted = true;
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();
      if(streamClientSender == null) {
         streamClientSender =stub.liveCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create ordersummary...");
      if(streamClientSender != null); {
         streamClientSender.onCompleted();
      }
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientBothStreaming client = new
         BookStoreClientBothStreaming(channel);
         String bookName = "";

         while(true) {
            if(client.serverIntermediateResponseCompleted ==true) {
               System.out.println("Type book name to beadded to the cart....");
               bookName = System.console().readLine();
               if(bookName.equals("EXIT")) {
                  client.completeOrder();
                  break;
               }
               client.serverIntermediateResponseCompleted = false;
               client.addBook(bookName);
               Thread.sleep(500);
            }
         }
         while(client.serverResponseCompleted == false) {
            Thread.sleep(2000);
         }
            
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }   
   }
}

以上程式碼啟動一個 gRPC 客戶端,並連線到指定埠的伺服器,並呼叫我們在 **proto** 檔案中編寫的函式和服務。讓我們逐步瞭解以上程式碼 -

  • 從 **main** 方法開始,我們接受要新增到購物車的書籍名稱。一旦所有書籍都將被新增,使用者預計會列印“EXIT”。

  • 我們為與伺服器的 gRPC 通訊設定了一個 Channel。

  • 接下來,我們使用 channel 建立一個非阻塞存根。在這裡,我們選擇要呼叫其函式的服務 **"BookStore"**。

  • 然後,我們簡單地建立 **proto** 檔案中定義的預期輸入,即在我們的例子中是 **Book**,並新增我們希望伺服器新增的標題。

  • 但是鑑於這是伺服器和客戶端流式的情況,我們首先為伺服器建立一個 **流觀察者**。此伺服器流觀察者列出了伺服器響應時需要執行的操作,即 **onNext()** 和 **onCompleted()**。

  • 並且使用存根,我們也獲得了客戶端流觀察者。我們使用此流觀察者傳送資料,即要新增到購物車的 **Book**。

  • 並且一旦我們的訂單完成,我們確保客戶端流觀察者已關閉。這告訴伺服器關閉流並執行清理。

  • 最後,我們關閉 channel 以避免任何資源洩漏。

所以,那是我們的客戶端程式碼。

客戶端伺服器呼叫

總而言之,我們要做的是以下幾點 -

  • 啟動 gRPC 伺服器。

  • 客戶端透過通知伺服器新增書籍流。

  • 伺服器在其商店中搜索書籍並將其新增到購物車。

  • 每次新增書籍時,伺服器都會告訴客戶端購物車值。

  • 當客戶端完成訂購時,伺服器和客戶端都關閉流。

現在我們已經定義了 **proto** 檔案,編寫了伺服器和客戶端程式碼,讓我們現在執行此程式碼並檢視實際效果。

要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器 -

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookeStoreServerClientStreaming

我們將看到以下輸出 -

輸出

Jul 03, 2021 10:37:21 PM
com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

此輸出表示伺服器已啟動。

現在,讓我們啟動客戶端。

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookStoreClientBothStreaming

讓我們向客戶端新增一本書。

Jul 24, 2021 7:21:45 PM
com.tp.bookstore.BookStoreClientBothStreaming main
Type book name to be added to the cart....
Great

Jul 24, 2021 7:21:48 PM
com.tp.bookstore.BookStoreClientBothStreaming addBook
INFO: Adding book with title starting with: Gr

Jul 24, 2021 7:21:48 PM
com.tp.bookstore.BookStoreClientBothStreaming$1 onNext
INFO: Order summary:

Total number of Books: 1
Total Order Value: 300

因此,正如我們所看到的,我們獲得了訂單的當前購物車值。現在讓我們向客戶端再新增一本書。

Type book name to be added to the cart....
Passage

Jul 24, 2021 7:21:51 PM
com.tp.bookstore.BookStoreClientBothStreaming addBook
INFO: Adding book with title starting with: Pa

Jul 24, 2021 7:21:51 PM
com.tp.bookstore.BookStoreClientBothStreaming$1 onNext
INFO: Order summary:
Total number of Books: 2
Total Order Value: 800

一旦我們添加了書籍並輸入“EXIT”,客戶端將關閉。

Type book name to be added to the cart....
EXIT
Jul 24, 2021 7:21:59 PM
com.tp.bookstore.BookStoreClientBothStreaming completeOrder
INFO: Done, waiting for server to create order summary...

因此,正如我們所看到的,客戶端能夠新增書籍。並且隨著書籍的新增,伺服器會以當前購物車值進行響應。

廣告
© . All rights reserved.