国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Java > 正文

RxJava 入門篇 (二) -- 關鍵的類

2019-11-06 09:38:04
字體:
來源:轉載
供稿:網友

Rx 有兩個最基礎的類型,和其他一些擴展這兩種類型的類。兩個核心的類為: ObservableObserver。Subject 是同時繼承了 Observable 和 Observer。

Rx 是在 Observer 模式之上建立起來的。這種模式很常見,在 java 中有很多地方都使用了該模式,比如 JavaFx 中的 EventHandler。 這些簡單的使用方式和 Rx 對比有如下區別:

使用 event handler 來處理事件很難組合使用無法延時處理查詢事件可能會導致內存泄露沒有標準的標示完成的方式需要手工的來處理并行和多線程

1. Observable

Observable 是第一個核心類。該類包含了 Rx 中的很多實現,以及所有核心的操作函數(Operator、或者說 操作符)。在本系列教程中會逐步介紹每個操作函數。現在我們只需要理解 subscribe 函數即可,下面是該函數的一種定義:

public final Subscription subscribe(Subscriber<? super T> subscriber)

該函數是用來接收 observable 發射的事件的。當事件被發射后,他們就丟給了 subscriber, subscriber 是用來處理事件的實現。這里的 Subscriber 參數實現了 Observer 接口

一個 Observable 發射 三種類型的事件:

Values (數據)完成狀態,告訴 Subscriber 事件(數據) 發射完畢,沒有其他數據了Error, 錯誤狀態,如果在發射數據的過程中出現錯誤了。會發送該事件

2. Observer

Subscriber 是 Observer 的一個實現。 Subscriber 實現了其他一些額外的功能,可以作為我們實現 Observer 的基類。現在先看看 Observer 的接口定義:

interface Observer<T> { void onCompleted(); void onError(java.lang.Throwable e); void onNext(T t);}

每次 Observable 發射事件的時候就會執行這三個對應的函數。Observer 的 onNext 函數會被調用0次或者多次,然后會調用 onCompleted 或者 onError。在 onCompleted 或者 onError 發生以后就不會再有其他事件發射出來了。

在使用 Rx 開發的過程中,你會看到很多 Observable,但是 Observer 出場的時候很少。但是理解 Observer 的概念是非常重要的,雖然有很多簡寫方式來幫助更加簡潔的使用 Observer

3. 實現 Observable 和 Observer

你可以手工的實現 Observer 或者擴展 Observable。 在真實場景中并不需要這樣做,Rx 已經提供了很多可以直接使用的工廠方法了。使用 Rx 提供的工具來創建 Observable 和 Observer 比手工實現要更加安全和簡潔。

要訂閱到一個 Observable,并不需要提供一個 Observer 示例。subscribe 函數有各種重載方法可以使用,你可以只訂閱 onNext 事件,有可以只訂閱 onError 事件,這樣就不用提供 Observer 對象就可以接受事件了。每次只需要提供你關心的函數即可,例如 如果你不關心 error 和完成事件,則只提供 onNext 來接收每次發送的數據即可。

配合 Java 8 的 Lambda 表達式則使用起來代碼看起來會更加簡潔,所以本系列示例代碼會使用 lambda 表達式,如果你不了解的話,可以先看看掌握 Java 8 Lambda 表達式。

4. Subjec

官方文檔:Subject可以看成是一個橋梁或者代理,在某些ReactiveX實現中(如RxJava),它同時充當了Observer和Observable的角色。因為它是一個Observer,它可以訂閱一個或多個Observable;又因為它是一個Observable,它可以轉發它收到(Observe)的數據,也可以發射新的數據

Subject 是 Observable 的一個擴展,同時還實現了 Observer 接口,總結就是:

Subject 可以像 Observer 一樣接收事件,同時還可以像 Observable 一樣把接收到的事件再發射出去

它可以充當Observable

它可以充當Observer

Observable和Observer之間的橋梁

