国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Java > 正文

Rxjava Subject分析

2019-11-07 23:41:15
字體:
來源:轉載
供稿:網友

Subject = Observable + Observer

看看官方的描述:

Subject可以看成是一個橋梁或者代理,在某些ReactiveX實現中(如Rxjava),它同時充當了Observer和Observable的角色。因為它是一個Observer,它可以訂閱一個或多個Observable;又因為它是一個Observable,它可以轉發它收到(Observe)的數據,也可以發射新的數據。

由于一個Observable訂閱一個Observable,它可以觸發這個Observable開始發射數據(如果那個Observable是”冷”的–就是說,它等待有訂閱才開始發射數據)。因此有這樣的效果,Subject可以把原來那個”冷”的Observable變成”熱”的。

通俗的理解就是:

subject是一個神奇的對象,它可以是一個Observable同時也可以是一個Observer:它作為連接這兩個世界的一座橋梁。一個Subject可以訂閱一個Observable,就像一個觀察者,并且它可以發射新的數據,或者傳遞它接受到的數據,就像一個Observable。很明顯,作為一個Observable,觀察者們或者其它Subject都可以訂閱它。

一旦Subject訂閱了Observable,它將會觸發Observable開始發射。如果原始的Observable是“冷”的,這將會對訂閱一個“熱”的Observable變量產生影響。

Subject的種類

針對不同的場景一共有四種類型的Subject。他們并不是在所有的實現中全部都存在,而且一些實現使用其它的命名約定(例如,在RxScala中Subject被稱作PublishSubject)。

RxJava提供的四種不同的Subject:

1.PublishSubject

PublishSubject僅會向Observer釋放在訂閱之后Observable釋放的數據。

官方介紹:

PublishSubject只會把在訂閱發生的時間點之后來自原始Observable的數據發射給觀察者。需要注意的是,PublishSubject可能會一創建完成就立刻開始發射數據(除非你可以阻止它發生),因此這里有一個風險:在Subject被創建后到有觀察者訂閱它之前這個時間段內,一個或多個數據可能會丟失。如果要確保來自原始Observable的所有數據都被分發,你需要這樣做:或者使用Create創建那個Observable以便手動給它引入”冷”Observable的行為(當所有觀察者都已經訂閱時才開始發射數據),或者改用ReplaySubject。

這里寫圖片描述 如果原始的Observable因為發生了一個錯誤而終止,BehaviorSubject將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知。

這里寫圖片描述

2.BehaviorSubject

當Observer訂閱了一個BehaviorSubject,它一開始就會釋放Observable最近釋放的一個數據對象,當還沒有任何數據釋放時,它則是一個默認值。接下來就會釋放Observable釋放的所有數據。如果Observable因異常終止,BehaviorSubject將不會向后續的Observer釋放數據,但是會向Observer傳遞一個異常通知。

官方介紹:

當觀察者訂閱BehaviorSubject時,它開始發射原始Observable最近發射的數據(如果此時還沒有收到任何數據,它會發射一個默認值),然后繼續發射其它任何來自原始Observable的數據。

這里寫圖片描述 然而,如果原始的Observable因為發生了一個錯誤而終止,BehaviorSubject將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知。

這里寫圖片描述

3.ReplaySubject

不管Observer何時訂閱ReplaySubject,ReplaySubject會向所有Observer釋放Observable釋放過的數據。 有不同類型的ReplaySubject,它們是用來限定Replay的范圍,例如設定Buffer的具體大小,或者設定具體的時間范圍。 如果使用ReplaySubject作為Observer,注意不要在多個線程中調用onNext、onComplete和onError方法,因為這會導致順序錯亂,這個是違反了Observer規則的。

官方介紹:

ReplaySubject會發射所有來自原始Observable的數據給觀察者,無論它們是何時訂閱的。也有其它版本的ReplaySubject,在重放緩存增長到一定大小的時候或過了一段時間后會丟棄舊的數據(原始Observable發射的)。

如果你把ReplaySubject當作一個觀察者使用,注意不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能導致同時(非順序)調用,這會違反Observable協議,給Subject的結果增加了不確定性。

這里寫圖片描述

4.AsyncSubject

AsyncSubject僅釋放Observable釋放的最后一個數據,并且僅在Observable完成之后。然而如果當Observable因為異常而終止,AsyncSubject將不會釋放任何數據,但是會向Observer傳遞一個異常通知。

官方介紹:

