Java併發 - BlockingQueue介面



java.util.concurrent.BlockingQueue 介面是 Queue 介面的子介面,此外還支援諸如等待佇列變為非空後再檢索元素,以及等待佇列中有空間後再儲存元素之類的操作。

BlockingQueue方法

序號 方法及描述
1

boolean add(E e)

如果可以立即插入指定的元素而不會違反容量限制,則將該元素插入此佇列中,成功時返回 true,如果沒有可用空間則丟擲 IllegalStateException。

2

boolean contains(Object o)

如果此佇列包含指定的元素,則返回 true。

3

int drainTo(Collection<? super E> c)

從此佇列中移除所有可用元素並將它們新增到給定的集合中。

4

int drainTo(Collection<? super E> c, int maxElements)

從此佇列中移除最多給定數量的可用元素並將它們新增到給定的集合中。

5

boolean offer(E e)

如果可以立即插入指定的元素而不會違反容量限制,則將該元素插入此佇列中,成功時返回 true,如果沒有可用空間則返回 false。

6

boolean offer(E e, long timeout, TimeUnit unit)

將指定的元素插入此佇列,如果必要,則等待直到指定等待時間,直到有空間可用。

7

E poll(long timeout, TimeUnit unit)

檢索並移除此佇列的頭,如果必要,則等待直到指定等待時間,直到有元素可用。

8

void put(E e)

將指定的元素插入此佇列,如有必要,則等待直到有空間可用。

9

int remainingCapacity()

返回此佇列理想情況下(在沒有記憶體或資源限制的情況下)可以在不阻塞的情況下接受的額外元素數量,如果沒有固有限制,則返回 Integer.MAX_VALUE。

10

boolean remove(Object o)

從此佇列中移除指定的元素的一個例項(如果存在)。

11

E take()

檢索並移除此佇列的頭,如有必要,則等待直到有元素可用。

示例

下面的 TestThread 程式演示了在基於執行緒的環境中使用 BlockingQueue 介面。

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

      Producer producer = new Producer(queue);
      Consumer consumer = new Consumer(queue);

      new Thread(producer).start();
      new Thread(consumer).start();

      Thread.sleep(4000);
   }  


   static class Producer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Producer(BlockingQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {
         Random random = new Random();

         try {
            int result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
            
            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
            
            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }	   
   }

   static class Consumer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Consumer(BlockingQueue queue) {
         this.queue = queue;
      }
      
      @Override
      public void run() {
         
         try {
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

這將產生以下結果。

輸出

Added: 52
Removed: 52
Added: 70
Removed: 70
Added: 27
Removed: 27
廣告