redis分布式锁redissonlock的实现细节解析-mile米乐体育

redis分布式锁redissonlock的实现细节解析

redis分布式锁redissonlock

简单使用

 string key = "key-lock"; rlock lock = redisson.getlock(key); lock.lock(); try {     // todo } catch (exception e){     log.error(e.getmessage(), e); } finally {     lock.unlock(); }
 string key = "key-trylock"; long maxwaittime = 3_000; rlock lock = redisson.getlock(key); if (lock.trylock(maxwaittime, timeunit.milliseconds)){     try {         // todo     } catch (exception e){         log.error(e.getmessage(), e);     } finally {         lock.unlock();     } } else {     log.debug("redis锁竞争失败"); }

流程图

多个线程节点锁竞争的正常流程如下图:

多个线程节点锁竞争,并出现节点下线的异常流程如下图:

源码解析

redissonlock是可重入锁,使用redis的hash结构作为锁的标识存储,锁的名称作为hash的key,uuid 线程id作为hash的field,锁被重入的次数作为hash的value。如图所示:

 private void lock(long leasetime, timeunit unit, boolean interruptibly) throws interruptedexception {     long threadid = thread.currentthread().getid();     // 尝试获取锁,锁获取成功则ttl为null;获取失败则返回锁的剩余过期时间     long ttl = tryacquire(leasetime, unit, threadid);     if (ttl == null) {         return;     }     // 锁被其他线程占用而索取失败,使用线程通知而非自旋的方式等待锁     // 使用redis的发布订阅pub/sub功能来等待锁的释放通知     rfuture future = subscribe(threadid);     commandexecutor.syncsubscription(future);     try {         while (true) {             ttl = tryacquire(leasetime, unit, threadid);             // 尝试获取锁,锁获取成功则ttl为null;获取失败则返回锁的剩余过期时间             if (ttl == null) {                 break;             }             if (ttl >= 0) {                 // 使用locksupport.parknanos方法线程休眠                 try {                     getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds);                 } catch (interruptedexception e) {                     if (interruptibly) {                         throw e;                     }                     getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds);                 }             } else {                 if (interruptibly) {                     getentry(threadid).getlatch().acquire();                 } else {                     getentry(threadid).getlatch().acquireuninterruptibly();                 }             }         }     } finally {         // 退出锁竞争(锁获取成功或者放弃获取锁),则取消锁的释放订阅         unsubscribe(future, threadid);     } } 
 public boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception {     long time = unit.tomillis(waittime);     long current = system.currenttimemillis();     long threadid = thread.currentthread().getid();     long ttl = tryacquire(leasetime, unit, threadid);     if (ttl == null) {         return true;     }          time -= system.currenttimemillis() - current;     if (time <= 0) {         acquirefailed(threadid);         return false;     }          current = system.currenttimemillis();     rfuture subscribefuture = subscribe(threadid);     if (!await(subscribefuture, time, timeunit.milliseconds)) {         if (!subscribefuture.cancel(false)) {             subscribefuture.oncomplete((res, e) -> {                 if (e == null) {                     unsubscribe(subscribefuture, threadid);                 }             });         }         acquirefailed(threadid);         return false;     }     try {         time -= system.currenttimemillis() - current;         if (time <= 0) {             acquirefailed(threadid);             return false;         }              while (true) {             long currenttime = system.currenttimemillis();             ttl = tryacquire(leasetime, unit, threadid);             // lock acquired             if (ttl == null) {                 return true;             }             time -= system.currenttimemillis() - currenttime;             if (time <= 0) {                 acquirefailed(threadid);                 return false;             }             currenttime = system.currenttimemillis();             if (ttl >= 0 && ttl < time) {                 getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds);             } else {                 getentry(threadid).getlatch().tryacquire(time, timeunit.milliseconds);             }             time -= system.currenttimemillis() - currenttime;             if (time <= 0) {                 acquirefailed(threadid);                 return false;             }         }     } finally {         unsubscribe(subscribefuture, threadid);     } } 

redissonlock实现的是可重入锁,通过redis的hash结构实现,而非加单的set nx ex。为了实现原子性的复杂的加锁逻辑,而通过lua脚本实现。获取锁会有如下三种状态:

1、锁未被任何线程占用,则锁获取成功,返回null

2、锁被当前线程占用,则锁获取成功并进行锁的重入,对锁的重入计数 1,返回null

3、锁被其他线程占用,则锁获取失败,返回该锁的自动过期时间ttl

  rfuture trylockinnerasync(long leasetime, timeunit unit, long threadid, redisstrictcommand command) {     internallockleasetime = unit.tomillis(leasetime);     return commandexecutor.evalwriteasync(getname(), longcodec.instance, command,               "if (redis.call('exists', keys[1]) == 0) then "                     "redis.call('hset', keys[1], argv[2], 1); "                     "redis.call('pexpire', keys[1], argv[1]); "                     "return nil; "                 "end; "                 "if (redis.call('hexists', keys[1], argv[2]) == 1) then "                     "redis.call('hincrby', keys[1], argv[2], 1); "                     "redis.call('pexpire', keys[1], argv[1]); "                     "return nil; "                 "end; "                 "return redis.call('pttl', keys[1]);",                 collections.
网站地图