国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 開發(fā) > PHP > 正文

阿里改造后的memcached客戶端源碼詳解

2024-05-04 21:49:39
字體:
供稿:網(wǎng)友

最近項目需要用應(yīng)用緩存解決方案,選擇了目前比較流行的memcached作為分布式緩存,客戶端我們選擇了阿里改造后的memchaced-client-forjava,因為該客戶端經(jīng)過阿里內(nèi)部大量實際項目的線上運行,表現(xiàn)給力.

源碼分析

memcached本身是一個集中式的內(nèi)存緩存系統(tǒng),對于分布式的支持服務(wù)端并沒有實現(xiàn),只有通過客戶端實現(xiàn);再者,memcached是基于TCP/UDP進行通信,只要客戶端語言支持TCP/UDP即可實現(xiàn)客戶端,并且可以根據(jù)需要進行功能擴展,memchaced-client-forjava 既是使用java語言實現(xiàn)的客戶端,并且實現(xiàn)了自己的功能擴展.

幾個重要類的說明:

MemcachedCacheManager:管理類,負責緩存服務(wù)端,客戶端,以及相關(guān)資源池的初始化工作,獲取客戶端等等

MemcachedCache:memcached緩存實體類,實現(xiàn)了所有的緩存API,實際上也會調(diào)用MemcachedClient進行操作

MemcachedClient:memcached緩存客戶端,一個邏輯概念,負責與服務(wù)端實例的實際交互,通過調(diào)用sockiopool中的socket

SockIOPool:socket連接資源池,負責與memcached服務(wù)端進行交互

ClusterProcessor:集群內(nèi)數(shù)據(jù)異步操作工具類

客戶端可配置化

MemcachedCacheManager是入口,其start方法讀取配置文件memcached.xml,初始化各個組建,包括memcached客戶端,socket連接池以及集群節(jié)點.

memcached客戶端是個邏輯概念,并不是和memcached服務(wù)端實例一一對應(yīng)的,可以認為其是一個邏輯環(huán)上的某個節(jié)點,后面會講到hash一致性算法時涉及,該配置文件中,可配置一個或多個客戶端,每個客戶端可配置一個socketPool連接池,如下:

  1. <client name="mclient0" compressEnable="true" defaultEncoding="UTF-8" socketpool="pool0”> //開源軟件:Vevb.com 
  2.     <errorHandler>com.alisoft.xplatform.asf.cache.memcached.MemcachedErrorHandler</errorHandler> 
  3. </client> 

擴容

socketpool連接池配置的才是真正連接的memcached服務(wù)實例,當然,你可以連接多個memcached服務(wù)實例,多個實例可以分布在一臺或者多臺物理機器上。這樣,隨著實際業(yè)務(wù)數(shù)據(jù)量的增加,可以對現(xiàn)有緩存容量進行擴容,只需在servers中增加memcached實例即可,或者增加多個socketpool配置項,配置如下:

  1. <socketpool name="pool0" failover="true" initConn="5" minConn="5" maxConn="250" maintSleep="5000" nagle="false" socketTO="3000" aliveCheck="true"
  2.     <servers>192.168.1.66:11211,192.168.1.68:11211</servers> 
  3. </socketpool> 

初始化過程

上文提及的MemcachedCacheManager,該類功能包括有初始化各種資源池,獲取所有客戶端,重新加載配置文件以及集群復制等。我們重點分析方法start,該方法首先加載配置文件,然后初始化資源池,即方法initMemCacheClientPool,該方法中定義了三個資源池,即socket連接資源池socketpool,memcachedcache資源池cachepool,以及由客戶端組成的集群資源池clusterpool,這些資源池的數(shù)據(jù)結(jié)構(gòu)都是線程安全的ConcurrentHashMap,保證了并發(fā)效率,將配置信息分別實例化后,再分別放入對應(yīng)的資源池容器中,socket連接放入socketpool中,memcached客戶端放入cachepool中,定義的集群節(jié)點放入clusterpool中。

注意,在實例化socket連接池資源socketpool時,會調(diào)用每個pool的初始化方法pool.initialize(),來映射memcached實例到HASH環(huán)上,以及初始化socket連接.

