技术要点
- 临时顺序节点
- 监听事件
- 线程间通信
回顾ZK知识点
节点类型
持久节点:节点被创建后,一直存在,不会随着会话(session)失效而消失。 临时节点:临时节点的生命周期和会话(session)绑定,一旦会话失效,临时节点会被自动清除掉。注意:会话失效不等于连接断开,通过下面两个异常来理解下这句话。我们经常遇见这两类异常CONNECTIONLOSS(连接断开)和SESSIONEXPIRED(Session过期),连接断开(CONNECTIONLOSS):一般发生在网络闪断或所连接zk服务器挂掉;Session过期(SESSIONEXPIRED):一般发生在连接断开后,超过SESSION_TIMEOUT后还没有成功连接上zk服务器。 顺序节点:创建节点时,自动给节点名加上一个数字编号后缀。
监听器(watcher)
监听节点/数据的变化,类似于发布订阅功能。
Watcher事件类型:
| 事件 | 触发时机 | 
| NodeCreated | 节点被创建时 | 
| NodeDeleted | 节点被删除时 | 
| NodeDataChanged | 节点内容发生修改时 | 
| NodeChildrenChanged | 子节点列表发生变更时 | 
注册方法和事件对应关系:
| 注册方法 | Created | ChildrenChanged | DataChanged | Deleted | 
| zk.exists("/node",watcher) | √ | √ | √ | |
| zk.getData("/node",watcher) | √ | √ | ||
| zk.getChildren("/node",watcher) | √ | √ | 
实现原理
zk分布式锁实现原理是利用临时顺序节点特性和watch机制。为什么要用临时顺序节点?临时的作用类似于设置超时时间,防止宕机出现死锁。顺序:我们知道分布式锁的特点是 “互斥“,redis分布式锁的互斥通过set NX PX命令,DB分布式锁的互斥是通过for update和版本号,而zk有自己顺序节点,可以让抢锁者按照一定顺序来获取锁,也是一种互斥。让我们来看看具体实现原理:
- 创建锁根目录(/zk_lock)
- 在根目录(/zk_lock)下创建临时顺序节点。
- 获取根目录(/zk_lock)下的所有子节点并排序,如果自己排在第一位,则抢锁成功,反之,没有抢到锁,则监听前一个节点是否释放锁。
代码实现
@Slf4j
public class ZkLock {
    private final String LOCK_ROOT = "/zk_lock";
    private final ConcurrentMap<Thread, String> threadData = Maps.newConcurrentMap();
    private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            notifyFromWatcher();
        }
    };
    /**
     * 加锁
     * @param key
     * @param timeOutMs 超时时间,单位:毫秒
     * @return
     */
    public boolean tryLock(String key, Long timeOutMs) {
        Long startTime = System.currentTimeMillis();
        String lockPath = String.format("%s/%s",LOCK_ROOT, key);
        String ourPath = ZkManager.createEphemeralSeqNode(lockPath);
        boolean isSuccess = innerLock(startTime, timeOutMs, ourPath);
        if (isSuccess) {
            threadData.put(Thread.currentThread(), ourPath);
        }
        return isSuccess;
    }
    /**
     * 解锁
     */
    public void unLock(){
        String lockPath = threadData.get(Thread.currentThread());
        if (!StringUtils.isEmpty(lockPath)){
            ZkManager.delNode(lockPath);
        }
    }
    private boolean innerLock(Long startTime, Long timeOutMs, String ourPath){
        log.error("innerLock ourPath:" + ourPath);
        // 是否获取的锁
        boolean haveTheLock = false;
        // 是否删除超时节点
        boolean isDeleteTimeOutNode = false;
        try {
            while (!haveTheLock){
                String sequenceNodeName = ourPath.substring(LOCK_ROOT.length() + 1);
                List<String> childNodes = ZkManager.getNodeChild(LOCK_ROOT);
                Collections.sort(childNodes);
                int index = childNodes.indexOf(sequenceNodeName);
                if (index == 0) {
                    haveTheLock = true;
                    break;
                }
                synchronized (this){
                    // watch前一个节点
                    String previousNodePath = String.format("%s/%s",LOCK_ROOT, childNodes.get(index-1));
                    log.error("innerLock previousNodePath:" + previousNodePath);
                    try {
                        ZkManager.getClient().getData().usingWatcher(watcher).forPath(previousNodePath);
                        if (timeOutMs == null) {
                            wait();
                        }
                        timeOutMs -= (System.currentTimeMillis() - startTime);
                        log.warn("innerLock timeOutMs:" + timeOutMs);
                        startTime = System.currentTimeMillis();
                        if (timeOutMs <= 0) {
                            isDeleteTimeOutNode = true;
                            break;
                        }
                        wait(timeOutMs);
                    } catch (KeeperException.NoNodeException e) {
//                        e.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            isDeleteTimeOutNode = true;
            e.printStackTrace();
        } finally {
            if (isDeleteTimeOutNode) {
                ZkManager.delNode(ourPath);
            }
        }
        return haveTheLock;
    }
    /**
     * 通知waiter
     */
    private synchronized void notifyFromWatcher(){
        notifyAll();
    }
}总结:
- 实现了互斥、锁超时自动失效、阻塞等待、公平性。
- 不足之处:未实现可重入(只需本地变量控制下就可以了)
锁超时并发问题
产生的原因是:GC停顿导致临时节点释放,如下图所示:
客户端1发生GC停顿的时候,Zookeeper检测不到心跳,也是有可能出现多个客户端同时操作共享资源的情形。当然,你可以说,我们可以通过JVM调优,避免GC停顿出现。但是注意了,我们所做的一切,只能尽可能避免多个客户端操作共享资源,无法完全消除。
不存在集群宕机并发执行问题
- 所有写操作,都是由leader来完成的,可以认为全局串行写操作。
- leader宕机,会选出新的leader,继续上面写操作。
实战演练
100个线程,使用zk分布式锁对count进行加1操作,验证结果是否是100?
@Slf4j
public class ZkLockTest {
    private static int count = 0;
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 100;
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    ZkLock zkLock = new ZkLock();
                    try {
                        boolean isSuccess = zkLock.tryLock("testLock",60000L);
                        if (!isSuccess) {
                            log.error("innerLock 加锁失败!");
                            return;
                        }
                        log.error("innerLock 加锁成功!" + Thread.currentThread().getName());
                        count++;
                        log.error("thread:"+ Thread.currentThread().getName() +" count:" + count);
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    } finally {
                        zkLock.unLock();
                        log.error("innerLock 解锁成功!");
                        countDownLatch.countDown();
                    }
                }
            });
            thread.start();
        }
        countDownLatch.await();
    }
}结果正确:
[11/08/21 03:15:26:026 CST] Thread-83 ERROR zk.ZkLockTest: thread:Thread-83 count:100
遇到的坑
- 使用NodeCache监听时,锁已经释放了,但是被通知的节点还是进入了等待,而没有立即获取到锁,如图所示:
问题就出在NodeCache竟然可以监听不存在的节点的,所以才会进入等待。