這種特性非常適合 Rx 中的接入點,當你的事件來至于 Rx 框架之外的代碼的時候,你可以把這些數據先放到 Subject 中,然后再把 Subject轉換為一個 Observable,就可以在 Rx 中使用它們了。你可以把 Subject 當做 Rx 中的 事件管道

Subject 有兩個參數類型:輸入參數和輸出參數。這樣設計是為了抽象 而不是為了轉換數據類型。轉換數據應該使用轉換操作函數來完成,后面我們將介紹各種操作函數。

AsyncSubjectBehaviorSubjectPublishSubjectReplaySubject

4.1 PublishSubject

PublishSubject比較容易理解,相對比其他Subject常用,它的Observer只會接收到PublishSubject被訂閱之后發送的數據。示例代碼如下:

PublishSubject<String> publishSubject = PublishSubject.create(); publishSubject.onNext("publishSubject1"); publishSubject.onNext("publishSubject2"); publishSubject.subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { LogUtil.log("publishSubject observer1:"+s); } }); publishSubject.onNext("publishSubject3"); publishSubject.onNext("publishSubject4");

輸出結果:

behaviorSubject3behaviorSubject4

4.2 ReplaySubject

ReplaySubject 可以緩存所有發射給他的數據。當一個新的訂閱者訂閱的時候,緩存的所有數據都會發射給這個訂閱者。 由于使用了緩存,所以每個訂閱者都會收到所以的數據:

ReplaySubject<String> replaySubject = ReplaySubject.create(); // 創建默認初始緩存容量大小為16的ReplaySubject,當數據條目超過16會重新分配內存空間,使用這種方式,不論ReplaySubject何時被訂閱,Observer都能接收到數據 // replaySubject = // ReplaySubject.create(100);//創建指定初始緩存容量大小為100的ReplaySubject // replaySubject = ReplaySubject.createWithSize(2);//只緩存訂閱前最后發送的2條數據 // replaySubject=ReplaySubject.createWithTime(1,TimeUnit.SECONDS,Schedulers.computation()); // replaySubject被訂閱前的前1秒內發送的數據才能被接收 replaySubject.onNext("replaySubject:結果

replaySubject:replaySubject:pre1replaySubject:replaySubject:pre2replaySubject:replaySubject:pre3replaySubject:replaySubject:after1replaySubject:replaySubject:after2

不管是何時訂閱的,每個訂閱者都收到了所有的數據。

4.3 BehaviorSubject

BehaviorSubject 會接收到BehaviorSubject被訂閱之前的最后一個數據,再接收其他發射過來的數據,如果BehaviorSubject被訂閱之前沒有發送任何數據,則會發送一個默認數據。

注意跟AsyncSubject的區別,AsyncSubject手動調用onCompleted(),且它的Observer會接收到onCompleted()前發送的最后一個數據,之后不會再接收數據,而BehaviorSubject不需手動調用onCompleted(),它的Observer接收的是BehaviorSubject被訂閱前發送的最后一個數據,兩個的分界點不一樣,且之后還會繼續接收數據。示例代碼如下:

BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default"); behaviorSubject.onNext("behaviorSubject1"); behaviorSubject.onNext("behaviorSubject2"); behaviorSubject.subscribe(new Observer<String>() { @Override public void onCompleted() { LogUtil.log("behaviorSubject:complete"); } @Override public void onError(Throwable e) { LogUtil.log("behaviorSubject:error"); } @Override public void onNext(String s) { LogUtil.log("behaviorSubject:"+s); } }); behaviorSubject.onNext("behaviorSubject3"); behaviorSubject.onNext("behaviorSubject4");

結果

behaviorSubject2behaviorSubject3behaviorSubject4

如果在behaviorSubject.subscribe()之前不發送behaviorSubject1behaviorSubject2,則Observer會先接收到default,再接收behaviorSubject3behaviorSubject4

4.4 AsyncSubject

