网站首页 > 技术文章 正文
序
本文主要研究一下redisson的RLock的unlock
RLock
org/redisson/api/RLock.java
/**
* Redis based implementation of {@link java.util.concurrent.locks.Lock}
* Implements re-entrant lock.
*
* @author Nikita Koksharov
*
*/
public interface RLock extends Lock, RLockAsync {
/**
* Returns name of object
*
* @return name - name of object
*/
String getName();
/**
* Acquires the lock with defined <code>leaseTime</code>.
* Waits if necessary until lock became available.
*
* Lock will be released automatically after defined <code>leaseTime</code> interval.
*
* @param leaseTime the maximum time to hold the lock after it's acquisition,
* if it hasn't already been released by invoking <code>unlock</code>.
* If leaseTime is -1, hold the lock until explicitly unlocked.
* @param unit the time unit
* @throws InterruptedException - if the thread is interrupted
*/
void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* Tries to acquire the lock with defined <code>leaseTime</code>.
* Waits up to defined <code>waitTime</code> if necessary until the lock became available.
*
* Lock will be released automatically after defined <code>leaseTime</code> interval.
*
* @param waitTime the maximum time to acquire the lock
* @param leaseTime lease time
* @param unit time unit
* @return <code>true</code> if lock is successfully acquired,
* otherwise <code>false</code> if lock is already set.
* @throws InterruptedException - if the thread is interrupted
*/
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* Acquires the lock with defined <code>leaseTime</code>.
* Waits if necessary until lock became available.
*
* Lock will be released automatically after defined <code>leaseTime</code> interval.
*
* @param leaseTime the maximum time to hold the lock after it's acquisition,
* if it hasn't already been released by invoking <code>unlock</code>.
* If leaseTime is -1, hold the lock until explicitly unlocked.
* @param unit the time unit
*
*/
void lock(long leaseTime, TimeUnit unit);
/**
* Unlocks the lock independently of its state
*
* @return <code>true</code> if lock existed and now unlocked
* otherwise <code>false</code>
*/
boolean forceUnlock();
/**
* Checks if the lock locked by any thread
*
* @return <code>true</code> if locked otherwise <code>false</code>
*/
boolean isLocked();
/**
* Checks if the lock is held by thread with defined <code>threadId</code>
*
* @param threadId Thread ID of locking thread
* @return <code>true</code> if held by thread with given id
* otherwise <code>false</code>
*/
boolean isHeldByThread(long threadId);
/**
* Checks if this lock is held by the current thread
*
* @return <code>true</code> if held by current thread
* otherwise <code>false</code>
*/
boolean isHeldByCurrentThread();
/**
* Number of holds on this lock by the current thread
*
* @return holds or <code>0</code> if this lock is not held by current thread
*/
int getHoldCount();
/**
* Remaining time to live of the lock
*
* @return time in milliseconds
* -2 if the lock does not exist.
* -1 if the lock exists but has no associated expire.
*/
long remainTimeToLive();
}
RLock接口继承了JDK的
java.util.concurrent.locks.Lock接口,同时还扩展提供了isLocked、isHeldByThread、isHeldByCurrentThread等方法
RedissonLock
org/redisson/RedissonLock.java
public class RedissonLock extends RedissonBaseLock {
//......
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
// Future<Void> future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);
}
//......
}
RedissonLock继承了RedissonBaseLock,其unlock方法调用的是父类的unlockAsync,传递的参数为Thread.currentThread().getId()
unlockAsync
org/redisson/RedissonBaseLock.java
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
//......
@Override
public RFuture<Void> unlockAsync() {
long threadId = Thread.currentThread().getId();
return unlockAsync(threadId);
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
//......
}
unlockAsync执行的是unlockInnerAsync,参数为当前thread的id,再其完成之后触发cancelExpirationRenewal
unlockInnerAsync
org/redisson/RedissonLock.java
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
RedissonLock提供了unlockInnerAsync方法,执行的是一段lua脚本:
if redis.call('hexists', KEYS[1], ARGV[3]) == 0 then
return nil
end
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1)
if counter > 0 then
redis.call('pexpire', KEYS[1], ARGV[2])
return 0
else
redis.call('del', KEYS[1])
redis.call('publish', KEYS[2], ARGV[1])
return 1
end
return nil
先判断当前hash结构指定getLockName(threadId)的元素是否存在,不存在直接返回nil;接着对该线程的count减1,之后判断,若大于0则使用pexpire执行过期返回0;否则执行del命令,然后publish发送LockPubSub.UNLOCK_MESSAGE通知,返回1
isLocked
org/redisson/RedissonBaseLock.java
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
//......
@Override
public boolean isLocked() {
return isExists();
}
//......
}
isLocked方法是执行父类的isExists方法
@Override
public boolean isExists() {
return get(isExistsAsync());
}
@Override
public RFuture<Boolean> isExistsAsync() {
return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EXISTS, getRawName());
}
通过exists命令来判断,参数为getRawName
isHeldByCurrentThread
org/redisson/RedissonBaseLock.java
@Override
public boolean isHeldByCurrentThread() {
return isHeldByThread(Thread.currentThread().getId());
}
@Override
public boolean isHeldByThread(long threadId) {
RFuture<Boolean> future = commandExecutor.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getRawName(), getLockName(threadId));
return get(future);
}
isHeldByCurrentThread调用的是isHeldByThread方法,传递的是Thread.currentThread().getId();isHeldByThread执行的是hexist命令,参数是getRawName()及getLockName(threadId)
小结
redisson的RLock接口继承了JDK的
java.util.concurrent.locks.Lock接口,同时还扩展提供了isLocked、isHeldByThread、isHeldByCurrentThread等方法。
- 其中unlock的时候先判断当前hash结构指定getLockName(threadId)的元素是否存在,不存在直接返回nil;接着对该线程的count减1,之后判断,若大于0则使用pexpire执行过期返回0;否则执行del命令,然后publish发送LockPubSub.UNLOCK_MESSAGE通知,返回1;
- isLocked通过exists命令来判断,参数为getRawName()
- isHeldByThread执行的是hexist命令,参数是getRawName()及getLockName(threadId)
doc
- locks-and-synchronizers
- 上一篇: Linux分区页框分配器之水位
- 下一篇: Python 3.8异步并发编程指南
猜你喜欢
- 2025-08-06 如何阅读Linux内核源码?Linux内存管理中SLAB分配器(源码分析)
- 2025-08-06 关于读取smbios数据
- 2025-08-06 内存管理:Malloc缺页中断不同情况处理总结
- 2025-08-06 android 功耗分析方法和优化(1)
- 2025-08-06 Tenda AX12 路由器设备分析(一)
- 2025-08-06 译见:从理论到实践,基于Java的开源大数据工具
- 2025-08-06 全新 MQTT 订阅、BLOB 类型:TDengine 时序数据库 最新版本亮点速览
- 2025-08-06 NXP Steps Up China Push as EV Boom Reshapes Global Chip Landscape
- 2025-08-06 Why China is irreplaceable in supply chain
- 2025-05-24 ROS2 Jazzy:用C++实现一个动作服务器和客户端
- 最近发表
-
- count(*)、count1(1)、count(主键)、count(字段) 哪个更快?
- 深入探索 Spring Boot3 中 MyBatis 的 association 标签用法
- js异步操作 Promise fetch API 带来的网络请求变革—仙盟创梦IDE
- HTTP状态码超详细说明_http 状态码有哪些
- 聊聊跨域的原理与解决方法_跨域解决方案及原理
- 告别懵圈!产品新人的接口文档轻松入门指南
- 在Javaweb中实现发送简单邮件_java web发布
- 优化必备基础:Oracle中常见的三种表连接方式
- Oracle常用工具使用 - AWR_oracle工具有哪些
- 搭载USB 3.1接口:msi 微星 发布 990FXA Gaming 游戏主板
- 标签列表
-
- cmd/c (90)
- c++中::是什么意思 (84)
- 标签用于 (71)
- 主键只能有一个吗 (77)
- c#console.writeline不显示 (95)
- pythoncase语句 (88)
- es6includes (74)
- sqlset (76)
- apt-getinstall-y (100)
- node_modules怎么生成 (87)
- chromepost (71)
- flexdirection (73)
- c++int转char (80)
- mysqlany_value (79)
- static函数和普通函数 (84)
- el-date-picker开始日期早于结束日期 (76)
- js判断是否是json字符串 (75)
- asynccallback (71)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- java (73)
- js数组插入 (83)
- mac安装java (72)
- 查看mysql是否启动 (70)
- 无效的列索引 (74)