国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

創建和訂閱簡單可觀察序列

2019-11-06 06:42:04
字體:
來源:轉載
供稿:網友

您不需要手動實現IObservable 接口來創建可觀察的序列。 同樣,您不需要實現IObserver 來訂閱序列。 通過安裝Reactive Extension程序集,您可以利用Observable類型,該類型提供了許多靜態LINQ運算符,用于創建具有零個,一個或多個元素的簡單序列。 此外,Rx提供了訂閱擴展方法,根據代理采用OnNext,OnError和OnCompleted處理程序的各種組合。

創建和訂閱簡單序列

以下示例使用Observable類型的Range運算符創建一個簡單的observable數字集合。 觀察者使用Observable類的Subscribe方法訂閱此集合,并提供作為處理OnNext,OnError和OnCompleted的委托的操作。 Range運算符有幾個重載。 在我們的示例中,它創建一個以x開頭的整數序列,然后生成y個序列號。 一旦訂閱發生,值將被發送到觀察者。 OnNext委托打印輸出值。

using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Reactive;using System.Reactive.Linq;using System.Reactive.Subjects;namespace SimpleSequence{ class PRogram { static void Main(string[] args) { IObservable<int> source = Observable.Range(1, 10); IDisposable subscription = source.Subscribe( x => Console.WriteLine("OnNext: {0}", x), ex => Console.WriteLine("OnError: {0}", ex.Message), () => Console.WriteLine("OnCompleted")); Console.WriteLine("Press ENTER to unsubscribe..."); Console.ReadLine(); subscription.Dispose(); } }}

  當觀察者訂閱可觀察序列時,調用Subscribe方法的線程可以不同于序列運行直到完成的線程。因此,Subscribe調用是異步的,因為調用者不會被阻塞,直到序列的觀察完成。這將在使用計劃程序主題中的更多詳細信息。   請注意,Subscribe方法返回一個IDisposable,以便您可以取消訂閱序列并輕松處理它。當在observable序列上調用Dispose方法時,觀察器將停止監聽observable的數據。通常,您不需要顯式調用Dispose,除非您需要提前取消訂閱,或者當源可觀察序列具有比觀察者更長的壽命時。 Rx中的訂閱設計用于不使用終結器的火災忘記場景。當IDisposable實例由垃圾收集器收集時,Rx不會自動處理預訂。但是,請注意,Observable運算符的默認行為是盡快處理訂閱(即,發布OnCompleted或OnError消息時)。例如,代碼var x = Observable.Zip(a,b).Subscribe();將向序列a和b訂閱x。如果a拋出錯誤,x將立即從b取消訂閱。   您還可以調整代碼示例以使用Observable類型的Create運算符,該運算符從指定的OnNext,OnError和OnCompleted操作代理創建并返回觀察器。然后,您可以將此觀察器傳遞給Observable類型的Subscribe方法。以下示例顯示如何執行此操作。

using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Reactive;using System.Reactive.Linq;using System.Reactive.Subjects;namespace SimpleSequence{ class Program { static void Main(string[] args) { IObservable<int> source = Observable.Range(1, 10); IObserver<int> obsvr = Observer.Create<int>( x => Console.WriteLine("OnNext: {0}", x), ex => Console.WriteLine("OnError: {0}", ex.Message), () => Console.WriteLine("OnCompleted")); IDisposable subscription = source.Subscribe(obsvr); Console.WriteLine("Press ENTER to unsubscribe..."); Console.ReadLine(); subscription.Dispose(); } }}

  除了從頭創建可觀察序列之外,您還可以將現有枚舉器,.NET事件和異步模式轉換為可觀察序列。 本節中的其他主題將向您展示如何做到這一點。   請注意,本主題僅顯示幾個可以從頭創建可觀察序列的運算符。 要了解有關其他LINQ運算符的更多信息,請參閱使用LINQ運算符查詢可觀察序列。

使用定時器

以下示例使用Timer運算符創建序列。 該序列將在經過5秒后推出第一個值,然后它將每隔1秒推出后續值。 為了說明的目的,我們將Timestamp運算符鏈接到查詢,以便每個推出的值將在發布時附加。 通過這樣做,當我們訂閱這個源序列時,我們可以接收它的值和時間戳。

Console.WriteLine(“Current Time: “ + DateTime.Now);var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1)) .Timestamp();using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp))) { Console.WriteLine("Press any key to unsubscribe"); Console.ReadKey(); }Console.WriteLine("Press any key to exit");Console.ReadKey();

輸出將類似于: 當前時間:5/31/2011 5:35:08 PM 按任意鍵取消訂閱 0:5/31/2011 5:35:13 PM -07:00 1:5/31/2011 5:35:14 PM -07:00 2:5/31/2011 5:35:15 PM -07:00 通過使用Timestamp運算符,我們驗證了第一個項目確實在序列開始后5秒被推出,每個項目在1秒后發布。

