Java NIO的简单封装

张开发
2026/4/16 14:34:51 15 分钟阅读

分享文章

Java NIO的简单封装
一、主要目的封装Java NIO框架更方便使用。主要是提供事件处理的接口任务队列快速事件注册读写任务类。二、主要代码(一)主服务器类package org.aio; import org.aio.entity.DefaultListener; import org.aio.entity.Listener; import org.aio.entity.Session; import org.aio.util.KeyUtil; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; /** * author * version 1.0.0 * p * date: 2026/4/7 **/ public class NIOServer { private String host 0.0.0.0; private int port 8080; private ServerSocketChannel server null; private AtomicBoolean isRun new AtomicBoolean(true); private Listener listener new DefaultListener(); public NIOServer(){} public NIOServer(int port) { this.port port; } public NIOServer(String host, int port) { this.host host; this.port port; } public void setListener(Listener listener) { this.listener listener; } public void start() { try { server ServerSocketChannel.open(); server.configureBlocking(false); server.bind(new InetSocketAddress(host, port)); Selector selector Selector.open(); // 注册ACCEPT事件 server.register(selector, SelectionKey.OP_ACCEPT); while (isRun.get()) { selector.select(); // 阻塞直到有事件就绪 SetSelectionKey keys selector.selectedKeys(); IteratorSelectionKey it keys.iterator(); while (it.hasNext()) { SelectionKey key it.next(); if (key.isAcceptable()) { SocketChannel client server.accept(); client.configureBlocking(false); System.out.println(远方客户端 client.getRemoteAddress()); listener.handleAccept(selector, client); } else if (key.isReadable()) { System.out.println(处理读取); SocketChannel client (SocketChannel) key.channel(); listener.handleRead(selector, client, (Session)key.attachment()); } else if (key.isWritable()) { System.out.println(处理写入); SocketChannel client (SocketChannel) key.channel(); listener.handleWrite(selector, client, (Session)key.attachment()); } it.remove(); } } } catch (IOException e) { e.printStackTrace(); } } /** * 关闭服务器 */ public void close(){ this.isRun.set(false); try { this.server.close(); } catch (IOException e) { e.printStackTrace(); } } }(二)监听器package org.aio.entity; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; /** * author * version 1.0.0 * p * date: 2026/4/7 **/ public interface Listener { public abstract void handleAccept(Selector selector, SocketChannel channel); public abstract void handleRead(Selector selector, SocketChannel channel, Session session); public abstract void handleWrite(Selector selector, SocketChannel channel, Session session); }(三)监听器实现类package org.aio.entity; import org.aio.util.KeyUtil; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; /** * author * version 1.0.0 * p * date: 2026/4/7 **/ public class DefaultListener implements Listener { Override public void handleAccept(Selector selector, SocketChannel channel) { // 注册读取事件 try { KeyUtil.registerRead(selector, channel); SelectionKey key channel.keyFor(selector); Session session new Session(key); key.attach(session); ReadTask task new ReadTask(session); task.setHandler(new MReadTask()); // 添加读取任务 System.out.println(---- 添加读取任务 -----); session.getReadQueue().enQueue(task); } catch (ClosedChannelException e) { e.printStackTrace(); } } Override public void handleRead(Selector selector, SocketChannel channel, Session session) { // 处理读取任务 session.doRead(channel); } Override public void handleWrite(Selector selector, SocketChannel channel, Session session) { // 处理写入任务 session.doWrite(channel); } }(四)会话类package org.aio.entity; import org.aio.util.KeyUtil; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import org.aio.entity.Queue; /** * author * version 1.0.0 * p * date: 2026/4/7 **/ public class Session { private SelectionKey key; /** * 读取队列 */ private QueueTask rQueue new QueueTask(); /** * 输出队列 */ private QueueTask wQueue new QueueTask(); public Session(SelectionKey key) { this.key key; } public QueueTask getReadQueue(){ return this.rQueue; } public QueueTask getWriteQueue(){ return this.wQueue; } public SelectionKey getKey(){ return this.key; } /** * 处理读取的逻辑 * param channel */ public void doRead(SocketChannel channel){ int code 0; // 取消读取事件监听 KeyUtil.unRegisterRead(this.key); try{ System.out.println(队列是否为空 rQueue.empty()); System.out.println(队列任务个数 rQueue.size()); // 循环如果队列不为空 while(!rQueue.empty()){ // 获取队头任务 Task t rQueue.peek(); // 执行任务 code t.run(channel); System.out.println(执行返回值 code); if(code 0){ // 出队任务 rQueue.deQueue(); } else if(code 1){ // 跳出循环 break; } } } catch(Exception e){ e.printStackTrace(); } // 如果返回值为1添加读取监听 if(code 1){ System.out.println(-------再次注册读取事件-----); KeyUtil.registerRead(this.key); } } /** * 执行写入任务 * param channel */ public void doWrite(SocketChannel channel){ try{ System.out.println(取消写入监听); // 取消写入事件监听 KeyUtil.unRegisterWrite(this.key); int code 0; System.out.println(输出队列是否为空 wQueue.empty()); System.out.println(输出队列任务个数 wQueue.size()); // 循环如果队列不为空 while(!wQueue.empty()){ // 获取队头任务 Task t wQueue.peek(); // 执行任务 code t.run(channel); System.out.println(执行任务返回值 code); if(code 0){ // 出队任务 wQueue.deQueue(); } else if(code 1){ // 跳出循环 break; } } // 如果返回值为1添加读取监听 if(code 1){ System.out.println(再次注册输出事件); KeyUtil.registerWrite(this.key); } } catch(Exception e){ e.printStackTrace(); } } }(五)任务类run(SocketChannel c)方法会多次调用根据返回值判断返回1代表不出队要再次监听等到下回处理。返回0代表出队任务结束。package org.aio.entity; import java.io.IOException; import java.nio.channels.SocketChannel; /** * author * version 1.0.0 * p * date: 2026/4/7 **/ public interface Task { // 返回值1代表需要下次监听0代表结束 int run(SocketChannel channel) throws IOException; }可以实现文件输出任务、字节序列输出任务、字符串输出任务、关闭任务。(六)字节输出的任务package org.aio.entity; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; /** * author * version 1.0.0 * p 输出字节序列的任务 * date: 2026/4/7 **/ public class ByteTask implements Task { private ByteBuffer buff; public ByteTask(byte[] array) { this.buff ByteBuffer.wrap(array); } // 写入字节序列 public int run(SocketChannel channel) throws IOException { int code 0; // 如果有剩余就执行循环 while (buff.hasRemaining()) { System.out.println(有剩余字节写入); int k channel.write(this.buff); System.out.println(写入字节个数 k); // 如果有剩余并且返回值为0 if (k 0 buff.hasRemaining()) { code 1; break; } } return code; } }三、测试案例1、添加处理任务处理HTTP请求实现Handler接口。package org.aio.entity; import org.aio.entity.Queue; import org.aio.util.KeyUtil; import java.io.File; import java.io.UnsupportedEncodingException; /** * author * version 1.0.0 * p * date: 2026/4/7 **/ public class MReadTask implements Handler{ Override public void handle(HttpRequest req, Session session){ System.out.println(-------------------); System.out.println(方法 req.getMethod()); System.out.println(资源 req.getResource()); System.out.println(版本 req.getVersion()); System.out.println(Accept: req.getHeader(Accept)); System.out.println(-------------------); // 注册写入 System.out.println(----注册写入-----); KeyUtil.registerWrite(session.getKey()); // 获得会话的输出任务队列 QueueTask queue session.getWriteQueue(); System.out.println(------ 添加输出任务 ----); try { ResponseHead head new ResponseHead(200, OK); // 定义响应体 byte[] buff pHello World!/p.getBytes(UTF-8); // 设置内容长度 head.setContentLength(buff.length); // 添加响应头输出任务 queue.enQueue(head); // 添加响应体字节序列输出任务 queue.enQueue(new ByteTask(buff)); // 添加关闭任务 queue.enQueue(new CloseTask()); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } }2、启动类import org.aio.NIOServer; /** * author * version 1.0.0 * p * date: 2026/4/7 **/ public class NIOTest { public static void main(String[] args) { NIOServer server new NIOServer(80); System.out.println(监听在80端口); server.start(); } }四、文件内容输出的任务实现package org.aio.entity; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; /** * author * version 1.0.0 * p * date: 2026/4/7 **/ public class FileTask implements Task { private FileChannel fc null; private ByteBuffer buff ByteBuffer.allocate(8024); /** * 是否文件结束 */ private boolean isEOF false; public FileTask(String path){ try { this.fc FileChannel.open(Paths.get(path)); } catch (IOException e) { e.printStackTrace(); } } public int run(SocketChannel channel) throws IOException { int code 0; while (true) { // 如果文件没有结束 if (!isEOF){ // 阻塞读取 int size fc.read(buff); // -1表示文件结束 if (size -1) { fc.close(); isEOF true; } // 切换为读取模式 buff.flip(); } // 如果有剩余就执行循环 while(buff.hasRemaining()){ int k channel.write(this.buff); // 如果有字节剩余并且返回值为0代表输出满了 if(k 0 buff.hasRemaining()){ code 1; break; } } // 如果有剩余数据进行压缩防止丢失 if (buff.hasRemaining()){ buff.compact(); } // 如果是1代表需要下回事件触发时进行处理 if(code 1){ break; } // 如果读取结束跳出循环 if (isEOF){ break; } // 清空数据 buff.clear(); } return code; } }五、关闭任务的实现package org.aio.entity; import java.io.IOException; import java.nio.channels.SocketChannel; /** * author * version 1.0.0 * p * date: 2026/4/7 **/ public class CloseTask implements Task { public int run(SocketChannel channel) throws IOException { channel.close(); System.out.println(执行关闭-------------------); return 0; } }六、总结1、基本可以使用。2、每次处理事件时首先要取消监听防止再次触发。3、需要在处理读取的时候需要一个缓冲类缓冲一部分数据方便处理。例如规定下列方法// 添加最近读取的数据void add(ByteBuffer b)// 返回缓冲的字节个数int getBuffSize()// 判断是否有一行boolean hasLine()// 读取一行字符String readline()// 读取指定个数的字节块byte[] read(int 个数)4、NIO的缺点异步注册事件的时候会阻塞在register()方法处。用wakeup()唤醒Selector的select()方法不生效。

更多文章