1、安装docker
2、使用docker拉取rabbitmq镜像(默认的 rabbitmq 镜像未启用管理插件;如需通过15672端口访问管理界面,请改用 rabbitmq:management 镜像)
docker pull rabbitmq
3、运行容器
docker run -it -d --name mq -p 15672:15672 -p 5672:5672 rabbitmq
4、添加Maven依赖(pom.xml)
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
5、配置RabbitMQ连接工厂(ConnectionFactory)
package com.example.demo.config;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that registers a RabbitMQ {@link ConnectionFactory} bean.
 *
 * <p>The factory is only configured here (host and port); this class itself never
 * calls {@code newConnection()}.
 */
@Configuration
@Slf4j
public class RabbitMQConfig {

    /** IP address of the Linux host that runs the RabbitMQ broker. */
    private static final String BROKER_HOST = "127.0.0.1";

    /** Port number of the RabbitMQ broker. */
    private static final int BROKER_PORT = 5672;

    /**
     * Creates the connection factory pointing at the configured broker host and port.
     *
     * @return a factory configured with {@code BROKER_HOST} and {@code BROKER_PORT}
     */
    @Bean
    public ConnectionFactory getFactory() {
        log.info("开始连接rabbitmq");
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setPort(BROKER_PORT);
        connectionFactory.setHost(BROKER_HOST);
        return connectionFactory;
    }
}
6、同步/异步发送、接收消息,以及删除队列
package com.example.demo.task;
import com.example.demo.db.pojo.MessageEntity;
import com.example.demo.db.pojo.MessageRefEntity;
import com.example.demo.exception.EmosException;
import com.example.demo.service.MessageService;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@Component
@Slf4j
public class MessageTask {
@Autowired
private MessageService messageService;
@Autowired
private ConnectionFactory factory;
public void send(String topic, MessageEntity entity) {
String id = messageService.insertMessage(entity);
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel();)
{
channel.queueDeclare(topic, true, false, false, null);
HashMap map = new HashMap();
map.put("messageId", id);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(map).build();
channel.basicPublish("", topic, properties ,entity.getMsg().getBytes());
log.debug("消息发送成功,messageTask");
} catch (Exception e){
log.error("执行异常messageTask");
}
}
//异步发送
@Async
public void sendAsync(String topic, MessageEntity entity) {
send(topic, entity);
}
public int receive(String topic){
int i = 0;
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel();)
{
channel.queueDeclare(topic, true, false, false, null);
while (true){
GetResponse response = channel.basicGet(topic, false);
if(response != null){
AMQP.BasicProperties properties = response.getProps();
Map<String, Object> map = properties.getHeaders();
String messageId = map.get("messageId").toString();
byte[] body = response.getBody();
String message = new String(body);
log.debug("从RabbitMq接收的消息:" + message);
MessageRefEntity entity = new MessageRefEntity();
entity.setMessageId(messageId);
entity.setReceiverId(Integer.parseInt(topic));
entity.setReadFlag(false);
entity.setLastFlag(true);
messageService.insertRef(entity);
long deliveryTag = response.getEnvelope().getDeliveryTag();
channel.basicAck(deliveryTag, false);
i++;
}else {
break;
}
}
log.debug("消息发送成功,messageTask");
} catch (Exception e){
log.error("执行异常messageTask666", e);
throw new EmosException("接收消息失败");
}
return i;
}
@Async
public int receiveAsync(String topic){
return receive(topic);
}
public void deleteQueue(String topic){
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel();)
{
channel.queueDelete(topic);
log.debug("队列删除成功");
}catch (Exception e){
log.error("队列删除失败", e);
throw new EmosException("队列删除失败");
}
}
@Async
public void deleteQueueAsync(String topic){
deleteQueue(topic);
}
}