RabbitMQ 快速指南



RabbitMQ - 概述

什麼是 RabbitMQ?

RabbitMQ 是一個用 Java 編寫的開源訊息代理。它完全符合 JMS 1.1 標準。它由 Apache 軟體基金會開發和維護,並根據 Apache 許可證授權。它為企業級訊息應用程式提供高可用性、可擴充套件性、可靠性、效能和安全性。

JMS 是一種允許開發基於訊息的系統的規範。RabbitMQ 充當訊息代理,位於應用程式之間,允許它們以非同步和可靠的方式進行通訊。

AMQ

訊息型別

為了更好地理解,下面解釋了兩種訊息選項。

點對點

在這種型別的通訊中,代理僅向一個消費者傳送訊息,而其他消費者將等待直到他們從代理接收訊息。沒有哪個消費者會收到相同的訊息。

如果沒有消費者,代理將保留訊息,直到收到消費者。這種型別的通訊也稱為**基於佇列的通訊**,其中生產者將訊息傳送到佇列,並且只有一個消費者從佇列中獲取一條訊息。如果有多個消費者,他們可能會收到下一條訊息,但他們不會收到與其他消費者相同的 訊息。

Point to Point Messaging

釋出/訂閱

在這種型別的通訊中,代理將相同的訊息副本傳送給所有活動的消費者。這種型別的通訊也稱為**基於主題的通訊**,其中代理將相同的訊息傳送給訂閱了特定主題的所有活動消費者。此模型支援單向通訊,不需要驗證已傳輸的訊息。

Publish/Subscribe Messaging

RabbitMQ - 環境搭建

本章將指導您如何準備開發環境以開始使用 RabbitMQ。它還將教您如何在設定 RabbitMQ 之前在您的機器上設定 JDK、Maven 和 Eclipse:

設定 Java 開發工具包 (JDK)

您可以從 Oracle 的 Java 網站下載最新版本的 SDK:Java SE 下載。 您將在下載的檔案中找到安裝 JDK 的說明,請按照給定的說明安裝和配置設定。最後設定 PATH 和 JAVA_HOME 環境變數以指向包含 java 和 javac 的目錄,通常分別為 java_install_dir/bin 和 java_install_dir。

如果您執行的是 Windows 並已將 JDK 安裝在 C:\jdk-11.0.11 中,則必須將以下行新增到 C:\autoexec.bat 檔案中。

set PATH=C:\jdk-11.0.11;%PATH% 
set JAVA_HOME=C:\jdk-11.0.11 

或者,在 Windows NT/2000/XP 上,您必須右鍵單擊“我的電腦”,選擇“屬性”→“高階”→“環境變數”。然後,您必須更新 PATH 值並單擊“確定”按鈕。

在 Unix(Solaris、Linux 等)上,如果 SDK 安裝在 /usr/local/jdk-11.0.11 中並且您使用的是 C shell,則必須將以下內容新增到您的 .cshrc 檔案中。

setenv PATH /usr/local/jdk-11.0.11/bin:$PATH 
setenv JAVA_HOME /usr/local/jdk-11.0.11

或者,如果您使用的是整合開發環境 (IDE),例如 Borland JBuilder、Eclipse、IntelliJ IDEA 或 Sun ONE Studio,則必須編譯並執行一個簡單的程式以確認 IDE 知道您已安裝 Java 的位置。否則,您必須按照 IDE 文件中給出的說明進行正確的設定。

設定 Eclipse IDE

本教程中的所有示例均使用 Eclipse IDE 編寫。因此,我們建議您應該在您的機器上安裝最新版本的 Eclipse。

要安裝 Eclipse IDE,請從www.eclipse.org/downloads下載最新的 Eclipse 二進位制檔案。下載安裝後,將二進位制分發版解壓縮到方便的位置。例如,在 Windows 上為 C:\eclipse,或在 Linux/Unix 上為 /usr/local/eclipse,最後適當地設定 PATH 變數。

可以透過在 Windows 機器上執行以下命令啟動 Eclipse,或者您可以簡單地雙擊 eclipse.exe。

%C:\eclipse\eclipse.exe 

可以透過在 Unix(Solaris、Linux 等)機器上執行以下命令啟動 Eclipse:

$/usr/local/eclipse/eclipse

成功啟動後,如果一切正常,則應顯示以下結果:

Eclipse Home page

設定 Maven

在本教程中,我們使用 maven 來執行和構建基於 Spring 的示例以執行基於 RabbitMQ 的應用程式。請遵循Maven - 環境搭建來安裝 maven。

RabbitMQ - 特性

RabbitMQ 是最流行的開源訊息代理之一。它旨在為企業級訊息應用程式提供高可用性、可擴充套件性、可靠性、效能和安全性。以下是 RabbitMQ 的一些主要特性。

  • **輕量級** - RabbitMQ 輕量級,易於在本地和雲端安裝。

  • **連線選項** - RabbitMQ 支援多種訊息協議,並且可以部署在分散式/聯合配置中,以滿足高可用性和可擴充套件性要求。

  • **可插拔架構** - RabbitMQ 允許選擇永續性機制,並提供根據應用程式需求自定義安全性的選項(身份驗證和授權)。

  • **多平臺** - RabbitMQ 為許多流行的語言提供客戶端 API,例如 Java、Python、JavaScript、Ruby 等。

  • **代理叢集** - RabbitMQ 可以作為叢集部署以實現高可用性和吞吐量。它可以跨多個可用區和區域聯合。

  • **功能豐富** - RabbitMQ 為代理和客戶端提供了許多高階功能。

  • **簡單的管理介面** - RabbitMQ 管理控制檯易於使用,但仍然提供許多強大的管理功能。

  • **企業級和雲就緒** - RabbitMQ 支援可插拔的身份驗證和授權。它支援 LDAP 和 TLS。它可以輕鬆部署在公共雲和私有云中。

  • **功能豐富** - RabbitMQ 為代理和客戶端提供了許多高階功能。它提供外掛以支援持續整合、運營指標以及與其他企業系統的整合等。

  • **管理** - RabbitMQ 提供 HTTP API、命令列工具和 UI 來管理和監控 RabbitMQ。

RabbitMQ - 安裝

RabbitMQ 基於 Erlang 執行時構建,因此在安裝 RabbitMQ 之前,我們需要下載 Erlang 並安裝它。確保您使用管理員許可權安裝 Erlang 和 RabbitMQ。

Erlang

Erlang 是一種通用程式語言和執行時環境。您可以從其主頁下載最新版本的 Erlang:下載 Erlang/OTP。 我們正在 Windows 上安裝 Erlang 並下載了適用於 Windows 的**Erlang/OTP 24.2.2** 安裝程式 - otp_win64_24.2.2.exe。

Download Erlang

現在透過雙擊安裝程式安裝 Erlang,並遵循預設選擇完成設定。

Erlang Installation

RabbitMQ 安裝

從其官方下載頁面下載最新的 RabbitMQ 二進位制檔案。我們已下載 3.9.13 版本為 rabbitmq-server-3.9.13.exe (適用於Windows)。

RabbitMQ Downloads

現在透過雙擊安裝程式安裝 RabbitMQ,並遵循預設選擇完成設定。

RabbitMQ Installation

預設情況下,RabbitMQ 作為 Windows 服務執行。要啟用基於 Web 的管理 UI,需要執行以下步驟。

轉到 RabbitMQ 安裝目錄並鍵入如下命令:

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.13\sbin>rabbitmq-plugins.bat enable rabbitmq_management
Enabling plugins on node rabbit@DESKTOP-86KD9FC:
rabbitmq_management
The following plugins have been configured:
   rabbitmq_management
   rabbitmq_management_agent
   rabbitmq_web_dispatch
Applying plugin configuration to rabbit@DESKTOP-86KD9FC...
The following plugins have been enabled:
   rabbitmq_management
   rabbitmq_management_agent
   rabbitmq_web_dispatch

started 3 plugins.

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.13\sbin>rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
Enabling plugins on node rabbit@DESKTOP-86KD9FC:
rabbitmq_shovel
rabbitmq_shovel_management
The following plugins have been configured:
   rabbitmq_management
   rabbitmq_management_agent
   rabbitmq_shovel
   rabbitmq_shovel_management
   rabbitmq_web_dispatch
Applying plugin configuration to rabbit@DESKTOP-86KD9FC...
The following plugins have been enabled:
   rabbitmq_shovel
   rabbitmq_shovel_management

started 2 plugins.

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.13\sbin>

使用管理員許可權編輯 C:\Windows\System32\drivers\etc\hosts 檔案,並在其中新增以下行:

127.0.0.1 rabbitmq

驗證安裝

現在開啟**http://rabbitmq:15672/** 以開啟管理控制檯。使用 guest/guest 登入。

RabbitMQ Management Console

RabbitMQ - 生產者應用

現在讓我們建立一個生產者應用程式,它將訊息傳送到 RabbitMQ 佇列。

建立專案

使用 Eclipse,選擇**檔案** → **新建** → **Maven 專案**。勾選**建立簡單的專案(跳過原型選擇)**並單擊“下一步”。

輸入如下所示的詳細資訊:

  • **groupId** - com.tutorialspoint

  • **artifactId** - producer

  • **version** - 0.0.1-SNAPSHOT

  • **name** - RabbitMQ 生產者

單擊“完成”按鈕,將建立一個新專案。

pom.xml

現在更新 pom.xml 的內容以包含 RabbitMQ 的依賴項。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>producer</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>RabbitMQ Producer</name>
   <properties>
      <java.version>11</java.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>5.14.2</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.26</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.26</version>
      </dependency>
   </dependencies>
</project>

現在建立一個生產者類,它將訊息傳送到 RabbitMQ 佇列。

package com.tutorialspoint.rabbitmq;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
   private static String QUEUE = "MyFirstQueue";

   public static void main(String[] args) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
         channel.queueDeclare(QUEUE, false, false, false, null);

         Scanner input = new Scanner(System.in);
         String message;
         do {
            System.out.println("Enter message: ");
            message = input.nextLine();
            channel.basicPublish("", QUEUE, null, message.getBytes());
         } while (!message.equalsIgnoreCase("Quit"));
      }
   }
}

生產者類建立一個連線,建立一個通道,連線到一個佇列。如果使用者輸入 quit,則應用程式終止,否則它將使用 basicPublish 方法將訊息傳送到佇列。

我們將在RabbitMQ - 測試應用章節中執行此應用程式。

RabbitMQ - 消費者應用

現在讓我們建立一個消費者應用程式,它將從 RabbitMQ 佇列接收訊息。

建立專案

使用 Eclipse,選擇**檔案** → **新建** → **Maven 專案**。勾選**建立簡單的專案(跳過原型選擇)**並單擊“下一步”。

輸入如下所示的詳細資訊

  • **groupId** - com.tutorialspoint

  • **artifactId** - consumer

  • **version** - 0.0.1-SNAPSHOT

  • **name** - RabbitMQ 消費者

單擊“完成”按鈕,將建立一個新專案。

pom.xml

現在更新 pom.xml 的內容以包含 ActiveMQ 的依賴項。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>consumer</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>RabbitMQ Consumer</name>
      <properties>
      <java.version>11</java.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>5.14.2</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.26</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.26</version>
      </dependency>
   </dependencies>
</project>

現在建立一個消費者類,它將從 RabbitMQ 佇列接收訊息。

package com.tutorialspoint.rabbitmq;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Consumer {
   private static String QUEUE = "MyFirstQueue";

   public static void main(String[] args) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE, false, false, false, null);
      System.out.println("Waiting for messages. To exit press CTRL+C");

      DeliverCallback deliverCallback = (consumerTag, delivery) -> {
         String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
         System.out.println("Received '" + message + "'");
      };
      channel.basicConsume(QUEUE, true, deliverCallback, consumerTag -> { });
   }
}

消費者類建立一個連線,建立一個通道,如果不存在則建立一個佇列,然後如果有任何訊息則從佇列接收訊息,並且它將繼續輪詢佇列以獲取訊息。一旦訊息被傳遞,它就會由 basicConsume() 方法使用 deliverCallback 處理。

我們將在RabbitMQ - 測試應用章節中執行此應用程式。

RabbitMQ - 測試應用

啟動生產者應用程式

在 Eclipse 中,右鍵單擊 Producer.java 原始碼,然後選擇“執行方式”→“Java 應用程式”。生產者應用程式將開始執行,您將看到如下所示的輸出:

Enter message:

啟動消費者應用程式

在 Eclipse 中,右鍵單擊 Consumer.java 原始碼,然後選擇“執行方式”→“Java 應用程式”。消費者應用程式將開始執行,您將看到如下所示的輸出:

Waiting for messages. To exit press CTRL+C

傳送訊息

在生產者控制檯視窗中,鍵入 Hi 並按 Enter 鍵傳送訊息。

Enter message:
Hi

接收訊息

在消費者控制檯視窗中驗證是否已收到訊息。

Waiting for messages. To exit press CTRL+C
Received = Hi

傳送 Quit 作為訊息以終止生產者視窗會話並終止客戶端視窗會話。

驗證

現在在您的瀏覽器中開啟**http://rabbitmq:15672/**。它將要求輸入憑據。使用 guest/guest 作為使用者名稱/密碼,它將載入 RabbitMQ 管理控制檯,您可以在其中檢查佇列以檢查狀態。它將顯示排隊和已交付的訊息。

queue

RabbitMQ - 釋出者應用

現在讓我們建立一個釋出者應用程式,它將訊息傳送到 RabbitMQ 交換機。此交換機將訊息傳遞到與交換機繫結的佇列。

建立專案

使用 Eclipse,選擇**檔案** → **新建** → **Maven 專案**。勾選**建立簡單的專案(跳過原型選擇)**並單擊“下一步”。

輸入如下所示的詳細資訊:

  • **groupId** - com.tutorialspoint

  • **artifactId** - publisher

  • **version** - 0.0.1-SNAPSHOT

  • **name** - RabbitMQ 釋出者

