RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。
所有主要的编程语言均有与代理接口通讯的客户端库。
如果不指定,系统运行,MQ就会马上进行消息的处理。但是此时系统并没有完全加载完毕,有一些内存的变量现在还没有值,这样启动肯定会有问题。
这里我们需要指定一下MQ为手动启动,启动时机为系统初始化完毕。
首先本地需要有RabbitMQ环境,然后创建一个Queue名字为myQueue。
用SpringBoot搭建一个简易的示例程序,在POM中引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
创建MQ配置类,主要用于线程池和连接池等相关配置
package com.example.springboot.config; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; @Configuration public class MyRabbitListenerConfig { /** * 连接工厂配置 */ @Bean(name = "myQueueFactory") public SimpleRabbitListenerContainerFactory mainFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConcurrentConsumers(30); // 设置并发消费者数量 factory.setMaxConcurrentConsumers(50); // 设置最大并发消费者数量 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动应答 factory.setDefaultRequeueRejected(false); // 拒绝消息是否重回队列 configurer.configure(factory, connectionFactory); return factory; } /** * 连接工厂 */ @Bean(name = "myConnectionFactory") public ConnectionFactory mainConnectionFactory(@Value("${spring.rabbitmq.host}") String host, @Value("${spring.rabbitmq.port}") int port, @Value("${spring.rabbitmq.username}") String username, @Value("${spring.rabbitmq.password}") String password) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); return connectionFactory; } /** * 处理消息的线程池 */ @Bean("myTaskExecutor") public Executor modbusTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(50); // 设置最大线程数(队列无限大的话,这个就没有意义了) executor.setMaxPoolSize(50); //配置队列大小 executor.setQueueCapacity(Integer.MAX_VALUE); // 设置线程活跃时间(秒) executor.setKeepAliveSeconds(60); // 设置默认线程名称 executor.setThreadNamePrefix("处理上报信息"); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); //执行初始化 executor.initialize(); return executor; } }
合理的配置相关参数,有助于提升消息处理性能
创建Listener类,用于监听MQ消息,但是注意这里指定autoStartup为不启动
package com.example.springboot.listener; import com.alibaba.fastjson2.JSON; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.concurrent.Executor; @Component @Slf4j public class MyRabbitListener { @RabbitListener(id = "myQueueID", autoStartup = "false", queues = "myQueue",containerFactory = "myQueueFactory") @RabbitHandler @Async(value = "myTaskExecutor") public void processModBusMessage(String content, Message msg, Channel channel) throws IOException { System.out.println(JSON.toJSONString(content)); //手动进行应答 channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); } }
创建Spring的Runner,并指定启动顺序为最后,启动后打开MQ监听
package com.example.springboot.listener; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.listener.MessageListenerContainer; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; @Component @Slf4j @Order(value = 99) public class MqInitRunner implements ApplicationRunner { private final RabbitListenerEndpointRegistry registry; // 这里红线提示无需处理 @Autowired public MqInitRunner(RabbitListenerEndpointRegistry registry) { this.registry = registry; } @Override public void run(ApplicationArguments args) { //得到容器的对象 MessageListenerContainer container = registry.getListenerContainer("myQueueID"); //判断容器状态 if(!container.isRunning()){ //开启容器 container.start(); log.info("开启MQ容器 成功"); } } }
我们事先在MQ的队列中增加几条消息,任意消息均可,然后看启动顺序
11:09:32.938 INFO o.s.a.r.c.CachingConnectionFactory LN:651 Attempting to connect to: 127.0.0.1:5672 11:09:32.955 INFO o.s.a.r.c.CachingConnectionFactory LN:590 Created new connection: myConnectionFactory#4357524b:0/SimpleConnection@665ed71a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 64387] 11:09:32.983 INFO c.e.springboot.listener.MqInitRunner LN:33 开启MQ容器 成功 系统启动成功 通过以下地址访问 http://localhost:1088 http://172.31.240.1:1088 "2" "1" "3" 11:10:11.130 INFO o.s.a.r.l.SimpleMessageListenerContainer LN:652 Waiting for workers to finish.
可以看到,消息的处理,是在系统正式启动成功后才开始运行的。
Java小强
未曾清贫难成人,不经打击老天真。
自古英雄出炼狱,从来富贵入凡尘。
发表评论: