gRPC - 快速指南



gRPC - 簡介

在深入瞭解 gRPC 之前,讓我們先簡單瞭解一下遠端過程呼叫,也就是 gRPC 所做的事情。

什麼是遠端過程呼叫?

遠端過程呼叫是看起來像普通/本地函式呼叫一樣的函式呼叫,但不同之處在於遠端函式呼叫通常在不同的機器上執行。但是,對於編寫程式碼的開發人員來說,函式呼叫和遠端呼叫之間的區別很小。這些呼叫通常遵循客戶端-伺服器模型,其中執行呼叫的機器充當伺服器。

為什麼我們需要遠端過程呼叫?

遠端過程呼叫提供了一種在另一臺機器上執行程式碼的方法。在大型、龐大的產品中,這變得至關重要,因為單臺機器無法承載產品正常執行所需的所有程式碼。

在微服務架構中,應用程式被分解成小的服務,這些服務透過訊息佇列和 API 相互通訊。所有這些通訊都發生在網路上,不同的機器/節點根據它們所承載的服務提供不同的功能。因此,在分散式環境中工作時,建立遠端過程呼叫成為一個關鍵方面。

為什麼選擇 gRPC?

Google 遠端過程呼叫 (gRPC) 提供了一個執行遠端過程呼叫的框架。但是,還有一些其他庫和機制可以在遠端機器上執行程式碼。那麼,是什麼讓 gRPC 如此特別呢?讓我們來了解一下。

  • 語言無關性 - gRPC 在內部使用 Google Protocol Buffer。因此,可以使用多種語言,例如 Java、Python、Go、Dart 等。Java 客戶端可以進行過程呼叫,而使用 Python 的伺服器可以做出響應,從而有效地實現語言無關性。

  • 高效的資料壓縮 - 在微服務環境中,考慮到網路上會發生多次通訊,因此我們傳送的資料儘可能簡潔至關重要。我們需要避免任何多餘的資料,以確保資料快速傳輸。鑑於 gRPC 在內部使用 Google Protocol Buffer,因此它可以利用此功能。

  • 高效的序列化和反序列化 - 在微服務環境中,考慮到網路上會發生多次通訊,因此我們儘可能快速地序列化和反序列化資料至關重要。鑑於 gRPC 在內部使用 Google Protocol Buffer,因此它可以確保快速序列化和反序列化資料。

  • 易於使用 - gRPC 已經擁有一個庫和外掛,可以自動生成過程程式碼(我們將在後續章節中看到)。對於簡單的用例,它可以像本地函式呼叫一樣使用。

gRPC 與使用 JSON 的 REST 的比較

讓我們看看其他透過網路傳輸資料的方式與 Protobuf 相比如何。

功能 gRPC 使用 JSON/XML 的 HTTP
語言無關性
HTTP 版本 HTTP/2 HTTP 1.1
指定域模式 .proto 檔案(Google Protocol Buffer)
序列化資料大小 最小 高(XML 更高)
人類可讀性 否,因為它使用單獨的編碼模式 是,因為它使用基於文字的格式
序列化速度 最快 較慢(XML 最慢)
資料型別支援 更豐富。支援複雜資料型別,如 Any、oneof 等。 支援基本資料型別
支援演變模式

gRPC - 設定

Protoc 設定

請注意,此設定僅適用於 Python。對於 Java,所有這些都由 Maven 檔案處理。讓我們安裝“proto”二進位制檔案,我們將使用它來自動生成“.proto”檔案的程式碼。二進位制檔案可以在 https://github.com/protocolbuffers/protobuf/releases/ 中找到。根據作業系統選擇正確的二進位制檔案。我們將在 Windows 上安裝 proto 二進位制檔案,但 Linux 的步驟差別不大。

安裝完成後,確保您可以透過命令列訪問它 -

protoc --version
libprotoc 3.15.6

這意味著 Protobuf 已正確安裝。現在,讓我們轉到專案結構。

我們還需要設定 gRPC 程式碼生成所需的外掛。

對於 Python,我們需要執行以下命令 -

python -m pip install grpcio
python -m pip install grpcio-tools

它將安裝所有必需的二進位制檔案並將它們新增到路徑中。

專案結構

以下是我們將擁有的整體專案結構 -

Project Structure

與各個語言相關的程式碼進入各自的目錄。我們將有一個單獨的目錄來儲存我們的 proto 檔案。並且,以下是我們將用於 Java 的專案結構 -

Project Structure

專案依賴項

現在我們已經安裝了protoc,我們可以使用protoc從 proto 檔案自動生成程式碼。讓我們先建立一個 Java 專案。

以下是我們將用於 Java 專案的 Maven 配置。請注意,它還包含 Protobuf 所需的庫。

示例

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
   http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.grpc.point</groupId>
   <artifactId>grpc-point</artifactId>
   <version>1.0</version>
   <packaging>jar</packaging>

   <properties>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
   </properties>

   <dependencies>
      <dependency>
         <groupId>io.grpc</groupId>
         <artifactId>grpc-netty-shaded</artifactId>
         <version>1.38.0</version>
      </dependency>
      <dependency>
         <groupId>io.grpc</groupId>
         <artifactId>grpc-protobuf</artifactId>
         <version>1.38.0</version>
      </dependency>
      <dependency>
         <groupId>io.grpc</groupId>
         <artifactId>grpc-stub</artifactId>
         <version>1.38.0</version>
      </dependency>
      <dependency> <!-- necessary for Java 9+ -->
         <groupId>org.apache.tomcat</groupId>
         <artifactId>annotations-api</artifactId>
         <version>6.0.53</version>
         <scope>provided</scope>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.slf4j/slf4jsimple-->
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.30</version>
      </dependency>
   </dependencies>
      
   <build>
      <extensions>
         <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.6.2</version>
            </extension>
      </extensions>
      <plugins>
         <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>build-helper-maven-plugin</artifactId>
            <version>1.1</version>
            <executions>
               <execution>
                  <id>test</id>
                  <phase>generate-sources</phase>
                  <goals>
                     <goal>add-source</goal>
                  </goals>
                  <configuration>
                     <sources>
                        <source>${basedir}/target/generated-sources</source>
                     </sources>
                  </configuration>
               </execution>
            </executions>
         </plugin>
         <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
               <protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
               <pluginId>grpc-java</pluginId>
               <pluginArtifact>io.grpc:protoc-gen-grpcjava:1.38.0:exe:${os.detected.classifier}</pluginArtifact>
               <protoSourceRoot>../common_proto_files</protoSourceRoot>
            </configuration>
            <executions>
               <execution>
                  <goals>
                     <goal>compile</goal>
                     <goal>compile-custom</goal>
                  </goals>
               </execution>
            </executions>
         </plugin>
         <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <configuration>
            </configuration>
            <executions>
               <execution>
                  <phase>package</phase>
                  <goals>
                     <goal>shade</goal>
                  </goals>
               </execution>
            </executions>
         </plugin>
      </plugins>
   </build>
</project>

gRPC - 使用 Java 的 Hello World 應用

現在讓我們建立一個基本的類似“Hello World”的應用程式,它將使用 gRPC 和 Java。

.proto 檔案

首先讓我們在common_proto_files中定義“greeting.proto”檔案 -

syntax = "proto3";
option java_package = "com.tp.greeting";
service Greeter {
   rpc greet (ClientInput) returns (ServerOutput) {}
}
message ClientInput {
   string greeting = 1;
   string name = 2;
}
message ServerOutput {
   string message = 1;
}

現在讓我們仔細看看這個程式碼塊,並瞭解每一行的作用 -

syntax = "proto3";

此處的“syntax”表示我們正在使用的 Protobuf 版本。因此,我們使用最新版本 3,因此模式可以使用所有對版本 3 有效的語法。

package tutorial;

此處的包用於衝突解決,例如,如果我們有多個名稱相同的類/成員。

option java_package = "com.tp.greeting";

此引數特定於 Java,即從“.proto”檔案自動生成程式碼的包。

service Greeter {
   rpc greet(ClientInput) returns (ServerOutput) {}
}

此塊表示服務“Greeter”的名稱和可以呼叫的函式名稱“greet”。“greet”函式接收型別為“ClientInput”的輸入並返回型別為“ServerOutput”的輸出。現在讓我們看看這些型別。

message ClientInput {
   string greeting = 1;
   string name = 2;
}

在上面的程式碼塊中,我們定義了ClientInput,它包含兩個屬性“greeting”“name”,它們都是字串。客戶端應該將型別為“ClientInput”的物件傳送到伺服器。

message ServerOutput {
   string message = 1;
}

在上面的程式碼塊中,我們定義了,給定一個“ClientInput”,伺服器將返回“ServerOutput”,它將包含一個名為“message”的屬性。伺服器應該將型別為“ServerOutput”的物件傳送到客戶端。

請注意,我們已經完成了 Maven 設定以自動生成我們的類檔案以及我們的 RPC 程式碼。因此,現在我們可以簡單地編譯我們的專案 -

