Disruptor 是一个开源的高性能内存队列,由英国外汇交易公司 LMAX 开发的,获得了 2011 年的 Oracle 官方的 Duke's Choice Awards(Duke 选择大奖)。
Disruptor 提供的功能类似于 Kafka、RocketMQ 这类分布式队列,不过,其作为范围是 JVM(内存)。
Github 地址:https://github.com/LMAX-Exchange/disruptor
官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html
Disruptor 解决了 JDK 内置线程安全队列的性能和内存安全问题。以下为官方给出的对比图:
参照官方示例,写一个HelloWorld工程,首先是引入相关依赖
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.4</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
创建一个Event,我们可以理解为MQ中的消息体
package com.example.demo.mq; import lombok.Data; @Data public class LogEvent { private String message; // 可以定义其他属性 }
创建LogEventFactory用于创建Event
package com.example.demo.mq; import com.lmax.disruptor.EventFactory; public class LogEventFactory implements EventFactory<LogEvent> { @Override public LogEvent newInstance() { return new LogEvent(); } }
创建Handler,这是负责处理消息的地方
package com.example.demo.mq; import com.lmax.disruptor.EventHandler; public class LogEventHandler implements EventHandler<LogEvent> { /** * 处理事件的 Handler * @param logEvent 待消费/处理的事件 * @param sequence 正在处理的事件在环形数组(RingBuffer)中的位置 * @param endOfBatch 表示这是否是来自环形数组(RingBuffer)中一个批次的最后一个事件(批量处理事件) */ @Override public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) { System.out.println(logEvent.getMessage()); } }
创建测试类
package com.example.demo; import com.example.demo.mq.LogEvent; import com.example.demo.mq.LogEventFactory; import com.example.demo.mq.LogEventHandler; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import org.junit.jupiter.api.Test; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; public class DisruptorTest { @Test public void test1() { int bufferSize = 1024; // 获取 Disruptor 对象 // Disruptor<LogEvent> disruptor = new Disruptor<>(LogEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); Disruptor<LogEvent> disruptor = getLogEventDisruptor(); // 绑定处理事件的Handler对象 // 以设置多个处理事件的 Handler,并且可以灵活的设置消费者的处理顺序,串行,并行都是可以的。 disruptor.handleEventsWith(new LogEventHandler()); // 下面的代码表示 Handler1 和 Handler2 是并行执行,最后再执行 Handler3 // disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3()); // 启动 Disruptor disruptor.start(); // 获取保存事件的环形数组(RingBuffer) RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); // 发布 5个 个事件 for (int i = 1; i <= 5; i++) { try { // 初始化 Event,对其赋值 ringBuffer.publishEvent((event, sequence, buffer) -> event.setMessage("这是日志消息 " + System.nanoTime())); }catch (Exception e){ e.printStackTrace(); } } // 关闭 Disruptor disruptor.shutdown(); // 模拟堵塞主线程不退出 try { while (true) { Thread.sleep(5000); } } catch (Exception e) { } } private static Disruptor<LogEvent> getLogEventDisruptor() { // 创建 LogEvent 的工厂 LogEventFactory logEventFactory = new LogEventFactory(); // Disruptor 的 RingBuffer 缓存大小,环形缓冲器的大小,必须是2的幂 int bufferSize = 1024; // 生产者的线程工厂 ThreadFactory threadFactory = new ThreadFactory() { // 线程安全的计数类 // https://www.javacui.com/Theory/691.html final AtomicInteger threadNum = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { System.out.println("LogEventThread" + " [#" + threadNum.incrementAndGet() + "] 已创建"); return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]"); } }; //实例化 Disruptor return new Disruptor<>( logEventFactory, // 自定义的事件工厂 bufferSize, // 自定义的时间工厂 threadFactory, // 自定义的线程工厂。Disruptor的默认线程池是自定义的,我们只需要传入线程工厂即可。 ProducerType.MULTI, // 指定是单个事件发布者模式还是多个事件发布者模式 new BlockingWaitStrategy()); // 阻塞等待策略 } }
运行程序,可以看到打印输出。
END
Java小强
未曾清贫难成人,不经打击老天真。
自古英雄出炼狱,从来富贵入凡尘。
发表评论: