国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

ParallelProgramming-多消費者,多生產者同時運行并行

2019-11-14 13:46:49
字體:
來源:轉載
供稿:網友

在上一篇文章演示了并行的流水線操作(生產者和消費者并行同時執行),C#是通過BlockingCollection這個線程安全的對象作為Buffer,并且結合Task來實現的。但是上一篇文章有個缺陷,在整個流水線上,生產者和消費者是唯一的。本文將演示多個消費者多個生產者同時并行執行。

一、多消費者、多生產者示意圖

 與前一篇文章演示的流水線思想類似,不同之處就是本文的topic:消費者和生產者有多個,以buffer1為例,起生產者有兩個,消費者有兩個,現在有三個緯度的并行:

  1. Action1和Action2并行(消費者和生產者并行)
  2. 消費者并行(Action2.1和Action2.2并行)
  3. 生產者并行(Action1.1和Action1.2并行)

二、實現

2.1 代碼

 class PiplelineDemo    {        PRivate int seed;        public PiplelineDemo()        {            seed = 10;        }        public void Action11(BlockingCollection<string> output)        {            for (var i = 0; i < seed; i++)            {                output.Add(i.ToString());//initialize data to buffer1            }        }        public void Action12(BlockingCollection<string> output)        {            for (var i = 0; i < seed; i++)            {                output.Add(i.ToString());//initialize data to buffer1            }        }        public void Action21(BlockingCollection<string> input, BlockingCollection<string> output)        {            foreach (var item in input.GetConsumingEnumerable())            {                var itemToInt = int.Parse(item);                output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2            }        }        public void Action22(BlockingCollection<string> input, BlockingCollection<string> output)        {            foreach (var item in input.GetConsumingEnumerable())            {                var itemToInt = int.Parse(item);                output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2            }        }        public void Action31(BlockingCollection<string> input, BlockingCollection<string> output)        {            foreach (var item in input.GetConsumingEnumerable())            {                output.Add((item));// add new data to buffer3            }        }        public void Action32(BlockingCollection<string> input, BlockingCollection<string> output)        {            foreach (var item in input.GetConsumingEnumerable())            {                output.Add((item));// add new data to buffer3            }        }        public void Pipeline()        {            var buffer1 = new BlockingCollection<string>(seed * 2);            var buffer2 = new BlockingCollection<string>(seed * 2);            var buffer3 = new BlockingCollection<string>(seed * 2);            var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);            var stage11 = taskFactory.StartNew(() => Action11(buffer1));            var stage12 = taskFactory.StartNew(() => Action12(buffer1));            Task.Factory.ContinueWhenAll(new Task[] { stage11, stage12 }, (tasks) =>            {                buffer1.CompleteAdding();            });            var stage21 = taskFactory.StartNew(() => Action21(buffer1, buffer2));            var stage22 = taskFactory.StartNew(() => Action22(buffer1, buffer2));            Task.Factory.ContinueWhenAll(new Task[] { stage21, stage22 }, (tasks) =>            {                buffer2.CompleteAdding();            });            var stage31 = taskFactory.StartNew(() => Action31(buffer2, buffer3));            var stage32 = taskFactory.StartNew(() => Action32(buffer2, buffer3));            Task.Factory.ContinueWhenAll(new Task[] { stage31, stage32 }, (tasks) =>            {                buffer3.CompleteAdding();            });            Task.WaitAll(stage11, stage12, stage21, stage22, stage31, stage32);            foreach (var item in buffer3.GetConsumingEnumerable())//print data in buffer3            {                Console.WriteLine(item);            }        }    }

2.2 運行結果

2.3 代碼解釋

  1. Action11和Action12相對比較好理解。初始化數據到buffer1。
  2. Action2.1和Action2.2相對比較費解,他們同時接受buffer1作為輸入,為什么最終的結果Buffer2沒有產生重復? 最后由Action21,action22同時產生的buffer3為什么也沒有重復?這就是GetConsumingEnumerable這個方法的功勞。這個方法會將buffer的數據分成多份給多個消費者,如果一個value已經被一個消費者獲取,那么其他消費者將不會再拿到這個值。這就回答了為什么沒有重復這個問題。
  3. 上面方法同時使用了多任務延續(ContinueWhenAll)對buffer的調用CompleteAdding方法:該方法非常重要,如果沒有調用這個方法,程序會進入死鎖,因為消費者(consumer)會處于一直的等待狀態。

 


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 海宁市| 长治县| 望城县| 沂源县| 宁乡县| 翁牛特旗| 大同市| 华坪县| 阳江市| 崇阳县| 亚东县| 沙湾县| 伊春市| 西宁市| 滕州市| 灵石县| 江安县| 永胜县| 崇义县| 齐齐哈尔市| 托里县| 金湖县| 芮城县| 广元市| 普陀区| 广州市| 揭西县| 宁津县| 平潭县| 杨浦区| 贡觉县| 密山市| 昌黎县| 县级市| 沙坪坝区| 绥滨县| 淅川县| 伊金霍洛旗| 夹江县| 沾益县| 高雄市|