一個AsyncSubject只在原始Observable完成后,發射來自原始Observable的最后一個值。(如果原始Observable沒有發射任何值,AsyncObject也不發射任何值)它會把這最后一個值發射給任何后續的觀察者。

這里寫圖片描述

然而,如果原始的Observable因為發生了錯誤而終止,AsyncSubject將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知。

這里寫圖片描述

小結

回顧上面所講的,不難看出不同的Subject最大的區別在于發送數據的行為不同,簡單概括如下:

Subject 發射行為
AsyncSubject 不論訂閱發生在什么時候,只會發射最后一個數據
BehaviorSubject 發送訂閱之前一個數據和訂閱之后的全部數據
ReplaySubject 不論訂閱發生在什么時候,都發射全部數據
PublishSubject 發送訂閱之后全部數據

關于Subject更詳細的使用方法請查閱官方文檔。

RxJava的對應類

假設你有一個Subject,你想把它傳遞給其它的代理或者暴露它的Subscriber接口,你可以調用它的asObservable方法,這個方法返回一個Observable。具體使用方法可以參考Javadoc文檔。

如果你把 Subject 當作一個 Subscriber 使用,注意不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能導致同時(非順序)調用,這會違反Observable協議,給Subject的結果增加了不確定性。

要避免此類問題,你可以將 Subject 轉換為一個 SerializedSubject ,類似于這樣:

mySafeSubject = new SerializedSubject( myUnsafeSubject );11

通過BehaviorSubject 來制作緩存

代碼大致形式:

api.getData().subscribe(behaviorSubject); // 判斷cache為空則獲取數據,網絡數據會被緩存behaviorSubject.subscribe(observer);// 之前的緩存將直接送達observer1212

詳細代碼:

BehaviorSubject<List<Item>> cache;public Subscription subscribeData(@NonNull Observer<List<Item>> observer) { //判斷內存緩存是否為空 if (cache == null) { cache = BehaviorSubject.create(); Observable.create(new Observable.OnSubscribe<List<Item>>() { @Override public void call(Subscriber< ? super List<Item>> subscriber) { List<Item> items = Database.getInstance().readItems(); //判斷硬盤緩存是否為空 if (items == null) { //從網絡讀取數據 loadFromNetwork(); } else { //發送硬盤數據 subscriber.onNext(items); } } }).subscribeOn(Schedulers.io()) .subscribe(cache); } return cache.observeOn(AndroidSchedulers.mainThread()).subscribe(observer); } subscription = subscribeData(new Observer<List<Item>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { swipeRefreshLayout.setRefreshing(false); Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show(); } @Override public void onNext(List<Item> items) { swipeRefreshLayout.setRefreshing(false); adapter.setItems(items); } });12345678910111213141516171819202122232425262728293031323334353637383940411234567891011121314151617181920212223242526272829303132333435363738394041

通過PublishSubject實現傳統的Observable Hello World

PublishSubject<String> stringPublishSubject = PublishSubject.create();Subscription subscriptionPRint = stringPublishSubject.subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("Observable completed"); } @Override public void onError(Throwable e) { System.out.println("Oh,no!Something wrong happened!"); } @Override public void onNext(String message) { System.out.println(message); }});stringPublishSubject.onNext("Hello World");123456789101112131415161718123456789101112131415161718

在剛才的例子中,我們創建了一個PublishSubject,用create()方法發射一個String值,然后我們訂閱了PublishSubject。此時,沒有數據要發送,因此我們的觀察者只能等待,沒有阻塞線程,也沒有消耗資源。就在這隨時準備從subject接收值,如果subject沒有發射值那么我們的觀察者就會一直在等待。再次聲明的是,無需擔心:觀察者知道在每個場景中該做什么,我們不用擔心什么時候是因為它是響應式的:系統會響應。我們并不關心它什么時候響應。我們只關心它響應時該做什么。

最后一行代碼展示了手動發射字符串“Hello World”,它觸發了觀察者的onNext()方法,讓我們在控制臺打印出“Hello World”信息。

讓我們看一個更復雜的例子。話說我們有一個private聲明的Observable,外部不能訪問。Observable在它生命周期內發射值,我們不用關心這些值,我們只關心他們的結束。

首先,我們創建一個新的PublishSubject來響應它的onNext()方法,并且外部也可以訪問它。

