Tomcat 源码分析 (Tomcat异步执行任务) (十四)

网友投稿 1145 2022-10-06

Tomcat 源码分析 (Tomcat异步执行任务) (十四)

Tomcat 源码分析 (Tomcat异步执行任务) (十四)

异步的开启异步的执行异步的结束异步的超时处理异步的执行流程

文章目录

​​1.异步的开启​​​​2.开启异步的流程​​​​3.异步的运行​​​​4.运行异步的流程​​​​5.Complate(复合) 其他线程​​

1.异步的开启

public void testAsync(HttpServletRequest request, HttpServletResponse response) { try{ AsyncContext context = request.startAsync(); context.start(new Runnable(){ @Override public void run() { /** * Do you own logic here in business thread and set final result into response */ //Complate asyn thread context.complete(); }}); }catch(Exception ex){ //Handle exception here }}

HttpServletRequest 的 startAsync() 方法开启了我们的异步调用。

2.开启异步的流程

异步调用的开启从​​HttpServletRequest ​​对象的 ​​startAsync()​​ 方法开始,最终到异步状态机 ​​AsyncStateMachine​​ 对象的 ​​asyncStart()​​ 方法。开启异步并不会触发异步​​listener​​ 的 ​​start​​ 事件,因为开启异步构造的上下文对象并没有设置 listener(可能 tomcat 异步实现者并没有考虑这个问题)。异步调用的所有状态由 AsyncStateMachine 对象来保持维护,该类属于Http11Processor 对象,每个Http11Processor 对像关联处理一个底层 socket,通过这种方式来关联异步状态机和请求。AsyncStateMachine 对象中异步的初始状态为 AsyncState.DISPATCHED,通过开启异步改变其状态为 AsyncState.STARTING。

​​HttpServletRequest startAsync()​​​ -> ​​Request.setStarted()​​​ -> ​​AsyncStateMachine.asyncStart()​​

​​HttpServletRequest startAsync()​​

protected long asyncTimeout = 30000public AsyncContext startAsync(ServletRequest request,ServletResponse response) { if (!isAsyncSupported()) { IllegalStateException ise = new IllegalStateException(sm.getString("request.asyncNotSupported")); log.warn(sm.getString("coyoteRequest.noAsync",StringUtils.join(getNonAsyncClassNames())), ise); throw ise; } if (asyncContext == null) { asyncContext = new AsyncContextImpl(this); } asyncContext.setStarted(getContext(), request, response,request==getRequest() && response==getResponse().getResponse()); asyncContext.setTimeout(getConnector().getAsyncTimeout()); return asyncContext;}

首先判断是否支持异步,不支持则抛异常创建异步上下文对象,该上下文对象属于 HttpServletRequest 实例。通过 getConnector().getAsyncTimeout() 来设置异步超时时间, 对于超时时间源码默认为 30 秒,所以如果我们的异步任务有可能大于 30 秒的时候要注意重新设置超时时间。调用异步上下文对象的 setStarted() 方法来开启异步​​setStarted()​​

public void setStarted(Context context, ServletRequest request, ServletResponse response, boolean originalRequestResponse) { synchronized (asyncContextLock) { this.request.getCoyoteRequest().action( ActionCode.ASYNC_START, this); this.context = context; context.incrementInProgressAsyncCount(); this.servletRequest = request; this.servletResponse = response; this.hasOriginalRequestAndResponse = originalRequestResponse; this.event = new AsyncEvent(this, request, response); List listenersCopy = new ArrayList<>(); listenersCopy.addAll(listeners); listeners.clear(); for (AsyncListenerWrapper listener : listenersCopy) { try { listener.fireOnStartAsync(event); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.warn(sm.getString("asyncContextImpl.onStartAsyncError",listener.getClass().getName()), t); } } }}

运行异步 listener 的 start 事件函数。最终会调到 AsyncStateMachine 对象实例的 asyncStart() 方法。

private volatile AsyncState state = AsyncState.DISPATCHED;synchronized void asyncStart(AsyncContextCallback asyncCtxt) { if (state == AsyncState.DISPATCHED) { generation.incrementAndGet(); state = AsyncState.STARTING; this.asyncCtxt = asyncCtxt; lastAsyncStart = System.currentTimeMillis(); } else { throw new IllegalStateException(sm.getString("asyncStateMachine.invalidAsyncState","asyncStart()", state)); }}

​​ AsyncStateMachine​​​ 对象实例属于​​Http11Processor​​​ 类,该 ​​processor​​​ 用来处理包装底层 ​​socket​​​ 的 ​​NioChannel​​​ 对象。所以异步请求的所有状态都通过 ​​AsyncStateMachine​​ 保存维护的。

​​AsyncStateMachine​​​ 对象中异步的初始状态为 ​​AsyncState.DISPATCHED​​​,通过开启异步改变其状态为 ​​AsyncState.STARTING​​。

3.异步的运行

public void testAsync(HttpServletRequest request, HttpServletResponse response) {try{ AsyncContext context = request.startAsync(); context.start(new Runnable(){ @Override public void run() { /** * Do you own logic here in business thread and set final result into response */ //Complate asyn thread context.complete(); }}); }catch(Exception e){ //Handle exception here }}

AsyncContext 对象的 start() 方法开启了我们的异步运行,该方法接受runnable 类型的对象,在异步线程中运行我们的逻辑。

4.运行异步的流程

​​AsyncContext.start() ​​

从 AsyncContext 的 star() 方法开始,一直调用到 SocketWrapperBase 对象实例的 execute() 方法,这里我们分析 execute() 核心方法:

public void execute(Runnable runnable) { Executor executor = endpoint.getExecutor(); if (!endpoint.isRunning() || executor == null) { throw new RejectedExecutionException(); } executor.execute(runnable);}

上述方法首先通过 endpoint.getExecutor() 调用得到 tomcat 的 io 线程池。将异步任务委托到 tomcat io 线程池中运行。因为原生的Api中, 任务线程占用了IO线程, 所以为了使得Servlet请求的运行, 引入了线程池, 避免影响服务器吞吐率。

public void testAsync(HttpServletRequest request, HttpServletResponse response) { try{ AsyncContext context = request.startAsync(); ThreadPool pool = ThreadPool.getThreadPool(); pool.execute(new Runnable(){ @Override public void run() { /** * Do you own logic here in business thread and set final result into response */ //Complate asyn thread context.complete(); }}); }catch(Exception ex){ //Handle exception here }}

AsyncContext.start() 方法会把异步任务交由 tomcat io 线程池运行,这样在大量启动异步任务的时候可能会过度占用 io 线程池,从而导致服务器吞吐率下降。所以一般建议引入业务线程池,根据场景设置好业务线程池的参数,把异步任务的执行,响应结果的设置,异步任务的结束等交由业务线程池运行。从而释放 io 线程池,避免降低吞吐率。

5.Complate(复合) 其他线程

当请求处理完成时,Web应用调用这个方法:把响应数据发送到浏览器。

这事不能由Web应用线程负责,即ctx.complete不能直接把响应数据发送到浏览器,因为这件事情应该由Tomcat线程处理。

连接器中的Endpoint组件检测到有请求数据达到时,会创建一个SocketProcessor对象交给线程池去处理,因此Endpoint的通信处理和具体请求处理在两个线程里运行。

在异步Servlet场景,Web应用通过调用ctx.complete方法时,也可生成一个新的SocketProcessor任务类,交给线程池处理。

对于异步Servlet请求来说,相应的Socket和协议处理组件Processor都被缓存起来了,并且这些对象都可以通过Request对象拿到。

public void complete() { //检查状态合法性,我们先忽略这句 check(); //调用Request对象的action方法,其实就是通知连接器,这个异步请求处理完了request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null); } case ASYNC_COMPLETE: { clearDispatches(); if (asyncStateMachine.asyncComplete()) { processSocketEvent(SocketEvent.OPEN_READ, true); } break;}protected void processSocketEvent(SocketEvent event, boolean dispatch) { SocketWrapperBase socketWrapper = getSocketWrapper(); if (socketWrapper != null) { socketWrapper.processSocket(event, dispatch); }}public boolean processSocket(SocketWrapperBase socketWrapper, SocketEvent event, boolean dispatch) { if (socketWrapper == null) { return false; } SocketProcessorBase sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } // 线程池运行 Executor executor = getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); }}

createSocketProcessor函数的第二个参数SocketEvent,这里传入OPEN_READ,控制SocketProcessor的行为,不需要再把请求发送到容器进行处理,只需要向浏览器端发送数据,并且重新在这个Socket上监听新的请求。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:关于微信小程序中页面之间传参的解析(微信小程序如何传参)
下一篇:Scala安装使用
相关文章

 发表评论

暂时没有评论,来抢沙发吧~