国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

淺談.NET下的多線程和并行計算(六)線程池基礎下

2019-11-17 03:54:12
字體:
來源:轉載
供稿:網友
這節我們按照線程池的核心思想來自定義一個簡單的線程池:

1) 池中使用的線程不少于一定數量,不多于一定數量

2) 池中線程不夠的時候創建,富裕的時候收回

3) 任務排隊,沒有可用線程時,任務等待

我們的目的只是實現這些“需求”,不去考慮性能(比如等待一段時間再去創建新的線程等策略)以及特殊的處理(異常),在實現這個需求的過程中我們也回顧了線程以及線程同步的基本概念。

首先,把任務委托和任務需要的狀態數據封裝一個對象:

public class WorkItem
{
    public WaitCallback Action { get; set; }
    public object State { get; set; }

    public WorkItem(WaitCallback action, object state)
    {
        this.Action = action;
        this.State = state;
    }
}然后來創建一個對象作為線程池中的一個線程:

public class SimpleThreadPoolThread
{
    PRivate object locker = new object();
    private AutoResetEvent are = new AutoResetEvent(false);
    private WorkItem wi;
    private Thread t;
    private bool b = true;
    private bool isWorking;

    public bool IsWorking
    {
        get
        {
            lock (locker)
            {
                return isWorking;
            }
        }
    }
    public event Action<SimpleThreadPoolThread> WorkComplete;

    public SimpleThreadPoolThread()
    {
        lock (locker)
        {
            // 當前沒有實際任務
            isWorking = false;
        }
        t = new Thread(Work) { IsBackground = true };
        t.Start();
    }

    public void SetWork(WorkItem wi)
    {
        this.wi = wi;
    }

    public void StartWork()
    {
        // 發出信號
        are.Set();
    }

    public void StopWork()
    {
        // 空任務
        wi = null;
        // 停止線程循環
        b = false;
        // 發出信號結束線程
        are.Set();
    }

    private void Work()
    {
        while (b)
        {
            // 沒任務,等待信號
            are.WaitOne();
            if (wi != null)
            {
                lock (locker)
                {
                    // 開始
                    isWorking = true;
                }
                // 執行任務
                wi.Action(wi.State);
                lock (locker)
                {
                    // 結束
                    isWorking = false;
                }
                // 結束事件
                WorkComplete(this);
            }
        }
    }代碼的細節可以看注釋,對這段代碼的整體結構作一個說明:

1) 由于這個線程是被線程池中任務復用的,所以線程的任務處于循環中,除非線程池打算回收這個線程,否則不會退出循環結束任務

2) 使用自動信號量讓線程沒任務的時候等待,由線程池在外部設置任務后發出信號來執行實際的任務,執行完畢后繼續等待

3) 線程公開一個完成的事件,線程池可以掛接處理方法,在任務完成后更新線程池狀態

4) 線程池中的所有線程都是后臺線程

下面再來實現線程池:

public class SimpleThreadPool : IDisposable
{
    private object locker = new object();
    private bool b = true;
    private int minThreads;
    private int maxThreads;
    private int currentActiveThreadCount;
    private List<SimpleThreadPoolThread> simpleThreadPoolThreadList = new List<SimpleThreadPoolThread>();
    private Queue<WorkItem> workItemQueue = new Queue<WorkItem>();

    public int CurrentActiveThreadCount
    {
        get
        {
            lock (locker)
            {
                return currentActiveThreadCount;
            }
        }

    }

    public int CurrentThreadCount
    {
        get
        {
            lock (locker)
            {
                return simpleThreadPoolThreadList.Count;
            }
        }
    }

    public int CurrentQueuedWorkCount
    {
        get
        {
            lock (locker)
            {
                return workItemQueue.Count;
            }
        }
    }

    public SimpleThreadPool()
    {
        minThreads = 4;
        maxThreads = 25;
        Init();
    }

    public SimpleThreadPool(int minThreads, int maxThreads)
    {
        if (minThreads > maxThreads)
            throw new ArgumentException("minThreads > maxThreads", "minThreads,maxThreads");
        this.minThreads = minThreads;
        this.maxThreads = maxThreads;
        Init();
    }

    public void QueueUserWorkItem(WorkItem wi)
    {
        lock (locker)
        {
            // 任務入列
            workItemQueue.Enqueue(wi);
        }
    }

    private void Init()
    {
        lock (locker)
        {
            // 一開始創建最小線程
            for (int i = 0; i < minThreads; i++)
            {
                CreateThread();
            }
            currentActiveThreadCount = 0;
        }
        new Thread(Work) { IsBackground = true }.Start();
    }

