RxJava 快速指南



RxJava - 概述

RxJava 是一個基於 Java 的 ReactiveX 擴充套件。它提供了 ReactiveX 專案在 Java 中的實現。以下是 RxJava 的主要特徵。

  • 擴充套件觀察者模式。

  • 支援資料/事件序列。

  • 提供運算子以宣告方式將序列組合在一起。

  • 在內部處理執行緒、同步、執行緒安全和併發資料結構。

什麼是 ReactiveX?

ReactiveX 是一個旨在為各種程式語言提供響應式程式設計概念的專案。響應式程式設計指的是程式在資料出現時做出反應的場景。它是一種基於事件的程式設計概念,事件可以傳播到註冊觀察者。

根據 **響應式** 的說法,他們結合了觀察者模式、迭代器模式和函式式模式的優點。

正確實現的觀察者模式。ReactiveX 結合了觀察者模式、迭代器模式和函數語言程式設計的最佳理念。

函數語言程式設計

函數語言程式設計圍繞著使用純函式構建軟體。純函式不依賴於先前狀態,並且對於傳遞的相同引數始終返回相同的結果。純函式有助於避免與共享物件、可變資料和副作用相關的問題,這些問題在多執行緒環境中經常出現。

響應式程式設計

響應式程式設計指的是事件驅動的程式設計,其中資料流以非同步方式進入並在到達時進行處理。

函式式響應式程式設計

RxJava 將這兩個概念一起實現,其中流資料隨時間變化,消費者函式相應地做出反應。

響應式宣言

響應式宣言 是一份線上文件,闡述了應用程式軟體系統的較高標準。根據該宣言,響應式軟體的主要屬性如下:

  • **響應式** - 應該始終及時響應。

  • **訊息驅動** - 元件之間應該使用非同步訊息傳遞,以便它們保持松耦合。

  • **彈性** - 即使在高負載下也應該保持響應。

  • **彈性** - 即使任何元件發生故障也應該保持響應。

RxJava 的關鍵元件

RxJava 具有兩個關鍵元件:可觀察物件和觀察者。

  • **可觀察物件** - 它表示類似於流的物件,可以發出零個或多個數據,可以傳送錯誤訊息,其速度可以在發出資料集時控制,可以傳送有限資料和無限資料。

  • **觀察者** - 它訂閱可觀察物件的序列資料並在每個可觀察物件的專案上做出反應。每當可觀察物件發出資料時,觀察者都會收到通知。觀察者逐個處理資料。

如果專案不存在或先前專案的回撥未返回,則觀察者永遠不會收到通知。

RxJava - 環境設定

本地環境設定

RxJava 是一個 Java 庫,因此首要要求是在您的機器上安裝 JDK。

系統要求

JDK 1.5 或更高版本。
記憶體 沒有最低要求。
磁碟空間 沒有最低要求。
作業系統 沒有最低要求。

步驟 1 - 驗證您的機器上是否安裝了 Java

首先,開啟控制檯並根據您正在使用的作業系統執行 java 命令。

作業系統 任務 命令
Windows 開啟命令控制檯 c:\> java -version
Linux 開啟命令終端 $ java -version
Mac 開啟終端 machine:< joseph$ java -version

讓我們驗證所有作業系統的輸出:

作業系統 輸出
Windows

java version "1.8.0_101"

Java(TM) SE Runtime Environment (build 1.8.0_101)

Linux

java version "1.8.0_101"

Java(TM) SE Runtime Environment (build 1.8.0_101)

Mac

java version "1.8.0_101"

Java(TM) SE Runtime Environment (build 1.8.0_101)

如果您的系統上沒有安裝 Java,請從以下連結下載 Java 軟體開發工具包 (SDK) https://www.oracle.com。在本教程中,我們假設 Java 1.8.0_101 為已安裝版本。

步驟 2 - 設定 JAVA 環境

設定 **JAVA_HOME** 環境變數以指向 Java 安裝在您機器上的基本目錄位置。例如。

作業系統 輸出
Windows 將環境變數 JAVA_HOME 設定為 C:\Program Files\Java\jdk1.8.0_101
Linux export JAVA_HOME = /usr/local/java-current
Mac export JAVA_HOME = /Library/Java/Home

將 Java 編譯器位置追加到系統路徑。

作業系統 輸出
Windows 在系統變數 **Path** 的末尾追加字串 **C:\Program Files\Java\jdk1.8.0_101\bin**。
Linux export PATH = $PATH:$JAVA_HOME/bin/
Mac 不需要

如上所述,使用命令 **java -version** 驗證 Java 安裝。

步驟 3 - 下載 RxJava2 存檔

RxJava @ MVNRepository 下載最新版本的 RxJava jar 檔案及其依賴項 Reactive Streams @ MVNRepository。在撰寫本教程時,我們已下載 rxjava-2.2.4.jar、reactive-streams-1.0.2.jar 並將其複製到 C:\>RxJava 資料夾中。

作業系統 存檔名稱
Windows rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Linux rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Mac rxjava-2.2.4.jar, reactive-streams-1.0.2.jar

步驟 4 - 設定 RxJava 環境

設定 **RX_JAVA** 環境變數以指向 RxJava jar 儲存在您機器上的基本目錄位置。假設我們將 rxjava-2.2.4.jar 和 reactive-streams-1.0.2.jar 儲存在 RxJava 資料夾中。

序號 作業系統和說明
1

Windows

將環境變數 RX_JAVA 設定為 C:\RxJava

2

Linux

export RX_JAVA = /usr/local/RxJava

3

Mac

export RX_JAVA = /Library/RxJava

步驟 5 - 設定 CLASSPATH 變數

設定 **CLASSPATH** 環境變數以指向 RxJava jar 位置。

序號 作業系統和說明
1

Windows

將環境變數 CLASSPATH 設定為 %CLASSPATH%;%RX_JAVA%\rxjava-2.2.4.jar;%RX_JAVA%\reactive-streams-1.0.2.jar;.;

2

Linux

export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:.

3

Mac

export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:.

步驟 6 - 測試 RxJava 設定

建立一個名為 TestRx.java 的類,如下所示:

import io.reactivex.Flowable;
public class TestRx {
   public static void main(String[] args) {
      Flowable.just("Hello World!")
         .subscribe(System.out::println);
   }
}

步驟 7 - 驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac Tester.java

驗證輸出。

Hello World!

RxJava - 可觀察物件的工作原理

**可觀察物件** 表示資料來源,而 **觀察者(訂閱者)** 則監聽它們。簡而言之,可觀察物件發出專案,然後訂閱者使用這些專案。

可觀察物件

  • 可觀察物件在訂閱者開始監聽時提供資料。

  • 可觀察物件可以發出任意數量的專案。

  • 可觀察物件也可以僅發出完成訊號,而沒有專案。

  • 可觀察物件可以成功終止。

  • 可觀察物件可能永遠不會終止。例如,按鈕可以被點選任意次數。

  • 可觀察物件可能在任何時候丟擲錯誤。

訂閱者

  • 可觀察物件可以有多個訂閱者。

  • 當可觀察物件發出專案時,每個訂閱者的 onNext() 方法都會被呼叫。

  • 當可觀察物件完成發出專案時,每個訂閱者的 onComplete() 方法都會被呼叫。

  • 如果可觀察物件發出錯誤,則每個訂閱者的 onError() 方法都會被呼叫。

RxJava - 建立可觀察物件

以下是建立可觀察物件的基本類。

  • **Flowable** - 0..N 流,發出 0 或 n 個專案。支援 Reactive-Streams 和背壓。

  • **Observable** - 0..N 流,但沒有背壓。

  • **Single** - 1 個專案或錯誤。可以視為方法呼叫的響應式版本。

  • **Completable** - 沒有發出專案。用作完成或錯誤的訊號。可以視為 Runnable 的響應式版本。

  • **MayBe** - 既不發出專案也不發出 1 個專案。可以視為 Optional 的響應式版本。

