ZeroMQ - 同步訊息處理



ZeroMQ 是一個高效能的非同步通用訊息傳遞庫,用於構建分散式或併發應用程式。它也被稱為ØMQ。眾所周知,ZeroMQ 主要設計用於非同步訊息傳遞,您也可以使用各種 Socket 型別和訊息傳遞模式來實現同步訊息處理模式。

關鍵概念

在 ZeroMQ 中執行同步訊息處理時,您應該瞭解以下重要主題:

  • Socket:在 ZeroMQ 中,Socket 是客戶端和伺服器之間傳送或接收資料的端點(API)。它表示兩個元件(例如程序或裝置)之間的通訊通道,允許它們交換資料。ZeroMQ 提供不同型別的 Socket 用於各種訊息傳遞模式,例如 PUB/SUB、REQ/REP、PUSH/PULL 等。

  • 訊息佇列:ZeroMQ Socket 用於傳送和接收訊息。它們不需要專用的訊息代理,因為訊息模式是在庫本身中實現的,這使得它“無代理”。

  • 非同步特性:ZeroMQ 主要設計為非同步的,這意味著它旨在同時處理多個操作,這就是為什麼它通常用於高效能、非阻塞通訊的原因。

以下是同步訊息處理的示意圖:

Synchronous Message Processing

實現同步處理

同步過程是一種操作型別,其中任務或操作按順序執行,每個步驟或操作都等待上一個步驟完成才能繼續。

為了使用 ZeroMQ(預設情況下遵循非同步機制)實現同步處理,您可以使用特定的模式和 Socket 型別。同步訊息處理最常見的方法是使用 REQ(請求)和 REP(回覆)Socket,它們遵循請求-回覆模式。在REQ/REP請求模式中:

  • REQ Socket:客戶端用來發送請求或資料到伺服器。
  • REP Socket:伺服器用來響應接收到的請求。

Java 中的同步訊息處理

以下是用 ZeroMQ 在Java中進行同步訊息處理的基本示例:

伺服器程式碼(REP Socket)

import org.zeromq.ZContext;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;

public class Response {
   public static void main(String[] args) {
      try (ZContext context = new ZContext()) {
         ZMQ.Socket socket = context.createSocket(ZMQ.REP);
         socket.bind("tcp://127.0.0.1:5555");

         while (true) {
            // Wait for the next request from the client
            String message = socket.recvStr();
            System.out.println("Received request: " + message);

            // Send a reply back to the client
            socket.send("Welcome, " + message);
         }
      }
   }
}

客戶端程式碼(REQ Socket)

import org.zeromq.ZContext;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;

public class Request {
   public static void main(String[] args) {
      ZMQ.Context context = ZMQ.context(1);
      ZMQ.Socket socket = context.socket(ZMQ.REQ);
      socket.connect("tcp://127.0.0.1:5555");

      // Send a request
      socket.send("to World".getBytes(ZMQ.CHARSET), 0);

      // Receive the reply
      byte[] reply = socket.recv(0);
      System.out.println("Received reply: "+new String(reply,ZMQ.CHARSET));

      socket.close();
      context.term();
   }
}

輸出

執行上述程式後,將顯示以下輸出:

Received reply: Welcome, to World

Python 中的同步訊息處理

以下是用 ZeroMQ 在Python中進行同步訊息處理的基本示例:

伺服器程式碼(REP Socket)

import zmq
def main():
   context = zmq.Context()
   socket = context.socket(zmq.REP)
   socket.bind("tcp://127.0.0.1:5555")

   while True:
      # Wait for the next request from the client
      message = socket.recv_string()
      print(f"Received request: {message}")

      # Send a reply back to the client
      socket.send_string(f"Welcome, {message}")

if __name__ == "__main__":
   main()

客戶端程式碼(REQ Socket)

import zmq
def main():
   context = zmq.Context()
   socket = context.socket(zmq.REQ)
   socket.connect("tcp://127.0.0.1:5555")

   # Send a request
   socket.send_string("to World")

   # Receive the reply
   message = socket.recv_string()
   print(f"Received reply: {message}")

if __name__ == "__main__":
   main()

輸出

上述程式產生以下輸出:

Received reply: Welcome, to World
廣告