Parallel的靜態For,ForEach和Invoke方法
在一些常見的編程情形中,使用任務也許會提升性能。為了簡化編程,靜態類System.Threading.Tasks.Paraller封裝了這些常見的情形,它內部使用Task對象。例如,不要像下面一樣處理一個集合中的所有項:// 一個線程順序執行這個工作(每次迭代調用一次DoWork)for (Int32 i = 0; i< 1000; i++ ) DoWork(i);
相反,可以使用Parallel類型的For方法,讓多個線程池治線程幫助執行這個工作:
// 線程池的線程并行處理工作Parallel.For(0,1000,i=>DoWork(i));
類似的,如果有一個集合,那么不要像下面這樣寫:
// 一個線程順序執行這個工作(每次迭代調用一次DoWork)foreach ( var item in conllection) DoWork(item);
而是這樣做:
// 線程池的線程并行處理工作Parallel.ForEach(conllection,item=>DoWork(item));
如果代碼中既可以用For,也可以用ForEach,那么建議使用For,因為它執行的快一點。最后,如果要執行幾個方法,那么可以順序執行它們,如下所示:
// 一個線程順序執行所有方法 Method1();Method2();Method3();
也可以并行執行它們:

// 線程池的線程并行執行parallel.Invoke( () => Method1(), () => Method2(), () => Method3());