以下是 Observable 類中建立可觀察物件的便捷方法。

  • **just(T item)** - 返回一個可觀察物件,該物件發出給定的(常量引用)專案,然後完成。

  • **fromIterable(Iterable source)** - 將 Iterable 序列轉換為一個 ObservableSource,該序列發出序列中的專案。

  • **fromArray(T... items)** - 將陣列轉換為一個 ObservableSource,該陣列發出陣列中的專案。

  • **fromCallable(Callable supplier)** - 返回一個可觀察物件,當觀察者訂閱它時,它會呼叫您指定的函式,然後發出該函式返回的值。

  • **fromFuture(Future future)** - 將 Future 轉換為 ObservableSource。

  • **interval(long initialDelay, long period, TimeUnit unit)** - 返回一個可觀察物件,該物件在 initialDelay 後發出 0L,然後在每個時間段之後發出遞增的數字。

RxJava - 單一可觀察物件

Single 類表示單值響應。Single 可觀察物件只能發出單個成功值或錯誤。它不發出 onComplete 事件。

類宣告

以下是 **io.reactivex.Single<T>** 類的宣告:

public abstract class Single<T>
   extends Object
      implements SingleSource<T>

協議

以下是 Single 可觀察物件操作的順序協議:

onSubscribe (onSuccess | onError)?

