前言
單JVM內同步好辦, 直接用JDK提供的鎖就可以了,但是跨進程同步靠這個肯定是不可能的,這種情況下肯定要借助第三方,我這里實現用Redis,當然還有很多其他的實現方式。其實基于Redis實現的原理還算比較簡單的,在看代碼之前建議大家先去看看原理,看懂了之后看代碼應該就容易理解了。
我這里不實現JDK的java.util.concurrent.locks.Lock接口,而是自定義一個,因為JDK的有個newCondition方法我這里暫時沒實現。這個Lock提供了5個lock方法的變體,可以自行選擇使用哪一個來獲取鎖,我的想法是最好用帶超時返回的那幾個方法,因為不這樣的話,假如redis掛了,線程永遠都在那死循環了(關于這里,應該還可以進一步優化,如果redis掛了,Jedis的操作肯定會拋異常之類的,可以定義個機制讓redis掛了的時候通知使用這個lock的用戶,或者說是線程)
package cc.lixiaohui.lock;import java.util.concurrent.TimeUnit;public interface Lock { /** * 阻塞性的獲取鎖, 不響應中斷 */ void lock;  /** * 阻塞性的獲取鎖, 響應中斷 *  * @throws InterruptedException */ void lockInterruptibly throws InterruptedException;  /** * 嘗試獲取鎖, 獲取不到立即返回, 不阻塞 */ boolean tryLock;  /** * 超時自動返回的阻塞性的獲取鎖, 不響應中斷 *  * @param time * @param unit * @return {@code true} 若成功獲取到鎖, {@code false} 若在指定時間內未���取到鎖  *  */ boolean tryLock(long time, TimeUnit unit);  /** * 超時自動返回的阻塞性的獲取鎖, 響應中斷 *  * @param time * @param unit * @return {@code true} 若成功獲取到鎖, {@code false} 若在指定時間內未獲取到鎖 * @throws InterruptedException 在嘗試獲取鎖的當前線程被中斷 */ boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException;  /** * 釋放鎖 */ void unlock; }看其抽象實現:
package cc.lixiaohui.lock;import java.util.concurrent.TimeUnit;/** * 鎖的骨架實現, 真正的獲取鎖的步驟由子類去實現. *  * @author lixiaohui * */public abstract class AbstractLock implements Lock { /** * <pre> * 這里需不需要保證可見性值得討論, 因為是分布式的鎖,  * 1.同一個jvm的多個線程使用不同的鎖對象其實也是可以的, 這種情況下不需要保證可見性  * 2.同一個jvm的多個線程使用同一個鎖對象, 那可見性就必須要保證了. * </pre> */ protected volatile boolean locked; /** * 當前jvm內持有該鎖的線程(if have one) */ private Thread exclusiveOwnerThread; public void lock { try { lock(false, 0, null, false); } catch (InterruptedException e) { // TODO ignore } } public void lockInterruptibly throws InterruptedException { lock(false, 0, null, true); } public boolean tryLock(long time, TimeUnit unit) { try { return lock(true, time, unit, false); } catch (InterruptedException e) { // TODO ignore } return false; } public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException { return lock(true, time, unit, true); } public void unlock { // TODO 檢查當前線程是否持有鎖 if (Thread.currentThread != getExclusiveOwnerThread) { throw new IllegalMonitorStateException("current thread does not hold the lock"); }  unlock0; setExclusiveOwnerThread(null); } protected void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread { return exclusiveOwnerThread; } protected abstract void unlock0;  /** * 阻塞式獲取鎖的實現 *  * @param useTimeout  * @param time * @param unit * @param interrupt 是否響應中斷 * @return * @throws InterruptedException */ protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException;}基于Redis的最終實現,關鍵的獲取鎖,釋放鎖的代碼在這個類的lock方法和unlock0方法里,大家可以只看這兩個方法然后完全自己寫一個:
package cc.lixiaohui.lock;import java.util.concurrent.TimeUnit;import redis.clients.jedis.Jedis;/** * <pre> * 基于Redis的SETNX操作實現的分布式鎖 *  * 獲取鎖時最好用lock(long time, TimeUnit unit), 以免網路問題而導致線程一直阻塞 *  * <a >SETNC操作參考資料</a> * </pre> *  * @author lixiaohui * */public class RedisBasedDistributedLock extends AbstractLock {  private Jedis jedis;  // 鎖的名字 protected String lockKey;  // 鎖的有效時長(毫秒) protected long lockExpires;  public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires) { this.jedis = jedis; this.lockKey = lockKey; this.lockExpires = lockExpires; } // 阻塞式獲取鎖的實現 protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{ if (interrupt) { checkInterruption; }  long start = System.currentTimeMillis; long timeout = unit.toMillis(time); // if !useTimeout, then it's useless  while (useTimeout ? isTimeout(start, timeout) : true) { if (interrupt) { checkInterruption; }  long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//鎖超時時間 String stringOfLockExpireTime = String.valueOf(lockExpireTime);  if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 獲取到鎖 // TODO 成功獲取到鎖, 設置相關標識 locked = true; setExclusiveOwnerThread(Thread.currentThread); return true; }  String value = jedis.get(lockKey); if (value != null && isTimeExpired(value)) { // lock is expired // 假設多個線程(非單jvm)同時走到這里 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic // 但是走到這里時每個線程拿到的oldValue肯定不可能一樣(因為getset是原子性的) // 加入拿到的oldValue依然是expired的,那么就說明拿到鎖了 if (oldValue != null && isTimeExpired(oldValue)) {  // TODO 成功獲取到鎖, 設置相關標識  locked = true;  setExclusiveOwnerThread(Thread.currentThread);  return true; } } else {  // TODO lock is not expired, enter next loop retrying } } return false; }  public boolean tryLock { long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//鎖超時時間 String stringOfLockExpireTime = String.valueOf(lockExpireTime);  if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 獲取到鎖 // TODO 成功獲取到鎖, 設置相關標識 locked = true; setExclusiveOwnerThread(Thread.currentThread); return true; }  String value = jedis.get(lockKey); if (value != null && isTimeExpired(value)) { // lock is expired // 假設多個線程(非單jvm)同時走到這里 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic // 但是走到這里時每個線程拿到的oldValue肯定不可能一樣(因為getset是原子性的) // 假如拿到的oldValue依然是expired的,那么就說明拿到鎖了 if (oldValue != null && isTimeExpired(oldValue)) { // TODO 成功獲取到鎖, 設置相關標識 locked = true; setExclusiveOwnerThread(Thread.currentThread); return true; } } else {  // TODO lock is not expired, enter next loop retrying }  return false; }  /** * Queries if this lock is held by any thread. *  * @return {@code true} if any thread holds this lock and  *   {@code false} otherwise */ public boolean isLocked { if (locked) { return true; } else { String value = jedis.get(lockKey); // TODO 這里其實是有問題的, 想:當get方法返回value后, 假設這個value已經是過期的了, // 而就在這瞬間, 另一個節點set了value, 這時鎖是被別的線程(節點持有), 而接下來的判斷 // 是檢測不出這種情況的.不過這個問題應該不會導致其它的問題出現, 因為這個方法的目的本來就 // 不是同步控制, 它只是一種鎖狀態的報告. return !isTimeExpired(value); } } @Override protected void unlock0 { // TODO 判斷鎖是否過期 String value = jedis.get(lockKey); if (!isTimeExpired(value)) { doUnlock; } } private void checkInterruption throws InterruptedException { if(Thread.currentThread.isInterrupted) { throw new InterruptedException; } }  private boolean isTimeExpired(String value) { return Long.parseLong(value) < System.currentTimeMillis; }  private boolean isTimeout(long start, long timeout) { return start + timeout > System.currentTimeMillis; }  private void doUnlock { jedis.del(lockKey); }}如果將來還換一種實現方式(比如zookeeper之類的),到時直接繼承AbstractLock并實現lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt), unlock0方法即可(所謂抽象嘛)
測試
模擬全局ID增長器,設計一個IDGenerator類,該類負責生成全局遞增ID,其代碼如下:
package cc.lixiaohui.lock;import java.math.BigInteger;import java.util.concurrent.TimeUnit;/** * 模擬ID生成  * @author lixiaohui * */public class IDGenerator { private static BigInteger id = BigInteger.valueOf(0); private final Lock lock; private static final BigInteger INCREMENT = BigInteger.valueOf(1); public IDGenerator(Lock lock) { this.lock = lock; }  public String getAndIncrement { if (lock.tryLock(3, TimeUnit.SECONDS)) { try { // TODO 這里獲取到鎖, 訪問臨界區資源 return getAndIncrement0; } finally { lock.unlock; } } return null; //return getAndIncrement0; } private String getAndIncrement0 { String s = id.toString; id = id.add(INCREMENT); return s; }}測試主邏輯:同一個JVM內開兩個線程死循環地(循環之間無間隔,有的話測試就沒意義了)獲取ID(我這里并不是死循環而是跑20s),獲取到ID存到同一個Set里面,在存之前先檢查該ID在set中是否存在,如果已存在,則讓兩個線程都停止。如果程序能正常跑完20s,那么說明這個分布式鎖還算可以滿足要求,如此測試的效果應該和不同JVM(也就是真正的分布式環境中)測試的效果是一樣的,下面是測試類的代碼:
package cc.lixiaohui.DistributedLock.DistributedLock;import java.util.HashSet;import java.util.Set;import org.junit.Test;import redis.clients.jedis.Jedis;import cc.lixiaohui.lock.IDGenerator;import cc.lixiaohui.lock.Lock;import cc.lixiaohui.lock.RedisBasedDistributedLock;public class IDGeneratorTest {  private static Set<String> generatedIds = new HashSet<String>;  private static final String LOCK_KEY = "lock.lock"; private static final long LOCK_EXPIRE = 5 * 1000;  @Test public void test throws InterruptedException { Jedis jedis1 = new Jedis("localhost", 6379); Lock lock1 = new RedisBasedDistributedLock(jedis1, LOCK_KEY, LOCK_EXPIRE); IDGenerator g1 = new IDGenerator(lock1); IDConsumeMission consume1 = new IDConsumeMission(g1, "consume1");  Jedis jedis2 = new Jedis("localhost", 6379); Lock lock2 = new RedisBasedDistributedLock(jedis2, LOCK_KEY, LOCK_EXPIRE); IDGenerator g2 = new IDGenerator(lock2); IDConsumeMission consume2 = new IDConsumeMission(g2, "consume2");  Thread t1 = new Thread(consume1); Thread t2 = new Thread(consume2); t1.start; t2.start;  Thread.sleep(20 * 1000); //讓兩個線程跑20秒  IDConsumeMission.stop;  t1.join; t2.join; }  static String time { return String.valueOf(System.currentTimeMillis / 1000); }  static class IDConsumeMission implements Runnable { private IDGenerator idGenerator;  private String name;  private static volatile boolean stop;  public IDConsumeMission(IDGenerator idGenerator, String name) { this.idGenerator = idGenerator; this.name = name; }  public static void stop { stop = true; }  public void run { System.out.println(time + ": consume " + name + " start "); while (!stop) { String id = idGenerator.getAndIncrement; if(generatedIds.contains(id)) {  System.out.println(time + ": duplicate id generated, id = " + id);  stop = true;  continue; }   generatedIds.add(id); System.out.println(time + ": consume " + name + " add id = " + id); } System.out.println(time + ": consume " + name + " done "); }  } }說明一點,我這里停止兩個線程的方式并不是很好,我是為了方便才這么做的,因為只是測試,最好不要這么做。
測試結果
跑20s打印的東西太多,前面打印的被clear了,只有差不多跑完的時候才有,下面截圖。說明了這個鎖能正常工作:

當IDGererator沒有加鎖(即IDGererator的getAndIncrement方法內部獲取id時不上鎖)時,測試是不通過的,非常大的概率中途就會停止,下面是不加鎖時的測試結果:
這個1秒都不到:

這個也1秒都不到:

結束語
好了,以上就是Java實現基于Redis的分布式鎖的全部內容,各位如果發現問題希望能指正,希望這篇文章能對大家的學習和工作帶來一定的幫助,如果有疑問可以留言交流。
新聞熱點
疑難解答