声明:本站文章均为作者个人原创,图片均为实际截图。如有需要请收藏网站,禁止转载,谢谢配合!!!

准备:

在 Windows 上安装 Erlang 和 RabbitMQ

在 Spring Boot 中使用 RabbitMQ

1、设置手动确认是否消费成功

acknowledge-mode: manual

spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    host: 127.0.0.1
    port: 5672
    listener:
      simple:
        # Manual acknowledgement: the listener itself must call basicAck / basicNack.
        acknowledge-mode: manual
        retry:
          # Read by RabbitmqConfig through @Value as the upper bound of its
          # hand-written retry loop; without this key the placeholder cannot be resolved.
          max-attempts: 3

2、手动实现消费失败后的重试,超过最大重试次数后拒绝消息(队列需配置死信交换机,消息才会进入死信队列)

package com.timi.t1.config;


import com.rabbitmq.client.Channel;
import com.timi.t1.constant.AllProjectConstant;
import com.timi.t1.exception.GlobalException;
import com.timi.t1.service.TapdService;
import com.timi.t1.utils.LogUtil;
import com.timi.t1.utils.ProjectUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;


@Configuration
public class RabbitmqConfig {

    public static final String Queue_SetTapdCreateFrom = "Queue_SetTapdCreateFrom";

    @Autowired
    TapdService tapdService;

    @Value("${spring.rabbitmq.listener.simple.retry.max-attempts}")
    private Integer retryCountMax;

    private static final long RETRY_INTERVAL = 5;

    /**
     * 设置建单来源为 T1需求系统
     * 持久化
     * @return
     */
    @Bean
    public Queue setTapdCreateFromQueue(){
        return new Queue(Queue_SetTapdCreateFrom, true);
    }

    @RabbitListener(queues = Queue_SetTapdCreateFrom)
    public void setTapdCreateFromReceive(String msg, Channel channel, Message message) throws IOException {
        //ProjectUtil.setProject_ymzx();
        System.out.println("Queue_SetTapdCreateFrom Receive : " + msg);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            String queueDesc = "队列-回写建单来源为T1管理系统";
            //String t = Objects.equals(msg, "1020427612116985354") ? msg : "";
            String t = msg;
            boolean res = tapdService.setTapdCreateFrom(t);
            if (res){
                channel.basicAck(deliveryTag, false);
            }else{

                int retryCount = 0;
                boolean success = false;
                //手动重试
                // 消费失败并且重试次数<=重试上限次数
                while (!success && retryCount < retryCountMax) {
                    retryCount++;
                    // 具体业务逻辑
                    success = tapdService.setTapdCreateFrom(t);
                    // 如果失败则重试
                    if (!success) {
                        String errorTip = queueDesc + "第" + retryCount + "次消费失败" +
                                ((retryCount < retryCountMax) ? "," + RETRY_INTERVAL + "s后重试" : ",进入死信队列");
                        LogUtil.error(errorTip);
                        Thread.sleep(RETRY_INTERVAL * 1000);
                    }
                }

                if (success) {
                    // 消费成功,确认
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                    LogUtil.info(queueDesc + "成功:" + t);
                } else {
                    // 重试多次之后仍失败,进入死信队列
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                    LogUtil.info(queueDesc + "多次失败,进入死信队列:" + t);
                }
            }
        }catch (Exception e){

        }

    }
}