在與現有.NET事件橋接時,我們已將現有.NET事件轉換為可觀察序列以訂閱它們。 在本主題中,我們將觀察可觀察序列的一類本質作為IObservable 對象,其中通用的LINQ操作符由Rx程序集提供以操作這些對象。 大多數運算符采用可觀察序列并對其執行一些邏輯并輸出另一個可觀察序列。 此外,從代碼示例中可以看出,您甚至可以在源序列上鏈接多個運算符,以根據您的確切要求調整結果序列。
我們已經使用了前面主題中的Create和Generate運算符來創建和返回簡單序列。 我們還使用FromEventPattern運算符將現有的.NET事件轉換為可觀察的序列。 在本主題中,我們將使用Observable類型的其他靜態LINQ運算符,以便可以對數據進行過濾,分組和轉換。 這樣的算子將可觀察序列作為輸入,并產生可觀測序列作為輸出。
在本節中,我們將研究一些將各種可觀察序列組合成單個可觀察序列的運算符。 請注意,當我們組合序列時,數據不會被轉換。 在下面的示例中,我們使用Concat運算符將兩個序列組合成一個序列并訂閱。 為了說明的目的,我們將使用非常簡單的Range(x,y)運算符來創建一個以x開頭的整數序列,然后生成y個序列號。
var source1 = Observable.Range(1, 3);var source2 = Observable.Range(1, 3);source1.Concat(source2) .Subscribe(Console.WriteLine);Console.ReadLine();注意,結果序列是1,2,3,1,2,3。這是因為當您使用Concat運算符時,第二個序列(source2)直到第一個序列(source1)完成推送其所有值后才會被激活。只有在source1完成后,source2才會開始將值推送到結果序列。然后,訂戶將從結果序列中獲得所有值。 將此與合并運算符進行比較。如果運行以下示例代碼,您將獲得1,1,2,2,3,3。這是因為兩個序列同時是活動的,并且值在源中發生時被推出。結果序列僅在最后一個源序列完成推送值時完成。 注意,為了合并工作,所有源可觀察序列需要具有相同類型的IObservable 。所得到的序列將具有類型IObservable 。如果source1在序列中間產生一個OnError,那么結果序列將立即完成。
var source1 = Observable.Range(1, 3);var source2 = Observable.Range(1, 3);source1.Merge(source2) .Subscribe(Console.WriteLine);Console.ReadLine();可以使用Catch運算符進行另一個比較。 在這種情況下,如果source1完成沒有任何錯誤,則source2將不會啟動。 因此,如果運行以下示例代碼,您將只得到1,2,3,因為source2(它產生4,5,6)被忽略。
var source1 = Observable.Range(1, 3);var source2 = Observable.Range(4, 3);source1.Catch(source2) .Subscribe(Console.WriteLine);Console.ReadLine();最后,讓我們來看看OnErrorResumeNext。 此操作符將移動到source2,即使source1由于錯誤而無法完成。 在以下示例中,即使source1表示以異常(通過使用Throw運算符)終止的序列,訂閱方將接收由source2發布的值(1,2,3)。 因此,如果您期望源序列產生任何錯誤,則更安全的是使用OnErrorResumeNext來保證訂戶仍然會收到一些值。
var source1 = Observable.Throw<int>(new Exception("An error has occurred."));var source2 = Observable.Range(4, 3);source1.OnErrorResumeNext(source2) .Subscribe(Console.WriteLine);Console.ReadLine();注意,對于所有這些組合運算符工作,所有可觀察的序列需要是相同類型的T.
Select操作符可以將可觀察序列的每個元素轉換為另一種形式。 在下面的例子中,我們分別將整數序列投影成長度為n的字符串。
var seqNum = Observable.Range(1, 5);var seqString = from n in seqNum select new string('*', (int)n);seqString.Subscribe(str => { Console.WriteLine(str); });Console.ReadKey();在下面的示例中,它是.NET事件轉換示例的擴展,我們在“使用現有.NET事件橋接”主題中看到,我們使用選擇運算符將IEventPattern 數據類型投影到Point類型中。 這樣,我們將鼠標移動事件序列轉換為可以進一步解析和處理的數據類型,如下一個“過濾”部分所示。
var frm = new Form();IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");IObservable<System.Drawing.Point> points = from evt in move select evt.EventArgs.Location;points.Subscribe(pos => Console.WriteLine("mouse at " + pos));application.Run(frm);最后,讓我們看看SelectMany運算符。 SelectMany運算符有很多重載,其中一個接受選擇器函數參數。這個選擇器函數在由source observable推出的每個值上調用。對于這些值中的每一個,選擇器將它投射到迷你可觀察序列中。最后,SelectMany操作符將所有這些迷你序列平坦化為單個結果序列,然后將其推送到訂閱者。 SelectMany返回的observable在源序列之后發布OnCompleted,并且選擇器生成的所有mini observable序列都已完成。當源流中發生錯誤時,選擇器函數拋出異常時,或者在任何mini observable序列中發生錯誤時,將觸發OnError。 在下面的示例中,我們首先創建一個源序列,每5秒產生一個整數,然后決定采用生成的前2個值(通過使用Take運算符)。然后,我們使用SelectMany來使用另一個{100,101,102}序列來投影這些整數。通過這樣做,產生兩個小的可觀察序列,{100,101,102}和{100,101,102}。這些最終被平坦化成{100,101,102,100,101,102}的單個整數流,并被推送到觀察者。
var source1 = Observable.Interval(TimeSpan.FromSeconds(5)).Take(2);var PRoj = Observable.Range(100, 3);var resultSeq = source1.SelectMany(proj);var sub = resultSeq.Subscribe(x => Console.WriteLine("OnNext : {0}", x.ToString()), ex => Console.WriteLine("Error : {0}", ex.ToString()), () => Console.WriteLine("Completed"));Console.ReadKey();在下面的示例中,我們使用主要的運算符創建一個簡單的可觀察數字序列。 主要的運算符有幾個重載。 在我們的示例中,它采用初始狀態(在我們的示例中為0),終止的條件函數(少于10次),迭代器(+1),結果選擇器(當前值的平方函數)。 ,并使用Where和Select運算符僅打印小于15的值。
IObservable<int> seq = Observable.Generate(0, i => i < 10, i => i + 1, i => i * i);IObservable<int> source = from n in seq where n < 5 select n;source.Subscribe(x => {Console.WriteLine(x);}); // output is 0, 1, 4, 9Console.ReadKey();以下示例是您在本主題前面看到的投影示例的擴展。 在該示例中,我們使用Select運算符將IEventPattern 數據類型投影到Point類型中。 在下面的示例中,我們使用Where和Select運算符僅選擇我們感興趣的那些鼠標移動。 在這種情況下,我們將鼠標移動到第一平分線上(其中x和y坐標相等)。
var frm = new Form(); IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");IObservable<System.Drawing.Point> points = from evt in move select evt.EventArgs.Location;var overfirstbisector = from pos in points where pos.X == pos.Y select pos;var movesub = overfirstbisector.Subscribe(pos => Console.WriteLine("mouse at " + pos));Application.Run(frm);您可以使用Buffer運算符來執行基于時間的操作。 緩沖可觀察序列意味著可觀察序列的值基于指定的時間跨度或計數閾值被放入緩沖器中。這在您期望大量數據被序列推出并且訂戶沒有用于處理這些值的資源的情況下尤其有用。通過基于時間或計數緩沖結果,并且當超過標準時(或當源序列已經完成時),僅返回值序列,訂戶可以以其自己的速度處理OnNext調用。 在下面的示例中,我們首先為每秒創建一個簡單的整數序列。然后,我們使用Buffer運算符并指定每個緩沖區將保存來自序列的5個項目。當緩沖區已滿時,調用OnNext。然后,我們使用Sum運算符計算緩沖區的總和。緩沖區將自動刷新,并開始另一個循環。打印輸出將為10,35,60 …其中10 = 0 + 1 + 2 + 3 + 4,35 = 5 + 6 + 7 + 8 + 9,依此類推。
var seq = Observable.Interval(TimeSpan.FromSeconds(1));var bufSeq = seq.Buffer(5);bufSeq.Subscribe(values => Console.WriteLine(values.Sum()));Console.ReadKey();我們還可以創建一個具有指定時間跨度的緩沖區。 在以下示例中,緩沖區將保存已累積3秒鐘的項目。 打印輸出將為3,12,21 …其中3 = 0 + 1 + 2,12 = 3 + 4 + 5,依此類推。
var seq = Observable.Interval(TimeSpan.FromSeconds(1));var bufSeq = seq.Buffer(TimeSpan.FromSeconds(3));bufSeq.Subscribe(value => Console.WriteLine(value.Sum())); Console.ReadKey();注意,如果你使用Buffer或Window,你必須確保序列在過濾之前不為空。
LINQ Operators by Categories主題列出了Observable類型按其類別實現的所有主要LINQ操作符; 具體地:創建,轉換,組合,功能,數學,時間,異常,雜項,選擇和原語。
新聞熱點
疑難解答