將Ienumerable集合轉換為可觀察序列

使用ToObservable運算符,可以將通用ienumerable集合轉換為observable序列并進行訂閱。

IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };IObservable<int> source = e.ToObservable();IDisposable subscription = source.Subscribe( x => Console.WriteLine("OnNext: {0}", x), ex => Console.WriteLine("OnError: {0}", ex.Message), () => Console.WriteLine("OnCompleted"));Console.ReadKey();

冷與熱觀察

Cold observables在訂閱時開始運行,即,當調用Subscribe時,observable序列僅開始向觀察者推送值。值也不會在訂閱者之間共享。這不同于熱的可觀察量,例如鼠標移動事件或股票行情,即使在訂閱活動之前已經產生值。當觀察者訂閱熱可觀察序列時,它將獲得流中的當前值。熱可觀察序列在所有訂戶之間共享,并且每個訂戶被推送到序列中的下一個值。例如,即使沒有人訂閱了特定的股票行情,股票代碼將繼續根據市場變動更新其價值。當用戶注冊該股票的權益時,它將自動獲得最新的股票。 下面的例子演示了一個冷的可觀察序列。在這個例子中,我們使用Interval操作符來創建一個簡單的可觀察的序列,在特定的時間間隔(在這種情況下,每1秒鐘)。 然后兩個觀察者訂閱這個序列并打印出它的值。您會注意到,每個訂戶的序列都會重置,其中第二個訂閱將從第一個值重新啟動序列。

IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1)); IDisposable subscription1 = source.Subscribe( x => Console.WriteLine("Observer 1: OnNext: {0}", x), ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message), () => Console.WriteLine("Observer 1: OnCompleted"));IDisposable subscription2 = source.Subscribe( x => Console.WriteLine("Observer 2: OnNext: {0}", x), ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message), () => Console.WriteLine("Observer 2: OnCompleted"));Console.WriteLine("Press any key to unsubscribe");Console.ReadLine();subscription1.Dispose();subscription2.Dispose();

  在下面的示例中,我們使用Publish運算符將先前的cold observable序列源轉換為熱序列,該運算符返回一個名為hot的IConnectableObservable實例。發布運算符提供了通過向多個訂戶廣播單個訂閱來共享訂閱的機制。熱作為代理并訂閱源,然后當它從源接收值時,將它們推送到其自己的訂閱者。要建立對備份源的預訂并開始接收值,我們使用IConnectableObservable.Connect()方法。由于IConnectableObservable繼承IObservable,因此即使在開始運行之前,我們也可以使用Subscribe訂閱此熱序列。請注意,在示例中,當訂閱1訂閱時,熱序列尚未啟動。因此,沒有值被推送到訂戶。調用Connect后,值隨后被推送到subscription1。延遲3秒后,subscription2訂閱hot并開始立即從當前位置(在這種情況下為3)直到結束接收值。輸出如下所示:    Current Time: 6/1/2011 3:38:49 PM Current Time after 1st subscription: 6/1/2011 3:38:49 PM Current Time after Connect: 6/1/2011 3:38:52 PM Observer 1: OnNext: 0 Observer 1: OnNext: 1 Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM Observer 1: OnNext: 2 Observer 1: OnNext: 3 Observer 2: OnNext: 3 Observer 1: OnNext: 4 Observer 2: OnNext: 4

Console.WriteLine("Current Time: " + DateTime.Now);var source = Observable.Interval(TimeSpan.FromSeconds(1)); //creates a sequenceIConnectableObservable<long> hot = Observable.Publish<long>(source); // convert the sequence into a hot sequenceIDisposable subscription1 = hot.Subscribe( // no value is pushed to 1st subscription at this point x => Console.WriteLine("Observer 1: OnNext: {0}", x), ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message), () => Console.WriteLine("Observer 1: OnCompleted"));Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);Thread.Sleep(3000); //idle for 3 secondshot.Connect(); // hot is connected to source and starts pushing value to subscribers Console.WriteLine("Current Time after Connect: " + DateTime.Now);Thread.Sleep(3000); //idle for 3 secondsConsole.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);IDisposable subscription2 = hot.Subscribe( // value will immediately be pushed to 2nd subscription x => Console.WriteLine("Observer 2: OnNext: {0}", x), ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message), () => Console.WriteLine("Observer 2: OnCompleted"));Console.ReadKey();

返回


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 嘉兴市| 横山县| 定南县| 丰都县| 于都县| 东丽区| 都江堰市| 肥城市| 鹿泉市| 临漳县| 南投市| 平南县| 彰化县| 巍山| 上饶市| 安龙县| 定西市| 邳州市| 惠安县| 客服| 古蔺县| 青神县| 临澧县| 天等县| 抚州市| 赣州市| 阿坝| 铅山县| 安塞县| 武陟县| 赣榆县| 绍兴县| 婺源县| 百色市| 江孜县| 江城| 永丰县| 和田县| 汶上县| 文山县| 福海县|