gRPC - 快速指南
gRPC - 簡介
在深入瞭解 gRPC 之前,讓我們先簡單瞭解一下遠端過程呼叫,也就是 gRPC 所做的事情。
什麼是遠端過程呼叫?
遠端過程呼叫是看起來像普通/本地函式呼叫一樣的函式呼叫,但不同之處在於遠端函式呼叫通常在不同的機器上執行。但是,對於編寫程式碼的開發人員來說,函式呼叫和遠端呼叫之間的區別很小。這些呼叫通常遵循客戶端-伺服器模型,其中執行呼叫的機器充當伺服器。
為什麼我們需要遠端過程呼叫?
遠端過程呼叫提供了一種在另一臺機器上執行程式碼的方法。在大型、龐大的產品中,這變得至關重要,因為單臺機器無法承載產品正常執行所需的所有程式碼。
在微服務架構中,應用程式被分解成小的服務,這些服務透過訊息佇列和 API 相互通訊。所有這些通訊都發生在網路上,不同的機器/節點根據它們所承載的服務提供不同的功能。因此,在分散式環境中工作時,建立遠端過程呼叫成為一個關鍵方面。
為什麼選擇 gRPC?
Google 遠端過程呼叫 (gRPC) 提供了一個執行遠端過程呼叫的框架。但是,還有一些其他庫和機制可以在遠端機器上執行程式碼。那麼,是什麼讓 gRPC 如此特別呢?讓我們來了解一下。
語言無關性 - gRPC 在內部使用 Google Protocol Buffer。因此,可以使用多種語言,例如 Java、Python、Go、Dart 等。Java 客戶端可以進行過程呼叫,而使用 Python 的伺服器可以做出響應,從而有效地實現語言無關性。
高效的資料壓縮 - 在微服務環境中,考慮到網路上會發生多次通訊,因此我們傳送的資料儘可能簡潔至關重要。我們需要避免任何多餘的資料,以確保資料快速傳輸。鑑於 gRPC 在內部使用 Google Protocol Buffer,因此它可以利用此功能。
高效的序列化和反序列化 - 在微服務環境中,考慮到網路上會發生多次通訊,因此我們儘可能快速地序列化和反序列化資料至關重要。鑑於 gRPC 在內部使用 Google Protocol Buffer,因此它可以確保快速序列化和反序列化資料。
易於使用 - gRPC 已經擁有一個庫和外掛,可以自動生成過程程式碼(我們將在後續章節中看到)。對於簡單的用例,它可以像本地函式呼叫一樣使用。
gRPC 與使用 JSON 的 REST 的比較
讓我們看看其他透過網路傳輸資料的方式與 Protobuf 相比如何。
| 功能 | gRPC | 使用 JSON/XML 的 HTTP |
|---|---|---|
| 語言無關性 | 是 | 是 |
| HTTP 版本 | HTTP/2 | HTTP 1.1 |
| 指定域模式 | .proto 檔案(Google Protocol Buffer) | 無 |
| 序列化資料大小 | 最小 | 高(XML 更高) |
| 人類可讀性 | 否,因為它使用單獨的編碼模式 | 是,因為它使用基於文字的格式 |
| 序列化速度 | 最快 | 較慢(XML 最慢) |
| 資料型別支援 | 更豐富。支援複雜資料型別,如 Any、oneof 等。 | 支援基本資料型別 |
| 支援演變模式 | 是 | 否 |
gRPC - 設定
Protoc 設定
請注意,此設定僅適用於 Python。對於 Java,所有這些都由 Maven 檔案處理。讓我們安裝“proto”二進位制檔案,我們將使用它來自動生成“.proto”檔案的程式碼。二進位制檔案可以在 https://github.com/protocolbuffers/protobuf/releases/ 中找到。根據作業系統選擇正確的二進位制檔案。我們將在 Windows 上安裝 proto 二進位制檔案,但 Linux 的步驟差別不大。
安裝完成後,確保您可以透過命令列訪問它 -
protoc --version libprotoc 3.15.6
這意味著 Protobuf 已正確安裝。現在,讓我們轉到專案結構。
我們還需要設定 gRPC 程式碼生成所需的外掛。
對於 Python,我們需要執行以下命令 -
python -m pip install grpcio python -m pip install grpcio-tools
它將安裝所有必需的二進位制檔案並將它們新增到路徑中。
專案結構
以下是我們將擁有的整體專案結構 -
與各個語言相關的程式碼進入各自的目錄。我們將有一個單獨的目錄來儲存我們的 proto 檔案。並且,以下是我們將用於 Java 的專案結構 -
專案依賴項
現在我們已經安裝了protoc,我們可以使用protoc從 proto 檔案自動生成程式碼。讓我們先建立一個 Java 專案。
以下是我們將用於 Java 專案的 Maven 配置。請注意,它還包含 Protobuf 所需的庫。
示例
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.grpc.point</groupId>
<artifactId>grpc-point</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.38.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.38.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.38.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4jsimple-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.1</version>
<executions>
<execution>
<id>test</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${basedir}/target/generated-sources</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpcjava:1.38.0:exe:${os.detected.classifier}</pluginArtifact>
<protoSourceRoot>../common_proto_files</protoSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<configuration>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
gRPC - 使用 Java 的 Hello World 應用
現在讓我們建立一個基本的類似“Hello World”的應用程式,它將使用 gRPC 和 Java。
.proto 檔案
首先讓我們在common_proto_files中定義“greeting.proto”檔案 -
syntax = "proto3";
option java_package = "com.tp.greeting";
service Greeter {
rpc greet (ClientInput) returns (ServerOutput) {}
}
message ClientInput {
string greeting = 1;
string name = 2;
}
message ServerOutput {
string message = 1;
}
現在讓我們仔細看看這個程式碼塊,並瞭解每一行的作用 -
syntax = "proto3";
此處的“syntax”表示我們正在使用的 Protobuf 版本。因此,我們使用最新版本 3,因此模式可以使用所有對版本 3 有效的語法。
package tutorial;
此處的包用於衝突解決,例如,如果我們有多個名稱相同的類/成員。
option java_package = "com.tp.greeting";
此引數特定於 Java,即從“.proto”檔案自動生成程式碼的包。
service Greeter {
rpc greet(ClientInput) returns (ServerOutput) {}
}
此塊表示服務“Greeter”的名稱和可以呼叫的函式名稱“greet”。“greet”函式接收型別為“ClientInput”的輸入並返回型別為“ServerOutput”的輸出。現在讓我們看看這些型別。
message ClientInput {
string greeting = 1;
string name = 2;
}
在上面的程式碼塊中,我們定義了ClientInput,它包含兩個屬性“greeting”和“name”,它們都是字串。客戶端應該將型別為“ClientInput”的物件傳送到伺服器。
message ServerOutput {
string message = 1;
}
在上面的程式碼塊中,我們定義了,給定一個“ClientInput”,伺服器將返回“ServerOutput”,它將包含一個名為“message”的屬性。伺服器應該將型別為“ServerOutput”的物件傳送到客戶端。
請注意,我們已經完成了 Maven 設定以自動生成我們的類檔案以及我們的 RPC 程式碼。因此,現在我們可以簡單地編譯我們的專案 -
mvn clean install
這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在 -
Protobuf class code: target/generated-sources/protobuf/java/com.tp.greeting Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.greeting
設定 gRPC 伺服器
現在我們已經定義了包含函式定義的 proto 檔案,讓我們設定一個可以服務於呼叫這些函式的伺服器。
讓我們編寫伺服器程式碼來服務於上述函式,並將其儲存在com.tp.grpc.GreetServer.java中 -
示例
package com.tp.grpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ClientInput;
import com.tp.greeting.Greeting.ServerOutput;
public class GreetServer {
private static final Logger logger = Logger.getLogger(GreetServer.class.getName());
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void greet(ClientInput req, StreamObserver<ServerOutput> responseObserver) {
logger.info("Got request from client: " + req);
ServerOutput reply = ServerOutput.newBuilder().setMessage(
"Server says " + "\"" + req.getGreeting() + " " + req.getName() + "\""
).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
final GreetServer greetServer = new GreetServer();
greetServer.start();
greetServer.server.awaitTermination();
}
}
上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -
從main方法開始,我們在指定的埠建立一個 gRPC 伺服器。
但在啟動伺服器之前,我們將伺服器分配給我們要執行的服務,即在我們的例子中,是Greeter服務。
為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立服務例項,即在我們的例子中,是GreeterImpl。
服務例項需要提供“.proto”檔案中存在的函式/方法的實現,即在我們的例子中,是greet方法。
該方法期望一個型別與“.proto”檔案中定義的型別相同的物件,即在我們的例子中,是ClientInput。
該方法處理上述輸入,執行計算,然後應該在“.proto”檔案中返回提到的輸出,即在我們的例子中,是ServerOutput。
最後,我們還有一個shutdown鉤子,以確保在完成程式碼執行後伺服器能夠乾淨地關閉。
設定 gRPC 客戶端
現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。
讓我們編寫客戶端程式碼來呼叫上述函式,並將其儲存在com.tp.grpc.GreetClient.java中 -
示例
package com.tp.grpc;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class GreetClient {
private static final Logger logger = Logger.getLogger(GreetClient.class.getName());
private final GreeterGrpc.GreeterBlockingStub blockingStub;
public GreetClient(Channel channel) {
blockingStub = GreeterGrpc.newBlockingStub(channel);
}
public void makeGreeting(String greeting, String username) {
logger.info("Sending greeting to server: " + greeting + " for name: " + username);
ClientInput request = ClientInput.newBuilder().setName(username).setGreeting(greeting).build();
logger.info("Sending to server: " + request);
ServerOutput response;
try {
response = blockingStub.greet(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Got following from the server: " + response.getMessage());
}
public static void main(String[] args) throws Exception {
String greeting = args[0];
String username = args[1];
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
GreetClient client = new GreetClient(channel);
client.makeGreeting(greeting, username);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -
從main方法開始,我們接受兩個引數,即name和greeting。
我們為 gRPC 與伺服器的通訊設定了一個 Channel。
接下來,我們使用建立的 Channel 建立一個阻塞式存根。在這裡,我們擁有服務“Greeter”,我們計劃呼叫它的函式。存根只不過是一個包裝器,它隱藏了遠端呼叫的複雜性,使呼叫者無需關心。
然後,我們簡單地建立“.proto”檔案中定義的預期輸入,即在我們的例子中,是ClientInput。
我們最終進行呼叫並等待伺服器的響應。
最後,我們關閉 Channel 以避免任何資源洩漏。
因此,這就是我們的客戶端程式碼。
客戶端伺服器呼叫
現在,我們已經定義了proto檔案,編寫了伺服器和客戶端程式碼,讓我們繼續執行此程式碼並檢視實際效果。
要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器 -
java -cp .\target\grpc-point-1.0.jar com.tp.grpc.GreetServer
我們將看到以下輸出 -
輸出
Jul 03, 2021 1:04:23 PM com.tp.grpc.GreetServer start INFO: Server started, listening on 50051
上述輸出意味著伺服器已啟動。
現在,讓我們啟動客戶端。
java -cp .\target\grpc-point-1.0.jar com.tp.grpc.GreetClient Hello Jane
我們將看到以下輸出 -
輸出
Jul 03, 2021 1:05:59 PM com.tp.grpc.GreetClient greet INFO: Sending greeting to server: Hello for name: Jane Jul 03, 2021 1:05:59 PM com.tp.grpc.GreetClient greet INFO: Sending to server: greeting: "Hello" name: "Jane" Jul 03, 2021 1:06:00 PM com.tp.grpc.GreetClient greet INFO: Got following from the server: Server says "Hello Jane"
現在,如果我們開啟伺服器日誌,我們將看到以下內容 -
Jul 03, 2021 1:04:23 PM com.tp.grpc.GreetServer start INFO: Server started, listening on 50051 Jul 03, 2021 1:06:00 PM com.tp.grpc.GreetServer$GreeterImpl greet INFO: Got request from client: greeting: "Hello" name: "Jane"
因此,客戶端能夠按預期呼叫伺服器,並且伺服器透過向客戶端傳送問候進行了響應。
gRPC - 使用 Python 的 Hello World 應用
現在讓我們建立一個基本的類似“Hello World”的應用程式,它將使用 gRPC 和 Python。
.proto 檔案
首先讓我們在common_proto_files中定義greeting.proto檔案 -
syntax = "proto3";
service Greeter {
rpc greet (ClientInput) returns (ServerOutput) {}
}
message ClientInput {
string greeting = 1;
string name = 2;
}
message ServerOutput {
string message = 1;
}
現在讓我們仔細看看上面程式碼塊中的每一行 -
syntax = "proto3";
此處的“syntax”表示我們正在使用的 Protobuf 版本。因此,我們使用最新版本 3,因此模式可以使用所有對版本 3 有效的語法。
package tutorial;
此處的package用於衝突解決,例如,如果我們有多個名稱相同的類/成員。
service Greeter {
rpc greet(ClientInput) returns (ServerOutput) {}
}
此塊表示服務“Greeter”的名稱和可以呼叫的函式名稱“greet”。“greet”函式接收型別為“ClientInput”的輸入並返回型別為“ServerOutput”的輸出。現在讓我們看看這些型別。
message ClientInput {
string greeting = 1;
string name = 2;
}
在上面的程式碼塊中,我們定義了ClientInput,它包含兩個屬性“greeting”和“name”,它們都是字串。客戶端應該將型別為“ClientInput”的物件傳送到伺服器。
message ServerOutput {
string message = 1;
}
這裡,我們還定義了,給定一個“ClientInput”,伺服器將返回一個包含單個屬性“message”的“ServerOutput”。伺服器應該將型別為“ServerOutput”的物件傳送到客戶端。
現在,讓我們為 Protobuf 類和 gRPC 類生成底層程式碼。為此,我們需要執行以下命令:
python -m grpc_tools.protoc -I ..\common_proto_files\ -- python_out=../python --grpc_python_out=. greeting.proto
但是,請注意,要執行該命令,我們需要安裝教程的設定部分中提到的正確依賴項。
這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在 -
Protobuf class code: python/greeting_pb2.py Protobuf gRPC code: python/greeting_pb2_grpcpb2.py
設定 gRPC 伺服器
現在我們已經定義了包含函式定義的 proto 檔案,讓我們設定一個可以呼叫這些函式的伺服器。
讓我們編寫我們的伺服器程式碼來服務上述函式,並將其儲存在server.py中:
示例
from concurrent import futures
import grpc
import greeting_pb2
import greeting_pb2_grpc
class Greeter(greeting_pb2_grpc.GreeterServicer):
def greet(self, request, context):
print("Got request " + str(request))
return greeting_pb2.ServerOutput(message='{0} {1}!'.format(request.greeting, request.name))
def server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
greeting_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port('[::]:50051')
print("gRPC starting")
server.start()
server.wait_for_termination()
server()
上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -
從main方法開始,我們在指定的埠上建立一個 gRPC 伺服器。
但在啟動伺服器之前,我們將伺服器分配給我們要執行的服務,即在我們的例子中,是Greeter服務。
為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立一個服務例項,即在我們的例子中為Greeter。
服務例項需要提供.proto檔案中存在的 method/function 的實現,即在我們的例子中為greet方法。
該方法期望一個型別為 .proto 檔案中定義的物件,即對我們來說是request。
該方法處理上述輸入,進行計算,然後應該返回.proto檔案中提到的輸出,即在我們的例子中為ServerOutput。
設定 gRPC 客戶端
現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。
讓我們編寫我們的客戶端程式碼來呼叫上述函式,並將其儲存在client.py中:
示例
import grpc
import greeting_pb2
import greeting_pb2_grpc
def run():
with grpc.insecure_channel('localhost:50051') as channel:
stub = greeting_pb2_grpc.GreeterStub(channel)
response = stub.greet(greeting_pb2.ClientInput(name='John', greeting = "Yo"))
print("Greeter client received following from server: " + response.message)
run()
上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -
從main方法開始,我們為與伺服器的 gRPC 通訊設定了一個 Channel。
然後,我們使用 channel 建立一個stub。在這裡,我們使用服務“Greeter”,我們計劃呼叫其函式。stub 只是一個包裝器,它隱藏了遠端呼叫對呼叫者的複雜性。
然後,我們簡單地建立 proto 檔案中定義的預期輸入,即在我們的例子中為ClientInput。我們硬編碼了兩個引數,即name和greeting。
我們最終進行呼叫並等待伺服器的結果。
因此,這就是我們的客戶端程式碼。
客戶端伺服器呼叫
現在,我們已經定義了proto檔案,編寫了伺服器和客戶端程式碼,讓我們繼續執行此程式碼並檢視實際情況。
要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器 -
python .\server.py
輸出
我們將得到以下輸出:
gRPC starting
上述輸出表示伺服器已啟動。
現在,讓我們啟動客戶端。
python .\client.py
我們將看到以下輸出 -
輸出
Greeter client received following from server: Yo John!
現在,如果我們開啟伺服器日誌,我們將看到以下資料:
gRPC starting Got request greeting: "Yo" name: "John"
因此,正如我們所看到的,客戶端能夠按預期呼叫伺服器,並且伺服器響應了向客戶端回送問候。
gRPC - 一元 gRPC
我們現在將瞭解 gRPC 框架支援的各種型別的通訊。我們將使用書店的例子,客戶可以在其中搜索和下單送書。
讓我們看看一元 gRPC 通訊,我們讓客戶端搜尋標題並隨機返回一個與查詢的標題匹配的書籍。
.proto 檔案
首先讓我們在 common_proto_files 中定義 bookstore.proto 檔案:
syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
rpc first (BookSearch) returns (Book) {}
}
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
現在讓我們仔細看看上面程式碼塊中的每一行。
syntax = "proto3";
語法
這裡的“語法”表示我們使用的 Protobuf 版本。我們使用最新的版本 3,因此該模式可以使用所有對版本 3 有效的語法。
package tutorial;
此處的package用於衝突解決,例如,如果我們有多個名稱相同的類/成員。
option java_package = "com.tp.bookstore";
此引數特定於 Java,即從 .proto 檔案自動生成程式碼的包。
service BookStore {
rpc first (BookSearch) returns (Book) {}
}
這表示服務的名稱"BookStore"和可以呼叫的函式名稱"first"。"first"函式接收型別為"BookSearch"的輸入並返回型別為"Book"的輸出。因此,實際上,我們讓客戶端搜尋標題並返回一個與查詢的標題匹配的書籍。
現在讓我們看看這些型別。
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
在上面的程式碼塊中,我們定義了 BookSearch,它包含諸如name、author和genre之類的屬性。客戶端應該將型別為"BookSearch"的物件傳送到伺服器。
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
這裡,我們還定義了,給定一個“BookSearch”,伺服器將返回一個包含書籍屬性以及書籍價格的“Book”。伺服器應該將型別為“Book”的物件傳送到客戶端。
請注意,我們已經完成了 Maven 的設定,以便自動生成我們的類檔案以及我們的 RPC 程式碼。因此,現在我們可以簡單地編譯我們的專案:
mvn clean install
這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在 -
Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore
設定 gRPC 伺服器
現在我們已經定義了包含函式定義的 proto 檔案,讓我們設定一個可以呼叫這些函式的伺服器。
讓我們編寫我們的伺服器程式碼來服務上述函式,並將其儲存在com.tp.bookstore.BookeStoreServerUnary.java中:
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerUnary {
private static final Logger logger = Logger.getLogger(BookeStoreServerUnary.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException, InterruptedException {
final BookeStoreServerUnary greetServer = new BookeStoreServerUnary();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
@Override
public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
logger.info("Searching for book with title: " + searchQuery.getName());
List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->
title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());
Book foundBook = null;
if(matchingBookTitles.size() > 0) {
foundBook = bookMap.get(matchingBookTitles.get(0));
}
responseObserver.onNext(foundBook);
responseObserver.onCompleted();
}
}
}
上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -
從main方法開始,我們在指定的埠建立一個 gRPC 伺服器。
但在啟動伺服器之前,我們將要執行的服務分配給伺服器,即在我們的例子中為BookStore服務。
為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立一個服務例項,即在我們的例子中為BookStoreImpl
服務例項需要提供.proto檔案中存在的 method/function 的實現,即在我們的例子中為first方法。
該方法期望一個型別為 .proto 檔案中定義的物件,即對我們來說是BookSearch
該方法在可用的 bookMap 中搜索書籍,然後透過呼叫onNext()方法返回Book。完成後,伺服器透過呼叫onCompleted()宣佈它已完成輸出。
最後,我們還有一個關閉鉤子,以確保在我們完成程式碼執行時伺服器能夠乾淨地關閉。
設定 gRPC 客戶端
現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。
讓我們編寫我們的客戶端程式碼來呼叫上述函式,並將其儲存在com.tp.bookstore.BookStoreClientUnaryBlocking.java中:
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientUnaryBlocking {
private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName());
private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
public BookStoreClientUnaryBlocking(Channel channel) {
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
public void getBook(String bookName) {
logger.info("Querying for book with title: " + bookName);
BookSearch request = BookSearch.newBuilder().setName(bookName).build();
Book response;
try {
response = blockingStub.first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Got following book from server: " + response);
}
public static void main(String[] args) throws Exception {
String bookName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientUnaryBlocking client = new
BookStoreClientUnaryBlocking(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5,
TimeUnit.SECONDS);
}
}
}
上述程式碼在指定的埠上啟動一個 gRPC 伺服器,併為我們在proto檔案中編寫的函式和服務提供服務。讓我們逐步瞭解上面的程式碼:
從main方法開始,我們接受一個引數,即我們要搜尋的書籍的標題。
我們為與伺服器的 gRPC 通訊設定了一個 Channel。
然後,我們使用 channel 建立一個阻塞 stub。在這裡,我們選擇服務“BookStore”,我們計劃呼叫其函式。“stub”只是一個包裝器,它隱藏了遠端呼叫對呼叫者的複雜性。
然後,我們簡單地建立 proto 檔案中定義的預期輸入,即在我們的例子中為BookSearch,並新增我們希望伺服器搜尋的標題名稱。
我們最終進行呼叫並等待伺服器的結果。
最後,我們關閉 Channel 以避免任何資源洩漏。
因此,這就是我們的客戶端程式碼。
客戶端伺服器呼叫
總而言之,我們要做的是:
啟動 gRPC 伺服器。
客戶端查詢伺服器以獲取具有給定名稱/標題的書籍。
伺服器在其商店中搜索書籍。
伺服器然後響應書籍及其其他屬性。
現在,我們已經定義了proto檔案,編寫了伺服器和客戶端程式碼,讓我們繼續執行此程式碼並檢視實際情況。
要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器 -
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnary
我們將看到以下輸出 -
輸出
Jul 03, 2021 7:21:58 PM com.tp.bookstore.BookeStoreServerUnary start INFO: Server started, listening on 50051
上述輸出表示伺服器已啟動。
現在,讓我們啟動客戶端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlocking "To Kill"
我們將看到以下輸出 -
輸出
Jul 03, 2021 7:22:03 PM com.tp.bookstore.BookStoreClientUnaryBlocking getBook INFO: Querying for book with title: To Kill Jul 03, 2021 7:22:04 PM com.tp.bookstore.BookStoreClientUnaryBlocking getBook INFO: Got following book from server: name: "To Kill MockingBird" author: "Harper Lee" price: 400
因此,正如我們所看到的,客戶端能夠透過使用書籍名稱查詢伺服器來獲取書籍詳細資訊。
gRPC - 伺服器端流式 RPC
現在讓我們討論在使用 gRPC 通訊時伺服器流是如何工作的。在這種情況下,客戶端將搜尋具有給定作者的書籍。假設伺服器需要一些時間才能遍歷所有書籍。伺服器不是在遍歷完所有書籍後等待提供所有書籍,而是會以流式方式提供書籍,即一旦找到一本就提供一本。
.proto 檔案
首先讓我們在 common_proto_files 中定義 bookstore.proto 檔案:
syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
rpc first (BookSearch) returns (stream Book) {}
}
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
以下程式碼塊表示服務的名稱"BookStore"和可以呼叫的函式名稱"searchByAuthor"。"searchByAuthor"函式接收型別為"BookSearch"的輸入並返回型別為"Book"的流。因此,實際上,我們讓客戶端搜尋標題並返回一個與查詢的作者匹配的書籍。
service BookStore {
rpc searchByAuthor (BookSearch) returns (stream Book) {}
}
現在讓我們看看這些型別。
message BookSearch {
string name = 1;
string author = 2;
string genre = 3;
}
這裡,我們定義了BookSearch,它包含一些屬性,如name、author和genre。客戶端應該將型別為"BookSearch"的物件傳送到伺服器。
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
我們還定義了,給定一個"BookSearch",伺服器將返回一個包含書籍屬性以及書籍價格的"Book"流。伺服器應該傳送一個"Book"流。
請注意,我們已經完成了 Maven 的設定,以便自動生成我們的類檔案以及我們的 RPC 程式碼。因此,現在我們可以簡單地編譯我們的專案:
mvn clean install
這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在 -
Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore
設定 gRPC 伺服器
現在我們已經定義了包含函式定義的 proto 檔案,讓我們設定一個可以服務呼叫這些函式的伺服器。
讓我們編寫我們的伺服器程式碼來服務上述函式,並將其儲存在com.tp.bookstore.BookeStoreServerStreaming.java中:
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerUnary {
private static final Logger logger = Logger.getLogger(BookeStoreServerrStreaming.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException, InterruptedException {
final BookeStoreServerUnary greetServer = new BookeStoreServerUnary();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
@Override
public void searchByAuthor(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
logger.info("Searching for book with author: " + searchQuery.getAuthor());
for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
try {
logger.info("Going through more books....");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(bookEntry.getValue().getAuthor().startsWith(searchQuery.getAuthor())){
logger.info("Found book with required author: " + bookEntry.getValue().getName()+ ". Sending....");
responseObserver.onNext(bookEntry.getValue());
}
}
responseObserver.onCompleted();
}
}
}
上述程式碼在指定的埠上啟動一個 gRPC 伺服器,併為我們在proto檔案中編寫的函式和服務提供服務。讓我們逐步瞭解上面的程式碼:
從main方法開始,我們在指定的埠建立一個 gRPC 伺服器。
但在啟動伺服器之前,我們將要執行的服務分配給伺服器,即在我們的例子中為BookStore服務。
為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立一個服務例項,即在我們的例子中為BookStoreImpl
服務例項需要提供.proto 檔案中存在的 method/function 的實現,即在我們的例子中為searchByAuthor方法。
該方法期望一個型別為 .proto 檔案中定義的物件,即對我們來說是BookSearch
請注意,我們添加了一個 sleep 來模擬遍歷所有書籍的操作。在流式傳輸的情況下,伺服器不會等待所有搜尋到的書籍都可用。它在書籍可用時透過使用onNext()呼叫返回書籍。
當伺服器完成請求時,它透過呼叫onCompleted()關閉通道。
最後,我們還有一個關閉鉤子,以確保在我們完成程式碼執行時伺服器能夠乾淨地關閉。
設定 gRPC 客戶端
現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。
讓我們編寫我們的客戶端程式碼來呼叫上述函式,並將其儲存在com.tp.bookstore.BookStoreClientServerStreamingBlocking.java 中:
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientServerStreamingBlocking {
private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingBlocking.class.getName());
private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
public BookStoreClientServerStreamingBlocking(Channel channel) {
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
public void getBook((String author) {
logger.info("Querying for book with author: " + author);
BookSearch request = BookSearch.newBuilder()..setAuthor(author).build();
Iterator<Book> response;
try {
response = blockingStub.searchByAuthor(request);
while(response.hasNext()) {
logger.info("Found book: " + response.next());
}
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
}
public static void main(String[] args) throws Exception {
String authorName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientServerStreamingBlocking client = new BookStoreClientUnaryBlocking(channel);
client.getBook(authorName);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -
從main方法開始,我們接受一個引數,即我們要搜尋的書籍的title。
我們為與伺服器的 gRPC 通訊設定了一個 Channel。
然後,我們使用 channel 建立一個阻塞 stub。在這裡,我們選擇服務“BookStore”,我們計劃呼叫其函式。
然後,我們簡單地建立 proto 檔案中定義的預期輸入,即在我們的例子中為 BookSearch,並新增我們希望伺服器搜尋的標題。
我們最終進行呼叫並在有效的 Books 上獲取一個迭代器。當我們迭代時,我們得到伺服器提供的相應 Books。
最後,我們關閉 Channel 以避免任何資源洩漏。
因此,這就是我們的客戶端程式碼。
客戶端伺服器呼叫
總而言之,我們要做的是:
啟動 gRPC 伺服器。
客戶端查詢伺服器以獲取具有給定作者的書籍。
伺服器在其商店中搜索書籍,這是一個耗時的過程。
伺服器在找到符合給定條件的書籍時做出響應。伺服器不會等待所有有效的書籍都可用。它在找到一本時立即傳送輸出。然後重複此過程。
現在,我們已經定義了proto檔案,編寫了伺服器和客戶端程式碼,讓我們繼續執行此程式碼並檢視實際情況。
要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器 -
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerStreaming
我們將看到以下輸出 -
輸出
Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start INFO: Server started, listening on 50051
上述輸出表示伺服器已啟動。
現在,讓我們啟動客戶端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientServerStreamingBlocking "Har"
我們將看到以下輸出 -
輸出
Jul 03, 2021 10:40:31 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook INFO: Querying for book with author: Har Jul 03, 2021 10:40:37 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook INFO: Found book: name: "Go Set a Watchman" author: "Harper Lee" price: 700 Jul 03, 2021 10:40:42 PM com.tp.bookstore.BookStoreClientServerStreamingBlocking getBook INFO: Found book: name: "To Kill MockingBird" author: "Harper Lee" price: 400
因此,正如我們所看到的,客戶端能夠透過使用書籍名稱查詢伺服器來獲取書籍詳細資訊。但更重要的是,客戶端在不同的時間戳獲取了第一本書和第二本書,即大約 5 秒的間隔。
gRPC - 客戶端流式 RPC
現在讓我們看看在使用 gRPC 通訊時客戶端流是如何工作的。在這種情況下,客戶端將搜尋並將書籍新增到購物車。一旦客戶端完成新增所有書籍,伺服器將向客戶端提供結賬購物車值。
.proto 檔案
首先讓我們在common_proto_files中定義bookstore.proto檔案:
syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
rpc totalCartValue (stream Book) returns (Cart) {}
}
message BookSearch {
string name = 1;
string author = 2;
int32 price = 3;
}
message Cart {
int32 books = 1;
int32 price = 2;
}
這裡,以下程式碼塊表示服務名稱 **"BookStore"** 和可以呼叫的函式名稱 **"totalCartValue"**。“totalCartValue” 函式接收型別為 **"Book"** 的輸入,該輸入是一個流。函式返回型別為 "Cart" 的物件。因此,實際上,我們允許客戶端以流的方式新增書籍,一旦客戶端完成,伺服器就會向客戶端提供購物車總價值。
service BookStore {
rpc totalCartValue (stream Book) returns (Cart) {}
}
現在讓我們看看這些型別。
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
客戶端將傳送它想要購買的 **"Book"**。它可能不是完整的書籍資訊;它可以僅僅是書籍的標題。
message Cart {
int32 books = 1;
int32 price = 2;
}
伺服器在獲取書籍列表後,將返回 **"Cart"** 物件,該物件僅僅是客戶端購買的書籍總數和總價。
請注意,我們已經完成了 Maven 的設定,以便自動生成我們的類檔案以及我們的 RPC 程式碼。因此,現在我們可以簡單地編譯我們的專案:
mvn clean install
這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在 -
Protobuf class code: target/generated-sources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpc-java/com.tp.bookstore
設定 gRPC 伺服器
現在我們已經定義了包含函式定義的 **proto** 檔案,讓我們設定一個可以呼叫這些函式的伺服器。
讓我們編寫我們的伺服器程式碼來服務於上述函式,並將其儲存在 **com.tp.bookstore.BookeStoreServerClientStreaming.java** 中 -
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookeStoreServerClientStreaming {
private static final Logger logger = Logger.getLoggerr(BookeStoreServerClientStreaming.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException, InterruptedException {
final BookeStoreServerClientStreaming greetServer = new BookeStoreServerClientStreaming();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
@Override
public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
return new StreamObserver<Book>() {
ArrayList<Book> bookCart = new ArrayList<Book>();
@Override
public void onNext(Book book)
logger.info("Searching for book with title starting with: " + book.getName());
for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
if(bookEntry.getValue().getName().startsWith(book.getName())){
logger.info("Found book, adding to cart:....");
bookCart.add(bookEntry.getValue());
}
}
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading book stream: " + t);
}
@Override
public void onCompleted() {
int cartValue = 0;
for (Book book : bookCart) {
cartValue += book.getPrice();
}
responseObserver.onNext(Cart.newBuilder()
.setPrice(cartValue)
.setBooks(bookCart.size()).build());
responseObserver.onCompleted();
}
};
}
}
上述程式碼在指定的埠上啟動一個 gRPC 伺服器,併為我們在proto檔案中編寫的函式和服務提供服務。讓我們逐步瞭解上面的程式碼:
從main方法開始,我們在指定的埠建立一個 gRPC 伺服器。
但在啟動伺服器之前,我們將要執行的服務分配給伺服器,即在我們的例子中為BookStore服務。
為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立一個服務例項,即在我們的例子中為BookStoreImpl
服務例項需要提供 **.proto 檔案** 中存在的 method/function 的實現,即在我們的例子中,是 **totalCartValue** 方法。
現在,鑑於這是客戶端流的情況,伺服器將在客戶端新增書籍時獲得一個 Book 列表(在 **proto** 檔案中定義)。因此,伺服器返回一個 **自定義流觀察器**。此流觀察器實現了當找到新 Book 時以及當流關閉時會發生什麼。
當客戶端新增一個 Book 時,gRPC 框架將呼叫 **onNext()** 方法。此時,伺服器將其新增到購物車中。在流式傳輸的情況下,伺服器不會等待所有可用的書籍。
當客戶端完成新增書籍時,將呼叫流觀察器的 **onCompleted()** 方法。此方法實現了伺服器在客戶端完成新增書籍時想要傳送的內容,即它將 Cart 物件返回給客戶端。
最後,我們還有一個關閉鉤子,以確保在我們完成程式碼執行時伺服器能夠乾淨地關閉。
設定 gRPC 客戶端
現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。
讓我們編寫我們的客戶端程式碼來呼叫上述函式,並將其儲存在 **com.tp.bookstore.BookStoreClientServerStreamingBlocking.java** 中 -
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientStreamingClient {
private static final Logger logger = Logger.getLogger(BookStoreClientStreaming.class.getName());
private final BookStoreStub stub;
private boolean serverResponseCompleted = false;
StreamObserver<Book> streamClientSender;
public BookStoreClientStreamingClient(Channel channel) {
stub = BookStoreGrpc.newStub(channel);
}
public StreamObserver<Cart> getServerResponseObserver(){
StreamObserver<Cart> observer = new StreamObserver<Cart>(){
@Override
public void onNext(Cart cart) {
logger.info("Order summary:" + "\nTotal number of Books:" + cart.getBooks() +
"\nTotal Order Value:" + cart.getPrice());
}
@Override
public void onCompleted() {
//logger.info("Server: Done reading orderreading cart");
serverResponseCompleted = true;
}
};
return observer;
}
public void addBook(String book) {
logger.info("Adding book with title starting with: " + book);
Book request = Book.newBuilder().setName(book).build();
if(streamClientSender == null) {
streamClientSender = stub.totalCartValue(getServerResponseObserver());
}
try {
streamClientSender.onNext(request);
}
catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
}
public void completeOrder() {
logger.info("Done, waiting for server to create order summary...");
if(streamClientSender != null);
streamClientSender.onCompleted();
}
public static void main(String[] args) throws Exception {
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientStreamingClient client = new BookStoreClientStreamingClient(channel);
String bookName = "";
while(true) {
System.out.println("Type book name to be added to the cart....");
bookName = System.console().readLine();
if(bookName.equals("EXIT")) {
client.completeOrder();
break;
}
client.addBook(bookName);
}
while(client.serverResponseCompleted == false) {
Thread.sleep(2000);
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
上面的程式碼在指定的埠啟動一個 gRPC 伺服器,並服務於我們在 proto 檔案中編寫的函式和服務。讓我們遍歷上面的程式碼 -
從main方法開始,我們接受一個引數,即我們要搜尋的書籍的標題。
我們為與伺服器的 gRPC 通訊設定了一個 Channel。
接下來,我們使用建立的通道建立一個 **非阻塞存根**。在這裡,我們選擇計劃呼叫其函式的服務 **"BookStore"**。
然後,我們簡單地建立 **.proto** 檔案中定義的預期輸入,即在我們的例子中,是 **Book**,並新增我們希望伺服器新增的標題。
但是鑑於這是客戶端流的情況,我們首先為伺服器建立一個流觀察器。此伺服器流觀察器列出了伺服器響應時需要執行的操作的行為,即 **onNext()** 和 **onCompleted()**
並且使用存根,我們也獲得了客戶端流觀察器。我們使用此流觀察器傳送要新增到購物車的書籍資料。最終,我們進行呼叫並在有效書籍上獲取迭代器。當我們迭代時,我們得到伺服器提供的相應書籍。
並且一旦我們的訂單完成,我們確保客戶端流觀察器已關閉。它告訴伺服器計算購物車價值並將其作為輸出提供。
最後,我們關閉 Channel 以避免任何資源洩漏。
因此,這就是我們的客戶端程式碼。
客戶端伺服器呼叫
總而言之,我們要做的是:
啟動 gRPC 伺服器。
客戶端透過向伺服器通知它們來新增書籍流。
伺服器在其商店中搜索書籍並將其新增到購物車。
當客戶端完成訂購時,伺服器會響應客戶端的購物車總價值。
現在,我們已經定義了proto檔案,編寫了伺服器和客戶端程式碼,讓我們繼續執行此程式碼並檢視實際情況。
要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器 -
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我們將看到以下輸出 -
輸出
Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start INFO: Server started, listening on 50051
上述輸出表示伺服器已啟動。
現在,讓我們啟動客戶端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientServerStreamingClient
讓我們向我們的客戶端新增一些書籍。
Type book name to be added to the cart.... Gr Jul 24, 2021 5:53:07 PM com.tp.bookstore.BookStoreClientStreamingClient addBook INFO: Adding book with title starting with: Great Type book name to be added to the cart.... Pa Jul 24, 2021 5:53:20 PM com.tp.bookstore.BookStoreClientStreamingClient addBook INFO: Adding book with title starting with: Passage Type book name to be added to the cart....
一旦我們添加了書籍並輸入“EXIT”,伺服器將計算購物車價值,以下是我們獲得的輸出 -
輸出
EXIT Jul 24, 2021 5:53:33 PM com.tp.bookstore.BookStoreClientStreamingClient completeOrder INFO: Done, waiting for server to create order summary... Jul 24, 2021 5:53:33 PM com.tp.bookstore.BookStoreClientStreamingClient$1 onNext INFO: Order summary: Total number of Books: 2 Total Order Value: 800
因此,正如我們所看到的,客戶端能夠新增書籍。並且一旦所有書籍都新增完畢,伺服器將響應書籍總數和總價。
gRPC - 雙向 RPC
現在讓我們看看在使用 gRPC 通訊時客戶端-伺服器流是如何工作的。在這種情況下,客戶端將搜尋並將書籍新增到購物車。伺服器將在每次新增書籍時響應即時購物車價值。
.proto 檔案
首先讓我們在common_proto_files中定義bookstore.proto檔案:
syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
rpc liveCartValue (stream Book) returns (stream Cart) {}
}
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
message Cart {
int32 books = 1;
int32 price = 2;
}
以下程式碼塊表示服務名稱 **"BookStore"** 和可以呼叫的函式名稱 "liveCartValue"。**"liveCartValue"** 函式接收型別為 **"Book"** 的輸入,該輸入是一個流。函式返回型別為 **"Cart"** 的物件的流。因此,實際上,我們允許客戶端以流的方式新增書籍,並且每當新增新書籍時,伺服器都會將當前購物車價值響應給客戶端。
service BookStore {
rpc liveCartValue (stream Book) returns (stream Cart) {}
}
現在讓我們看看這些型別。
message Book {
string name = 1;
string author = 2;
int32 price = 3;
}
客戶端將傳送它想要購買的 **"Book"**。它不必是完整的書籍資訊;它可以僅僅是書籍的標題。
message Cart {
int32 books = 1;
int32 price = 2;
}
伺服器在獲取書籍列表後,將返回 **"Cart"** 物件,該物件僅僅是客戶端購買的書籍總數和總價。
請注意,我們已經完成了 Maven 設定,用於自動生成我們的類檔案以及我們的 RPC 程式碼。因此,現在我們可以簡單地編譯我們的專案。
mvn clean install
這應該會自動生成我們使用 gRPC 所需的原始碼。原始碼將放置在
Protobuf class code: target/generatedsources/protobuf/java/com.tp.bookstore Protobuf gRPC code: target/generated-sources/protobuf/grpcjava/com.tp.bookstore
設定 gRPC 伺服器
現在我們已經定義了包含函式定義的 proto 檔案,讓我們設定一個可以呼叫這些函式的伺服器。
讓我們編寫我們的伺服器程式碼來服務於上述函式,並將其儲存在 **com.tp.bookstore.BookeStoreServerBothStreaming.java** 中 -
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookeStoreServerBothStreaming {
private static final Logger logger =Logger.getLogger(BookeStoreServerBothStreaming.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException,InterruptedException {
final BookeStoreServerBothStreaming greetServer = newBookeStoreServerBothStreaming();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase {
@Override
public StreamObserver<Book>liveCartValue(StreamObserver<Cart> responseObserver) {
return new StreamObserver<Book>() {
ArrayList<Book> bookCart = new ArrayList<Book>();
int cartValue = 0;
@Override
public void onNext(Book book) {
logger.info("Searching for book with titlestarting with: " + book.getName());
for (Entry<String, Book> bookEntry :bookMap.entrySet()) {
if(bookEntry.getValue().getName().startsWith(book.getName())){
logger.info("Found book, adding tocart:....");
bookCart.add(bookEntry.getValue());
cartValue +=bookEntry.getValue().getPrice();
}
}
logger.info("Updating cart value...");
responseObserver.onNext(Cart.newBuilder()
.setPrice(cartValue)
.setBooks(bookCart.size()).build());
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading book stream: " + t);
}
@Override
public void onCompleted() {
logger.info("Order completed");
responseObserver.onCompleted();
}
};
}
}
}
上述程式碼在指定的埠上啟動一個 gRPC 伺服器,併為我們在proto檔案中編寫的函式和服務提供服務。讓我們逐步瞭解上面的程式碼:
從 **main** 方法開始,我們在指定的埠上建立一個 gRPC 伺服器。
但在啟動伺服器之前,我們將要執行的服務分配給伺服器,即在我們的例子中為BookStore服務。
為此,我們需要將服務例項傳遞給伺服器,因此我們繼續建立服務例項,即在我們的例子中是 **BookStoreImpl**
服務例項需要提供 **proto** 檔案中存在的 method/function 的實現,即在我們的例子中,是 **totalCartValue** 方法。
現在,鑑於這是伺服器和客戶端流的情況,伺服器將在客戶端新增書籍時獲得一個 **Books** 列表(在 **proto** 檔案中定義)。因此,伺服器返回一個自定義流觀察器。此流觀察器實現了當找到新 Book 時以及當流關閉時會發生什麼。
當客戶端新增一個 Book 時,gRPC 框架將呼叫 **onNext()** 方法。此時,伺服器將其新增到購物車中並使用響應觀察器返回購物車價值。在流式傳輸的情況下,伺服器不會等待所有有效的書籍可用。
當客戶端完成新增書籍時,將呼叫流觀察器的 **onCompleted()** 方法。此方法實現了伺服器在客戶端完成新增 **Books** 時想要執行的操作,即宣告它已完成接收客戶端訂單。
最後,我們還有一個關閉鉤子,以確保在我們完成程式碼執行時伺服器能夠乾淨地關閉。
設定 gRPC 客戶端
現在我們已經編寫了伺服器的程式碼,讓我們設定一個可以呼叫這些函式的客戶端。
讓我們編寫我們的客戶端程式碼來呼叫上述函式,並將其儲存在 **com.tp.bookstore.BookStoreClientBothStreaming.java** 中 -
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientBothStreaming {
private static final Logger logger = Logger.getLogger(BookStoreClientBothStreaming.class.getName());
private final BookStoreStub stub;
private boolean serverIntermediateResponseCompleted = true;
private boolean serverResponseCompleted = false;
StreamObserver<Book> streamClientSender;
public BookStoreClientBothStreaming(Channel channel) {
stub = BookStoreGrpc.newStub(channel);
}
public StreamObserver>Cart< getServerResponseObserver(){
StreamObserver>Cart< observer = new StreamObserver<Cart>(){
@Override
public void onNext(Cart cart) {
logger.info("Order summary:" +
"\nTotal number of Books:" + cart.getBooks()+
"\nTotal Order Value:" cart.getPrice());
serverIntermediateResponseCompleted = true;
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading response fromServer: " + t);
}
@Override
public void onCompleted() {
//logger.info("Server: Done reading orderreading cart");
serverResponseCompleted = true;
}
};
return observer;
}
public void addBook(String book) {
logger.info("Adding book with title starting with: " + book);
Book request = Book.newBuilder().setName(book).build();
if(streamClientSender == null) {
streamClientSender =stub.liveCartValue(getServerResponseObserver());
}
try {
streamClientSender.onNext(request);
}
catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
}
public void completeOrder() {
logger.info("Done, waiting for server to create ordersummary...");
if(streamClientSender != null); {
streamClientSender.onCompleted();
}
}
public static void main(String[] args) throws Exception {
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientBothStreaming client = new
BookStoreClientBothStreaming(channel);
String bookName = "";
while(true) {
if(client.serverIntermediateResponseCompleted ==true) {
System.out.println("Type book name to beadded to the cart....");
bookName = System.console().readLine();
if(bookName.equals("EXIT")) {
client.completeOrder();
break;
}
client.serverIntermediateResponseCompleted = false;
client.addBook(bookName);
Thread.sleep(500);
}
}
while(client.serverResponseCompleted == false) {
Thread.sleep(2000);
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
以上程式碼啟動一個 gRPC 客戶端並連線到指定埠上的伺服器,並呼叫我們在 **proto** 檔案中編寫的函式和服務。讓我們一起瀏覽以上程式碼 -
從 **main** 方法開始,我們接受要新增到購物車的書籍名稱。一旦所有書籍都將要新增,使用者預計會列印“EXIT”。
我們為與伺服器的 gRPC 通訊設定了一個 Channel。
接下來,我們使用通道建立一個非阻塞存根。在這裡,我們選擇計劃呼叫其函式的服務 **"BookStore"**。
然後,我們簡單地建立 **.proto** 檔案中定義的預期輸入,即在我們的例子中,是 **Book**,並新增我們希望伺服器新增的標題。
但是鑑於這是伺服器和客戶端流的情況,我們首先為伺服器建立一個 **流觀察器**。此伺服器流觀察器列出了伺服器響應時需要執行的操作的行為,即 **onNext()** 和 **onCompleted()**。
並且使用存根,我們也獲得了客戶端流觀察器。我們使用此流觀察器傳送要新增到購物車的書籍資料。
並且一旦我們的訂單完成,我們確保客戶端流觀察器已關閉。這告訴伺服器關閉流並執行清理。
最後,我們關閉 Channel 以避免任何資源洩漏。
因此,這就是我們的客戶端程式碼。
客戶端伺服器呼叫
總而言之,我們要做的是:
啟動 gRPC 伺服器。
客戶端透過向伺服器通知它們來新增書籍流。
伺服器在其商店中搜索書籍並將其新增到購物車。
每次新增書籍時,伺服器都會告訴客戶端購物車價值。
當客戶端完成訂購時,伺服器和客戶端都會關閉流。
現在我們已經定義了我們的 **proto** 檔案,編寫了我們的伺服器和客戶端程式碼,讓我們現在執行此程式碼並檢視實際情況。
要執行程式碼,請啟動兩個 shell。透過執行以下命令在第一個 shell 中啟動伺服器 -
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我們將看到以下輸出 -
輸出
Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start INFO: Server started, listening on 50051
此輸出表示伺服器已啟動。
現在,讓我們啟動客戶端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientBothStreaming
讓我們向我們的客戶端新增一本書。
Jul 24, 2021 7:21:45 PM com.tp.bookstore.BookStoreClientBothStreaming main Type book name to be added to the cart.... Great Jul 24, 2021 7:21:48 PM com.tp.bookstore.BookStoreClientBothStreaming addBook INFO: Adding book with title starting with: Gr Jul 24, 2021 7:21:48 PM com.tp.bookstore.BookStoreClientBothStreaming$1 onNext INFO: Order summary: Total number of Books: 1 Total Order Value: 300
因此,正如我們所看到的,我們得到了訂單的當前購物車價值。現在讓我們向我們的客戶端再新增一本書。
Type book name to be added to the cart.... Passage Jul 24, 2021 7:21:51 PM com.tp.bookstore.BookStoreClientBothStreaming addBook INFO: Adding book with title starting with: Pa Jul 24, 2021 7:21:51 PM com.tp.bookstore.BookStoreClientBothStreaming$1 onNext INFO: Order summary: Total number of Books: 2 Total Order Value: 800
一旦我們添加了書籍並輸入“EXIT”,客戶端將關閉。
Type book name to be added to the cart.... EXIT Jul 24, 2021 7:21:59 PM com.tp.bookstore.BookStoreClientBothStreaming completeOrder INFO: Done, waiting for server to create order summary...
因此,正如我們所看到的,客戶端能夠新增書籍。並且隨著書籍的新增,伺服器將響應當前購物車價值。
gRPC - 客戶端呼叫
gRPC 客戶端支援兩種型別的客戶端呼叫,即客戶端如何呼叫伺服器。以下是兩種方式 -
阻塞式客戶端呼叫
非同步客戶端呼叫
在本章中,我們將逐一檢視它們。
阻塞式客戶端呼叫
gRPC 支援阻塞式客戶端呼叫。這意味著,一旦客戶端對服務進行呼叫,客戶端將不會繼續執行其餘程式碼,直到它從伺服器收到響應。請注意,對於單向呼叫和伺服器流式呼叫,阻塞式客戶端呼叫是可能的。
請注意,對於單向呼叫和伺服器流式呼叫,阻塞式客戶端呼叫是可能的。
這是一個單向阻塞式客戶端呼叫的示例。
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientUnaryBlocking {
private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName());
private final BookStoreGrpc.BookStoreBlockingStubblockingStub;
public BookStoreClientUnaryBlocking(Channel channel) {
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
public void getBook(String bookName) {
logger.info("Querying for book with title: " + bookName);
BookSearch request = BookSearch.newBuilder().setName(bookName).build();
Book response;
try {
response = blockingStub.first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Got following book from server: " + response);
}
public static void main(String[] args) throws Exception {
String bookName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientUnaryBlocking client = new
BookStoreClientUnaryBlocking(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
在以上示例中,我們有,
public BookStoreClientUnaryBlocking(Channel channel) {
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
這意味著我們將使用阻塞式 RPC 呼叫。
然後,我們有,
BookSearch request = BookSearch.newBuilder().setName(bookName).build(); Book response; response = blockingStub.first(request);
在這裡,我們使用 **blockingStub** 呼叫 RPC **method first()** 來獲取書籍詳細資訊。
類似地,對於伺服器流,我們可以使用阻塞式存根 -
logger.info("Querying for book with author: " + author);
BookSearch request =
BookSearch.newBuilder().setAuthor(author).build();
Iterator<Book> response;
try {
response = blockingStub.searchByAuthor(request);
while(response.hasNext()) {
logger.info("Found book: " + response.next());
}
在這裡,我們呼叫 RPC 方法 **searchByAuthor** 方法,並迭代響應,直到伺服器流結束。
非阻塞式客戶端呼叫
gRPC 支援非阻塞式客戶端呼叫。這意味著,當客戶端對服務進行呼叫時,它不需要等待伺服器響應。為了處理伺服器響應,客戶端可以簡單地傳遞觀察器,該觀察器指示在收到響應時該做什麼。請注意,對於單向呼叫和流式呼叫,非阻塞式客戶端呼叫是可能的。但是,我們將專門關注伺服器流式呼叫的情況,以將其與阻塞式呼叫進行比較。
請注意,對於單向呼叫和流式呼叫,非阻塞式客戶端呼叫是可能的。但是,我們將專門關注伺服器流式呼叫的情況,以將其與阻塞式呼叫進行比較。
這是一個伺服器流式非阻塞式客戶端呼叫的示例。
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientServerStreamingNonBlocking {
private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingNonBlocking.class.getName());
private final BookStoreGrpc.BookStoreStub nonBlockingStub;
public BookStoreClientServerStreamingNonBlocking(Channelchannel) {
nonBlockingStub = BookStoreGrpc.newStub(channel);
}
public StreamObserver<Book> getServerResponseObserver(){
StreamObserver<Book> observer = new
StreamObserver<Book>(){
@Override
public void onNext(Book book) {
logger.info("Server returned following book: " +book);
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading response fromServer: " + t);
}
@Override
public void onCompleted() {
logger.info("Server returned following book: " + book);
}
};
return observer;
}
public void getBook(String author) {
logger.info("Querying for book with author: " + author);
BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
try {
nonBlockingStub.searchByAuthor(request,getServerResponseObserver());
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());
return;
}
}
public static void main(String[] args) throws Exception {
String authorName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress)
.usePlaintext()
.build();
try {
BookStoreClientServerStreamingNonBlocking client = new
BookStoreClientServerStreamingNonBlocking(channel);
client.getBook(authorName);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
正如我們在以上示例中看到的,
public BookStoreClientUnaryNonBlocking(Channel channel) {
nonBlockingStub = BookStoreGrpc.newStub(channel);
}
它定義了存根是非阻塞式的。類似地,以下程式碼用於處理我們從伺服器接收到的響應。一旦伺服器傳送響應,我們就記錄輸出。
public StreamObserver<Book> getServerResponseObserver(){
StreamObserver<Book> observer = new
StreamObserver<Book>(){
....
....
return observer;
}
以下 gRPC 呼叫是非阻塞式呼叫。
logger.info("Querying for book with author: " + author);
BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
try {
nonBlockingStub.searchByAuthor(request, getServerResponseObserver());
}
這就是我們確保客戶端不需要等到伺服器完成 **searchByAuthor** 執行的方式。這將由流觀察器物件在伺服器返回 Book 物件時直接處理。
gRPC - 超時和取消
gRPC 支援為請求分配超時。這是一種執行請求取消的方法。它有助於避免為客戶端和伺服器使用資源,以執行其結果對客戶端無用的請求。
請求超時
gRPC 支援為客戶端和伺服器指定超時。
客戶端可以在執行時指定它希望等待多長時間,然後取消請求。
伺服器也可以在其端檢查請求是否需要處理,或者客戶端是否已放棄請求。
讓我們來看一個例子,客戶端期望在2秒內收到響應,但伺服器花費了更長的時間。所以,這是我們的伺服器程式碼。
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerUnaryTimeout {
private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby",Book.newBuilder().setName("Great Gatsby")
.setAuthor("Scott Fitzgerald")
.setPrice(300).build());
bookMap.put("To Kill MockingBird",Book.newBuilder().setName("To Kill MockingBird")
.setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India",Book.newBuilder().setName("Passage to India")
.setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise",Book.newBuilder().setName("The Side of Paradise")
.setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
.setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException,InterruptedException {
final BookeStoreServerUnaryTimeout greetServer = new
BookeStoreServerUnaryTimeout();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends
BookStoreGrpc.BookStoreImplBase {
@Override
public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
logger.info("Searching for book with title: " +searchQuery.getName());
logger.info("This may take more time...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<String> matchingBookTitles = bookMap.keySet().stream()
.filter(title ->
title.startsWith(searchQuery.getName().trim()))
.collect(Collectors.toList());
Book foundBook = null;
if(matchingBookTitles.size() > 0) {
foundBook = bookMap.get(matchingBookTitles.get(0));
}
responseObserver.onNext(foundBook);
responseObserver.onCompleted();
}
}
}
在上面的程式碼中,伺服器搜尋客戶端提供的書籍標題。我們添加了一個模擬休眠,以便我們可以看到請求超時。
這是我們的客戶端程式碼
示例
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;
public class BookStoreClientUnaryBlockingTimeout {
private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingTimeout.class.getName());
private final BookStoreGrpc.BookStoreBlockingStubblockingStub;
public BookStoreClientUnaryBlockingTimeout(Channel channel){
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
public void getBook(String bookName) {
logger.info("Querying for book with title: " + bookName);
BookSearch request =BookSearch.newBuilder().setName(bookName).build();
Book response;
try {
response = blockingStub.withDeadlineAfter(2,TimeUnit.SECONDS).first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());
return;
}
logger.info("Got following book from server: " +response);
}
public static void main(String[] args) throws Exception {
String bookName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
try {
BookStoreClientUnaryBlockingTimeout client = newBookStoreClientUnaryBlockingTimeout(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5,
TimeUnit.SECONDS);
}
}
}
上面的程式碼使用標題呼叫伺服器進行搜尋。但更重要的是,它為gRPC呼叫提供了2秒的超時時間。
現在讓我們看看它的執行情況。要執行程式碼,請啟動兩個shell。在第一個shell上執行以下命令啟動伺服器:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnaryTimeout
我們將看到以下輸出 -
輸出
Jul 31, 2021 12:29:31 PM com.tp.bookstore.BookeStoreServerUnaryTimeout start INFO: Server started, listening on 50051
以上輸出表明伺服器已啟動。
Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first INFO: Searching for book with title: Great Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first INFO: This may take more time...
現在,讓我們啟動客戶端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingTimeout Great
我們將得到以下輸出:
輸出
Jul 31, 2021 12:29:34 PM
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
INFO: Querying for book with title: Great
Jul 31, 2021 12:29:36 PM
com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED,
description=deadline exceeded after 1.970455800s.
[buffered_nanos=816522700,
remote_addr=localhost/127.0.0.1:50051], cause=null}
因此,正如我們所看到的,客戶端在2秒內沒有收到響應,因此它取消了請求並將其稱為超時,即DEADLINE_EXCEEDED
請求取消
gRPC支援客戶端和伺服器端都取消請求。客戶端可以在執行時指定它希望在取消請求之前等待的時間。伺服器也可以在其端檢查請求是否需要處理,或者客戶端是否已經放棄了請求。
讓我們看一個客戶端流式傳輸的例子,其中客戶端呼叫取消。所以,這是我們的伺服器程式碼
示例
package com.tp.bookstore;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookeStoreServerClientStreaming {
private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird").setAuthor("Harper Lee")
.setPrice(400).build());
bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India").setAuthor("E.M.Forster")
.setPrice(500).build());
bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise").setAuthor("Scott Fitzgerald")
.setPrice(600).build());
bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman").setAuthor("Harper Lee")
.setPrice(700).build());
}
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException,InterruptedException {
final BookeStoreServerClientStreaming greetServer = newBookeStoreServerClientStreaming();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase {
@Override
public StreamObserver<Book>
totalCartValue(StreamObserver<Cart> responseObserver) {
return new StreamObserver<Book>() {
ArrayList<Book> bookCart = new ArrayList<Book>();
@Override
public void onNext(Book book) {
logger.info("Searching for book with titlestarting with: " + book.getName());
for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
if(bookEntry.getValue().getName().startsWith(book.getName())){
logger.info("Found book, adding to cart:....");
bookCart.add(bookEntry.getValue());
}
}
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading book stream:" + t);
}
@Override
public void onCompleted() {
int cartValue = 0;
for (Book book : bookCart) {
cartValue += book.getPrice();
}
responseObserver.onNext(Cart.newBuilder().setPrice(cartValue).setBooks(bookCart.size()).build());
responseObserver.onCompleted();
}
};
}
}
}
此伺服器程式碼是客戶端流式傳輸的一個簡單示例。伺服器簡單地跟蹤客戶端想要的書籍,最後,它提供訂單的總購物車價值。
但是這裡關於請求取消沒有什麼特別之處,因為這是客戶端將呼叫的。所以,讓我們看看客戶端程式碼。
package com.tp.bookstore;
import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;
public class BookStoreClientStreamingClientCancelation {
private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClientCancelation.class.getName());
private final BookStoreStub stub;
StreamObserver<Book> streamClientSender;
private CancellableContext withCancellation;
public BookStoreClientStreamingClientCancelation(Channel channel) {
stub = BookStoreGrpc.newStub(channel);
}
public StreamObserver<Cart> getServerResponseObserver(){
StreamObserver<Cart> observer = new StreamObserver>Cart<(){
@Override
public void onNext(Cart cart) {
logger.info("Order summary:"
+ "\nTotal number of Books: "
+ cart.getBooks()
+ "\nTotal Order Value: "
+ cart.getPrice());
}
@Override
public void onError(Throwable t) {
logger.info("Error while reading response from Server: " + t);
}
@Override
public void onCompleted() {
}
};
return observer;
}
public void addBook(String book) {
logger.info("Adding book with title starting with: " + book);
Book request = Book.newBuilder().setName(book).build();
if(streamClientSender == null) {
withCancellation = Context.current().withCancellation();
streamClientSender = stub.totalCartValue(getServerResponseObserver());
}
try {
streamClientSender.onNext(request);
}
catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
}
public void completeOrder() {
logger.info("Done, waiting for server to create order summary...");
if(streamClientSender != null);
streamClientSender.onCompleted();
}
public void cancelOrder() {
withCancellation.cancel(null);
}
public static void main(String[] args) throws Exception {
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
try {
BookStoreClientStreamingClientCancelation client = new BookStoreClientStreamingClientCancelation(channel); String bookName = "";
while(true) {
System.out.println("Type book name to be added to the cart....");
bookName = System.console().readLine();
if(bookName.equals("EXIT")) {
client.completeOrder();
break;
}
if(bookName.equals("CANCEL")) {
client.cancelOrder();
break;
}
client.addBook(bookName);
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
所以,如果我們看到上面的程式碼,以下行定義了一個啟用了取消的上下文。
withCancellation = Context.current().withCancellation();
這是當用戶輸入取消時將呼叫的方法。這將取消訂單,並讓伺服器知道。
public void cancelOrder() {
withCancellation.cancel(null);
}
現在讓我們看看它的執行情況。要執行程式碼,請啟動兩個shell。在第一個shell上執行以下命令啟動伺服器。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我們將看到以下輸出:
輸出
Jul 31, 2021 3:29:58 PM com.tp.bookstore.BookeStoreServerClientStreaming start INFO: Server started, listening on 50051
上述輸出意味著伺服器已啟動。
現在,讓我們啟動客戶端
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientStreamingClientCancelation
我們將得到以下輸出:
輸出
Type book name to be added to the cart.... Great Jul 31, 2021 3:30:55 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation addBook INFO: Adding book with title starting with: Great Type book name to be added to the cart.... CANCEL Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation$1 onError INFO: Error while reading response from Server: io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked
我們將在伺服器日誌中獲得以下資料:
INFO: Searching for book with title starting with: Great Jul 31, 2021 3:30:56 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp l$1 onNext INFO: Found book, adding to cart:.... Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImp l$1 onError INFO: Error while reading book stream: io.grpc.StatusRuntimeException: CANCELLED: client cancelled
因此,正如我們所看到的,客戶端啟動了對其傳送到伺服器的請求的取消。伺服器也收到了關於取消的通知。
gRPC - 傳送/接收元資料
gRPC支援傳送元資料。元資料基本上是我們想要傳送的一組資料,這些資料不是業務邏輯的一部分,同時進行gRPC呼叫。
讓我們看看以下兩種情況:
- 客戶端傳送元資料,伺服器讀取它。
- 伺服器傳送元資料,客戶端讀取它。
我們將逐一介紹這兩種情況。
客戶端傳送元資料
如前所述,gRPC支援客戶端傳送伺服器可以讀取的元資料。gRPC支援擴充套件客戶端和伺服器攔截器,這些攔截器可分別用於寫入和讀取元資料。讓我們舉一個例子來更好地理解它。這是我們的客戶端程式碼,它傳送主機名作為元資料:
讓我們舉一個例子來更好地理解它。這是我們的客戶端程式碼,它傳送主機名作為元資料:
示例
package com.tp.bookstore;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookStoreClientUnaryBlockingMetadata {
private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingMetadata.class.getName());
private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
public BookStoreClientUnaryBlockingMetadata(Channel channel) {
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
static class BookClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT>
interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next
) {
return new
ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
logger.info("Added metadata");
headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
super.start(responseListener, headers);
}
};
}
}
public void getBook(String bookName) {
logger.info("Querying for book with title: " + bookName);
BookSearch request = BookSearch.newBuilder().setName(bookName).build();
Book response;
CallOptions.Key<String> metaDataKey = CallOptions.Key.create("my_key");
try {
response = blockingStub.withOption(metaDataKey, "bar").first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Got following book from server: " + response);
}
public static void main(String[] args) throws Exception {
String bookName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().intercept(new BookClientInterceptor()).build();
try {
BookStoreClientUnaryBlockingMetadata client = new BookStoreClientUnaryBlockingMetadata(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
這裡有趣的部分是攔截器。
static class BookClientInterceptor implements ClientInterceptor{
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT>
interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new
ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT>responseListener, Metadata headers) {
logger.info("Added metadata");
headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
super.start(responseListener, headers);
}
};
}
}
我們攔截客戶端發出的任何呼叫,然後在進一步呼叫之前向其新增主機名元資料。
伺服器讀取元資料
現在,讓我們看看讀取此元資料的伺服器程式碼:
package com.tp.bookstore;
import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerMetadata {
private static final Logger logger = Logger.getLogger(BookeStoreServerMetadata.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
}
private Server server;
class BookServerInterceptor implements ServerInterceptor{
@Override
public <ReqT, RespT> Listener<ReqT>
interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
logger.info("Recieved following metadata: " + headers);
return next.startCall(call, headers);
}
}
private void start() throws IOException {
int port = 50051; server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).intercept(new BookServerInterceptor()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException, InterruptedException {
final BookeStoreServerMetadata greetServer = new BookeStoreServerMetadata();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
@Override
public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
logger.info("Searching for book with title: " + searchQuery.getName());
List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());
Book foundBook = null;
if(matchingBookTitles.size() > 0) {
foundBook = bookMap.get(matchingBookTitles.get(0));
}
responseObserver.onNext(foundBook);
responseObserver.onCompleted();
}
}
}
同樣,這裡有趣的部分是攔截器。
class BookServerInterceptor implements ServerInterceptor{
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,ServerCallHandler<ReqT, RespT> next) {
logger.info("Recieved following metadata: " + headers);
return next.startCall(call, headers);
}
}
我們攔截任何傳入伺服器的呼叫,然後在實際方法處理呼叫之前記錄元資料。
客戶端-伺服器呼叫
現在讓我們看看它的執行情況。要執行程式碼,請啟動兩個shell。在第一個shell上執行以下命令啟動伺服器:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerMetadata
我們將看到以下輸出 -
輸出
Jul 31, 2021 5:29:14 PM com.tp.bookstore.BookeStoreServerMetadata start INFO: Server started, listening on 50051
上述輸出意味著伺服器已啟動。
現在,讓我們啟動客戶端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingMetadata Great
我們將看到以下輸出 -
輸出
Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook INFO: Querying for book with title: Great Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata$BookCli entInterceptor$1 start INFO: Added metadata Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook INFO: Got following book from server: name: "Great Gatsby" author: "Scott Fitzgerald" price: 300
我們將在伺服器日誌中獲得以下資料:
Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookServerIntercept or interceptCall INFO: Recieved following metadata: Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.38.0,hostname=MY_HOST,grpc-accept-encoding=gzip) Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookStoreImpl first INFO: Searching for book with title: Great
正如我們所看到的,伺服器能夠讀取元資料:hostname=MY_HOST,該元資料由客戶端新增。