mvn clean install

這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在 -

Protobuf class code: target/generated-sources/protobuf/java/com.tp.greeting
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.greeting

設定 gRPC 伺服器

現在我們已經定義了包含函式定義的 proto 檔案,讓我們設定一個可以服務於呼叫這些函式的伺服器。

讓我們編寫伺服器程式碼來服務於上述函式,並將其儲存在com.tp.grpc.GreetServer.java中 -

示例

package com.tp.grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ClientInput;
import com.tp.greeting.Greeting.ServerOutput;

public class GreetServer {
   private static final Logger logger = Logger.getLogger(GreetServer.class.getName());
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start();
       
      logger.info("Server started, listening on " + port);
 
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
      @Override
      public void greet(ClientInput req, StreamObserver<ServerOutput> responseObserver) {
         logger.info("Got request from client: " + req);
         ServerOutput reply = ServerOutput.newBuilder().setMessage(
            "Server says " + "\"" + req.getGreeting() + " " + req.getName() + "\""
         ).build();
         responseObserver.onNext(reply);
         responseObserver.onCompleted();
      }
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final GreetServer greetServer = new GreetServer();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
} 

上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -

  • main方法開始,我們在指定的埠建立一個 gRPC 伺服器。

  • 但在啟動伺服器之前,我們將伺服器分配給我們要執行的服務,即在我們的例子中,是Greeter服務。

  • 為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立服務例項,即在我們的例子中,是GreeterImpl

  • 服務例項需要提供“.proto”檔案中存在的函式/方法的實現,即在我們的例子中,是greet方法。

  • 該方法期望一個型別與“.proto”檔案中定義的型別相同的物件,即在我們的例子中,是ClientInput

  • 該方法處理上述輸入,執行計算,然後應該在“.proto”檔案中返回提到的輸出,即在我們的例子中,是ServerOutput

  • 最後,我們還有一個shutdown鉤子,以確保在完成程式碼執行後伺服器能夠乾淨地關閉。

設定 gRPC 客戶端

現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。

讓我們編寫客戶端程式碼來呼叫上述函式,並將其儲存在com.tp.grpc.GreetClient.java中 -

示例

package com.tp.grpc;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class GreetClient {
   private static final Logger logger = Logger.getLogger(GreetClient.class.getName());
   private final GreeterGrpc.GreeterBlockingStub blockingStub;
   
   public GreetClient(Channel channel) {
      blockingStub = GreeterGrpc.newBlockingStub(channel);
   }
   public void makeGreeting(String greeting, String username) {
      logger.info("Sending greeting to server: " + greeting + " for name: " + username);
      ClientInput request = ClientInput.newBuilder().setName(username).setGreeting(greeting).build();
      logger.info("Sending to server: " + request);
      ServerOutput response;
      try {
         response = blockingStub.greet(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following from the server: " + response.getMessage());
   }
   
   public static void main(String[] args) throws Exception {
      String greeting = args[0];
      String username = args[1];
      String serverAddress = "localhost:50051";
	   ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         GreetClient client = new GreetClient(channel);
         client.makeGreeting(greeting, username);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -

  • main方法開始,我們接受兩個引數,即namegreeting

  • 我們為 gRPC 與伺服器的通訊設定了一個 Channel。

  • 接下來,我們使用建立的 Channel 建立一個阻塞式存根。在這裡,我們擁有服務“Greeter”,我們計劃呼叫它的函式。存根只不過是一個包裝器,它隱藏了遠端呼叫的複雜性,使呼叫者無需關心。

  • 然後,我們簡單地建立“.proto”檔案中定義的預期輸入,即在我們的例子中,是ClientInput

  • 我們最終進行呼叫並等待伺服器的響應。

  • 最後,我們關閉 Channel 以避免任何資源洩漏。

因此,這就是我們的客戶端程式碼。

客戶端伺服器呼叫

現在,我們已經定義了proto檔案,編寫了伺服器和客戶端程式碼,讓我們繼續執行此程式碼並檢視實際效果。

要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器 -

java -cp .\target\grpc-point-1.0.jar com.tp.grpc.GreetServer

我們將看到以下輸出 -

輸出

Jul 03, 2021 1:04:23 PM com.tp.grpc.GreetServer start
INFO: Server started, listening on 50051

上述輸出意味著伺服器已啟動。

現在,讓我們啟動客戶端。

java -cp .\target\grpc-point-1.0.jar com.tp.grpc.GreetClient 
Hello Jane

我們將看到以下輸出 -

輸出

Jul 03, 2021 1:05:59 PM com.tp.grpc.GreetClient greet
INFO: Sending greeting to server: Hello for name: Jane
Jul 03, 2021 1:05:59 PM com.tp.grpc.GreetClient greet
INFO: Sending to server: greeting: "Hello"
name: "Jane"

Jul 03, 2021 1:06:00 PM com.tp.grpc.GreetClient greet
INFO: Got following from the server: Server says "Hello Jane"

現在,如果我們開啟伺服器日誌,我們將看到以下內容 -

Jul 03, 2021 1:04:23 PM com.tp.grpc.GreetServer start
INFO: Server started, listening on 50051
Jul 03, 2021 1:06:00 PM com.tp.grpc.GreetServer$GreeterImpl 
greet
INFO: Got request from client: greeting: "Hello"
name: "Jane"

因此,客戶端能夠按預期呼叫伺服器,並且伺服器透過向客戶端傳送問候進行了響應。

gRPC - 使用 Python 的 Hello World 應用

現在讓我們建立一個基本的類似“Hello World”的應用程式,它將使用 gRPC 和 Python。

.proto 檔案

首先讓我們在common_proto_files中定義greeting.proto檔案 -

syntax = "proto3";

service Greeter {
   rpc greet (ClientInput) returns (ServerOutput) {}
}
message ClientInput {
   string greeting = 1;
   string name = 2;
}
message ServerOutput {
   string message = 1;
}

現在讓我們仔細看看上面程式碼塊中的每一行 -

syntax = "proto3";

此處的“syntax”表示我們正在使用的 Protobuf 版本。因此,我們使用最新版本 3,因此模式可以使用所有對版本 3 有效的語法。

package tutorial;

此處的package用於衝突解決,例如,如果我們有多個名稱相同的類/成員。

service Greeter {
   rpc greet(ClientInput) returns (ServerOutput) {}
}

此塊表示服務“Greeter”的名稱和可以呼叫的函式名稱“greet”。“greet”函式接收型別為“ClientInput”的輸入並返回型別為“ServerOutput”的輸出。現在讓我們看看這些型別。

message ClientInput {
   string greeting = 1;
   string name = 2;
}

在上面的程式碼塊中,我們定義了ClientInput,它包含兩個屬性“greeting”和“name”,它們都是字串。客戶端應該將型別為“ClientInput”的物件傳送到伺服器。

message ServerOutput {
   string message = 1;
}

這裡,我們還定義了,給定一個“ClientInput”,伺服器將返回一個包含單個屬性“message”的“ServerOutput”。伺服器應該將型別為“ServerOutput”的物件傳送到客戶端。

現在,讓我們為 Protobuf 類和 gRPC 類生成底層程式碼。為此,我們需要執行以下命令:

python -m grpc_tools.protoc -I ..\common_proto_files\ --
python_out=../python --grpc_python_out=. greeting.proto

但是,請注意,要執行該命令,我們需要安裝教程的設定部分中提到的正確依賴項。

這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在 -

Protobuf class code: python/greeting_pb2.py
Protobuf gRPC code: python/greeting_pb2_grpcpb2.py

設定 gRPC 伺服器

現在我們已經定義了包含函式定義的 proto 檔案,讓我們設定一個可以呼叫這些函式的伺服器。

讓我們編寫我們的伺服器程式碼來服務上述函式,並將其儲存在server.py中:

示例

from concurrent import futures

import grpc
import greeting_pb2
import greeting_pb2_grpc

class Greeter(greeting_pb2_grpc.GreeterServicer):
   def greet(self, request, context):
      print("Got request " + str(request))
      return greeting_pb2.ServerOutput(message='{0} {1}!'.format(request.greeting, request.name))
	  
def server():
   server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
   greeting_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
   server.add_insecure_port('[::]:50051')
   print("gRPC starting")
   server.start()
   server.wait_for_termination()
server()

上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -

  • main方法開始,我們在指定的埠上建立一個 gRPC 伺服器。

  • 但在啟動伺服器之前,我們將伺服器分配給我們要執行的服務,即在我們的例子中,是Greeter服務。

  • 為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立一個服務例項,即在我們的例子中為Greeter

  • 服務例項需要提供.proto檔案中存在的 method/function 的實現,即在我們的例子中為greet方法。

  • 該方法期望一個型別為 .proto 檔案中定義的物件,即對我們來說是request

  • 該方法處理上述輸入,進行計算,然後應該返回.proto檔案中提到的輸出,即在我們的例子中為ServerOutput

設定 gRPC 客戶端

現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。

讓我們編寫我們的客戶端程式碼來呼叫上述函式,並將其儲存在client.py中:

示例

import grpc

import greeting_pb2
import greeting_pb2_grpc

def run():
   with grpc.insecure_channel('localhost:50051') as channel:
      stub = greeting_pb2_grpc.GreeterStub(channel)
      response = stub.greet(greeting_pb2.ClientInput(name='John', greeting = "Yo"))
   print("Greeter client received following from server: " + response.message)   
run()

上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -

  • main方法開始,我們為與伺服器的 gRPC 通訊設定了一個 Channel。

  • 然後,我們使用 channel 建立一個stub。在這裡,我們使用服務“Greeter”,我們計劃呼叫其函式。stub 只是一個包裝器,它隱藏了遠端呼叫對呼叫者的複雜性。

  • 然後,我們簡單地建立 proto 檔案中定義的預期輸入,即在我們的例子中為ClientInput。我們硬編碼了兩個引數,即namegreeting

  • 我們最終進行呼叫並等待伺服器的結果。

因此,這就是我們的客戶端程式碼。

客戶端伺服器呼叫

現在,我們已經定義了proto檔案,編寫了伺服器和客戶端程式碼,讓我們繼續執行此程式碼並檢視實際情況。

要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器 -

python .\server.py

輸出

我們將得到以下輸出:

gRPC starting

上述輸出表示伺服器已啟動。

現在,讓我們啟動客戶端。

python .\client.py

我們將看到以下輸出 -

輸出

Greeter client received following from server: Yo John!

現在,如果我們開啟伺服器日誌,我們將看到以下資料:

gRPC starting
Got request greeting: "Yo"
name: "John"

因此,正如我們所看到的,客戶端能夠按預期呼叫伺服器,並且伺服器響應了向客戶端回送問候。

gRPC - 一元 gRPC

我們現在將瞭解 gRPC 框架支援的各種型別的通訊。我們將使用書店的例子,客戶可以在其中搜索和下單送書。

讓我們看看一元 gRPC 通訊,我們讓客戶端搜尋標題並隨機返回一個與查詢的標題匹配的書籍。

.proto 檔案

首先讓我們在 common_proto_files 中定義 bookstore.proto 檔案:

syntax = "proto3";
option java_package = "com.tp.bookstore";

service BookStore {
   rpc first (BookSearch) returns (Book) {}
}
message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

現在讓我們仔細看看上面程式碼塊中的每一行。

syntax = "proto3";

語法

這裡的“語法”表示我們使用的 Protobuf 版本。我們使用最新的版本 3,因此該模式可以使用所有對版本 3 有效的語法。

package tutorial;

此處的package用於衝突解決,例如,如果我們有多個名稱相同的類/成員。

option java_package = "com.tp.bookstore";

此引數特定於 Java,即從 .proto 檔案自動生成程式碼的包。

service BookStore {
   rpc first (BookSearch) returns (Book) {}
}

這表示服務的名稱"BookStore"和可以呼叫的函式名稱"first""first"函式接收型別為"BookSearch"的輸入並返回型別為"Book"的輸出。因此,實際上,我們讓客戶端搜尋標題並返回一個與查詢的標題匹配的書籍。

現在讓我們看看這些型別。

message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}

在上面的程式碼塊中,我們定義了 BookSearch,它包含諸如name、authorgenre之類的屬性。客戶端應該將型別為"BookSearch"的物件傳送到伺服器。

message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

這裡,我們還定義了,給定一個“BookSearch”,伺服器將返回一個包含書籍屬性以及書籍價格的“Book”。伺服器應該將型別為“Book”的物件傳送到客戶端。

請注意,我們已經完成了 Maven 的設定,以便自動生成我們的類檔案以及我們的 RPC 程式碼。因此,現在我們可以簡單地編譯我們的專案:

mvn clean install

這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在 -

Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore

設定 gRPC 伺服器

現在我們已經定義了包含函式定義的 proto 檔案,讓我們設定一個可以呼叫這些函式的伺服器。

讓我們編寫我們的伺服器程式碼來服務上述函式,並將其儲存在com.tp.bookstore.BookeStoreServerUnary.java中:

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerUnary {
   private static final Logger logger = Logger.getLogger(BookeStoreServerUnary.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
 
      logger.info("Server started, listening on " + port);
 
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerUnary greetServer = new BookeStoreServerUnary();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " + searchQuery.getName());
         List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->
            title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());

         Book foundBook = null;
         if(matchingBookTitles.size() > 0) {
            foundBook = bookMap.get(matchingBookTitles.get(0));
         }
         responseObserver.onNext(foundBook);
         responseObserver.onCompleted();
      }
   }
}

上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -

  • main方法開始,我們在指定的埠建立一個 gRPC 伺服器。

  • 但在啟動伺服器之前,我們將要執行的服務分配給伺服器,即在我們的例子中為BookStore服務。

  • 為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立一個服務例項,即在我們的例子中為BookStoreImpl

  • 服務例項需要提供.proto檔案中存在的 method/function 的實現,即在我們的例子中為first方法。

  • 該方法期望一個型別為 .proto 檔案中定義的物件,即對我們來說是BookSearch

  • 該方法在可用的 bookMap 中搜索書籍,然後透過呼叫onNext()方法返回Book。完成後,伺服器透過呼叫onCompleted()宣佈它已完成輸出。

  • 最後,我們還有一個關閉鉤子,以確保在我們完成程式碼執行時伺服器能夠乾淨地關閉。

設定 gRPC 客戶端

現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。

讓我們編寫我們的客戶端程式碼來呼叫上述函式,並將其儲存在com.tp.bookstore.BookStoreClientUnaryBlocking.java中:

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientUnaryBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
	
   public BookStoreClientUnaryBlocking(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();
 
   Book response; 
   try {
      response = blockingStub.first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
	  
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
 
      try {
         BookStoreClientUnaryBlocking client = new 
         BookStoreClientUnaryBlocking(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, 
         TimeUnit.SECONDS);
      }
   }
}

上述程式碼在指定的埠上啟動一個 gRPC 伺服器,併為我們在proto檔案中編寫的函式和服務提供服務。讓我們逐步瞭解上面的程式碼:

  • main方法開始,我們接受一個引數,即我們要搜尋的書籍的標題。

  • 我們為與伺服器的 gRPC 通訊設定了一個 Channel。

  • 然後,我們使用 channel 建立一個阻塞 stub。在這裡,我們選擇服務“BookStore”,我們計劃呼叫其函式。“stub”只是一個包裝器,它隱藏了遠端呼叫對呼叫者的複雜性。

  • 然後,我們簡單地建立 proto 檔案中定義的預期輸入,即在我們的例子中為BookSearch,並新增我們希望伺服器搜尋的標題名稱。

  • 我們最終進行呼叫並等待伺服器的結果。

  • 最後,我們關閉 Channel 以避免任何資源洩漏。

因此,這就是我們的客戶端程式碼。

客戶端伺服器呼叫

總而言之,我們要做的是:

  • 啟動 gRPC 伺服器。

  • 客戶端查詢伺服器以獲取具有給定名稱/標題的書籍。

  • 伺服器在其商店中搜索書籍。

  • 伺服器然後響應書籍及其其他屬性。

現在,我們已經定義了proto檔案,編寫了伺服器和客戶端程式碼,讓我們繼續執行此程式碼並檢視實際情況。

要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器 -

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookeStoreServerUnary

我們將看到以下輸出 -

輸出

Jul 03, 2021 7:21:58 PM 
com.tp.bookstore.BookeStoreServerUnary start
INFO: Server started, listening on 50051

上述輸出表示伺服器已啟動。

現在,讓我們啟動客戶端。

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookStoreClientUnaryBlocking "To Kill"

我們將看到以下輸出 -

輸出

Jul 03, 2021 7:22:03 PM 
com.tp.bookstore.BookStoreClientUnaryBlocking getBook
INFO: Querying for book with title: To Kill

Jul 03, 2021 7:22:04 PM 
com.tp.bookstore.BookStoreClientUnaryBlocking getBook
INFO: Got following book from server: name: "To Kill 

MockingBird"
author: "Harper Lee"
price: 400

因此,正如我們所看到的,客戶端能夠透過使用書籍名稱查詢伺服器來獲取書籍詳細資訊。

gRPC - 伺服器端流式 RPC

現在讓我們討論在使用 gRPC 通訊時伺服器流是如何工作的。在這種情況下,客戶端將搜尋具有給定作者的書籍。假設伺服器需要一些時間才能遍歷所有書籍。伺服器不是在遍歷完所有書籍後等待提供所有書籍,而是會以流式方式提供書籍,即一旦找到一本就提供一本。

.proto 檔案

首先讓我們在 common_proto_files 中定義 bookstore.proto 檔案:

syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc first (BookSearch) returns (stream Book) {}
}
message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

