Tomcat 的 Connector 中NIO运行模式原理

网友投稿 882 2022-11-18

Tomcat 的 Connector 中NIO运行模式原理

Tomcat 的 Connector 中NIO运行模式原理

Tomcat 的 Connector 有三种运行模式 bio、nio、apr ,先了解一下这三种的区别。

1、 bio(blocking I/O),顾名思义即阻塞式 I/O 操作,表示 Tomcat 使用的是传统的 java I/O 操作(即​​java.io​​​包及其子包)。Tomcat 在默认情况下,就是以 bio 模式运行的。一般而言 bio 模式是三种运行模式中性能最低的一种。2、 nio(new I/O),是 Java SE 1.4 及后续版本提供的一种新的 I/O 操作方式(即​​​java.nio​​​包及其子包)。Java nio 是一个基于缓冲区并能提供非阻塞 I/O 操作的 Java API ,因此 nio 也被看成是 non-blocking I/O 的缩写。它拥有比传统 I/O 操作( bio )更好的并发运行性能。要让 Tomcat 以 nio 模式来运行只需要在 Tomcat 安装目录​​/conf/server.xml​​​文件中将 Connector 节点的 protocol 配置成​​org.apache.coyote.apr( Apache Portable Runtime/Apache 可移植运行时),是 Apache HTTP 服务器的支持库。可以简单地理解为 Tomcat 将以 JNI 的形式调 用Apache HTTP 服务器的核心动态链接库来处理文件读取或网络传输操作,从而大大提高 Tomcat 对静态文件的处理性能。 Tomcat apr 也是在 Tomcat 上运行高并发应用的首选模式。

写个 BIO 的 Socket 服务器还是比较容易的,无非是每 accept 一个 socket 之后就扔到一个线程中处理请求生成响应,这种方式可以改进的点就是增加线程池的支持,本文主要分析一下 Tomcat 中 NIO 处理方式的相关代码逻辑。

关键代码都是在​​org.apache.tomcat.util-.NioEndpoint​​这个类里面,它是 Http11NioProtocol 中负责接收处理 socket 的主要组件,别看代码很长,仔细阅读会发现有很多共通的地方,如:

1、 都会对JDK中原有的API做一下扩展或者包装,如 ThreadPoolExecutor 是对​​java.util.concurrent.ThreadPoolExecutor​​​的扩展,NioChannel 是对 ByteChannel 的扩展,KeyAttachment 则是对 NioChannel 的包装2、 很多类设计成非 GC 的,方便缓存和重复使用,实现方式都是通过 ConcurrentLinkedQueue 类构造一个队列。比如 NioEndpoint 类里面的 ConcurrentLinkedQueue processorCache、ConcurrentLinkedQueue keyCache、ConcurrentLinkedQueue eventCache、ConcurrentLinkedQueue nioChannels 。Poller 类里面的 ConcurrentLinkedQueue events

先看下整个 Connector 组件结构图:

看过之前 Tomcat 启动文章的应该都知道,Connector 的启动会调用 Connector 类的 startInternal 方法,里面调用了 protocolHandler 的 start() ,该方法中将调用抽象的 endpoint 的 start() 方法,这个方法会调用到具体 Endpoint 类的 startInternal() ,所以代码分析先从 NioEndpoint 类的 startInternal 看起。

1.NioEndpoint 类核心组件的初始化