    private SimpleThreadPoolThread CreateThread()
    {
        SimpleThreadPoolThread t = new SimpleThreadPoolThread();
        // 掛接任務結束事件
        t.WorkComplete += new Action<SimpleThreadPoolThread>(t_WorkComplete);
        // 線程入列
        simpleThreadPoolThreadList.Add(t);
        return t;
    }

    private void Work()
    {
        // 線程池主循環
        while (b)
        {
            Thread.Sleep(100);
            lock (locker)
            {
                // 如果隊列中有任務并且當前線程小于最大線程
                if (workItemQueue.Count > 0 && CurrentActiveThreadCount < maxThreads)
                {
                    WorkItem wi = workItemQueue.Dequeue();
                    // 尋找閑置線程
                    SimpleThreadPoolThread availableThread = simpleThreadPoolThreadList.FirstOrDefault(t => t.IsWorking == false);
                    // 無則創建
                    if (availableThread == null)
                        availableThread = CreateThread();
                    // 設置任務
                    availableThread.SetWork(wi);
                    // 開始任務
                    availableThread.StartWork();
                    // 增加個活動線程
                    currentActiveThreadCount++;
                }
            }
        }
    }

    private void t_WorkComplete(SimpleThreadPoolThread t)
    {
        lock (locker)
        {
            // 減少個活動線程
            currentActiveThreadCount--;
            // 如果當前線程數有所富裕并且比最小線程多
            if ((workItemQueue.Count + currentActiveThreadCount) < minThreads && CurrentThreadCount > minThreads)
            {
                // 停止已完成的線程
                t.StopWork();
                // 從線程池刪除線程
                simpleThreadPoolThreadList.Remove(t);
            }
        }
    }

    public void Dispose()
    {
        // 所有線程停止
        foreach (var t in simpleThreadPoolThreadList)
        {
            t.StopWork();
        }
        // 線程池主循環停止
        b = false;
    }
}線程池的結構如下:

1) 在構造方法中可以設置線程池最小和最大線程

2) 維護一個任務隊列和一個線程池中線程的列表

3) 初始化線程池的時候就創建最小線程數量定義的線程

4) 線程池主循環每20毫秒就去處理一次,如果有任務并且線程池還可以處理任務的話,先是找閑置線程,找不到則創建一個

5) 通過設置任務委托以及發出信號量來開始任務

6) 線程池提供了三個屬性來查看當前活動線程數,當前總線程數和當前隊列中的任務數

7) 任務完成的回調事件中我們判斷如果當前線程有富裕并且比最小線程多則回收線程

8) 線程池是IDispose對象,在Dispose()方法中停止所有線程后停止線程池主循環

寫一段代碼來測試線程池:

using (SimpleThreadPool t = new SimpleThreadPool(2, 4))
{
    Stopwatch sw2 = Stopwatch.StartNew();
    for (int i = 0; i < 10; i++)
    {
        t.QueueUserWorkItem(new WorkItem((index =>
        {
            Console.WriteLine(string.Format("#{0} : {1} / {2}", Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("mm:ss"), index));
            Console.WriteLine(string.Format("CurrentActiveThread: {0} / CurrentThread: {1} / CurrentQueuedWork: {2}", t.CurrentActiveThreadCount, t.CurrentThreadCount, t.CurrentQueuedWorkCount));
            Thread.Sleep(1000);
        }), i));
    }
    while (t.CurrentQueuedWorkCount > 0 || t.CurrentActiveThreadCount > 0)
    {
        Thread.Sleep(10);
    }
    Console.WriteLine("All work completed");
    Console.WriteLine(string.Format("CurrentActiveThread: {0} / CurrentThread: {1} / CurrentQueuedWork: {2}", t.CurrentActiveThreadCount, t.CurrentThreadCount, t.CurrentQueuedWorkCount));
    Console.WriteLine(sw2.ElapsedMilliseconds);
} 代碼中我們向線程池推入10個任務,每個任務需要1秒執行,任務執行前輸出當前任務的所屬線程的Id,當前時間以及狀態值。然后再輸出線程池的幾個狀態屬性。主線程循環等待所有任務完成后再次輸出線程池狀態屬性以及所有任務完成耗費的時間:


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 阜康市| 镇巴县| 郑州市| 根河市| 重庆市| 长垣县| 奈曼旗| 绥宁县| 莱芜市| 香河县| 广昌县| 宁南县| 祁东县| 灌云县| 会泽县| 宁海县| 蒲江县| 大埔县| 舒兰市| 深州市| 平顶山市| 阳山县| 阿勒泰市| 南昌县| 鸡东县| 杭锦后旗| 乳源| 扶沟县| 嘉义市| 阜南县| 彩票| 马关县| 肃南| 威宁| 陇西县| 蓬安县| 宁河县| 铜山县| 宜兴市| 昭通市| 盐边县|