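Lately I have been confused about three concepts: multithreaded programming, parallel programming, and asynchronous programming. I had already studied asynchronous programming (see "VS 2013 C# Asynchronous Programming: async await"), yet I suddenly realized that I could no longer say clearly how the three relate to one another or how they differ.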
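So I looked back over my own experience to sort out my thinking. When I first encountered computers, the school machines ran DOS and Windows 3.x. Later the school moved to Windows 95, and after that I had a desktop of my own. Throughout that period every CPU was single-core. Even with multiple threads, however "simultaneous" a program looked, execution was in essence sequential, because whichever piece of code was running had the CPU to itself. Later I sold the desktop and bought a laptop with a dual-core CPU. With multiple threads on that machine the situation was different: the threads could truly execute at the same time, that is, in parallel.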
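"Parallelism" is the goal, and "multithreaded programming" is the means we use to reach it. Multithreaded programming, however, brings many problems with it: oversubscription, race conditions, deadlock, livelock, the two-step dance, priority inversion, and so on. To simplify multithreaded programming, and because multi-core CPUs are increasingly common, many frameworks now wrap multithreading in their own classes and methods. That is parallel programming. Multithreaded programming has therefore become the lower-level layer, while parallel programming sits at a higher level of abstraction; at the very least it can turn a simple piece of sequential code directly into parallel code. As for asynchronous programming, an async method is meant to be a non-blocking operation. The async and await keywords do not cause additional threads to be created, and an async method does not run on a thread of its own: it runs on the current synchronization context and uses time on the thread only while the method is active, so it does not require multithreading.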
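The difference between "parallel" and "asynchronous" can be sketched in a few lines. This is only an illustration: the class and method names (`AsyncVersusParallelSketch`, `CountThreadsUsedByParallelFor`, `DelayedAnswerAsync`) are made up for this example, and it assumes .NET Framework 4.5 or later so that async/await is available.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class AsyncVersusParallelSketch
{
    // Parallel: the loop body is spread over several threads.
    // Returns how many distinct threads executed at least one iteration.
    public static int CountThreadsUsedByParallelFor(int iterations)
    {
        var threadIds = new ConcurrentDictionary<int, bool>();
        Parallel.For(0, iterations, i =>
        {
            threadIds.TryAdd(Thread.CurrentThread.ManagedThreadId, true);
            Thread.SpinWait(100000); // simulate a little CPU-bound work
        });
        return threadIds.Count;
    }

    // Asynchronous: the method returns to its caller at the first await,
    // and no thread is blocked while the delay is pending.
    public static async Task<int> DelayedAnswerAsync()
    {
        await Task.Delay(200);
        return 42;
    }

    static void Main()
    {
        Console.WriteLine("Threads used by Parallel.For: " + CountThreadsUsedByParallelFor(64));

        Task<int> pending = DelayedAnswerAsync(); // control comes back here right away
        Console.WriteLine("Caller keeps running; task completed yet? " + pending.IsCompleted);
        Console.WriteLine("Answer: " + pending.Result); // block only when the value is needed
    }
}
```

On a multi-core machine the first number is usually greater than one, while the async method gets its "concurrency" from not waiting rather than from extra threads.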
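In short, we are in the multi-core and many-core era. Imagine a future CPU with a million cores; it is not impossible. (For comparison, the human central nervous system contains roughly 100 billion neurons, about 14 billion of them in the cerebral cortex alone.) Asking programmers to hand-write thread code at that scale would clearly be too inefficient and, worse, too error-prone. That is why parallel programming is needed.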
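In 2009 Google released Go, its second open-source language. Opinions on Go are mixed, and enthusiasm for it is higher in China than abroad. Go was built from the start for concurrency and networking. Apart from that, in areas such as static compilation, GC, cross-platform support, ease of learning, and a rich standard library, I do not think it actually surpasses C/C++, Java, C#, or Python. That suggests why Go appeared, and why it attracts so many complaints and arguments: perhaps Go is something like a "gifted autistic savant". Seen that way, both the praise and the criticism are easier to take calmly.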
To use the TPL, besides some knowledge of threads, you should ideally be familiar with delegates, anonymous methods, or lambda expressions.
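As a reminder of why that background matters, here is a minimal sketch that passes the same loop body to Parallel.For in three forms: a named method, an anonymous method, and a lambda expression. The names `DelegateFormsSketch`, `AddSquare`, and `SumSquaresThreeWays` are invented for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class DelegateFormsSketch
{
    static long total; // shared accumulator, only ever updated atomically

    // A named method whose signature matches Action<int>.
    static void AddSquare(int i)
    {
        Interlocked.Add(ref total, (long)i * i);
    }

    // Sums the squares of 0..n-1 three times, once per delegate form.
    public static long[] SumSquaresThreeWays(int n)
    {
        var sums = new long[3];

        total = 0;
        Parallel.For(0, n, AddSquare);                 // named method (method group)
        sums[0] = total;

        total = 0;
        Parallel.For(0, n, delegate(int i)             // anonymous method
        {
            Interlocked.Add(ref total, (long)i * i);
        });
        sums[1] = total;

        total = 0;
        Parallel.For(0, n, i => Interlocked.Add(ref total, (long)i * i)); // lambda expression
        sums[2] = total;

        return sums;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", SumSquaresThreeWays(1000)));
    }
}
```

All three calls resolve to the same Parallel.For overload; the lambda form is simply the shortest to write, which is why most TPL samples use it.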
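Multi-core CPUs are now commonplace, so multiple threads can execute simultaneously. Parallelizing code spreads the work across those cores.
In the past, parallelization required low-level manipulation of threads and locks. Visual Studio 2010 and .NET Framework 4 introduced a new runtime, new class library types, and new diagnostic tools that improve support for parallel programming. These features simplify parallel development: you can write efficient, fine-grained, and scalable parallel code in a natural idiom without working directly with threads or the thread pool.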
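The figure below gives a high-level overview of the parallel programming architecture in .NET Framework 4.

[Figure: parallel programming architecture in .NET Framework 4]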
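The Task Parallel Library (TPL) is a set of public types and APIs in the System.Threading and System.Threading.Tasks namespaces. Its purpose is to make developers more productive by simplifying the process of adding parallelism and concurrency to applications. The TPL scales the degree of concurrency dynamically to make the most efficient use of all available processors. It also handles the partitioning of work, the scheduling of threads on the ThreadPool, cancellation support, state management, and other low-level details. By using the TPL, you can concentrate on the work your program is meant to do while maximizing the performance of your code.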
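Starting with .NET Framework 4, the TPL is the preferred way to write multithreaded and parallel code. Not all code is suitable for parallelization, however. For example, if a loop does only a small amount of work per iteration, or runs for only a few iterations, the overhead of parallelization can make the code run more slowly. In addition, like any multithreaded code, parallelization adds complexity to program execution. Although the TPL simplifies multithreaded scenarios, a basic understanding of threading concepts (for example locks, deadlocks, and race conditions) is recommended so that you can use the TPL effectively.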
We can parallelize over data: put simply, the same operation is performed concurrently on each element of a collection. Tasks and dataflow can be parallelized as well. This article focuses on data parallelism.
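The TPL supports data parallelism through the System.Threading.Tasks.Parallel class, which provides parallel implementations of for and foreach loops. Writing the loop logic for Parallel.For or Parallel.ForEach is much like writing a sequential loop. You do not have to create threads or queue work items, and a basic loop needs no locks. The TPL handles all the low-level work.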
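To make the comparison with a sequential loop concrete, here is a minimal sketch. The class and method names are invented for illustration. Each iteration writes only to its own array slot, which is why no lock is needed.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

static class BasicParallelLoopSketch
{
    // Ordinary sequential loop.
    public static double[] SquareRootsSequential(int n)
    {
        var result = new double[n];
        for (int i = 0; i < n; i++)
        {
            result[i] = Math.Sqrt(i);
        }
        return result;
    }

    // The same loop with Parallel.For: the body becomes a lambda that receives the index.
    public static double[] SquareRootsParallel(int n)
    {
        var result = new double[n];
        Parallel.For(0, n, i =>
        {
            result[i] = Math.Sqrt(i); // each iteration touches a different element
        });
        return result;
    }

    static void Main()
    {
        double[] a = SquareRootsSequential(1000);
        double[] b = SquareRootsParallel(1000);
        Console.WriteLine("Same results: " + a.SequenceEqual(b));
    }
}
```

With a body this small the parallel version is unlikely to be faster; the point here is only how little the loop's shape changes.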
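The System.Threading.Tasks.Parallel class has three methods, For, ForEach, and Invoke, each with many overloads. There is little point in describing the methods one by one, so the examples below show how to use them for parallel programming and compare their performance with sequential execution.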
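The longer examples in this article use For and ForEach. For completeness, here is a small sketch of Parallel.Invoke, which runs several independent actions, possibly in parallel, and returns once all of them have finished. The names `InvokeSketch` and `Summarize` are made up for this illustration.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

static class InvokeSketch
{
    // Computes { min, max, average } of the data with three independent actions.
    public static double[] Summarize(int[] data)
    {
        double min = 0, max = 0, average = 0;

        // Each action writes a different variable, so no synchronization is needed.
        Parallel.Invoke(
            () => min = data.Min(),
            () => max = data.Max(),
            () => average = data.Average());

        // Parallel.Invoke has returned, so all three values are set.
        return new[] { min, max, average };
    }

    static void Main()
    {
        double[] summary = Summarize(new[] { 4, 8, 15, 16, 23, 42 });
        Console.WriteLine("min={0}, max={1}, average={2}", summary[0], summary[1], summary[2]);
    }
}
```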
Compare the performance of computing PI sequentially, in parallel, and in parallel with a range partitioner.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

namespace ComputePi
{
    class Program
    {
        const int num_steps = 100000000;

        static void Main(string[] args)
        {
            Time(() => SerialPi());
            Time(() => ParallelPi());
            Time(() => ParallelPartitionerPi());

            Console.WriteLine("Press any keys to Exit.");
            Console.ReadLine();
        }

        /// <summary>
        /// Times the execution of a function and outputs both the elapsed time and the function's result.
        /// </summary>
        static void Time<T>(Func<T> work)
        {
            var sw = Stopwatch.StartNew();
            var result = work();
            Console.WriteLine(sw.Elapsed + ": " + result);
        }

        /// <summary>
        /// Estimates the value of PI using a for loop.
        /// </summary>
        static double SerialPi()
        {
            double sum = 0.0;
            double step = 1.0 / (double)num_steps;
            for (int i = 0; i < num_steps; i++)
            {
                double x = (i + 0.5) * step;
                sum = sum + 4.0 / (1.0 + x * x);
            }
            return step * sum;
        }

        /// <summary>
        /// Estimates the value of PI using a Parallel.For.
        /// </summary>
        static double ParallelPi()
        {
            double sum = 0.0;
            double step = 1.0 / (double)num_steps;
            object monitor = new object();
            Parallel.For(0, num_steps, () => 0.0, (i, state, local) =>
            {
                double x = (i + 0.5) * step;
                return local + 4.0 / (1.0 + x * x);
            }, local => { lock (monitor) sum += local; });
            return step * sum;
        }

        /// <summary>
        /// Estimates the value of PI using a Parallel.ForEach and a range partitioner.
        /// </summary>
        static double ParallelPartitionerPi()
        {
            double sum = 0.0;
            double step = 1.0 / (double)num_steps;
            object monitor = new object();
            Parallel.ForEach(Partitioner.Create(0, num_steps), () => 0.0, (range, state, local) =>
            {
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    double x = (i + 0.5) * step;
                    local += 4.0 / (1.0 + x * x);
                }
                return local;
            }, local => { lock (monitor) sum += local; });
            return step * sum;
        }
    }
}
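//RESULT:
//00:00:00.4358850: 3.14159265359043
//00:00:00.4523856: 3.14159265358987
//00:00:00.1435475: 3.14159265358979
//Press any keys to Exit.

When the body of a For loop is small, the loop may run more slowly than the equivalent sequential loop. That is why the sequential and parallel PI computations take about the same time here: the overhead of partitioning the data, plus the cost of invoking a delegate on every iteration, cancels out the gain from parallelism. To address this, the Partitioner class provides the Partitioner.Create method, which lets you supply a sequential loop as the delegate body, so that the delegate is invoked once per partition rather than once per iteration. This is why the partitioned version is roughly three times faster in the run above.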
Compare the performance of sequential and parallel matrix multiplication.
using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace DataParallelismDemo
{
    class Program
    {
        /// <summary>
        /// Multiplies two matrices with ordinary sequential loops.
        /// </summary>
        static void MultiplyMatricesSequential(double[,] matA, double[,] matB, double[,] result)
        {
            int matACols = matA.GetLength(1);
            int matBCols = matB.GetLength(1);
            int matARows = matA.GetLength(0);

            for (int i = 0; i < matARows; i++)
            {
                for (int j = 0; j < matBCols; j++)
                {
                    for (int k = 0; k < matACols; k++)
                    {
                        result[i, j] += matA[i, k] * matB[k, j];
                    }
                }
            }
        }

        /// <summary>
        /// Multiplies two matrices with a parallel outer loop.
        /// </summary>
        static void MultiplyMatricesParallel(double[,] matA, double[,] matB, double[,] result)
        {
            int matACols = matA.GetLength(1);
            int matBCols = matB.GetLength(1);
            int matARows = matA.GetLength(0);

            // A basic matrix multiplication.
            // Parallelize the outer loop to partition the source array by rows.
            Parallel.For(0, matARows, i =>
            {
                for (int j = 0; j < matBCols; j++)
                {
                    // Use a temporary to improve parallel performance.
                    double temp = 0;
                    for (int k = 0; k < matACols; k++)
                    {
                        temp += matA[i, k] * matB[k, j];
                    }
                    result[i, j] = temp;
                }
            }); // Parallel.For
        }

        static void Main(string[] args)
        {
            // Set up matrices. Use small values to better view
            // result matrix. Increase the counts to see greater
            // speedup in the parallel loop vs. the sequential loop.
            int colCount = 180;
            int rowCount = 2000;
            int colCount2 = 270;
            double[,] m1 = InitializeMatrix(rowCount, colCount);
            double[,] m2 = InitializeMatrix(colCount, colCount2);
            double[,] result = new double[rowCount, colCount2];

            // First do the sequential version.
            Console.WriteLine("Executing sequential loop...");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            MultiplyMatricesSequential(m1, m2, result);
            stopwatch.Stop();
            Console.WriteLine("Sequential loop time in milliseconds: {0}", stopwatch.ElapsedMilliseconds);

            // For the skeptics.
            OfferToPrint(rowCount, colCount2, result);

            // Reset timer and results matrix.
            stopwatch.Reset();
            result = new double[rowCount, colCount2];

            // Do the parallel loop.
            Console.WriteLine("Executing parallel loop...");
            stopwatch.Start();
            MultiplyMatricesParallel(m1, m2, result);
            stopwatch.Stop();
            Console.WriteLine("Parallel loop time in milliseconds: {0}", stopwatch.ElapsedMilliseconds);
            OfferToPrint(rowCount, colCount2, result);

            // Keep the console window open in debug mode.
            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }

        /// <summary>
        /// Creates a rows x cols matrix filled with random integers in [0, 100).
        /// </summary>
        static double[,] InitializeMatrix(int rows, int cols)
        {
            double[,] matrix = new double[rows, cols];
            Random r = new Random();
            for (int i = 0; i < rows; i++)
            {
                for (int j = 0; j < cols; j++)
                {
                    matrix[i, j] = r.Next(100);
                }
            }
            return matrix;
        }

        private static void OfferToPrint(int rowCount, int colCount, double[,] matrix)
        {
            Console.WriteLine("Computation complete. Print results? y/n");
            char c = Console.ReadKey().KeyChar;
            if (c == 'y' || c == 'Y')
            {
                Console.WindowWidth = 180;
                Console.WriteLine();
                for (int x = 0; x < rowCount; x++)
                {
                    Console.WriteLine("ROW {0}: ", x);
                    for (int y = 0; y < colCount; y++)
                    {
                        Console.Write("{0:#.##} ", matrix[x, y]);
                    }
                    Console.WriteLine();
                }
            }
        }
    }
}
//RESULT:
//Executing sequential loop...
//Sequential loop time in milliseconds: 1168
//Computation complete. Print results? y/n
//nExecuting parallel loop...
//Parallel loop time in milliseconds: 360
//Computation complete. Print results? y/n
//nPress any key to exit.

The next program uses Parallel.ForEach to rotate every .png file in one directory by 180 degrees and save the result to another directory. It needs references to System.Configuration and System.Drawing, and reads the two paths from the PicPath and NewPicPath application settings.

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Configuration;

namespace MovePics
{
    class Program
    {
        protected static string PIC_PATH = ConfigurationManager.AppSettings["PicPath"].ToString();
        protected static string NEW_PIC_PATH = ConfigurationManager.AppSettings["NewPicPath"].ToString();

        static void Main(string[] args)
        {
            // A simple source for demonstration purposes. Modify this path as necessary.
            string[] files = System.IO.Directory.GetFiles(PIC_PATH, "*.png");
            System.IO.Directory.CreateDirectory(NEW_PIC_PATH);

            //  Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
            Parallel.ForEach(files, currentFile =>
            {
                // The more computational work you do here, the greater
                // the speedup compared to a sequential foreach loop.
                string filename = System.IO.Path.GetFileName(currentFile);

                // Dispose the bitmap when done so the file handle and GDI+ memory are released.
                using (System.Drawing.Bitmap bitmap = new System.Drawing.Bitmap(currentFile))
                {
                    bitmap.RotateFlip(System.Drawing.RotateFlipType.Rotate180FlipNone);
                    bitmap.Save(System.IO.Path.Combine(NEW_PIC_PATH, filename));
                }

                // Peek behind the scenes to see how work is parallelized.
                // But be aware: Thread contention for the Console slows down parallel loops!!!
                Console.WriteLine("Processing {0} on thread {1}", filename,
                    Thread.CurrentThread.ManagedThreadId);
            } //close lambda expression
            ); //close method invocation

            // Keep the console window open in debug mode.
            Console.WriteLine("Processing complete. Press any key to exit.");
            Console.ReadKey();
        }
    }
}
The last program walks a directory tree and processes the files of each directory with Parallel.ForEach when there are at least as many files as processors, and sequentially otherwise.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Security;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace TraverseTreeParallelForEach
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                TraverseTreeParallelForEach(@"C:/Program Files", (f) =>
                {
                    // Exceptions are no-ops.
                    try
                    {
                        // Do nothing with the data except read it.
                        byte[] data = File.ReadAllBytes(f);
                    }
                    catch (FileNotFoundException) { }
                    catch (IOException) { }
                    catch (UnauthorizedAccessException) { }
                    catch (SecurityException) { }
                    // Display the filename.
                    Console.WriteLine(f);
                });
            }
            catch (ArgumentException)
            {
                Console.WriteLine(@"The directory 'C:/Program Files' does not exist.");
            }

            // Keep the console window open.
            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }

        public static void TraverseTreeParallelForEach(string root, Action<string> action)
        {
            // Count of files traversed and timer for diagnostic output.
            int fileCount = 0;
            var sw = Stopwatch.StartNew();

            // Determine whether to parallelize file processing on each folder based on processor count.
            int procCount = System.Environment.ProcessorCount;

            // Data structure to hold names of subfolders to be examined for files.
            Stack<string> dirs = new Stack<string>();

            if (!Directory.Exists(root))
            {
                throw new ArgumentException();
            }
            dirs.Push(root);

            while (dirs.Count > 0)
            {
                string currentDir = dirs.Pop();
                string[] subDirs = { };
                string[] files = { };

                try
                {
                    subDirs = Directory.GetDirectories(currentDir);
                }
                // Thrown if we do not have discovery permission on the directory.
                catch (UnauthorizedAccessException e)
                {
                    Console.WriteLine(e.Message);
                    continue;
                }
                // Thrown if another process has deleted the directory after we retrieved its name.
                catch (DirectoryNotFoundException e)
                {
                    Console.WriteLine(e.Message);
                    continue;
                }

                try
                {
                    files = Directory.GetFiles(currentDir);
                }
                catch (UnauthorizedAccessException e)
                {
                    Console.WriteLine(e.Message);
                    continue;
                }
                catch (DirectoryNotFoundException e)
                {
                    Console.WriteLine(e.Message);
                    continue;
                }
                catch (IOException e)
                {
                    Console.WriteLine(e.Message);
                    continue;
                }

                // Execute in parallel if there are enough files in the directory.
                // Otherwise, execute sequentially. Files are opened and processed
                // synchronously but this could be modified to perform async I/O.
                try
                {
                    if (files.Length < procCount)
                    {
                        foreach (var file in files)
                        {
                            action(file);
                            fileCount++;
                        }
                    }
                    else
                    {
                        Parallel.ForEach(files, () => 0, (file, loopState, localCount) =>
                        {
                            action(file);
                            return (int)++localCount;
                        },
                        (c) =>
                        {
                            Interlocked.Add(ref fileCount, c);
                        });
                    }
                }
                catch (AggregateException ae)
                {
                    ae.Handle((ex) =>
                    {
                        if (ex is UnauthorizedAccessException)
                        {
                            // Here we just output a message and go on.
                            Console.WriteLine(ex.Message);
                            return true;
                        }
                        // Handle other exceptions here if necessary...
                        return false;
                    });
                }

                // Push the subdirectories onto the stack for traversal.
                // This could also be done before handling the files.
                foreach (string str in subDirs)
                    dirs.Push(str);
            }

            // For diagnostic purposes.
            Console.WriteLine("Processed {0} files in {1} milliseconds", fileCount, sw.ElapsedMilliseconds);
        }
    }
}
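In addition, Parallel.For and Parallel.ForEach each have several overloads that let you stop or break out of a loop, monitor the state of the loop on other threads, maintain thread-local state, finalize thread-local objects, control the degree of concurrency, and so on. The helper types that enable this functionality include ParallelLoopState, ParallelOptions, ParallelLoopResult, CancellationToken, and CancellationTokenSource.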
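A minimal sketch of how some of these helper types fit together, assuming a simple "find the first match" task; the names `LoopControlSketch` and `FindFirst` are hypothetical. It caps the degree of parallelism with ParallelOptions, breaks out of the loop through ParallelLoopState, reads the outcome from ParallelLoopResult, and supports cancellation through a CancellationToken.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class LoopControlSketch
{
    // Returns the index of the first element equal to target, or -1 if there is none.
    public static long FindFirst(int[] data, int target, CancellationToken token)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount, // upper bound on concurrency
            CancellationToken = token                            // lets a caller abort the loop
        };

        ParallelLoopResult result = Parallel.For(0, data.Length, options, (i, state) =>
        {
            if (data[i] == target)
            {
                // Break lets iterations below i still run, so an earlier match is not missed.
                state.Break();
            }
        });

        // Null when Break was never called, otherwise the lowest index that called it.
        return result.LowestBreakIteration ?? -1;
    }

    static void Main()
    {
        int[] data = { 5, 3, 7, 3, 9 };
        Console.WriteLine("First 3 is at index " + FindFirst(data, 3, CancellationToken.None));

        var cts = new CancellationTokenSource();
        cts.Cancel(); // cancel up front just to show the exception path
        try
        {
            FindFirst(data, 3, cts.Token);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("The loop was canceled.");
        }
    }
}
```

Stop, by contrast with Break, asks the loop to end as soon as possible without the guarantee about lower iterations, so it suits "find any match" rather than "find the first match".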
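Download MyDemo
Download the complete Samples for Parallel Programming with the .NET Framework
Download the complete samples for Professional Parallel Programming with C#: Master Parallel Extensions with .NET 4 (The book's examples build up step by step from simple to advanced and are a great help in understanding parallel programming. For the data parallelism covered in this article, see Chapter 2, where the author progressively improves the AES and MD5 computations, using Amdahl's law as the yardstick.)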
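For reference, Amdahl's law is usually written as below. The symbols are chosen here for illustration and are not taken from the book: $p$ is the fraction of the run time that can be parallelized, $N$ is the number of cores, and $S(N)$ is the resulting speedup.

```latex
S(N) = \frac{1}{(1 - p) + \dfrac{p}{N}},
\qquad
\lim_{N \to \infty} S(N) = \frac{1}{1 - p}
```

For instance, with $p = 0.9$ and $N = 4$ the speedup is $1 / (0.1 + 0.225) \approx 3.08$, and no number of cores can push it past $1 / (1 - 0.9) = 10$.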