Parallel的所有方法都讓調用線程參與處理。從資源利用的角度說,這是一件好事,因為我們不希望調用線程停下來,等待線程池做完所有工作后才繼續。然而,如果調用線程在線程池完成自己的那一部分工作之前完成工作,調用程序就會將自己掛起,知道所有工作完成。這也是一件好事,因為這個提供了和普通for和foreach循環時相同的語義:線程要在所有工作后才繼續運行。還要注意,如果任何操作拋出一個未處理的異常,你調用的paraller方法最后會拋出一個AggregateException。
當然,這并不是說需要檢查自己的所有源代碼,將for循環替換成Parallel.For的調用。調用Parallel的方法時,有一個前提條件務必記住:工作項要能并行執行。因此,如果工作項必須順序執行,就不要調用Parallel的方法。另外,要避免會修改任何共享數據的工作項,因為多個線程同時處理的數據可能損壞。為了解決這個問題,一般的方法就是圍繞數據訪問添加線程同步鎖。但是這樣一來,一次就只能有一個線程訪問數據,無法享受并行處理多個想帶來的好處。 除此之外,Parallel的方法本身也有開銷:委托對象必須分配,而針對每一個工作項,都要調用一次這些委托。如果有大量可由多個線程處理的工作項,那么也許會獲得性能的提升。但是,如果只為區區幾個工作項使用Parallel的方法,或者為處理得非常快的工作項使用Parallel就會得不償失了。 Parallel的For,ForEach和Invoke方法都能接受一個ParallelOptions對象的重載版本。這個對象的定義如下:
// 存儲用于配置 Parallel 類的方法的操作的選項。 public class ParallelOptions { // 初始化 ParallelOptions 類的新實例 public ParallelOptions(); // 獲取或設置與此 ParallelOptions 實例關聯的 CancellationToken,運行取消操作 public CancellationToken CancellationToken { get; set; } // 獲取或設置此 ParallelOptions 實例所允許的最大并行度,默認為-1(可用CPU數) public int MaxDegreeOfParallelism { get; set; } // 獲取或設置與此 ParallelOptions 實例關聯的 TaskScheduler。默認為TaskScheduler.Default public TaskScheduler TaskScheduler { get; set; } }
除此之外,For和ForEach方法有一些重載版本允許傳遞3個委托:
任務局部初始化委托(localInit),為參與工作的每一個任務都調用一次委托。這個委托是在任務被要求處理一個工作項之前調用。 主體委托(body),為參與工作的各個線程所處理的每一項都調用一次委托。 任務局部終結委托(localFinally),為參與工作的每一個任務都調用一次委托。這個委托是在任務處理好派遣給它的所有工作之后調用。即使主體委托引發一個未處理的異常,也會調用它。 以下示例代碼演示了如何利用3個委托,計算一個目錄中的所有文件的字節長度總計值:
PRivate static Int64 DirectoryBytes(String path, String searchPattern, SearchOption searchOption) { var files = Directory.EnumerateFiles(path, searchPattern, searchOption); Int64 masterTotal = 0; ParallelLoopResult result = Parallel.ForEach<String, Int64>(files, () => { // localInit: 每個任務開始之前調用一次 // 每個任務開始之前,總計值都初始化為0 return 0; }, (file, parallelLoopState, index, taskLocalTotal) => { // body: 每個任務調用一次 // 獲得這個文件的大小,把它添加到這個任務的累加值上 Int64 fileLength = 0; FileStream fs = null; try { fs = File.OpenRead(file); fileLength = fs.Length; } catch (IOException) { /* 忽略拒絕訪問的文件 */ } finally { if (fs != null) fs.Dispose(); } return taskLocalTotal + fileLength; }, taskLocalTotal => { // localFinally: 每個任務完成后調用一次 // 將這個任務的總計值(taskLocalTotal)加到中的總計值(masterTotal)上去 Interlocked.Add(ref masterTotal, taskLocalTotal); }); return masterTotal; }

每個任務都通過taskLocalTotal變量為分配給它的文件維護自己的總計值。每個任務完成工作之后,都調用Interlocked.Add方法[對兩個 32 位整數進行求和并用和替換第一個整數],以一種線程安全的方式更新總的總計值。由于每個任務都有自己的總計值,可以在一個工作項處理期間,無需進行線程同步。由于線程同步會造成性能的損失,所以不需要線程同步是一件好事。只有在每個任務返回之后,masterTotal才需要以一種線程安全的方式更新materTotal變量。所以,因為調用Interlocked.Add方法而造成的性能損失每個任務只發生一次,而不會每個工作項都發生。
注意,我們向主題委托傳遞一個ParallelLoopState對象,它的定義如下:
// 可用來使 Parallel 循環的迭代與其他迭代交互 public class ParallelLoopState { // 獲取循環的任何迭代是否已引發相應迭代未處理的異常 public bool IsExceptional { get; } // 獲取循環的任何迭代是否已調用 Stop public bool IsStopped { get; } // 獲取從中調用 Break 的最低循環迭代。 public long? LowestBreakIteration { get; } // 獲取循環的當前迭代是否應基于此迭代或其他迭代發出的請求退出。 public bool ShouldExitCurrentIteration { get; } // 告知 Parallel 循環應在系統方便的時候盡早停止執行當前迭代之外的迭代。 public void Break(); // 告知 Parallel 循環應在系統方便的時候盡早停止執行。 public void Stop();}
參與工作的每一個任務都會獲得它自己的ParallelState對象,并可通過這個對象和參與工作的其他任務進行交互。Stop方法告訴循環停止處理任何更多的工作,未來對IsStopped屬性的查詢會返回true。Break方法告訴循環不再繼續處理當前項之后的項。例如,假如ForEach被告知要處理100項,并在第5項時調用了Break,那么循環會確保前5項處理好之后,ForEach才返回。但注意,這并不是說在這100項中,只有前5項被處理,也許第5項之后可能在以前已經處理過了。LowestBreakIteration屬性返回在處理過程中調用過Break方法的最低的項。從來沒有調用過Break,LowestBreakIteration會返回null。
處理任何一項時,如果造成一個未處理的異常,IsExceptional屬性會返回true。如果處理一項時會花費大量的時間,代碼可查詢ShouldExitCurrentIteration屬性看它是否應該提前退出。如果調用過Stop,調用過Break,取消過CancellationTokenSource,或者處理一項時造成了未處理的異常,這個屬性就會返回true。 Parallel的For和ForEach方法都返回一個ParallelLoopResult實例,他看起來像下面這樣:
// 提供執行 System.Threading.Tasks.Parallel 循環的完成狀態。 public struct ParallelLoopResult { // 獲取該循環是否已運行完成(即該循環的所有迭代均已執行,并且該循環沒有收到提前結束的請求)。 public bool IsCompleted { get; } // 獲取從中調用 System.Threading.Tasks.ParallelLoopState.Break() 的最低迭代的索引。 public long? LowestBreakIteration { get; } }
可通過檢查屬性來了解循環的結果,如果IsCompleted返回true。表明循環運行完成,所有項都得到了處理。如果IsCompleted為false,而且LowestBreakIteration為null,表明參與工作的某個線程調用了Stop方法。如果LowestBreakIteration返回false,而且LowestBreakIteration不為null,表名參與工作的某個線程調用的Break方法,LowestBreakIteration返回的Int64值指明了保證已得到處理的最低一項的索引。
場景
public void run(Action<List<string>> onload) { List<string> directoryLists = new List<string>(); directoryLists = Directory.GetDirectories(m_importPath).ToList(); Parallel.ForEach(directoryLists,new ParallelOptions { MaxDegreeOfParallelism=10}, i => { List<string> files = importer(i);
onload(files);
}
}
Action<List<string>> onload = dir => { BeginInvoke(new EventHandler((obj, even) => { var root = new TreeNode(dir[0]); for (int i = 1; i < dir.Count; i++) { string fileName = dir[i].Substring(dir[i].LastIndexOf('//') + 1); root.Nodes.Add(fileName.Remove(fileName.IndexOf('.'))); } this.treeView1.Nodes.Add(root); }), null);
};
import.run(onload);
新聞熱點
疑難解答