ZeroMQ - 通訊模式



ZeroMQ 是一個高效能的非同步訊息庫,它提供了一套用於生成分散式應用程式的通訊模式。它提供了一種簡單而結構化的方式來在各種應用程式中實現訊息傳遞模式。以下是 ZeroMQ 的一些基本核心概念:

  • Socket 作為通訊端點:ZeroMQ Socket 支援高階訊息傳遞模式,例如 PUB-SUB 和 PUSH-PULL。
  • 非同步訊息傳遞:ZeroMQ Socket 非同步執行,允許應用程式傳送和接收訊息而不會阻塞。
  • 可擴充套件性和效能:ZeroMQ 旨在水平擴充套件,允許多對多的配置,同時保持高吞吐量和低延遲。

通訊模式

以下是通訊模式列表:

  • 請求-回覆 (REQ-REP)
  • 釋出-訂閱 (PUB-SUB)
  • 推送-拉取 (PUSH-PULL)
  • 配對 (PAIR)
  • 處理程式-路由器 (DEALER-ROUTER)
  • 路由器-處理程式 (ROUTER-DEALER)
  • XPUB-XSUB

這些通訊模式可以以各種方式組合起來建立複雜的分散式系統。ZeroMQ 提供了一個靈活且可擴充套件的訊息傳遞框架,允許開發者實現從簡單的客戶端-伺服器配置到複雜的分散式架構的任何內容。

請求-回覆

這是一種遠端服務呼叫和任務分配通訊模式,它將一組客戶端連線到一組服務。這意味著它是一個簡單的客戶端-伺服器通訊,客戶端向伺服器傳送請求,伺服器回覆響應。

例如,考慮一個響應客戶端HTTP請求的 Web 伺服器。

Req-Rep-Pattern

示例

以下是 REQ/REP 的示例。首先,我們連線到伺服器並從客戶端獲取響應。

package com.zeromq.mavenProject;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;

public class Practice {
   public static void main(String[] args) {
      try (ZContext context = new ZContext()) {
         System.out.println("Connecting to TP server");

         //  Socket to talk to server
         ZMQ.Socket socket = context.createSocket(SocketType.REQ);
         socket.connect("tcp://:5555");

         for (int requestNbr = 0; requestNbr != 5; requestNbr++) {
             String request = "Tutorialspoint";
             System.out.println("Sending TP " + requestNbr);
             socket.send(request.getBytes(ZMQ.CHARSET), 0);

             byte[] reply = socket.recv(0);
             System.out.println(
                "Received " + new String(reply, ZMQ.CHARSET) + " " +
                requestNbr
             );
         }
      }
   }
}

輸出

Connecting to TP server
Sending TP 0
Received world 0
Sending TP 1
Received world 1
Sending TP 2
Received world 2
Sending TP 3
Received world 3
Sending TP 4
Received world 4

釋出-訂閱

這是一種資料分發通訊模式,它將一組釋出者連線到一組訂閱者。它用於將訊息廣播到多個接收者。

例如,考慮一個向多個訂閱者廣播新聞文章的新聞提要系統。

Pub-Sub-Pattern

示例

在這個示例中,釋出者建立一個 PUB socket,繫結到一個埠,併發送訊息。訂閱者建立一個 SUB socket,連線到釋出者,接收訊息,並列印收到的訊息。

// Publisher (PUB) class
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

// After running the PUB comment it runs SUB.

public class PubSubPublisher {
   public static void main(String[] args) {
      Context context = ZMQ.context(1);
      Socket socket = context.socket(ZMQ.PUB);
      socket.bind("tcp://*:5555");

      while (true) {
         socket.send("Hello, subscribers!".getBytes(), 0);
      }
   }
}

// Subscriber (SUB) class
public class PubSubSubscriber {
   public static void main(String[] args) {
      Context context = ZMQ.context(1);
      Socket socket = context.socket(ZMQ.SUB);
      socket.connect("tcp://:5555");
      socket.subscribe("".getBytes()); // Subscribe to all messages

      while (true) {
         byte[] message = socket.recv(0);
         System.out.println("Received message: " + new String(message));
      }
   }
}

輸出

