准备:
1、设置手动确认是否消费成功
acknowledge-mode: manual
spring:
rabbitmq:
username: guest
password: guest
virtual-host: /
host: 127.0.0.1
port: 5672
listener:
simple:
acknowledge-mode: manual
2、手动实现自动重试,超过最大次数进入死信队列
package com.timi.t1.config;
import com.rabbitmq.client.Channel;
import com.timi.t1.constant.AllProjectConstant;
import com.timi.t1.exception.GlobalException;
import com.timi.t1.service.TapdService;
import com.timi.t1.utils.LogUtil;
import com.timi.t1.utils.ProjectUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
@Configuration
public class RabbitmqConfig {
public static final String Queue_SetTapdCreateFrom = "Queue_SetTapdCreateFrom";
@Autowired
TapdService tapdService;
@Value("${spring.rabbitmq.listener.simple.retry.max-attempts}")
private Integer retryCountMax;
private static final long RETRY_INTERVAL = 5;
/**
* 设置建单来源为 T1需求系统
* 持久化
* @return
*/
@Bean
public Queue setTapdCreateFromQueue(){
return new Queue(Queue_SetTapdCreateFrom, true);
}
@RabbitListener(queues = Queue_SetTapdCreateFrom)
public void setTapdCreateFromReceive(String msg, Channel channel, Message message) throws IOException {
//ProjectUtil.setProject_ymzx();
System.out.println("Queue_SetTapdCreateFrom Receive : " + msg);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String queueDesc = "队列-回写建单来源为T1管理系统";
//String t = Objects.equals(msg, "1020427612116985354") ? msg : "";
String t = msg;
boolean res = tapdService.setTapdCreateFrom(t);
if (res){
channel.basicAck(deliveryTag, false);
}else{
int retryCount = 0;
boolean success = false;
//手动重试
// 消费失败并且重试次数<=重试上限次数
while (!success && retryCount < retryCountMax) {
retryCount++;
// 具体业务逻辑
success = tapdService.setTapdCreateFrom(t);
// 如果失败则重试
if (!success) {
String errorTip = queueDesc + "第" + retryCount + "次消费失败" +
((retryCount < retryCountMax) ? "," + RETRY_INTERVAL + "s后重试" : ",进入死信队列");
LogUtil.error(errorTip);
Thread.sleep(RETRY_INTERVAL * 1000);
}
}
if (success) {
// 消费成功,确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
LogUtil.info(queueDesc + "成功:" + t);
} else {
// 重试多次之后仍失败,进入死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
LogUtil.info(queueDesc + "多次失败,进入死信队列:" + t);
}
}
}catch (Exception e){
}
}
}