使用 Java 執行緒中的 BlockingQueue 實現生產者消費者解決方案


生產者消費者是 Java 併發和多執行緒中最常見的問題之一。它出現在幫助管理多個執行緒嘗試訪問共享資源的同步過程中。本文將幫助我們找到使用 Java 執行緒中的 BlockingQueue 實現生產者消費者解決方案。

生產者消費者問題和 BlockingQueue

理解生產者消費者問題

生產者和消費者是兩個不同的實體或程序,它們使用一個共享佇列。此佇列是一個固定大小的緩衝區。生產者生成資訊片段並將它們儲存在佇列中。消費者使用給定的資訊並將其從佇列中移除。

實際問題發生在

  • 當生產者即使緩衝區已滿也繼續生成資料時。

  • 當消費者嘗試移除資料時,緩衝區為空時。

  • 生產者或消費者的速度較慢。

  • 兩者都試圖同時更新緩衝區。

解決方案

  • 當緩衝區已滿時,生產者必須停止資料生成。

  • 當緩衝區為空時,消費者必須停止從緩衝區移除資訊。

  • 只有當緩衝區既不為空也不滿時,生產者和消費者才能工作。

BlockingQueue

Java 在 'java.util.concurrent' 包中提供了 BlockingQueue 介面。使用此佇列的主要優點是,在檢索和刪除專案時,它會等待佇列變為非空。此外,在新增專案時,它會等待可用空間。此功能使其成為生產者消費者解決方案的完美選擇。

語法

BlockingQueue< Type > nameOfObject = new LinkedBlockingQueue<>();

這裡,LinkedBlockingQueue 是一個實現了 BlockingQueue 介面的類。

使用 Java 中的 BlockingQueue 實現生產者消費者解決方案

方法

  • 建立兩個類及其相應的建構函式。這兩個類都將擴充套件 'Thread' 類。第一個類用於生產者,第二個類用於消費者。

  • 在兩個類中,定義型別為 'Integer' 的 BlockingQueue 並將其作為引數傳遞給建構函式。這裡,'Integer' 是一個包裝類。

  • 在 Producer 類中,我們覆蓋內建方法 'run()' 以從 Producer 端生成資料。現在,使用 for 迴圈迭代 5 次,並使用 'put()' 方法將資料儲存到 BlockingQueue 中,間隔為 1 秒。

  • 在 Consumer 類中,再次覆蓋內建方法 'run()' 以使用名為 'take()' 的內建方法從 Consumer 端使用資料。

  • 在 main() 方法中,定義 BlockingQueue 的物件並將它們作為引數傳遞給生產者和消費者類的建構函式。

示例

import java.util.concurrent.*;
class Producr extends Thread {
   protected BlockingQueue<Integer> blcque;
   Producr(BlockingQueue<Integer> blcque) { // constructor
      this.blcque = blcque;
   }
   public void run() { // overriding run method
      while (true) {
         for(int i = 1; i <= 5; i++) {
            try {
               System.out.println("Producer is running " + i);
               blcque.put(i); // to produce data
               // produce data with an interval of 1 sec
               Thread.sleep(1000);
            }
           // to handle exception
           catch (InterruptedException exp) {
              System.out.println("An interruption occurred at Producer");
            }
         }
      }
   }
}
class Consumr extends Thread {
   protected BlockingQueue<Integer> blcque;
   Consumr(BlockingQueue<Integer> blcque) { // constructor
      this.blcque = blcque;
   }
   public void run() { // overriding run method
      try {
         while (true) {
            Integer elem = blcque.take(); // to consume data
            System.out.println("Consumer is running " + elem);
         }
      }
      // to handle exception
      catch (InterruptedException exp) {
         System.out.println("An interruption occurred at Producer");
      }
   }
}
public class Solution {
   public static void main(String[] args) throws InterruptedException {
      // create an object of BlockingQueue
      BlockingQueue<Integer> bufrShr = new LinkedBlockingQueue<>();
      
      // passing object of BlockingQueue as arguments
      Producr threadProd = new Producr(bufrShr);
      Consumr threadCon = new Consumr(bufrShr);
      
      // to start the process
      threadProd.start();
      threadCon.start();
      
      // to exit the process after 5 sec
      Thread.sleep(5000);
      System.exit(0);
   }
}

輸出

Producer is running 1
Consumer is running 1
Producer is running 2
Consumer is running 2
Producer is running 3
Consumer is running 3
Producer is running 4
Consumer is running 4
Producer is running 5
Consumer is running 5

結論

我們從定義生產者消費者問題開始本文,在下一節中,我們透過引入 BlockingQueue 介面提出瞭解決此問題的可能解決方案。最後,我們討論了一個 Java 程式,該程式向我們展示瞭如何實際使用 BlockingQueue 來解決給定的問題。

更新於: 2023年7月20日

1K+ 次瀏覽

開啟你的 職業生涯

透過完成課程獲得認證

開始學習
廣告

© . All rights reserved.