关于springboot响应式编程整合webFlux的问题

网友投稿 738 2022-11-04

关于springboot响应式编程整合webFlux的问题

关于springboot响应式编程整合webFlux的问题

在servlet3.0标准之前,是每一个请求对应一个线程。如果此时一个线程出现了高延迟,就会产生阻塞问题,从而导致整个服务出现严重的性能情况,因为一旦要调用第三方接口,就有可能出现这样的操作了。早期的处理方式只能是手工控制线程。

在servlet3.0标准之后,为了解决此类问题,所以提供了异步响应的支持。在异步响应处理结构中,可以将耗时操作的部分交由一个专属的异步线程进行响应处理,同时请求的线程资源将被释放,并将该线程返回到线程池中,以供其他用户使用,这样的操作机制将极大的提升程序的并发性能。

对于以上给出的响应式编程支持,仅仅是一些原生的支持模式,而现在既然基于springboot程序开发,那么就需要考虑一些更简单的整合。

而在spring中实现响应式编程,那么则需要使用到spring webFlux,该组件是一个重新构建的且基于Reactive Streams标准实现的异步非阻塞Web开发框架,以Reactor开发框架为基础,可以更加容易实现高并发访问下的请求处理模型。在springboot2.x版本中提供了webFlux依赖模块,该模块有两种模型实现:一种是基于功能性端点的方式,另一种是基于SpringMVC注解方式。

Maven引入

org.springframework.boot

spring-boot-starter-webflux

整合处理器:

package com.example.oldguy.myWebFlux.handler;

import com.example.oldguy.myVo.Message;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

import reactor.core.publisher.Mono;

@Component

@Slf4j

public class MessageHandler {

public Mono echoHandler(Message message){

log.info("【{}】业务层接收处理数据:{}",Thread.currentThread().getName());

message.setTitle("【】"+Thread.currentThread().getName()+"】"+message.getTitle());

message.setContent("【】"+Thread.currentThread().getName()+"】"+message.getContent());

return Mono.create(item->item.success(message)); //实现数据响应

}

}

整合控制器:

package com.example.oldguy.myController;

import com.example.oldguy.myVo.Message;

import com.example.oldguy.myWebFlux.handler.MessageHandler;

import com.example.oldguy.mytask.MyThreadTask;

import io.swagger.annotations.Api;

import io.swagger.annotations.ApiOperation;

import lombok.SneakyThrows;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import org.springframework.web.bind.WebDataBinder;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.InitBinder;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import org.springframework.web.context.request.RequestContextHolder;

import org.springframework.web.context.request.ServletRequestAttributes;

import org.springframework.web.context.request.async.DeferredResult;

import javax.servlet.http.HttpServletRequest;

import java.beans.PropertyEditorSupport;

import java.time.Instant;

import java.time.LocalDate;

import java.time.ZoneId;

import java.time.format.DateTimeFormatter;

import java.util.Date;

import java.util.concurrent.TimeUnit;

/**

* 异步线程的处理机制

*/

@RestController

@RequestMapping("/message/*")

@Slf4j

@Api(tags = "异步处理")

public class AsyncController {

@Autowired

private ThreadPoolTaskExecutor threadPoolTaskExecutor;

private MyThreadTask task;

private MessageHandler messageHandler;

/**

* 日期转换

* @param

* @return

*/

private static final DateTimeFormatter LOCAL_DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

@InitBinder

public void initBinder(WebDataBinder binder){

binder.registerCustomEditor(Date.class,new PropertyEditorSupport(){

@Override

public void setAsText(String text) throws IllegalArgumentException {

LocalDate localDate = LocalDate.parse(text,LOCAL_DATE_FORMAT);

Instant instant = localDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant();

super.setValue(Date.from(instant));

}

});

}

@GetMapping("runnable")

@ApiOperation("异常处理Runnable")

public Object message(String message) {

log.info("外部线程:{}", Thread.currentThread().getName());

HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();

DeferredResult result = new DeferredResult<>(6000L); //设置异步响应

this.threadPoolTaskExecutor.execute(new Runnable() { //线程核心任务

@SneakyThrows

public void run() {

log.info("内部线程:{}",Thread.currentThread().getName());

TimeUnit.SECONDS.sleep(7);

result.setResult("[echo]"+message); //执行最终的响应

result.onCompletion(new Runnable() { //完成处理线程

log.info("完成线程:{}",Thread.currentThread().getName()); //日志输出

result.onTimeout(new Runnable() {

log.info("超时线程:{}",Thread.currentThread().getName());

result.setResult("【请求超时】"+request.getRequestURI()); //超时路径

return result;

@GetMapping("task")

@ApiOperation("task异步任务开启")

public Object messageTask(String message){

log.info("外部线程{}",Thread.currentThread().getName());

this.task.startTaskHander();

return "【echo】"+message;

@GetMapping("webflux")

@ApiOperation("整合webflux")

public Object echo(Message message){

log.info("接收用户信息,用户方发送的参数为message={}",message);

return this.messageHandler.echoHandler(message);

}

页面响应:

控制台响应:

2021-11-30 15:04:06.946  INFO 22884 --- [nio-1999-exec-1] c.e.oldguy.myController.AsyncController  : 接收用户信息,用户方发送的参数为message=Message(title=pansd, pubdate=Tue Nov 30 00:00:00 CST 2021, content=come on baby)2021-11-30 15:04:06.947  INFO 22884 --- [nio-1999-exec-1] c.e.o.myWebFlux.handler.MessageHandler   : 【http-nio-1999-exec-1】业务层接收处理数据:Message(title=pansd, pubdate=Tue Nov 30 00:00:00 CST 2021, content=come on baby)

webFlux响应map和List

//webFlux响应集合

public Flux list(Message message){

List messageList = new ArrayList<>();

for(int i=0;i<10;i++){

Message m = new Message();

m.setTitle(i+"--"+message.getTitle());

m.setContent(i+"--"+message.getContent());

m.setPubdate(message.getPubdate());

messageList.add(m);

}

return Flux.fromIterable(messageList);

}

public Flux> map(Message message){

Map map = new HashMap<>();

for(int i=0;i<10;i++){

Message m = new Message();

m.setTitle(i+"--"+message.getTitle());

m.setContent(i+"--"+message.getContent());

m.setPubdate(message.getPubdate());

map.put("pansd-"+i,m);

}

// Set> entries = map.entrySet();

return Flux.fromIterable(map.entrMwqaQeHNHySet());

}

@GetMapping("webfluxList")

@ApiOperation("整合webfluxList")

public Object echoList(Message message){

log.info("接收用户信息,用户方发送的参数为message={}",message);

return this.messageHandler.list(message);

}

@GetMapping("webfluxMap")

@ApiOperation("整合webfluxMap")

public Object echoMap(Message message){

log.info("接收用户信息,用户方发送的参数为message={}",message);

return this.messageHandler.map(message);

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:辉芒微单片机的c语言仿真器,辉芒微单片机
下一篇:Fastor - C++11/14/17 中的轻量级高性能SIMD优化张量代数框架
相关文章

 发表评论

暂时没有评论,来抢沙发吧~