SpringBoot+RabbitMQ+Redis实现商品秒杀

网友投稿 948 2022-11-23

SpringBoot+RabbitMQ+Redis实现商品秒杀

SpringBoot+RabbitMQ+Redis实现商品秒杀

业务分析

一般而言,商品秒杀大概可以拆分成以下几步:

用户校验

校验是否多次抢单,保证每个商品每个用户只能秒杀一次

下单

订单信息进入消息队列,等待消费

减少库存

消费订单消息,减少商品库存,增加订单记录

付款

十五分钟内完成支付,修改支付状态

创建表

goods_info 商品库存表

说明

id

主键(uuid)

goods_name

商品名称

goods_stock

商品库存

package com.jason.seckill.order.entity;

/**

* 商品库存

*/

public class GoodsInfo {

private String id;

private String goodsName;

private String goodsStock;

public String getId() {

return id;

}

public void setId(String id) {

this.id = id;

}

public String getGoodsName() {

return goodsName;

}

public void setGoodsName(String goodsName) {

this.goodsName = goodsName;

}

public String getGoodsStock() {

return goodsStock;

}

public void setGoodsStock(String goodsStock) {

this.goodsStock = goodsStock;

}

@Override

public String toString() {

return "GoodsInfo{" +

"id='" + id + '\'' +

", goodsName='" + goodsName + '\'' +

", goodsStock='" + goodsStock + '\'' +

'}';

}

}

order_info 订单记录表

说明

id

主键(uuid)

user_id

用户id

goods_id

商品id

pay_status

支付状态(0-超时未支付 1-已支付 2-待支付)

package com.jason.seckill.order.entity;

/**

* 下单记录

*/

public class OrderRecord {

private String id;

private String userId;

private String goodsId;

/**

* 0-超时未支付 1-已支付 2-待支付

*/

private Integer payStatus;

public String getId() {

return id;

}

public void setId(String id) {

this.id = id;

}

public String getUserId() {

return userId;

}

public void setUserId(String userId) {

this.userId = userId;

}

public String getGoodsId() {

return goodsId;

}

public void setGoodsId(String goodsId) {

this.goodsId = goodsId;

}

public Integer getPayStatus() {

return payStatus;

}

public void setPayStatus(Integer payStatus) {

this.payStatus = payStatus;

}

@Override

public String toString() {

return "OrderRecord{" +

"id='" + id + '\'' +

", userId='" + userId + '\'' +

", goodsId='" + goodsId + '\'' +

'}';

}

}

功能实现

1.用户校验

使用redis做用户校验,保证每个用户每个商品只能抢一次,上代码:

public boolean checkSeckillUser(OrderRequest order) {

String key = env.getProperty("seckill.redis.key.prefix") + order.getUserId() + order.getGoodsId();

return redisTemplate.opsForValue().setIfAbsent(key,"1");

}

userId+orderId的组合作为key,利用redis的setnx分布式锁原理来实现。如果是限时秒杀,可以通过设置key的过期时间来实现。

2.下单

下单信息肯定是要先扔到消息队列里的,这里采用RabbitMQ来做消息队列,先来看一下消息队列的模型图:

rabbitmq的配置:

#rabbitmq配置

spring.rabbitmq.host=127.0.0.1

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

#消费者数量

spring.rabbitmq.listener.simple.concurrency=5

#最大消费者数量

spring.rabbitmq.listener.simple.max-concurrency=10

#消费者每次从队列获取的消息数量。写多了,如果长时间得不到消费,数据就一直得不到处理

spring.rabbitmq.listener.simple.prefetch=1

#消费接收确认机制-手动确认

spring.rabbitmq.listener.simple.acknowledge-mode=manual

mq.env=local

#订单处理队列

#交换机名称

order.mq.exchange.name=${mq.env}:order:mq:exchange

#队列名称

order.mq.queue.name=${mq.env}:order:mq:queue

#routingkey

order.mq.routing.key=${mq.env}:order:mq:routing:key

rabbitmq配置类OrderRabbitmqConfig:

/**

* rabbitmq配置

*/

@Configuration