單點問題

memcached的分布式,解決了容量水平擴容的問題,但是當某個節(jié)點失效時,還是會丟失一部分數(shù)據(jù),單點故障依然存在,分布式只是解決了數(shù)據(jù)整體失效問題,而在實際項目中,特別是GAP平臺適應(yīng)的企業(yè)級項目中,是不允許數(shù)據(jù)不一致的,所以對每一份保存的數(shù)據(jù)都需要進行容災(zāi)處理,那么對于定義的每個memcached客戶端,都至少增加一個新客戶端與其組成一個cluster集群,當更新或者查找數(shù)據(jù)時,會先定位到該集群中某個節(jié)點,如果該節(jié)點失效,就去另外一個節(jié)點進行操作。在實際項目中,通過合理規(guī)劃配置cluster和client(memcached客戶端),可以最大限度的避免單點故障,當所有client都失效時還會丟失數(shù)據(jù),在配置文件中,集群配置如下:

  1. <cluster name="cluster1" mode="active"
  2.     <memCachedClients>mclient1,mclient2</memCachedClients> 
  3. </cluster> 

HASH一致性算法

在memcached支持分布式部署場景下,如何獲取一個memcached實例?如何平均分配memcached實例的存儲?這些需要一個算法來實現(xiàn),我們選擇的是HASH一致性算法,具體就體現(xiàn)在客戶端如何獲取一個連接memcached服務(wù)端的socket上,也就是如何定位memcached實例的問題?算法要求能夠根據(jù)每次提供的同一個key獲得同一個實例.

HASH閉環(huán)的初始化

本質(zhì)上,hash一致性算法是需要實現(xiàn)一個邏輯環(huán),如圖所示,環(huán)上所有的節(jié)點即為一個memcached實例,如何實現(xiàn)?其實是根據(jù)每個memcached實例所在的ip地址,將所有的實例映射到hash數(shù)值空間中,構(gòu)成一個閉合的圓環(huán)。

HASH環(huán)映射的初始化的代碼位于SocketIOPool.populateConsistentBuckets方法中,主要代碼如下:

  1. private void populateConsistentBuckets() 
  2.      { 
  3.          ……... 
  4.           for (int i = 0; i < servers.length; i++) 
  5.           { 
  6.                int thisWeight = 1; 
  7.                if (this.weights != null && this.weights[i] != null) 
  8.                     thisWeight = this.weights[i]; 
  9.                     double factor = Math .floor(((double) (40 * this.servers.length * thisWeight)) / (double                                      ) this.totalWeight); 
  10.                for (long j = 0; j < factor; j++) 
  11.                { 
  12.                     byte[] d = md5.digest((servers[i] + "-" + j).getBytes()); 
  13.                     for (int h = 0; h < 4; h++) 
  14.                     { 
  15.                          // k 的值使用MD5hash算法計算獲得 
  16.                          Long k = ((long) (d[3 + h * 4] & 0xFF) << 24) 
  17.                                    | ((long) (d[2 + h * 4] & 0xFF) << 16) 
  18.                                    | ((long) (d[1 + h * 4] & 0xFF) << 8) 
  19.                                    | ((long) (d[0 + h * 4] & 0xFF)); 
  20.                          // 用treemap來存儲memcached實例所在的ip地址, 
  21.                          // 也就是將每個緩存實例所在的ip地址映射到由k組成的hash環(huán)上 
  22.                         consistentBuckets.put(k, servers[i]); 
  23.                          if (log.isDebugEnabled()) 
  24.                               log.debug("++++ added " + servers[i] 
  25.                                         + " to server bucket"); 
  26.                     } 
  27.                } 
  28.            ……... 
  29.           } 
  30.      } 

獲取socket連接

