分布式锁实现方式~DB
本节重点
- 数据库分布式锁实现原理
- 掌握悲观锁和乐观锁
分布式锁实现方式
悲观锁
悲观锁是在数据修改之前,把待修改的数据进行锁定,防止并发修改。通常采用for update加锁方式来实现,使用时需要注意以下两点:
- 首先要开启事务
- 在for update语句中where条件字段上创建索引,因为是通过索引来加锁的,否则会锁整个表。
无事务的for update演练
模拟100个用户抢购商品,商品销售数量做加1操作,校验数据库中的销售数量是否是100,代码如下:
@DS(Constant.DataSource.MASTER)
@Override
public BaseResponse<BoolResult> testLock() {
    GoodsInfo goodsInfo = baseMapper.selectOne(new LambdaQueryWrapper<GoodsInfo>().eq(GoodsInfo::getId,1)
                                               .last(" for update"));
    goodsInfo.setSaleCount(goodsInfo.getSaleCount() + 1);
    Integer result = baseMapper.updateById(goodsInfo);
    if (result != null && result.intValue() > 0) {
        log.warn("testLock success!");
        return BaseResponse.ok(new BoolResult(true));
    } else {
        log.warn("testLock fail!");
        return BaseResponse.ok(new BoolResult(false));
    }
}结果:日志‘testLock success!’ 打印了 100条,而库中销售数量是2,显然和我们预期结果不一致。
有事务的for update演练
@DS(Constant.DataSource.MASTER)
@Transactional(rollbackFor = Exception.class)
@Override
public BaseResponse<BoolResult> testLock() {
    GoodsInfo goodsInfo = baseMapper.selectOne(new LambdaQueryWrapper<GoodsInfo>().eq(GoodsInfo::getId,1)
                                               .last(" for update"));
    goodsInfo.setSaleCount(goodsInfo.getSaleCount() + 1);
    Integer result = baseMapper.updateById(goodsInfo);
    if (result != null && result.intValue() > 0) {
        log.warn("testLock success!");
        return BaseResponse.ok(new BoolResult(true));
    } else {
        log.warn("testLock fail!");
        return BaseResponse.ok(new BoolResult(false));
    }
}结果:日志‘testLock success!’ 打印了 100条,而库中销售数量是100,显然和我们预期结果一致。
在实际开发中,很少使用悲观锁防止并发操作,因为这种方式控制不当的话,经常会出现死锁情况。
乐观锁
乐观锁是相对悲观锁而言的,假设数据一般情况不会发生冲突的,所以在数据提交的时候,才会进行数据冲突校验,如果发送数据冲突,由用户决定如何操作。乐观锁通常是数据版本(version)机制实现的。何谓数据版本?数据版本就是在表中增加一个version字段。读取数据时,同时将version字段读取出来,数据每次更新,对此version值加1。当我们提交数据时,将对比表中对应记录的版本和取出来的版本是否一致,一致则更新成功,否则更新失败。
这里的数据库分布式锁通过乐观锁方式来实现的,首先创建一个锁表distribute_lock,来保存锁记录,SQL如下:
CREATE TABLE `distribute_lock` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `lock_name` varchar(20) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '锁记录名',
  `thread_id` varchar(50) COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '线程id',
  `lock_time` bigint(20) DEFAULT '0' COMMENT '锁失效时间',
  `is_deleted` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '删除状态 0正常 1删除',
  `gmt_create` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
  `gmt_update` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '修改时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_lock_name` (`lock_name`) USING BTREE,
  KEY `idx_lockName_threadId` (`lock_name`,`thread_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='分布式锁';加锁原理
在字段lock_name上创建一个唯一索引,首次加锁时,向distribute_lock表中插入一条锁记录,插入成功,则加锁成功,否则触发唯一索引约束,插入失败,即加锁失败。
insert into distribute_lock(`lock_name`,`thread_id`,`lock_time`) values('test','thread1', 5000);非首次加锁,通过lock_name查询出当前锁记录,如果某个线程正在持有锁(即:thread_id!= '' && lock_time>当前时间戳),则加锁失败,否则,以已查询出的lock_name+thread_id为条件,把thread_id字段更新为当前线程id,同时把lock_time字段更新为当前锁失效时间,更新成功,则加锁成功,反之加锁失败。
update distribute_lock set thread_id='thread2',lock_time=5000 where lock_name='test' and thread_id='thread1';解锁原理
释放当前线程所持有锁,即更新thread_id字段为空和lock_time字段为0。
update distribute_lock set thread_id='',lock_time=0 where lock_name='test' and thread_id='thread1';实现代码
// 加锁
public boolean tryLock(String key, long exprieMis) {
    try {
        DistributeLock distributeLock = baseMapper.selectOne(new LambdaQueryWrapper<DistributeLock>()
                                                             .eq(DistributeLock::getLockName,key)
                                                             .eq(DistributeLock::getIsDeleted,false)
                                                             .last("limit 1"));
        if (distributeLock == null) {
            distributeLock = new DistributeLock();
            distributeLock.setLockName(key);
            distributeLock.setLockTime(System.currentTimeMillis() + exprieMis);
            distributeLock.setThreadId(Thread.currentThread().getName());
            Integer result = baseMapper.insert(distributeLock);
            return result != null && result.intValue() > 0 ? true : false;
        }
        if (!StringUtils.isEmpty(distributeLock.getLockName())
            && distributeLock.getLockTime() !=0
            && System.currentTimeMillis() <= distributeLock.getLockTime()) {
            return false;
        }
        DistributeLock updateLock = new DistributeLock();
        updateLock.setThreadId(Thread.currentThread().getName());
        updateLock.setLockTime(System.currentTimeMillis() + exprieMis);
        Integer updateResult = baseMapper.update(updateLock, Wrappers.<DistributeLock>lambdaUpdate()
                                                 .eq(DistributeLock::getLockName, key)
                                                 .eq(DistributeLock::getThreadId, distributeLock.getThreadId()));
        return updateResult != null && updateResult.intValue() > 0 ? true : false;
    } catch (Exception ex) {
        log.error("tryLock error key:" + key, ex);
        return false;
    }
}
// 解锁
@Override
public boolean unLock(String key) {
    DistributeLock updateLock = new DistributeLock();
    updateLock.setThreadId("");
    updateLock.setLockTime(0L);
    Integer updateResult = baseMapper.update(updateLock, Wrappers.<DistributeLock>lambdaUpdate()
                                             .eq(DistributeLock::getLockName, key)
                                             .eq(DistributeLock::getThreadId, Thread.currentThread().getName()));
    return updateResult != null && updateResult.intValue() > 0 ? true : false;
}锁正确使用
让我们来看下下面的加解锁代码:
// 错误的加解锁方式
@Override
public boolean uselock_error(Integer id) {
    String key = "lock_" + id;
    // 加锁
    boolean lockSucces = tryLock(key,5000);
    if (!lockSucces){
        throw new BaseException(-1, "加锁失败!");
    }
    //业务逻辑处理......
    // 解锁
    unLock(key);
    return false;
}这样写有问题吗?这不都进行加锁,解锁了吗?感觉很正确,没啥问题。如果此时业务逻辑处理出现异常,解锁方法(unLock)还解锁吗?所以,解锁一定要放到finally中。有时我们在使用分布式锁的同时还想用事务来保证数据的一致性,那该怎么操作呢?只需在方法上添加@Transactional(rollbackFor = Exception.class)开启事务,catch的时候把异常再次抛出,保证事务回滚。
@Transactional(rollbackFor = Exception.class)
@Override
public boolean uselock_transactional(Integer id) {
    String key = "lock_" + id;
    try {
        // 加锁
        boolean lockSucces = tryLock(key,5000);
        if (!lockSucces){
            throw new BaseException(-1, "加锁失败!");
        }
        // 业务逻辑处理
    } catch (Exception ex) {
        // 捕获的异常,直接抛出,进行事务回滚
        throw ex;
    } finally {
        // 解锁
        unLock(key);
    }
    return false;
}实战演练
模拟100个用户抢购商品,商品销售数量做加1操作,校验成功卖出数量和数据库中的数量是否一致,实战代码如下:
@DS(Constant.DataSource.MASTER)
@Transactional(rollbackFor = Exception.class)
@Override
public BaseResponse<BoolResult> testLock() {
    try{
        Boolean isSuccess = distributeLockService.tryLock("testLock1", 5000);
        if (!isSuccess) {
            throw new BaseException(-1, "加锁失败!");
        }
        GoodsInfo goodsInfo = baseMapper.selectById(1);
        goodsInfo.setSaleCount(goodsInfo.getSaleCount() + 1);
        baseMapper.updateById(goodsInfo);
        int rand = RandomUtil.randomInt(100);
        if (rand / 2 == 0) {
            log.error("testLock mock ex!");
            throw new BaseException(-1, "模拟异常!");
        } else {
            log.error("testLock success!");
            return BaseResponse.ok(new BoolResult(true));
        }
    } catch (Exception ex) {
        if (ex instanceof BaseException) {
            throw ex;
        }
        log.error("testLock error", ex);
        throw ex;
    } finally {
        distributeLockService.unLock("testLock1");
    }
}结果如下:
数据库商品id=1的销售数量sale_count:98,日志输出的‘testLock success!’的数量也是98,说明能够正确防止并发操作。
总结
- 使用for update加锁时,需要开启事务,并且where条件要建索引,由于容易出现死锁,实际开发中应用很少。
- 使用乐观锁时,一定要把解锁(unlock)操作放到finally中。和redis分布式锁对比,性能比较差,但更安全。