以下程式碼塊表示服務的名稱"BookStore"和可以呼叫的函式名稱"searchByAuthor"。"searchByAuthor"函式接收型別為"BookSearch"的輸入並返回型別為"Book"的流。因此,實際上,我們讓客戶端搜尋標題並返回一個與查詢的作者匹配的書籍。

service BookStore {
   rpc searchByAuthor (BookSearch) returns (stream Book) {}
}

現在讓我們看看這些型別。

message BookSearch {
   string name = 1;
   string author = 2;
   string genre = 3;
}

這裡,我們定義了BookSearch,它包含一些屬性,如nameauthorgenre。客戶端應該將型別為"BookSearch"的物件傳送到伺服器。

message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

我們還定義了,給定一個"BookSearch",伺服器將返回一個包含書籍屬性以及書籍價格的"Book"流。伺服器應該傳送一個"Book"流。

請注意,我們已經完成了 Maven 的設定,以便自動生成我們的類檔案以及我們的 RPC 程式碼。因此,現在我們可以簡單地編譯我們的專案:

mvn clean install

這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在 -

Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore

設定 gRPC 伺服器

現在我們已經定義了包含函式定義的 proto 檔案,讓我們設定一個可以服務呼叫這些函式的伺服器。

讓我們編寫我們的伺服器程式碼來服務上述函式,並將其儲存在com.tp.bookstore.BookeStoreServerStreaming.java中:

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerUnary {
   private static final Logger logger = Logger.getLogger(BookeStoreServerrStreaming.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
 
      logger.info("Server started, listening on " + port);
 
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerUnary greetServer = new BookeStoreServerUnary();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void searchByAuthor(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with author: " + searchQuery.getAuthor());
         for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
            try {
               logger.info("Going through more books....");
               Thread.sleep(5000);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
    
            if(bookEntry.getValue().getAuthor().startsWith(searchQuery.getAuthor())){
               logger.info("Found book with required author: " + bookEntry.getValue().getName()+ ". Sending....");

               responseObserver.onNext(bookEntry.getValue());
            } 
         }
         responseObserver.onCompleted();
      }
   }
}

上述程式碼在指定的埠上啟動一個 gRPC 伺服器,併為我們在proto檔案中編寫的函式和服務提供服務。讓我們逐步瞭解上面的程式碼:

  • main方法開始,我們在指定的埠建立一個 gRPC 伺服器。

  • 但在啟動伺服器之前,我們將要執行的服務分配給伺服器,即在我們的例子中為BookStore服務。

  • 為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立一個服務例項,即在我們的例子中為BookStoreImpl

  • 服務例項需要提供.proto 檔案中存在的 method/function 的實現,即在我們的例子中為searchByAuthor方法。

  • 該方法期望一個型別為 .proto 檔案中定義的物件,即對我們來說是BookSearch

  • 請注意,我們添加了一個 sleep 來模擬遍歷所有書籍的操作。在流式傳輸的情況下,伺服器不會等待所有搜尋到的書籍都可用。它在書籍可用時透過使用onNext()呼叫返回書籍。

  • 當伺服器完成請求時,它透過呼叫onCompleted()關閉通道。

  • 最後,我們還有一個關閉鉤子,以確保在我們完成程式碼執行時伺服器能夠乾淨地關閉。

設定 gRPC 客戶端

現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。