public class OrderRabbitmqConfig {

private static final Logger logger = LoggerFactory.getLogger(OrderRabbitmqConfig.class);

@Autowired

private Environment env;

/**

* channel链接工厂

*/

@Autowired

private CachingConnectionFactory connectionFactory;

/**

* -容器配置

*/

@Autowired

private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

/**

* 声明rabbittemplate

* @return

*/

@Bean

public RabbitTemplate rabbitTemplate(){

//消息发送成功确认,对应application.properties中的spring.rabbitmq.publisher-confirms=true

connectionFactory.setPublisherConfirms(true);

//消息发送失败确认,对应application.properties中的spring.rabbitmq.publisher-returns=true

connectionFactory.setPublisherReturns(true);

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

//设置消息发送格式为json

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

rabbitTemplate.setMandatory(true);

//消息发送到exchange回调 需设置:spring.rabbitmq.publisher-confirms=true

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

logger.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);

}

});

//消息从exchange发送到queue失败回调 需设置:spring.rabbitmq.publisher-returns=true

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

@Override

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);

}

});

return rabbitTemplate;

}

//---------------------------------------订单队列------------------------------------------------------

NVIYQa

/**

* 声明订单队列的交换机

* @return

*/

@Bean("orderTopicExchange")

public TopicExchange orderTopicExchange(){

//设置为持久化 不自动删除

return new TopicExchange(env.getProperty("order.mq.exchange.name"),true,false);

}

/**

* 声明订单队列

* @return

*/

@Bean("orderQueue")

public Queue orderQueue(){

return new Queue(env.getProperty("order.mq.queue.name"),true);

}

/**

* 将队列绑定到交换机

* @return

*/

@Bean

public Binding simpleBinding(){

return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(env.getProperty("order.mq.routing.key"));

}

/**

* 注入订单对列消费-

*/

@Autowired

private OrderListener orderListener;

/**

* 声明订单队列-配置容器

* @return

*/

@Bean("orderListenerContainer")

public SimpleMessageListenerContainer orderListenerContainer(){

//创建-容器工厂

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

//将配置信息和链接信息赋给容器工厂

factoryConfigurer.configure(factory,connectionFactory);

//容器工厂创建-容器

SimpleMessageListenerContainer container = factory.createListenerContainer();

//指定-

container.setMessageListener(orderListener);

//指定-监听的队列

container.setQueues(orderQueue());

return container;

}

}

配置类声明了订单队列,交换机,通过指定的routingkey绑定了队列与交换机。另外,rabbitTemplate用来发送消息,ListenerContainer指定-(消费者)监听的队列。

客户下单,生产消息,上代码:

@Service

public class SeckillService {

private static final Logger logger = LoggerFactory.getLogger(SeckillService.class);

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

private Environment env;

/**

* 生产消息

* @param order

*/

public void seckill(OrderRequest order){

//设置交换机

rabbitTemplate.setExchange(env.getProperty("order.mq.exchange.name"));

//设置routingkey

rabbitTemplate.setRoutingKey(env.getProperty("order.mq.routing.key"));

//创建消息体

Message msg = MessageBuilder.withBody(JSON.toJSONString(order).getBytes()).build();

//发送消息

rabbitTemplate.convertAndSend(msg);

}

}

很简单,操作rabbitTemplate,指定交换机和routingkey,发送消息到绑定的队列,等待消费处理。

3.减少库存

消费者消费订单消息,做业务处理。

看一下-(消费者)OrderListener:

/**

* 消息-(消费者)

*/

@Component

public class OrderListener implements ChannelAwareMessageListener {

private static final Logger logger = LoggerFactory.getLogger(OrderListener.class);

@Autowired

private OrderService orderService;

/**

* 处理接收到的消息

* @param message 消息体

* @param channel 通道,确认消费用

* @throws Exception

*/

@Override

public void onMessage(Message message, Channel channel) throws Exception {

try{

//获取交付tag

long tag = message.getMessageProperties().getDeliveryTag();

String str = new String(message.getBody(),"utf-8");

logger.info("接收到的消息:{}",str);

JSONONVIYQabject obj = JSONObject.parseObject(str);

//下单,操作数据库

orderService.order(obj.getString("userId"),obj.getString("goodsId"));

//确认消费

channel.basicAck(tag,true);

}catch(Exception e){

logger.error("消息监听确认机制发生异常:",e.fillInStackTrace());

}

}

}

业务处理 OrderService:

@Service

