网站首页
Java
站长
开源
框架
理论
JS
Linux
DB
服务器
NET
生活
软件
PHP
其他
您的位置:首页 > 开源 > RabbitMQ延迟启动
RabbitMQ延迟启动
2023-9-26    946    0

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。

所有主要的编程语言均有与代理接口通讯的客户端库。

https://www.rabbitmq.com/ 


RabbitMQ.jpg

 

如果不指定,系统运行,MQ就会马上进行消息的处理。但是此时系统并没有完全加载完毕,有一些内存的变量现在还没有值,这样启动肯定会有问题。

这里我们需要指定一下MQ为手动启动,启动时机为系统初始化完毕。


首先本地需要有RabbitMQ环境,然后创建一个Queue名字为myQueue。


用SpringBoot搭建一个简易的示例程序,在POM中引入依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


创建MQ配置类,主要用于线程池和连接池等相关配置

package com.example.springboot.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
public class MyRabbitListenerConfig {

    /**
     * 连接工厂配置
     */
    @Bean(name = "myQueueFactory")
    public SimpleRabbitListenerContainerFactory mainFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                            @Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConcurrentConsumers(30); // 设置并发消费者数量
        factory.setMaxConcurrentConsumers(50); // 设置最大并发消费者数量
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动应答
        factory.setDefaultRequeueRejected(false); // 拒绝消息是否重回队列
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    /**
     * 连接工厂
     */
    @Bean(name = "myConnectionFactory")
    public ConnectionFactory mainConnectionFactory(@Value("${spring.rabbitmq.host}") String host, @Value("${spring.rabbitmq.port}") int port,
                                                   @Value("${spring.rabbitmq.username}") String username, @Value("${spring.rabbitmq.password}") String password) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    /**
     * 处理消息的线程池
     */
    @Bean("myTaskExecutor")
    public Executor modbusTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(50);
        // 设置最大线程数(队列无限大的话,这个就没有意义了)
        executor.setMaxPoolSize(50);
        //配置队列大小
        executor.setQueueCapacity(Integer.MAX_VALUE);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        // 设置默认线程名称
        executor.setThreadNamePrefix("处理上报信息");
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //执行初始化
        executor.initialize();
        return executor;
    }

}

合理的配置相关参数,有助于提升消息处理性能


创建Listener类,用于监听MQ消息,但是注意这里指定autoStartup为不启动

package com.example.springboot.listener;

import com.alibaba.fastjson2.JSON;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.Executor;

@Component
@Slf4j
public class MyRabbitListener {

    @RabbitListener(id = "myQueueID", autoStartup = "false", queues = "myQueue",containerFactory = "myQueueFactory")
    @RabbitHandler
    @Async(value = "myTaskExecutor")
    public void processModBusMessage(String content, Message msg, Channel channel) throws IOException {
        System.out.println(JSON.toJSONString(content));
        //手动进行应答
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    }

}


创建Spring的Runner,并指定启动顺序为最后,启动后打开MQ监听

package com.example.springboot.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@Order(value = 99)
public class MqInitRunner implements ApplicationRunner {

    private final RabbitListenerEndpointRegistry registry;

    // 这里红线提示无需处理
    @Autowired
    public MqInitRunner(RabbitListenerEndpointRegistry registry) {
        this.registry =  registry;
    }

    @Override
    public void run(ApplicationArguments args) {
        //得到容器的对象
        MessageListenerContainer container = registry.getListenerContainer("myQueueID");
        //判断容器状态
        if(!container.isRunning()){
            //开启容器
            container.start();
            log.info("开启MQ容器 成功");
        }
    }

}


我们事先在MQ的队列中增加几条消息,任意消息均可,然后看启动顺序

11:09:32.938  INFO o.s.a.r.c.CachingConnectionFactory       LN:651 Attempting to connect to: 127.0.0.1:5672
11:09:32.955  INFO o.s.a.r.c.CachingConnectionFactory       LN:590 Created new connection: myConnectionFactory#4357524b:0/SimpleConnection@665ed71a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 64387]
11:09:32.983  INFO c.e.springboot.listener.MqInitRunner     LN:33 开启MQ容器 成功
系统启动成功  
通过以下地址访问 http://localhost:1088
              http://172.31.240.1:1088
"2"
"1"
"3"
11:10:11.130  INFO o.s.a.r.l.SimpleMessageListenerContainer LN:652 Waiting for workers to finish.


可以看到,消息的处理,是在系统正式启动成功后才开始运行的。

上一篇: 强制浏览器Edge、GoogleChrome、Firefox中强制启用黑色模式
下一篇: Minio的安装和Java使用示例
发表评论:
您的网名:
个人主页:
编辑内容: