當消費者在處理接收到的消息時,有可能會由于某些原因而拋出異常。若希望對拋出來的異常進行處理的話,就需要采取一些異常處理手段,異常處理的方式可分為三種:應(yīng)用層面的處理、系統(tǒng)層面的處理以及通過RetryTemplate進行處理。
創(chuàng)新互聯(lián)公司專注于慈利企業(yè)網(wǎng)站建設(shè),成都響應(yīng)式網(wǎng)站建設(shè),成都做商城網(wǎng)站。慈利網(wǎng)站建設(shè)公司,為慈利等地區(qū)提供建站服務(wù)。全流程按需求定制開發(fā),專業(yè)設(shè)計,全程項目跟蹤,創(chuàng)新互聯(lián)公司專業(yè)和態(tài)度為您提供的服務(wù)
本小節(jié)先來介紹較為常用的應(yīng)用層面的異常處理方式,該方式又細分為局部處理和全局處理。
局部處理
Stream相關(guān)的配置內(nèi)容如下:
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 192.168.190.129:9876
bindings:
input:
destination: stream-test-topic
group: binder-group
所謂局部處理就是針對指定的channel進行處理,需要定義一個處理異常的方法,并在該方法上添加@ServiceActivator
注解,該注解有一個inputChannel
屬性,用于指定對哪個channel進行處理,格式為{destination}.{group}.errors
。具體代碼如下:
package com.zj.node.usercenter.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;
/**
* 消費者
*
* @author 01
* @date 2019-08-10
**/
@Slf4j
@Service
public class TestStreamConsumer {
@StreamListener(Sink.INPUT)
public void receive1(String messageBody) {
log.info("消費消息,messageBody = {}", messageBody);
throw new IllegalArgumentException("參數(shù)錯誤");
}
/**
* 處理局部異常的方法
*
* @param errorMessage 異常消息對象
*/
@ServiceActivator(
// 通過特定的格式指定處理哪個channel的異常
inputChannel = "stream-test-topic.binder-group.errors"
)
public void handleError(ErrorMessage errorMessage) {
// 獲取異常對象
Throwable errorMessagePayload = errorMessage.getPayload();
log.error("發(fā)生異常", errorMessagePayload);
// 獲取消息體
Message<?> originalMessage = errorMessage.getOriginalMessage();
if (originalMessage != null) {
log.error("消息體: {}", originalMessage.getPayload());
} else {
log.error("消息體為空");
}
}
}
全局處理
全局處理則是可以處理所有channel拋出來的異常,所有的channel拋出異常后會生成一個ErrorMessage
對象,即錯誤消息。錯誤消息會被放到一個專門的channel里,這個channel就是errorChannel。所以通過監(jiān)聽errorChannel就可以實現(xiàn)全局異常的處理。具體代碼如下:
@StreamListener(Sink.INPUT)
public void receive1(String messageBody) {
log.info("消費消息,messageBody = {}", messageBody);
throw new IllegalArgumentException("參數(shù)錯誤");
}
/**
* 處理全局異常的方法
*
* @param errorMessage 異常消息對象
*/
@StreamListener("errorChannel")
public void handleError(ErrorMessage errorMessage) {
log.error("發(fā)生異常. errorMessage = {}", errorMessage);
}
系統(tǒng)處理方式,因消息中間件的不同而異。如果應(yīng)用層面沒有配置錯誤處理,那么error將會被傳播給binder,而binder則會將error回傳給消息中間件。消息中間件可以選擇:
DLQ
目前RabbitMQ對DLQ的支持比較好,這里以RabbitMQ為例,只需要添加DLQ相關(guān)的配置:
spring:
cloud:
stream:
bindings:
input:
destination: stream-test-topic
group: binder-group
rabbit:
bindings:
input:
consumer:
# 自動將失敗的消息發(fā)送給DLQ
auto-bind-dlq: true
消息消費失敗后,就會放入死信隊列。在控制臺操作一下,即可將死信放回消息隊列,這樣,客戶端就可以重新處理。
如果想獲取原始錯誤的異常堆棧,可添加如下配置:
spring:
cloud:
stream:
rabbit:
bindings:
input:
consumer:
republish-to-dlq: true
requeue
Rabbit及Kafka的binder依賴RetryTemplate實現(xiàn)消息重試,從而提升消息處理的成功率。然而,如果設(shè)置了spring.cloud.stream.bindings.input.consumer.max-attempts=1
,那么RetryTemplate則不會再重試。此時可以通過requeue方式來處理異常。
需要添加如下配置:
# 默認是3,設(shè)為1則禁用重試
spring.cloud.stream.bindings.<input channel名稱>.consumer.max-attempts=1
# 表示是否要requeue被拒絕的消息(即:requeue處理失敗的消息)
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true
這樣,失敗的消息將會被重新提交到同一個handler進行處理,直到handler拋出 AmqpRejectAndDontRequeueException
異常為止。
RetryTemplate主要用于實現(xiàn)消息重試,也是錯誤處理的一種手段。有兩種配置方式,一種是通過配置文件進行配置,如下示例:
spring:
cloud:
stream:
bindings:
<input channel名稱>:
consumer:
# 最多嘗試處理幾次,默認3
maxAttempts: 3
# 重試時初始避退間隔,單位毫秒,默認1000
backOffInitialInterval: 1000
# 重試時最大避退間隔,單位毫秒,默認10000
backOffMaxInterval: 10000
# 避退乘數(shù),默認2.0
backOffMultiplier: 2.0
# 當listen拋出retryableExceptions未列出的異常時,是否要重試
defaultRetryable: true
# 異常是否允許重試的map映射
retryableExceptions:
java.lang.RuntimeException: true
java.lang.IllegalStateException: false
另一種則是通過代碼配置,在多數(shù)場景下,使用配置文件定制重試行為都是可以滿足需求的,但配置文件里支持的配置項可能無法滿足一些復(fù)雜需求。此時可使用代碼方式配置RetryTemplate,如下示例:
@Configuration
class RetryConfiguration {
@StreamRetryTemplate
public RetryTemplate sinkConsumerRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy());
retryTemplate.setBackOffPolicy(backOffPolicy());
return retryTemplate;
}
private ExceptionClassifierRetryPolicy retryPolicy() {
BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(
Collections.singletonList(IllegalAccessException.class
));
keepRetryingClassifier.setTraverseCauses(true);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();
ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
retryPolicy.setExceptionClassifier(
classifiable -> keepRetryingClassifier.classify(classifiable) ?
alwaysRetryPolicy : simpleRetryPolicy);
return retryPolicy;
}
private FixedBackOffPolicy backOffPolicy() {
final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(2);
return backOffPolicy;
}
}
最后還需要添加一段配置:
spring.cloud.stream.bindings.<input channel名稱>.consumer.retry-template-name=myRetryTemplate
注:Spring Cloud Stream 2.2才支持設(shè)置retry-template-name
本文名稱:SpringCloudStream異常處理
本文網(wǎng)址:http://chinadenli.net/article6/giidig.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)網(wǎng)站建設(shè)、關(guān)鍵詞優(yōu)化、ChatGPT、搜索引擎優(yōu)化、靜態(tài)網(wǎng)站、軟件開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)