优秀的编程知识分享平台

网站首页 > 技术文章 正文

kafka网络模型

nanyue 2024-12-12 14:09:04 技术文章 8 ℃

Kafka的网络通信模型是基于NIO的Reactor多线程模型来设计的,主要采用了1(1个Acceptor线程)+N(N个Processor线程)+M(M个业务处理线程)

  • Acceptor线程,负责监听Client端发起的请求
  • Processor线程,负责对Socket进行读写
  • Worker线程,处理具体的业务逻辑并生成Response返回

具体设计与实现如下:

1.SocketServer

SocketServer是接收客户端Socket请求连接、处理请求并返回处理结果的核心类,Acceptor及Processor的初始化、处理逻辑都是在这里实现的。在KafkaServer实例启动时会调用其startup的初始化方法,会初始化1个 Acceptor和N个Processor线程。 Processor线程数可以通num.network.threads配置进行调整。

注意:kafka支持监听多个端口,每个端口1个acceptor和3个Processor线程(num.network.threads默认是3),kafka实现中使用endpoints来保存多个端口。

2.Acceptor

Acceptor是一个继承自抽象类AbstractServerThread的线程类。 Acceptor的主要任务是监听并且接收客户端的请求,同时建立数据传输通道—SocketChannel,然后以轮询的方式交给一个后端的Processor线程处理(具体的方式是添加socketChannel至并发队列并唤醒Processor线程处理)。

  • Acceptor线程启动后,首先会向用于监听端口的服务端套接字对象—ServerSocketChannel上注册OP_ACCEPT 事件。
  • 然后以轮询的方式等待所关注的事件发生。如果该事件发生,则调用accept()方法对OP_ACCEPT事件进行处理。
  • 用轮询的方法,选择Processor进行处理,这样可以保证后面多个Processor线程的负载基本均匀。同时,将新接收的到客户端SocketChannel,保存到Processor的newConnections队列中(默认大小为20)。

3.Processor

Processor也是一个线程类,继承了抽象类AbstractServerThread。其主要是从客户端的请求中读取数据和将KafkaRequestHandler处理完响应结果返回给客户端。每个 Processor 都有自己的 selector,它会向 Acceptor 分配的 SocketChannel 注册相应的 OP_READ 事件

processor线程中有三个重要的变量:

(1)newConnections:在Acceptor一节中已经提到过,用于保存新连接交由Processor处理的socketChannel;

(2)inflightResponses:是一个Map[String, RequestChannel.Response]类型的集合,用于记录尚未发送的响应;

(3)selector:是一个类型为KSelector变量,用于管理网络连接;

下面先给出Processor处理器线程run方法执行的流程图:

(4)repsonseQueue:接收响应消息的队列

processor线程处理流程如下:

(1)处理newConnections队列中的socketChannel。遍历取出队列中的每个socketChannel并将其在selector上注册OP_READ事件;

(2)处理RequestChannel中与当前Processor对应响应队列中的Response。即从repsonseQueue获取消息, 在这一步中会根据responseAction的类型(NoOpAction/SendAction/CloseConnectionAction)进行判断,若为“NoOpAction”,表示该连接对应的请求无需响应;若为“SendAction”,表示该Response需要发送给客户端,则会通过“selector.send”注册OP_WRITE事件,并且将该Response从responseQueue响应队列中移至inflightResponses集合中;“CloseConnectionAction”,表示该连接是要关闭的;

(3)调用selector.poll()方法进行处理该方法底层即为调用nioSelector.select()方法进行读取数据或者发送数据。

  • 真正读取数据,并放入接收缓存队列stagedReceives,缓存所有channel的请求
  • 拿出每个channel的第一个请求,解析协议头部,放入completedReceives缓存中
  • 如果channel写ready,则进行write,将response返回给客户端

(4)处理已接受完成的数据包队列—completedReceives将selector.poll中读取数据的,转换成request,再processCompletedReceives方法中调用“requestChannel.sendRequest”方法将请求Request添加至requestChannel的全局请求队列—requestQueue中,等待KafkaRequestHandler来处理。同时,调用“selector.mute”方法取消与该请求对应的连接通道上的OP_READ事件;

(5)处理已发送完的队列—completedSends当已经完成将response发送给客户端,则将其从inflightResponses移除,同时通过调用“selector.unmute”方法为对应的连接通道重新注册OP_READ事件;

(6)处理断开连接的队列将该response从inflightResponses集合中移除,同时将connectionQuotas统计计数减1;

注意:newConnections队列中保存的只是连接的句柄,真实的数据,是要通过注册OP_READ后,通过poll检检测到接收事件,获取真正的数据,并转化成request,然后发送到requestChannel的全局请求队列—requestQueue中,等待KafkaRequestHandler来处。 发送响应消息也是通过注册OP_WIRTE事件,让后通过poll检测到发送事件进行发送数据。

4.RequestChannel

在Kafka的网络通信层中,RequestChannel为Processor处理器线程与KafkaRequestHandler线程之间的数据交换提供了一个数据缓冲区,是通信过程中Request和Response缓存的地方。因此,其作用就是在通信中起到了一个数据缓冲队列的作用。 requestQueue 大小可以通过queued.max.requests设置,默认大小是500

  • Processor线程将读取到的请求添加至RequestChannel的全局请求队列—requestQueue中;
  • KafkaRequestHandler线程从请求队列requestQueue中获取并处理,处理完以后将Response添加至RequestChannel的响应队列—responseQueue中,并通过responseListeners唤醒对应的Processor线程,最后Processor线程从响应队列中取出后发送至客户端。

5.KafkaRequestHandler

KafkaRequestHandler也是一种线程类,在KafkaServer实例启动时候会实例化一个线程池—KafkaRequestHandlerPool对象(包含了若干num.io.threads个KafkaRequestHandler线程, num.io.threads默认值是8),这些线程以守护线程的方式在后台运行。

  • 在KafkaRequestHandler的run方法中会循环地从RequestChannel中阻塞式读取request,并根据协议头,选择对应的KafkaApi进行处理
  • 使用回调,将处理完成的response通过KafkaChannel放入当前channel对应的Processor的responseQueue中

6.KafkaApis

KafkaApis是用于处理对通信网络传输过来的业务消息请求的中心转发组件。KafkaApis.handle根据requset的key,选择对应的函数进行处理。使用回调,将处理完成的response通过KafkaChannel放入当前channel对应的Processor的responseQueue中。

总结:

Kafka的网络通信模型是基于NIO的Reactor多线程模型来设计的,主要采用了1(1个Acceptor线程)+N(N个Processor线程)+M(M个业务处理线程)结构。将网络请求和业务处理逻辑分离,极大提高了系统的吞吐量。同时使用了请求队列和发送队列,保证了消息的有序性,并且能够缓存数据,提高了系统处理效率,在以后的系统设计中可以考虑借鉴这种网络模型。

最近发表
标签列表