Rabbitmq WorkQueue工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务
1、定义发送者
package cn.badianboke.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2WorkQueue() throws InterruptedException{
String queueName = "simple.queue";
String message = "hello, spring amqp - ";
for (int i = 1; i <= 50; i++){
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20); //每秒钟发50个
}
}
}
2、定义两个消费者
package cn.badianboke.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String message) throws InterruptedException {
System.out.println("Consumer1 Received Message: " + message + " / " + LocalTime.now());
Thread.sleep(20); //每秒钟接收50个消息
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String message) throws InterruptedException {
System.err.println("Consumer2 Received Message: " + message + " / " + LocalTime.now());
Thread.sleep(200); //每秒钟接收5个消息
}
}
3、设置消息预取数量
listener: simple: prefetch: 1
spring:
rabbitmq:
host: 170.106.116.XXX
port: 5672
username: badianboke
password: 123321
verify-host: /
listener:
simple:
prefetch: 1