Implementing request merging with a scheduled thread pool
2022-07-02 09:21:00 【Protect our fat tiger】
1. What is request merging
In web projects we usually handle requests over HTTP.
The interaction with the server then looks like this: one request, one round of processing.

As we know, calling a batch interface generally performs better than calling a non-batch interface many times, because it involves fewer I/O round trips. Under high concurrency, if an interface is called very frequently, we can consider merging requests: hold incoming requests for a short delay and, once enough of them have accumulated, process them as one batch.
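
To make the I/O argument concrete, here is a minimal sketch that counts round trips for one-by-one calls versus a single batch call. `PriceService` and its methods are hypothetical stand-ins for a remote interface, and each method call is assumed to cost exactly one network round trip.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
class BatchVsSingle {

    /** Stand-in for a remote service; every method call counts as one round trip. */
    static class PriceService {
        final AtomicInteger roundTrips = new AtomicInteger();

        int getPrice(int productId) {
            roundTrips.incrementAndGet();
            return productId * 10;
        }

        List<Integer> getPrices(List<Integer> productIds) {
            roundTrips.incrementAndGet();
            List<Integer> prices = new ArrayList<>(productIds.size());
            for (int id : productIds) {
                prices.add(id * 10);
            }
            return prices;
        }
    }

    /** Looks up every id with its own call and returns the number of round trips used. */
    static int roundTripsOneByOne(List<Integer> ids) {
        PriceService service = new PriceService();
        for (int id : ids) {
            service.getPrice(id);
        }
        return service.roundTrips.get();
    }

    /** Looks up all ids with one batch call and returns the number of round trips used. */
    static int roundTripsBatched(List<Integer> ids) {
        PriceService service = new PriceService();
        service.getPrices(ids);
        return service.roundTrips.get();
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println("one by one: " + roundTripsOneByOne(ids)); // prints "one by one: 5"
        System.out.println("batched: " + roundTripsBatched(ids));     // prints "batched: 1"
    }
}
```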
2. Advantages and disadvantages of request merging
Request merging means combining multiple requests into one batch request.

Advantages:
Requests are held until a time limit or a request count is reached and are then merged into a single request, which reduces I/O round trips.
Disadvantages:
Because a request has to wait for the time limit or the request count, a merged interface responds with some delay. Request merging therefore only suits interfaces that do not need an immediate response and can tolerate a certain delay.
3. Implementing request merging
We use a scheduled thread pool, ScheduledExecutorService, together with an in-memory queue, LinkedBlockingDeque, to implement request merging.
The principle is to cache user requests. When the number of cached requests reaches the configured count, or when the scheduled thread pool's task fires, the accumulated single requests are combined into one batch and the batch interface is called.
Dependencies
The core logic needs only the JDK. The listing below additionally uses Lombok's @Log4j2 for logging, and the usage example uses Spring.
The batch request-merging utility class is defined as follows:
The core idea is to put each request into a queue and check whether the queue size has reached the configured count threshold. Independently of that check, the scheduled thread pool triggers processing each time the time threshold elapses.
package com.leilei.support;

import lombok.extern.log4j.Log4j2;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Request-merging utility class.
 *
 * @author lei
 * @create 2022-03-23 14:39
 */
@Log4j2
public class BatchCollapser<T, R> {

    /** One collapser instance per handler class. */
    private static final Map<Class<?>, BatchCollapser<?, ?>> BATCH_INSTANCE = new ConcurrentHashMap<>();
    /** A single scheduler thread, shared by all collapsers, drives the timed trigger. */
    private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);
    /** In-memory queue that buffers pending requests. */
    private final LinkedBlockingDeque<T> batchContainer = new LinkedBlockingDeque<>();
    private final BatchHandler<List<T>, R> handler;
    private final int countThreshold;

    /**
     * Constructor.
     *
     * @param handler        the batch processor
     * @param countThreshold count threshold; the processor is triggered when the queue reaches this size
     * @param timeThreshold  time threshold in seconds; the processor is triggered each time it elapses
     */
    private BatchCollapser(BatchHandler<List<T>, R> handler, int countThreshold, long timeThreshold) {
        this.handler = handler;
        this.countThreshold = countThreshold;
        SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
            try {
                this.popUpAndHandler(BatchHandlerType.BATCH_HANDLER_TYPE_TIME);
            } catch (Exception e) {
                log.error("pop-up container exception", e);
            }
        }, timeThreshold, timeThreshold, TimeUnit.SECONDS);
    }

    /** Adds a request element to the queue and flushes a batch once the count threshold is reached. */
    public void addRequestParam(T event) {
        batchContainer.add(event);
        if (batchContainer.size() >= countThreshold) {
            popUpAndHandler(BatchHandlerType.BATCH_HANDLER_TYPE_DATA);
        }
    }

    /** Takes up to countThreshold requests from the queue and processes them as one batch. */
    private void popUpAndHandler(BatchHandlerType handlerType) {
        List<T> tryHandlerList = Collections.synchronizedList(new ArrayList<>(countThreshold));
        batchContainer.drainTo(tryHandlerList, countThreshold);
        if (tryHandlerList.size() < 1) {
            return;
        }
        try {
            R handle = handler.handle(tryHandlerList, handlerType);
            log.info("batch handler result: {}", handle);
        } catch (Exception e) {
            log.error("batch execute error, transferList:{}", tryHandlerList, e);
        }
    }

    /**
     * Returns the collapser instance for the given handler, creating it on first use.
     * The thresholds only take effect on the first call for a given handler class.
     *
     * @param batchHandler   the batch processor
     * @param countThreshold count threshold (queue size)
     * @param timeThreshold  time threshold in seconds (as written, each trigger takes at most
     *                       countThreshold requests from the queue; adjust as needed)
     */
    @SuppressWarnings("unchecked")
    public static <E, R> BatchCollapser<E, R> getInstance(BatchHandler<List<E>, R> batchHandler, int countThreshold, long timeThreshold) {
        // computeIfAbsent constructs at most one collapser (and one scheduled task) per handler class
        return (BatchCollapser<E, R>) BATCH_INSTANCE.computeIfAbsent(
                batchHandler.getClass(),
                key -> new BatchCollapser<>(batchHandler, countThreshold, timeThreshold));
    }

    /** Request-processing interface. */
    public interface BatchHandler<T, R> {
        /**
         * Handles the merged requests.
         *
         * @param input       the merged request parameters
         * @param handlerType which trigger (count or time) caused this batch
         * @return the result of processing the batch
         */
        R handle(T input, BatchHandlerType handlerType);
    }

    /** Enumeration of the triggers that can start a merged execution. */
    public enum BatchHandlerType {
        /** Triggered by the count threshold. */
        BATCH_HANDLER_TYPE_DATA,
        /** Triggered by the time threshold. */
        BATCH_HANDLER_TYPE_TIME
    }
}
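
One detail of popUpAndHandler is worth spelling out: `drainTo` with a maximum-elements argument moves at most that many elements, oldest first, so anything beyond countThreshold stays in the queue until the next trigger. The standalone sketch below illustrates this; the class `DrainToDemo` and the helper `takeBatch` are made up for the illustration and are not part of the tool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class, not part of the original tool.
class DrainToDemo {

    /** Moves at most maxBatch queued elements into a new list, oldest first. */
    static <T> List<T> takeBatch(LinkedBlockingDeque<T> queue, int maxBatch) {
        List<T> batch = new ArrayList<>(maxBatch);
        queue.drainTo(batch, maxBatch);
        return batch;
    }

    public static void main(String[] args) {
        LinkedBlockingDeque<Integer> queue = new LinkedBlockingDeque<>();
        for (int i = 1; i <= 5; i++) {
            queue.add(i);
        }
        List<Integer> batch = takeBatch(queue, 3);
        System.out.println("batch = " + batch);          // prints "batch = [1, 2, 3]"
        System.out.println("left in queue = " + queue);  // prints "left in queue = [4, 5]"
    }
}
```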
Usage:
package com.leilei.support;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * @author lei
 * @create 2022-03-22 15:22
 */
@Service
public class ProductService implements BatchCollapser.BatchHandler<List<Integer>, Integer> {

    private BatchCollapser<Integer, Integer> batchCollapser;

    @PostConstruct
    private void postConstructorInit() {
        // Flush a merged batch once 20 requests have accumulated, or every 5 s
        batchCollapser = BatchCollapser.getInstance(ProductService.this, 20, 5);
    }

    @Override
    public Integer handle(List<Integer> input, BatchCollapser.BatchHandlerType handlerType) {
        System.out.println("Handler type: " + handlerType + ", received batch request parameters: " + input);
        return input.stream().mapToInt(x -> x).sum();
    }

    /**
     * Simulates one incoming request every 300 ms.
     * Note: @Scheduled only runs if scheduling is enabled, e.g. with @EnableScheduling on a configuration class.
     */
    @Scheduled(fixedDelay = 300)
    public void aaa() {
        Integer requestParam = (int) (Math.random() * 100) + 1;
        batchCollapser.addRequestParam(requestParam);
        System.out.println("Current request parameter: " + requestParam);
    }
}
import lombok.Data;

@Data
public class Product {
    private Integer id;
    private String notes;
}
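
As a rough back-of-the-envelope check of which trigger fires first in the demo above, the sketch below compares the time needed to accumulate countThreshold requests with the time threshold. The class and method names are hypothetical, and it assumes perfectly regular arrivals into an initially empty queue; real traffic is burstier.

```java
// Hypothetical helper, not part of the original tool.
class TriggerEstimate {

    /**
     * Estimates which trigger fires first, assuming one request arrives every
     * intervalMillis and the queue starts empty. A tie is reported as "TIME".
     */
    static String firstTrigger(long intervalMillis, int countThreshold, long timeThresholdMillis) {
        long millisToFillBatch = intervalMillis * countThreshold;
        return millisToFillBatch < timeThresholdMillis ? "COUNT" : "TIME";
    }

    public static void main(String[] args) {
        // Demo settings: one request per 300 ms, thresholds of 20 requests or 5 s.
        // Filling a batch takes about 300 * 20 = 6000 ms, longer than 5000 ms.
        System.out.println(firstTrigger(300, 20, 5_000)); // prints "TIME"
        // At one request per 100 ms a batch fills in about 2000 ms.
        System.out.println(firstTrigger(100, 20, 5_000)); // prints "COUNT"
    }
}
```

Under these assumptions the timer should fire first with the demo's settings, so most batches would be expected to arrive with type BATCH_HANDLER_TYPE_TIME and hold roughly 16 requests each.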

Of course, the tool above is only a demo. You can improve it yourself, weigh the pros and cons of request merging, and use it to reduce server pressure under highly concurrent requests.
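
One possible improvement, sketched under assumptions: in the tool above addRequestParam returns void, so a caller never sees the result of its own request. If each caller needs a result, every queued request could carry a CompletableFuture that is completed when its batch finishes. `FutureCollapser`, `Pending`, and the convention that the batch function returns results in the same order as its inputs are all hypothetical and not part of the original tool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical variant of the collapser that hands each caller its own result.
class FutureCollapser<T, R> implements AutoCloseable {

    /** A queued request together with the future its caller is waiting on. */
    private static final class Pending<T, R> {
        final T param;
        final CompletableFuture<R> future = new CompletableFuture<>();

        Pending(T param) {
            this.param = param;
        }
    }

    private final LinkedBlockingDeque<Pending<T, R>> queue = new LinkedBlockingDeque<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Function<List<T>, List<R>> batchFunction;
    private final int countThreshold;

    /** Assumption: batchFunction returns one result per input, in input order. */
    FutureCollapser(Function<List<T>, List<R>> batchFunction, int countThreshold, long periodMillis) {
        this.batchFunction = batchFunction;
        this.countThreshold = countThreshold;
        timer.scheduleAtFixedRate(this::flush, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Queues a request; the returned future completes when its batch has been processed. */
    CompletableFuture<R> submit(T param) {
        Pending<T, R> pending = new Pending<>(param);
        queue.add(pending);
        if (queue.size() >= countThreshold) {
            flush();
        }
        return pending.future;
    }

    /** Drains up to countThreshold requests, runs them as one batch, and completes their futures. */
    private void flush() {
        List<Pending<T, R>> batch = new ArrayList<>(countThreshold);
        queue.drainTo(batch, countThreshold);
        if (batch.isEmpty()) {
            return;
        }
        try {
            List<T> params = new ArrayList<>(batch.size());
            for (Pending<T, R> p : batch) {
                params.add(p.param);
            }
            List<R> results = batchFunction.apply(params);
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).future.complete(results.get(i));
            }
        } catch (Exception e) {
            // Fail every caller in the batch that has not received a result yet
            for (Pending<T, R> p : batch) {
                p.future.completeExceptionally(e);
            }
        }
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }

    public static void main(String[] args) {
        try (FutureCollapser<Integer, Integer> collapser = new FutureCollapser<>(
                ids -> ids.stream().map(id -> id * 10).collect(Collectors.toList()), 2, 50)) {
            CompletableFuture<Integer> first = collapser.submit(1);
            CompletableFuture<Integer> second = collapser.submit(2);
            System.out.println(first.join());  // prints 10
            System.out.println(second.join()); // prints 20
        }
    }
}
```

The trade-off from section 2 still applies: a caller that blocks on its future waits for either the count threshold or the next timer tick.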