final PublishSubject<Boolean> subject = PublishSubject.create();subject.subscribe(new Observer<Boolean>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Boolean aBoolean) { System.out.println("Observable Completed"); }});123456789101112131415161718123456789101112131415161718

然后,我們創建“私有”的Observable,只有subject才可以訪問的到。

Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { subscriber.onNext(i); } subscriber.onCompleted(); }}).doOnCompleted(new Action0() { @Override public void call() { subject.onNext(true); }}).subscribe();12345678910111213141234567891011121314

Observable.create()方法包含了我們熟悉的for循環,發射數字。doOnCompleted()方法指定當Observable結束時要做什么事情:在subject上發射true。最后,我們訂閱了Observable。很明顯,空的subscribe()調用僅僅是為了開啟Observable,而不用管已發出的任何值,也不用管完成事件或者錯誤事件。為了這個例子我們需要它像這樣。

在這個例子中,我們創建了一個可以連接Observables并且同時可被觀測的實體。當我們想為公共資源創建獨立、抽象或更易觀測的點時,這是極其有用的。


RxJava的Subject源碼分析

package rx.subjects;import rx.Observable;import rx.Observer;import rx.Subscriber;/** * Represents an object that is both an Observable and an Observer. */public abstract class Subject<T, R> extends Observable<R> implements Observer<T> { protected Subject(OnSubscribe<R> onSubscribe) { super(onSubscribe); } public abstract boolean hasObservers(); public final SerializedSubject<T, R> toSerialized() { return new SerializedSubject<T, R>(this); }}12345678910111213141516171819201234567891011121314151617181920

BehaviorSubject源碼分析

BehaviorSubject訂閱subscribe過程

在需要使用subject時,調用Subject的subscribe(..)方法,該方法實際會調用下面這個subscribe(Subscriber< ? super T> subscriber)方法,所以其他的subscribe方法都要將輸入參數轉化為一個Subscriber對象。

public final Subscription subscribe(Subscriber<? super T> subscriber) { ... // new Subscriber so onStart it subscriber.onStart(); ... // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks. try { // allow the hook to intercept and/or decorate hook.onSubscribeStart(this, onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); } catch (OnErrorNotImplementedException e2) { // special handling when onError is not implemented ... we just rethrow throw e2; } catch (Throwable e2) { // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. hook.onSubscribeError(r); // TODO why aren't we throwing the hook's return value. throw r; } return Subscriptions.unsubscribed(); } }123456789101112131415161718192021222324252627282930313233123456789101112131415161718192021222324252627282930313233

方法中hook.onSubsribeStart(this, onSubscribe).call(subscriber)默認情況下等價于onSubscribe.call(subscriber)。onSubscriber是什么呢?這個就需要了解BehaviorSubject的構造方法

protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) { super(onSubscribe); this.state = state; }12341234

其中調用了父類Subject的構造方法

protected Subject(OnSubscribe<R> onSubscribe) { super(onSubscribe); }123123

其中調用了父類Observer的構造方法

protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; }123123

onSubscribe即是BehaviorSubject構造方法中傳入的第一個參數。

BehaviorSubject有3個靜態工廠方法用來生產BehaviorSubject對象。

public final class BehaviorSubject<T> extends Subject<T, T> { public static <T> BehaviorSubject<T> create() { return create(null, false); } public static <T> BehaviorSubject<T> create(T defaultValue) { return create(defaultValue, true); } private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) { final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>(); if (hasDefault) { state.set(NotificationLite.instance().next(defaultValue)); } state.onAdded = new Action1<SubjectObserver<T>>() { @Override public void call(SubjectObserver<T> o) { o.emitFirst(state.get(), state.nl); } }; state.onTerminated = state.onAdded; return new BehaviorSubject<T>(state, state); } ....}1234567891011121314151617181920212223242526272812345678910111213141516171819202122232425262728

前兩個Public的靜態構造方法實際上調用的是第三個private方法。

最后return new BehaviorSubject(state, state),所以onSubscribe實際為一個SubjectSubscriptionManager的對象,onSubscribe.call(subscriber)實際調用的是SubjectSubscriptionManager的call方法。

