Netty 入门篇 Day 3---网络编程

网友投稿 612 2022-09-06

Netty 入门篇 Day 3---网络编程

Netty 入门篇 Day 3---网络编程

6.网络编程

文章目录

​​6.网络编程​​

​​6.1阻塞和非阻塞​​

​​阻塞​​

​​非阻塞​​

​​6.2selector选择器​​

​​6.2.1处理accept事件​​​​6.2.2cancel事件​​​​6.2.3 处理read事件​​

​​解决读事件时边界信息的问题​​

​​6.2.4处理write事件​​

​​结语​​

6.1阻塞和非阻塞

阻塞

在阻塞模式下,会导致 线程暂停 ssc.accept(); // 阻塞的方法 会导致线程暂停,一直等到有client连接 才继续工作 channel.read(buffer); // 阻塞的方法 会导致线程暂停,一直等client发送信息 才继续进行读操作 服务器端的单线程模式下,阻塞方法会导致这个线程暂停(闲置); 同时 多个client相到受影响,几乎不能正确工作,需要服务器端的多线程支持 服务器端的多线程模式缺点:1) 占用内存多 2)多线程切换,带来比较大的内存开销

阻塞模式的服务器端代码:

public static void main(String[] args) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(16); // 1. 创建服务器端对象 ServerSocketChannel ssc = ServerSocketChannel.open(); // 2. 绑定服务监听端口 ssc.bind(new InetSocketAddress(9999)); // 3. 连接集合 List channels = new ArrayList<>(); while (true) { System.out.println("等待连接..."); // 4. 接收client的连接 SocketChannel sc = ssc.accept(); // 阻塞的方法 会导致线程暂停,一直等到有client连接 才继续工作 System.out.println("已连接... " + sc); channels.add(sc); for (SocketChannel channel : channels) { System.out.println("before read....."); // 5. 接收client发送的数据 channel.read(buffer); // 阻塞的方法 会导致线程暂停,一直等client发送信息 才继续进行读操作 buffer.flip(); ByteBufferUtil.debugRead(buffer); buffer.clear(); System.out.println("after read....."); } }}

阻塞模式的client端代码:

public static void main(String[] args) throws IOException { // 1 创建client连接对象 SocketChannel sc = SocketChannel.open(); System.out.println(sc); // 2 和server建议连接 sc.connect(new InetSocketAddress("localhost", 9999)); System.out.println(".....");}

client的运行,使用debug+运行时多实例模式,进行演示。

非阻塞

设置非阻塞模式: serverSocketChannel.configureBlocking(false); // 设置非阻塞模式 socketChannel.configureBlocking(false); // 非阻塞模式 非阻塞模式下,不会导致线程暂停

SocketChannel sc = ssc.accept(); // 非阻塞的方法 不会导致线程暂停。没有连接时返回null;有连接时正确获取 int len = channel.read(buffer); // 非阻塞的方法 不会导致线程暂停。 没发消息则返回0;client发送信息正常读取 非阻塞模式下,线程不会暂停,即使 没有连接或没有可读数据,线程仍然不断运行,浪费CPU

非阻塞模式的server端代码:

public static void main(String[] args) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(16); // 1. 创建服务器端对象 ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // 设置非阻塞模式 // 2. 绑定服务监听端口 ssc.bind(new InetSocketAddress(9999)); // 3. 连接集合 List channels = new ArrayList<>(); while (true) { // System.out.println("等待连接..."); // 4. 接收client的连接 SocketChannel sc = ssc.accept(); // 非阻塞的方法 不会导致线程暂停。没有连接时返回null;有连接时正确获取 if (sc != null) { // 有client连接 System.out.println("已连接... " + sc); sc.configureBlocking(false); // 非阻塞模式 channels.add(sc); } for (SocketChannel channel : channels) { // System.out.println("before read....."); // 5. 接收client发送的数据 int len = channel.read(buffer); // 非阻塞的方法 不会导致线程暂停。 没发消息则返回0;client发送信息正常读取 if (len > 0 ) { // 读到消息 buffer.flip(); ByteBufferUtil.debugRead(buffer); buffer.clear(); System.out.println("after read....."); } } }}

阻塞模式的client端代码:

public static void main(String[] args) throws IOException { // 1 创建client连接对象 SocketChannel sc = SocketChannel.open(); System.out.println(sc); // 2 和server建议连接 sc.connect(new InetSocketAddress("localhost", 9999)); System.out.println(".....");}

client的运行,使用debug+运行时多实例模式,进行演示。

6.2selector选择器

单个线程配合selector完成 对多个channel事件的监控,称为多路复用。

6.2.1处理accept事件

服务器端代码:

public static void main(String[] args) throws IOException { // 1 创建selector 管理多个channel Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // 2 注册 channel 到 selector 上(建议channel和selector的关系) SelectionKey sscKey = ssc.register(selector, 0, null); sscKey.interestOps(SelectionKey.OP_ACCEPT); // accept事件 System.out.println("sscKey: " + sscKey); ssc.bind(new InetSocketAddress(9999)); while (true){ // 3 通过select监控事件,有事件 线程正常工作,没有事件 线程阻塞 selector.select(); // 4 处理事件 Iterator iter = selector.selectedKeys().iterator(); while (iter.hasNext()){ SelectionKey key = iter.next(); ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); System.out.println(sc); } } }

client端代码:

public static void main(String[] args) throws IOException { // 1 创建client连接对象 SocketChannel sc = SocketChannel.open(); System.out.println(sc); // 2 和server建议连接 sc.connect(new InetSocketAddress("localhost", 9999)); System.out.println(".....");}

6.2.2cancel事件

tex // select() 在事件未处理时,不阻塞 (事件发生后 要么处理 要么cancel) selector.select();

注意: 事件发生后 要么处理,要么cancel取消,否则 下次该事件 仍会触发。

public static void main(String[] args) throws IOException { // 1 创建selector 管理多个channel Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // 2 注册 channel 到 selector 上(建议channel和selector的关系) SelectionKey sscKey = ssc.register(selector, 0, null); sscKey.interestOps(SelectionKey.OP_ACCEPT); // accept事件 System.out.println("sscKey: " + sscKey); ssc.bind(new InetSocketAddress(9999)); while (true){ // 3 通过select监控事件,有事件 线程正常工作,没有事件 线程阻塞 // select()方法 在事件未处理时,不阻塞 (事件发生后 要么处理 要么cancel) selector.select(); System.out.println("......."); // 4 处理事件 Iterator iter = selector.selectedKeys().iterator(); while (iter.hasNext()){ SelectionKey key = iter.next(); System.out.println("key: " + key); /*ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); System.out.println(sc);*/ // 注意: 事件发生后, 要么处理, 要么cancel取消,否则下次该事件 仍会触发 key.cancel(); // 取消事件 } } }

6.2.3 处理read事件

public static void main(String[] args) throws IOException { // 1 创建selector 管理多个channel Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // 2 注册 channel 到 selector 上(建议channel和selector的关系) SelectionKey sscKey = ssc.register(selector, 0, null); sscKey.interestOps(SelectionKey.OP_ACCEPT); // accept事件 System.out.println("sscKey: " + sscKey); ssc.bind(new InetSocketAddress(9999)); while (true){ // 3 通过select监控事件,有事件 线程正常工作,没有事件 线程阻塞 // select()方法 在事件未处理时,不阻塞 (事件发生后 要么处理 要么cancel) selector.select(); System.out.println("......."); // 4 处理事件 Iterator iter = selector.selectedKeys().iterator(); while (iter.hasNext()){ SelectionKey key = iter.next(); // 处理key时, 要从selectedKeys中进行删除,否则下次处理时会遇到问题 iter.remove(); System.out.println("key: " + key); // 区分不同的事件 进行处理 if (key.isAcceptable()){ // accept事件 ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); System.out.println(sc); sc.configureBlocking(false); // 把channel 注册到 selector上 SelectionKey scKey = sc.register(selector, 0, null); scKey.interestOps(SelectionKey.OP_READ); // read事件 } else if(key.isReadable()) { // read事件 try { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(16); int len = sc.read(buffer); if (len == -1){ // 没读到内容 key.cancel(); // 正常close client时的处理 cancel事件 } else { // 读到内容 buffer.flip(); ByteBufferUtil.debugRead(buffer); } }catch (Exception e){ // e.printStackTrace(); key.cancel(); // 非正常关闭client时的处理 cancel事件 } } } } }

解决读事件时边界信息的问题

服务器端代码:

public static void main(String[] args) throws IOException { // 1 创建selector 管理多个channel Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // 2 注册 channel 到 selector 上(建议channel和selector的关系) SelectionKey sscKey = ssc.register(selector, 0, null); sscKey.interestOps(SelectionKey.OP_ACCEPT); // accept事件 System.out.println("sscKey: " + sscKey); ssc.bind(new InetSocketAddress(9999)); while (true){ // 3 通过select监控事件,有事件 线程正常工作,没有事件 线程阻塞 selector.select(); // 4 处理事件 Iterator iter = selector.selectedKeys().iterator(); while (iter.hasNext()){ SelectionKey key = iter.next(); iter.remove(); // 处理key时, 要从selectedKeys中进行删除,否则下次处理时会遇到问题 System.out.println("key: " + key); // 区分不同的事件 进行处理 if (key.isAcceptable()){ // accept事件 ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); System.out.println(sc); sc.configureBlocking(false); // 把channel 注册到 selector上 // 第3个参数 attachment附件 ByteBuffer buffer = ByteBuffer.allocate(10); SelectionKey scKey = sc.register(selector, 0, buffer); scKey.interestOps(SelectionKey.OP_READ); // read事件 } else if(key.isReadable()) { // read事件 try { SocketChannel sc = (SocketChannel) key.channel(); // ByteBuffer buffer = ByteBuffer.allocate(5); // 获取 channel注册时的attr (已存在的buffer) ByteBuffer buffer = (ByteBuffer) key.attachment(); int len = sc.read(buffer); if (len == -1){ // 没读到内容(正常close时) key.cancel(); } else { // 读到内容 // 以\n做为分隔进行读取 split(buffer); if (buffer.position() == buffer.limit()){ // 对buffer进行扩容 ByteBuffer buffer2 = ByteBuffer.allocate(buffer.capacity() * 2); buffer.flip(); buffer2.put(buffer); key.attach(buffer2); } } }catch (Exception e){ // e.printStackTrace(); key.cancel(); } } } } } // 拆分缓冲区中的数据 private static void split(ByteBuffer source){ source.flip(); // 切换原缓冲区为 读模式 // 依次按字节读取原缓冲区内容 for(int i=0; i

客户端代码:

public static void main(String[] args) throws IOException { // 1 创建client连接对象 SocketChannel sc = SocketChannel.open(); // 2 和server建议连接 sc.connect(new InetSocketAddress("localhost", 9999)); sc.write(Charset.defaultCharset().encode("12\n34567890\naaaa")); sc.write(Charset.defaultCharset().encode("123\n4567890\nbb")); sc.write(Charset.defaultCharset().encode("123456\n7890cccdddeee\n")); System.out.println("....."); }

6.2.4处理write事件

= SocketChannel.open(); // 2 和server建议连接 sc.connect(new InetSocketAddress("localhost", 9999)); sc.write(Charset.defaultCharset().encode("12\n34567890\naaaa")); sc.write(Charset.defaultCharset().encode("123\n4567890\nbb")); sc.write(Charset.defaultCharset().encode("123456\n7890cccdddeee\n")); System.out.println("....."); }

结语

如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、评论、收藏➕关注,您的支持是我坚持写作最大的动力。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:关于 NoSQL 数据库你应该了解的 10 件事
下一篇:MySQL时间日期查询方法与函数
相关文章

 发表评论

暂时没有评论,来抢沙发吧~