redis分布式锁redissonlock的实现细节解析-mile米乐体育
行业资讯
2021年06月19日 06:22
3
redis分布式锁redissonlock的实现细节解析
redis分布式锁redissonlock
简单使用
string key = "key-lock"; rlock lock = redisson.getlock(key); lock.lock(); try { // todo } catch (exception e){ log.error(e.getmessage(), e); } finally { lock.unlock(); }
string key = "key-trylock"; long maxwaittime = 3_000; rlock lock = redisson.getlock(key); if (lock.trylock(maxwaittime, timeunit.milliseconds)){ try { // todo } catch (exception e){ log.error(e.getmessage(), e); } finally { lock.unlock(); } } else { log.debug("redis锁竞争失败"); }
流程图
多个线程节点锁竞争的正常流程如下图:
多个线程节点锁竞争,并出现节点下线的异常流程如下图:
源码解析
redissonlock是可重入锁,使用redis的hash结构作为锁的标识存储,锁的名称作为hash的key,uuid 线程id作为hash的field,锁被重入的次数作为hash的value。如图所示:
private void lock(long leasetime, timeunit unit, boolean interruptibly) throws interruptedexception { long threadid = thread.currentthread().getid(); // 尝试获取锁,锁获取成功则ttl为null;获取失败则返回锁的剩余过期时间 long ttl = tryacquire(leasetime, unit, threadid); if (ttl == null) { return; } // 锁被其他线程占用而索取失败,使用线程通知而非自旋的方式等待锁 // 使用redis的发布订阅pub/sub功能来等待锁的释放通知 rfuturefuture = subscribe(threadid); commandexecutor.syncsubscription(future); try { while (true) { ttl = tryacquire(leasetime, unit, threadid); // 尝试获取锁,锁获取成功则ttl为null;获取失败则返回锁的剩余过期时间 if (ttl == null) { break; } if (ttl >= 0) { // 使用locksupport.parknanos方法线程休眠 try { getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds); } catch (interruptedexception e) { if (interruptibly) { throw e; } getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds); } } else { if (interruptibly) { getentry(threadid).getlatch().acquire(); } else { getentry(threadid).getlatch().acquireuninterruptibly(); } } } } finally { // 退出锁竞争(锁获取成功或者放弃获取锁),则取消锁的释放订阅 unsubscribe(future, threadid); } }
public boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception { long time = unit.tomillis(waittime); long current = system.currenttimemillis(); long threadid = thread.currentthread().getid(); long ttl = tryacquire(leasetime, unit, threadid); if (ttl == null) { return true; } time -= system.currenttimemillis() - current; if (time <= 0) { acquirefailed(threadid); return false; } current = system.currenttimemillis(); rfuturesubscribefuture = subscribe(threadid); if (!await(subscribefuture, time, timeunit.milliseconds)) { if (!subscribefuture.cancel(false)) { subscribefuture.oncomplete((res, e) -> { if (e == null) { unsubscribe(subscribefuture, threadid); } }); } acquirefailed(threadid); return false; } try { time -= system.currenttimemillis() - current; if (time <= 0) { acquirefailed(threadid); return false; } while (true) { long currenttime = system.currenttimemillis(); ttl = tryacquire(leasetime, unit, threadid); // lock acquired if (ttl == null) { return true; } time -= system.currenttimemillis() - currenttime; if (time <= 0) { acquirefailed(threadid); return false; } currenttime = system.currenttimemillis(); if (ttl >= 0 && ttl < time) { getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds); } else { getentry(threadid).getlatch().tryacquire(time, timeunit.milliseconds); } time -= system.currenttimemillis() - currenttime; if (time <= 0) { acquirefailed(threadid); return false; } } } finally { unsubscribe(subscribefuture, threadid); } }
redissonlock实现的是可重入锁,通过redis的hash结构实现,而非加单的set nx ex。为了实现原子性的复杂的加锁逻辑,而通过lua脚本实现。获取锁会有如下三种状态:
1、锁未被任何线程占用,则锁获取成功,返回null
2、锁被当前线程占用,则锁获取成功并进行锁的重入,对锁的重入计数 1,返回null
3、锁被其他线程占用,则锁获取失败,返回该锁的自动过期时间ttl
rfuture trylockinnerasync(long leasetime, timeunit unit, long threadid, redisstrictcommand command) { internallockleasetime = unit.tomillis(leasetime); return commandexecutor.evalwriteasync(getname(), longcodec.instance, command, "if (redis.call('exists', keys[1]) == 0) then " "redis.call('hset', keys[1], argv[2], 1); " "redis.call('pexpire', keys[1], argv[1]); " "return nil; " "end; " "if (redis.call('hexists', keys[1], argv[2]) == 1) then " "redis.call('hincrby', keys[1], argv[2], 1); " "redis.call('pexpire', keys[1], argv[1]); " "return nil; " "end; " "return redis.call('pttl', keys[1]);", collections.