讓我們編寫我們的客戶端程式碼來呼叫上述函式,並將其儲存在com.tp.bookstore.BookStoreClientServerStreamingBlocking.java 中:

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientServerStreamingBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingBlocking.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
	public BookStoreClientServerStreamingBlocking(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook((String author) {
      logger.info("Querying for book with author: " + author);
      BookSearch request = BookSearch.newBuilder()..setAuthor(author).build();
      Iterator<Book> response; 
      try {
         response = blockingStub.searchByAuthor(request);
         while(response.hasNext()) {
            logger.info("Found book: " + response.next());
         }
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
   }
   public static void main(String[] args) throws Exception {
      String authorName = args[0];
      String serverAddress = "localhost:50051";
	   
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
        .usePlaintext()
        .build();
 
      try {
         BookStoreClientServerStreamingBlocking client = new BookStoreClientUnaryBlocking(channel);
         client.getBook(authorName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -

  • main方法開始,我們接受一個引數,即我們要搜尋的書籍的title

  • 我們為與伺服器的 gRPC 通訊設定了一個 Channel。

  • 然後,我們使用 channel 建立一個阻塞 stub。在這裡,我們選擇服務“BookStore”,我們計劃呼叫其函式。

  • 然後,我們簡單地建立 proto 檔案中定義的預期輸入,即在我們的例子中為 BookSearch,並新增我們希望伺服器搜尋的標題。

  • 我們最終進行呼叫並在有效的 Books 上獲取一個迭代器。當我們迭代時,我們得到伺服器提供的相應 Books。

  • 最後,我們關閉 Channel 以避免任何資源洩漏。

因此,這就是我們的客戶端程式碼。

客戶端伺服器呼叫

總而言之,我們要做的是:

  • 啟動 gRPC 伺服器。

  • 客戶端查詢伺服器以獲取具有給定作者的書籍。

  • 伺服器在其商店中搜索書籍,這是一個耗時的過程。

  • 伺服器在找到符合給定條件的書籍時做出響應。伺服器不會等待所有有效的書籍都可用。它在找到一本時立即傳送輸出。然後重複此過程。

現在,我們已經定義了proto檔案,編寫了伺服器和客戶端程式碼,讓我們繼續執行此程式碼並檢視實際情況。

要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器 -

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookeStoreServerStreaming

我們將看到以下輸出 -

輸出

Jul 03, 2021 10:37:21 PM 
com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

上述輸出表示伺服器已啟動。

現在,讓我們啟動客戶端。

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookStoreClientServerStreamingBlocking "Har"

我們將看到以下輸出 -

輸出

Jul 03, 2021 10:40:31 PM 
com.tp.bookstore.BookStoreClientServerStreamingBlocking 
getBook
INFO: Querying for book with author: Har

Jul 03, 2021 10:40:37 PM 
com.tp.bookstore.BookStoreClientServerStreamingBlocking 
getBook
INFO: Found book: name: "Go Set a Watchman"
author: "Harper Lee"
price: 700

Jul 03, 2021 10:40:42 PM 
com.tp.bookstore.BookStoreClientServerStreamingBlocking 
getBook
INFO: Found book: name: "To Kill MockingBird"
author: "Harper Lee"
price: 400

因此,正如我們所看到的,客戶端能夠透過使用書籍名稱查詢伺服器來獲取書籍詳細資訊。但更重要的是,客戶端在不同的時間戳獲取了第一本書和第二本書,即大約 5 秒的間隔。

gRPC - 客戶端流式 RPC

現在讓我們看看在使用 gRPC 通訊時客戶端流是如何工作的。在這種情況下,客戶端將搜尋並將書籍新增到購物車。一旦客戶端完成新增所有書籍,伺服器將向客戶端提供結賬購物車值。

.proto 檔案

首先讓我們在common_proto_files中定義bookstore.proto檔案:

syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc totalCartValue (stream Book) returns (Cart) {}
}
message BookSearch {
   string name = 1;
   string author = 2;
   int32 price = 3;
}
message Cart {
   int32 books = 1;
   int32 price = 2;
}

這裡,以下程式碼塊表示服務名稱 **"BookStore"** 和可以呼叫的函式名稱 **"totalCartValue"**。“totalCartValue” 函式接收型別為 **"Book"** 的輸入,該輸入是一個流。函式返回型別為 "Cart" 的物件。因此,實際上,我們允許客戶端以流的方式新增書籍,一旦客戶端完成,伺服器就會向客戶端提供購物車總價值。

service BookStore {
   rpc totalCartValue (stream Book) returns (Cart) {}
}

現在讓我們看看這些型別。

message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

客戶端將傳送它想要購買的 **"Book"**。它可能不是完整的書籍資訊;它可以僅僅是書籍的標題。

message Cart {
   int32 books = 1;
   int32 price = 2;
}

伺服器在獲取書籍列表後,將返回 **"Cart"** 物件,該物件僅僅是客戶端購買的書籍總數和總價。

請注意,我們已經完成了 Maven 的設定,以便自動生成我們的類檔案以及我們的 RPC 程式碼。因此,現在我們可以簡單地編譯我們的專案:

mvn clean install

這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在 -

Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore

設定 gRPC 伺服器

現在我們已經定義了包含函式定義的 **proto** 檔案,讓我們設定一個可以呼叫這些函式的伺服器。

讓我們編寫我們的伺服器程式碼來服務於上述函式,並將其儲存在 **com.tp.bookstore.BookeStoreServerClientStreaming.java** 中 -

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class  BookeStoreServerClientStreaming {
   private static final Logger logger = Logger.getLoggerr(BookeStoreServerClientStreaming.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
 
      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerClientStreaming greetServer = new  BookeStoreServerClientStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            @Override
            public void onNext(Book book) 
            logger.info("Searching for book with title starting with: " + book.getName());
            for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
               if(bookEntry.getValue().getName().startsWith(book.getName())){
                  logger.info("Found book, adding to cart:....");
                  bookCart.add(bookEntry.getValue());
               }
            }
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading book stream: " + t);
         }
         @Override
         public void onCompleted() {
            int cartValue = 0;
            for (Book book : bookCart) {
               cartValue += book.getPrice();
            }
            responseObserver.onNext(Cart.newBuilder()
               .setPrice(cartValue)
               .setBooks(bookCart.size()).build());
            responseObserver.onCompleted();
         }
      };
 
   }
}          

上述程式碼在指定的埠上啟動一個 gRPC 伺服器,併為我們在proto檔案中編寫的函式和服務提供服務。讓我們逐步瞭解上面的程式碼:

  • main方法開始,我們在指定的埠建立一個 gRPC 伺服器。

  • 但在啟動伺服器之前,我們將要執行的服務分配給伺服器,即在我們的例子中為BookStore服務。

  • 為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立一個服務例項,即在我們的例子中為BookStoreImpl

  • 服務例項需要提供 **.proto 檔案** 中存在的 method/function 的實現,即在我們的例子中,是 **totalCartValue** 方法。

  • 現在,鑑於這是客戶端流的情況,伺服器將在客戶端新增書籍時獲得一個 Book 列表(在 **proto** 檔案中定義)。因此,伺服器返回一個 **自定義流觀察器**。此流觀察器實現了當找到新 Book 時以及當流關閉時會發生什麼。

  • 當客戶端新增一個 Book 時,gRPC 框架將呼叫 **onNext()** 方法。此時,伺服器將其新增到購物車中。在流式傳輸的情況下,伺服器不會等待所有可用的書籍。

  • 當客戶端完成新增書籍時,將呼叫流觀察器的 **onCompleted()** 方法。此方法實現了伺服器在客戶端完成新增書籍時想要傳送的內容,即它將 Cart 物件返回給客戶端。

  • 最後,我們還有一個關閉鉤子,以確保在我們完成程式碼執行時伺服器能夠乾淨地關閉。

設定 gRPC 客戶端

現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。

讓我們編寫我們的客戶端程式碼來呼叫上述函式,並將其儲存在 **com.tp.bookstore.BookStoreClientServerStreamingBlocking.java** 中 -

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientStreamingClient {
   private static final Logger logger = Logger.getLogger(BookStoreClientStreaming.class.getName());
   private final BookStoreStub stub;
	private boolean serverResponseCompleted = false; 
   StreamObserver<Book> streamClientSender;
   
   public BookStoreClientStreamingClient(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Cart> getServerResponseObserver(){
      StreamObserver<Cart> observer = new StreamObserver<Cart>(){
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" + "\nTotal number of Books:" + cart.getBooks() + 
               "\nTotal Order Value:" + cart.getPrice());
         }
         @Override
         public void onCompleted() {
            //logger.info("Server: Done reading orderreading cart");
            serverResponseCompleted = true;
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();
 
      if(streamClientSender == null) {
         streamClientSender = stub.totalCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      if(streamClientSender != null);
      streamClientSender.onCompleted();
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
	   ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientStreamingClient client = new BookStoreClientStreamingClient(channel);
         String bookName = ""; 
 
         while(true) {
            System.out.println("Type book name to be added to the cart....");
            bookName = System.console().readLine();
            if(bookName.equals("EXIT")) {
               client.completeOrder();
               break; 
            }
            client.addBook(bookName);
         }
 
         while(client.serverResponseCompleted == false) {
            Thread.sleep(2000);
         }
 
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -

  • main方法開始,我們接受一個引數,即我們要搜尋的書籍的標題。

  • 我們為與伺服器的 gRPC 通訊設定了一個 Channel。

  • 接下來,我們使用建立的通道建立一個 **非阻塞存根**。在這裡,我們選擇計劃呼叫其函式的服務 **"BookStore"**。

  • 然後,我們簡單地建立 **.proto** 檔案中定義的預期輸入,即在我們的例子中,是 **Book**,並新增我們希望伺服器新增的標題。

  • 但是鑑於這是客戶端流的情況,我們首先為伺服器建立一個流觀察器。此伺服器流觀察器列出了伺服器響應時需要執行的操作的行為,即 **onNext()** 和 **onCompleted()**

  • 並且使用存根,我們也獲得了客戶端流觀察器。我們使用此流觀察器傳送要新增到購物車的書籍資料。最終,我們進行呼叫並在有效書籍上獲取迭代器。當我們迭代時,我們得到伺服器提供的相應書籍。

  • 並且一旦我們的訂單完成,我們確保客戶端流觀察器已關閉。它告訴伺服器計算購物車價值並將其作為輸出提供。

  • 最後,我們關閉 Channel 以避免任何資源洩漏。

因此,這就是我們的客戶端程式碼。

客戶端伺服器呼叫

總而言之,我們要做的是:

  • 啟動 gRPC 伺服器。

  • 客戶端透過向伺服器通知它們來新增書籍流。

  • 伺服器在其商店中搜索書籍並將其新增到購物車。

  • 當客戶端完成訂購時,伺服器會響應客戶端的購物車總價值。

現在,我們已經定義了proto檔案,編寫了伺服器和客戶端程式碼,讓我們繼續執行此程式碼並檢視實際情況。

要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器 -

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookeStoreServerClientStreaming

我們將看到以下輸出 -

輸出

Jul 03, 2021 10:37:21 PM 
com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

上述輸出表示伺服器已啟動。

現在,讓我們啟動客戶端。

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookStoreClientServerStreamingClient

讓我們向我們的客戶端新增一些書籍。

Type book name to be added to the cart....
Gr
Jul 24, 2021 5:53:07 PM 
com.tp.bookstore.BookStoreClientStreamingClient addBook
INFO: Adding book with title starting with: Great

Type book name to be added to the cart....
Pa
Jul 24, 2021 5:53:20 PM 
com.tp.bookstore.BookStoreClientStreamingClient addBook
INFO: Adding book with title starting with: Passage

Type book name to be added to the cart....

一旦我們添加了書籍並輸入“EXIT”,伺服器將計算購物車價值,以下是我們獲得的輸出 -

輸出

EXIT
Jul 24, 2021 5:53:33 PM 
com.tp.bookstore.BookStoreClientStreamingClient completeOrder
INFO: Done, waiting for server to create order summary...
Jul 24, 2021 5:53:33 PM 
com.tp.bookstore.BookStoreClientStreamingClient$1 onNext
INFO: Order summary:
Total number of Books: 2
Total Order Value: 800

因此,正如我們所看到的,客戶端能夠新增書籍。並且一旦所有書籍都新增完畢,伺服器將響應書籍總數和總價。

gRPC - 雙向 RPC

現在讓我們看看在使用 gRPC 通訊時客戶端-伺服器流是如何工作的。在這種情況下,客戶端將搜尋並將書籍新增到購物車。伺服器將在每次新增書籍時響應即時購物車價值。

.proto 檔案

首先讓我們在common_proto_files中定義bookstore.proto檔案:

syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc liveCartValue (stream Book) returns (stream Cart) {}
}
message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}
message Cart {
   int32 books = 1;
   int32 price = 2;
}

以下程式碼塊表示服務名稱 **"BookStore"** 和可以呼叫的函式名稱 "liveCartValue"。**"liveCartValue"** 函式接收型別為 **"Book"** 的輸入,該輸入是一個流。函式返回型別為 **"Cart"** 的物件的流。因此,實際上,我們允許客戶端以流的方式新增書籍,並且每當新增新書籍時,伺服器都會將當前購物車價值響應給客戶端。

service BookStore {
   rpc liveCartValue (stream Book) returns (stream Cart) {}
}

現在讓我們看看這些型別。

message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

客戶端將傳送它想要購買的 **"Book"**。它不必是完整的書籍資訊;它可以僅僅是書籍的標題。

message Cart {
   int32 books = 1;
   int32 price = 2;
}

伺服器在獲取書籍列表後,將返回 **"Cart"** 物件,該物件僅僅是客戶端購買的書籍總數和總價。

請注意,我們已經完成了 Maven 設定,用於自動生成我們的類檔案以及我們的 RPC 程式碼。因此,現在我們可以簡單地編譯我們的專案。

mvn clean install

這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在

Protobuf class code: target/generatedsources/protobuf/java/com.tp.bookstore
Protobuf gRPC code: target/generated-sources/protobuf/grpcjava/com.tp.bookstore

設定 gRPC 伺服器

現在我們已經定義了包含函式定義的 proto 檔案,讓我們設定一個可以呼叫這些函式的伺服器。

讓我們編寫我們的伺服器程式碼來服務於上述函式,並將其儲存在 **com.tp.bookstore.BookeStoreServerBothStreaming.java** 中 -

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerBothStreaming {
   private static final Logger logger =Logger.getLogger(BookeStoreServerBothStreaming.class.getName());

   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
               } catch (InterruptedException e) {
                  e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerBothStreaming greetServer = newBookeStoreServerBothStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book>liveCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            int cartValue = 0;
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with titlestarting with: " + book.getName());
               for (Entry<String, Book> bookEntry :bookMap.entrySet()) {
                  if(bookEntry.getValue().getName().startsWith(book.getName())){
                     logger.info("Found book, adding tocart:....");
                     bookCart.add(bookEntry.getValue());
                     cartValue +=bookEntry.getValue().getPrice();
                  }
               }
               logger.info("Updating cart value...");

               responseObserver.onNext(Cart.newBuilder()
                  .setPrice(cartValue)
                  .setBooks(bookCart.size()).build());
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream: " + t);
            }
            @Override
            public void onCompleted() {
               logger.info("Order completed");
               responseObserver.onCompleted();
            }
         };
      }
   }
}

