RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。
所有主要的编程语言均有与代理接口通讯的客户端库。

如果不指定,系统运行,MQ就会马上进行消息的处理。但是此时系统并没有完全加载完毕,有一些内存的变量现在还没有值,这样启动肯定会有问题。
这里我们需要指定一下MQ为手动启动,启动时机为系统初始化完毕。
首先本地需要有RabbitMQ环境,然后创建一个Queue名字为myQueue。
用SpringBoot搭建一个简易的示例程序,在POM中引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
创建MQ配置类,主要用于线程池和连接池等相关配置
package com.example.springboot.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
public class MyRabbitListenerConfig {
/**
* 连接工厂配置
*/
@Bean(name = "myQueueFactory")
public SimpleRabbitListenerContainerFactory mainFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(30); // 设置并发消费者数量
factory.setMaxConcurrentConsumers(50); // 设置最大并发消费者数量
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动应答
factory.setDefaultRequeueRejected(false); // 拒绝消息是否重回队列
configurer.configure(factory, connectionFactory);
return factory;
}
/**
* 连接工厂
*/
@Bean(name = "myConnectionFactory")
public ConnectionFactory mainConnectionFactory(@Value("${spring.rabbitmq.host}") String host, @Value("${spring.rabbitmq.port}") int port,
@Value("${spring.rabbitmq.username}") String username, @Value("${spring.rabbitmq.password}") String password) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
/**
* 处理消息的线程池
*/
@Bean("myTaskExecutor")
public Executor modbusTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(50);
// 设置最大线程数(队列无限大的话,这个就没有意义了)
executor.setMaxPoolSize(50);
//配置队列大小
executor.setQueueCapacity(Integer.MAX_VALUE);
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds(60);
// 设置默认线程名称
executor.setThreadNamePrefix("处理上报信息");
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
//执行初始化
executor.initialize();
return executor;
}
}合理的配置相关参数,有助于提升消息处理性能
创建Listener类,用于监听MQ消息,但是注意这里指定autoStartup为不启动
package com.example.springboot.listener;
import com.alibaba.fastjson2.JSON;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.Executor;
@Component
@Slf4j
public class MyRabbitListener {
@RabbitListener(id = "myQueueID", autoStartup = "false", queues = "myQueue",containerFactory = "myQueueFactory")
@RabbitHandler
@Async(value = "myTaskExecutor")
public void processModBusMessage(String content, Message msg, Channel channel) throws IOException {
System.out.println(JSON.toJSONString(content));
//手动进行应答
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
}
}创建Spring的Runner,并指定启动顺序为最后,启动后打开MQ监听
package com.example.springboot.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@Order(value = 99)
public class MqInitRunner implements ApplicationRunner {
private final RabbitListenerEndpointRegistry registry;
// 这里红线提示无需处理
@Autowired
public MqInitRunner(RabbitListenerEndpointRegistry registry) {
this.registry = registry;
}
@Override
public void run(ApplicationArguments args) {
//得到容器的对象
MessageListenerContainer container = registry.getListenerContainer("myQueueID");
//判断容器状态
if(!container.isRunning()){
//开启容器
container.start();
log.info("开启MQ容器 成功");
}
}
}我们事先在MQ的队列中增加几条消息,任意消息均可,然后看启动顺序
11:09:32.938 INFO o.s.a.r.c.CachingConnectionFactory LN:651 Attempting to connect to: 127.0.0.1:5672 11:09:32.955 INFO o.s.a.r.c.CachingConnectionFactory LN:590 Created new connection: myConnectionFactory#4357524b:0/SimpleConnection@665ed71a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 64387] 11:09:32.983 INFO c.e.springboot.listener.MqInitRunner LN:33 开启MQ容器 成功 系统启动成功 通过以下地址访问 http://localhost:1088 http://172.31.240.1:1088 "2" "1" "3" 11:10:11.130 INFO o.s.a.r.l.SimpleMessageListenerContainer LN:652 Waiting for workers to finish.
可以看到,消息的处理,是在系统正式启动成功后才开始运行的。
Java小强
未曾清贫难成人,不经打击老天真。
自古英雄出炼狱,从来富贵入凡尘。
发表评论: