Tomat組件研究之ThreadPool
2024-07-21 02:14:09
供稿:網友
tomat組件研究之threadpool
前幾天曾向大家承諾,要完成thredpool,tomcat ssl的文章,今天終于有時間可以寫一點。tomcat的thradpool不同于apache的common thradpool,tomcat的threadpool是專門為tomcat服務的,確切說是為tomcat處理http連接服務地。經過研究發現,apache用了及其難懂而又隱晦的方法寫了這幾個threadpool類,雖然僅簡單的幾個類,但其難理解的程度卻是驚人的。在理解之后看,里面確實又值得我們學習的東西,但也有好多無用的東西。看來我們也不要盲目崇拜apache。廢話少說,下面直入正題.
threadpool的class圖及整體結構:
一.每個類的說明:
1. org.apache.tomcat.util.threads.threadpool
線程池類,本類僅維護一定數量的線程處理對象,而把具體執行操作的任務委派給其他對象(controlrunnable),apache并沒有把過多的功能交給這個類,而僅只是讓這個類維護線程的創建,銷毀,取出,歸還等功能。下面我們看該類的代碼:
public class threadpool {
(1) 線程池常量,對應的變量設置就不再列出了
//最大線程數
public static final int max_threads = 200;
//最大線程數的最小數(最大線程的數量不能小于這個數)
public static final int max_threads_min = 10;
//最大空閑線程數
public static final int max_spare_threads = 50;
//最小空閑線程數(當線程池初始化時就啟動這么多線程)
public static final int min_spare_threads = 4;
//最大等待時間(1分鐘)
public static final int work_wait_timeout = 60*1000;
(2) start方法
//對每個線程實例本方法僅被調用一次
public synchronized void start() {
//是否停止線程
stopthepool=false;
//當前生成線程的數量
currentthreadcount = 0;
//當前使用線程的數量
currentthreadsbusy = 0;
//如果當前設置的各個參數不正確,調整一下
adjustlimits();
//生成空的線程池
pool = new controlrunnable[maxthreads];
//啟動最小線數線程
openthreads(minsparethreads);
//啟動監視線程,監視線程池內部狀態
monitor = new monitorrunnable(this);
}
(3) openthreads方法
/**
* 啟動指定數量(toopen)的線程
* 這個方法很是奇怪,這個toopen并不是本次打開的的線程數
* 而是本次要打開的和以前已經打開的線程數總和
*/
protected void openthreads(int toopen) {
if(toopen > maxthreads) {
toopen = maxthreads;
}
//新打開的線程數放在已經存在的空閑線程后面(用數組存放)
for(int i = currentthreadcount ; i < toopen ; i++) {
pool[i - currentthreadsbusy] = new
controlrunnable(this);
}
currentthreadcount = toopen;
}
到這里我們感覺apache的做法好生奇怪,首先這個toopen,還有一點,以前我們寫連接池時,都時用list作為容器,一般有個當前的空閑線程數,但apache偏偏用數組作為容器來存放線程,用數組就要維護每種線程(新的,使用的,空閑的)在數組中的下標,若用list這些問題就沒了,我們只要get,add,remove一下就一切ok,非常方便。因為有了currentthreadsbusy,apache的當前的空閑線程數就必須用currentthreadcount- currentthreadsbusy計算得來。這就時我們為什么會看到上面那個奇怪得小循環。但用數組到底有什么好處呢,還是apache的人是豬頭(靠,他們不可能是豬頭)?,我們可能發現上面有個常量:
//最大線程數
public static final int max_threads = 200;
也就是說,默認最多池可以有200個線程,就是說有很多線程可能頻繁地從池中
取,放線程,如果用list效率將大打折扣,因此才用了數組。
(4) findcontrolrunnable方法,取得一個可用線程
private controlrunnable findcontrolrunnable() {
controlrunnable c=null;
if ( stopthepool ) {
throw new illegalstateexception();
}
//從池中取一個可用的線程.
synchronized(this) {
//當前所有的線程都被使用
while (currentthreadsbusy == currentthreadcount) {
//當前的線程數量小于最大線程數
if (currentthreadcount < maxthreads) {
//生成一定數量(minsparethreads)線程,這一點與
//我們做連接池時非常不一樣,我們往往只生成一個新連
//接,看來apache的東西真是值得我們學習
int toopen = currentthreadcount +
minsparethreads;
openthreads(toopen);
} else {
logfull(log, currentthreadcount,
maxthreads);
//如果所有線程(已到最大數)都被使用,則等待其他線
//程釋放.
try {
this.wait();
}catch(interruptedexception e) {
log.error("unexpected exception", e);
}
// pool was stopped. get away of the pool.
if( stopthepool) {
break;
}
}
}
// pool was stopped. get away of the pool.
if(0 == currentthreadcount || stopthepool) {
throw new illegalstateexception();
}
int pos = currentthreadcount - currentthreadsbusy - 1;
//經過上面一番折騰,在線程池里終于又有了空閑線程,下面取數
//組里最后一個線程
c = pool[pos];
//釋放當前線程池對該線程的引用
pool[pos] = null;
//當然,使用線程的數量也要加1
currentthreadsbusy++;
}
return c;
}
這個方法我們可以看出:
ø 線程池里存放的都是空閑線程
ø 新生成的線程放在已存在的線程后面(隊列)
ø 當取一個線程時,取隊列上最后一個可用線程
ø 當線程被取出去時,隊列釋放對該線程的引用,同時使用線程變量加1
ø 線程池對使用線程的維護僅通過currentthreadsbusy變量得已實現
(5) returncontroller方法,歸還一個線程對象到池中
這個方法還算好理解
protected synchronized void returncontroller(controlrunnable c) {
if(0 == currentthreadcount || stopthepool) {
c.terminate();
return;
}
//使用線程減1
currentthreadsbusy--;
//把釋放得線程放在空閑線程隊列得最后
pool[currentthreadcount - currentthreadsbusy - 1] = c;
//喚醒等待線程,告訴他們有可用的線程了,不要在那傻等
notify();
}
到這里我們看到,用數組作為容器存放線程真復雜,總讓我們小心翼翼地操作數組的下標,但沒辦法,還是這玩意效率高。
(6) runit方法
這個看起來丑陋的小方法,卻是tomat threadpool精華部分的入口,它是線程池提供給其他組件有能力參與線程池內部操作的接口。當有http請求進來的時候,tomcat會生成一個工作線程,然后傳入到這個方法這行具體操作。
public void runit(threadpoolrunnable r) {
if(null == r) {
throw new nullpointerexception();
}
//查找一個可用空閑線程處理具體任務
controlrunnable c = findcontrolrunnable();
c.runit(r);
}
(7) shutdown方法,關閉線程池,釋放資源
public synchronized void shutdown() {
if(!stopthepool) {
stopthepool = true;
//停止監聽器
monitor.terminate();
monitor = null;
//釋放空閑線程
for(int i = 0 ; i < (currentthreadcount - currentthreadsbusy - 1) ; i++) {
try {
pool[i].terminate();
} catch(throwable t) {
log.error("ignored exception while shutting down thread pool", t);
}
}
//重置使用線程使用標志
currentthreadsbusy = currentthreadcount = 0;
pool = null;
notifyall();
}
}
前面我們說過,使用線程的維護僅通過currentthreadsbusy變量,因此對已經被使用的線程對象根本無法回收,只能簡單地置currentthreadsbusy=0
2. org.apache.tomcat.util.threads.monitorrunnable
這個類僅有的目的就是維護線程池的線程數量,看到這里我們不僅又對apache的做法怪了起來,為什么要大動干戈做一個線程類去維護線程數量?其實線程池中線程數量的維護完全可以放在findcontrolrunnable及returncontroller方法中,但因為這兩個方法的頻繁調用,就對效率產生了影響,因此,歸根結底還是從效率方面作考慮。
(1) run方法
public void run() {
while(true) {
try {
//等待一段指定的時間,或有線程歸還時喚醒本線程
synchronized(this) {
this.wait(work_wait_timeout);
}
//停止.
if(shouldterminate) {
break;
}
//調用線程池的方法進行線程維護.
p.checksparecontrollers();
} catch(throwable t) {
threadpool.log.error("unexpected exception", t);
}
}
}
(2) checksparecontrollers方法
該方法屬于threadpool類,由monitorrunnable類的run方法調用。
protected synchronized void checksparecontrollers() {
if(stopthepool) {
return;
}
//當前空閑線程數量大于最大空閑線程,釋放多余線程
if((currentthreadcount - currentthreadsbusy) > maxsparethreads) {
//應該釋放的線程數
int tofree = currentthreadcount -
currentthreadsbusy -
maxsparethreads;
for(int i = 0 ; i < tofree ; i++) {
controlrunnable c = pool[currentthreadcount - currentthreadsbusy - 1];
c.terminate();
//從后向前釋放
pool[currentthreadcount - currentthreadsbusy - 1] = null;
//線程數量減1
currentthreadcount --;
}
}
}
通過這個方法,我們要把握住兩點:
ø 釋放空閑線程時按從后向前的順序
ø 釋放線程時總線程的數量要隨之減少
3. org.apache.tomcat.util.threads.controlrunnable
本類是一個靜態線程類,線程池里存放該類的實例。這個類主要被用來在線程池內部執行各種各樣的操作。
(1) 構造函數controlrunnable
controlrunnable(threadpool p) {
torun = null;
//停止標志
shouldterminate = false;
//運行標志,構造函數時該線程不運行
shouldrun = false;
this.p = p;
//類似線程本地數據操作,類threadwithattributes將稍后介紹
t = new threadwithattributes(p, this);
t.setdaemon(true);
t.setname(p.getname() + "-processor" + p.getsequence());
//啟動線程threadwithattributes
t.start();
//向池中增加線程
p.addthread( t, this );
nothdata=true;
}
可以看出該構造函數完成了以下幾個功能:
ø 用線程池對象p和本身(this)構造了threadwithattributes對象
threadwithattributes是用來代替threadlocal對象的,它的作用是把線程數據的本地化,避免了線程之間數據的訪問沖突,令一方面,它對線程屬性的訪問加以控制,阻止非信任的代碼訪問線程數據,我們將在下面作具體講解。
ø 啟動了threadwithattributes線程
ø 向池中增加線程
到這里我們可以看出,線程池里的線程存放在兩個不同的地方:用數組維護的線程池和用hashtable維護的threads對象:
protected hashtable threads=new hashtable();
key:threadwithattributes對象
value: controlrunnable對象
向池里增加線程將引起下面方法的調用:
(2) addthread,removethread方法
public void addthread( thread t, controlrunnable cr ) {
threads.put( t, cr );
for( int i=0; i<listeners.size(); i++ ) {
threadpoollistener tpl=(threadpoollistener)listeners.elementat(i);
//通知監聽器,有線程加入
tpl.threadstart(this, t);
}
}
public void removethread( thread t ) {
threads.remove(t);
for( int i=0; i<listeners.size(); i++ ) {
threadpoollistener tpl=(threadpoollistener)listeners.elementat(i);
//通知監聽器,有線程被刪除
tpl.threadend(this, t);
}
}
看到這個方法,我們可能會回想起好多地方都在用listener進行一些處理,listener到底為何物?其實我們仔細觀察一下就會發現,用listener處理其實是使用了gof23種模式種的observer模式。
(3) 關于observer模式
上面是一般的observer模式class圖,如果subject不使用接口而用一個類,并且把subject的含義擴展一下,不是對其所有的屬性而是部分屬性作觀察,則subject其實就是我們的threadpool類,observer其實就是listener接口。被觀察的對象就是threadpool類的threads,當對threads作put,remove時就會調用所有被注冊到threadpool類的listener方法,即上面的addthread,removethread。
observer模式的用意是:在對象間建立一個一對多的依賴關系,當一個對象的狀態發生變化是,所有依賴于它的對象能獲得通知并且能被隨之自動更新。
(4) java的observer模式
其實java已經將observer模式集成到語言里面,類java.util.observable相當于subject,java.util.observer就時observer接口,若要使用只要作簡單的繼承即可。但為了更好的擴展性及更明確的邏輯意義,threadpool類并無繼承observable類,而是用了自己的實現方式。
(5) runit方法
public synchronized void runit(threadpoolrunnable
torun) {
this.torun =
torun;
shouldrun = true;
this.notify();
}
該方法主要是運行一個指定的任務,具體的任務都被封裝在threadpoolrunnable接口里,該方法要注意以下幾點:
ø 該方法對每個線程僅被調用一次
ø 調用該方法不是馬上運行threadpoolrunnable指定的任務,而是通知controlrunnable”可以執行任務”。
具體的任務執行在下面的run方法里。
(6) run方法
public void run() {
try {
while(true) {
try {
synchronized(this) {
//當既不運行也不停止時,等待
if(!shouldrun && !shouldterminate) {
this.wait();
}
}
//停止
if( shouldterminate ) {
if( threadpool.log.isdebugenabled())
threadpool.log.debug( "terminate");
break;
}
try {
//初始化線程數據,僅一次
if(nothdata) {
if(
torun != null ) {
object thdata[]=torun.getinitdata();
t.setthreaddata(p, thdata);
}
nothdata = false;
}
//執行操作
if(shouldrun) {
//運行threadrunnalbe接口
if(
torun != null ) {
torun.runit(t.getthreaddata(p));
//controlrunnable也提供一般runnable接口參與處理的機會
} else if( torunrunnable != null ) {
torunrunnable.run();
} else {
if( threadpool.log.isdebugenabled())
threadpool.log.debug( "no
torun ???");
}
}
} catch(throwable t) {
//發生致命錯誤,從池中刪除線程
shouldterminate = true;
shouldrun = false;
p.notifythreadend(this);
} finally {
//運行結束回收線程
if(shouldrun) {
shouldrun = false;
p.returncontroller(this);
}
}
if(shouldterminate) {
break;
}
} catch(interruptedexception ie) {
//當執行wait時可能發生的異常(盡管這種異常不太可能發生)
p.log.error("unexpected exception", ie);
}
}
} finally {
//線程池停止或線程運行中發生錯誤時,從池中刪除線程
p.removethread(thread.currentthread());
}
}
結合runit方法,run方法能很容易看懂。
4. org.apache.tomcat.util.threads.threadpoollistener
前面我們曾提到過,該接口時observer模式的observer對象,該接口定義了兩個方法:
//當線程被創建時執行的方法
public void threadstart( threadpool tp, thread t);
//當線程被停止時執行的方法
public void threadend( threadpool tp, thread t);
關于該接口的詳細使用可以參考上面提到的observer模式。
5. org.apache.tomcat.util.threads.threadwithattributes
threadwithattributes是一個特殊的線程,該線程用來存放其他線程的屬性和數據,并且該類提供了類似threadlocal的功能,但比threadlocal效率更高。
(1) 構造函數threadwithattributes
public threadwithattributes(object control, runnable r) {
super(r);
this.control=control;
}
用control(threadpool)和r(controlrunnable)構造實例(具體可參見controlrunnable的構造方法)
(2) setnote方法
public final void setnote( object control, int id, object value ) {
if( this.control != control ) return;
notes[id]=value;
}
ø 用controlrunnable構造一個新的threadwithattributes對象避免了線程公用數據的爭奪
ø 根據control設置線程屬性,通過control可以阻止非信任的代碼操作線程屬性。
對其他操作線程屬性的方法都比較簡單就不再一一列出。
(3) java的threadlocal
java.lang.threadlocal是在java1.2中出現的“線程局部變量”,它為每個使用它的線程提供單獨的線程局部變量值的副本。每個線程只能看到與自己相聯系的值,而不知道別的線程可能正在使用或修改它們自己的副本。“線程局部變量”是一種能簡化多線程編程的好方法,可惜的是多數開發者可能不了解它。具體的信息可以參考:
http://www-900.ibm.com/developerworks/cn/java/j-threads/index3.shtml
6. org.apache.tomcat.util.threads.threadpoolrunnable
前面我們提到過,如果想把自己的代碼嵌入到線程池內部被執行,就必須實現該接口。具體可以參照controlrunnable的run方法。這個接口定義了下面兩個方法:
(1) getinitdata方法
public object[] getinitdata();
取得運行該對象所需要的初始化數據,對池中所有的線程來說應該返回相同類型的數據,否則處理機制將變的很復雜。
(2) runit方法
public void runit(object thdata[]);
嵌入執行的代碼將在這個方法里得以體現,以后我們將會看到,對tcp connection得處理也是在這里進行的。
至此,tomcat threadpool的介紹就算基本結束,對tomcat threadpool始終要把握住下面幾點:
ø tomcat threadpool僅提供了對線程的管理維護功能
ø 池所執行的操作有外部組件去實現
ø 從池的設計可以看出一點面向組件(cop)編程的痕跡
二.threadpool在處理tcp connection中的應用
在接下來的內容中我們將演示tomat是如何在指定的端口監聽http連接,并利用threadpool生成一個線程處理接受的請求。
1. org.apache.tomcat.util.net.pooltcpendpoint
類pooltcpendpoint主要是被用來處理接受到的http連接,處理方式是處理原始的socket,下面我們看幾個重要的方法:
(1) initendpoint方法
對該方法,現在我們可以暫時不要考慮太多,只要知道在初始化serversocket的工作就足夠了。
public void initendpoint() throws ioexception, instantiationexception {
try {
//創建serversocket工廠
if(factory==null)
factory=serversocketfactory.getdefault();
//創建serversocket,將被用于在指定的端口(8080)監聽連接
if(serversocket==null) {
try {
if (inet == null) {
serversocket = factory.createsocket(port, backlog);
} else {
serversocket = factory.createsocket(port, backlog, inet);
}
} catch ( bindexception be ) {
throw new bindexception(be.getmessage() + ":" + port);
}
}
//設定連接的超時限制時間
if( servertimeout >= 0 )
serversocket.setsotimeout( servertimeout );
} catch( ioexception ex ) {
throw ex;
} catch( instantiationexception ex1 ) {
throw ex1;
}
//保證初始化一次
initialized = true;
}
(2) startendpoint方法
該方法將在tocmat啟動時被調用,主要作用時啟動線程池并生成監聽線程。
public void startendpoint() throws ioexception, instantiationexception {
if (!initialized) {
initendpoint();
}
//tp是外部組件傳進來的threadpool對象,這里tomcat啟動了該線程池
if(ispool) {
tp.start();
}
running = true;
//生成工作線程監聽http連接
if(ispool) {
listener = new tcpworkerthread(this);
tp.runit(listener);
} else {
log.error("xxx error - need pool !");
}
}
下面將向大家描述,工作線程是如何監聽http連接的:
2. org.apache.tomcat.util.net.tcpworkerthread
該類是pooltcpendpoint的內部類,它實現了threadpoolrunnable接口執行http連接監聽和請求處理。(class tcpworkerthread implements threadpoolrunnable)
(1) 構造函數tcpworkerthread
該方法的主要目的是通過pooltcpendpoint對象生成一個實例,并且在緩存中生成一定數量的tcpconnection對象。
public tcpworkerthread(pooltcpendpoint endpoint) {
this.endpoint = endpoint;
if( usepool ) {
//緩存初始化simplepool為緩存對象,可先不理會其實現細節
connectioncache = new simplepool(endpoint.getmaxthreads());
for(int i = 0;i< endpoint.getmaxthreads()/2 ; i++) {
connectioncache.put(new tcpconnection());
}
}
}
我們目的是先弄清楚http的監聽及處理,對其他細節可先不于深究。
(2) getinitdata方法
對該方法的描述前面已經說過,大家還記得否?本方法主要是取得線程的初始化數據。
public object[] getinitdata() {
if( usepool ) {
return endpoint.getconnectionhandler().init();
} else {
object obj[]=new object[2];
//第二個參數存放http請求處理器(可先不考慮細節)
obj[1]= endpoint.getconnectionhandler().init();
//第一個參數存放tcpconnection對象
obj[0]=new tcpconnection();
return obj;
}
}
關于第二個參數,其實是初始化了http請求處理器及其他的信息,大家可先不究其細節。只要能認識到這個方法是返回線程初始化數據即可。
(3) runit方法
前面我們說過,嵌入到線程池執行的代碼要寫在這個方法里,這個方法是http監聽的核心,我們看具體實現:
public void runit(object perthrdata[]) {
if (endpoint.isrunning()) {
socket s = null;
//在指定的端口(8080)監聽客戶端連接
try {
s = endpoint.acceptsocket();
} finally {
//當接受到一個連接后繼續啟動下一個線程進行監聽
if (endpoint.isrunning()) {
endpoint.tp.runit(this);
}
}
if (null != s) {
try {
if(endpoint.getserversocketfactory()!=null) {
//客戶端與服務器第一次握手,主要用于ssi連接(即https) endpoint.getserversocketfactory().handshake(s);
}
} catch (throwable t) {
pooltcpendpoint.log.debug("handshake failed", t);
try {
s.close();
} catch (ioexception e) {
}
return;
}
tcpconnection con = null;
try {
if( usepool ) {
//從緩存中取一個tcpconnection對象
con=(tcpconnection)connectioncache.get();
if( con == null ) {
con = new tcpconnection();
}
} else {
//若不使用緩存從初始化數據中取一個tcpconnection對象
con = (tcpconnection) perthrdata[0];
perthrdata = (object []) perthrdata[1];
}
//設定剛生成tcpconnection對象
con.setendpoint(endpoint);
con.setsocket(s);
endpoint.setsocketoptions( s );
//把tcpconnection及所需要的初始化數據傳給http處理器處理
//在process處理中將把原始的socket流解析成request對象傳
//給容器調用
endpoint.getconnectionhandler().processconnection(con, perthrdata);
} catch (socketexception se) {
try {
s.close();
} catch (ioexception e) {}
} catch (throwable t) {
try {
s.close();
} catch (ioexception e) {}
} finally {
if (con != null) {
con.recycle();
if (usepool) {
connectioncache.put(con);
}
}
}
}
}
}
請大家仔細而反復的多看一下上面帶陰影的注釋。通過上面我們看到工作線程作了如下的工作:
ø 啟動了線程池(線程池啟動時將生成指定數量的線程及監視線程)
ø 如果使用緩沖處理則預先生成指定數量的tcpconnection對象
ø 在指定的端口(默認是8080)監聽http連接
ø 當接收的一個連接時再啟動一個線程繼續監聽連接
ø 用接收的連接生成tcpconnection對象,即tomcat對http的處理是以tcpconnection對象為基礎的
ø 把生成的tcpconnection對象交由http process進行socket解析,最終生成request對象
要注意的是:tomcat并不是事先用指定數量的線程在端口監聽,而是當一個監聽完成后再啟動下一個監聽線程。
,歡迎訪問網頁設計愛好者web開發。