RocketMQ重试机制和音讯幂等
一、重试机制
因为MQ常常处于庞杂的分布式体系中,斟酌收集波动,效劳宕机,程序非常要素,很有大概涌现音讯发送或许花费失利的问题。因而,音讯的重试就是一切MQ中间件必需斟酌到的一个症结点。假如没有音讯重试,就大概发作音讯丧失的问题,大概对体系发作很大的影响。所以,秉持宁肯多发音讯,也不可丧失音讯的准绳,大部分MQ都对音讯重试供应了很好的支撑。
MQ 花费者的花费逻辑失利时,能够经由过程设置返回状况到达音讯重试的结果。
MQ 音讯重试只针对集群花费体式格局见效;播送体式格局不供应失利重试特征,即花费失利后,失利音讯不再重试,继承花费新的音讯。
1、模仿非常
花费者:
package com.zn.retry; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * RocketMQ重试机制花费者 */ public class RetryConsumer { public static void main(String[] args) throws MQClientException { //建立花费者 DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group"); //设置NameServer地点 consumer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876"); //设置实例称号 consumer.setInstanceName("consumer"); //定阅topic consumer.subscribe("itmayiedu-topic","TagA"); //监听音讯 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //猎取音讯 for (MessageExt messageExt:list){ System.out.println(messageExt.getMsgId()+"---"+new String(messageExt.getBody())); } try { //模仿毛病 int i=5/0; }catch (Exception e){ e.printStackTrace(); //须要重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } //不须要重试 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动花费者 consumer.start(); System.out.println("Consumer Started!"); } }
控制台结果:
2、模仿收集耽误
package com.zn.retry; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * RocketMQ重试机制花费者 */ public class RetryConsumer { public static void main(String[] args) throws MQClientException { //建立花费者 DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group"); //设置NameServer地点 consumer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876"); //设置实例称号 consumer.setInstanceName("consumer"); //定阅topic consumer.subscribe("itmayiedu-topic","TagA"); //监听音讯 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //猎取音讯 for (MessageExt messageExt:list){ System.out.println(messageExt.getMsgId()+"---"+new String(messageExt.getBody())); } try { //收集耽误 Thread.sleep(600000); } catch (InterruptedException e) { e.printStackTrace(); } //花费胜利 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动花费者 consumer.start(); System.out.println("Consumer Started!"); } }
二、音讯幂等
1、在什么情况下会发作RocketMQ的音讯反复花费
①、当体系的挪用链路比较长的时刻,比方体系A挪用体系B,体系B再把音讯发送到RocketMQ中,在体系A挪用体系B的时刻,假如体系B处置惩罚胜利,然则迟迟没有将挪用胜利的结果返回给体系A的时刻,体系A就会尝试从新提议要求给体系B,形成体系B反复处置惩罚,提议多条音讯给RocketMQ形成反复花费;
②、在体系B发送给RocketMQ的时刻,也有大概会发作和上面一样的问题,音讯发送超时,节骨体系B重试,致使RocketMQ吸收到了重读音讯;
③、当RocketMQ胜利吸收到音讯,并将音讯交给花费者处置惩罚,假如花费者花费完成后还没来得及提交offset给RocketMQ,本身宕机或许重启了,那末RocketMQ没有吸收到offset,就会以为花费失利了,会重发音讯给花费者再次花费;
2、怎样处理音讯的反复花费
经由过程幂等性来保证,只需保证反复音讯不对结果发作影响,就完美地处理这个问题。
在生产者端保证幂等性,一下两种体式格局:
①、RocketMQ支撑音讯查询的功用,只需去RocketMQ查询一下是不是已发送过该条音讯就能够了,不存在则发送,存在则不发送;
②、引入Redis,在发送音讯到RocketMQ胜利以后,向Redis中插进去一条数据,假如发送重试,则先去Redis查询一个该条音讯是不是已发送过了,存在的话就不反复发送音讯了;
要领一:RocketMQ音讯查询的机能不是迥殊好,假如在高并发的场景下,每条音讯在发送到RocketMQ时都去查询一下,大概会影响接口的机能;
要领二:在一些极度的场景下,Redis也没法保证音讯发送胜利以后,就一定能写入Redis胜利,比方写入音讯胜利而Redis此时宕机,那末再次查询Redis推断音讯是不是已发送过,是没法获得准确结果的;
3、生产者
package com.zn.idempotent; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * 音讯幂等生产者 */ public class IdempotentProvider { public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException { //建立一个生产者 DefaultMQProducer producer=new DefaultMQProducer("rmq-group"); //设置NameServer地点 producer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876"); //设置生产者实例称号 producer.setInstanceName("producer"); //启动生产者 producer.start(); //发送音讯 for (int i=1;i<=1;i++){ //模仿收集耽误,每秒发送一次MQ Thread.sleep(1000); //建立音讯,topic主题称号 tags临时价代表小分类, body代表音讯体 Message message=new Message("itmayiedu-topic03","TagA",("itmayiedu-"+i).getBytes()); //音讯的唯一标识 message.setKeys("定单音讯:"+i); //发送音讯 SendResult sendResult=producer.send(message); System.out.println("信息幂等问题来了:"+sendResult.toString()); } producer.shutdown(); } }
4、花费者
package com.zn.idempotent; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.logging.LogManager; import java.util.logging.Logger; /** * 音讯幂等花费者 */ public class IdempotentConsumer { static private Map<String, Object> logMap = new HashMap<>(); public static void main(String[] args) throws MQClientException { //建立花费者 DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group"); //设置NameServer地点 consumer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876"); //设置实例称号 consumer.setInstanceName("consumer"); //定阅topic consumer.subscribe("itmayiedu-topic03","TagA"); //监听音讯 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { String key=null; String msgId=null; for (MessageExt messageExt:list){ key=messageExt.getKeys(); //判读redis中有没有当前音讯key if (logMap.containsKey(key)) { // 无需继承重试。 System.out.println("key:"+key+",已花费,无需重试..."); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //RocketMQ因为是集群环境,所以发作的音讯ID大概会反复 msgId = messageExt.getMsgId(); System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(messageExt.getBody())); //将当前key保存在redis中 logMap.put(messageExt.getKeys(),messageExt); } try { int i=5/0; }catch (Exception e){ e.printStackTrace(); //人工赔偿 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动花费者 consumer.start(); System.out.println("Consumer Started!"); } }
5、控制台结果