gRPC - 傳送/接收元資料
gRPC 支援傳送元資料。元資料基本上是我們想要傳送的一組資料,它不是業務邏輯的一部分,但在進行 gRPC 呼叫時會用到。
讓我們來看以下兩種情況:
- 客戶端傳送元資料,伺服器讀取它。
- 伺服器傳送元資料,客戶端讀取它。
我們將逐一介紹這兩種情況。
客戶端傳送元資料
如前所述,gRPC 支援客戶端傳送伺服器可以讀取的元資料。gRPC 支援擴充套件客戶端和伺服器攔截器,分別用於寫入和讀取元資料。讓我們透過一個例子來更好地理解它。這是我們的客戶端程式碼,它傳送主機名作為元資料:
讓我們透過一個例子來更好地理解它。這是我們的客戶端程式碼,它傳送主機名作為元資料:
示例
package com.tp.bookstore;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookStoreClientUnaryBlockingMetadata {
private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingMetadata.class.getName());
private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
public BookStoreClientUnaryBlockingMetadata(Channel channel) {
blockingStub = BookStoreGrpc.newBlockingStub(channel);
}
static class BookClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT>
interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next
) {
return new
ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
logger.info("Added metadata");
headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
super.start(responseListener, headers);
}
};
}
}
public void getBook(String bookName) {
logger.info("Querying for book with title: " + bookName);
BookSearch request = BookSearch.newBuilder().setName(bookName).build();
Book response;
CallOptions.Key<String> metaDataKey = CallOptions.Key.create("my_key");
try {
response = blockingStub.withOption(metaDataKey, "bar").first(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Got following book from server: " + response);
}
public static void main(String[] args) throws Exception {
String bookName = args[0];
String serverAddress = "localhost:50051";
ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().intercept(new BookClientInterceptor()).build();
try {
BookStoreClientUnaryBlockingMetadata client = new BookStoreClientUnaryBlockingMetadata(channel);
client.getBook(bookName);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
這裡有趣的部分是攔截器。
static class BookClientInterceptor implements ClientInterceptor{
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT>
interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new
ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT>responseListener, Metadata headers) {
logger.info("Added metadata");
headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
super.start(responseListener, headers);
}
};
}
}
我們攔截客戶端發出的任何呼叫,然後在進一步呼叫之前向其中新增主機名元資料。
伺服器讀取元資料
現在,讓我們看看讀取此元資料的伺服器程式碼:
package com.tp.bookstore;
import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
public class BookeStoreServerMetadata {
private static final Logger logger = Logger.getLogger(BookeStoreServerMetadata.class.getName());
static Map<String, Book> bookMap = new HashMap<>();
static {
bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
}
private Server server;
class BookServerInterceptor implements ServerInterceptor{
@Override
public <ReqT, RespT> Listener<ReqT>
interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
logger.info("Recieved following metadata: " + headers);
return next.startCall(call, headers);
}
}
private void start() throws IOException {
int port = 50051; server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).intercept(new BookServerInterceptor()).build().start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("Shutting down gRPC server");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
}
public static void main(String[] args) throws IOException, InterruptedException {
final BookeStoreServerMetadata greetServer = new BookeStoreServerMetadata();
greetServer.start();
greetServer.server.awaitTermination();
}
static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
@Override
public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
logger.info("Searching for book with title: " + searchQuery.getName());
List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());
Book foundBook = null;
if(matchingBookTitles.size() > 0) {
foundBook = bookMap.get(matchingBookTitles.get(0));
}
responseObserver.onNext(foundBook);
responseObserver.onCompleted();
}
}
}
同樣,這裡有趣的部分是攔截器。
class BookServerInterceptor implements ServerInterceptor{
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,ServerCallHandler<ReqT, RespT> next) {
logger.info("Recieved following metadata: " + headers);
return next.startCall(call, headers);
}
}
我們攔截進入伺服器的任何呼叫,然後在實際方法處理呼叫之前記錄元資料。
客戶端-伺服器呼叫
現在讓我們看看它的實際執行情況。要執行程式碼,請啟動兩個終端。在第一個終端上啟動伺服器,執行以下命令:
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerMetadata
我們將看到以下輸出:
輸出
Jul 31, 2021 5:29:14 PM com.tp.bookstore.BookeStoreServerMetadata start INFO: Server started, listening on 50051
上述輸出表明伺服器已啟動。
現在,讓我們啟動客戶端。
java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingMetadata Great
我們將看到以下輸出:
輸出
Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook INFO: Querying for book with title: Great Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata$BookCli entInterceptor$1 start INFO: Added metadata Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook INFO: Got following book from server: name: "Great Gatsby" author: "Scott Fitzgerald" price: 300
伺服器日誌中將包含以下資料:
Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookServerIntercept or interceptCall INFO: Recieved following metadata: Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.38.0,hostname=MY_HOST,grpc-accept-encoding=gzip) Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookStoreImpl first INFO: Searching for book with title: Great
正如我們所看到的,伺服器能夠讀取客戶端新增的元資料:hostname=MY_HOST。
廣告