上述程式碼在指定的埠上啟動一個 gRPC 伺服器,併為我們在proto檔案中編寫的函式和服務提供服務。讓我們逐步瞭解上面的程式碼:

  • 從 **main** 方法開始,我們在指定的埠上建立一個 gRPC 伺服器。

  • 但在啟動伺服器之前,我們將要執行的服務分配給伺服器,即在我們的例子中為BookStore服務。

  • 為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立服務例項,即在我們的例子中是 **BookStoreImpl**

  • 服務例項需要提供 **proto** 檔案中存在的 method/function 的實現,即在我們的例子中,是 **totalCartValue** 方法。

  • 現在,鑑於這是伺服器和客戶端流的情況,伺服器將在客戶端新增書籍時獲得一個 **Books** 列表(在 **proto** 檔案中定義)。因此,伺服器返回一個自定義流觀察器。此流觀察器實現了當找到新 Book 時以及當流關閉時會發生什麼。

  • 當客戶端新增一個 Book 時,gRPC 框架將呼叫 **onNext()** 方法。此時,伺服器將其新增到購物車中並使用響應觀察器返回購物車價值。在流式傳輸的情況下,伺服器不會等待所有有效的書籍可用。

  • 當客戶端完成新增書籍時,將呼叫流觀察器的 **onCompleted()** 方法。此方法實現了伺服器在客戶端完成新增 **Books** 時想要執行的操作,即宣告它已完成接收客戶端訂單。

  • 最後,我們還有一個關閉鉤子,以確保在我們完成程式碼執行時伺服器能夠乾淨地關閉。

設定 gRPC 客戶端

現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。

讓我們編寫我們的客戶端程式碼來呼叫上述函式,並將其儲存在 **com.tp.bookstore.BookStoreClientBothStreaming.java** 中 -

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientBothStreaming {
   private static final Logger logger = Logger.getLogger(BookStoreClientBothStreaming.class.getName());
   private final BookStoreStub stub;
   private boolean serverIntermediateResponseCompleted = true;
   private boolean serverResponseCompleted = false;

   StreamObserver<Book> streamClientSender;
   
   public BookStoreClientBothStreaming(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver>Cart< getServerResponseObserver(){
      StreamObserver>Cart< observer = new StreamObserver<Cart>(){
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" + 
               "\nTotal number of Books:" + cart.getBooks()+ 
               "\nTotal Order Value:" cart.getPrice());

            serverIntermediateResponseCompleted = true;
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response fromServer: " + t);
         }
         @Override
         public void onCompleted() {
            //logger.info("Server: Done reading orderreading cart");
            serverResponseCompleted = true;
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();
      if(streamClientSender == null) {
         streamClientSender =stub.liveCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create ordersummary...");
      if(streamClientSender != null); {
         streamClientSender.onCompleted();
      }
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientBothStreaming client = new
         BookStoreClientBothStreaming(channel);
         String bookName = "";

         while(true) {
            if(client.serverIntermediateResponseCompleted ==true) {
               System.out.println("Type book name to beadded to the cart....");
               bookName = System.console().readLine();
               if(bookName.equals("EXIT")) {
                  client.completeOrder();
                  break;
               }
               client.serverIntermediateResponseCompleted = false;
               client.addBook(bookName);
               Thread.sleep(500);
            }
         }
         while(client.serverResponseCompleted == false) {
            Thread.sleep(2000);
         }
            
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }   
   }
}

以上程式碼啟動一個 gRPC 客戶端並連線到指定埠上的伺服器,並呼叫我們在 **proto** 檔案中編寫的函式和服務。讓我們一起瀏覽以上程式碼 -

  • 從 **main** 方法開始,我們接受要新增到購物車的書籍名稱。一旦所有書籍都將要新增,使用者預計會列印“EXIT”。

  • 我們為與伺服器的 gRPC 通訊設定了一個 Channel。

  • 接下來,我們使用通道建立一個非阻塞存根。在這裡,我們選擇計劃呼叫其函式的服務 **"BookStore"**。

  • 然後,我們簡單地建立 **.proto** 檔案中定義的預期輸入,即在我們的例子中,是 **Book**,並新增我們希望伺服器新增的標題。

  • 但是鑑於這是伺服器和客戶端流的情況,我們首先為伺服器建立一個 **流觀察器**。此伺服器流觀察器列出了伺服器響應時需要執行的操作的行為,即 **onNext()** 和 **onCompleted()**。

  • 並且使用存根,我們也獲得了客戶端流觀察器。我們使用此流觀察器傳送要新增到購物車的書籍資料。

  • 並且一旦我們的訂單完成,我們確保客戶端流觀察器已關閉。這告訴伺服器關閉流並執行清理。

  • 最後,我們關閉 Channel 以避免任何資源洩漏。

因此,這就是我們的客戶端程式碼。

客戶端伺服器呼叫

總而言之,我們要做的是:

  • 啟動 gRPC 伺服器。

  • 客戶端透過向伺服器通知它們來新增書籍流。

  • 伺服器在其商店中搜索書籍並將其新增到購物車。

  • 每次新增書籍時,伺服器都會告訴客戶端購物車價值。

  • 當客戶端完成訂購時,伺服器和客戶端都會關閉流。

現在我們已經定義了我們的 **proto** 檔案,編寫了我們的伺服器和客戶端程式碼,讓我們現在執行此程式碼並檢視實際情況。

要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器 -

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookeStoreServerClientStreaming

我們將看到以下輸出 -

輸出

Jul 03, 2021 10:37:21 PM
com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

此輸出表示伺服器已啟動。

現在,讓我們啟動客戶端。

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookStoreClientBothStreaming

讓我們向我們的客戶端新增一本書。

Jul 24, 2021 7:21:45 PM
com.tp.bookstore.BookStoreClientBothStreaming main
Type book name to be added to the cart....
Great

Jul 24, 2021 7:21:48 PM
com.tp.bookstore.BookStoreClientBothStreaming addBook
INFO: Adding book with title starting with: Gr

Jul 24, 2021 7:21:48 PM
com.tp.bookstore.BookStoreClientBothStreaming$1 onNext
INFO: Order summary:

Total number of Books: 1
Total Order Value: 300

因此,正如我們所看到的,我們得到了訂單的當前購物車價值。現在讓我們向我們的客戶端再新增一本書。

Type book name to be added to the cart....
Passage

Jul 24, 2021 7:21:51 PM
com.tp.bookstore.BookStoreClientBothStreaming addBook
INFO: Adding book with title starting with: Pa

Jul 24, 2021 7:21:51 PM
com.tp.bookstore.BookStoreClientBothStreaming$1 onNext
INFO: Order summary:
Total number of Books: 2
Total Order Value: 800

一旦我們添加了書籍並輸入“EXIT”,客戶端將關閉。

Type book name to be added to the cart....
EXIT
Jul 24, 2021 7:21:59 PM
com.tp.bookstore.BookStoreClientBothStreaming completeOrder
INFO: Done, waiting for server to create order summary...

因此,正如我們所看到的,客戶端能夠新增書籍。並且隨著書籍的新增,伺服器將響應當前購物車價值。

gRPC - 客戶端呼叫

gRPC 客戶端支援兩種型別的客戶端呼叫,即客戶端如何呼叫伺服器。以下是兩種方式 -

  • 阻塞式客戶端呼叫

  • 非同步客戶端呼叫

在本章中,我們將逐一檢視它們。

阻塞式客戶端呼叫

gRPC 支援阻塞式客戶端呼叫。這意味著,一旦客戶端對服務進行呼叫,客戶端將不會繼續執行其餘程式碼,直到它從伺服器收到響應。請注意,對於單向呼叫和伺服器流式呼叫,阻塞式客戶端呼叫是可能的。

請注意,對於單向呼叫和伺服器流式呼叫,阻塞式客戶端呼叫是可能的。

這是一個單向阻塞式客戶端呼叫的示例。

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientUnaryBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStubblockingStub;
   public BookStoreClientUnaryBlocking(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();

      Book response;
      try {
         response = blockingStub.first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientUnaryBlocking client = new
         BookStoreClientUnaryBlocking(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

在以上示例中,我們有,

public BookStoreClientUnaryBlocking(Channel channel) {
   blockingStub = BookStoreGrpc.newBlockingStub(channel);
}

這意味著我們將使用阻塞式 RPC 呼叫。

然後,我們有,

BookSearch request = BookSearch.newBuilder().setName(bookName).build();

Book response;
response = blockingStub.first(request);

在這裡,我們使用 **blockingStub** 呼叫 RPC **method first()** 來獲取書籍詳細資訊。

類似地,對於伺服器流,我們可以使用阻塞式存根 -

logger.info("Querying for book with author: " + author);
BookSearch request =
BookSearch.newBuilder().setAuthor(author).build();

Iterator<Book> response;
try {
   response = blockingStub.searchByAuthor(request);
   while(response.hasNext()) {
   logger.info("Found book: " + response.next());
}

在這裡,我們呼叫 RPC 方法 **searchByAuthor** 方法,並迭代響應,直到伺服器流結束。

非阻塞式客戶端呼叫

gRPC 支援非阻塞式客戶端呼叫。這意味著,當客戶端對服務進行呼叫時,它不需要等待伺服器響應。為了處理伺服器響應,客戶端可以簡單地傳遞觀察器,該觀察器指示在收到響應時該做什麼。請注意,對於單向呼叫和流式呼叫,非阻塞式客戶端呼叫是可能的。但是,我們將專門關注伺服器流式呼叫的情況,以將其與阻塞式呼叫進行比較。

請注意,對於單向呼叫和流式呼叫,非阻塞式客戶端呼叫是可能的。但是,我們將專門關注伺服器流式呼叫的情況,以將其與阻塞式呼叫進行比較。

這是一個伺服器流式非阻塞式客戶端呼叫的示例。

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientServerStreamingNonBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingNonBlocking.class.getName());
   private final BookStoreGrpc.BookStoreStub nonBlockingStub;
   public BookStoreClientServerStreamingNonBlocking(Channelchannel) {
      nonBlockingStub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Book> getServerResponseObserver(){
      StreamObserver<Book> observer = new
      StreamObserver<Book>(){
         @Override
         public void onNext(Book book) {
            logger.info("Server returned following book: " +book);
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response fromServer: " + t);
         }
         @Override
         public void onCompleted() {
            logger.info("Server returned following book: " + book);
         }
      };
      return observer;
   }
   public void getBook(String author) {
      logger.info("Querying for book with author: " + author);
      BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
      try {
         nonBlockingStub.searchByAuthor(request,getServerResponseObserver());
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());
         return;
      }
   }
   public static void main(String[] args) throws Exception {
      String authorName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientServerStreamingNonBlocking client = new
         BookStoreClientServerStreamingNonBlocking(channel);
         client.getBook(authorName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

正如我們在以上示例中看到的,

public BookStoreClientUnaryNonBlocking(Channel channel) {
   nonBlockingStub = BookStoreGrpc.newStub(channel);
}

它定義了存根是非阻塞式的。類似地,以下程式碼用於處理我們從伺服器接收到的響應。一旦伺服器傳送響應,我們就記錄輸出。

public StreamObserver<Book> getServerResponseObserver(){
   StreamObserver<Book> observer = new
   StreamObserver<Book>(){
   ....
   ....
   return observer;
}

以下 gRPC 呼叫是非阻塞式呼叫。

logger.info("Querying for book with author: " + author);
BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
try {
   nonBlockingStub.searchByAuthor(request, getServerResponseObserver());
}

這就是我們確保客戶端不需要等到伺服器完成 **searchByAuthor** 執行的方式。這將由流觀察器物件在伺服器返回 Book 物件時直接處理。

gRPC - 超時和取消

gRPC 支援為請求分配超時。這是一種執行請求取消的方法。它有助於避免為客戶端和伺服器使用資源,以執行其結果對客戶端無用的請求。

請求超時

gRPC 支援為客戶端和伺服器指定超時。

  • 客戶端可以在執行時指定它希望等待多長時間,然後取消請求。

  • 伺服器也可以在其端檢查請求是否需要處理,或者客戶端是否已放棄請求。

讓我們來看一個例子,客戶端期望在2秒內收到響應,但伺服器花費了更長的時間。所以,這是我們的伺服器程式碼

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerUnaryTimeout {
   private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName());

   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby",Book.newBuilder().setName("Great Gatsby")
        .setAuthor("Scott Fitzgerald")
        .setPrice(300).build());
      bookMap.put("To Kill MockingBird",Book.newBuilder().setName("To Kill MockingBird")
        .setAuthor("Harper Lee")
        .setPrice(400).build());
      bookMap.put("Passage to India",Book.newBuilder().setName("Passage to India")
        .setAuthor("E.M.Forster")
        .setPrice(500).build());
      bookMap.put("The Side of Paradise",Book.newBuilder().setName("The Side of Paradise")
        .setAuthor("Scott Fitzgerald")
        .setPrice(600).build());
      bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
        .setAuthor("Harper Lee")
        .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30,TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerUnaryTimeout greetServer = new
      BookeStoreServerUnaryTimeout();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends
   BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " +searchQuery.getName());
         logger.info("This may take more time...");
         try {
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         List<String> matchingBookTitles = bookMap.keySet().stream()
            .filter(title ->
         title.startsWith(searchQuery.getName().trim()))
            .collect(Collectors.toList());
         Book foundBook = null;
         if(matchingBookTitles.size() > 0) {
            foundBook = bookMap.get(matchingBookTitles.get(0));
         }
         responseObserver.onNext(foundBook);
         responseObserver.onCompleted();
      }
   }
}

在上面的程式碼中,伺服器搜尋客戶端提供的書籍標題。我們添加了一個模擬休眠,以便我們可以看到請求超時。

這是我們的客戶端程式碼

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientUnaryBlockingTimeout {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingTimeout.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStubblockingStub;
   public BookStoreClientUnaryBlockingTimeout(Channel channel){
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request =BookSearch.newBuilder().setName(bookName).build();
      Book response;
      try {
         response = blockingStub.withDeadlineAfter(2,TimeUnit.SECONDS).first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());
         return;
      }
      logger.info("Got following book from server: " +response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
      try {
         BookStoreClientUnaryBlockingTimeout client = newBookStoreClientUnaryBlockingTimeout(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5,
         TimeUnit.SECONDS);
      }
   }
}

上面的程式碼使用標題呼叫伺服器進行搜尋。但更重要的是,它為gRPC呼叫提供了2秒的超時時間

現在讓我們看看它的執行情況。要執行程式碼,請啟動兩個shell。在第一個shell上執行以下命令啟動伺服器:

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookeStoreServerUnaryTimeout

我們將看到以下輸出 -

輸出

Jul 31, 2021 12:29:31 PM
com.tp.bookstore.BookeStoreServerUnaryTimeout start
INFO: Server started, listening on 50051

以上輸出表明伺服器已啟動。

Jul 31, 2021 12:29:35 PM
com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl
first
INFO: Searching for book with title: Great

Jul 31, 2021 12:29:35 PM
com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl
first
INFO: This may take more time...

現在,讓我們啟動客戶端。

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout Great

我們將得到以下輸出:

輸出

Jul 31, 2021 12:29:34 PM
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook

INFO: Querying for book with title: Great
Jul 31, 2021 12:29:36 PM
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED,
description=deadline exceeded after 1.970455800s.
[buffered_nanos=816522700,
remote_addr=localhost/127.0.0.1:50051], cause=null}

因此,正如我們所看到的,客戶端在2秒內沒有收到響應,因此它取消了請求並將其稱為超時,即DEADLINE_EXCEEDED

請求取消

gRPC支援客戶端和伺服器端都取消請求。客戶端可以在執行時指定它希望在取消請求之前等待的時間。伺服器也可以在其端檢查請求是否需要處理,或者客戶端是否已經放棄了請求。

讓我們看一個客戶端流式傳輸的例子,其中客戶端呼叫取消。所以,這是我們的伺服器程式碼

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerClientStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott   Fitzgerald").setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird").setAuthor("Harper Lee")
        .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India").setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise").setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman").setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerClientStreaming greetServer = newBookeStoreServerClientStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book>
      totalCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with titlestarting with: " + book.getName());
               for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
                  if(bookEntry.getValue().getName().startsWith(book.getName())){
                     logger.info("Found book, adding to cart:....");
                     bookCart.add(bookEntry.getValue());
                  }
               }
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream:" + t);
            }
            @Override
            public void onCompleted() {
               int cartValue = 0;
               for (Book book : bookCart) {
                  cartValue += book.getPrice();
               }
               responseObserver.onNext(Cart.newBuilder().setPrice(cartValue).setBooks(bookCart.size()).build());
               responseObserver.onCompleted();
            }
         };   
      }
   }
}

此伺服器程式碼是客戶端流式傳輸的一個簡單示例。伺服器簡單地跟蹤客戶端想要的書籍,最後,它提供訂單的總購物車價值。

但是這裡關於請求取消沒有什麼特別之處,因為這是客戶端將呼叫的。所以,讓我們看看客戶端程式碼。

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookStoreClientStreamingClientCancelation {
   private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClientCancelation.class.getName());
   private final BookStoreStub stub;
   StreamObserver<Book> streamClientSender;
   private CancellableContext withCancellation;
   public BookStoreClientStreamingClientCancelation(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Cart> getServerResponseObserver(){
      StreamObserver<Cart> observer = new StreamObserver>Cart<(){
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" 
               + "\nTotal number of Books: " 
               + cart.getBooks() 
               + "\nTotal Order Value: " 
               + cart.getPrice());
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response from Server: " + t);
         }
         @Override
            public void onCompleted() {
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();

      if(streamClientSender == null) {
         withCancellation = Context.current().withCancellation();
         streamClientSender = stub.totalCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      if(streamClientSender != null);
      streamClientSender.onCompleted();
   }
   public void cancelOrder() {
      withCancellation.cancel(null);
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
      try {
         BookStoreClientStreamingClientCancelation client = new BookStoreClientStreamingClientCancelation(channel); String bookName = "";
         while(true) {
            System.out.println("Type book name to be added to the cart....");
            bookName = System.console().readLine();
            if(bookName.equals("EXIT")) {
               client.completeOrder();
               break;
            }
            if(bookName.equals("CANCEL")) {
               client.cancelOrder();
               break;
            }
            client.addBook(bookName);
         }
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

所以,如果我們看到上面的程式碼,以下行定義了一個啟用了取消的上下文。

withCancellation = Context.current().withCancellation();

這是當用戶輸入取消時將呼叫的方法。這將取消訂單,並讓伺服器知道。

public void cancelOrder() {
   withCancellation.cancel(null);
}

現在讓我們看看它的執行情況。要執行程式碼,請啟動兩個shell。在第一個shell上執行以下命令啟動伺服器。

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookeStoreServerClientStreaming

我們將看到以下輸出:

輸出

Jul 31, 2021 3:29:58 PM
com.tp.bookstore.BookeStoreServerClientStreaming start
INFO: Server started, listening on 50051

上述輸出意味著伺服器已啟動。

現在,讓我們啟動客戶端

java -cp .\target\grpc-point-1.0.jar
com.tp.bookstore.BookStoreClientStreamingClientCancelation

我們將得到以下輸出:

輸出

Type book name to be added to the cart....
Great
Jul 31, 2021 3:30:55 PM
com.tp.bookstore.BookStoreClientStreamingClientCancelation
addBook
INFO: Adding book with title starting with: Great

Type book name to be added to the cart....
CANCEL
Jul 31, 2021 3:30:58 PM
com.tp.bookstore.BookStoreClientStreamingClientCancelation$1
onError
INFO: Error while reading response from Server:

io.grpc.StatusRuntimeException: UNAVAILABLE: Channel
shutdownNow invoked

我們將在伺服器日誌中獲得以下資料:

INFO: Searching for book with title starting with: Great
Jul 31, 2021 3:30:56 PM
com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp
l$1 onNext
INFO: Found book, adding to cart:....
Jul 31, 2021 3:30:58 PM
com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp
l$1 onError
INFO: Error while reading book stream:
io.grpc.StatusRuntimeException: CANCELLED: client cancelled

因此,正如我們所看到的,客戶端啟動了對其傳送到伺服器的請求的取消。伺服器也收到了關於取消的通知。

gRPC - 傳送/接收元資料

gRPC支援傳送元資料。元資料基本上是我們想要傳送的一組資料,這些資料不是業務邏輯的一部分,同時進行gRPC呼叫。

讓我們看看以下兩種情況:

  • 客戶端傳送元資料,伺服器讀取它。
  • 伺服器傳送元資料,客戶端讀取它。

我們將逐一介紹這兩種情況。

客戶端傳送元資料

如前所述,gRPC支援客戶端傳送伺服器可以讀取的元資料。gRPC支援擴充套件客戶端和伺服器攔截器,這些攔截器可分別用於寫入和讀取元資料。讓我們舉一個例子來更好地理解它。這是我們的客戶端程式碼,它傳送主機名作為元資料:

讓我們舉一個例子來更好地理解它。這是我們的客戶端程式碼,它傳送主機名作為元資料:

示例

package com.tp.bookstore;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlockingMetadata {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingMetadata.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
   public BookStoreClientUnaryBlockingMetadata(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   static class BookClientInterceptor implements ClientInterceptor {
      @Override
      public <ReqT, RespT> ClientCall<ReqT, RespT>
      interceptCall(
         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next
      ) {
         return new 
         ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
               logger.info("Added metadata");
               headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
               super.start(responseListener, headers);
            }
         };
      }
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName); 
      BookSearch request = BookSearch.newBuilder().setName(bookName).build(); 
      Book response; 
      CallOptions.Key<String> metaDataKey = CallOptions.Key.create("my_key");
      try {
         response = blockingStub.withOption(metaDataKey, "bar").first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   } 
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel =  ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().intercept(new BookClientInterceptor()).build();
      try {
         BookStoreClientUnaryBlockingMetadata client = new BookStoreClientUnaryBlockingMetadata(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

這裡有趣的部分是攔截器

static class BookClientInterceptor implements ClientInterceptor{
   @Override
   public <ReqT, RespT> ClientCall<ReqT, RespT>
   interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
      return new 
      ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
         @Override
         public void start(Listener<RespT>responseListener, Metadata headers) {
            logger.info("Added metadata");
            headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
            super.start(responseListener, headers);
         }
      };
   }
}

我們攔截客戶端發出的任何呼叫,然後在進一步呼叫之前向其新增主機名元資料。

伺服器讀取元資料

現在,讓我們看看讀取此元資料的伺服器程式碼:

package com.tp.bookstore;

import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerMetadata {
   private static final Logger logger = Logger.getLogger(BookeStoreServerMetadata.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
   }
   private Server server;
   class BookServerInterceptor implements ServerInterceptor{
      @Override
      public <ReqT, RespT> Listener<ReqT> 
      interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
 
         logger.info("Recieved following metadata: " + headers);
         return next.startCall(call, headers);
      } 
   }
   private void start() throws IOException {
      int port = 50051; server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).intercept(new BookServerInterceptor()).build().start();
      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerMetadata greetServer = new BookeStoreServerMetadata();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " + searchQuery.getName());
         List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());
         Book foundBook = null;
         if(matchingBookTitles.size() > 0) {
            foundBook = bookMap.get(matchingBookTitles.get(0));
         }
         responseObserver.onNext(foundBook);
         responseObserver.onCompleted();
      }
   }
}

同樣,這裡有趣的部分是攔截器

class BookServerInterceptor implements ServerInterceptor{
   @Override
   public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,ServerCallHandler<ReqT, RespT> next) {
      logger.info("Recieved following metadata: " + headers);
      return next.startCall(call, headers);
   }
}

我們攔截任何傳入伺服器的呼叫,然後在實際方法處理呼叫之前記錄元資料。

客戶端-伺服器呼叫

現在讓我們看看它的執行情況。要執行程式碼,請啟動兩個shell。在第一個shell上執行以下命令啟動伺服器:

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookeStoreServerMetadata

我們將看到以下輸出 -

輸出

Jul 31, 2021 5:29:14 PM 
com.tp.bookstore.BookeStoreServerMetadata start
INFO: Server started, listening on 50051

上述輸出意味著伺服器已啟動。

現在,讓我們啟動客戶端。

java -cp .\target\grpc-point-1.0.jar 
com.tp.bookstore.BookStoreClientUnaryBlockingMetadata Great

我們將看到以下輸出 -

輸出

Jul 31, 2021 5:29:39 PM 
com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook
INFO: Querying for book with title: Great
Jul 31, 2021 5:29:39 PM 
com.tp.bookstore.BookStoreClientUnaryBlockingMetadata$BookCli
entInterceptor$1 start
INFO: Added metadata
Jul 31, 2021 5:29:40 PM 
com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook
INFO: Got following book from server: name: "Great Gatsby"
author: "Scott Fitzgerald"
price: 300

我們將在伺服器日誌中獲得以下資料:

Jul 31, 2021 5:29:40 PM 
com.tp.bookstore.BookeStoreServerMetadata$BookServerIntercept
or interceptCall
INFO: Recieved following metadata:
Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.38.0,hostname=MY_HOST,grpc-accept-encoding=gzip)
Jul 31, 2021 5:29:40 PM 
com.tp.bookstore.BookeStoreServerMetadata$BookStoreImpl first
INFO: Searching for book with title: Great

正如我們所看到的,伺服器能夠讀取元資料:hostname=MY_HOST,該元資料由客戶端新增。

廣告

© . All rights reserved.