在實際獲取memcahced實例所在服務(wù)器的soket時,只要使用基于同一個存儲對象的key的MD5Hash算法,就可以獲得相同的memcached實例所在的ip地址,也就是可以準確定位到hash環(huán)上相同的節(jié)點,代碼位于SocketIOPool.getSock方法中,主要代碼如下:

  1.  public SockIO getSock(String key, Integer hashCode){ 
  2.       …………. 
  3.           
  4.       // from here on, we are working w/ multiple servers 
  5.       // keep trying different servers until we find one 
  6.       // making sure we only try each server one time 
  7.       Set<String> tryServers = new HashSet<String>(Arrays.asList(servers)); 
  8.       // get initial bucket 
  9.       // 通過key值計算hash值,使用的是基于MD5的算法 
  10.       long bucket = getBucket(key, hashCode); 
  11.       String server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets .get(bucket)  : buckets.g                          et((int) bucket); 
  12.        …………...   
  13.  } 
  14.  private long getBucket(String key, Integer hashCode) 
  15.  { 
  16.       / / 通過key值計算hash值,使用的是基于MD5的算法 
  17.       long hc = getHash(key, hashCode); 
  18.       if (this.hashingAlg == CONSISTENT_HASH) 
  19.       { 
  20.            return findPointFor(hc); 
  21.       } else 
  22.       { 
  23.            long bucket = hc % buckets.size(); 
  24.            if (bucket < 0) 
  25.                 bucket *= -1; 
  26.            return bucket; 
  27.       } 
  28.  } 
  29.  /** 
  30.   * Gets the first available key equal or above the given one, if none found, 
  31.   * returns the first k in the bucket 
  32.   * 
  33.   * @param k 
  34.   *            key 
  35.   * @return 
  36.   */ 
  37.  private Long findPointFor(Long hv) 
  38.  { 
  39.       // this works in java 6, but still want to release support for java5 
  40.       // Long k = this.consistentBuckets.ceilingKey( hv ); 
  41.       // return ( k == null ) ? this.consistentBuckets.firstKey() : k; 
  42.       // 該consistentBuckets中存儲的是HASH結(jié)構(gòu)初始化時,存入的所有memcahced實例節(jié)點,也就是整個hash環(huán) 
  43.       // tailMap方法是取出大于等于hv的所有節(jié)點,并且是遞增有序的 
  44.       SortedMap<Long, String> tmap = this.consistentBuckets.tailMap(hv); 
  45.       // 如果tmap為空,就默認返回hash環(huán)上的第一個值,否則就返回最接近hv值的那個節(jié)點 
  46.       return (tmap.isEmpty()) ? this.consistentBuckets.firstKey() : tmap .firstKey(); 
  47.  } 
  48. /** 
  49.   * Returns a bucket to check for a given key. 
  50.   * 
  51.   * @param key 
  52.   *            String key cache is stored under 
  53.   * @return int bucket 
  54.   */ 
  55.  private long getHash(String key, Integer hashCode) 
  56.  { 
  57.       if (hashCode != null) 
  58.       { 
  59.            if (hashingAlg == CONSISTENT_HASH) 
  60.                 return hashCode.longValue() & 0xffffffffL; 
  61.            else 
  62.                 return hashCode.longValue(); 
  63.       } else 
  64.       { 
  65.            switch (hashingAlg) 
  66.            { 
  67.            case NATIVE_HASH: 
  68.                 return (long) key.hashCode(); 
  69.            case OLD_COMPAT_HASH: 
  70.                 return origCompatHashingAlg(key); 
  71.            case NEW_COMPAT_HASH: 
  72.                 return newCompatHashingAlg(key); 
  73.            case CONSISTENT_HASH: 
  74.                 return md5HashingAlg(key); 
  75.            default
  76.                 // use the native hash as a default 
  77.                 hashingAlg = NATIVE_HASH; 
  78.                 return (long) key.hashCode(); 
  79.            } 
  80.       } 
  81.  } 
  82. /** 
  83.   * Internal private hashing method. 
  84.   * 
  85.   * MD5 based hash algorithm for use in the consistent hashing approach. 
  86.   * 
  87.   * @param key 
  88.   * @return 
  89.   */ 
  90.  private static long md5HashingAlg(String key) 
  91.  { 
  92.       / /通過key值計算hash值,使用的是基于MD5的算法 
  93.       MessageDigest md5 = MD5.get(); 
  94.       md5.reset(); 
  95.       md5.update(key.getBytes()); 
  96.       byte[] bKey = md5.digest(); 
  97.       long res = ((long) (bKey[3] & 0xFF) << 24) 
  98.                 | ((long) (bKey[2] & 0xFF) << 16) 
  99.                 | ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF); 
  100.       return res; 
  101.  } 