public class OrderService {

@Resource

private SeckillMapper seckillMapper;

/**

* 下单,操作数据库

* @param userId

* @param goodsId

*/

@Transactional()

public void order(String userId,String goodsId){

//该商品库存-1(当库存>0时)

int count = seckillMapper.reduceGoodsStockById(goodsId);

//更新成功,表明抢单成功,插入下单记录,支付状态设为2-待支付

if(count > 0){

OrderRecord orderRecord = new OrderRecord();

orderRecord.setId(CommonUtils.createUUID());

orderRecord.setGoodsId(goodsId);

orderRecord.setUserId(userId);

orderRecord.setPayStatus(2);

seckillMapper.insertOrderRecord(orderRecord);

}

}

}

Dao接口和Mybatis文件就不往出贴了,这里的逻辑是,update goods_info set goods_stock = goods_stock-1 where goods_stock > 0 and id=#{goodsId},这条update相当于将查询库存和减少库存合并为一个原子操作,避免高并发问题,执行成功,插入订单记录,执行失败,则库存不够抢单失败。

4.支付

订单处理完成后,如果库存减少,也就是抢单成功,那么需要用户在十五分钟内完成支付,这块就要用到死信队列(延迟队列)来处理了,先看模型图:

DLX:dead-letter Exchange 死信交换机

DLK:dead-letter RoutingKey 死信路由

ttl:time-to-live 超时时间

死信队列中,消息到期后,会通过DLX和DLK进入到pay-queue,进行消费。这是另一组消息队列,和订单消息队列是分开的。这里注意他们的绑定关系,主交换机绑定死信队列,死信交换机绑定的是主队列(pay queue)。

接下来声明图中的一系列组件,首先application.properties中增加配置:

#支付处理队列

#主交换机

pay.mq.exchange.name=${mq.env}:pay:mq:exchange

#死信交换机(DLX)

pay.dead-letter.mq.exchange.name=${mq.env}:pay:dead-letter:mq:exchange

#主队列

pay.mq.queue.name=${mq.env}:pay:mq:queue

#死信队列

pay.dead-letter.mq.queue.name=${mq.env}:pay:dead-letter:mq:queue

#主routingkey

pay.mq.routing.key=${mq.env}:pay:mq:routing:key

#死信routingkey(DLK)

pay.dead-letter.mq.routing.key=${mq.env}:pay:dead-letter:mq:routing:key

#支付超时时间(毫秒)(TTL),测试原因,这里模拟5秒,如果是生产环境,这里可以是15分钟等

pay.mq.ttl=5000

配置类OrderRabbitmqConfig中增加支付队列和死信队列的声明:

/**

* 死信队列,十五分钟超时

* @return

*/

@Bean

public Queue payDeadLetterQueue(){

Map args = new HashMap();

//声明死信交换机

args.put("x-dead-letter-exchange",env.getProperty("pay.dead-letter.mq.exchange.name"));

//声明死信routingkey

args.put("x-dead-letter-routing-key",env.getProperty("pay.dead-letter.mq.routing.key"));

//声明死信队列中的消息过期时间

args.put("x-message-ttl",env.getProperty("pay.mq.ttl",int.class));

//创建死信队列

return new Queue(env.getProperty("pay.dead-letter.mq.queue.name"),true,false,false,args);

}

/**

* 支付队列交换机(主交换机)

* @return

*/

@Bean

public TopicExchange payTopicExchange(){

return new TopicExchange(env.getProperty("pay.mq.exchange.name"),true,false);

}

/**

* 将主交换机绑定到死信队列

* @return

*/

@Bean

public Binding payBinding(){

return BindingBuilder.bind(payDeadLetterQueue()).to(payTopicExchange()).with(env.getProperty("pay.mq.routing.key"));

}

/**

* 支付队列(主队列)

* @return

*/

@Bean

public Queue payQueue(){

return new Queue(env.getProperty("pay.mq.queue.name"),true);

}

/**

* 死信交换机

* @return

*/

@Bean

public TopicExchange payDeadLetterExchange(){

return new TopicExchange(env.getProperty("pay.dead-letter.mq.exchange.name"),true,false);

}

/**

* 将主队列绑定到死信交换机

* @return

*/

@Bean

public Binding payDeadLetterBinding(){

return BindingBuilder.bind(payQueue()).to(payDeadLetterExchange()).with(env.getProperty("pay.dead-letter.mq.routing.key"));

}

/**

* 注入支付-

*/

@Autowired

private PayListener payListener;

