声明:本站文章均为作者个人原创,图片均为实际截图。如有需要请收藏网站,禁止转载,谢谢配合!!!

Rabbitmq WorkQueue工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务

1、定义发送者

package cn.badianboke.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2WorkQueue() throws InterruptedException{
        String queueName = "simple.queue";
        String message = "hello, spring amqp - ";
        for (int i = 1; i <= 50; i++){
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20); //每秒钟发50个
        }
    }
}

2、定义两个消费者

package cn.badianboke.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String message) throws InterruptedException {
        System.out.println("Consumer1 Received Message: " + message + " / " + LocalTime.now());
        Thread.sleep(20); //每秒钟接收50个消息
    }

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String message) throws InterruptedException {
        System.err.println("Consumer2 Received Message: " + message + " / " + LocalTime.now());
        Thread.sleep(200); //每秒钟接收5个消息
    }
}

3、设置消息预取数量

listener: simple: prefetch: 1

spring:
  rabbitmq:
    host: 170.106.116.XXX
    port: 5672
    username: badianboke
    password: 123321
    verify-host: /
    listener:
      simple:
        prefetch: 1