通過以上代碼的分析,整個memcahced服務(wù)端實例HASH環(huán)的初始化,以及數(shù)據(jù)更新和查找使用的算法都是基于同一種算法,這就保證了通過同一個key獲得的memcahced實例為同一個.

socket連接池

這部分單獨介紹,請猛烈地戳這里。

容災(zāi)、故障轉(zhuǎn)移以及性能

衡量系統(tǒng)的穩(wěn)定性,很大程度上是對各種異常情況的處理,充分考慮異常情況,以及合理處理異常是對系統(tǒng)設(shè)計人員的要求,下面看看在故障處理和容災(zāi)方面系統(tǒng)都做了那些工作。

定位memcached實例時,當?shù)谝淮味ㄎ皇?會對所有其他的屬于同一個socketpool中的memcahced實例進行定位,找到一個可用的,代碼如下:

  1. // log that we tried 
  2. // 先刪除定位失敗的實例 
  3. tryServers.remove(server); 
  4. if (tryServers.isEmpty()) 
  5.     break
  6. // if we failed to get a socket from this server 
  7. // then we try again by adding an incrementer to the 
  8. // current key and then rehashing 
  9. int rehashTries = 0; 
  10. while (!tryServers.contains(server)) 
  11.   // 重新計算key值 
  12.   String newKey = new StringBuilder().append(rehashTries).append(key).toString(); 
  13.   // String.format( "%s%s", rehashTries, key ); 
  14.   if (log.isDebugEnabled()) 
  15.       log.debug("rehashing with: " + newKey); 
  16.   // 去HASH環(huán)上定位實例節(jié)點 
  17.   bucket = getBucket(newKey, null); 
  18.   server=(this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets.get((int) bucket); 
  19.   rehashTries++; 
  20.  } 

查找數(shù)據(jù)時,當前節(jié)點獲取不到,會嘗試到所在集群中其他的節(jié)點查找,成功后,會將數(shù)據(jù)復制到原先失效的節(jié)點中,代碼如下:

  1. public Object get(String key) 
  2.      { 
  3.           Object result = null; 
  4.           boolean isError = false;   
  5.        ……....... 
  6.           if (result == null && helper.hasCluster()) 
  7.            if (isError || helper.getClusterMode().equals(MemcachedClientClusterConfig.CLUSTER_MODE_ACTIVE)) 
  8.           { 
  9.                List<MemCachedClient> caches = helper.getClusterCache(); 
  10.                for(MemCachedClient cache : caches) 
  11.                { 
  12.                     if (getCacheClient(key).equals(cache)) 
  13.                          continue
  14.                     try{ try 
  15.                          { 
  16.                               result = cache.get(key); 
  17.                          } 
  18.                          catch(MemcachedException ex) 
  19.                          { 
  20.                               Logger.error(new StringBuilder(helper.getCacheName()) 
  21.                                    .append(" cluster get error"),ex); 
  22.                               continue
  23.                          } 
  24.                          //僅僅判斷另一臺備份機器,不多次判斷,防止效率低下 
  25.                          if (helper.getClusterMode().equals(MemcachedClientClusterConfig.CLUSTER_MODE_ACTIVE                                                      ) && result != null) 
  26.                          { 
  27.                               Object[] commands = new Object[]{CacheCommand.RECOVER,key,result}; 
  28.                              // 加入隊列,異步執(zhí)行復制數(shù)據(jù) 
  29.                               addCommandToQueue(commands); 
  30.                          } 
  31.                          break;     
  32.                     } 
  33.                     catch(Exception e) 
  34.                     { 
  35.                      Logger.error(new StringBuilder(helper.getCacheName()) .append(" cluster get error"),e); 
  36.                     } 
  37.                } 
  38.           } 
  39.           return result; 
  40.      } 

