当前位置:网站首页>高并发框架 Disruptor
高并发框架 Disruptor
2022-07-30 03:48:00 【OoZzzy】
1.Disruptor介绍
Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。
从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。
其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案。
2.Disruptor 的核心概念
- RingBuffer: 环形缓冲区
- Sequence:
- SequenceBarrier
- Wait Strategy: 等待的事件的策略
- Event: 事件
- EventHandler: 事件处理接口
- EventProcessor: 事件持有者
- Producer: 提供者

2.1 Ring Buffer
环形的缓冲区, 曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
2.2 Sequence Disruptor
通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
2.3 Sequencer
Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
2.4 Sequence Barrier
用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
2.5 Wait Strategy
定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
2.6 Event
在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
2.7 EventProcessor
EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
2.8 EventHandler
Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
2.9 Producer
即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
3.demo
maven
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
消息体Model
@Data
public class MessageModel {
private String message;
}
EventFactory
public class HelloEventFactory implements EventFactory<MessageModel> {
@Override
public MessageModel newInstance() {
return new MessageModel();
}
}
EventHandler-消费者
@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
@Override
public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
try {
//这里停止1000ms是为了确定消费消息是异步的
Thread.sleep(1000);
log.info("消费者处理消息开始");
if (event != null) {
log.info("消费者消费的信息是:{}",event);
}
} catch (Exception e) {
log.info("消费者处理消息失败");
}
log.info("消费者处理消息结束");
}
}
BeanManager
/** * 获取实例化对象 */
@Component
public class BeanManager implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext; }
public static Object getBean(String name) {
return applicationContext.getBean(name);
}
public static <T> T getBean(Class<T> clazz) {
return applicationContext.getBean(clazz);
}
}
MQManager
@Configuration
public class MQManager {
@Bean("messageModel")
public RingBuffer<MessageModel> messageModelRingBuffer() {
//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
ExecutorService executor = Executors.newFixedThreadPool(2);
//指定事件工厂
HelloEventFactory factory = new HelloEventFactory();
//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
int bufferSize = 1024 * 256;
//单线程模式,获取额外的性能
Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
ProducerType.SINGLE, new BlockingWaitStrategy());
//设置事件业务处理器---消费者
disruptor.handleEventsWith(new HelloEventHandler());
// 启动disruptor线程
disruptor.start();
//获取ringbuffer环,用于接取生产者生产的事件
RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
return ringBuffer;
}
}
Mqservice和实现类-生产者
public interface DisruptorMqService {
/** * 消息 * @param message */
void sayHelloMq(String message);
}
@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {
@Autowired
private RingBuffer<MessageModel> messageModelRingBuffer;
@Override
public void sayHelloMq(String message) {
log.info("record the message: {}",message);
//获取下一个Event槽的下标
long sequence = messageModelRingBuffer.next();
try {
//给Event填充数据
MessageModel event = messageModelRingBuffer.get(sequence);
event.setMessage(message);
log.info("往消息队列中添加消息:{}", event);
} catch (Exception e) {
log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
} finally {
//发布Event,激活观察者去消费,将sequence传递给该消费者
//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
messageModelRingBuffer.publish(sequence);
}
}
}
测试
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class DemoApplicationTests {
@Autowired
private DisruptorMqService disruptorMqService;
/** * 项目内部使用Disruptor做消息队列 * @throws Exception */
@Test
public void sayHelloMqTest() throws Exception{
disruptorMqService.sayHelloMq("消息到了,Hello world!");
log.info("消息队列已发送完毕");
//这里停止2000ms是为了确定是处理消息是异步的
Thread.sleep(2000);
}
}
Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因。
边栏推荐
- Anti-shake and throttling
- 基于全志D1-H和XR806的名贵植物监控装置
- Let's learn the layout components of flutter together
- 操作配置:如何在一台服务器中以服务方式运行多个EasyCVR程序?
- Taobao/Tmall get the list of sold product orders API
- Public chains challenging the "Impossible Triangle"
- 精品:淘宝/天猫获取购买到的商品订单详情 API
- What is the difference between mission, vision and values?
- 小程序毕设作品之微信二手交易小程序毕业设计成品(4)开题报告
- MySQ deadlock
猜你喜欢

Let's learn the layout components of flutter together

星光不问赶路人!武汉校区小姐姐三个月成功转行软件测试,收获9k+13薪!

组织在线化:组织数字化变革的新趋势

路由过滤器

Organizations Going Online: A New Trend in Organizational Digital Transformation

Operational configuration: How to run multiple EasyCVR programs as a service in one server?

sql中 exists的用法

Introduction to management for technical people 1: What is management

Hystrix 服务熔断

Nacos cluster partition
随机推荐
Nacos集群分区
spicy (two) unit hooks
vscode 调试和远程
Gateway routing gateway
Taobao/Tmall get the list of sold product orders API
OA项目之待开会议&历史会议&所有会议
小程序毕设作品之微信二手交易小程序毕业设计成品(2)小程序功能
淘宝H5接口获取app数据6.0格式
Problems caused by List getting the difference
小程序毕设作品之微信二手交易小程序毕业设计成品(8)毕业设计论文模板
Starlight does not ask passers-by!The young lady on the Wuhan campus successfully switched to software testing in three months and received a salary of 9k+13!
List获取差集产生的问题
Operational configuration: How to run multiple EasyCVR programs as a service in one server?
论坛管理系统
第51篇-知乎请求头参数分析【2022-07-28】
Organizations Going Online: A New Trend in Organizational Digital Transformation
Nacos cluster partition
Mini Program Graduation Works WeChat Points Mall Mini Program Graduation Design Finished Work (5) Task Book
Mini Program Graduation Works WeChat Second-hand Trading Mini Program Graduation Design Finished Works (7) Interim Inspection Report
ospf 综合实验(重发布,特殊区域)