声明:本站文章均为作者个人原创,图片均为实际截图。如有需要请收藏网站,禁止转载,谢谢配合!!!

1、安装docker

2、使用docker拉取rabbitmq镜像

docker pull rabbitmq

3、运行容器

 docker run -it -d --name mq -p 15672:15672 -p 5672:5672 rabbitmq

4、安装依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>

5、配置

package com.example.demo.config;

import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RabbitMQConfig {
    @Bean
    public ConnectionFactory getFactory(){
        log.info("开始连接rabbitmq");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1"); //linux主机的ip地址
        factory.setPort(5672);//rabbitmq的端口号
        return factory;
    }
}

6、同步/异步 发送/接收消息

package com.example.demo.task;

import com.example.demo.db.pojo.MessageEntity;
import com.example.demo.db.pojo.MessageRefEntity;
import com.example.demo.exception.EmosException;
import com.example.demo.service.MessageService;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

@Component
@Slf4j
public class MessageTask {

    @Autowired
    private MessageService messageService;

    @Autowired
    private ConnectionFactory factory;

    public void send(String topic, MessageEntity entity) {
        String id = messageService.insertMessage(entity);
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();)
        {
            channel.queueDeclare(topic, true, false, false, null);

            HashMap map = new HashMap();
            map.put("messageId", id);
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(map).build();
            channel.basicPublish("", topic, properties ,entity.getMsg().getBytes());
            log.debug("消息发送成功,messageTask");
        } catch (Exception e){
            log.error("执行异常messageTask");
        }
    }

    //异步发送
    @Async
    public void sendAsync(String topic, MessageEntity entity) {
        send(topic, entity);
    }

    public int receive(String topic){
        int i = 0;
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();)
        {
            channel.queueDeclare(topic, true, false, false, null);

            while (true){
                GetResponse response = channel.basicGet(topic, false);
                if(response != null){
                    AMQP.BasicProperties properties = response.getProps();
                    Map<String, Object> map = properties.getHeaders();
                    String messageId = map.get("messageId").toString();
                    byte[] body = response.getBody();
                    String message = new String(body);
                    log.debug("从RabbitMq接收的消息:" + message);


                    MessageRefEntity entity = new MessageRefEntity();
                    entity.setMessageId(messageId);
                    entity.setReceiverId(Integer.parseInt(topic));
                    entity.setReadFlag(false);
                    entity.setLastFlag(true);
                    messageService.insertRef(entity);

                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);

                    i++;
                }else {
                    break;
                }
            }
            log.debug("消息发送成功,messageTask");
        } catch (Exception e){
            log.error("执行异常messageTask666", e);
            throw new EmosException("接收消息失败");
        }
        return i;

    }

    @Async
    public int receiveAsync(String topic){
        return receive(topic);
    }

    public void deleteQueue(String topic){
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();)
        {
            channel.queueDelete(topic);
            log.debug("队列删除成功");

        }catch (Exception e){
            log.error("队列删除失败", e);
            throw new EmosException("队列删除失败");
        }
    }

    @Async
    public void deleteQueueAsync(String topic){
        deleteQueue(topic);
    }
}