Java小强个人技术博客站点    手机版
当前位置: 首页 >> 开源 >> 内存队列Disruptor

内存队列Disruptor

12960 开源 | 2023-7-4

Disruptor 是一个开源的高性能内存队列,由英国外汇交易公司 LMAX 开发的,获得了 2011 年的 Oracle 官方的 Duke's Choice Awards(Duke 选择大奖)。

Disruptor 提供的功能类似于 Kafka、RocketMQ 这类分布式队列,不过,其作为范围是 JVM(内存)。


Github 地址:https://github.com/LMAX-Exchange/disruptor 

官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html 

Disruptor 解决了 JDK 内置线程安全队列的性能和内存安全问题。以下为官方给出的对比图:

latency-histogram.png


参照官方示例,写一个HelloWorld工程,首先是引入相关依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>


创建一个Event,我们可以理解为MQ中的消息体

package com.example.demo.mq;
import lombok.Data;
@Data
public class LogEvent {
    private String message;
    // 可以定义其他属性
}


创建LogEventFactory用于创建Event

package com.example.demo.mq;
import com.lmax.disruptor.EventFactory;
public class LogEventFactory implements EventFactory<LogEvent> {
    @Override
    public LogEvent newInstance() {
        return new LogEvent();
    }
}


创建Handler,这是负责处理消息的地方

package com.example.demo.mq;
import com.lmax.disruptor.EventHandler;
public class LogEventHandler implements EventHandler<LogEvent> {
    /**
     * 处理事件的 Handler
     * @param logEvent 待消费/处理的事件
     * @param sequence 正在处理的事件在环形数组(RingBuffer)中的位置
     * @param endOfBatch 表示这是否是来自环形数组(RingBuffer)中一个批次的最后一个事件(批量处理事件)
     */
    @Override
    public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) {
        System.out.println(logEvent.getMessage());
    }
}


创建测试类

package com.example.demo;
import com.example.demo.mq.LogEvent;
import com.example.demo.mq.LogEventFactory;
import com.example.demo.mq.LogEventHandler;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class DisruptorTest {
    @Test
    public void test1() {
        int bufferSize = 1024;
        // 获取 Disruptor 对象
        // Disruptor<LogEvent> disruptor = new Disruptor<>(LogEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
        Disruptor<LogEvent> disruptor = getLogEventDisruptor();
        // 绑定处理事件的Handler对象
        // 以设置多个处理事件的 Handler,并且可以灵活的设置消费者的处理顺序,串行,并行都是可以的。
        disruptor.handleEventsWith(new LogEventHandler());
        // 下面的代码表示 Handler1 和 Handler2 是并行执行,最后再执行 Handler3
        // disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());
        // 启动 Disruptor
        disruptor.start();
        // 获取保存事件的环形数组(RingBuffer)
        RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
        // 发布 5个 个事件
        for (int i = 1; i <= 5; i++) {
            try {
                // 初始化 Event,对其赋值
                ringBuffer.publishEvent((event, sequence, buffer) -> event.setMessage("这是日志消息 " + System.nanoTime()));
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        // 关闭 Disruptor
        disruptor.shutdown();
        // 模拟堵塞主线程不退出
        try {
            while (true) {
                Thread.sleep(5000);
            }
        } catch (Exception e) {

        }
    }
    private static Disruptor<LogEvent> getLogEventDisruptor() {
        // 创建 LogEvent 的工厂
        LogEventFactory logEventFactory = new LogEventFactory();
        // Disruptor 的 RingBuffer 缓存大小,环形缓冲器的大小,必须是2的幂
        int bufferSize = 1024;
        // 生产者的线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            // 线程安全的计数类
            // https://www.javacui.com/Theory/691.html
            final AtomicInteger threadNum = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
                System.out.println("LogEventThread" + " [#" + threadNum.incrementAndGet() + "] 已创建");
                return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
            }
        };
        //实例化 Disruptor
        return new Disruptor<>(
                logEventFactory, // 自定义的事件工厂
                bufferSize,      // 自定义的时间工厂
                threadFactory,   // 自定义的线程工厂。Disruptor的默认线程池是自定义的,我们只需要传入线程工厂即可。
                ProducerType.MULTI,          // 指定是单个事件发布者模式还是多个事件发布者模式
                new BlockingWaitStrategy()); // 阻塞等待策略
    }
}


运行程序,可以看到打印输出。


END

推荐您阅读更多有关于“ mq 消息队列 内存队列 Disruptor ”的文章

上一篇:dynamic动态数据源编码切换数据源 下一篇:dynamic动态数据源集成druid实现多数据源

猜你喜欢

发表评论: