当前位置:网站首页>Producer consumer model of concurrent model
Producer consumer model of concurrent model
2022-07-29 03:18:00 【Program cat Dagang】
producer - Consumer model
Producer consumer pattern is a classic multi thread design pattern .
summary :
- The producer thread submits the task to the memory buffer , The consumer thread gets the task from the memory buffer and executes .
- Through memory buffer , Avoid direct communication between producers and consumers , Thus decoupling producers and consumers .
- Through memory buffer , Allow performance differences between producers and consumers .
stay JDK Thread pool provided in (ThreadPoolExecutor) It is a typical producer consumer model ( The task is thread ), The implementation of memory buffer uses BlockingQueue Blocking queues .
producer - Consumer model ( No lock implementation )
stay ThreadPoolExecutor Used in BlockingQueue Block the queue to make the memory buffer , However, due to the use of locks and blocking waiting to achieve synchronization between threads , So the new energy is not high .
and LMAX The company has developed a framework of high-performance producer consumer mode without lock , be called Disruptor.
Example :( Producers generate data , The consumer calculates the square of the data )
Introduce dependencies :
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
Data entity :
public class PCData {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
Data factory :
public class PCDataFactory implements EventFactory<PCData> {
@Override
public PCData newInstance() {
return new PCData();
}
}
producer :
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
public class Producer {
private final RingBuffer<PCData> ringBuffer;
public Producer(RingBuffer<PCData> ringBuffer){
this.ringBuffer = ringBuffer;
}
public void pushData(ByteBuffer bb){
// Get the next sequence on the ring
long sequence = ringBuffer.next();
PCData data = ringBuffer.get(sequence);
// Set up the data
data.setValue(bb.getLong(0));
// Release sequence
ringBuffer.publish(sequence);
}
}
consumer :
import com.lmax.disruptor.WorkHandler;
public class Consumer implements WorkHandler<PCData> {
@Override
public void onEvent(PCData pcData) throws Exception {
// Print the square value
System.out.println(Thread.currentThread().getName() +
" -- value="+pcData.getValue() +
" -- square ="+Math.pow(pcData.getValue(),2));
}
}
client :
public class Main {
public static void main(String[] args) throws InterruptedException {
// The size needs to be 2 The power of
int bufferSize = 1024;
Disruptor<PCData> disruptor = new Disruptor<>(
new PCDataFactory(),
bufferSize,
Executors.defaultThreadFactory(),
ProducerType.MULTI,
// Choose the right strategy , Improve consumer response time
new BlockingWaitStrategy() // Blocking waiting strategy
// new SleepingWaitStrategy() // Sleep waiting strategy
// new YieldingWaitStrategy() // Humble waiting strategy
// new BusySpinWaitStrategy() // Busy spin wait strategy , Dead cycle
);
// 4 Consumers
disruptor.handleEventsWithWorkerPool(
new Consumer(),
new Consumer(),
new Consumer(),
new Consumer()
);
disruptor.start();
// Generate the data
RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
long size = 1000L;
// 2 A producer
new Thread(()->{
Producer producer = new Producer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for(long i = 0L;i<size;i++){
bb.putLong(0,i);
producer.pushData(bb);
System.out.println(Thread.currentThread().getName() + " - Generate data :"+i);
}
}).start();
new Thread(()->{
Producer producer = new Producer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for(long i = size;i<2*size;i++){
bb.putLong(0,i);
producer.pushData(bb);
System.out.println(Thread.currentThread().getName() + " - Generate data :"+i);
}
}).start();
}
}
summary :
- Choose the right strategy , Improve consumer response time
new BlockingWaitStrategy() // Blocking waiting strategy , province CPU
new SleepingWaitStrategy() // Sleep waiting strategy , Moderate delay , Sleep after spin waiting fails , Don't take up too much CPU
new YieldingWaitStrategy() // Humble waiting strategy , Low latency ,CPU The physical core is larger than the number of threads
new BusySpinWaitStrategy() // Busy spin wait strategy , Dead cycle , Eat all CPU resources
DisruptorYesSequenceUse the method of aligned filling to solve CPU Cache pseudo sharing problem .
CPU Cache pseudo share
Look at the picture below , To know CPU Cache pseudo share The problem of 
You can align the stored data to the cache row using padding (64 byte ) size , Only one data is stored in each cache line .
The following code snippet is Disruptor in Sequence inherited RhsPadding class , It is filled with 7 individual long Type value ( One long type 64 Bitwise is 8 byte , repair 7 One plus one of my own 8 individual , total 64 byte , It just takes up a cache line size )
class RhsPadding extends Value {
protected long p9;
protected long p10;
protected long p11;
protected long p12;
protected long p13;
protected long p14;
protected long p15;
RhsPadding() {
}
}
Reference material
- Books Ge Yiming * 《Java High concurrency programming 》
边栏推荐
- Summary of basic knowledge points of C language
- The Federal Reserve raised interest rates again, Powell "let go of doves" at 75 basis points, and US stocks reveled
- Shardingsphere's level table practice (III)
- GJB common confused concepts
- What if MySQL forgets the password
- Server operation management system
- C traps and defects Chapter 3 semantic "traps" 3.2 pointers to non arrays
- 生产部署zabbix5.0笔记
- C traps and defects Chapter 3 semantic "traps" 3.7 evaluation order
- C traps and defects Chapter 3 semantic "traps" 3.1 pointers and arrays
猜你喜欢

Example analysis of while, repeat and loop loops in MySQL process control

MYCAT read / write separation configuration

Shardingsphere's level table practice (III)

Implement Lmax disruptor queue from scratch (VI) analysis of the principle of disruptor solving pseudo sharing and consumers' elegant stopping

Singleton mode (hungry and lazy)

Shell programming specifications and variables

Ten thousand words detailed Google play online application standard package format AAB

年内首个“三连跌” 95号汽油回归“8元时代“

Hangao database best practice configuration tool Hg_ BP log collection content

复现20字符短域名绕过以及xss相关知识点
随机推荐
Reproduce 20 character short domain name bypass and XSS related knowledge points
Incremental real-time disaster recovery notes
13_ UE4 advanced_ Montage animation realizes attack while walking
Navicat new database
STC单片机驱动1.8‘TFT SPI屏幕演示示例(含资料包)
What is SOA (Service Oriented Architecture)?
Typescript学习(一)
MYSQL入门与进阶(十二)
微信为之疯狂的Glide使用——之生命周期学习
Introduction and advanced MySQL (XIV)
Score addition and subtraction of force deduction and brushing questions (one question per day 7/27)
Linux下安装MySQL8.0的详细步骤
C陷阱与缺陷 第3章 语义“陷阱” 3.9 整数溢出
Redis之sentinel哨兵集群怎么部署
力扣刷题之数组序号计算(每日一题7/28)
Tp5.0 applet users do not need to log in and directly obtain the user's mobile number.
mycat读写分离配置
C陷阱和缺陷 第3章 语义“陷阱” 3.2 非数组的指针
13_ue4进阶_蒙太奇动画实现一边走一边攻击
C traps and defects Chapter 3 semantic "traps" 3.3 array declaration as parameters