Rx 有兩個最基礎的類型,和其他一些擴展這兩種類型的類。兩個核心的類為: Observable 和 Observer。Subject 是同時繼承了 Observable 和 Observer。
Rx 是在 Observer 模式之上建立起來的。這種模式很常見,在 java 中有很多地方都使用了該模式,比如 JavaFx 中的 EventHandler。 這些簡單的使用方式和 Rx 對比有如下區別:
使用 event handler 來處理事件很難組合使用無法延時處理查詢事件可能會導致內存泄露沒有標準的標示完成的方式需要手工的來處理并行和多線程Observable 是第一個核心類。該類包含了 Rx 中的很多實現,以及所有核心的操作函數(Operator、或者說 操作符)。在本系列教程中會逐步介紹每個操作函數。現在我們只需要理解 subscribe 函數即可,下面是該函數的一種定義:
public final Subscription subscribe(Subscriber<? super T> subscriber)該函數是用來接收 observable 發射的事件的。當事件被發射后,他們就丟給了 subscriber, subscriber 是用來處理事件的實現。這里的 Subscriber 參數實現了 Observer 接口
一個 Observable 發射 三種類型的事件:
Values (數據)完成狀態,告訴 Subscriber 事件(數據) 發射完畢,沒有其他數據了Error, 錯誤狀態,如果在發射數據的過程中出現錯誤了。會發送該事件Subscriber 是 Observer 的一個實現。 Subscriber 實現了其他一些額外的功能,可以作為我們實現 Observer 的基類。現在先看看 Observer 的接口定義:
interface Observer<T> { void onCompleted(); void onError(java.lang.Throwable e); void onNext(T t);}每次 Observable 發射事件的時候就會執行這三個對應的函數。Observer 的 onNext 函數會被調用0次或者多次,然后會調用 onCompleted 或者 onError。在 onCompleted 或者 onError 發生以后就不會再有其他事件發射出來了。
在使用 Rx 開發的過程中,你會看到很多 Observable,但是 Observer 出場的時候很少。但是理解 Observer 的概念是非常重要的,雖然有很多簡寫方式來幫助更加簡潔的使用 Observer
你可以手工的實現 Observer 或者擴展 Observable。 在真實場景中并不需要這樣做,Rx 已經提供了很多可以直接使用的工廠方法了。使用 Rx 提供的工具來創建 Observable 和 Observer 比手工實現要更加安全和簡潔。
要訂閱到一個 Observable,并不需要提供一個 Observer 示例。subscribe 函數有各種重載方法可以使用,你可以只訂閱 onNext 事件,有可以只訂閱 onError 事件,這樣就不用提供 Observer 對象就可以接受事件了。每次只需要提供你關心的函數即可,例如 如果你不關心 error 和完成事件,則只提供 onNext 來接收每次發送的數據即可。
配合 Java 8 的 Lambda 表達式則使用起來代碼看起來會更加簡潔,所以本系列示例代碼會使用 lambda 表達式,如果你不了解的話,可以先看看掌握 Java 8 Lambda 表達式。
官方文檔:Subject可以看成是一個橋梁或者代理,在某些ReactiveX實現中(如RxJava),它同時充當了Observer和Observable的角色。因為它是一個Observer,它可以訂閱一個或多個Observable;又因為它是一個Observable,它可以轉發它收到(Observe)的數據,也可以發射新的數據
Subject 是 Observable 的一個擴展,同時還實現了 Observer 接口,總結就是:
Subject 可以像 Observer 一樣接收事件,同時還可以像 Observable 一樣把接收到的事件再發射出去
它可以充當Observable
它可以充當Observer
是Observable和Observer之間的橋梁這種特性非常適合 Rx 中的接入點,當你的事件來至于 Rx 框架之外的代碼的時候,你可以把這些數據先放到 Subject 中,然后再把 Subject轉換為一個 Observable,就可以在 Rx 中使用它們了。你可以把 Subject 當做 Rx 中的 事件管道。
Subject 有兩個參數類型:輸入參數和輸出參數。這樣設計是為了抽象 而不是為了轉換數據類型。轉換數據應該使用轉換操作函數來完成,后面我們將介紹各種操作函數。
AsyncSubjectBehaviorSubjectPublishSubjectReplaySubjectPublishSubject比較容易理解,相對比其他Subject常用,它的Observer只會接收到PublishSubject被訂閱之后發送的數據。示例代碼如下:
輸出結果:
behaviorSubject3behaviorSubject4ReplaySubject 可以緩存所有發射給他的數據。當一個新的訂閱者訂閱的時候,緩存的所有數據都會發射給這個訂閱者。 由于使用了緩存,所以每個訂閱者都會收到所以的數據:
ReplaySubject<String> replaySubject = ReplaySubject.create(); // 創建默認初始緩存容量大小為16的ReplaySubject,當數據條目超過16會重新分配內存空間,使用這種方式,不論ReplaySubject何時被訂閱,Observer都能接收到數據 // replaySubject = // ReplaySubject.create(100);//創建指定初始緩存容量大小為100的ReplaySubject // replaySubject = ReplaySubject.createWithSize(2);//只緩存訂閱前最后發送的2條數據 // replaySubject=ReplaySubject.createWithTime(1,TimeUnit.SECONDS,Schedulers.computation()); // replaySubject被訂閱前的前1秒內發送的數據才能被接收 replaySubject.onNext("replaySubject:結果replaySubject:replaySubject:pre1replaySubject:replaySubject:pre2replaySubject:replaySubject:pre3replaySubject:replaySubject:after1replaySubject:replaySubject:after2不管是何時訂閱的,每個訂閱者都收到了所有的數據。
BehaviorSubject 會接收到BehaviorSubject被訂閱之前的最后一個數據,再接收其他發射過來的數據,如果BehaviorSubject被訂閱之前沒有發送任何數據,則會發送一個默認數據。
注意跟AsyncSubject的區別,AsyncSubject要手動調用onCompleted(),且它的Observer會接收到onCompleted()前發送的最后一個數據,之后不會再接收數據,而BehaviorSubject不需手動調用onCompleted(),它的Observer接收的是BehaviorSubject被訂閱前發送的最后一個數據,兩個的分界點不一樣,且之后還會繼續接收數據。示例代碼如下:
結果
behaviorSubject2behaviorSubject3behaviorSubject4如果在behaviorSubject.subscribe()之前不發送behaviorSubject1、behaviorSubject2,則Observer會先接收到default,再接收behaviorSubject3、behaviorSubject4
Observer會接收AsyncSubject的onComplete()之前的最后一個數據,如果因異常而終止,AsyncSubject將不會釋放任何數據,但是會向Observer傳遞一個異常通知。示例代碼如下:
輸出結果:
asyncSubject3以上代碼,Observer只會接收asyncSubject的onCompleted()被調用前的最后一個數據,即“asyncSubject3”,如果不調用onCompleted(),Subscriber將不接收任何數據。
如果你把 Subject 當作一個 Subscriber 使用,不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能導致同時(非順序)調用,這會違反Observable協議,給Subject的結果增加了不確定性。要避免此類問題,官方提出了“串行化”,你可以將 Subject 轉換為一個 SerializedSubject ,類似于這樣:
Rx 中有一些隱含的規則在代碼中并不太容易看到。一個重要的規則就是當一個事件流結束(onError 或者 onCompleted 都會導致事件流結束)后就不會發射任何數據了。這些 Subject 的實現都遵守這個規則,subscribe 函數也拒絕違反該規則的情況。
Subject<Integer, Integer> s = ReplaySubject.create();s.subscribe(v -> System.out.println(v));s.onNext(0);s.onCompleted();s.onNext(1);s.onNext(2);結果
0但是在 Rx 實現中并沒有完全確保這個規則,所以你在使用 Rx 的過程中要注意遵守該規則,否則會出現意料不到的情況。
用Subject實現為:
PublishSubject<String> publishSubject = PublishSubject.create();publishSubject.onNext("as Observable");publishSubject.onCompleted();用Subject實現為:
publishSubject.subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } });有沒有發現問題?publishSubject沒有重寫onNext()方法啊,在哪接收的數據?這就是前面說的“橋梁”的問題了,盡管把Subject作為Observer傳入subscribe(),但接收數據還是要通過Observer來接收,借用Subject來連接Observable和Observer,整體代碼如下:
這就是橋梁的意思!
新聞熱點
疑難解答