IT教程 ·

RocketMQ重试机制和音讯幂等

一、重试机制

  因为MQ常常处于庞杂的分布式体系中,斟酌收集波动,效劳宕机,程序非常要素,很有大概涌现音讯发送或许花费失利的问题。因而,音讯的重试就是一切MQ中间件必需斟酌到的一个症结点。假如没有音讯重试,就大概发作音讯丧失的问题,大概对体系发作很大的影响。所以,秉持宁肯多发音讯,也不可丧失音讯的准绳,大部分MQ都对音讯重试供应了很好的支撑。

  MQ 花费者的花费逻辑失利时,能够经由过程设置返回状况到达音讯重试的结果。

  MQ 音讯重试只针对集群花费体式格局见效;播送体式格局不供应失利重试特征,即花费失利后,失利音讯不再重试,继承花费新的音讯。

1、模仿非常

花费者:

package com.zn.retry;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * RocketMQ重试机制花费者
 */
public class RetryConsumer {
    public static void main(String[] args) throws MQClientException {
        //建立花费者
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
        //设置NameServer地点
        consumer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876");
        //设置实例称号
        consumer.setInstanceName("consumer");
        //定阅topic
        consumer.subscribe("itmayiedu-topic","TagA");

        //监听音讯
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //猎取音讯
                for (MessageExt messageExt:list){
                    System.out.println(messageExt.getMsgId()+"---"+new String(messageExt.getBody()));
                }
                try {
                    //模仿毛病
                    int i=5/0;
                }catch (Exception e){
                    e.printStackTrace();
                    //须要重试
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                //不须要重试
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动花费者
        consumer.start();
        System.out.println("Consumer Started!");
    }
}

控制台结果:

RocketMQ重试机制和音讯幂等 IT教程 第1张

2、模仿收集耽误

package com.zn.retry;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * RocketMQ重试机制花费者
 */
public class RetryConsumer {
    public static void main(String[] args) throws MQClientException {
        //建立花费者
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
        //设置NameServer地点
        consumer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876");
        //设置实例称号
        consumer.setInstanceName("consumer");
        //定阅topic
        consumer.subscribe("itmayiedu-topic","TagA");

        //监听音讯
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //猎取音讯
                for (MessageExt messageExt:list){
                    System.out.println(messageExt.getMsgId()+"---"+new String(messageExt.getBody()));
                }
                try {
                    //收集耽误
                    Thread.sleep(600000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //花费胜利
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动花费者
        consumer.start();
        System.out.println("Consumer Started!");
    }
}

二、音讯幂等

1、在什么情况下会发作RocketMQ的音讯反复花费

①、当体系的挪用链路比较长的时刻,比方体系A挪用体系B,体系B再把音讯发送到RocketMQ中,在体系A挪用体系B的时刻,假如体系B处置惩罚胜利,然则迟迟没有将挪用胜利的结果返回给体系A的时刻,体系A就会尝试从新提议要求给体系B,形成体系B反复处置惩罚,提议多条音讯给RocketMQ形成反复花费;

  ②、在体系B发送给RocketMQ的时刻,也有大概会发作和上面一样的问题,音讯发送超时,节骨体系B重试,致使RocketMQ吸收到了重读音讯;

  ③、当RocketMQ胜利吸收到音讯,并将音讯交给花费者处置惩罚,假如花费者花费完成后还没来得及提交offset给RocketMQ,本身宕机或许重启了,那末RocketMQ没有吸收到offset,就会以为花费失利了,会重发音讯给花费者再次花费;

2、怎样处理音讯的反复花费

经由过程幂等性来保证,只需保证反复音讯不对结果发作影响,就完美地处理这个问题。

在生产者端保证幂等性,一下两种体式格局:

  ①、RocketMQ支撑音讯查询的功用,只需去RocketMQ查询一下是不是已发送过该条音讯就能够了,不存在则发送,存在则不发送;

  ②、引入Redis,在发送音讯到RocketMQ胜利以后,向Redis中插进去一条数据,假如发送重试,则先去Redis查询一个该条音讯是不是已发送过了,存在的话就不反复发送音讯了;

  要领一:RocketMQ音讯查询的机能不是迥殊好,假如在高并发的场景下,每条音讯在发送到RocketMQ时都去查询一下,大概会影响接口的机能;

  要领二:在一些极度的场景下,Redis也没法保证音讯发送胜利以后,就一定能写入Redis胜利,比方写入音讯胜利而Redis此时宕机,那末再次查询Redis推断音讯是不是已发送过,是没法获得准确结果的;

3、生产者

package com.zn.idempotent;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

/**
 * 音讯幂等生产者
 */
public class IdempotentProvider {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException {
        //建立一个生产者
        DefaultMQProducer producer=new DefaultMQProducer("rmq-group");
        //设置NameServer地点
        producer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876");
        //设置生产者实例称号
        producer.setInstanceName("producer");
        //启动生产者
        producer.start();

            //发送音讯
            for (int i=1;i<=1;i++){
                //模仿收集耽误,每秒发送一次MQ
                Thread.sleep(1000);
                //建立音讯,topic主题称号  tags临时价代表小分类, body代表音讯体
                Message message=new Message("itmayiedu-topic03","TagA",("itmayiedu-"+i).getBytes());
                //音讯的唯一标识
                message.setKeys("定单音讯:"+i);
                //发送音讯
                SendResult sendResult=producer.send(message);
                System.out.println("信息幂等问题来了:"+sendResult.toString());
            }
        producer.shutdown();
    }
}

4、花费者

package com.zn.idempotent;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.LogManager;
import java.util.logging.Logger;

/**
 * 音讯幂等花费者
 */
public class IdempotentConsumer {

    static private Map<String, Object> logMap = new HashMap<>();

    public static void main(String[] args) throws MQClientException {
        //建立花费者
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
        //设置NameServer地点
        consumer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876");
        //设置实例称号
        consumer.setInstanceName("consumer");
        //定阅topic
        consumer.subscribe("itmayiedu-topic03","TagA");

        //监听音讯
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                String key=null;
                String msgId=null;

                    for (MessageExt messageExt:list){
                        key=messageExt.getKeys();
                        //判读redis中有没有当前音讯key
                        if (logMap.containsKey(key)) {
                            // 无需继承重试。
                            System.out.println("key:"+key+",已花费,无需重试...");
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        //RocketMQ因为是集群环境,所以发作的音讯ID大概会反复
                        msgId = messageExt.getMsgId();
                        System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(messageExt.getBody()));
                        //将当前key保存在redis中
                        logMap.put(messageExt.getKeys(),messageExt);
                    }
                try {
                    int i=5/0;
                }catch (Exception e){
                    e.printStackTrace();
                    //人工赔偿
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
        //启动花费者
        consumer.start();
        System.out.println("Consumer Started!");
    }
}

5、控制台结果

RocketMQ重试机制和音讯幂等 IT教程 第2张

 

参与评论