這篇文章主要介紹了深入解析C++編程中線程池的使用,包括線程池的封裝實(shí)現(xiàn)等內(nèi)容,需要的朋友可以參考下
為什么需要線程池
目前的大多數(shù)網(wǎng)絡(luò)服務(wù)器,包括Web服務(wù)器、Email服務(wù)器以及數(shù)據(jù)庫(kù)服務(wù)器等都具有一個(gè)共同點(diǎn),就是單位時(shí)間內(nèi)必須處理數(shù)目巨大的連接請(qǐng)求,但處理時(shí)間卻相對(duì)較短。
傳 統(tǒng)多線程方案中我們采用的服務(wù)器模型則是一旦接受到請(qǐng)求之后,即創(chuàng)建一個(gè)新的線程,由該線程執(zhí)行任務(wù)。任務(wù)執(zhí)行完畢后,線程退出,這就是是“即時(shí)創(chuàng)建,即 時(shí)銷毀”的策略。盡管與創(chuàng)建進(jìn)程相比,創(chuàng)建線程的時(shí)間已經(jīng)大大的縮短,但是如果提交給線程的任務(wù)是執(zhí)行時(shí)間較短,而且執(zhí)行次數(shù)極其頻繁,那么服務(wù)器將處于 不停的創(chuàng)建線程,銷毀線程的狀態(tài)。
我們將傳統(tǒng)方案中的線程執(zhí)行過程分為三個(gè)過程:T1、T2、T3。
T1:線程創(chuàng)建時(shí)間
T2:線程執(zhí)行時(shí)間,包括線程的同步等時(shí)間
T3:線程銷毀時(shí)間
那么我們可以看出,線程本身的開銷所占的比例為(T1+T3) / (T1+T2+T3)。如果線程執(zhí)行的時(shí)間很短的話,這比開銷可能占到20%-50%左右。如果任務(wù)執(zhí)行時(shí)間很頻繁的話,這筆開銷將是不可忽略的。
除此之外,線程池能夠減少創(chuàng)建的線程個(gè)數(shù)。通常線程池所允許的并發(fā)線程是有上界的,如果同時(shí)需要并發(fā)的線程數(shù)超過上界,那么一部分線程將會(huì)等待。而傳統(tǒng)方案中,如果同時(shí)請(qǐng)求數(shù)目為2000,那么最壞情況下,系統(tǒng)可能需要產(chǎn)生2000個(gè)線程。盡管這不是一個(gè)很大的數(shù)目,但是也有部分機(jī)器可能達(dá)不到這種要求。
因此線程池的出現(xiàn)正是著眼于減少線程池本身帶來的開銷。線程池采用預(yù)創(chuàng)建的技術(shù),在應(yīng)用程序啟動(dòng)之后,將立即創(chuàng)建一定數(shù)量的線程(N1),放入空閑隊(duì)列中。這些線程都是處于阻塞(Suspended)狀態(tài),不消耗CPU,但占用較小的內(nèi)存空間。當(dāng)任務(wù)到來后,緩沖池選擇一個(gè)空閑線程,把任務(wù)傳入此線程中運(yùn)行。當(dāng)N1個(gè)線程都在處理任務(wù)后,緩沖池自動(dòng)創(chuàng)建一定數(shù)量的新線程,用于處理更多的任務(wù)。在任務(wù)執(zhí)行完畢后線程也不退出,而是繼續(xù)保持在池中等待下一次的任務(wù)。當(dāng)系統(tǒng)比較空閑時(shí),大部分線程都一直處于暫停狀態(tài),線程池自動(dòng)銷毀一部分線程,回收系統(tǒng)資源。
基于這種預(yù)創(chuàng)建技術(shù),線程池將線程創(chuàng)建和銷毀本身所帶來的開銷分?jǐn)偟搅烁鱾€(gè)具體的任務(wù)上,執(zhí)行次數(shù)越多,每個(gè)任務(wù)所分擔(dān)到的線程本身開銷則越小,不過我們另外可能需要考慮進(jìn)去線程之間同步所帶來的開銷
構(gòu)建線程池框架
一般線程池都必須具備下面幾個(gè)組成部分:
線程池管理器:用于創(chuàng)建并管理線程池
工作線程: 線程池中實(shí)際執(zhí)行的線程
任務(wù)接口: 盡管線程池大多數(shù)情況下是用來支持網(wǎng)絡(luò)服務(wù)器,但是我們將線程執(zhí)行的任務(wù)抽象出來,形成任務(wù)接口,從而是的線程池與具體的任務(wù)無關(guān)。
任務(wù)隊(duì)列: 線程池的概念具體到實(shí)現(xiàn)則可能是隊(duì)列,鏈表之類的數(shù)據(jù)結(jié)構(gòu),其中保存執(zhí)行線程。
我們實(shí)現(xiàn)的通用線程池框架由五個(gè)重要部分組成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此之外框架中還包括線程同步使用的類CThreadMutex和CCondition。
CJob是所有的任務(wù)的基類,其提供一個(gè)接口Run,所有的任務(wù)類都必須從該類繼承,同時(shí)實(shí)現(xiàn)Run方法。該方法中實(shí)現(xiàn)具體的任務(wù)邏輯。
CThread是Linux中線程的包裝,其封裝了Linux線程最經(jīng)常使用的屬性和方法,它也是一個(gè)抽象類,是所有線程類的基類,具有一個(gè)接口Run。
CWorkerThread是實(shí)際被調(diào)度和執(zhí)行的線程類,其從CThread繼承而來,實(shí)現(xiàn)了CThread中的Run方法。
CThreadPool是線程池類,其負(fù)責(zé)保存線程,釋放線程以及調(diào)度線程。
CThreadManage是線程池與用戶的直接接口,其屏蔽了內(nèi)部的具體實(shí)現(xiàn)。
CThreadMutex用于線程之間的互斥。
CCondition則是條件變量的封裝,用于線程之間的同步。
CThreadManage直接跟客戶端打交道,其接受需要?jiǎng)?chuàng)建的線程初始個(gè)數(shù),并接受客戶端提交的任務(wù)。這兒的任務(wù)是具體的非抽象的任務(wù)。CThreadManage的內(nèi)部實(shí)際上調(diào)用的都是CThreadPool的相關(guān)操作。CThreadPool創(chuàng)建具體的線程,并把客戶端提交的任務(wù)分發(fā)給CWorkerThread,CWorkerThread實(shí)際執(zhí)行具體的任務(wù)。
理解系統(tǒng)組件
下面我們分開來了解系統(tǒng)中的各個(gè)組件。
CThreadManage
CThreadManage的功能非常簡(jiǎn)單,其提供最簡(jiǎn)單的方法,其類定義如下:
- class CThreadManage
- {
- private:
- CThreadPool* m_Pool;
- int m_NumOfThread;
- protected:
- public:
- CThreadManage();
- CThreadManage(int num);
- virtual ~CThreadManage();
- void SetParallelNum(int num);
- void Run(CJob* job,void* jobdata);
- void TerminateAll(void);
- };
其中m_Pool指向?qū)嶋H的線程池;m_NumOfThread是初始創(chuàng)建時(shí)候允許創(chuàng)建的并發(fā)的線程個(gè)數(shù)。另外Run和TerminateAll方法也非常簡(jiǎn)單,只是簡(jiǎn)單的調(diào)用CThreadPool的一些相關(guān)方法而已。其具體的實(shí)現(xiàn)如下:
- CThreadManage::CThreadManage()
- {
- m_NumOfThread = 10;
- m_Pool = new CThreadPool(m_NumOfThread);
- }
- CThreadManage::CThreadManage(int num)
- {
- m_NumOfThread = num;
- m_Pool = new CThreadPool(m_NumOfThread);
- }
- CThreadManage::~CThreadManage()
- {
- if(NULL != m_Pool)
- delete m_Pool;
- }
- void CThreadManage::SetParallelNum(int num)
- {
- m_NumOfThread = num;
- }
- void CThreadManage::Run(CJob* job,void* jobdata)
- {
- m_Pool->Run(job,jobdata);
- }
- void CThreadManage::TerminateAll(void)
- {
- m_Pool->TerminateAll();
- }
CThread
CThread 類實(shí)現(xiàn)了對(duì)Linux中線程操作的封裝,它是所有線程的基類,也是一個(gè)抽象類,提供了一個(gè)抽象接口Run,所有的CThread都必須實(shí)現(xiàn)該Run方法。CThread的定義如下所示:
- class CThread
- {
- private:
- int m_ErrCode;
- Semaphore m_ThreadSemaphore; //the inner semaphore, which is used to realize
- unsigned long m_ThreadID;
- bool m_Detach; //The thread is detached
- bool m_CreateSuspended; //if suspend after creating
- char* m_ThreadName;
- ThreadState m_ThreadState; //the state of the thread
- protected:
- void SetErrcode(int errcode){m_ErrCode = errcode;}
- static void* ThreadFunction(void*);
- public:
- CThread();
- CThread(bool createsuspended,bool detach);
- virtual ~CThread();
- virtual void Run(void) = 0;
- void SetThreadState(ThreadState state){m_ThreadState = state;}
- bool Terminate(void); //Terminate the threa
- bool Start(void); //Start to execute the thread
- void Exit(void);
- bool Wakeup(void);
- ThreadState GetThreadState(void){return m_ThreadState;}
- int GetLastError(void){return m_ErrCode;}
- void SetThreadName(char* thrname){strcpy(m_ThreadName,thrname);}
- char* GetThreadName(void){return m_ThreadName;}
- int GetThreadID(void){return m_ThreadID;}
- bool SetPriority(int priority);
- int GetPriority(void);
- int GetConcurrency(void);
- void SetConcurrency(int num);
- bool Detach(void);
- bool Join(void);
- bool Yield(void);
- int Self(void);
- };
線程的狀態(tài)可以分為四種,空閑、忙碌、掛起、終止(包括正常退出和非正常退出)。由于目前Linux線程庫(kù)不支持掛起操作,因此,我們的此處的掛起操作類似于暫停。如果線程創(chuàng)建后不想立即執(zhí)行任務(wù),那么我們可以將其“暫停”,如果需要運(yùn)行,則喚醒。有一點(diǎn)必須注意的是,一旦線程開始執(zhí)行任務(wù),將不能被掛起,其將一直執(zhí)行任務(wù)至完畢。
線程類的相關(guān)操作均十分簡(jiǎn)單。線程的執(zhí)行入口是從Start()函數(shù)開始,其將調(diào)用函數(shù)ThreadFunction,ThreadFunction再調(diào)用實(shí)際的Run函數(shù),執(zhí)行實(shí)際的任務(wù)。
CThreadPool
CThreadPool是線程的承載容器,一般可以將其實(shí)現(xiàn)為堆棧、單向隊(duì)列或者雙向隊(duì)列。在我們的系統(tǒng)中我們使用STL Vector對(duì)線程進(jìn)行保存。CThreadPool的實(shí)現(xiàn)代碼如下:
- class CThreadPool
- {
- friend class CWorkerThread;
- private:
- unsigned int m_MaxNum; //the max thread num that can create at the same time
- unsigned int m_AvailLow; //The min num of idle thread that shoule kept
- unsigned int m_AvailHigh; //The max num of idle thread that kept at the same time
- unsigned int m_AvailNum; //the normal thread num of idle num;
- unsigned int m_InitNum; //Normal thread num;
- protected:
- CWorkerThread* GetIdleThread(void);
- void AppendToIdleList(CWorkerThread* jobthread);
- void MoveToBusyList(CWorkerThread* idlethread);
- void MoveToIdleList(CWorkerThread* busythread);
- void DeleteIdleThread(int num);
- void CreateIdleThread(int num);
- public:
- CThreadMutex m_BusyMutex; //when visit busy list,use m_BusyMutex to lock and unlock
- CThreadMutex m_IdleMutex; //when visit idle list,use m_IdleMutex to lock and unlock
- CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock
- CThreadMutex m_VarMutex;
- CCondition m_BusyCond; //m_BusyCond is used to sync busy thread list
- CCondition m_IdleCond; //m_IdleCond is used to sync idle thread list
- CCondition m_IdleJobCond; //m_JobCond is used to sync job list
- CCondition m_MaxNumCond;
- vector<CWorkerThread*> m_ThreadList;
- vector<CWorkerThread*> m_BusyList; //Thread List
- vector<CWorkerThread*> m_IdleList; //Idle List
- CThreadPool();
- CThreadPool(int initnum);
- virtual ~CThreadPool();
- void SetMaxNum(int maxnum){m_MaxNum = maxnum;}
- int GetMaxNum(void){return m_MaxNum;}
- void SetAvailLowNum(int minnum){m_AvailLow = minnum;}
- int GetAvailLowNum(void){return m_AvailLow;}
- void SetAvailHighNum(int highnum){m_AvailHigh = highnum;}
- int GetAvailHighNum(void){return m_AvailHigh;}
- int GetActualAvailNum(void){return m_AvailNum;}
- int GetAllNum(void){return m_ThreadList.size();}
- int GetBusyNum(void){return m_BusyList.size();}
- void SetInitNum(int initnum){m_InitNum = initnum;}
- int GetInitNum(void){return m_InitNum;}
- void TerminateAll(void);
- void Run(CJob* job,void* jobdata);
- };
- CWorkerThread* CThreadPool::GetIdleThread(void)
- {
- while(m_IdleList.size() ==0 )
- m_IdleCond.Wait();
- m_IdleMutex.Lock();
- if(m_IdleList.size() > 0 )
- {
- CWorkerThread* thr = (CWorkerThread*)m_IdleList.front();
- printf("Get Idle thread %d/n",thr->GetThreadID());
- m_IdleMutex.Unlock();
- return thr;
- }
- m_IdleMutex.Unlock();
- return NULL;
- }
- //create num idle thread and put them to idlelist
- void CThreadPool::CreateIdleThread(int num)
- {
- for(int i=0;i<num;i++){
- CWorkerThread* thr = new CWorkerThread();
- thr->SetThreadPool(this);
- AppendToIdleList(thr);
- m_VarMutex.Lock();
- m_AvailNum++;
- m_VarMutex.Unlock();
- thr->Start(); //begin the thread,the thread wait for job
- }
- }
- void CThreadPool::Run(CJob* job,void* jobdata)
- {
- assert(job!=NULL);
- //if the busy thread num adds to m_MaxNum,so we should wait
- if(GetBusyNum() == m_MaxNum)
- m_MaxNumCond.Wait();
- if(m_IdleList.size()<m_AvailLow)
- {
- if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
- CreateIdleThread(m_InitNum-m_IdleList.size());
- else
- CreateIdleThread(m_MaxNum-GetAllNum());
- }
- CWorkerThread* idlethr = GetIdleThread();
- if(idlethr !=NULL)
- {
- idlethr->m_WorkMutex.Lock();
- MoveToBusyList(idlethr);
- idlethr->SetThreadPool(this);
- job->SetWorkThread(idlethr);
- printf("Job is set to thread %d /n",idlethr->GetThreadID());
- idlethr->SetJob(job,jobdata);
- }
- }
在CThreadPool中存在兩個(gè)鏈表,一個(gè)是空閑鏈表,一個(gè)是忙碌鏈表。Idle鏈表中存放所有的空閑進(jìn)程,當(dāng)線程執(zhí)行任務(wù)時(shí)候,其狀態(tài)變?yōu)槊β禒顟B(tài),同時(shí)從空閑鏈表中刪除,并移至忙碌鏈表中。在CThreadPool的構(gòu)造函數(shù)中,我們將執(zhí)行下面的代碼:
- for(int i=0;i<m_InitNum;i++)
- {
- CWorkerThread* thr = new CWorkerThread();
- AppendToIdleList(thr);
- thr->SetThreadPool(this);
- thr->Start(); //begin the thread,the thread wait for job
- }
在該代碼中,我們將創(chuàng)建m_InitNum個(gè)線程,創(chuàng)建之后即調(diào)用AppendToIdleList放入Idle鏈表中,由于目前沒有任務(wù)分發(fā)給這些線程,因此線程執(zhí)行Start后將自己掛起。
事實(shí)上,線程池中容納的線程數(shù)目并不是一成不變的,其會(huì)根據(jù)執(zhí)行負(fù)載進(jìn)行自動(dòng)伸縮。為此在CThreadPool中設(shè)定四個(gè)變量:
m_InitNum:處世創(chuàng)建時(shí)線程池中的線程的個(gè)數(shù)。
m_MaxNum:當(dāng)前線程池中所允許并發(fā)存在的線程的最大數(shù)目。
m_AvailLow:當(dāng)前線程池中所允許存在的空閑線程的最小數(shù)目,如果空閑數(shù)目低于該值,表明負(fù)載可能過重,此時(shí)有必要增加空閑線程池的數(shù)目。實(shí)現(xiàn)中我們總是將線程調(diào)整為m_InitNum個(gè)。
m_AvailHigh:當(dāng)前線程池中所允許的空閑的線程的最大數(shù)目,如果空閑數(shù)目高于該值,表明當(dāng)前負(fù)載可能較輕,此時(shí)將刪除多余的空閑線程,刪除后調(diào)整數(shù)也為m_InitNum個(gè)。
m_AvailNum:目前線程池中實(shí)際存在的線程的個(gè)數(shù),其值介于m_AvailHigh和m_AvailLow之間。如果線程的個(gè)數(shù)始終維持在m_AvailLow和m_AvailHigh之間,則線程既不需要?jiǎng)?chuàng)建,也不需要?jiǎng)h除,保持平衡狀態(tài)。因此如何設(shè)定m_AvailLow和m_AvailHigh的值,使得線程池最大可能的保持平衡態(tài),是線程池設(shè)計(jì)必須考慮的問題。
線程池在接受到新的任務(wù)之后,線程池首先要檢查是否有足夠的空閑池可用。檢查分為三個(gè)步驟:
(1)檢查當(dāng)前處于忙碌狀態(tài)的線程是否達(dá)到了設(shè)定的最大值m_MaxNum,如果達(dá)到了,表明目前沒有空閑線程可用,而且也不能創(chuàng)建新的線程,因此必須等待直到有線程執(zhí)行完畢返回到空閑隊(duì)列中。
(2)如果當(dāng)前的空閑線程數(shù)目小于我們?cè)O(shè)定的最小的空閑數(shù)目m_AvailLow,則我們必須創(chuàng)建新的線程,默認(rèn)情況下,創(chuàng)建后的線程數(shù)目應(yīng)該為m_InitNum,因此創(chuàng)建的線程數(shù)目應(yīng)該為( 當(dāng)前空閑線程數(shù)與m_InitNum);但是有一種特殊情況必須考慮,就是現(xiàn)有的線程總數(shù)加上創(chuàng)建后的線程數(shù)可能超過m_MaxNum,因此我們必須對(duì)線程的創(chuàng)建區(qū)別對(duì)待。
- if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
- CreateIdleThread(m_InitNum-m_IdleList.size());
- else
- CreateIdleThread(m_MaxNum-GetAllNum());
如果創(chuàng)建后總數(shù)不超過m_MaxNum,則創(chuàng)建后的線程為m_InitNum;如果超過了,則只創(chuàng)建( m_MaxNum-當(dāng)前線程總數(shù) )個(gè)。
(3)調(diào)用GetIdleThread方法查找空閑線程。如果當(dāng)前沒有空閑線程,則掛起;否則將任務(wù)指派給該線程,同時(shí)將其移入忙碌隊(duì)列。
當(dāng)線程執(zhí)行完畢后,其會(huì)調(diào)用MoveToIdleList方法移入空閑鏈表中,其中還調(diào)用m_IdleCond.Signal()方法,喚醒GetIdleThread()中可能阻塞的線程。
CJob
CJob類相對(duì)簡(jiǎn)單,其封裝了任務(wù)的基本的屬性和方法,其中最重要的是Run方法,代碼如下:
- class CJob
- {
- private:
- int m_JobNo; //The num was assigned to the job
- char* m_JobName; //The job name
- CThread *m_pWorkThread; //The thread associated with the job
- public:
- CJob( void );
- virtual ~CJob();
- int GetJobNo(void) const { return m_JobNo; }
- void SetJobNo(int jobno){ m_JobNo = jobno;}
- char* GetJobName(void) const { return m_JobName; }
- void SetJobName(char* jobname);
- CThread *GetWorkThread(void){ return m_pWorkThread; }
- void SetWorkThread ( CThread *pWorkThread ){
- m_pWorkThread = pWorkThread;
- }
- virtual void Run ( void *ptr ) = 0;
- };
線程池使用示例
至此我們給出了一個(gè)簡(jiǎn)單的與具體任務(wù)無關(guān)的線程池框架。使用該框架非常的簡(jiǎn)單,我們所需要的做的就是派生CJob類,將需要完成的任務(wù)實(shí)現(xiàn)在Run方法中。然后將該Job交由CThreadManage去執(zhí)行。下面我們給出一個(gè)簡(jiǎn)單的示例程序
- class CXJob:public CJob
- {
- public:
- CXJob(){i=0;}
- ~CXJob(){}
- void Run(void* jobdata) {
- printf("The Job comes from CXJOB/n");
- sleep(2);
- }
- };
- class CYJob:public CJob
- {
- public:
- CYJob(){i=0;}
- ~CYJob(){}
- void Run(void* jobdata) {
- printf("The Job comes from CYJob/n");
- }
- };
- main()
- {
- CThreadManage* manage = new CThreadManage(10);
- for(int i=0;i<40;i++)
- {
- CXJob* job = new CXJob();
- manage->Run(job,NULL);
- }
- sleep(2);
- CYJob* job = new CYJob();
- manage->Run(job,NULL);
- manage->TerminateAll();
- }
CXJob和CYJob都是從Job類繼承而來,其都實(shí)現(xiàn)了Run接口。CXJob只是簡(jiǎn)單的打印一句”The Job comes from CXJob”,CYJob也只打印”The Job comes from CYJob”,然后均休眠2秒鐘。在主程序中我們初始創(chuàng)建10個(gè)工作線程。然后分別執(zhí)行40次CXJob和一次CYJob。
C++ 線程池的封裝實(shí)現(xiàn)
為了充分利用多核的優(yōu)勢(shì),我們利用多線程來進(jìn)行任務(wù)處理,但線程也同樣不能濫用,會(huì)帶來一下幾個(gè)問題:
1)線程本身存在開銷,系統(tǒng)必須為每個(gè)線程分配如棧,TLS(線程局部存儲(chǔ)),寄存器等。
2)線程管理會(huì)給系統(tǒng)帶來開銷,context切換同樣會(huì)給系統(tǒng)帶來成本。
3)線程本身是可以重用的資源,不需要每次都進(jìn)行初始化。
所以往往在使用中,我們無需把線程與task任務(wù)進(jìn)行一對(duì)一對(duì)應(yīng),只需要預(yù)先初始化有限的線程個(gè)數(shù)來處理無限的task任務(wù)即可,線程池應(yīng)運(yùn)而生,原理也就是如此。
主要含有三個(gè)隊(duì)列
工作隊(duì)列
工作線程隊(duì)列
忙碌線程隊(duì)列
工作隊(duì)列是一個(gè)阻塞隊(duì)列,任務(wù)(仿函數(shù))任務(wù)不算被push進(jìn)來(notify阻塞獲取的工作線程),工作線程隊(duì)列(一直不變)則從該隊(duì)列中獲取任務(wù)執(zhí)行(wait獲取,當(dāng)任務(wù)隊(duì)列為空時(shí)阻塞等待通知),如果獲取到任務(wù),則將線程會(huì)進(jìn)入忙碌線程隊(duì)列中,執(zhí)行任務(wù)的仿函數(shù),當(dāng)工作完成,重新移出工作線程隊(duì)列。
定義線程池專屬異常:
- struct TC_ThreadPool_Exception : public TC_Exception
- {
- TC_ThreadPool_Exception(const string &buffer) : TC_Exception(buffer){};
- TC_ThreadPool_Exception(const string &buffer, int err) : TC_Exception(buffer, err){};
- ~TC_ThreadPool_Exception () throw (){};
- };
- /**
- * @brief 用通線程池類, 與tc_functor, tc_functorwrapper配合使用.
- *
- * 使用方式說明:
- * 1 采用tc_functorwrapper封裝一個(gè)調(diào)用
- * 2 用tc_threadpool對(duì)調(diào)用進(jìn)行執(zhí)行
- * 具體示例代碼請(qǐng)參見:test/test_tc_thread_pool.cpp
- */
- /**線程池本身繼承自鎖,可以幫助鎖定**/
- class TC_ThreadPool : public TC_ThreadLock
- {
- public:
- /**
- * @brief 構(gòu)造函數(shù)
- *
- */
- TC_ThreadPool ();
- /**
- * @brief 析構(gòu), 會(huì)停止所有線程
- */
- ~TC_ThreadPool ();
- /**
- * @brief 初始化.
- *
- * @param num 工作線程個(gè)數(shù)
- */
- void init(size_t num);
- /**
- * @brief 獲取線程個(gè)數(shù).
- *
- * @return size_t 線程個(gè)數(shù)
- */
- size_t getThreadNum() { Lock sync(* this); return _jobthread. size(); }
- /**
- * @brief 獲取線程池的任務(wù)數(shù)( exec添加進(jìn)去的).
- *
- * @return size_t 線程池的任務(wù)數(shù)
- */
- size_t getJobNum() { return _jobqueue. size(); }
- /**
- * @brief 停止所有線程
- */
- void stop();
- /**
- * @brief 啟動(dòng)所有線程
- */
- void start();
- /**
- * @brief 啟動(dòng)所有線程并, 執(zhí)行初始化對(duì)象.
- *
- * @param ParentFunctor
- * @param tf
- */
- template<class ParentFunctor>
- void start(const TC_FunctorWrapper< ParentFunctor> &tf)
- {
- for(size_t i = 0; i < _jobthread .size(); i++)
- {
- _startqueue. push_back(new TC_FunctorWrapper<ParentFunctor >(tf));
- }
- start();
- }
- /**
- * @brief 添加對(duì)象到線程池執(zhí)行,該函數(shù)馬上返回,
- * 線程池的線程執(zhí)行對(duì)象
- */
- template<class ParentFunctor>
- void exec(const TC_FunctorWrapper< ParentFunctor> &tf)
- {
- _jobqueue.push_back(new TC_FunctorWrapper<ParentFunctor >(tf));
- }
- /**
- * @brief 等待所有工作全部結(jié)束(隊(duì)列無任務(wù), 無空閑線程).
- *
- * @param millsecond 等待的時(shí)間( ms), -1:永遠(yuǎn)等待
- * @return true, 所有工作都處理完畢
- * false,超時(shí)退出
- */
- bool waitForAllDone(int millsecond = -1);
- public:
- /**
- * @brief 線程數(shù)據(jù)基類,所有線程的私有數(shù)據(jù)繼承于該類
- */
- class ThreadData
- {
- public:
- /**
- * @brief 構(gòu)造
- */
- ThreadData(){};
- /**
- * @brief 析夠
- */
- virtual ~ThreadData(){};
- /**
- * @brief 生成數(shù)據(jù).
- *
- * @ param T
- * @return ThreadData*
- */
- template<typename T>
- static T* makeThreadData()
- {
- return new T;
- }
- };
- /**
- * @brief 設(shè)置線程數(shù)據(jù).
- *
- * @param p 線程數(shù)據(jù)
- */
- static void setThreadData(ThreadData *p);
- /**
- * @brief 獲取線程數(shù)據(jù).
- *
- * @return ThreadData* 線程數(shù)據(jù)
- */
- static ThreadData* getThreadData();
- /**
- * @brief 設(shè)置線程數(shù)據(jù), key需要自己維護(hù).
- *
- * @param pkey 線程私有數(shù)據(jù)key
- * @param p 線程指針
- */
- static void setThreadData(pthread_key_t pkey, ThreadData *p);
- /**
- * @brief 獲取線程數(shù)據(jù), key需要自己維護(hù).
- *
- * @param pkey 線程私有數(shù)據(jù)key
- * @return 指向線程的ThreadData*指針
- */
- static ThreadData* getThreadData(pthread_key_t pkey);
- protected:
- /**
- * @brief 釋放資源.
- *
- * @param p
- */
- static void destructor(void *p);
- /**
- * @brief 初始化key
- */
- class KeyInitialize
- {
- public:
- /**
- * @brief 初始化key
- */
- KeyInitialize()
- {
- int ret = pthread_key_create(&TC_ThreadPool::g_key, TC_ThreadPool::destructor);
- if(ret != 0)
- {
- throw TC_ThreadPool_Exception("[TC_ThreadPool::KeyInitialize] pthread_key_create error", ret);
- }
- }
- /**
- * @brief 釋放key
- */
- ~KeyInitialize()
- {
- pthread_key_delete(TC_ThreadPool::g_key);
- }
- };
- /**
- * @brief 初始化key的控制
- */
- static KeyInitialize g_key_initialize;
- /**
- * @brief 數(shù)據(jù)key
- */
- static pthread_key_t g_key;
- protected:
- /**
- * @brief 線程池中的工作線程
- */
- class ThreadWorker : public TC_Thread
- {
- public:
- /**
- * @brief 工作線程構(gòu)造函數(shù).
- *
- * @ param tpool
- */
- ThreadWorker(TC_ThreadPool *tpool);
- /**
- * @brief 通知工作線程結(jié)束
- */
- void terminate();
- protected:
- /**
- * @brief 運(yùn)行
- */
- virtual void run();
- protected:
- /**
- * 線程池指針
- */
- TC_ThreadPool * _tpool;
- /**
- * 是否結(jié)束線程
- */
- bool _bTerminate;
- };
- protected:
- /**
- * @brief 清除
- */
- void clear();
- /**
- * @brief 獲取任務(wù), 如果沒有任務(wù), 則為NULL.
- *
- * @return TC_FunctorWrapperInterface*
- */
- TC_FunctorWrapperInterface * get(ThreadWorker *ptw);
- /**
- * @brief 獲取啟動(dòng)任務(wù).
- *
- * @return TC_FunctorWrapperInterface*
- */
- TC_FunctorWrapperInterface * get();
- /**
- * @brief 空閑了一個(gè)線程.
- *
- * @param ptw
- */
- void idle(ThreadWorker *ptw);
- /**
- * @brief 通知等待在任務(wù)隊(duì)列上的工作線程醒來
- */
- void notifyT();
- /**
- * @brief 是否處理結(jié)束.
- *
- * @return bool
- */
- bool finish();
- /**
- * @brief 線程退出時(shí)調(diào)用
- */
- void exit();
- friend class ThreadWorker;
- protected:
- /**
- * 任務(wù)隊(duì)列
- */
- TC_ThreadQueue< TC_FunctorWrapperInterface*> _jobqueue;
- /**
- * 啟動(dòng)任務(wù)
- */
- TC_ThreadQueue< TC_FunctorWrapperInterface*> _startqueue;
- /**
- * 工作線程
- */
- std::vector<ThreadWorker *> _jobthread;
- /**
- * 繁忙線程
- */
- std::set<ThreadWorker *> _busthread;
- /**
- * 任務(wù)隊(duì)列的鎖
- */
- TC_ThreadLock _tmutex;
- /**
- * 是否所有任務(wù)都執(zhí)行完畢
- */
- bool _bAllDone;
- };
工作線程設(shè)計(jì)如下:
- TC_ThreadPool ::ThreadWorker::ThreadWorker(TC_ThreadPool *tpool)
- : _tpool (tpool)
- , _bTerminate ( false)
- {
- }
- void TC_ThreadPool ::ThreadWorker::terminate()
- {
- _bTerminate = true;
- _tpool->notifyT();
- }
- void TC_ThreadPool ::ThreadWorker::run()
- {
- //調(diào)用初始化部分
- TC_FunctorWrapperInterface *pst = _tpool->get();
- if(pst)
- {
- try
- {
- (*pst)();
- }
- catch ( ... )
- {
- }
- delete pst;
- pst = NULL;
- }
- //調(diào)用處理部分
- while (! _bTerminate)
- {
- TC_FunctorWrapperInterface *pfw = _tpool->get( this);
- if(pfw != NULL)
- {
- auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);
- try
- {
- (*pfw)();
- }
- catch ( ... )
- {
- }
- _tpool->idle( this);
- }
- }
- //結(jié)束
- _tpool->exit();
- }
- 每個(gè)工作線程在剛開始時(shí)都會(huì)執(zhí)行一下初始化操作,并進(jìn)入一個(gè)無限循環(huán)的部分//調(diào)用處理部分
- while (! _bTerminate)
- {
- TC_FunctorWrapperInterface *pfw = _tpool->get( this);
- if(pfw != NULL)
- {
- auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);
- try
- {
- (*pfw)();
- }
- catch ( ... )
- {
- }
- _tpool->idle( this);
- }
- }
該工作主要是無限的從線程池的工作隊(duì)列中獲取任務(wù)并執(zhí)行,如果成功獲取任務(wù),則會(huì)將線程移進(jìn)忙碌隊(duì)列:
- TC_FunctorWrapperInterface *TC_ThreadPool:: get(ThreadWorker *ptw)
- {
- TC_FunctorWrapperInterface *pFunctorWrapper = NULL;
- if(! _jobqueue. pop_front(pFunctorWrapper, 1000))
- {
- return NULL;
- }
- {
- Lock sync( _tmutex);
- _busthread. insert(ptw);
- }
- return pFunctorWrapper;
- }
執(zhí)行完,移回工作線程隊(duì)列:_tpool->idle( this);
- void TC_ThreadPool:: idle(ThreadWorker *ptw)
- {
- Lock sync( _tmutex);
- _busthread. erase(ptw);
- //無繁忙線程, 通知等待在線程池結(jié)束的線程醒過來
- if( _busthread. empty())
- {
- _bAllDone = true;
- _tmutex.notifyAll();
- }
- }
此處jobThread隊(duì)列初始化后不會(huì)改變(因?yàn)闆]有實(shí)現(xiàn)自增長(zhǎng)功能),所以非線程安全的vector隊(duì)列即可,busthread的忙碌線程隊(duì)列會(huì)被移進(jìn)移出,但是操作會(huì)自帶Lock sync( _tmutex),該互斥量是線程池本身繼承的,所以是共有的,也無需另外使用線程安全的TC_ThreadQueue,使用vector即可。
TC_ThreadPool:: idle中的
- if( _busthread. empty())
- {
- _bAllDone = true;
- _tmutex.notifyAll();
- }
主要用于當(dāng)線程池工作起來后的waitForAllDone方法:
- bool TC_ThreadPool:: waitForAllDone( int millsecond)
- {
- Lock sync( _tmutex);
- start1:
- //任務(wù)隊(duì)列和繁忙線程都是空的
- if (finish())
- {
- return true;
- }
- //永遠(yuǎn)等待
- if(millsecond < 0)
- {
- _tmutex.timedWait(1000);
- goto start1;
- }
- int64_t iNow = TC_Common:: now2ms();
- int m = millsecond;
- start2:
- bool b = _tmutex.timedWait(millsecond);
- //完成處理了
- if(finish())
- {
- return true;
- }
- if(!b)
- {
- return false;
- }
- millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow));
- goto start2;
- return false;
- }
- _tmutex.timedWait(millsecond)方法喚醒。反復(fù)判斷是否所有的工作是否完成:
- bool TC_ThreadPool:: finish()
- {
- return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone;
- }
整體cpp實(shí)現(xiàn)如下:
- TC_ThreadPool ::KeyInitialize TC_ThreadPool::g_key_initialize;
- pthread_key_t TC_ThreadPool::g_key ;
- void TC_ThreadPool::destructor( void *p)
- {
- ThreadData *ttd = ( ThreadData*)p;
- if(ttd)
- {
- delete ttd;
- }
- }
- void TC_ThreadPool::exit()
- {
- TC_ThreadPool:: ThreadData *p = getThreadData();
- if(p)
- {
- delete p;
- int ret = pthread_setspecific( g_key, NULL );
- if(ret != 0)
- {
- throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
- }
- }
- _jobqueue. clear();
- }
- void TC_ThreadPool::setThreadData( TC_ThreadPool:: ThreadData *p)
- {
- TC_ThreadPool:: ThreadData *pOld = getThreadData();
- if(pOld != NULL && pOld != p)
- {
- delete pOld;
- }
- int ret = pthread_setspecific( g_key, ( void *)p);
- if(ret != 0)
- {
- throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
- }
- }
- TC_ThreadPool ::ThreadData * TC_ThreadPool::getThreadData ()
- {
- return ( ThreadData *) pthread_getspecific( g_key);
- }
- void TC_ThreadPool::setThreadData( pthread_key_t pkey, ThreadData *p)
- {
- TC_ThreadPool:: ThreadData *pOld = getThreadData(pkey);
- if(pOld != NULL && pOld != p)
- {
- delete pOld;
- }
- int ret = pthread_setspecific(pkey, ( void *)p);
- if(ret != 0)
- {
- throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
- }
- }
- TC_ThreadPool ::ThreadData * TC_ThreadPool::getThreadData( pthread_key_t pkey)
- {
- return ( ThreadData *) pthread_getspecific(pkey);
- }
- TC_ThreadPool::TC_ThreadPool()
- : _bAllDone ( true)
- {
- }
- TC_ThreadPool::~TC_ThreadPool()
- {
- stop();
- clear();
- }
- void TC_ThreadPool::clear()
- {
- std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
- while(it != _jobthread. end())
- {
- delete (*it);
- ++it;
- }
- _jobthread. clear();
- _busthread. clear();
- }
- void TC_ThreadPool::init( size_t num)
- {
- stop();
- Lock sync(* this);
- clear();
- for( size_t i = 0; i < num; i++)
- {
- _jobthread. push_back( new ThreadWorker( this));
- }
- }
- void TC_ThreadPool::stop()
- {
- Lock sync(* this);
- std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
- while(it != _jobthread. end())
- {
- if ((*it)-> isAlive())
- {
- (*it)-> terminate();
- (*it)-> getThreadControl().join ();
- }
- ++it;
- }
- _bAllDone = true;
- }
- void TC_ThreadPool::start()
- {
- Lock sync(* this);
- std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
- while(it != _jobthread. end())
- {
- (*it)-> start();
- ++it;
- }
- _bAllDone = false;
- }
- bool TC_ThreadPool:: finish()
- {
- return _startqueue. empty() && _jobqueue .empty() && _busthread. empty() && _bAllDone;
- }
- bool TC_ThreadPool::waitForAllDone( int millsecond)
- {
- Lock sync( _tmutex);
- start1:
- //任務(wù)隊(duì)列和繁忙線程都是空的
- if (finish ())
- {
- return true;
- }
- //永遠(yuǎn)等待
- if(millsecond < 0)
- {
- _tmutex.timedWait(1000);
- goto start1;
- }
- int64_t iNow = TC_Common:: now2ms();
- int m = millsecond;
- start2:
- bool b = _tmutex.timedWait(millsecond);
- //完成處理了
- if(finish ())
- {
- return true;
- }
- if(!b)
- {
- return false;
- }
- millsecond = max((int64_t )0, m - (TC_Common ::now2ms() - iNow));
- goto start2;
- return false;
- }
- TC_FunctorWrapperInterface *TC_ThreadPool::get( ThreadWorker *ptw)
- {
- TC_FunctorWrapperInterface *pFunctorWrapper = NULL;
- if(! _jobqueue. pop_front(pFunctorWrapper, 1000))
- {
- return NULL;
- }
- {
- Lock sync( _tmutex);
- _busthread. insert(ptw);
- }
- return pFunctorWrapper;
- }
- TC_FunctorWrapperInterface *TC_ThreadPool::get()
- {
- TC_FunctorWrapperInterface *pFunctorWrapper = NULL;
- if(! _startqueue. pop_front(pFunctorWrapper))
- {
- return NULL;
- }
- return pFunctorWrapper;
- }
- void TC_ThreadPool::idle( ThreadWorker *ptw)
- {
- Lock sync( _tmutex);
- _busthread. erase(ptw);
- //無繁忙線程, 通知等待在線程池結(jié)束的線程醒過來
- if( _busthread. empty())
- {
- _bAllDone = true;
- _tmutex.notifyAll();
- }
- }
- void TC_ThreadPool::notifyT()
- {
- _jobqueue. notifyT();
- }
線程池使用后記
線程池適合場(chǎng)合
事 實(shí)上,線程池并不是萬能的。它有其特定的使用場(chǎng)合。線程池致力于減少線程本身的開銷對(duì)應(yīng)用所產(chǎn)生的影響,這是有前提的,前提就是線程本身開銷與線程執(zhí)行任 務(wù)相比不可忽略。如果線程本身的開銷相對(duì)于線程任務(wù)執(zhí)行開銷而言是可以忽略不計(jì)的,那么此時(shí)線程池所帶來的好處是不明顯的,比如對(duì)于FTP服務(wù)器以及Telnet服務(wù)器,通常傳送文件的時(shí)間較長(zhǎng),開銷較大,那么此時(shí),我們采用線程池未必是理想的方法,我們可以選擇“即時(shí)創(chuàng)建,即時(shí)銷毀”的策略。
總之線程池通常適合下面的幾個(gè)場(chǎng)合:
(1) 單位時(shí)間內(nèi)處理任務(wù)頻繁而且任務(wù)處理時(shí)間短
(2) 對(duì)實(shí)時(shí)性要求較高。如果接受到任務(wù)后在創(chuàng)建線程,可能滿足不了實(shí)時(shí)要求,因此必須采用線程池進(jìn)行預(yù)創(chuàng)建。
(3) 必須經(jīng)常面對(duì)高突發(fā)性事件,比如Web服務(wù)器,如果有足球轉(zhuǎn)播,則服務(wù)器將產(chǎn)生巨大的沖擊。此時(shí)如果采取傳統(tǒng)方法,則必須不停的大量產(chǎn)生線程,銷毀線程。此時(shí)采用動(dòng)態(tài)線程池可以避免這種情況的發(fā)生。
新聞熱點(diǎn)
疑難解答