/** * Start the NIO endpoint, creating acceptor, poller threads. */ @Override public void startInternal() throws Exception { if (!running) { running = true; paused = false; // Create worker collection if ( getExecutor() == null ) { // 构造线程池,用于后续执行SocketProcessor线程,这就是上图中的Worker。 createExecutor(); } initializeConnectionLatch(); // Start poller threads // 根据处理器数量构造一定数目的轮询器,即上图中的Poller pollers = new Poller[getPollerThreadCount()]; for (int i=0; i

startAcceptorThreads 调用的是父类​​org.apache.tomcat.util-.AbstractEndpoint​​中的实现:

protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { // 调用子类的createAcceptor方法,本例中即NioEndpoint类的createAcceptor方法 acceptors[i] = createAcceptor(); String threadName = getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }

以上就是 Acceptor、Poller、Worker 等核心组件的初始化过程。

2.请求接收

核心组件初始化之后接着就是 Acceptor 线程接收 socket 连接,看下 Acceptor 的源码:

// --------------------------------------------------- Acceptor Inner Class /** * 后台线程,用于监听TCP/IP连接以及将它们分发给相应的调度器处理。 * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ protected class Acceptor extends AbstractEndpoint.Acceptor { @Override public void run() { int errorDelay = 0; // 循环遍历直到接收到关闭命令 // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break; } state = AcceptorState.RUNNING; try { // 如果已经达到最大连接数则让线程等待 //if we have reached max connections, wait countUpOrAwaitConnection(); SocketChannel socket = null; try { // 接收连接,这里用的阻塞模式。 // Accept the next incoming connection from the server // socket socket = serverSock.accept(); } catch (IOException ioe) { //we didn't get a socket countDownConnection(); // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } // Successful accept, reset the error delay errorDelay = 0; // 注意这个setSocketOptions方法 // 它将把上面接收到的socket添加到轮询器Poller中 // setSocketOptions() will add channel to the poller // if successful if (running && !paused) { if (!setSocketOptions(socket)) { countDownConnection(); closeSocket(socket); } } else { countDownConnection(); closeSocket(socket); } } catch (SocketTimeoutException sx) { // Ignore: Normal condition } catch (IOException x) { if (running) { log.error(sm.getString("endpoint.accept.fail"), x); } } catch (OutOfMemoryError oom) { try { oomParachuteData = null; releaseCaches(); log.error("", oom); }catch ( Throwable oomt ) { try { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; } }

3.Socket 参数设置

在 Acceptor 里接收到一个连接之后调用 setSocketOptions 方法设置 SocketChannel 的一些参数,然后将 SocketChannel 注册到 Poller 中。看下 setSocketOptions 的实现:

/** * Process the specified connection. */ protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { // 将SocketChannel配置为非阻塞模式 //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); Socket sock = socket.socket(); // 设置Socket参数值(从server.xml的Connector节点上获取参数值) // 比如Socket发送、接收的缓存大小、心跳检测等 socketProperties.setProperties(sock); // 从NioChannel的缓存队列中取出一个NioChannel // NioChannel是SocketChannel的一个的包装类 // 这里对上层屏蔽SSL和一般TCP连接的差异 NioChannel channel = nioChannels.poll(); // 缓存队列中没有则新建一个NioChannel if ( channel == null ) { // SSL setup if (sslContext != null) { SSLEngine engine = createSSLEngine(); int appbufsize = engine.getSession().getApplicationBufferSize(); NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()), Math.max(appbufsize,socketProperties.getAppWriteBufSize()), socketProperties.getDirectBuffer()); channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool); } else { // normal tcp setup NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); channel = new NioChannel(socket, bufhandler); } } else { // 将SocketChannel关联到从缓存队列中获取的NioChannel上来 channel.setIOChannel(socket); if ( channel instanceof SecureNioChannel ) { SSLEngine engine = createSSLEngine(); ((SecureNioChannel)channel).reset(engine); } else { channel.reset(); } } // 将新接收到的SocketChannel注册到Poller中 getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(t); } // Tell to close the socket return false; } return true; }

核心调用是最后的​​getPoller0().register(channel);​​它将配置好的 SocketChannel 包装成一个 PollerEvent ,然后加入到 Poller 的 events 缓存队列中。

4.读取事件注册

getPoller0 方法将轮询当前的 Poller 数组,从中取出一个 Poller 返回。( Poller 的初始化参见上述第1步:NioEndpoint 类核心组件的初始化)

/** * Return an available poller in true round robin fashion */ public Poller getPoller0() { // 最简单的轮询调度算法,poller的计数器不断加1再对poller数组取余数 int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; }

之后调用 Poller 对象的 register 方法:

public void register(final NioChannel socket) { // 设置socket的Poller引用,便于后续处理 socket.setPoller(this); // 从NioEndpoint的keyCache缓存队列中取出一个KeyAttachment KeyAttachment key = keyCache.poll(); // KeyAttachment实际是NioChannel的包装类 final KeyAttachment ka = key!=null?key:new KeyAttachment(socket); // 重置KeyAttachment对象中Poller、NioChannel等成员变量的引用 ka.reset(this,socket,getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); // 从Poller的事件对象缓存中取出一个PollerEvent,并用socket初始化事件对象 PollerEvent r = eventCache.poll(); // 设置读操作为感兴趣的操作 ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); // 加入到Poller对象里的事件队列 addEvent(r); }

看下 Poller 类里 addEvent 的代码:

/** * Only used in this class. Will be made private in Tomcat 8.0.x * @deprecated */ @Deprecated public void addEvent(Runnable event) { events.offer(event); if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); }

就两行,第一行从 event 对象添加到缓存队列中,第二行如果当前事件队列中没有事件,则唤醒处于阻塞状态的 selector 。

5.Poller 处理流程

上面讲的是从 Acceptor 中接收到的 Socket 以 PollerEvent 的形式包装并添加到 Poller 的事件缓存队列中,接下来看看另外一个核心组件 Poller 的处理过程:

/** * Poller class. */ public class Poller implements Runnable { // 这就是NIO中用到的选择器,可以看出每一个Poller都会关联一个Selector protected Selector selector; // 待处理的事件队列 protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue(); // 唤醒多路复用器的条件阈值 protected AtomicLong wakeupCounter = new AtomicLong(0l); public Poller() throws IOException { // 对Selector的同步访问,通过调用Selector.open()方法创建一个Selector synchronized (Selector.class) { // Selector.open() isn't thread safe // // Affects 1.6.0_29, fixed in 1.7.0_01 this.selector = Selector.open(); } } // 通过addEvent方法将事件添加到Poller的事件队列中 /** * Only used in this class. Will be made private in Tomcat 8.0.x * @deprecated */ @Deprecated public void addEvent(Runnable event) { events.offer(event); // 如果队列中没有待处理的事件则唤醒处于阻塞状态的selector if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); } // 处理事件队列中的所有事件,如果事件队列是空的则返回false /** * Processes events in the event queue of the Poller. * * @return true if some events were processed, * false if queue was empty */ public boolean events() { boolean result = false; Runnable r = null; // 将Poller的事件队列中的事件逐个取出并执行相应的事件线程 while ( (r = events.poll()) != null ) { result = true; try { // 执行事件处理逻辑 // 这里将事件设计成线程是将具体的事件处理逻辑和事件框架分开 r.run(); if ( r instanceof PollerEvent ) { ((PollerEvent)r).reset(); // 事件处理完之后,将事件对象返回NIOEndpoint的事件对象缓存中 eventCache.offer((PollerEvent)r); } } catch ( Throwable x ) { log.error("",x); } } return result; } // 将socket包装成统一的事件对象PollerEvent,加入到待处理事件队列中 public void register(final NioChannel socket) { socket.setPoller(this); KeyAttachment key = keyCache.poll(); final KeyAttachment ka = key!=null?key:new KeyAttachment(socket); ka.reset(this,socket,getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); // 从NIOEndpoint的事件对象缓存中取出一个事件对象 PollerEvent r = eventCache.poll(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); // 将事件添加打Poller的事件队列中 addEvent(r); } // Poller是一个线程,该线程同Acceptor一样会监听TCP/IP连接并将它们交给合适的处理器处理 /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ @Override public void run() { // Loop until destroy() is called while (true) { try { // Loop if endpoint is paused while (paused && (!close) ) { try { Thread.sleep(100); } catch (InterruptedException e) { // Ignore } } boolean hasEvents = false; // Time to terminate? if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } else { // 执行事件队列中的事件线程 hasEvents = events(); } try { if ( !close ) { if (wakeupCounter.getAndSet(-1) > 0) { // 把wakeupCounter设成-1,这是与addEvent里的代码呼应,这样会唤醒selector //if we are here, means we have other stuff to do //do a non blocking select // 以非阻塞方式查看selector是否有事件发生 keyCount = selector.selectNow(); } else { // 查看selector是否有事件发生,超过指定时间则立即返回 keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { // 执行事件队列中的事件线程 events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } } catch ( NullPointerException x ) { //sun bug 5076772 on windows JDK 1.5 if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); if ( wakeupCounter == null || selector == null ) throw x; continue; } catch ( CancelledKeyException x ) { //sun bug 5076772 on windows JDK 1.5 if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); if ( wakeupCounter == null || selector == null ) throw x; continue; } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } //either we timed out or we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events()); Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // 根据向selector中注册的key遍历channel中已经就绪的keys,并处理这些key // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); // 这里的attachment方法返回的就是在register()方法中注册的 // 而KeyAttachment对象是对socket的包装 KeyAttachment attachment = (KeyAttachment)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { // 更新通道最近一次发生事件的时间 // 防止因超时没有事件发生而被剔除出selector attachment.access(); iterator.remove(); // 具体处理通道的逻辑 processKey(sk, attachment); } }//while //process timeouts // 多路复用器每执行一遍完整的轮询便查看所有通道是否超时 // 对超时的通道将会被剔除出多路复用器 timeout(keyCount,hasEvents); if ( oomParachute > 0 && oomParachuteData == null ) checkParachute(); } catch (OutOfMemoryError oom) { try { oomParachuteData = null; releaseCaches(); log.error("", oom); }catch ( Throwable oomt ) { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } }//while synchronized (this) { this.notifyAll(); } stopLatch.countDown(); } // 处理selector检测到的通道事件 protected boolean processKey(SelectionKey sk, KeyAttachment attachment) { boolean result = true; try { if ( close ) { cancelledKey(sk, SocketStatus.STOP, attachment.comet); } else if ( sk.isValid() && attachment != null ) { // 确保通道不会因超时而被剔除 attachment.access();//make sure we don't time out valid sockets sk.attach(attachment);//cant remember why this is here NioChannel channel = attachment.getChannel(); // 处理通道发生的读写事件 if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { if ( isWorkerAvailable() ) { // 在通道上注销对已经发生事件的关注 unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { // 具体的通道处理逻辑 if (!processSocket(channel, SocketStatus.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { // 解除无效通道 cancelledKey(sk,SocketStatus.DISCONNECT,false); } } else { result = false; } } } } else { //invalid key cancelledKey(sk, SocketStatus.ERROR,false); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk, SocketStatus.ERROR,false); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("",t); } return result; } // 这个unreg()很巧妙,防止了通道对同一个事件不断select的问题 protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) { //this is a must, so that we don't have multiple threads messing with the socket reg(sk,attachment,sk.interestOps()& (~readyOps)); } // 向NioChannel注册感兴趣的事件,具体代码看下面的PollerEvent类的说明 protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) { sk.interestOps(intops); attachment.interestOps(intops); attachment.setCometOps(intops); } }

6.PollerEvent 处理流程

Poller 处理的核心是启动执行事件队列中的 PollerEvent,接着从 selector 中遍历已经就绪的 key ,一旦发生了感兴趣的事件,则交由 processSocket 方法处理。PollerEvent 的作用是向 socket 注册或更新感兴趣的事件:

/** * * PollerEvent, cacheable object for poller events to avoid GC */ public static class PollerEvent implements Runnable { // 每个PollerEvent都会保存NioChannel的引用 protected NioChannel socket; protected int interestOps; protected KeyAttachment key; public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) { reset(ch, k, intOps); } public void reset(NioChannel ch, KeyAttachment k, int intOps) { socket = ch; interestOps = intOps; key = k; } public void reset() { reset(null, null, 0); } @Override public void run() { //socket第一次注册到selector中,完成对socket读事件的注册 if ( interestOps == OP_REGISTER ) { try { socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key); } catch (Exception x) { log.error("", x); } } else { // socket之前已经注册到了selector中,更新socket所感兴趣的事件 final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { boolean cancel = false; if (key != null) { final KeyAttachment att = (KeyAttachment) key.attachment(); if ( att!=null ) { //handle callback flag if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) { att.setCometNotify(true); } else { att.setCometNotify(false); } interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag // 刷新事件的最后访问时间,防止事件超时 att.access();//to prevent timeout //we are registering the key to start with, reset the fairness counter. int ops = key.interestOps() | interestOps; att.interestOps(ops); key.interestOps(ops); } else { cancel = true; } } else { cancel = true; } if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false); }catch (CancelledKeyException ckx) { try { socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT,true); }catch (Exception ignore) {} } }//end if }//run @Override public String toString() { return super.toString()+"[intOps="+this.interestOps+"]"; } }

7.将 socket 交给 Worker 执行

在第5步的 Poller 处理流程的分析中看到它的 run 方法最后会调用 processKey() 处理 selector 检测到的通道事件,而在这个方法最后会调用 processSocket 来调用具体的通道处理逻辑,看下 processSocket 方法的实现:

public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { try { KeyAttachment attachment = (KeyAttachment)socket.getAttachment(); if (attachment == null) { return false; } attachment.setCometNotify(false); //will get reset upon next reg // 从SocketProcessor的缓存队列中取出一个来处理socket SocketProcessor sc = processorCache.poll(); if ( sc == null ) sc = new SocketProcessor(socket,status); else sc.reset(socket,status); // 将有事件发生的socket交给Worker处理 if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc); else sc.run(); } catch (RejectedExecutionException rx) { log.warn("Socket processing request was rejected for:"+socket,rx); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); return false; } return true; }

Poller 通过 NioEndpoint 的协调,将发生事件的 socket 交给工作者线程 Worker 来进一步处理。整个事件框架的工作就到此结束,下面就是 Worker 的处理。

8.从 socket 中处理请求

在 Tomcat 6 版本的 NIO 处理实现中有一个 Worker 类,在 Tomcat 7 中把它去掉了,但工作者的职责还在,只是交由了上面看到的 SocketProcessor 这个类来担当,看下这个类的实现代码:

// ---------------------------------------------- SocketProcessor Inner Class // 这个类相当于一个工作者,但只会在一个外部线程池中简单使用。 /** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. */ protected class SocketProcessor implements Runnable { // 每个SocketProcessor保存一个NioChannel的引用 protected NioChannel socket = null; protected SocketStatus status = null; public SocketProcessor(NioChannel socket, SocketStatus status) { reset(socket,status); } public void reset(NioChannel socket, SocketStatus status) { this.socket = socket; this.status = status; } @Override public void run() { // 从socket中获取SelectionKey SelectionKey key = socket.getIOChannel().keyFor( socket.getPoller().getSelector()); KeyAttachment ka = null; if (key != null) { ka = (KeyAttachment)key.attachment(); } // Upgraded connections need to allow multiple threads to access the // connection at the same time to enable blocking IO to be used when // NIO has been configured if (ka != null && ka.isUpgraded() && SocketStatus.OPEN_WRITE == status) { synchronized (ka.getWriteThreadLock()) { doRun(key, ka); } } else { synchronized (socket) { doRun(key, ka); } } } private void doRun(SelectionKey key, KeyAttachment ka) { try { int handshake = -1; try { if (key != null) { // For STOP there is no point trying to handshake as the // Poller has been stopped. if (socket.isHandshakeComplete() || status == SocketStatus.STOP) { handshake = 0; } else { handshake = socket.handshake( key.isReadable(), key.isWritable()); // The handshake process reads/writes from/to the // socket. status may therefore be OPEN_WRITE once // the handshake completes. However, the handshake // happens when the socket is opened so the status // must always be OPEN_READ after it completes. It // is OK to always set this as it is only used if // the handshake completes. status = SocketStatus.OPEN_READ; } } }catch ( IOException x ) { handshake = -1; if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x); }catch ( CancelledKeyException ckx ) { handshake = -1; } if ( handshake == 0 ) { SocketState state = SocketState.OPEN; // Process the request from this socket if (status == null) { // 最关键的代码,这里将KeyAttachment(实际就是socket)交给Handler处理请求 state = handler.process(ka, SocketStatus.OPEN_READ); } else { state = handler.process(ka, status); } if (state == SocketState.CLOSED) { // Close socket and pool try { close(ka, socket, key, SocketStatus.ERROR); } catch ( Exception x ) { log.error("",x); } } } else if (handshake == -1 ) { close(ka, socket, key, SocketStatus.DISCONNECT); } else { ka.getPoller().add(socket, handshake); } } catch (CancelledKeyException cx) { socket.getPoller().cancelledKey(key, null, false); } catch (OutOfMemoryError oom) { try { oomParachuteData = null; log.error("", oom); if (socket != null) { socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false); } releaseCaches(); }catch ( Throwable oomt ) { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } catch (VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); }catch ( Throwable t ) { log.error("",t); if (socket != null) { socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false); } } finally { socket = null; status = null; //return to cache if (running && !paused) { processorCache.offer(this); } } } private void close(KeyAttachment ka, NioChannel socket, SelectionKey key, SocketStatus socketStatus) { ... } }

可以看到由 SocketProcessor 寻找合适的 Handler 处理器做最终 socket 转换处理。

可以用下面这幅图总结一下 NioEndpoint 的主要流程:

Acceptor 和 Poller 是线程数组,Worker 是一个线程池( Executor )

原作者:搜云库 ​​ 搜云库

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:好慌,Redis这么多集群方案,要用哪种?
下一篇:Docker入门1(CICD 插曲)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~