Received message: Hello, subscribers!
Received message: Hello, subscribers!
Received message: Hello, subscribers!
Received message: Hello, subscribers!
Received message: Hello, subscribers!
Received message: Hello, subscribers!

推送-拉取

此模式可用於負載均衡和任務分配。推送 socket 將訊息傳送到拉取 socket,拉取 socket 接收然後處理訊息。

例如,考慮一個將任務分配給多個工作節點的負載均衡器。

Push-Pull-Pattern

示例

在這個示例中,推送器建立一個 PUSH socket 並連線到拉取器。拉取器建立一個拉取 socket,將其繫結到埠,從推送器接收工作項,並列印它們。

package com.zeromq.mavenProject;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

// After running the PUSH comment it runs PULL.

public class PushPullPusher {
   public static void main(String[] args) {
      // Create a PUSH socket and connect to the puller
      Context context = ZMQ.context(1);
      Socket socket = context.socket(ZMQ.PUSH);
      socket.connect("tcp://:5500");

      // Send work items to the puller
      while (true) {
         socket.send("Work item".getBytes(), 0);
      }
   }
}

public class PushPullPuller {
   public static void main(String[] args) {
      // Create a PULL socket and bind it to a port
      Context context = ZMQ.context(1);
      Socket socket = context.socket(ZMQ.PULL);
      socket.bind("tcp://*:5500");

      // Receive work items from the pusher
      while (true) {
         byte[] message = socket.recv(0);
         System.out.println("Received work item: " + new String(message));
      }
   }
}

輸出


配對

此模式可用於簡單的點對點互動。一對 socket 以點對點方式相互連線,以便兩個節點可以雙向通訊。

例如,考慮一個允許兩個使用者相互通訊的聊天應用程式。

Pair-Pattern

示例

在這個示例中,我們透過建立兩個可以相互發送和接收訊息的節點來演示 PAIR 模式。我們正在檢查連線是雙向的還是非同步的。如果是,則每個節點都在傳送下一條訊息之前等待另一個節點響應。

package com.zeromq.mavenProject;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

// After running the PairNode1 comment it runs PairNode2.

public class PairNode1 {
   public static void main(String[] args) {
      // Create a PAIR socket and bind it to a port
      Context context = ZMQ.context(1);
      Socket socket = context.socket(ZMQ.PAIR);
      socket.bind("tcp://*:5000");

      // Receive messages from Node 2 and send responses
      while (true) {
         byte[] message = socket.recv(0);
         System.out.println("Received message: " + new String(message));
         socket.send("Hello, Node 2!".getBytes(), 0);
      }
   }
}

public class PairNode2 {
   public static void main(String[] args) {
      // Create a PAIR socket and connect to Node 1
      Context context = ZMQ.context(1);
      Socket socket = context.socket(ZMQ.PAIR);
      socket.connect("tcp://:5000");

      // Send messages to Node 1 and receive responses
      while (true) {
         socket.send("Hello, Node 1!".getBytes(), 0);
         byte[] message = socket.recv(0);
         System.out.println("Received response: " + new String(message));
      }
   }
}

輸出

Received message: Hello, Node 1!
Received message: Hello, Node 1!
Received message: Hello, Node 1!
Received message: Hello, Node 1!
Received message: Hello, Node 1!

處理程式-路由器

此模式用於實現複雜的分散式系統。處理程式 socket 將訊息傳送到路由器 socket,路由器 socket 將它們分派給一個或多個已連線的節點。

例如,考慮一個將查詢路由到多個節點的分散式資料庫系統。

路由器-處理程式

此模式有助於實現複雜的分散式系統。路由器 socket 將訊息從一個或多個已連線的處理程式 socket 路由,每個處理程式 socket 將其傳送到其各自的節點。

例如,考慮一個將路由到多個邊緣伺服器的內容分發網路。

XPUB-XSUB

此通訊模式可用於建立動態釋出-訂閱系統。XPUB 是一個特殊的釋出者,允許訂閱者動態連線和斷開連線。XSUB 是一個特殊的訂閱者,可以連線到多個釋出者。

例如,考慮一個即時分析系統,其中多個釋出者將資料傳送到多個訂閱者。

廣告