如何在 Java 9 中使用 Flow API 來實現響應式流?


自 Java 9 起,Flow API 便是響應式流規範的官方支援。它是 **Iterator** 和 **Observer** 這兩個模式的結合。Flow API 是互動操作規範,並非像 RxJava 那樣的終端使用者 API。

Flow API 由四個基本介面組成

  • 訂閱者:訂閱者訂閱釋出者以進行回撥。
  • 釋出者:釋出者將資料項流釋出給已註冊的訂閱者。
  • 訂閱:釋出者和訂閱者之間的連結。
  • 處理器:處理器位於釋出者和訂閱者之間,將一個流轉換到另一個流。

在以下示例中,我們建立了一個基本訂閱者,請求一個數據物件、列印它並請求另一個。我們可以使用 Java 提供的釋出者實現(SubmissionPublisher)來完成會話。

示例

import java.util.concurrent.Flow;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;

class MySubscriber<T>implements Flow.Subscriber<T> {
   private Flow.Subscription subscription;
   @Override
   public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      this.subscription.request(1);
   }
   @Override
   public void onNext(T item) {
      System.out.println(item);
      subscription.request(1);
   }
   @Override
   public void onError(Throwable throwable) {
      throwable.printStackTrace();
   }
   @Override
   public void onComplete() {
      System.out.println("Done");
   }
}

// main class
public class FlowTest {
   public static void main(String args[]) {
      List<String> items = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");
      SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
      publisher.subscribe(new MySubscriber<>());
      items.forEach(s -> {
         try {
            Thread.sleep(1000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         publisher.submit(s);
      });
      publisher.close();
   }
}

輸出

1
2
3
4
5
6
7
8
9
10
Done

更新於:2020-03-27

2K+ 瀏覽

啟動您的 職業生涯

完成課程獲得認證

開始
廣告
© . All rights reserved.