gRPC - 伺服器流式RPC



現在讓我們討論一下在使用 gRPC 通訊時伺服器流是如何工作的。在這種情況下,客戶端將搜尋具有給定作者的書籍。假設伺服器需要一些時間來遍歷所有書籍。伺服器不會等到遍歷完所有書籍後才提供所有書籍,而是會以流式的方式提供書籍,即一旦找到一本書就提供一本書。

.proto 檔案

首先,讓我們在 common_proto_files 中定義 bookstore.proto 檔案 -

syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc first (BookSearch) returns (stream Book) {}
}
message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

以下程式碼塊表示服務名稱 "BookStore" 和可以呼叫的函式名稱 "searchByAuthor"。 "searchByAuthor" 函式接收型別為 "BookSearch" 的輸入,並返回型別為 "Book" 的流。因此,實際上,我們允許客戶端搜尋標題並返回與查詢的作者匹配的書籍之一。

service BookStore {
   rpc searchByAuthor (BookSearch) returns (stream Book) {}
}

現在讓我們看看這些型別。

message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}

在這裡,我們定義了 BookSearch,它包含一些屬性,如 nameauthorgenre。客戶端應該將型別為 "BookSearch" 的物件傳送到伺服器。

message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

我們還定義了,給定一個 "BookSearch",伺服器將返回一個 "Book" 流,其中包含書籍屬性以及書籍的價格。伺服器應該傳送一個 "Book" 流。

請注意,我們已經完成了 Maven 設定,用於自動生成我們的類檔案以及我們的 RPC 程式碼。因此,現在我們可以簡單地編譯我們的專案 -

mvn clean install

這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在 -

Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore

設定 gRPC 伺服器

現在我們已經定義了包含函式定義的 proto 檔案,讓我們設定一個可以為這些函式提供服務的伺服器。

讓我們編寫我們的伺服器程式碼來服務上述函式,並將其儲存在 com.tp.bookstore.BookeStoreServerStreaming.java 中 -

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerUnary {
   private static final Logger logger = Logger.getLogger(BookeStoreServerrStreaming.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
 
      logger.info("Server started, listening on " + port);
 
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerUnary greetServer = new BookeStoreServerUnary();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void searchByAuthor(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with author: " + searchQuery.getAuthor());
         for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
            try {
               logger.info("Going through more books....");
               Thread.sleep(5000);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
    
            if(bookEntry.getValue().getAuthor().startsWith(searchQuery.getAuthor())){
               logger.info("Found book with required author: " + bookEntry.getValue().getName()+ ". Sending....");

               responseObserver.onNext(bookEntry.getValue());
            } 
         }
         responseObserver.onCompleted();
      }
   }
}

以上程式碼在指定埠啟動 gRPC 伺服器,併為我們在 proto 檔案中編寫的函式和服務提供服務。讓我們一起瀏覽以上程式碼 -

  • main 方法開始,我們在指定埠建立一個 gRPC 伺服器。

  • 但在啟動伺服器之前,我們將要執行的服務分配給伺服器,即在我們的例子中,是 BookStore 服務。

  • 為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立服務例項,即在我們的例子中,是 BookStoreImpl

  • 服務例項需要提供 .proto 檔案中存在的方法/函式的實現,即在我們的例子中,是 searchByAuthor 方法。

  • 該方法期望一個型別為 .proto 檔案中定義的物件,即對於我們來說是 BookSearch

  • 請注意,我們添加了一個 sleep 來模擬遍歷所有書籍的操作。在流式傳輸的情況下,伺服器不會等待所有搜尋到的書籍都可用。它在可用時透過使用 onNext() 呼叫返回書籍。

  • 當伺服器完成請求時,它透過呼叫 onCompleted() 關閉通道。

  • 最後,我們還有一個關閉鉤子,以確保在完成執行程式碼時伺服器能夠乾淨地關閉。

設定 gRPC 客戶端

現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。

讓我們編寫我們的客戶端程式碼來呼叫上述函式,並將其儲存在 com.tp.bookstore.BookStoreClientServerStreamingBlocking.java 中 -

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientServerStreamingBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingBlocking.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
	public BookStoreClientServerStreamingBlocking(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook((String author) {
      logger.info("Querying for book with author: " + author);
      BookSearch request = BookSearch.newBuilder()..setAuthor(author).build();
      Iterator<Book> response; 
      try {
         response = blockingStub.searchByAuthor(request);
         while(response.hasNext()) {
            logger.info("Found book: " + response.next());
         }
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
   }
   public static void main(String[] args) throws Exception {
      String authorName = args[0];
      String serverAddress = "localhost:50051";
	   
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
        .usePlaintext()
        .build();
 
      try {
         BookStoreClientServerStreamingBlocking client = new BookStoreClientUnaryBlocking(channel);
         client.getBook(authorName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

以上程式碼在指定埠啟動 gRPC 伺服器,併為我們在 proto 檔案中編寫的函式和服務提供服務。讓我們一起瀏覽以上程式碼 -

  • main 方法開始,我們接受一個引數,即我們要搜尋的書籍的 title

  • 我們為與伺服器的 gRPC 通訊設定了一個 Channel。

  • 然後,我們使用 channel 建立一個 阻塞存根。在這裡,我們選擇服務 "BookStore",我們計劃呼叫其函式。

  • 然後,我們簡單地建立 .proto 檔案中定義的預期輸入,即在我們的例子中是 BookSearch,並新增我們希望伺服器搜尋的標題。

  • 我們最終進行呼叫並獲取有效 Books 上的迭代器。當我們迭代時,我們得到伺服器提供的相應 Books。

  • 最後,我們關閉 channel 以避免任何資源洩漏。

所以,這就是我們的客戶端程式碼。

客戶端伺服器呼叫

總而言之,我們要做的是以下內容 -

  • 啟動 gRPC 伺服器。

  • 客戶端查詢伺服器以獲取具有給定作者的書籍。

  • 伺服器在其儲存中搜索書籍,這是一個耗時的過程。

  • 伺服器在找到符合給定條件的書籍時做出響應。伺服器不會等待所有有效的書籍都可用。它在找到一本書時立即傳送輸出。然後重複此過程。

現在,我們已經定義了我們的 proto 檔案,編寫了我們的伺服器和客戶端程式碼,讓我們繼續執行此程式碼並檢視實際情況。

要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 上啟動伺服器 -

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookeStoreServerStreaming

我們將看到以下輸出 -

輸出

Jul 03, 2021 10:37:21 PM 
com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

以上輸出表示伺服器已啟動。

現在,讓我們啟動客戶端。

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookStoreClientServerStreamingBlocking "Har"

我們將看到以下輸出 -

輸出

Jul 03, 2021 10:40:31 PM 
com.tp.bookstore.BookStoreClientServerStreamingBlocking 
getBook
INFO: Querying for book with author: Har

Jul 03, 2021 10:40:37 PM 
com.tp.bookstore.BookStoreClientServerStreamingBlocking 
getBook
INFO: Found book: name: "Go Set a Watchman"
author: "Harper Lee"
price: 700

Jul 03, 2021 10:40:42 PM 
com.tp.bookstore.BookStoreClientServerStreamingBlocking 
getBook
INFO: Found book: name: "To Kill MockingBird"
author: "Harper Lee"
price: 400

因此,正如我們所看到的,客戶端能夠透過使用書籍名稱查詢伺服器來獲取書籍詳細資訊。但更重要的是,客戶端在不同的時間戳獲取了第一本書和第二本書,即大約 5 秒的間隔。

廣告

© . All rights reserved.