Spring Cloud Stream异常处理过程解析

网友投稿 572 2023-07-27

Spring Cloud Stream异常处理过程解析

Spring Cloud Stream异常处理过程解析

应用处理

当消费者在处理接收到的消息时,有可能会由于某些原因而抛出异常。若希望对抛出来的异常进行处理的话,就需要采取一些异常处理手段,异常处理的方式可分为三种:应用层面的处理、系统层面的处理以及通过RetryTemplate进行处理。

本小节先来介绍较为常用的应用层面的异常处理方式,该方式又细分为局部处理和全局处理。

局部处理

Stream相关的配置内容如下:

spring:

cloud:

stream:

rocketmq:

binder:

name-server: 192.168.190.129:9876

bindings:

input:

destination: stream-test-topic

group: binder-group

所谓局部处理就是针对指定的channel进行处理,需要定义一个处理异常的方法,并在该方法上添加@ServiceActivator注解,该注解有一个inputChannel属性,用于指定对哪个channel进行处理,格式为{destination}.{group}.errors。具体代码如下:

package com.zj.node.usercenter.rocketmq;

imporZJYFqBmt lombok.extern.slf4j.Slf4j;

import org.springframework.cloud.stream.annotation.StreamListener;

import org.springframework.cloud.stream.messaging.Sink;

import org.springframework.integration.annotation.ServiceActivator;

import org.springframework.messaging.Message;

import org.springframework.messaging.support.ErrorMessage;

import org.springframework.stereotype.Service;

/**

* 消费者

*

* @author 01

* @date 2019-08-10

**/

@Slf4j

@Service

public class TestStreamConsumer {

@StreamListener(Sink.INPUT)

public void receive1(String messageBody) {

log.info("消费消息,messageBody = {}", messageBody);

throw new IllegalArgumentException("参数错误");

}

/**

* 处理局部异常的方法

*

* @param errorMessage 异常消息对象

*/

@ServiceActivator(

// 通过特定的格式指定处理哪个channel的异常

inputChannel = "stream-test-topic.binder-group.errors"

)

public void handleError(ErrorMessage errorMessage) {

// 获取异常对象

Throwable errorMessagePayload = errorMessage.getPayload();

log.error("发生异常", errorMessagePayload);

// 获取消息体

Message> originalMessage = errorMesshttp://age.getOriginalMessage();

if (originalMessage != null) {

log.error("消息体: {}", originalMessage.getPayload());

} else {

log.error("消息体为空");

}

}

}

全局处理

全局处理则是可以处理所有channel抛出来的异常,所有的channel抛出异常后会生成一个ErrorMessage对象,即错误消息。错误消息会被放到一个专门的channel里,这个channel就是errorChannel。所以通过监听errorChannel就可以实现全局异常的处理。具体代码如下:

@StreamListener(Sink.INPUT)

public void receive1(String messageBody) {

log.info("消费消息,messageBody = {}", messageBody);

throw new IllegalArgumentException("参数错误");

}

/**

* 处理全局异常的方法

*

* @param errorMessage 异常消息对象

*/

@StreamListener("errorChannel")

public void handleError(ErrorMessage errorMessage) {

log.error("发生异常. errorMessage = {}", errorMessage);

}

系统处理

系统处理方式,因消息中间件的不同而异。如果应用层面没有配置错误处理,那么error将会被传播给binder,而binder则会将error回传给消息中间件。消息中间件可以选择:

丢弃消息:错误消息将被丢弃。虽然在某些情况下可以接受,但这种方式一般不适用于生产

requeue(重新排队,从而重新处理)

将失败的消息发送给DLQ(死信队列)

DLQ

目前RabbitMQ对DLQ的支持比较好,这里以RabbitMQ为例,只需要添加DLQ相关的配置:

spring:

cloud:

stream:

bindings:

input:

destination: stream-test-topic

group: binder-group

rabbit:

bindings:

input:

consumer:

# 自动将失败的消息发送给DLQ

auto-bind-dlq: true

消息消费失败后,就会放入死信队列。在控制台操作一下,即可将死信放回消息队列,这样,客户端就可以重新处理。

如果想获取原始错误的异常堆栈,可添加如下配置:

spring:

cloud:

stream:

rabbit:

bindings:

input:

consumer:

republish-to-dlq: true

requeue

Rabbit及Kafka的binder依赖RetryTemplate实现消息重试,从而提升消息处理的成功率。然而,如果设置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那么RetryTemplate则不会再重试。此时可以通过requeue方式来处理异常。

需要添加如下配置:

# 默认是3,设为1则禁用重试

spring.cloud.stream.bindings..consumer.max-attempts=1

# 表示是否要requeue被拒绝ZJYFqBm的消息(即:requeue处理失败的消息)

spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

这样,失败的消息将会被重新提交到同一个handler进行处理,直到handler抛出 AmqpRejectAndDontRequeueException 异常为止。

RetryTemplate

RetryTemplate主要用于实现消息重试,也是错误处理的一种手段。有两种配置方式,一种是通过配置文件进行配置,如下示例

spring:

cloud:

stream:

bindings:

:

consumer:

# 最多尝试处理几次,默认3

maxAttempts: 3

# 重试时初始避退间隔,单位毫秒,默认1000

backOffInitialInterval: 1000

# 重试时最大避退间隔,单位毫秒,默认10000

backOffMaxInterval: 10000

# 避退乘数,默认2.0

backOffMultiplier: 2.0

# 当listen抛出retryableExceptions未列出的异常时,是否要重试

defaultRetryable: true

# 异常是否允许重试的map映射

retryableExceptions:

java.lang.RuntimeException: true

java.lang.IllegalStateException: false

另一种则是通过代码配置,在多数场景下,使用配置文件定制重试行为都是可以满足需求的,但配置文件里支持的配置项可能无法满足一些复杂需求。此时可使用代码方式配置RetryTemplate,如下示例:

@Configuration

class RetryConfiguration {

@StreamRetryTemplate

public RetryTemplate sinkConsumerRetryTemplate() {

RetryTemplate retryTemplate = new RetryTemplate();

retryTemplate.setRetryPolicy(retryPolicy());

retryTemplate.setBackOffPolicy(backOffPolicy());

return retryTemplate;

}

private ExceptionClassifierRetryPolicy retryPolicy() {

BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(

Collections.singletonList(IllegalAccessException.class

));

keepRetryingClassifier.setTraverseCauses(true);

SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);

AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();

ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();

retryPolicy.setExceptionClassifier(

classifiable -> keepRetryingClassifier.classify(classifiable) ZJYFqBm63;

alwaysRetryPolicy : simpleRetryPolicy);

return retryPolicy;

}

private FixedBackOffPolicy backOffPolicy() {

final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();

backOffPolicy.setBackOffPeriod(2);

return backOffPolicy;

}

}

最后还需要添加一段配置:

spring.cloud.stream.bindings..consumer.retry-template-name=myRetryTemplate

注:Spring Cloud Stream 2.2才支持设置retry-template-name

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Spring Boot 2.X 快速集成单元测试解析
下一篇:SpringBoot如何优雅地处理全局异常详解
相关文章

 发表评论

暂时没有评论,来抢沙发吧~