Single 示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      //Create the observable
      Single<String> testSingle = Single.just("Hello World");

      //Create an observer
      Disposable disposable = testSingle
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(
         new DisposableSingleObserver<String>() {

         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

Hello World

RxJava - 可能的可觀察物件

MayBe 類表示延遲響應。MayBe 可觀察物件可以發出單個成功值或不發出任何值。

類宣告

以下是 **io.reactivex.Single<T>** 類的宣告:

public abstract class Maybe<T>
   extends Object
      implements MaybeSource<T>

協議

以下是 MayBe 可觀察物件操作的順序協議:

onSubscribe (onSuccess | onError | OnComplete)?

MayBe 示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      //Create an observer
      Disposable disposable = Maybe.just("Hello World")
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

Hello World

RxJava - 可完成的可觀察物件

Completable 類表示延遲響應。Completable 可觀察物件可以指示成功完成或錯誤。

類宣告

以下是 **io.reactivex.Completable** 類的宣告:

public abstract class Completable
extends Object
implements CompletableSource

協議

以下是 Completable 可觀察物件操作的順序協議:

onSubscribe (onError | onComplete)?

Completable 示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {

      //Create an observer
      Disposable disposable = Completable.complete()
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableCompletableObserver() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }
         @Override
         public void onStart() {
            System.out.println("Started!");
         }
         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

Started!
Done!

RxJava - 使用 CompositeDisposable

CompositeDisposable 類表示一個容器,該容器可以儲存多個可丟棄物件,並且提供新增和刪除可丟棄物件的 O(1) 複雜度。

類宣告

以下是 **io.reactivex.disposables.CompositeDisposable** 類的宣告:

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

CompositeDisposable 示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      CompositeDisposable compositeDisposable = new CompositeDisposable();

      //Create an Single observer 
      Disposable disposableSingle = Single.just("Hello World")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 

      //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

      //start observing
      compositeDisposable.dispose();
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

Hello World
Hi

RxJava - 建立運算子

以下是用於建立可觀察物件的運算子。

序號。 運算子和描述
1

建立

從頭開始建立可觀察物件,並允許以程式設計方式呼叫觀察者方法。

2

延遲

在觀察者訂閱之前不要建立 Observable。為每個觀察者建立一個新的 Observable。

3

空/永不/丟擲

建立一個具有有限行為的 Observable。

4

將物件/資料結構轉換為 Observable。

5

間隔

建立一個 Observable,以指定的時間間隔依次發出整數。

6

將物件/資料結構轉換為 Observable 以發出相同或相同型別的物件。

7

範圍

建立一個 Observable,以給定範圍依次發出整數。

8

重複

建立一個 Observable,重複依次發出整數。

9

開始

建立一個 Observable 以發出函式的返回值。

10

定時器

建立一個 Observable,在給定延遲後發出單個專案。

建立運算子示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester  {
   public static void main(String[] args) { 
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

ABCDEFG

RxJava - 轉換運算子

以下是用於轉換從 Observable 發出的專案的運算子。

序號。 運算子和描述
1

緩衝區

定期將 Observable 中的專案收集到捆綁包中,然後發出捆綁包而不是專案。

2

扁平對映

用於巢狀的 Observable。將專案轉換為 Observable。然後將專案展平為單個 Observable。

3

分組依據

將 Observable 分為一組由鍵組織的 Observable,以發出不同組的專案。

4

地圖

對每個發出的專案應用一個函式來轉換它。

5

掃描

依次對每個發出的專案應用一個函式,然後發出後續的值。

6

視窗

定期將 Observable 中的專案收集到 Observable 視窗中,然後發出視窗而不是專案。

轉換運算子示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester  { 
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

ABCDEFG

RxJava - 過濾運算子

以下是用於選擇性地從 Observable 發出專案(s)的運算子。

序號。 運算子和描述
1

去抖動

僅在超時發生時發出專案,而不會發出另一個專案。

2

獨特

僅發出唯一專案。

3

ElementAt

僅發出 Observable 發出的第 n 個索引處的專案。

4

過濾器

僅發出透過給定謂詞函式的那些專案。

5

第一

發出第一個專案或透過給定條件的第一個專案。

6

忽略元素

不要從 Observable 發出任何專案,但標記完成。

7

最後

從 Observable 發出最後一個元素。

8

樣本

以給定的時間間隔發出最新的專案。

9

跳過

跳過 Observable 中的前 n 個專案。

10

跳過最後

跳過 Observable 中的最後 n 個專案。

11

獲取

從 Observable 中獲取前 n 個專案。

12

獲取最後

從 Observable 中獲取最後 n 個專案。

過濾運算子示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .take(2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

ab

RxJava - 組合運算子

以下是用於從多個 Observable 建立單個 Observable 的運算子。

序號。 運算子和描述
1 和/然後/何時

使用模式和計劃中介組合專案集。

2 組合最新

透過指定的函式組合每個 Observable 發出的最新專案,併發出結果專案。

3 加入

如果在第二個 Observable 發出的專案的時段內發出,則組合兩個 Observable 發出的專案。

4 合併

組合 Observable 發出的專案。

5 以…開始

在開始發出源 Observable 的專案之前發出指定的專案序列

6 切換

發出 Observable 發出的最新專案。

7 拉鍊

基於函式組合 Observable 的專案,併發出結果專案。

組合運算子示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
   public static void main(String[] args) {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

g1g2g3g4g5g6

RxJava - 實用運算子

以下是通常與 Observable 有用的運算子。

序號。 運算子和描述
1

延遲

註冊操作以處理 Observable 生命週期事件。

2

具體化/非具體化

表示發出的專案和傳送的通知。

3

ObserveOn

指定要觀察的排程程式。

4

序列化

強制 Observable 進行序列化呼叫。

5

訂閱

對專案的排放和通知(如來自 Observable 的完成)進行操作

6

SubscribeOn

指定 Observable 訂閱時使用的排程程式。

7

時間間隔

轉換 Observable 以發出排放之間經過的時間量的指示。

8

超時

如果指定的時間發生而沒有發出任何專案,則發出錯誤通知。

9

時間戳

將時間戳附加到每個發出的專案。

9

使用

建立一個可處置的資源或與 Observable 具有相同的生命週期。

實用程式運算子示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable.subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

abcdefg

RxJava - 條件運算子

以下是評估一個或多個 Observable 或發出的專案的運算子。

序號。 運算子和描述
1

全部

評估所有發出的專案以滿足給定條件。

2

模稜兩可

僅在給定多個 Observable 時發出第一個 Observable 的所有專案。

3

包含

檢查 Observable 是否發出特定專案。

4

預設如果為空

如果 Observable 沒有發出任何內容,則發出預設專案。

5

序列相等

檢查兩個 Observable 是否發出相同的專案序列。

6

跳過直到

丟棄第一個 Observable 發出的專案,直到第二個 Observable 發出專案。

7

跳過一段時間

丟棄 Observable 發出的專案,直到給定條件變為假。

8

獲取直到

在第二個 Observable 發出專案或終止後丟棄 Observable 發出的專案。

9

獲取一段時間

在指定條件變為假後丟棄 Observable 發出的專案。

條件運算子示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      final StringBuilder result = new StringBuilder();
      Observable.empty()
      .defaultIfEmpty("No Data")
      .subscribe(s -> result.append(s));
      System.out.println(result);
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result1 = new StringBuilder();
      Observable.fromArray(letters)
      .firstElement()
      .defaultIfEmpty("No data")   
      .subscribe(s -> result1.append(s));
      System.out.println(result1);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

No Data
a

RxJava - 數學運算子

以下是對 Observable 發出的整個專案進行操作的運算子。

序號。 運算子和描述
1

平均

評估所有專案的平均值併發出結果。

2

連線

從多個 Observable 發出所有專案,而無需交錯。

3

計數

計算所有專案併發出結果。

4

最大值

評估所有專案中最大值專案併發出結果。

5

最小值

評估所有專案中最小值專案併發出結果。

6

減少

對每個專案應用一個函式並返回結果。

7

總和

評估所有專案的總和併發出結果。

數學運算子示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.concat(observable1, observable2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

abcdefg123456

RxJava - 可連線運算子

以下是更精確地控制訂閱的運算子。

序號。 運算子和描述
1

連線

指示可連線的 Observable 向其訂閱者發出專案。

2

釋出

將 Observable 轉換為可連線的 Observable。

3

RefCount

將可連線的 Observable 轉換為普通 Observable。

4

重播

確保每個訂閱者都能看到相同的發出專案序列,即使在 Observable 開始發出專案並且訂閱者稍後訂閱之後也是如此。

可連線運算子示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();      
      connectable.subscribe(letter -> result.append(letter));
      System.out.println(result.length());
      connectable.connect();
      System.out.println(result.length());
      System.out.println(result);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

0
7
abcdefg

RxJava - 主題

根據反應式,Subject 可以同時充當 Observable 和 Observer。

Subject 是一種橋樑或代理,在 ReactiveX 的某些實現中可用,它既充當觀察者又充當 Observable。因為它是一個觀察者,所以它可以訂閱一個或多個 Observable,並且因為它是一個 Observable,所以它可以透過重新發出它觀察到的專案來傳遞這些專案,它還可以發出新專案。

有四種類型的 Subject:

序號。 Subject 和描述
1

釋出 Subject

僅發出訂閱時間之後發出的那些專案。

2 重播 Subject

發出源 Observable 發出的所有專案,無論何時訂閱 Observable。

3

行為 Subject

訂閱後,發出最新的專案,然後繼續發出源 Observable 發出的專案。

4

非同步 Subject

在完成發出後,發出源 Observable 發出的最後一個專案。

RxJava - PublishSubject

PublishSubject 將專案發出給當前已訂閱的觀察者,並將終止事件發出給當前或之後的觀察者。

類宣告

以下是io.reactivex.subjects.PublishSubject<T>類的宣告:

public final class PublishSubject<T>
extends Subject<T>

PublishSubject 示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.subjects.PublishSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      PublishSubject<String> subject = PublishSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd 
      System.out.println(result1);
      //Output will be d only
      //as subscribed after c item emitted.
      System.out.println(result2);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

abcd
d

RxJava - BehaviorSubject

BehaviorSubject 發出它觀察到的最新專案,然後將所有後續觀察到的專案發出給每個已訂閱的觀察者。

類宣告

以下是io.reactivex.subjects.BehaviorSubject<T>類的宣告:

public final class BehaviorSubject<T>
extends Subject<T>

BehaviorSubject 示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         
      BehaviorSubject<String> subject =  BehaviorSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();
      //Output will be abcd
      System.out.println(result1);
      //Output will be cd being BehaviorSubject 
      //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

abcd
cd

RxJava - ReplaySubject

ReplaySubject 將事件/專案重播給當前和之後的觀察者。

類宣告

以下是io.reactivex.subjects.ReplaySubject<T>類的宣告:

public final class ReplaySubject<T>
extends Subject<T>

ReplaySubject 示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.subjects.ReplaySubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      ReplaySubject<String> subject = ReplaySubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd
      System.out.println(result1);
      //Output will be abcd being ReplaySubject
      //as ReplaySubject emits all the items
      System.out.println(result2);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

abcd
abcd

RxJava - AsyncSubject

AsyncSubject 僅發出最後一個值,然後是完成事件或接收到的錯誤給觀察者。

類宣告

以下是io.reactivex.subjects.AsyncSubject<T>類的宣告:

public final class  AsyncSubject<T>
extends Subject<T>

AsyncSubject 示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.subjects. AsyncSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      AsyncSubject<String> subject =  AsyncSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be d being the last item emitted
      System.out.println(result1);
      //Output will be d being the last item emitted     
      System.out.println(result2);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

d
d

RxJava - 排程器

排程程式用於多執行緒環境中處理 Observable 運算子。

根據反應式,Scheduler 用於安排運算子鏈如何應用於不同的執行緒。

預設情況下,Observable 和您應用於它的運算子鏈將在呼叫其 Subscribe 方法的同一執行緒上執行其工作並通知其觀察者。SubscribeOn 運算子透過指定 Observable 應在其上操作的不同 Scheduler 來更改此行為。ObserveOn 運算子指定 Observable 用於向其觀察者傳送通知的不同 Scheduler。

RxJava 中有以下型別的排程程式:

序號。 排程程式和描述
1

Schedulers.computation()

建立並返回一個用於計算工作的排程程式。要排程的執行緒數取決於系統中存在的 CPU。每個 CPU 允許一個執行緒。最適合事件迴圈或回撥操作。

2

Schedulers.io()

建立並返回一個用於 IO 繫結工作的排程程式。執行緒池可以根據需要擴充套件。

3

Schedulers.newThread()

建立並返回一個為每個工作單元建立一個新執行緒的排程程式。

4

Schedulers.trampoline()

建立並返回一個在當前執行緒上將工作排隊以在當前工作完成後執行的排程程式。

4

Schedulers.from(java.util.concurrent.Executor executor)

將 Executor 轉換為新的 Scheduler 例項。

RxJava - Trampoline 排程器

Schedulers.trampoline() 方法建立並返回一個在當前執行緒上將工作排隊以在當前工作完成後執行的排程程式。

Schedulers.trampoline() 示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.trampoline()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3

RxJava - NewThread 排程器

Schedulers.newThread() 方法建立並返回一個為每個工作單元建立一個新執行緒的排程程式。

Schedulers.newThread() 示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3

RxJava - Computation 排程器

Schedulers.computation() 方法建立並返回一個用於計算工作的排程程式。要排程的執行緒數取決於系統中存在的 CPU。每個 CPU 允許一個執行緒。最適合事件迴圈或回撥操作。

Schedulers.computation() 示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3

RxJava - IO 排程器

Schedulers.io() 方法建立並返回一個用於 IO 繫結工作的排程程式。執行緒池可以根據需要擴充套件。最適合 I/O 密集型操作。

Schedulers.io() 示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.io()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3

RxJava - 來自排程器

Schedulers.from(Executor) 方法將 Executor 轉換為新的 Scheduler 例項。

Schedulers.from(Executor) 示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2

RxJava - 緩衝

緩衝運算子允許將 Observable 發出的專案收集到列表或捆綁包中,併發出這些捆綁包而不是專案本身。在下面的示例中,我們建立了一個 Observable 來發出 9 個專案,並使用緩衝,3 個專案將一起發出。

緩衝示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               System.out.println("onNext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done! 

RxJava - 視窗

視窗運算子的工作原理類似於緩衝運算子,但它允許將 Observable 發出的專案收集到另一個 Observable 中,而不是集合中,併發出這些 Observable 而不是集合。在下面的示例中,我們建立了一個 Observable 來發出 9 個專案,並使用視窗運算子,3 個 Observable 將一起發出。

視窗示例

使用您選擇的任何編輯器在 C:\> RxJava 中建立以下 Java 程式。

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .window(3)
         .subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(Observable<Integer> integers) {
               System.out.println("onNext: ");
               integers.subscribe(value -> System.out.println(value));
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

驗證結果

使用 **javac** 編譯器編譯類,如下所示:

C:\RxJava>javac ObservableTester.java

現在執行 ObservableTester,如下所示:

C:\RxJava>java ObservableTester

它應該產生以下輸出:

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done! 
廣告