Observer會接收AsyncSubjectonComplete()之前的最后一個數據,如果因異常而終止,AsyncSubject將不會釋放任何數據,但是會向Observer傳遞一個異常通知。示例代碼如下:

AsyncSubject<String> asyncSubject = AsyncSubject.create(); asyncSubject.onNext("asyncSubject1"); asyncSubject.onNext("asyncSubject2"); asyncSubject.onNext("asyncSubject3"); asyncSubject.onCompleted(); asyncSubject.subscribe(new Observer<String>() { @Override public void onCompleted() { LogUtil.log("asyncSubject onCompleted"); //輸出 asyncSubject onCompleted } @Override public void onError(Throwable e) { LogUtil.log("asyncSubject onError"); //不輸出(異常才會輸出) } @Override public void onNext(String s) { LogUtil.log("asyncSubject:"+s); //輸出asyncSubject:asyncSubject3 } });

輸出結果:

asyncSubject3

以上代碼,Observer只會接收asyncSubjectonCompleted()被調用前的最后一個數據,即“asyncSubject3”,如果不調用onCompleted()Subscriber將不接收任何數據。

5. 隱含的規則

如果你把 Subject 當作一個 Subscriber 使用,不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能導致同時(非順序)調用,這會違反Observable協議,給Subject的結果增加了不確定性。要避免此類問題,官方提出了“串行化”,你可以將 Subject 轉換為一個 SerializedSubject ,類似于這樣:

SerializedSubject<String, Integer> ser = new SerializedSubject(publishSubject);

Rx 中有一些隱含的規則在代碼中并不太容易看到。一個重要的規則就是當一個事件流結束(onError 或者 onCompleted 都會導致事件流結束)后就不會發射任何數據了。這些 Subject 的實現都遵守這個規則,subscribe 函數也拒絕違反該規則的情況。

Subject<Integer, Integer> s = ReplaySubject.create();s.subscribe(v -> System.out.println(v));s.onNext(0);s.onCompleted();s.onNext(1);s.onNext(2);

結果

0

但是在 Rx 實現中并沒有完全確保這個規則,所以你在使用 Rx 的過程中要注意遵守該規則,否則會出現意料不到的情況。

6. 代碼示例

6.1 創建Observable并發射數據

Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("I'm Observable"); subscriber.onCompleted(); } });

用Subject實現為

PublishSubject<String> publishSubject = PublishSubject.create();publishSubject.onNext("as Observable");publishSubject.onCompleted();

6.2 創建Observer訂閱Observable并接收數據:

mObservable.subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { //接收數據 }});

用Subject實現為:

publishSubject.subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } });

6.3 把Subject當作Observer傳入subscribe()中

PublishSubject<String> publishSubject = PublishSubject.create();Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("as Observer"); subscriber.onCompleted(); }}).subscribe(publishSubject);

有沒有發現問題?publishSubject沒有重寫onNext()方法啊,在哪接收的數據?這就是前面說的“橋梁”的問題了,盡管把Subject作為Observer傳入subscribe(),但接收數據還是要通過Observer來接收,借用Subject來連接Observable和Observer,整體代碼如下:

PublishSubject<String> publishSubject = PublishSubject.create(); Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("as Bridge"); subscriber.onCompleted(); } }).subscribe(publishSubject); publishSubject.subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { LogUtil.log("subject:"+s); //接收到 as Bridge } });

這就是橋梁的意思!


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 鄂尔多斯市| 宜君县| 黄山市| 托克逊县| 昌图县| 资溪县| 双流县| 广元市| 盐池县| 乐都县| 武宣县| 扎囊县| 浦城县| 寿光市| 柳河县| 合作市| 通榆县| 民权县| 林芝县| 寻甸| 册亨县| 万盛区| 靖边县| 五台县| 鸡泽县| 封丘县| 三门县| 榆中县| 拜城县| 东乡族自治县| 武安市| 祁门县| 通州区| 江口县| 鄂尔多斯市| 安义县| 阿勒泰市| 富锦市| 蓬莱市| 京山县| 苍山县|