gRPC - Send/Receive Metadata



gRPC supports sending metadata. Metadata is a set of key-value data that we want to send along with a gRPC call but that is not part of the business logic.
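To make the idea concrete, here is a minimal sketch of how grpc-java represents metadata: typed keys mapped to values, held in an io.grpc.Metadata object. The class name MetadataBasics and the key name hostname are chosen only for this illustration and are not part of the bookstore example.

```java
import io.grpc.Metadata;

public class MetadataBasics {
   // A typed key for an ASCII string value; the name "hostname" is an assumption for this sketch
   static final Metadata.Key<String> HOSTNAME_KEY =
      Metadata.Key.of("hostname", Metadata.ASCII_STRING_MARSHALLER);

   // Builds a Metadata object carrying a single hostname entry
   static Metadata buildHeaders(String host) {
      Metadata headers = new Metadata();
      headers.put(HOSTNAME_KEY, host);
      return headers;
   }

   public static void main(String[] args) {
      Metadata headers = buildHeaders("MY_HOST");
      // get returns the value stored for the key, or null if the key is absent
      System.out.println(headers.get(HOSTNAME_KEY));
      System.out.println(headers.containsKey(HOSTNAME_KEY));
   }
}
```

In a real call this object is not built by hand on the wire path; gRPC passes it to interceptors as the request or response headers.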

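Let us look at the following two cases:

  • The client sends metadata and the server reads it.
  • The server sends metadata and the client reads it.

We will go through both cases one by one.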

Client Sends Metadata

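As mentioned, gRPC supports the client sending metadata that the server can read. To do this, gRPC lets us extend the client and server interceptors, which are used to write and read the metadata, respectively. Let us look at an example to understand this better. Here is our client code, which sends the hostname as metadata: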

Example

package com.tp.bookstore;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlockingMetadata {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingMetadata.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
   public BookStoreClientUnaryBlockingMetadata(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   static class BookClientInterceptor implements ClientInterceptor {
      @Override
      public <ReqT, RespT> ClientCall<ReqT, RespT>
      interceptCall(
         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next
      ) {
         return new 
         ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
               logger.info("Added metadata");
               headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
               super.start(responseListener, headers);
            }
         };
      }
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName); 
      BookSearch request = BookSearch.newBuilder().setName(bookName).build(); 
      Book response; 
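      // Note: a CallOptions entry is a client-local option visible to client interceptors;
      // it is not sent to the server. The metadata itself is added by BookClientInterceptor.
      CallOptions.Key<String> metaDataKey = CallOptions.Key.create("my_key");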
      try {
         response = blockingStub.withOption(metaDataKey, "bar").first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   } 
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .intercept(new BookClientInterceptor())
         .build();
      try {
         BookStoreClientUnaryBlockingMetadata client = new BookStoreClientUnaryBlockingMetadata(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

The interesting part here is the interceptor:

static class BookClientInterceptor implements ClientInterceptor{
   @Override
   public <ReqT, RespT> ClientCall<ReqT, RespT>
   interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
      return new 
      ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
         @Override
         public void start(Listener<RespT> responseListener, Metadata headers) {
            logger.info("Added metadata");
            headers.put(Metadata.Key.of("HOSTNAME", ASCII_STRING_MARSHALLER), "MY_HOST");
            super.start(responseListener, headers);
         }
      };
   }
}

We intercept every call made by the client and add the hostname metadata to it before the call proceeds.
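One detail worth noting: the interceptor declares the key as HOSTNAME, but grpc-java normalizes metadata key names to lowercase, so the header travels as hostname. The sketch below assumes only the io.grpc.Metadata class from grpc-java (the class name MetadataKeyNames is made up for illustration) and shows that both spellings refer to the same entry.

```java
import io.grpc.Metadata;

public class MetadataKeyNames {
   public static void main(String[] args) {
      Metadata.Key<String> upper = Metadata.Key.of("HOSTNAME", Metadata.ASCII_STRING_MARSHALLER);
      Metadata.Key<String> lower = Metadata.Key.of("hostname", Metadata.ASCII_STRING_MARSHALLER);

      // originalName keeps the spelling passed to Key.of; name is the normalized form
      System.out.println(upper.originalName());
      System.out.println(upper.name());

      // A value stored under the upper-case key can be read back with the lower-case key
      Metadata headers = new Metadata();
      headers.put(upper, "MY_HOST");
      System.out.println(headers.get(lower));
   }
}
```

Declaring keys in lowercase from the start avoids any confusion between what the code says and what appears in logs.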

Server Reads Metadata

Now, let us look at the server code that reads this metadata:

package com.tp.bookstore;

import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerMetadata {
   private static final Logger logger = Logger.getLogger(BookeStoreServerMetadata.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
   }
   private Server server;
   class BookServerInterceptor implements ServerInterceptor{
      @Override
      public <ReqT, RespT> Listener<ReqT> 
      interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
 
         logger.info("Recieved following metadata: " + headers);
         return next.startCall(call, headers);
      } 
   }
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl())
         .intercept(new BookServerInterceptor())
         .build()
         .start();
      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerMetadata greetServer = new BookeStoreServerMetadata();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
         logger.info("Searching for book with title: " + searchQuery.getName());
         List<String> matchingBookTitles = bookMap.keySet().stream().filter(title ->title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList());
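         if (matchingBookTitles.isEmpty()) {
            // No title matches: report NOT_FOUND instead of passing null to onNext
            responseObserver.onError(io.grpc.Status.NOT_FOUND
               .withDescription("No book found for title: " + searchQuery.getName())
               .asRuntimeException());
            return;
         }
         Book foundBook = bookMap.get(matchingBookTitles.get(0));
         responseObserver.onNext(foundBook);
         responseObserver.onCompleted();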
      }
   }
}

Again, the interesting part here is the interceptor:

class BookServerInterceptor implements ServerInterceptor{
   @Override
   public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
      logger.info("Recieved following metadata: " + headers);
      return next.startCall(call, headers);
   }
}

We intercept every call coming into the server and log the metadata before the actual method handles the call.
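Logging the whole Metadata object is fine for a demo, but a server usually wants one specific value. The following sketch is an assumption-labeled variant of the interceptor, not the tutorial's own code: the class name HostnameReadingInterceptor, the helper hostnameOrDefault, and the fallback value "unknown" are all hypothetical. It looks up the hostname entry by key and handles the case where the client did not send it.

```java
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import java.util.logging.Logger;

public class HostnameReadingInterceptor implements ServerInterceptor {
   private static final Logger logger =
      Logger.getLogger(HostnameReadingInterceptor.class.getName());

   // Must match the key used by the client interceptor (key names are lowercase on the wire)
   static final Metadata.Key<String> HOSTNAME_KEY =
      Metadata.Key.of("hostname", Metadata.ASCII_STRING_MARSHALLER);

   // Returns the hostname header, or a placeholder when the header is missing
   static String hostnameOrDefault(Metadata headers) {
      String hostname = headers.get(HOSTNAME_KEY);
      return hostname != null ? hostname : "unknown";
   }

   @Override
   public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
      ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
      logger.info("Call to " + call.getMethodDescriptor().getFullMethodName()
         + " from host: " + hostnameOrDefault(headers));
      // Hand the call on unchanged so the service method still runs
      return next.startCall(call, headers);
   }
}
```

Such an interceptor would be registered the same way as BookServerInterceptor, through the server builder's intercept method.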

Client-Server Call

Let us now see this in action. To run the code, open two terminals. In the first terminal, start the server by running the following command:

java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerMetadata

We would see the following output:

Output

Jul 31, 2021 5:29:14 PM com.tp.bookstore.BookeStoreServerMetadata start
INFO: Server started, listening on 50051

The above output shows that the server has started.

Now, let us start the client.

java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingMetadata Great

We would see the following output:

Output

Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook
INFO: Querying for book with title: Great
Jul 31, 2021 5:29:39 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata$BookClientInterceptor$1 start
INFO: Added metadata
Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookStoreClientUnaryBlockingMetadata getBook
INFO: Got following book from server: name: "Great Gatsby"
author: "Scott Fitzgerald"
price: 300

The server log will contain the following:

Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookServerInterceptor interceptCall
INFO: Recieved following metadata: Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.38.0,hostname=MY_HOST,grpc-accept-encoding=gzip)
Jul 31, 2021 5:29:40 PM com.tp.bookstore.BookeStoreServerMetadata$BookStoreImpl first
INFO: Searching for book with title: Great

As we can see, the server is able to read the metadata added by the client: hostname=MY_HOST.
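The walkthrough above demonstrates the first case from the list at the top. For the second case, where the server sends metadata and the client reads it, the same interceptor mechanism can be used in the opposite direction. What follows is only a sketch under stated assumptions: the class names, the key server-name, and the value MY_SERVER are hypothetical and do not come from the bookstore code. The server wraps the call to add a response header just before headers are sent, and the client wraps the response listener to observe it.

```java
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.ForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import java.util.logging.Logger;

public class ResponseMetadataSketch {
   private static final Logger logger = Logger.getLogger(ResponseMetadataSketch.class.getName());

   // Hypothetical key name chosen for this sketch
   static final Metadata.Key<String> SERVER_NAME_KEY =
      Metadata.Key.of("server-name", Metadata.ASCII_STRING_MARSHALLER);

   // Server side: add an entry to the response headers before they are sent
   static class ResponseHeaderServerInterceptor implements ServerInterceptor {
      @Override
      public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
         ServerCall<ReqT, RespT> call, Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next) {
         return next.startCall(
            new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
               @Override
               public void sendHeaders(Metadata responseHeaders) {
                  responseHeaders.put(SERVER_NAME_KEY, "MY_SERVER");
                  super.sendHeaders(responseHeaders);
               }
            }, requestHeaders);
      }
   }

   // Client side: wrap the response listener so the response headers can be inspected
   static class ResponseHeaderClientInterceptor implements ClientInterceptor {
      @Override
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
         return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
            next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
               super.start(
                  new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
                     responseListener) {
                     @Override
                     public void onHeaders(Metadata responseHeaders) {
                        logger.info("Received response metadata: "
                           + responseHeaders.get(SERVER_NAME_KEY));
                        super.onHeaders(responseHeaders);
                     }
                  }, headers);
            }
         };
      }
   }
}
```

These interceptors would be registered like the ones shown earlier: the server one through the server builder's intercept method, and the client one through the channel builder's intercept method.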