更新數(shù)據(jù)時,異步更新到集群內(nèi)其他節(jié)點,示例代碼如下:

  1. public boolean add(String key, Object value) 
  2.      { 
  3.           boolean result = getCacheClient(key).add(key,value); 
  4.           if (helper.hasCluster()) 
  5.           { 
  6.                Object[] commands = new Object[]{CacheCommand.ADD,key,value}; 
  7.                // 加入隊列,異步執(zhí)行 
  8.                addCommandToQueue(commands); 
  9.           } 
  10.           return result; 
  11.      } 

刪除數(shù)據(jù)時,需要同步執(zhí)行,如果異步的話,會產(chǎn)生臟數(shù)據(jù),代碼如下:

  1. public Object remove(String key) 
  2.      { 
  3.           Object result = getCacheClient(key).delete(key); 
  4.           //異步刪除由于集群會導致無法被刪除,因此需要一次性全部清除 
  5.           if (helper.hasCluster()) 
  6.           { 
  7.                List<MemCachedClient> caches = helper.getClusterCache(); 
  8.                
  9.                for(MemCachedClient cache : caches) 
  10.                { 
  11.                     if (getCacheClient(key).equals(cache)) 
  12.                          continue
  13.                     try 
  14.                     { 
  15.                          cache.delete(key); 
  16.                     } 
  17.                     catch(Exception ex) 
  18.                     { 
  19.                          Logger.error(new StringBuilder(helper.getCacheName()) 
  20.                                         .append(" cluster remove error"),ex); 
  21.                     } 
  22.                } 
  23.           } 
  24.           return result; 
  25.      } 

異步執(zhí)行集群內(nèi)數(shù)據(jù)同步,因為不可能每次數(shù)據(jù)都要同步執(zhí)行到集群內(nèi)每個節(jié)點,這樣會降低系統(tǒng)性能,所以在構(gòu)造MemcachedCache對象時,會建立一個隊列,線程安全的linked阻塞隊列LinkedBlockingQueue,將所有需要異步執(zhí)行的命令放入隊列中,異步執(zhí)行,具體異步執(zhí)行由ClusterProcessor類負責,代碼如下:

  1. public MemcachedCache(MemCachedClientHelper helper,int statisticsInterval) 
  2.      { 
  3.           this.helper = helper;  
  4.           dataQueue = new LinkedBlockingQueue<Object[]>(); 
  5.          ……… 
  6.           processor = new ClusterProcessor(dataQueue,helper); 
  7.           processor.setDaemon(true); 
  8.           processor.start(); 
  9.      } 

本地緩存的使用是為了降低連接服務(wù)端的IO開銷,當有些數(shù)據(jù)變化頻率很低時,完全可以放在應(yīng)用服務(wù)器本地,同時可以設(shè)置有效時間,直接獲取,DefaultCacheImpl類為本地緩存的實現(xiàn)類,在構(gòu)造MemcachedCache對象時,即初始化.

每次查找數(shù)據(jù)時,會先查找本地緩存,如果沒有再去查緩存,結(jié)束后將數(shù)據(jù)讓如本地緩存中,代碼如下:

  1. public Object get(String key, int localTTL) 
  2.      { 
  3.           Object result = null; 
  4.          // 本地緩存中查找 
  5.           result = localCache.get(key); 
  6.           if (result == null) 
  7.           { 
  8.                result = get(key); 
  9.                if (result != null) 
  10.                { 
  11.                     Calendar calendar = Calendar.getInstance(); 
  12.                     calendar.add(Calendar.SECOND, localTTL); 
  13.                     // 放入本地緩存 
  14.                     localCache.put(key, result,calendar.getTime()); 
  15.                } 
  16.           } 
  17.           return result; 
  18.      } 

增加緩存數(shù)據(jù)時,會刪除本地緩存中對應(yīng)的數(shù)據(jù),代碼如下:

  1. public Object put(String key, Object value, Date expiry) 
  2.      { 
  3.           boolean result = getCacheClient(key).set(key,value,expiry); 
  4.           //移除本地緩存的內(nèi)容 
  5.           if (result) localCache.remove(key); 
  6.         …….. 
  7.           return value; 
  8.      } 

