
ZooKeeper Source Code Analysis, Part 3: How the Client Sends Requests

nanyue 2024-08-04 16:51:55

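A znode can be watched. This covers changes to the data stored in the node as well as changes to its child nodes. When a change occurs, ZooKeeper notifies the clients that set the watch. For applications this is ZooKeeper's most important feature: it is the basis for centralized configuration management, cluster management, distributed locks, and more.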

Background:

The states defined by ZooKeeper (KeeperState) are:

Unknown (-1), Disconnected (0), NoSyncConnected (1), SyncConnected (3), AuthFailed (4), ConnectedReadOnly (5), SaslAuthenticated (6), Expired (-112).

The event types (EventType) are: None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4), DataWatchRemoved (5), ChildWatchRemoved (6).

The watcher types (WatcherType) are: Children (1), Data (2), Any (3).
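Each of these enums pairs a constant with an integer code. The sketch below shows one way such a mapping can be written and looked up. SessionState, code() and fromCode() are names made up for this illustration; this is not ZooKeeper's own enum, only the same idea in reduced form.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for a state enum with integer codes (hypothetical names). */
enum SessionState {
    UNKNOWN(-1),
    DISCONNECTED(0),
    NO_SYNC_CONNECTED(1),
    SYNC_CONNECTED(3),
    AUTH_FAILED(4),
    CONNECTED_READ_ONLY(5),
    SASL_AUTHENTICATED(6),
    EXPIRED(-112);

    /** Reverse index from integer code to constant, built once at class load. */
    private static final Map<Integer, SessionState> BY_CODE = new HashMap<>();

    static {
        for (SessionState s : values()) {
            BY_CODE.put(s.code, s);
        }
    }

    private final int code;

    SessionState(int code) {
        this.code = code;
    }

    int code() {
        return code;
    }

    /** Maps an integer code back to its constant; unknown codes are rejected. */
    static SessionState fromCode(int code) {
        SessionState s = BY_CODE.get(code);
        if (s == null) {
            throw new IllegalArgumentException("Invalid state code: " + code);
        }
        return s;
    }
}
```

Note that the codes are not contiguous (there is no 2, and Expired is -112), which is why the sketch uses a lookup map rather than the enum ordinal.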

In the earlier article in this series, "ZooKeeper Source Code Analysis, Part 1: The Client", we created a MyWatcher when connecting to ZooKeeper:

    protected void connectToZK(String newHost) throws InterruptedException, IOException {
        if (zk != null && zk.getState().isAlive()) {
            zk.close();
        }
        host = newHost;
        boolean readOnly = cl.getOption("readonly") != null;
        if (cl.getOption("secure") != null) {
            System.setProperty(ZooKeeper.SECURE_CLIENT, "true");
            System.out.println("Secure connection is enabled");
        }
        zk = new ZooKeeper(host,
                Integer.parseInt(cl.getOption("timeout")),
                new MyWatcher(), readOnly);
    }

When the ZooKeeper instance is created, it uses a watchManager:

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly, HostProvider aHostProvider)
            throws IOException {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        hostProvider = aHostProvider;
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }

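The MyWatcher passed in is stored in watchManager as the default watcher. The watchManager is then handed to a new ClientCnxn, and the ClientCnxn's threads are started.

So let's look at ClientCnxn first. ClientCnxn manages the socket I/O for the client. It maintains a list of servers it can connect to and, when a switch is needed, moves to another server in that list transparently.

First, here is how the socket is obtained: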

    private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
        String clientCnxnSocketName = System
                .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
        if (clientCnxnSocketName == null) {
            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
        }
        try {
            return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
                    .newInstance();
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + clientCnxnSocketName);
            ioe.initCause(e);
            throw ioe;
        }
    }
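The method above picks an implementation class by name from a system property, falls back to a default, and instantiates it reflectively. The following self-contained sketch shows the same pattern in isolation. Transport, NioTransport, NettyTransport, TransportFactory and the property name example.client.transport are all invented for this illustration. Unlike the original, the sketch calls getDeclaredConstructor().newInstance() (Class.newInstance() is deprecated in newer JDKs) and reports failure with an unchecked exception to keep the example short.

```java
/** Hypothetical transport interface standing in for ClientCnxnSocket. */
interface Transport {
    String name();
}

/** Hypothetical default implementation, playing the role of the NIO default. */
class NioTransport implements Transport {
    public String name() {
        return "nio";
    }
}

/** Hypothetical alternative, selected only through the system property. */
class NettyTransport implements Transport {
    public String name() {
        return "netty";
    }
}

final class TransportFactory {
    /** Made-up property name for this sketch. */
    static final String TRANSPORT_PROPERTY = "example.client.transport";

    static Transport create() {
        String className = System.getProperty(TRANSPORT_PROPERTY);
        if (className == null) {
            // No override configured: fall back to the default implementation.
            className = NioTransport.class.getName();
        }
        try {
            return (Transport) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            // The original wraps the cause in an IOException instead.
            throw new IllegalStateException("Couldn't instantiate " + className, e);
        }
    }
}
```

With no property set, create() returns a NioTransport; setting the property to NettyTransport's class name (for example with -Dexample.client.transport=NettyTransport on the command line) selects the other implementation without any code change.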

Next, ClientCnxn's start() method is called, which starts two threads:

    public void start() {
        sendThread.start();
        eventThread.start();
    }

Of the two, SendThread services the outgoing request queue and generates heartbeats. It also spawns the ReadThread.

Here is the main body of SendThread's run method:

    if (!clientCnxnSocket.isConnected()) {
        // don't re-establish connection if we are closing
        if (closing) {
            break;
        }
        startConnect();
        clientCnxnSocket.updateLastSendAndHeard();
    }
    if (state.isConnected()) {
        // determine whether we need to send an AuthFailed event.
        if (zooKeeperSaslClient != null) {
            boolean sendAuthEvent = false;
            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                try {
                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                } catch (SaslException e) {
                    LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                    state = States.AUTH_FAILED;
                    sendAuthEvent = true;
                }
            }
            KeeperState authState = zooKeeperSaslClient.getKeeperState();
            if (authState != null) {
                if (authState == KeeperState.AuthFailed) {
                    // An authentication error occurred during authentication with the Zookeeper Server.
                    state = States.AUTH_FAILED;
                    sendAuthEvent = true;
                } else {
                    if (authState == KeeperState.SaslAuthenticated) {
                        sendAuthEvent = true;
                    }
                }
            }
            if (sendAuthEvent == true) {
                eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        authState, null));
            }
        }
        to = readTimeout - clientCnxnSocket.getIdleRecv();
    } else {
        to = connectTimeout - clientCnxnSocket.getIdleRecv();
    }

    if (to <= 0) {
        String warnInfo;
        warnInfo = "Client session timed out, have not heard from server in "
                + clientCnxnSocket.getIdleRecv()
                + "ms"
                + " for sessionid 0x"
                + Long.toHexString(sessionId);
        LOG.warn(warnInfo);
        throw new SessionTimeoutException(warnInfo);
    }
    if (state.isConnected()) {
        // 1000 (1 second) is to prevent race condition missing to send the second ping
        // also make sure not to send too many pings when readTimeout is small
        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend()
                - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
        // send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
            sendPing();
            clientCnxnSocket.updateLastSend();
        } else {
            if (timeToNextPing < to) {
                to = timeToNextPing;
            }
        }
    }
    // If we are in read-only mode, seek for read/write server
    if (state == States.CONNECTEDREADONLY) {
        long now = Time.currentElapsedTime();
        int idlePingRwServer = (int) (now - lastPingRwServer);
        if (idlePingRwServer >= pingRwTimeout) {
            lastPingRwServer = now;
            idlePingRwServer = 0;
            pingRwTimeout =
                    Math.min(2 * pingRwTimeout, maxPingRwTimeout);
            pingRwServer();
        }
        to = Math.min(to, pingRwTimeout - idlePingRwServer);
    }
    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
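The ping decision in the loop body is plain arithmetic, so it can be pulled out and tried with concrete numbers. The sketch below reproduces only the timeToNextPing expression and the ping condition from the listing. PingSchedule and its method names are hypothetical, and the value 10000 for MAX_SEND_PING_INTERVAL is an assumption here, since the listing does not show the constant's definition.

```java
final class PingSchedule {
    /** Assumed value in milliseconds; the listing above does not show the constant. */
    static final int MAX_SEND_PING_INTERVAL = 10_000;

    /** Same expression as in the loop body: half the read timeout minus send idleness. */
    static int timeToNextPing(int readTimeout, int idleSend) {
        int guard = (idleSend > 1000) ? 1000 : 0; // 1-second safety margin
        return readTimeout / 2 - idleSend - guard;
    }

    /** A ping is due when the timer has run out or nothing was sent for too long. */
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        int readTimeout = 20_000;
        for (int idleSend : new int[] {0, 500, 5_000, 9_500}) {
            System.out.printf("idleSend=%d timeToNextPing=%d ping=%b%n",
                    idleSend,
                    timeToNextPing(readTimeout, idleSend),
                    shouldPing(readTimeout, idleSend));
        }
    }
}
```

With readTimeout = 20000, an idle time of 5000 ms leaves 20000 / 2 - 5000 - 1000 = 4000 ms until the next ping, while 9500 ms gives -500, so a ping is sent at once. The second condition matters only for large timeouts: with readTimeout = 60000 the timer alone would allow almost 30 seconds of silence, but the interval cap forces a ping earlier.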
 

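ClientCnxnSocketNetty implements the abstract methods of ClientCnxnSocket. It is responsible for connecting to the server, reading and writing network traffic, and acting as the layer between raw network data and the higher-level packets. (As getClientCnxnSocket shows, ClientCnxnSocketNIO is the default unless the system property names another class; the walkthrough below follows the Netty implementation.) Its lifecycle is as follows: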

 loop:
 - try:
 - - !isConnected()
 - - - connect()
 - - doTransport()
 - catch:
 - - cleanup()
 close()

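This description shows how ClientCnxnSocket works. It first checks whether it is connected: if not, it calls connect(); if so, it uses the existing connection. It then calls doTransport() to communicate. If an exception occurs along the way, cleanup() is called. Finally the connection is closed. The core of the flow is therefore the doTransport() method: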

    @Override
    void doTransport(int waitTimeOut,
                     List<Packet> pendingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        try {
            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
                return;
            }
            Packet head = null;
            if (needSasl.get()) {
                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } else {
                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                    return;
                }
            }
            // check if being waken up on closing.
            if (!sendThread.getZkState().isAlive()) {
                // adding back the packet to notify of failure in conLossPacket().
                addBack(head);
                return;
            }
            // channel disconnection happened
            if (disconnected.get()) {
                addBack(head);
                throw new EndOfStreamException("channel for sessionid 0x"
                        + Long.toHexString(sessionId)
                        + " is lost");
            }
            if (head != null) {
                doWrite(pendingQueue, head, cnxn);
            }
        } finally {
            updateNow();
        }
    }

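Simplified, the code above has two paths: the failure path, addBack(head), and the normal path, doWrite(pendingQueue, head, cnxn). Setting the failure path aside for now, let's follow the normal flow.

First, a Packet is obtained: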

    Packet head = null;
    if (needSasl.get()) {
        if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
            return;
        }
    } else {
        if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
            return;
        }
    }

Here, protected LinkedBlockingDeque<Packet> outgoingQueue is a linked blocking deque that holds the outgoing requests.
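To make the queue behaviour concrete, here is a minimal sketch of a timed poll on a LinkedBlockingDeque, using strings in place of Packet objects. OutgoingQueueSketch and its methods are hypothetical. In particular, the addBack shown here re-inserts at the head of the deque; that is an assumption about what the real addBack does, since its body is not shown in this article.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

final class OutgoingQueueSketch {
    private final LinkedBlockingDeque<String> outgoingQueue = new LinkedBlockingDeque<>();

    /** Called by application threads to enqueue a request at the tail. */
    void submit(String request) {
        outgoingQueue.add(request);
    }

    /** Waits up to waitTimeoutMs for the head request; returns null on timeout. */
    String nextRequest(long waitTimeoutMs) {
        try {
            return outgoingQueue.poll(waitTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    /** Assumed behaviour: return an unsent request to the front so order is kept. */
    void addBack(String request) {
        if (request != null) {
            outgoingQueue.addFirst(request);
        }
    }
}
```

The timed poll is what lets the sending thread sleep for at most the computed timeout when there is nothing to send and still wake up immediately when a request arrives.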

Then doWrite is executed:

    /**
     * doWrite handles writing the packets from outgoingQueue via network to server.
     */
    private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
        updateNow();
        while (true) {
            if (p != WakeupPacket.getInstance()) {
                if ((p.requestHeader != null) &&
                        (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
                        (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
                sendPkt(p);
            }
            if (outgoingQueue.isEmpty()) {
                break;
            }
            p = outgoingQueue.remove();
        }
    }

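The doWrite method writes the packets in outgoingQueue to the server over the network. The actual sending is done by the sendPkt(p) call inside the loop above: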

    private void sendPkt(Packet p) {
        // Assuming the packet will be sent out successfully. Because if it fails,
        // the channel will close and clean up queues.
        p.createBB();
        updateLastSend();
        sentCount++;
        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
    }
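The doWrite loop and sendPkt together drain the outgoing queue: every packet is sent, but only packets other than ping and auth receive a fresh xid and are remembered in pendingQueue. The sketch below imitates that control flow with a reduced, hypothetical Pkt type and an in-memory list standing in for the network channel. None of these names come from ZooKeeper, and the wakeup-packet check is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

final class DoWriteSketch {
    /** Hypothetical, much-reduced packet: only what the loop needs. */
    static final class Pkt {
        final String type; // e.g. "ping", "auth", "getData"
        int xid = -1;      // assigned just before sending, if tracked

        Pkt(String type) {
            this.type = type;
        }
    }

    final LinkedBlockingDeque<Pkt> outgoingQueue = new LinkedBlockingDeque<>();
    final List<Pkt> pendingQueue = new ArrayList<>();
    final List<Pkt> sent = new ArrayList<>(); // stands in for the network channel
    private int nextXid = 1;

    /** Sends p, then keeps sending until outgoingQueue is empty. */
    void doWrite(Pkt p) {
        while (true) {
            boolean tracked = !p.type.equals("ping") && !p.type.equals("auth");
            if (tracked) {
                p.xid = nextXid++;
                synchronized (pendingQueue) {
                    pendingQueue.add(p); // remembered until a reply arrives
                }
            }
            sent.add(p); // corresponds to sendPkt(p)
            if (outgoingQueue.isEmpty()) {
                break;
            }
            p = outgoingQueue.remove();
        }
    }
}
```

Calling doWrite with a "getData" packet while a "ping" and a "setData" are queued sends all three in order but leaves only the two data requests in pendingQueue, with xids 1 and 2.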

1. The structure of Packet is as follows:

    /**
     * This class allows us to pass the headers and the relevant records around.
     */
    static class Packet {
        RequestHeader requestHeader;
        ReplyHeader replyHeader;
        Record request;
        Record response;
        ByteBuffer bb;
        /** Client's view of the path (may differ due to chroot) **/
        String clientPath;
        /** Server's view of the path (may differ due to chroot) **/
        String serverPath;
        boolean finished;
        AsyncCallback cb;
        Object ctx;
        WatchRegistration watchRegistration;
        public boolean readOnly;
        WatchDeregistration watchDeregistration;

        /** Convenience ctor */
        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
                Record request, Record response,
                WatchRegistration watchRegistration) {
            this(requestHeader, replyHeader, request, response,
                    watchRegistration, false);
        }

        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
                Record request, Record response,
                WatchRegistration watchRegistration, boolean readOnly) {
            this.requestHeader = requestHeader;
            this.replyHeader = replyHeader;
            this.request = request;
            this.response = response;
            this.readOnly = readOnly;
            this.watchRegistration = watchRegistration;
        }

        public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");
                }
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");
                }
                baos.close();
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("clientPath:" + clientPath);
            sb.append(" serverPath:" + serverPath);
            sb.append(" finished:" + finished);
            sb.append(" header:: " + requestHeader);
            sb.append(" replyHeader:: " + replyHeader);
            sb.append(" request:: " + request);
            sb.append(" response:: " + response);
            // jute toString is horrible, remove unnecessary newlines
            return sb.toString().replaceAll("\r*\n+", " ");
        }
    }

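The createBB method shows that, for the actual network transfer, ZooKeeper serializes only two fields, requestHeader and request (plus the readOnly flag when the request is a ConnectRequest). Only these are written into the underlying byte array and sent over the network; nothing related to watchRegistration is transmitted.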
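createBB also uses a common framing trick: it writes a 4-byte placeholder first, serializes the body, and then overwrites the placeholder with the body length (the buffer capacity minus the 4 length bytes). The sketch below isolates that trick with a raw byte array as the payload. FramingSketch and frame are hypothetical names, and DataOutputStream is used here in place of ZooKeeper's BinaryOutputArchive.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

final class FramingSketch {
    /** Returns a buffer laid out as [4-byte big-endian length][payload]. */
    static ByteBuffer frame(byte[] payload) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(-1);   // placeholder, filled in below
            out.write(payload); // the serialized body
        } catch (IOException e) {
            // Not expected for an in-memory stream.
            throw new IllegalStateException("in-memory write failed", e);
        }
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        bb.putInt(bb.capacity() - 4); // overwrite the placeholder at position 0
        bb.rewind();                  // reset position so the whole frame is sent
        return bb;
    }
}
```

Framing the five bytes of "hello" this way yields a 9-byte buffer whose first four bytes encode the integer 5. The receiver can read the length first and then knows exactly how many more bytes belong to this message.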

2. Record the time of the last send with updateLastSend:

    void updateLastSend() {
        this.lastSend = now;
    }

3. Send the byte buffer to the server through the Netty channel:

channel.write(ChannelBuffers.wrappedBuffer(p.bb));

Here bb is a ByteBuffer, initialized in Packet's createBB method:

    this.bb = ByteBuffer.wrap(baos.toByteArray());
    this.bb.putInt(this.bb.capacity() - 4);
    this.bb.rewind();

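Summary:

The connection between the ZooKeeper client and the server is implemented mainly by ClientCnxnSocket, which has two concrete implementations, ClientCnxnSocketNetty and ClientCnxnSocketNIO. Its workflow is:

First, check whether a connection exists. If not, call connect(); otherwise go to the next step.

Then call doTransport() to communicate. If an exception occurs along the way, call cleanup().

Finally, close the connection.

This flow can be seen in the run method of SendThread.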
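The lifecycle summarized above (connect if needed, transport, clean up on failure, close at the end) can be sketched as a small loop. LifecycleSketch, its Socket interface and the RecordingSocket fake are all hypothetical and exist only for this illustration. The real loop keeps running while the client state is alive; the fixed iteration count here is a simplification so that the sketch terminates.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class LifecycleSketch {
    /** Hypothetical minimal socket abstraction; not ZooKeeper's ClientCnxnSocket. */
    interface Socket {
        boolean isConnected();
        void connect() throws IOException;
        void doTransport() throws IOException;
        void cleanup();
        void close();
    }

    /** Runs the connect / transport / cleanup cycle a fixed number of times. */
    static void run(Socket socket, int iterations) {
        for (int i = 0; i < iterations; i++) {
            try {
                if (!socket.isConnected()) {
                    socket.connect();
                }
                socket.doTransport();
            } catch (IOException e) {
                socket.cleanup(); // drop the broken connection; next pass reconnects
            }
        }
        socket.close();
    }

    /** Fake that logs each call and fails on one chosen doTransport call. */
    static final class RecordingSocket implements Socket {
        final List<String> calls = new ArrayList<>();
        private final int failOnTransport;
        private boolean connected;
        private int transports;

        RecordingSocket(int failOnTransport) {
            this.failOnTransport = failOnTransport;
        }

        public boolean isConnected() {
            return connected;
        }

        public void connect() {
            connected = true;
            calls.add("connect");
        }

        public void doTransport() throws IOException {
            transports++;
            if (transports == failOnTransport) {
                throw new IOException("simulated connection loss");
            }
            calls.add("doTransport");
        }

        public void cleanup() {
            connected = false;
            calls.add("cleanup");
        }

        public void close() {
            calls.add("close");
        }
    }
}
```

Running three iterations with a fake that fails on its second transport records the calls connect, doTransport, cleanup, connect, doTransport, close. The failure costs one cleanup and one reconnect, and the loop carries on.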
