今天領導給我們發了一篇文章文章,讓我們學習一下。
文章鏈接:TAM - Threaded Array Manipulator
這是codePRoject上的一篇文章,花了一番時間閱讀了一下。文章主要是介紹當單線程處理大量數組遇到性能瓶頸時,使用多線程的方式進行處理,可以縮短數組的處理時間。
看了這篇文章后,感覺似曾相識,很多次,當我想要處理大數組時,我就會進行構思,然后想出的解決方案,與此文章中介紹的方案非常的相似。但是說來慚愧,此文章的作者有了構思后便動手寫出了實現代碼,然后還進行了性能測試,而我每次只是構思,覺得我能想出來就可以了,等到真正用的時候再把它寫出來就行了。事實上,我大概已經構思過很多次了,但是還從來沒有寫過,直到看到這篇文章,我才下定決心,一定要將這個思路整理一遍。
雖然科技一直在進步,CPU的處理能力也一直在提高,但是當我們進入大數據時代后,CPU每秒鐘都會面臨著大量的數據需要處理,這個時候CPU的處理能力可能就會成為性能瓶頸。這是我們就要選擇多核多CPU了,編程中也就是使用多線程進行處理。
首先看下單線程處理的例子
static void Main(string[] args){ int count = 100000000; double[] arrayForTest = new double[count]; Stopwatch watch = new Stopwatch(); watch.Start(); for (int i = 0; i < arrayForTest.Length; i++) { arrayForTest[i] = MathOperationFunc(arrayForTest[i]); } watch.Stop(); Console.WriteLine("經過 " + arrayForTest.Length + " 次循環共消耗時間 " + (watch.ElapsedMilliseconds / 1000.0) + " s");}static double MathOperationFunc(double value){ return Math.Sin(Math.Sqrt(Math.Sqrt((double)value * Math.PI)) * 1.01);}單線程處理的耗時

這個單線程的例子中對一個有10000000個元素的數組中的每個元素進行了數學計算,執行完畢共計耗時5.95秒。
然后看兩個線程處理的例子
static void Main(string[] args){ //四線程測試 int threadCount = 2; Thread[] threads = new Thread[threadCount]; for (int i = 0; i < threadCount; i++) { threads[i] = new Thread(ForTestInThread); threads[i].Name = threadCount + "線程測試" + (i + 1); threads[i].Start(); }}//工作線程static void ForTestInThread(){ int count = 50000000; double[] arrayForTest = new double[count]; Stopwatch watch = new Stopwatch(); watch.Start(); for (int i = 0; i < arrayForTest.Length; i++) { arrayForTest[i] = MathOperationFunc(arrayForTest[i]); } watch.Stop(); Console.WriteLine("線程:" + Thread.CurrentThread.Name + ",經過 " + arrayForTest.Length + " 次循環共消耗時間 " + (watch.ElapsedMilliseconds / 1000.0) + " s");}//數據計算static double MathOperationFunc(double value){ return Math.Sin(Math.Sqrt(Math.Sqrt((double)value * Math.PI)) * 1.01);}兩個線程測試耗時

我們再來看一下四個線程的例子
static void Main(string[] args){ //四線程測試 int threadCount = 4; Thread[] threads = new Thread[threadCount]; for (int i = 0; i < threadCount; i++) { threads[i] = new Thread(ForTestInThread); threads[i].Name = threadCount + "線程測試" + (i + 1); threads[i].Start(); }}//工作線程static void ForTestInThread(){ int count = 25000000; double[] arrayForTest = new double[count]; Stopwatch watch = new Stopwatch(); watch.Start(); for (int i = 0; i < arrayForTest.Length; i++) { arrayForTest[i] = MathOperationFunc(arrayForTest[i]); } watch.Stop(); Console.WriteLine("線程:" + Thread.CurrentThread.Name + ",經過 " + arrayForTest.Length + " 次循環共消耗時間 " + (watch.ElapsedMilliseconds / 1000.0) + " s");}//數據計算static double MathOperationFunc(double value){ return Math.Sin(Math.Sqrt(Math.Sqrt((double)value * Math.PI)) * 1.01);}四個線程測試耗時

由上面的測試中可以看到,隨著線程數的增多,任務被分解后每個線程執行的任務耗時由原來的 6秒 逐漸降到 2秒 左右,由此我們可以猜想當所有線程同時執行的時候,那么總任務的耗時就會下降,接下來讓我們來進行更精確的測試。
進行多線程測試時,經常會遇到這樣的問題:主線程中如何等待所有線程執行結束后,再執行后續任務。
錯誤的做法
Thread[] threads = new Thread[threadCount];for (int i = 0; i < threadCount; i++){ int beginIndex = i*arrayForTest.Length/threadCount; int length = arrayForTest.Length/threadCount; threads[i] = new Thread(WorkerThread); var arg = new Tuple<double[], int, int>(arrayForTest, beginIndex, length); threads[i].Name = threadCount + "線程測試" + (i + 1).ToString(); threads[i].Start(arg); //等待所有線程結束 threads[i].Join();}
這么做實際上所有的子線程均是串行執行的,并沒有達到并行的效果。
正確的做法
Thread[] threads = new Thread[threadCount];for (int i = 0; i < threadCount; i++){ int beginIndex = i*arrayForTest.Length/threadCount; int length = arrayForTest.Length/threadCount; threads[i] = new Thread(WorkerThread); var arg = new Tuple<double[], int, int>(arrayForTest, beginIndex, length); threads[i].Name = threadCount + "線程測試" + (i + 1).ToString(); threads[i].Start(arg);}//等待所有線程結束foreach (var thread in threads){ thread.Join();}了解了Thread.Join后,就可以進行多線程處理大數組的代碼編寫了:
class Program{ static void Main(string[] args) { int count = 100000000; double[] arrayForTest = new double[count]; Stopwatch totalWatch = new Stopwatch(); totalWatch.Start(); ThreadTest(arrayForTest, 2); totalWatch.Stop(); Console.WriteLine("總任務,經過 " + arrayForTest.Length + " 次循環共消耗時間 " + (totalWatch.ElapsedMilliseconds / 1000.0) + " s"); } //大循環測試 static void ForTest(double[] arrayForTest, int beingIndex, int offset, Func<double, double> func) { for (int i = beingIndex; i < beingIndex + offset; i++) { arrayForTest[i] = func(arrayForTest[i]); } } //數學計算 static double MathOperationFunc(double value) { return Math.Sin(Math.Sqrt(Math.Sqrt((double)value * Math.PI)) * 1.01); } static void ThreadTest(double[] arrayForTest, int threadCount) { //啟動線程 Thread[] threads = new Thread[threadCount]; for (int i = 0; i < threadCount; i++) { //為每個線程分配任務 int beginIndex = i*arrayForTest.Length/threadCount; int length = arrayForTest.Length/threadCount; threads[i] = new Thread(WorkerThread); var arg = new Tuple<double[], int, int>(arrayForTest, beginIndex, length); threads[i].Name = threadCount + "線程測試" + (i + 1).ToString(); threads[i].Start(arg); threads[i].Join(); } //等待所有線程結束 foreach (var thread in threads) { thread.Join(); } } //工作線程 static void WorkerThread(object arg) { Stopwatch watch = new Stopwatch(); watch.Start(); var argArray = arg as Tuple<double[], int, int>; if (argArray == null) return; ForTest(argArray.Item1, argArray.Item2, argArray.Item3, MathOperationFunc); watch.Stop(); Console.WriteLine("線程:" + Thread.CurrentThread.Name + ",經過 " + argArray.Item3 + " 次循環共消耗時間 " + (watch.ElapsedMilliseconds/1000.0) + " s"); }}這樣多線程處理大數組的功能代碼就編寫完成了,那么性能是如何呢,用事實說話,效果如下:

由圖可以看出,將一個大任務分解到兩個線程中去執行后,大任務總體的執行時間會縮短,但是與兩個線程中耗時最長的線程的執行時間有關。
同時執行耗時由原來的6秒逐漸降到2秒左右。可見
新聞熱點
疑難解答