在一些常見的編程情形中,使用任務也許能提升性能。為了簡化變成,靜態類System.Threading.Tasks.Parallel封裝了這些常見的情形,它內部使用Task對象。
Parallel.For(0, 1000, (i) => { //i是從0開始一直到1000結束 }); var lst = new List<string>(); Parallel.ForEach(lst, (s) => { //do something }); //Invoke Parallel.Invoke( () => { }, () => { }, () => { } ); 三個不同的方式做并行操作,Invoke是自己定義的并行,像上面的代碼,只會開三個線程去做,而For和Foreach和普通的沒什么區別只是并行去做了.
這里需要注意的是:這個三個方法都會堵塞當前線程,要等待所有線程都做完了以后才能往下執行,這是有用的,當進行數據分析的時候,每條數據都是獨立的,這個時候等待所有線程做完是有意義的,而如果需要不堵塞線程可以用TASK來包一層,具體請參照【C#】線程之Task。在并發的時候如果有某些線程出現了異常,這個時候不會中斷線程,會在所有線程結束的時候拋出AggregateException(并不是所有異常都能用這個捕獲),我們通過這個異??梢灾浪械漠惓#?/p>
Parallel.ForEach(new List<string> { "aa", "bb", "cc", "dd", "ee", }, (b) => { Thread.Sleep(1000); throw new Exception(b); }); } catch (AggregateException ex) { foreach (var exception in ex.Flatten().InnerExceptions) { Console.WriteLine(exception.Message); } }看一下官方給出的解釋:
public class ParallelOptions { public ParallelOptions(); //允許取消操作 public CancellationToken CancellationToken { get; set; } //允許指定可以并發操作的最大工作項目數 public int MaxDegreeOfParallelism { get; set; } //允許指定要使用哪個TaskScheduler public TaskScheduler TaskScheduler { get; set; } }這個類提供我們最大并發數量的設置,取消操作的Token賦值,以及任務調度器的設置(關于這個,需要另拉一章來說,此處只關注前面兩個)
//定義線程取消的一個對象 var cancel = new CancellationTokenSource(); var po = new ParallelOptions() { CancellationToken = cancel.Token, MaxDegreeOfParallelism = 2, }; //為了不堵塞線程,這里開啟一個線程 Task.Run(() => { try { Parallel.For(0, 1000, po, (i) => { Thread.Sleep(1000); po.CancellationToken.ThrowIfCancellationRequested(); Console.WriteLine(i); }); } catch (AggregateException ex) { foreach (var e in ex.Flatten().InnerExceptions) { Console.WriteLine(e.Message); } } catch (Exception ex) { //OperationCanceledException Console.WriteLine(ex.GetType().Name); } //不設置取消的TOKEN }, CancellationToken.None); Thread.Sleep(10 * 1000); cancel.Cancel();線程最大并行數位2個,如果取消的話,整個Parallel會全部取消,并且拋出OperationCanceledException異常。
我只挑選了一個重載來說
public static ParallelLooPResult For<TLocal> (int fromInclusive, int toExclusive, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);
來看一下官方的解釋:
localInit:任務局部初始化,為參與工作的每一個任務都調用一次該委托這個委托在任務被要求處理一個工作項之前調用的。
boy:為參與工作的各個線程鎖處理的每一項都調用一次該委托
localFinally:任務局部終結委托,為參與工作的每一個任務都調用一次該委托,這個委托是在任務處理好派給它的所有工作項之后調用的。即使主體委托代碼引發一個未處理的異常,也會調用它。
public static void Parallel_For_Local_Test(){ int[] nums = Enumerable.Range(0, 1000000).ToArray<int>(); long total = 0; ParallelLoopResult result = Parallel.For<long>(0, nums.Length, () => { return 0; }, (j, loop, subtotal) => { // 延長任務時間,更方便觀察下面得出的結論 Thread.SpinWait(200); Console.WriteLine("當前線程ID為:{0},j為{1},subtotal為:{2}。" , Thread.CurrentThread.ManagedThreadId, j.ToString(), subtotal.ToString()); if (j == 23) loop.Break(); if (j > loop.LowestBreakIteration) { Thread.Sleep(4000); Console.WriteLine("j為{0},等待4s種,用于判斷已開啟且大于阻斷迭代是否會運行完。", j.ToString()); } Console.WriteLine("j為{0},LowestBreakIteration為:{1}", j.ToString(), loop.LowestBreakIteration); subtotal += nums[j]; return subtotal; }, (finalResult) => Interlocked.Add(ref total, finalResult) ); Console.WriteLine("total值為:{0}", total.ToString()); if (result.IsCompleted) Console.WriteLine("循環執行完畢"); else Console.WriteLine("{0}" , result.LowestBreakIteration.HasValue ? "調用了Break()阻斷循環." : "調用了Stop()終止循環.");} 看一下輸出
分析一下:
a) 泛型類型參數TLocal為本地線程數據類型,本示例設置為long。
b) 三個委托的參數解析body(j, loop, subtotal):首先初始委托localInit中返回了0,所以body委托中參數subtotal的初始值即為0,body委托的參數j對應的是當前迭代索引,參數loop為當前迭代狀態ParallelLoopState對象;localFinally委托參數為body委托的返回值。
c) 三個委托三個階段中都可能并行運行,因此您必須同步對任何共享變量的訪問,如示例中在finally委托中使用了System.Threading.Interlocked對象。
d) 在索引為23的迭代中調用Break()后:
i. 索引小于23的所有迭代仍會運行(即使還未開始處理),并在退出循環之前處理完。
ii. 索引大于 23 的迭代若還未開啟則會被放棄;若已處于運行中則會在退出循環之前處理完。
e) 對于調用Break()之后,在任何循環迭代中訪問LowestBreakIteration屬性都會返回調用Break()的迭代對應的索引。
注:滴答的雨(何雨泉)的幫助
可用來使 Tasks.Parallel 循環的迭代與其他迭代交互,并為 Parallel 類的循環提供提前退出循環的功能。此類的實例不要自行創建,它由 Parallel 類創建并提供給每個循環項,并且只應該在提供此實例的“循環內部”使用。
public class ParallelLoopState{ // 獲取循環的任何迭代是否已引發相應迭代未處理的異常。 public bool IsExceptional { get; } // 獲取循環的任何迭代是否已調用 ParallelLoopState.Stop()。 public bool IsStopped { get; } // 獲取在Parallel循環中調用 ParallelLoopState.Break() 的最低循環迭代。 public long? LowestBreakIteration { get; } // 獲取循環的當前迭代是否應基于此迭代或其他迭代發出的請求退出。 public bool ShouldExitCurrentIteration { get; } //通知Parallel循環當前迭代”之后”的其他迭代不需要運行。 public void Break(); //通知Parallel循環當前迭代“之外”的所有其他迭代不需要運行。 public void Stop();}
Break()
Break()用于通知Parallel循環當前迭代“之后”的其他迭代不需要運行。例如,對于從 0 到 1000 并行迭代的 for 循環,如果在第 100 次迭代調用 Break(),則低于 100 的所有迭代仍會運行(即使還未開始處理),并在退出循環之前處理完。從 101 到 1000 中還未開啟的迭代則會被放棄。
對于已經在執行的長時間運行迭代,Break()將為已運行還未結束的迭代對應ParallelLoopResult結構的LowestBreakIteration屬性設置為調用Bread()迭代項的索引。
Stop()
Stop() 用于通知Parallel循環當前迭代“之外”的所有其他迭代不需要運行,無論它們是位于當前迭代的上方還是下方。
對于已經在執行的長時間運行迭代,可以檢查 IsStopped屬性,在觀測到是 true 時提前退出。
Stop 通常在基于搜索的算法中使用,在找到一個結果之后就不需要執行其他任何迭代。(比如在看視頻或漫畫時自動匹配響應最快的服務器)
ShouldExitCurrentIteration 屬性
當循環的迭代調用 Break 或 Stop時,或一個迭代引發異常,或取消循環時,Parallel 類將主動嘗試禁止開始執行循環的其他迭代。但是,可能有無法阻止其他迭代啟動的情況。也可能是長時間運行的迭代已經開始執行的情況。在此類情況下,迭代可以通過顯式檢查 ShouldExitCurrentIteration 屬性,在該屬性返回 true 時停止執行。
LowestBreakIteration 屬性
返回過程中調用過Break方法的最低的項,如果從來沒有調用過Break則返回null.
這里還有一個
新聞熱點
疑難解答