分布式锁实现方式~DB
本节重点
- 数据库分布式锁实现原理
- 掌握悲观锁和乐观锁
分布式锁实现方式
悲观锁
悲观锁是在数据修改之前,把待修改的数据进行锁定,防止并发修改。通常采用for update加锁方式来实现,使用时需要注意以下两点:
- 首先要开启事务
- 在for update语句中where条件字段上创建索引,因为是通过索引来加锁的,否则会锁整个表。
无事务的for update演练
模拟100个用户抢购商品,商品销售数量做加1操作,校验数据库中的销售数量是否是100,代码如下:
@DS(Constant.DataSource.MASTER)
@Override
public BaseResponse<BoolResult> testLock() {
GoodsInfo goodsInfo = baseMapper.selectOne(new LambdaQueryWrapper<GoodsInfo>().eq(GoodsInfo::getId,1)
.last(" for update"));
goodsInfo.setSaleCount(goodsInfo.getSaleCount() + 1);
Integer result = baseMapper.updateById(goodsInfo);
if (result != null && result.intValue() > 0) {
log.warn("testLock success!");
return BaseResponse.ok(new BoolResult(true));
} else {
log.warn("testLock fail!");
return BaseResponse.ok(new BoolResult(false));
}
}
结果:日志‘testLock success!’ 打印了 100条,而库中销售数量是2,显然和我们预期结果不一致。
有事务的for update演练
@DS(Constant.DataSource.MASTER)
@Transactional(rollbackFor = Exception.class)
@Override
public BaseResponse<BoolResult> testLock() {
GoodsInfo goodsInfo = baseMapper.selectOne(new LambdaQueryWrapper<GoodsInfo>().eq(GoodsInfo::getId,1)
.last(" for update"));
goodsInfo.setSaleCount(goodsInfo.getSaleCount() + 1);
Integer result = baseMapper.updateById(goodsInfo);
if (result != null && result.intValue() > 0) {
log.warn("testLock success!");
return BaseResponse.ok(new BoolResult(true));
} else {
log.warn("testLock fail!");
return BaseResponse.ok(new BoolResult(false));
}
}
结果:日志‘testLock success!’ 打印了 100条,而库中销售数量是100,显然和我们预期结果一致。
在实际开发中,很少使用悲观锁防止并发操作,因为这种方式控制不当的话,经常会出现死锁情况。
乐观锁
乐观锁是相对悲观锁而言的,假设数据一般情况不会发生冲突的,所以在数据提交的时候,才会进行数据冲突校验,如果发送数据冲突,由用户决定如何操作。乐观锁通常是数据版本(version)机制实现的。何谓数据版本?数据版本就是在表中增加一个version字段。读取数据时,同时将version字段读取出来,数据每次更新,对此version值加1。当我们提交数据时,将对比表中对应记录的版本和取出来的版本是否一致,一致则更新成功,否则更新失败。
这里的数据库分布式锁通过乐观锁方式来实现的,首先创建一个锁表distribute_lock,来保存锁记录,SQL如下:
CREATE TABLE `distribute_lock` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
`lock_name` varchar(20) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '锁记录名',
`thread_id` varchar(50) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '线程id',
`lock_time` bigint(20) DEFAULT '0' COMMENT '锁失效时间',
`is_deleted` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '删除状态 0正常 1删除',
`gmt_create` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
`gmt_update` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_lock_name` (`lock_name`) USING BTREE,
KEY `idx_lockName_threadId` (`lock_name`,`thread_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='分布式锁';
加锁原理
在字段lock_name上创建一个唯一索引,首次加锁时,向distribute_lock表中插入一条锁记录,插入成功,则加锁成功,否则触发唯一索引约束,插入失败,即加锁失败。
insert into distribute_lock(`lock_name`,`thread_id`,`lock_time`) values('test','thread1', 5000);
非首次加锁,通过lock_name查询出当前锁记录,如果某个线程正在持有锁(即:thread_id!= '' && lock_time>当前时间戳),则加锁失败,否则,以已查询出的lock_name+thread_id为条件,把thread_id字段更新为当前线程id,同时把lock_time字段更新为当前锁失效时间,更新成功,则加锁成功,反之加锁失败。
update distribute_lock set thread_id='thread2',lock_time=5000 where lock_name='test' and thread_id='thread1';
解锁原理
释放当前线程所持有锁,即更新thread_id字段为空和lock_time字段为0。
update distribute_lock set thread_id='',lock_time=0 where lock_name='test' and thread_id='thread1';
实现代码
// 加锁
public boolean tryLock(String key, long exprieMis) {
try {
DistributeLock distributeLock = baseMapper.selectOne(new LambdaQueryWrapper<DistributeLock>()
.eq(DistributeLock::getLockName,key)
.eq(DistributeLock::getIsDeleted,false)
.last("limit 1"));
if (distributeLock == null) {
distributeLock = new DistributeLock();
distributeLock.setLockName(key);
distributeLock.setLockTime(System.currentTimeMillis() + exprieMis);
distributeLock.setThreadId(Thread.currentThread().getName());
Integer result = baseMapper.insert(distributeLock);
return result != null && result.intValue() > 0 ? true : false;
}
if (!StringUtils.isEmpty(distributeLock.getLockName())
&& distributeLock.getLockTime() !=0
&& System.currentTimeMillis() <= distributeLock.getLockTime()) {
return false;
}
DistributeLock updateLock = new DistributeLock();
updateLock.setThreadId(Thread.currentThread().getName());
updateLock.setLockTime(System.currentTimeMillis() + exprieMis);
Integer updateResult = baseMapper.update(updateLock, Wrappers.<DistributeLock>lambdaUpdate()
.eq(DistributeLock::getLockName, key)
.eq(DistributeLock::getThreadId, distributeLock.getThreadId()));
return updateResult != null && updateResult.intValue() > 0 ? true : false;
} catch (Exception ex) {
log.error("tryLock error key:" + key, ex);
return false;
}
}
// 解锁
@Override
public boolean unLock(String key) {
DistributeLock updateLock = new DistributeLock();
updateLock.setThreadId("");
updateLock.setLockTime(0L);
Integer updateResult = baseMapper.update(updateLock, Wrappers.<DistributeLock>lambdaUpdate()
.eq(DistributeLock::getLockName, key)
.eq(DistributeLock::getThreadId, Thread.currentThread().getName()));
return updateResult != null && updateResult.intValue() > 0 ? true : false;
}
锁正确使用
让我们来看下下面的加解锁代码:
// 错误的加解锁方式
@Override
public boolean uselock_error(Integer id) {
String key = "lock_" + id;
// 加锁
boolean lockSucces = tryLock(key,5000);
if (!lockSucces){
throw new BaseException(-1, "加锁失败!");
}
//业务逻辑处理......
// 解锁
unLock(key);
return false;
}
这样写有问题吗?这不都进行加锁,解锁了吗?感觉很正确,没啥问题。如果此时业务逻辑处理出现异常,解锁方法(unLock)还解锁吗?所以,解锁一定要放到finally中。有时我们在使用分布式锁的同时还想用事务来保证数据的一致性,那该怎么操作呢?只需在方法上添加@Transactional(rollbackFor = Exception.class)开启事务,catch的时候把异常再次抛出,保证事务回滚。
@Transactional(rollbackFor = Exception.class)
@Override
public boolean uselock_transactional(Integer id) {
String key = "lock_" + id;
try {
// 加锁
boolean lockSucces = tryLock(key,5000);
if (!lockSucces){
throw new BaseException(-1, "加锁失败!");
}
// 业务逻辑处理
} catch (Exception ex) {
// 捕获的异常,直接抛出,进行事务回滚
throw ex;
} finally {
// 解锁
unLock(key);
}
return false;
}
实战演练
模拟100个用户抢购商品,商品销售数量做加1操作,校验成功卖出数量和数据库中的数量是否一致,实战代码如下:
@DS(Constant.DataSource.MASTER)
@Transactional(rollbackFor = Exception.class)
@Override
public BaseResponse<BoolResult> testLock() {
try{
Boolean isSuccess = distributeLockService.tryLock("testLock1", 5000);
if (!isSuccess) {
throw new BaseException(-1, "加锁失败!");
}
GoodsInfo goodsInfo = baseMapper.selectById(1);
goodsInfo.setSaleCount(goodsInfo.getSaleCount() + 1);
baseMapper.updateById(goodsInfo);
int rand = RandomUtil.randomInt(100);
if (rand / 2 == 0) {
log.error("testLock mock ex!");
throw new BaseException(-1, "模拟异常!");
} else {
log.error("testLock success!");
return BaseResponse.ok(new BoolResult(true));
}
} catch (Exception ex) {
if (ex instanceof BaseException) {
throw ex;
}
log.error("testLock error", ex);
throw ex;
} finally {
distributeLockService.unLock("testLock1");
}
}
结果如下:
数据库商品id=1的销售数量sale_count:98,日志输出的‘testLock success!’的数量也是98,说明能够正确防止并发操作。
总结
- 使用for update加锁时,需要开启事务,并且where条件要建索引,由于容易出现死锁,实际开发中应用很少。
- 使用乐观锁时,一定要把解锁(unlock)操作放到finally中。和redis分布式锁对比,性能比较差,但更安全。