网站首页 > 技术文章 正文
分享自己在Java方面的所思所想,希望你看完之后能有更多更深入的了解,欢迎关注(VX同号)
本文章是手写RPC框架系列的IO模型这部分,之前是利用BIO阻塞模型完成的,后续将会替换成Netty,而对Netty的使用了解之前,对NIO模型的学习肯定必不可少,今天就一起来学习下NIO模型的相关知识点。
目录
「系列教程」手写RPC框架(1),看看100个线程同时调用情况如何
该RPC的代码可关注后私信「RPC」获取
NIO是java1.4推出的一种全新的IO模型,全称是java non-blocking IO,提供ByteBuffer等缓存的容器,达到非阻塞式的高伸缩性网络。
IO 模型
IO模型是机器进行IO具体操作方法一种抽象,每种IO模型都有各自的优缺点,需要注意的是要完成各模型的实际开发需要操作系统的支持,在没有poll、epoll出来之前,java进行非阻塞式的读写操作很复杂,而当上述功能出现之后,java才在基于该功能上添加了nio模块,包名是java.nio,现在在类Linux是基于epoll实现的,类Unix(包含Mac)是基于kqueue 实现的,具体的可以调试自己机器上的SelectorImpl具体实现,例如下图
接下来,主要是依据NIO的特点分为Select,Channel,Buffer三个点学习,还包括一些具体的代码实践。
Buffer
是一种缓冲数据的容器,可以存储各种基本类型的数据。线程不安全,数据结构如下图所示
类似于一个数组,其中capacity为缓冲数组的长度,为固定的值;postion表示下一个需要操作的位置;limit为下一个不可操作的位置,各种数据的大小关系是0<=position<=limit<=capacity
- put 写入数据,每次写入数据的地方都是postion,就会使得postion的值变大,当直到填充的数据长度超过了数组的长度,会抛出BufferOverflowException异常
- get 读取数据 每次也会进行postion+1操作,这里需要注意到每次读取数据之前必须进行clear操作
- 要不然会出现数据错误的问题,如下使用例子
错误例子
正确例子
- clear() 清空缓冲区的数据,实际上是假清楚,只是将postion置位0,limit置位capacity
public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; }
- allocate(int n) 申请缓冲区,大小由参数决定
- wrap(byte[] byets) 同样是申请缓冲区,传入的参数却是byte[],相当于设置的缓冲区大小是byte数组的长度,然后初始化设置了该缓冲容器的值
- flip() 切换到读模式,修改limit为postion,position为0
- hasRemaining() 查看是否还有数据可读 return position < limit;
一般的使用套路都是
ByteBuffer byteBuffer = ByteBuffer.allocate(10); byteBuffer.clear(); // 清空 byteBuffer.put("hello".getBytes()); // 写入数据 byteBuffer.flip(); // 读数据之前的必备操作 while (byteBuffer.hasRemaining()){ // 数据是否读取完毕 System.out.print(byteBuffer.get() + "\t" ); // 读取数据 } System.out.println();
Channel
Channel通道,管道感觉和pipeline一个意思,类似于IO的Stream,只是stream是单向,要么是Input要么是Output,在具体的Java IO库学习的时候深有体会;而Channel是双向的,也就意味着可以通过一个channel进行读写操作了。不过需要注意可以读写操作和能不能读写操作这是两回事,后面会具体说明。
Nio的channel具体实现主要为FileChannel、DatagramChannel、SocketChannel、ServerSocketChannel四种,分别对应的是文件、UDP和TCP的客户端和服务端。重点介绍SocketChannle和ServerSocketChannel
SocketChannel
socketchannel 是客户端,连接一个创建好的TCP网络套接字的通道,可有两种创建方式
- 新建一个socketchannel,并连接到服务器上
- 服务器接收到来自客户端的请求接收到的
# 方法1 SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress(8081)); // 连接到本地的8081端口的服务器上 # 方法2 SocketChannel socketChannel = serverSocketChannel.accept(); // 服务端接收到客户端发送的信息,通过accpet即可获取对应的socketChannel # 关闭socketChannel socketChannel.close(); // 由于其会抛出异常,最好是放在finally里面,并且做好空判断以及异常捕获
由上图所示,需要从channel读数据,以及向外发送数据都需要使用buffer作为缓冲容器
# 读数据 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int count= socketChannel.read(byteBuffer); // read 方法会返回读取了多少数据到buffer中,返回-1表示数据以及读取完毕,可以关闭通道了 # 写数据 String message = "Hello World!"; socketChannel.write(message.getBytes()); // 直接调用write方法写入相关的bytebuffer数组 # 写数据方法2 String message = "Hello World!"; ByteBuffer byteBuffer = ByteBuffer.allocate(1024); byteBuffer.clear(); byetBuffer.put(message.getBytes()); byetBuffer.flip(); while(byteBuffer.hasRemaing()){ socketChannel.write(byetBuffer); }
由于可以其为异步模式了,在其调用connect()方法的时候是立即返回结果,连接成功返回true,连接不成功返回false,并继续进行连接(服务自主操作),存在还未建立连接就返回了,所以在使用服务端数据的时候再调用finishConnect()确保链接的建立
SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking (false); // 一定要设置该内容 socketChannel.connect(new InetSocketAddress(8081)); while(!socketChannel.finishConnect()){ ..... // 日志打印等操作,直到连接成功 } // 这是阻塞模式的 // 连接成功,可以进行读写操作了
PS:可以通过方法isConnectionPending()的返回值确认是否处于连接中
ServerSocketChannel
ServerSocketChannel 是应用在服务端的,和socketchannel相对应,主要是用来监听新来的TCP请求的一个通道。绑定的本地端口,IP是本机IP。创建方式是
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(8081)); // 通过对该channel持有的socket进行绑定端口操作 // 之前很奇怪一个问题,这个直接调用socket().bind不会有空指针么?肯定不会有这个错误的 // 在调用socket(),如果发现其内置的socket为null,就会生成一个socket的适配参数替换null class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl { private static NativeDispatcher nd; private final FileDescriptor fd; private int fdVal; private volatile long thread = 0L; private final Object lock = new Object(); private final Object stateLock = new Object(); private static final int ST_UNINITIALIZED = -1; private static final int ST_INUSE = 0; private static final int ST_KILLED = 1; private int state = -1; private InetSocketAddress localAddress; private boolean isReuseAddress; ServerSocket socket; // 这个就是调用socket取得的数据 public ServerSocket socket() { Object var1 = this.stateLock; synchronized(this.stateLock) { if (this.socket == null) { this.socket = ServerSocketAdaptor.create(this); } return this.socket; } } } public class ServerSocketAdaptor extends ServerSocket { private final ServerSocketChannelImpl ssc; private volatile int timeout = 0; public static ServerSocket create(ServerSocketChannelImpl var0) { try { return new ServerSocketAdaptor(var0); } catch (IOException var2) { throw new Error(var2); } } }
Selector
selector 是nio中能够管理多个channel通道并感知各个通道的读写状态的一个组件,可以使用单线程管理多个channel的从而同时处理多个网络请求。selector和channel是通过selectorkey绑定的
Selector selector = Selector.open(); // 创建一个selector ServerSocketChannel serverSocketChannel ..... // 创建serversocketChannel并绑定端口 ServerSocketChannel.configureBlocking(false); // 设置为非阻塞模式 SelectionKey key = channel.register(selector,Selectionkey.OP_READ); // register绑定selector和channel到一个对象SelectionKey中
注意register()方法的第二个参数。这是一个“interest集合”,意思是在通过Selector监听Channel时对什么事件感兴趣。可以监听四种不同类型的事件:
- Selectionkey.Connect 可以连接 (客户端接收到可连接到请求)
- Selectionkey.Accept 可以接受 (服务端接到客户端连接的请求)
- Selectionkey.Read 可以读取数据
- Selectionkey.Write 可以写入数据
SelectionKey
绑定channel和selector的对象,还包含有read集合和interest集合(感兴趣,在register设置的值),还可以通过attachment()方法绑定一些其他数据。配套的还有判断其状态的方法
public class SelectionKeyImpl extends AbstractSelectionKey { final SelChImpl channel; public final SelectorImpl selector; private int index; private volatile int interestOps; private int readyOps;
选择通道
selector从已经注册好的channel中获取已经准备就绪的通道进行操作,以下三种是获取通道的方法
- int select() // 阻塞模式,至少有一个准备就绪的通道才返回
- int select(long timeout) // 加入超时设置
- int selectNow() // 会立即返回,返回当前就绪的通道个数
- selectedKeys()获取当前就绪的通道集合
- close() 关闭当前的selector,使得绑定的key全部不可用,但是通道本身还是可以正常使用的
int count = selector.select(); Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()){ SelectionKey selectionKey = (SelectionKey)it.next(); it.remove(); // 处理完成一个key就移除掉,无需再次处理 if(selectionKey.isAcceptable()){ ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } if(selectionKey.isReadable()){ SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); createProcessor(socketChannel); socketChannel.register(selector, SelectionKey.OP_WRITE); selectionKey.cancel(); // 当前key取消掉了,但是通道依旧可用 } }
DatagramChannel
收发UTP包的通道,适用于UTP协议,发送和读取的是用户数据报
DatagramChannel channel = DatagramChannel.open(); channel.socket().bind(new InetSocketAddress(10002)); // 建立了一个本地10002端口的UTP服务端 channel.connect(new InetSocketAddress(10002)); // 连接一个IP默认为本机,端口为10002服务 // 读写和传统的read、write类似
该RPC的代码可关注后私信「RPC」获取
猜你喜欢
- 2024-10-04 Netty:数据传输载体- ByteBuf 详解
- 2024-10-04 JDK源码阅读:ByteBuffer(jdk源码阅读顺序以及优先级)
- 2024-10-04 初级java程序员面试题 项目业务逻辑问题 培训班 基础题 开发
- 2024-10-04 好文推荐:从JVM模型谈十种内存溢出及解决方法
- 2024-10-04 「系列教程」手写RPC框架(2) BIO模型替换成NIO模型学习
- 2024-10-04 详解Java NIO中的Buffer、Channel 和 Selector
- 2024-10-04 Java常见的面试题(java面试常见问题与答案)
- 2024-10-04 什么是缓冲区溢出攻击?(缓冲区溢出攻击代名词)
- 2024-10-04 java教程、JAVA学习 |JAVA面试题大全(高级)
- 2024-10-04 Java并发编程之NIO简明教程(java并发编程实战 看不懂)
- 10-02基于深度学习的铸件缺陷检测_如何控制和检测铸件缺陷?有缺陷铸件如何处置?
- 10-02Linux Mint 22.1 Cinnamon Edition 搭建深度学习环境
- 10-02AWD-LSTM语言模型是如何实现的_lstm语言模型
- 10-02NVIDIA Jetson Nano 2GB 系列文章(53):TAO模型训练工具简介
- 10-02使用ONNX和Torchscript加快推理速度的测试
- 10-02tensorflow GPU环境安装踩坑日记_tensorflow配置gpu环境
- 10-02Keye-VL-1.5-8B 快手 Keye-VL— 腾讯云两卡 32GB GPU保姆级部署指南
- 10-02Gateway_gateways
- 最近发表
-
- 基于深度学习的铸件缺陷检测_如何控制和检测铸件缺陷?有缺陷铸件如何处置?
- Linux Mint 22.1 Cinnamon Edition 搭建深度学习环境
- AWD-LSTM语言模型是如何实现的_lstm语言模型
- NVIDIA Jetson Nano 2GB 系列文章(53):TAO模型训练工具简介
- 使用ONNX和Torchscript加快推理速度的测试
- tensorflow GPU环境安装踩坑日记_tensorflow配置gpu环境
- Keye-VL-1.5-8B 快手 Keye-VL— 腾讯云两卡 32GB GPU保姆级部署指南
- Gateway_gateways
- Coze开源本地部署教程_开源canopen
- 扣子开源本地部署教程 丨Coze智能体小白喂饭级指南
- 标签列表
-
- cmd/c (90)
- c++中::是什么意思 (84)
- 标签用于 (71)
- 主键只能有一个吗 (77)
- c#console.writeline不显示 (95)
- pythoncase语句 (88)
- es6includes (74)
- sqlset (76)
- apt-getinstall-y (100)
- node_modules怎么生成 (87)
- chromepost (71)
- flexdirection (73)
- c++int转char (80)
- mysqlany_value (79)
- static函数和普通函数 (84)
- el-date-picker开始日期早于结束日期 (76)
- js判断是否是json字符串 (75)
- c语言min函数头文件 (77)
- asynccallback (87)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- java (73)
- js数组插入 (83)
- mac安装java (72)
- 无效的列索引 (74)