單擊“完成”按鈕,將建立一個新專案。

pom.xml

現在更新 pom.xml 的內容以包含 RabbitMQ 的依賴項。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>publisher</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>RabbitMQ Publisher</name>
   <properties>
      <java.version>11</java.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>5.14.2</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.26</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.26</version>
      </dependency>
   </dependencies>
</project>

現在建立一個釋出者類,它將訊息傳送到 RabbitMQ 主題以將其廣播給所有訂閱者。

package com.tutorialspoint.rabbitmq;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Publisher {
   private static final String EXCHANGE = "MyExchange";
   public static void main(String[] args) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      try (Connection connection = factory.newConnection();
      Channel channel = connection.createChannel()) {
         channel.exchangeDeclare(EXCHANGE, "fanout");
         Scanner input = new Scanner(System.in);
         String message;
         do {
            System.out.println("Enter message: ");
            message = input.nextLine();
            channel.basicPublish(EXCHANGE, "", null, message.getBytes());
         } while (!message.equalsIgnoreCase("Quit"));
      }
   }
}

生產者類建立一個連線,建立一個通道,宣告一個交換機,然後要求使用者輸入訊息。訊息被髮送到交換機,並且因為我們沒有傳遞佇列名稱,所以所有繫結到此交換機的佇列都將收到訊息。如果使用者輸入 quit,則應用程式終止,否則它將訊息傳送到主題。

我們將在RabbitMQ - 測試應用章節中執行此應用程式。

RabbitMQ - 訂閱者應用

現在讓我們建立一個訂閱者應用程式,它將接收來自RabbitMQ主題的訊息。

建立專案

使用 Eclipse,選擇**檔案** → **新建** → **Maven 專案**。勾選**建立簡單的專案(跳過原型選擇)**並單擊“下一步”。

輸入如下所示的詳細資訊:

  • **groupId** - com.tutorialspoint

  • artifactId − subscriber

  • **version** - 0.0.1-SNAPSHOT

  • name − RabbitMQ 訂閱者

單擊“完成”按鈕,將建立一個新專案。

pom.xml

現在更新 pom.xml 的內容以包含 RabbitMQ 的依賴項。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>subscriber</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>RabbitMQ Subscriber</name>
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>5.14.2</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.26</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.26</version>
      </dependency>
   </dependencies>  
</project>

現在建立一個Subscriber類,它將接收來自RabbitMQ佇列的訊息。

package com.tutorialspoint.rabbitmq;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Subscriber {
   private static String EXCHANGE = "MyExchange";
   public static void main(String[] args) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      channel.exchangeDeclare(EXCHANGE, "fanout");

      String queueName = channel.queueDeclare().getQueue();
      channel.queueBind(queueName, EXCHANGE, "");
      System.out.println("Waiting for messages. To exit press CTRL+C");

      DeliverCallback deliverCallback = (consumerTag, delivery) -> {
         String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
         System.out.println("Received '" + message + "'");
      };
      channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
   }
}

Subscriber類建立連線,建立通道,宣告交換機,建立一個隨機佇列並將其與交換機繫結,然後接收主題中的訊息(如果有)。按Ctrl + C終止,否則它將繼續輪詢佇列以查詢訊息。

我們將在RabbitMQ - 測試應用程式章節中多次執行此應用程式以建立多個訂閱者。

RabbitMQ - 測試應用程式主題

啟動釋出者應用程式

在eclipse中,右鍵單擊Publisher.java原始碼,然後選擇“Run As”→“Java Application”。釋出者應用程式將開始執行,您將看到如下輸出:

Enter message:

啟動訂閱者應用程式

在eclipse中,右鍵單擊Subscriber.java原始碼,然後選擇“Run As”→“Java Application”。訂閱者應用程式將開始執行,您將看到如下輸出:

Waiting for messages. To exit press CTRL+C

啟動另一個訂閱者應用程式

在eclipse中,再次右鍵單擊Subscriber.java原始碼,然後選擇“Run As”→“Java Application”。另一個訂閱者應用程式將開始執行,您將看到如下輸出:

Waiting for messages. To exit press CTRL+C

傳送訊息

在釋出者控制檯視窗中,鍵入Hi並按Enter鍵傳送訊息。

Enter message:
Hi

接收訊息

在訂閱者控制檯視窗中驗證每個視窗是否都收到了訊息。

Received = Hi

傳送Quit作為訊息以終止所有釋出者和訂閱者控制檯視窗會話。

驗證

現在在瀏覽器中開啟http://rabbitmq:15672/。它將要求輸入憑據。使用guest/guest作為使用者名稱/密碼,它將載入RabbitMQ管理控制檯,您可以在其中檢查佇列和交換機以檢查已傳遞訊息和繫結的狀態。

廣告
© . All rights reserved.