网站首页 > 技术文章 正文
Kafka的网络通信模型是基于NIO的Reactor多线程模型来设计的,主要采用了1(1个Acceptor线程)+N(N个Processor线程)+M(M个业务处理线程)
- Acceptor线程,负责监听Client端发起的请求
- Processor线程,负责对Socket进行读写
- Worker线程,处理具体的业务逻辑并生成Response返回
具体设计与实现如下:
1.SocketServer
SocketServer是接收客户端Socket请求连接、处理请求并返回处理结果的核心类,Acceptor及Processor的初始化、处理逻辑都是在这里实现的。在KafkaServer实例启动时会调用其startup的初始化方法,会初始化1个 Acceptor和N个Processor线程。 Processor线程数可以通num.network.threads配置进行调整。
注意:kafka支持监听多个端口,每个端口1个acceptor和3个Processor线程(num.network.threads默认是3),kafka实现中使用endpoints来保存多个端口。
2.Acceptor
Acceptor是一个继承自抽象类AbstractServerThread的线程类。 Acceptor的主要任务是监听并且接收客户端的请求,同时建立数据传输通道—SocketChannel,然后以轮询的方式交给一个后端的Processor线程处理(具体的方式是添加socketChannel至并发队列并唤醒Processor线程处理)。
- Acceptor线程启动后,首先会向用于监听端口的服务端套接字对象—ServerSocketChannel上注册OP_ACCEPT 事件。
- 然后以轮询的方式等待所关注的事件发生。如果该事件发生,则调用accept()方法对OP_ACCEPT事件进行处理。
- 用轮询的方法,选择Processor进行处理,这样可以保证后面多个Processor线程的负载基本均匀。同时,将新接收的到客户端SocketChannel,保存到Processor的newConnections队列中(默认大小为20)。
3.Processor
Processor也是一个线程类,继承了抽象类AbstractServerThread。其主要是从客户端的请求中读取数据和将KafkaRequestHandler处理完响应结果返回给客户端。每个 Processor 都有自己的 selector,它会向 Acceptor 分配的 SocketChannel 注册相应的 OP_READ 事件
processor线程中有三个重要的变量:
(1)newConnections:在Acceptor一节中已经提到过,用于保存新连接交由Processor处理的socketChannel;
(2)inflightResponses:是一个Map[String, RequestChannel.Response]类型的集合,用于记录尚未发送的响应;
(3)selector:是一个类型为KSelector变量,用于管理网络连接;
下面先给出Processor处理器线程run方法执行的流程图:
(4)repsonseQueue:接收响应消息的队列
processor线程处理流程如下:
(1)处理newConnections队列中的socketChannel。遍历取出队列中的每个socketChannel并将其在selector上注册OP_READ事件;
(2)处理RequestChannel中与当前Processor对应响应队列中的Response。即从repsonseQueue获取消息, 在这一步中会根据responseAction的类型(NoOpAction/SendAction/CloseConnectionAction)进行判断,若为“NoOpAction”,表示该连接对应的请求无需响应;若为“SendAction”,表示该Response需要发送给客户端,则会通过“selector.send”注册OP_WRITE事件,并且将该Response从responseQueue响应队列中移至inflightResponses集合中;“CloseConnectionAction”,表示该连接是要关闭的;
(3)调用selector.poll()方法进行处理。该方法底层即为调用nioSelector.select()方法进行读取数据或者发送数据。
- 真正读取数据,并放入接收缓存队列stagedReceives,缓存所有channel的请求
- 拿出每个channel的第一个请求,解析协议头部,放入completedReceives缓存中
- 如果channel写ready,则进行write,将response返回给客户端
(4)处理已接受完成的数据包队列—completedReceives。将selector.poll中读取数据的,转换成request,再processCompletedReceives方法中调用“requestChannel.sendRequest”方法将请求Request添加至requestChannel的全局请求队列—requestQueue中,等待KafkaRequestHandler来处理。同时,调用“selector.mute”方法取消与该请求对应的连接通道上的OP_READ事件;
(5)处理已发送完的队列—completedSends。当已经完成将response发送给客户端,则将其从inflightResponses移除,同时通过调用“selector.unmute”方法为对应的连接通道重新注册OP_READ事件;
(6)处理断开连接的队列。将该response从inflightResponses集合中移除,同时将connectionQuotas统计计数减1;
注意:newConnections队列中保存的只是连接的句柄,真实的数据,是要通过注册OP_READ后,通过poll检检测到接收事件,获取真正的数据,并转化成request,然后发送到requestChannel的全局请求队列—requestQueue中,等待KafkaRequestHandler来处。 发送响应消息也是通过注册OP_WIRTE事件,让后通过poll检测到发送事件进行发送数据。
4.RequestChannel
在Kafka的网络通信层中,RequestChannel为Processor处理器线程与KafkaRequestHandler线程之间的数据交换提供了一个数据缓冲区,是通信过程中Request和Response缓存的地方。因此,其作用就是在通信中起到了一个数据缓冲队列的作用。 requestQueue 大小可以通过queued.max.requests设置,默认大小是500
- Processor线程将读取到的请求添加至RequestChannel的全局请求队列—requestQueue中;
- KafkaRequestHandler线程从请求队列requestQueue中获取并处理,处理完以后将Response添加至RequestChannel的响应队列—responseQueue中,并通过responseListeners唤醒对应的Processor线程,最后Processor线程从响应队列中取出后发送至客户端。
5.KafkaRequestHandler
KafkaRequestHandler也是一种线程类,在KafkaServer实例启动时候会实例化一个线程池—KafkaRequestHandlerPool对象(包含了若干num.io.threads个KafkaRequestHandler线程, num.io.threads默认值是8),这些线程以守护线程的方式在后台运行。
- 在KafkaRequestHandler的run方法中会循环地从RequestChannel中阻塞式读取request,并根据协议头,选择对应的KafkaApi进行处理
- 使用回调,将处理完成的response通过KafkaChannel放入当前channel对应的Processor的responseQueue中
6.KafkaApis
KafkaApis是用于处理对通信网络传输过来的业务消息请求的中心转发组件。KafkaApis.handle根据requset的key,选择对应的函数进行处理。使用回调,将处理完成的response通过KafkaChannel放入当前channel对应的Processor的responseQueue中。
总结:
Kafka的网络通信模型是基于NIO的Reactor多线程模型来设计的,主要采用了1(1个Acceptor线程)+N(N个Processor线程)+M(M个业务处理线程)结构。将网络请求和业务处理逻辑分离,极大提高了系统的吞吐量。同时使用了请求队列和发送队列,保证了消息的有序性,并且能够缓存数据,提高了系统处理效率,在以后的系统设计中可以考虑借鉴这种网络模型。
猜你喜欢
- 2024-12-12 Kafka监控与指标解析-UnderReplicatedPartitions
- 2024-12-12 聊聊 Kafka: Consumer 源码解析之 ConsumerNetworkClient
- 2024-12-12 为什么Kafka依赖ZooKeeper?
- 2024-12-12 一款Kafka可视化Web界面管理工具:CMAK
- 2024-12-12 MongoDB 数据同步kafka
- 2024-12-12 kafka快速入门到精通
- 2024-12-12 SpringBoot集成Kafka+Kafka优化问题
- 2024-12-12 kafka consumer 配置详解
- 2024-12-12 kafka生产者配置详解
- 2024-12-12 Kafka两种集群详解和搭建教程
- 最近发表
- 标签列表
-
- cmd/c (90)
- c++中::是什么意思 (84)
- 标签用于 (71)
- 主键只能有一个吗 (77)
- c#console.writeline不显示 (95)
- pythoncase语句 (88)
- es6includes (74)
- sqlset (76)
- apt-getinstall-y (100)
- node_modules怎么生成 (87)
- chromepost (71)
- flexdirection (73)
- c++int转char (80)
- mysqlany_value (79)
- static函数和普通函数 (84)
- el-date-picker开始日期早于结束日期 (76)
- js判断是否是json字符串 (75)
- c语言min函数头文件 (77)
- asynccallback (87)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- java (73)
- js数组插入 (83)
- mac安装java (72)
- 无效的列索引 (74)