/* package */final class SubjectSubscriptionManager<T> implements OnSubscribe<T> { ... @Override public void call(final Subscriber<? super T> child) { SubjectObserver<T> bo = new SubjectObserver<T>(child); addUnsubscriber(child, bo); onStart.call(bo); if (!child.isUnsubscribed()) { if (add(bo) && child.isUnsubscribed()) { remove(bo); } } }}12345678910111213141234567891011121314調用addUnsubscriber方法,注冊一個在取消訂閱時執行的一個動作,即將該觀擦者Observer移除掉。/** Registers the unsubscribe action for the given subscriber. */ void addUnsubscriber(Subscriber<? super T> child, final SubjectObserver<T> bo) { child.add(Subscriptions.create(new Action0() { @Override public void call() { remove(bo); } })); } 123456789123456789調用add(SubjectObserver< T> o)方法,將該Observer加入已經注冊的Observer[]數組當中。boolean add(SubjectObserver<T> o) { do { State oldState = state; if (oldState.terminated) { onTerminated.call(o); return false; } State newState = oldState.add(o); if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { onAdded.call(o); return true; } } while (true); }12345678910111213141234567891011121314

該方法會調用onAdd.call(o)。BehaviorSubject的onAdd對象如下,state.get()得到的是最近的數據對象,o.emitFirst即會釋放最近的數據對象,這正體現了BehaviorSubject的特點。

state.onAdded = new Action1<SubjectObserver<T>>() { @Override public void call(SubjectObserver<T> o) { o.emitFirst(state.get(), state.nl); } };1234567812345678

在這個過程中使用了SubjectSubscriptionManager的兩個內部類。

State< T> 該類用來管理已經注冊的Observer數組,以及他們的狀態。/** State-machine representing the termination state and active SubjectObservers. */ protected static final class State<T> { final boolean terminated; final SubjectObserver[] observers; static final SubjectObserver[] NO_OBSERVERS = new SubjectObserver[0]; static final State TERMINATED = new State(true, NO_OBSERVERS); static final State EMPTY = new State(false, NO_OBSERVERS); public State(boolean terminated, SubjectObserver[] observers) { this.terminated = terminated; this.observers = observers; } public State add(SubjectObserver o) { ... } public State remove(SubjectObserver o) { ... } }1234567891011121314151617181912345678910111213141516171819

2.SubjectObserver 該類時Observer的一個裝飾類,運用了裝飾模式給Observer類添加新的功能。

以上就是Subject對象訂閱Observer時的流程。


BehaviorSubject的onNext

Behavior的onNext(T v)方法如下

@Override public void onNext(T v) { Object last = state.get(); if (last == null || state.active) { Object n = nl.next(v); for (SubjectObserver<T> bo : state.next(n)) { bo.emitNext(n, state.nl); } } }1234567891012345678910

state是SubjectSubscriptionManager類的對象,是這個對象來維護最近釋放的數據對象,state.get()獲取最近釋放的數據對象,state.next(Object n)方法重新設置最近釋放的數據對象,并返回已經注冊的Observer數組。

SubjectObserver<T>[] next(Object n) { set(n); return state.observers; }12341234

bo.emitNext(Object n, final NotificationLite< T> nl)釋放給定的數據對象。

BehaviorSubject的onCompleted和onError

onCompleted和onError會調用SubjectSubscriptionManager的terminate(Object n)方法,該方法會重新設置最近釋放的數據對象,設置Subject狀態為TERMINATED,表示終結了,最后返回已注冊的Observer數組。

SubjectObserver<T>[] terminate(Object n) { set(n); active = false; State<T> oldState = state; if (oldState.terminated) { return State.NO_OBSERVERS; } return STATE_UPDATER.getAndSet(this, State.TERMINATED).observers; }1234567891012345678910

參考文章: http://www.liuhaihua.cn/archives/133614.html http://blog.csdn.net/sun927/article/details/44818845

(function () {('pre.prettyprint code').each(function () { var lines = (this).text().split(′/n′).length;varnumbering = $('').addClass('pre-numbering').hide(); (this).addClass(′has?numbering′).parent().append(numbering); for (i = 1; i
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 东乌珠穆沁旗| 罗城| 黄骅市| 梅州市| 古浪县| 清镇市| 桃江县| 大关县| 武威市| 玛多县| 泌阳县| 岫岩| 宁都县| 察哈| 德惠市| 广州市| 台南市| 苗栗市| 景泰县| 裕民县| 周口市| 黑山县| 于都县| 临澧县| 桂林市| 伊金霍洛旗| 宜兰县| 龙山县| 饶阳县| 楚雄市| 南安市| 文昌市| 米脂县| 分宜县| 康保县| 大理市| 嘉鱼县| 连城县| 海淀区| 江西省| 吉首市|