改造部分

據(jù)以上分析,我們通過封裝,做到了客戶端的可配置化,memcached實例的水平擴展,通過集群解決了單點故障問題,并且保證了應(yīng)用程序只要每次使用相同的數(shù)據(jù)對象的key值即可獲取相同的memcached實例進行操作。但是,為了使緩存的使用對于應(yīng)用程序來說完全透明,我們對cluster部分進行了再次封裝,即把cluster看做一個node,根據(jù)cluster名稱屬性,進行HASH數(shù)值空間計算(同樣基于MD5算法),映射到一個HASH環(huán)上.

這部分邏輯放在初始化資源池clusterpool時進行(即放在MemcahedCacheManager.initMemCacheClientPool方法中),與上文中所描述的memcached實例HASH環(huán)映射的邏輯一致,部分代碼如下.

  1. //populate cluster node to hash consistent Buckets 
  2. MessageDigest md5 = MD5.get(); 
  3. // 使用cluster的名稱計算HASH數(shù)值空間 
  4. byte[] d = md5.digest((node.getName()).getBytes()); 
  5. for (int h = 0; h < 4; h++) 
  6.    Long k = ((long) (d[3 + h * 4] & 0xFF) << 24) 
  7.                        | ((long) (d[2 + h * 4] & 0xFF) << 16) 
  8.                        | ((long) (d[1 + h * 4] & 0xFF) << 8) 
  9.                        | ((long) (d[0 + h * 4] & 0xFF)); 
  10.   consistentClusterBuckets.put(k, node.getName()); 
  11.   if (log.isDebugEnabled()) 
  12.        log.debug("++++ added " + node.getName() + " to cluster bucket"); 
  13. }  

在進行緩存操作時,仍然使用數(shù)據(jù)對象的key值獲取到某個cluster節(jié)點,然后再使用取余算法(這種算法也是經(jīng)常用到的分布式定位算法,但是有局限性,即隨著節(jié)點數(shù)的增減,定位越來越不準確),拿到cluster中的某個節(jié)點,在進行緩存的操作;定位hash環(huán)上cluster節(jié)點的邏輯也與上文一樣,這里不在贅述。部分定位cluster中節(jié)點的取余算法代碼如下:

  1. public IMemcachedCache getCacheClient(String key){ 
  2.        …………. 
  3.        String clusterNode = getClusterNode(key); 
  4.         MemcachedClientCluster mcc = clusterpool.get(clusterNode); 
  5.         List<IMemcachedCache> memcachedCachesClients = mcc.getCaches(); 
  6.         //根據(jù)取余算法獲取集群中的某一個緩存節(jié)點 
  7.         if (!memcachedCachesClients.isEmpty()) 
  8.         { 
  9.             long keyhash = key.hashCode(); 
  10.             int index = (int)keyhash % memcachedCachesClients.size(); 
  11.             if (index < 0 ) 
  12.                 index *= -1; 
  13.             return memcachedCachesClients.get(index); 
  14.         } 
  15.         return null; 
  16.     } 

這樣,對于應(yīng)用來說,配置好資源池以后,無需關(guān)心那個集群或者客戶端節(jié)點,直接通過MemcachedCacheManager獲取到某個memcachedcache,然后進行緩存操作即可.

最后,使用GAP平臺分布式緩存組件,需要提前做好容量規(guī)劃,集群和客戶端事先配置好,另外,緩存組件沒有提供數(shù)據(jù)持久化功能.

發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 诸城市| 海丰县| 安义县| 隆德县| 广昌县| 沭阳县| 于田县| 长岛县| 普宁市| 东台市| 新化县| 彰化县| 康平县| 衡东县| 阿城市| 亳州市| 长垣县| 惠州市| 泾阳县| 大姚县| 惠东县| 涡阳县| 大关县| 化州市| 双辽市| 红原县| 塔河县| 晋中市| 苍梧县| 师宗县| 潜山县| 嵩明县| 嘉鱼县| 交口县| 阳江市| 古田县| 布尔津县| 庆城县| 石棉县| 广饶县| 松阳县|