Hazelcast - IQueue



java.util.concurrent.BlockingQueue 提供了一個介面,支援 JVM 中的執行緒以不同的速率生成和使用訊息。生產者會根據可用容量阻塞,消費者會阻塞等待佇列中出現可用元素。

類似地,IQueue 擴充套件了 BlockingQueue,使其成為分散式版本。它提供類似的功能:put、take 等。

關於 IQueue 需要注意的一點是,與其他集合不同,資料不會被分割槽。所有資料都儲存/存在於單個 JVM 上。所有 JVM 仍然可以訪問資料,但佇列無法擴充套件到單個機器/JVM 之外。如果元素數量超過可用記憶體,則會丟擲 OutOfMemoryException。

佇列支援同步備份和非同步備份。同步備份確保即使持有佇列的 JVM 宕機,所有元素都將被保留並可從備份中訪問。

讓我們來看一個有用功能的示例。

新增元素和讀取元素

讓我們在 3 個 JVM 上執行以下程式碼。在一個 JVM 上執行生產者程式碼,在另外兩個 JVM 上執行消費者程式碼。

示例

第一部分是生產者程式碼,它建立一個佇列並將專案新增到其中。

public static void main(String... args) throws IOException, InterruptedException {
   //initialize hazelcast instance
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   // create a queue
   IQueue<String> hzFruits = hazelcast.getQueue("fruits");
   String[] fruits = {"Mango", "Apple", "Banana", "Watermelon"};
   for (String fruit : fruits) {
      System.out.println("Producing: " + fruit);
      Thread.sleep(1000);
   }
   System.exit(0);
}

第二部分是消費者程式碼,它讀取元素。

public static void main(String... args) throws IOException, InterruptedException {
   //initialize hazelcast instance
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   IQueue<String> hzFruits = hazelcast.getQueue("fruits");
   while(!hzFruits.isEmpty()) {
   System.out.println("Consuming: " + hzFruits.take());
      Thread.sleep(2000);
   }
   System.exit(0);
}

輸出

生產者程式碼的輸出顯示它無法新增現有元素。

Producing Mango
Producing Apple
Producing Banana
Producing Watermelon

第一個消費者的程式碼輸出顯示它消耗了部分資料。

Consuming Mango
Consuming Banana

第二個消費者的程式碼輸出顯示它消耗了剩餘的資料:

Consuming Apple
Consuming Watermelon

有用方法

序號 函式名稱和描述
1

add(Type element)

向列表中新增元素

2

remove(Type element)

從列表中刪除元素

3

poll()

返回佇列的頭部,如果佇列為空則返回 NULL

4

take()

返回佇列的頭部,或等到元素可用

5

size()

返回列表中元素的數量

6

contains(Type element)

返回元素是否存在

7

getPartitionKey()

返回儲存列表的分割槽鍵

6

addItemListener(ItemListener<Type>listener, value)

通知訂閱者列表中新增/刪除/修改了元素。

hazelcast_data_structures.htm
廣告