/**

* 支付队列-容器

* @return

*/

@Bean

public SimpleMessageListenerContainer payMessageListenerContainer(){

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

factoryConfigurer.configure(factory,connectionFactory);

SimpleMessageListenerContainer listenerContainer = factory.createListenerContainer();

listenerContainer.setMessageListener(payListener);

listenerContainer.setQueues(payQueue());

return listenerContainer;

}

支付队列和死信队列的Queue、Exchange、routingkey都已就绪。

看生产者:

@Service

public class OrderService {

@Resource

private SeckillMapper seckillMapper;

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

private Environment env;

/**

* 下单,操作数据库

* @param userId

* @param goodsId

*/

@Transactional()

public void order(String userId,String goodsId){

//该商品库存-1(当库存>0时)

int count = seckillMapper.reduceGoodsStockById(goodsId);

//更新成功,表明抢单成功,插入下单记录,支付状态设为2-待支付

if(count > 0){

OrderRecord orderRecord = new OrderRecord();

orderRecord.setId(CommonUtils.createUUID());

orderRecord.setGoodsId(goodsId);

orderRecord.setUserId(userId);

orderRecord.setPayStatus(2);

seckillMapper.insertOrderRecord(orderRecord);

//将该订单添加到支付队列

rabbitTemplate.setExchange(env.getProperty("pay.mq.exchange.name"));

rabbitTemplate.setRoutingKey(env.getProperty("pay.mq.routing.key"));

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

String json = JSON.toJSONString(orderRecord);

Message msg = MessageBuilder.withBody(json.getBytes()).build();

rabbitTemplate.convertAndSend(msg);

}

}

}

在OrderService中,数据库操作完成后,将订单信息发送到死信队列,死信队列中的消息会在十五分钟后进入到支付队列,等待消费。

再看消费者:

@Component

public class PayListener implements ChannelAwareMessageListener {

private static final Logger logger = LoggerFactory.getLogger(PayListener.class);

@Autowired

private PayService payService;

@Override

public void onMessage(Message message, Channel channel) throws Exception {

Long tag = message.getMessageProperties().getDeliveryTag();

try {

String str = new String(message.getBody(), "utf-8");

logger.info("接收到的消息:{}",str);

JSONObject json = JSON.parseObject(str);

String orderId = json.getString("id");

//确认是否付款

payService.confirmPay(orderId);

//确认消费

channel.basicAck(tag, true);

}catch(Exception e){

logger.info("支付消息消费出错:{}",e.getMessage());

logger.info("出错的tag:{}",tag);

}

}

}

PayService:

@Service

public class PayService {

private static final Logger logger = LoggerFactory.getLogger(PayService.class);

@Resource

private SeckillMapper seckillMapper;

/**

* 确认是否支付

* @param orderId

*/

public void confirmPay(String orderId){

OrderRecord orderRecord = seckillMapper.selectNoPayOrderById(orderId);

//根据订单号校验该用户是否已支付

if(checkPay(orderId)){

//已支付

orderRecord.setPayStatus(1);

seckillMapper.updatePayStatus(orderRecord);

logger.info("用户{}已支付",orderId);

}else{

//未支付

orderRecord.setPayStatus(0);

seckillMapper.updatePayStatus(orderRecord);

//取消支付后,商品库存+1

seckillMapper.returnStock(orderRecord.getGoodsId());

logger.info("用户{}未支付",orderId);

}

}

/**

* 模拟判断订单支付成功或失败,成功失败随机

* @param orderId

* @return

*/

public boolean checkPay(String orderId){

Random random = new Random();

int res = random.nextInt(2);

return res==0?false:true;

}

这里checkPay()方法模拟调用第三方支付接口来判断用户是否已支付。若支付成功,订单改为已支付状态,支付失败,改为已取消状态,库存退回。

总结

整个demo,是两组消息队列撑起来的,一组订单消息队列,一组支付消息队列,而每一组队列都是由queue、exchange、routingkey、生产者以及消费者组成。交换机通过routingkey绑定队列,rabbitTemplate通过指定交换机和routingkey将消息发送到指定队列,消费者监听该队列进行消费。不同的是第二组支付队列里嵌入了死信队列来做一个十五分钟的延迟支付。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:直播微信小程序能运行在自有App上吗?
下一篇:解决Springboot项目打包后的页面丢失问题(thymeleaf报错)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~