緩存作為計算機歷史上最重要的發明之一,對計算機歷史起到了舉足輕重的作用,因為緩存可以協調兩個速度不一致的組件之間的并行運作。內存作為CPU和非易失性存儲介質之間的緩存,避免CPU每次讀取指令,讀取數據都去速度緩慢的硬盤讀取。快速緩存作為內存和CPU之間的緩存進一步提高了CPU的效率,現在大部分CPU都支持指令預取,CPU會預測后面要執行的指令預取到快速緩存中。而我們平時也直接或間接地會用到緩存技術,那如果要自己實現一個線程安全的緩存,要注意哪些問題呢?我們一步步來探討這個問題。
假設我們提供一個服務,客戶端提供一個字符串,我們返回一個對應的Long數值(當然這只是為了演示方便舉的簡單例子),為了提高效率,我們不希望每次都重復計算,因此我們把計算結果保存在一個緩存里,如果下次有相同的請求過來就直接返回緩存中的數據。
首先我們把計算任務抽象成Compute接口:
public interface Compute<A,V>{ V compute(A args);}一個不使用緩存計算的類:
public class NoCache implements Compute<String, Long>{ @Override public Long compute(String args) { // TODO Auto-generated method stub return Long.valueOf(args); }}這樣每次都要重復計算,效率比較低,因此我們引入了一個Map來保存計算結果:
public class BasicCache1<A,V> implements Compute<A, V>{PRivate final Map<A, V> cache=new HashMap<>();private final Compute<A, V> c; public BasicCache1(Compute<A, V> c){ this.c=c;} @Overridepublic synchronized V compute(A args) throws Exception{ V ans=cache.get(args); if(ans==null) { ans=c.compute(args); cache.put(args, ans); } return ans;}}這里因為HashMap不是線程安全的,因此計算方法被寫成了同步方法,這樣的話,每次請求最后實際都是串行執行的,大大降低了系統的吞吐量。就像下面這幅圖表示的:

既然這樣,我們就改用ConcurrentHashMap試試:
public class BasicCache2<A,V> implements Compute<A, V>{ private final Map<A,V> cache=new ConcurrentHashMap<>(); private final Compute<A, V> c; public BasicCache2(Compute<A, V> c) { this.c=c; } @Override public V compute(A args) throws Exception { V ans=cache.get(args); if(ans==null) { ans=c.compute(args); cache.put(args, ans); } return ans; }}這里沒有同步compute操作,因此系統可以并發地執行請求,但是假如多個有相同參數的請求短時間內先后到達,這個時候先到達的請求還沒來得及把結果寫入緩存(因為計算耗時),后來的請求就會重復計算,降低了緩存的效率。一圖勝前言:

因此我們就想,能不能先更新緩存再去計算呢,這樣不就可以消除了重復計算嗎?聽起來不錯,可是我們如何在更新了緩存后獲取計算結果呢(因為這時計算還沒有完成)?這里就要用到JDK提供的Future和Callable接口,Callable接口和Runnable接口一樣,是線程任務的抽象接口,不同的是Callable的call方法可以返回一個Future對象,而Future對象的get方法會阻塞等待任務執行結果。既然有這么好的基礎設施,那我們趕緊開擼:
public class BasicCache3<A,V> implements Compute<A, V>{ private final Map<A, Future<V>> cache=new ConcurrentHashMap(); private final Compute<A, V> c; private ExecutorService executors=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()+1); public BasicCache3(Compute<A, V> c) { this.c=c; } @Override public V compute(final A args) throws Exception { Future<V> ans=cache.get(args); if(ans==null) { Callable<V> computeTask=new Callable<V>() { @Override public V call() throws Exception { return c.compute(args); } }; ans= executors.submit(computeTask); cache.put(args, ans); } return ans.get(); }} 上面這段代碼里把計算任務提交到線程池去執行,返回了一個結果句柄供后面獲取計算結果。可是仔細觀察后,我們發現似乎還是有點不對,這樣似乎減小了重復計算的概率,但是其實只是減小了發生的窗口,因為判斷是否在緩存中和put到緩存中兩個操作雖然單獨都是線程安全的,但是還是會發生先到達的請求還沒來得及put到緩存的情況,而其本質原因就是先檢查再插入這樣的復雜操作不是原子操作,就好比++這個操作,CPU需要先取原值,再操作加數,最后寫回原值也會出現后一次寫入覆蓋前一次的情況,本質都是因為復雜操作的非原子性。下圖演示了這種情況:

因此JDK中的ConcurrentMap接口提供了putIfAbsent的原子操作方法,可是如果我們像前一個例子中一樣先獲取計算結果的Future句柄,即便是我們不會重復更新緩存,計算任務還是會執行,依然沒達到緩存的效果,因此我們需要一個能夠在任務還沒啟動就可以獲取結果句柄,同時能夠自由控制任務的啟動、停止的東西。當然JDK里也有這樣的東西(這里插一句,JDK的concurrent包的代碼基本都是Doug Lea寫的,老頭子代碼寫的太牛逼了),就是FutureTask,既然如此,趕緊開擼:
public class BasicCache4<A,V> implements Compute<A, V>{private final ConcurrentMap<A, Future<V>> cache=new ConcurrentHashMap<>();private final Compute<A, V> c; public BasicCache4(Compute<A, V> c){ this.c=c;}@Overridepublic V compute(final A args) throws Exception{ Future<V> ans=cache.get(args); if(ans==null) { Callable<V> computeTask=new Callable<V>() { @Override public V call() throws Exception { return c.compute(args); } }; FutureTask<V> ft=new FutureTask<V>(computeTask); ans=cache.putIfAbsent(args, ft); if(ans==null)// { ans=ft; ft.run(); } } return ans.get();}}上面這段代碼中,我們創建了一個FutureTask任務,但是并沒有立即執行這個異步任務,而是先調用ConcurrentHashMap的putIfAbsent方法來嘗試把結果句柄更新到緩存中去,這個方法的返回值是Map中的舊值,因此如果返回的是null,也就是說原來緩存中不存在,那我們就啟動異步計算任務,而如果緩存中已經存在的話,我們就直接調用緩存中的Future對象的get方法獲取計算結果,如果其他請求中的計算任務還沒有執行完畢的話,get方法會阻塞直到計算完成。實際運行效果見下圖:

至此,我們算是構建了一個有效線程安全的緩存了,當然這個版本其實還是會有很多問題,比如如果異步計算任務被取消的話,我們應該循環重試,但是一方面我們為了簡單只考慮了正常情況,另一方面FutureTask是局部變量,在線程棧層面已經保證了其他線程或代碼無法拿到該對象。最后用一張xmind圖作為總結:

參考資料:《java Concurrency in Practice》
新聞熱點
疑難解答