這篇文章主要介紹“如何解決RocketMQ消息消費(fèi)異常”,在日常操作中,相信很多人在如何解決RocketMQ消息消費(fèi)異常問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”如何解決RocketMQ消息消費(fèi)異常”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!
創(chuàng)新互聯(lián)服務(wù)項(xiàng)目包括東阿網(wǎng)站建設(shè)、東阿網(wǎng)站制作、東阿網(wǎng)頁制作以及東阿網(wǎng)絡(luò)營(yíng)銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,東阿網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到東阿省份的部分城市,未來相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
開發(fā)中在項(xiàng)目重啟時(shí)會(huì)重復(fù)消費(fèi)消息,但其實(shí)消息已經(jīng)消費(fèi)過了。


然而通過源碼斷點(diǎn)MQClientInstance 定時(shí)任務(wù)正常,只是每次更新的offset都是原offet
由于是用的spring-boot整合的client,跟蹤consumer源碼,代碼在DefaultRocketMQListenerContainer.handleMessage方法中
然而一切正常,再往上跟蹤到DefaultMessageListenerConcurrently
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
@SuppressWarnings("unchecked")
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}首先在catch代碼塊點(diǎn)打斷點(diǎn)看看是不是有問題,結(jié)果發(fā)現(xiàn)并沒有走到這里,這就坑爹了,害我又從其它方面各種查原因,浪費(fèi)了很多時(shí)間。后面一步一步調(diào)試,最終在 log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); 打日志這一步時(shí)拋出了異常,這尼瑪打個(gè)日志還能異常,還不是Exception的異常。。本來松了口氣以為找到了原因就好解決了,沒想到這才是剛剛開始。
java.lang.NoClassDefFoundError:Could not initialize class org.apache.rocketmq.common.message.MessageClientIDSetter

原因是在MessageClientExt類中調(diào)用getMsgId方法里,調(diào)用了MessageClientIDSetter.getUniqID(this)直接拋出的異常

從異常信息來看是MessageClientIDSetter 在初始化的時(shí)候出了問題
static {
byte[] ip;
try {
ip = UtilAll.getIP();
} catch (Exception e) {
ip = createFakeIP();
}
LEN = ip.length + 2 + 4 + 4 + 2;
ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
tempBuffer.position(0);
tempBuffer.put(ip);
tempBuffer.position(ip.length);
tempBuffer.putInt(UtilAll.getPid());
tempBuffer.position(ip.length + 2);
tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
setStartTime(System.currentTimeMillis());
COUNTER = new AtomicInteger(0);
}發(fā)面是在ip = UtilAll.getIP();出了問題,然則并沒有到catch代碼塊,而是跳到了DefaultMqPushConsumerImpl類中,這里又一個(gè)坑爹的是異常塊沒有任何處理,看不到異常信息,好吧只能一步一步繼續(xù)斷點(diǎn)調(diào)試

最終在UtillAll類的ipV6Check方法執(zhí)行到InetAddressValidator.getInstance();出了問題。
private static boolean ipV6Check(byte[] ip) {
if (ip.length != 16) {
throw new RuntimeException("illegal ipv6 bytes");
}
InetAddressValidator validator = InetAddressValidator.getInstance();
return validator.isValidInet6Address(ipToIPv6Str(ip));
}但是在本地調(diào)試這段代碼又沒有任何問題。因此只能在debug時(shí)調(diào)試,報(bào)的錯(cuò)是classNotFound異常

從rocketMq的依賴來看他需要的版本是1.6

因此我們需要把1.3.1的版本移除
看著這密密麻麻的依賴關(guān)系,Idea還沒有查找功能,只能慢慢找了,最后功夫不復(fù)有心人,把依賴移除,重啟一切都好了

到此,關(guān)于“如何解決RocketMQ消息消費(fèi)異常”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!
當(dāng)前標(biāo)題:如何解決RocketMQ消息消費(fèi)異常
轉(zhuǎn)載來于:http://chinadenli.net/article30/ippopo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站、網(wǎng)站建設(shè)、搜索引擎優(yōu)化、動(dòng)態(tài)網(wǎng)站、電子商務(wù)、定制網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)