最近項目需要用應(yīng)用緩存解決方案,選擇了目前比較流行的memcached作為分布式緩存,客戶端我們選擇了阿里改造后的memchaced-client-forjava,因為該客戶端經(jīng)過阿里內(nèi)部大量實際項目的線上運行,表現(xiàn)給力.
源碼分析
memcached本身是一個集中式的內(nèi)存緩存系統(tǒng),對于分布式的支持服務(wù)端并沒有實現(xiàn),只有通過客戶端實現(xiàn);再者,memcached是基于TCP/UDP進行通信,只要客戶端語言支持TCP/UDP即可實現(xiàn)客戶端,并且可以根據(jù)需要進行功能擴展,memchaced-client-forjava 既是使用java語言實現(xiàn)的客戶端,并且實現(xiàn)了自己的功能擴展.
幾個重要類的說明:
MemcachedCacheManager:管理類,負責緩存服務(wù)端,客戶端,以及相關(guān)資源池的初始化工作,獲取客戶端等等
MemcachedCache:memcached緩存實體類,實現(xiàn)了所有的緩存API,實際上也會調(diào)用MemcachedClient進行操作
MemcachedClient:memcached緩存客戶端,一個邏輯概念,負責與服務(wù)端實例的實際交互,通過調(diào)用sockiopool中的socket
SockIOPool:socket連接資源池,負責與memcached服務(wù)端進行交互
ClusterProcessor:集群內(nèi)數(shù)據(jù)異步操作工具類
客戶端可配置化
MemcachedCacheManager是入口,其start方法讀取配置文件memcached.xml,初始化各個組建,包括memcached客戶端,socket連接池以及集群節(jié)點.
memcached客戶端是個邏輯概念,并不是和memcached服務(wù)端實例一一對應(yīng)的,可以認為其是一個邏輯環(huán)上的某個節(jié)點,后面會講到hash一致性算法時涉及,該配置文件中,可配置一個或多個客戶端,每個客戶端可配置一個socketPool連接池,如下:
- <client name="mclient0" compressEnable="true" defaultEncoding="UTF-8" socketpool="pool0”> //開源軟件:Vevb.com
- <errorHandler>com.alisoft.xplatform.asf.cache.memcached.MemcachedErrorHandler</errorHandler>
- </client>
擴容
socketpool連接池配置的才是真正連接的memcached服務(wù)實例,當然,你可以連接多個memcached服務(wù)實例,多個實例可以分布在一臺或者多臺物理機器上。這樣,隨著實際業(yè)務(wù)數(shù)據(jù)量的增加,可以對現(xiàn)有緩存容量進行擴容,只需在servers中增加memcached實例即可,或者增加多個socketpool配置項,配置如下:
- <socketpool name="pool0" failover="true" initConn="5" minConn="5" maxConn="250" maintSleep="5000" nagle="false" socketTO="3000" aliveCheck="true">
- <servers>192.168.1.66:11211,192.168.1.68:11211</servers>
- </socketpool>
初始化過程
上文提及的MemcachedCacheManager,該類功能包括有初始化各種資源池,獲取所有客戶端,重新加載配置文件以及集群復制等。我們重點分析方法start,該方法首先加載配置文件,然后初始化資源池,即方法initMemCacheClientPool,該方法中定義了三個資源池,即socket連接資源池socketpool,memcachedcache資源池cachepool,以及由客戶端組成的集群資源池clusterpool,這些資源池的數(shù)據(jù)結(jié)構(gòu)都是線程安全的ConcurrentHashMap,保證了并發(fā)效率,將配置信息分別實例化后,再分別放入對應(yīng)的資源池容器中,socket連接放入socketpool中,memcached客戶端放入cachepool中,定義的集群節(jié)點放入clusterpool中。
注意,在實例化socket連接池資源socketpool時,會調(diào)用每個pool的初始化方法pool.initialize(),來映射memcached實例到HASH環(huán)上,以及初始化socket連接.
單點問題
memcached的分布式,解決了容量水平擴容的問題,但是當某個節(jié)點失效時,還是會丟失一部分數(shù)據(jù),單點故障依然存在,分布式只是解決了數(shù)據(jù)整體失效問題,而在實際項目中,特別是GAP平臺適應(yīng)的企業(yè)級項目中,是不允許數(shù)據(jù)不一致的,所以對每一份保存的數(shù)據(jù)都需要進行容災(zāi)處理,那么對于定義的每個memcached客戶端,都至少增加一個新客戶端與其組成一個cluster集群,當更新或者查找數(shù)據(jù)時,會先定位到該集群中某個節(jié)點,如果該節(jié)點失效,就去另外一個節(jié)點進行操作。在實際項目中,通過合理規(guī)劃配置cluster和client(memcached客戶端),可以最大限度的避免單點故障,當所有client都失效時還會丟失數(shù)據(jù),在配置文件中,集群配置如下:
- <cluster name="cluster1" mode="active">
- <memCachedClients>mclient1,mclient2</memCachedClients>
- </cluster>
HASH一致性算法
在memcached支持分布式部署場景下,如何獲取一個memcached實例?如何平均分配memcached實例的存儲?這些需要一個算法來實現(xiàn),我們選擇的是HASH一致性算法,具體就體現(xiàn)在客戶端如何獲取一個連接memcached服務(wù)端的socket上,也就是如何定位memcached實例的問題?算法要求能夠根據(jù)每次提供的同一個key獲得同一個實例.
HASH閉環(huán)的初始化
本質(zhì)上,hash一致性算法是需要實現(xiàn)一個邏輯環(huán),如圖所示,環(huán)上所有的節(jié)點即為一個memcached實例,如何實現(xiàn)?其實是根據(jù)每個memcached實例所在的ip地址,將所有的實例映射到hash數(shù)值空間中,構(gòu)成一個閉合的圓環(huán)。
HASH環(huán)映射的初始化的代碼位于SocketIOPool.populateConsistentBuckets方法中,主要代碼如下:
- private void populateConsistentBuckets()
- {
- ……...
- for (int i = 0; i < servers.length; i++)
- {
- int thisWeight = 1;
- if (this.weights != null && this.weights[i] != null)
- thisWeight = this.weights[i];
- double factor = Math .floor(((double) (40 * this.servers.length * thisWeight)) / (double ) this.totalWeight);
- for (long j = 0; j < factor; j++)
- {
- byte[] d = md5.digest((servers[i] + "-" + j).getBytes());
- for (int h = 0; h < 4; h++)
- {
- // k 的值使用MD5hash算法計算獲得
- Long k = ((long) (d[3 + h * 4] & 0xFF) << 24)
- | ((long) (d[2 + h * 4] & 0xFF) << 16)
- | ((long) (d[1 + h * 4] & 0xFF) << 8)
- | ((long) (d[0 + h * 4] & 0xFF));
- // 用treemap來存儲memcached實例所在的ip地址,
- // 也就是將每個緩存實例所在的ip地址映射到由k組成的hash環(huán)上
- consistentBuckets.put(k, servers[i]);
- if (log.isDebugEnabled())
- log.debug("++++ added " + servers[i]
- + " to server bucket");
- }
- }
- ……...
- }
- }
獲取socket連接
在實際獲取memcahced實例所在服務(wù)器的soket時,只要使用基于同一個存儲對象的key的MD5Hash算法,就可以獲得相同的memcached實例所在的ip地址,也就是可以準確定位到hash環(huán)上相同的節(jié)點,代碼位于SocketIOPool.getSock方法中,主要代碼如下:
- public SockIO getSock(String key, Integer hashCode){
- ………….
- // from here on, we are working w/ multiple servers
- // keep trying different servers until we find one
- // making sure we only try each server one time
- Set<String> tryServers = new HashSet<String>(Arrays.asList(servers));
- // get initial bucket
- // 通過key值計算hash值,使用的是基于MD5的算法
- long bucket = getBucket(key, hashCode);
- String server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets .get(bucket) : buckets.g et((int) bucket);
- …………...
- }
- private long getBucket(String key, Integer hashCode)
- {
- / / 通過key值計算hash值,使用的是基于MD5的算法
- long hc = getHash(key, hashCode);
- if (this.hashingAlg == CONSISTENT_HASH)
- {
- return findPointFor(hc);
- } else
- {
- long bucket = hc % buckets.size();
- if (bucket < 0)
- bucket *= -1;
- return bucket;
- }
- }
- /**
- * Gets the first available key equal or above the given one, if none found,
- * returns the first k in the bucket
- *
- * @param k
- * key
- * @return
- */
- private Long findPointFor(Long hv)
- {
- // this works in java 6, but still want to release support for java5
- // Long k = this.consistentBuckets.ceilingKey( hv );
- // return ( k == null ) ? this.consistentBuckets.firstKey() : k;
- // 該consistentBuckets中存儲的是HASH結(jié)構(gòu)初始化時,存入的所有memcahced實例節(jié)點,也就是整個hash環(huán)
- // tailMap方法是取出大于等于hv的所有節(jié)點,并且是遞增有序的
- SortedMap<Long, String> tmap = this.consistentBuckets.tailMap(hv);
- // 如果tmap為空,就默認返回hash環(huán)上的第一個值,否則就返回最接近hv值的那個節(jié)點
- return (tmap.isEmpty()) ? this.consistentBuckets.firstKey() : tmap .firstKey();
- }
- /**
- * Returns a bucket to check for a given key.
- *
- * @param key
- * String key cache is stored under
- * @return int bucket
- */
- private long getHash(String key, Integer hashCode)
- {
- if (hashCode != null)
- {
- if (hashingAlg == CONSISTENT_HASH)
- return hashCode.longValue() & 0xffffffffL;
- else
- return hashCode.longValue();
- } else
- {
- switch (hashingAlg)
- {
- case NATIVE_HASH:
- return (long) key.hashCode();
- case OLD_COMPAT_HASH:
- return origCompatHashingAlg(key);
- case NEW_COMPAT_HASH:
- return newCompatHashingAlg(key);
- case CONSISTENT_HASH:
- return md5HashingAlg(key);
- default:
- // use the native hash as a default
- hashingAlg = NATIVE_HASH;
- return (long) key.hashCode();
- }
- }
- }
- /**
- * Internal private hashing method.
- *
- * MD5 based hash algorithm for use in the consistent hashing approach.
- *
- * @param key
- * @return
- */
- private static long md5HashingAlg(String key)
- {
- / /通過key值計算hash值,使用的是基于MD5的算法
- MessageDigest md5 = MD5.get();
- md5.reset();
- md5.update(key.getBytes());
- byte[] bKey = md5.digest();
- long res = ((long) (bKey[3] & 0xFF) << 24)
- | ((long) (bKey[2] & 0xFF) << 16)
- | ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF);
- return res;
- }
通過以上代碼的分析,整個memcahced服務(wù)端實例HASH環(huán)的初始化,以及數(shù)據(jù)更新和查找使用的算法都是基于同一種算法,這就保證了通過同一個key獲得的memcahced實例為同一個.
socket連接池
這部分單獨介紹,請猛烈地戳這里。
容災(zāi)、故障轉(zhuǎn)移以及性能
衡量系統(tǒng)的穩(wěn)定性,很大程度上是對各種異常情況的處理,充分考慮異常情況,以及合理處理異常是對系統(tǒng)設(shè)計人員的要求,下面看看在故障處理和容災(zāi)方面系統(tǒng)都做了那些工作。
定位memcached實例時,當?shù)谝淮味ㄎ皇?會對所有其他的屬于同一個socketpool中的memcahced實例進行定位,找到一個可用的,代碼如下:
- // log that we tried
- // 先刪除定位失敗的實例
- tryServers.remove(server);
- if (tryServers.isEmpty())
- break;
- // if we failed to get a socket from this server
- // then we try again by adding an incrementer to the
- // current key and then rehashing
- int rehashTries = 0;
- while (!tryServers.contains(server))
- {
- // 重新計算key值
- String newKey = new StringBuilder().append(rehashTries).append(key).toString();
- // String.format( "%s%s", rehashTries, key );
- if (log.isDebugEnabled())
- log.debug("rehashing with: " + newKey);
- // 去HASH環(huán)上定位實例節(jié)點
- bucket = getBucket(newKey, null);
- server=(this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets.get((int) bucket);
- rehashTries++;
- }
查找數(shù)據(jù)時,當前節(jié)點獲取不到,會嘗試到所在集群中其他的節(jié)點查找,成功后,會將數(shù)據(jù)復制到原先失效的節(jié)點中,代碼如下:
- public Object get(String key)
- {
- Object result = null;
- boolean isError = false;
- …….......
- if (result == null && helper.hasCluster())
- if (isError || helper.getClusterMode().equals(MemcachedClientClusterConfig.CLUSTER_MODE_ACTIVE))
- {
- List<MemCachedClient> caches = helper.getClusterCache();
- for(MemCachedClient cache : caches)
- {
- if (getCacheClient(key).equals(cache))
- continue;
- try{ try
- {
- result = cache.get(key);
- }
- catch(MemcachedException ex)
- {
- Logger.error(new StringBuilder(helper.getCacheName())
- .append(" cluster get error"),ex);
- continue;
- }
- //僅僅判斷另一臺備份機器,不多次判斷,防止效率低下
- if (helper.getClusterMode().equals(MemcachedClientClusterConfig.CLUSTER_MODE_ACTIVE ) && result != null)
- {
- Object[] commands = new Object[]{CacheCommand.RECOVER,key,result};
- // 加入隊列,異步執(zhí)行復制數(shù)據(jù)
- addCommandToQueue(commands);
- }
- break;
- }
- catch(Exception e)
- {
- Logger.error(new StringBuilder(helper.getCacheName()) .append(" cluster get error"),e);
- }
- }
- }
- return result;
- }
更新數(shù)據(jù)時,異步更新到集群內(nèi)其他節(jié)點,示例代碼如下:
- public boolean add(String key, Object value)
- {
- boolean result = getCacheClient(key).add(key,value);
- if (helper.hasCluster())
- {
- Object[] commands = new Object[]{CacheCommand.ADD,key,value};
- // 加入隊列,異步執(zhí)行
- addCommandToQueue(commands);
- }
- return result;
- }
刪除數(shù)據(jù)時,需要同步執(zhí)行,如果異步的話,會產(chǎn)生臟數(shù)據(jù),代碼如下:
- public Object remove(String key)
- {
- Object result = getCacheClient(key).delete(key);
- //異步刪除由于集群會導致無法被刪除,因此需要一次性全部清除
- if (helper.hasCluster())
- {
- List<MemCachedClient> caches = helper.getClusterCache();
- for(MemCachedClient cache : caches)
- {
- if (getCacheClient(key).equals(cache))
- continue;
- try
- {
- cache.delete(key);
- }
- catch(Exception ex)
- {
- Logger.error(new StringBuilder(helper.getCacheName())
- .append(" cluster remove error"),ex);
- }
- }
- }
- return result;
- }
異步執(zhí)行集群內(nèi)數(shù)據(jù)同步,因為不可能每次數(shù)據(jù)都要同步執(zhí)行到集群內(nèi)每個節(jié)點,這樣會降低系統(tǒng)性能,所以在構(gòu)造MemcachedCache對象時,會建立一個隊列,線程安全的linked阻塞隊列LinkedBlockingQueue,將所有需要異步執(zhí)行的命令放入隊列中,異步執(zhí)行,具體異步執(zhí)行由ClusterProcessor類負責,代碼如下:
- public MemcachedCache(MemCachedClientHelper helper,int statisticsInterval)
- {
- this.helper = helper;
- dataQueue = new LinkedBlockingQueue<Object[]>();
- ………
- processor = new ClusterProcessor(dataQueue,helper);
- processor.setDaemon(true);
- processor.start();
- }
本地緩存的使用是為了降低連接服務(wù)端的IO開銷,當有些數(shù)據(jù)變化頻率很低時,完全可以放在應(yīng)用服務(wù)器本地,同時可以設(shè)置有效時間,直接獲取,DefaultCacheImpl類為本地緩存的實現(xiàn)類,在構(gòu)造MemcachedCache對象時,即初始化.
每次查找數(shù)據(jù)時,會先查找本地緩存,如果沒有再去查緩存,結(jié)束后將數(shù)據(jù)讓如本地緩存中,代碼如下:
- public Object get(String key, int localTTL)
- {
- Object result = null;
- // 本地緩存中查找
- result = localCache.get(key);
- if (result == null)
- {
- result = get(key);
- if (result != null)
- {
- Calendar calendar = Calendar.getInstance();
- calendar.add(Calendar.SECOND, localTTL);
- // 放入本地緩存
- localCache.put(key, result,calendar.getTime());
- }
- }
- return result;
- }
增加緩存數(shù)據(jù)時,會刪除本地緩存中對應(yīng)的數(shù)據(jù),代碼如下:
- public Object put(String key, Object value, Date expiry)
- {
- boolean result = getCacheClient(key).set(key,value,expiry);
- //移除本地緩存的內(nèi)容
- if (result) localCache.remove(key);
- ……..
- return value;
- }
改造部分
據(jù)以上分析,我們通過封裝,做到了客戶端的可配置化,memcached實例的水平擴展,通過集群解決了單點故障問題,并且保證了應(yīng)用程序只要每次使用相同的數(shù)據(jù)對象的key值即可獲取相同的memcached實例進行操作。但是,為了使緩存的使用對于應(yīng)用程序來說完全透明,我們對cluster部分進行了再次封裝,即把cluster看做一個node,根據(jù)cluster名稱屬性,進行HASH數(shù)值空間計算(同樣基于MD5算法),映射到一個HASH環(huán)上.
這部分邏輯放在初始化資源池clusterpool時進行(即放在MemcahedCacheManager.initMemCacheClientPool方法中),與上文中所描述的memcached實例HASH環(huán)映射的邏輯一致,部分代碼如下.
- //populate cluster node to hash consistent Buckets
- MessageDigest md5 = MD5.get();
- // 使用cluster的名稱計算HASH數(shù)值空間
- byte[] d = md5.digest((node.getName()).getBytes());
- for (int h = 0; h < 4; h++)
- {
- Long k = ((long) (d[3 + h * 4] & 0xFF) << 24)
- | ((long) (d[2 + h * 4] & 0xFF) << 16)
- | ((long) (d[1 + h * 4] & 0xFF) << 8)
- | ((long) (d[0 + h * 4] & 0xFF));
- consistentClusterBuckets.put(k, node.getName());
- if (log.isDebugEnabled())
- log.debug("++++ added " + node.getName() + " to cluster bucket");
- }
在進行緩存操作時,仍然使用數(shù)據(jù)對象的key值獲取到某個cluster節(jié)點,然后再使用取余算法(這種算法也是經(jīng)常用到的分布式定位算法,但是有局限性,即隨著節(jié)點數(shù)的增減,定位越來越不準確),拿到cluster中的某個節(jié)點,在進行緩存的操作;定位hash環(huán)上cluster節(jié)點的邏輯也與上文一樣,這里不在贅述。部分定位cluster中節(jié)點的取余算法代碼如下:
- public IMemcachedCache getCacheClient(String key){
- ………….
- String clusterNode = getClusterNode(key);
- MemcachedClientCluster mcc = clusterpool.get(clusterNode);
- List<IMemcachedCache> memcachedCachesClients = mcc.getCaches();
- //根據(jù)取余算法獲取集群中的某一個緩存節(jié)點
- if (!memcachedCachesClients.isEmpty())
- {
- long keyhash = key.hashCode();
- int index = (int)keyhash % memcachedCachesClients.size();
- if (index < 0 )
- index *= -1;
- return memcachedCachesClients.get(index);
- }
- return null;
- }
這樣,對于應(yīng)用來說,配置好資源池以后,無需關(guān)心那個集群或者客戶端節(jié)點,直接通過MemcachedCacheManager獲取到某個memcachedcache,然后進行緩存操作即可.
最后,使用GAP平臺分布式緩存組件,需要提前做好容量規(guī)劃,集群和客戶端事先配置好,另外,緩存組件沒有提供數(shù)據(jù)持久化功能.
新聞熱點
疑難解答