Implementing request merging with a scheduled thread pool
2022-07-02 09:21:00 【Protect our fat tiger】
1. What is request merging
In web projects, we usually handle requests over HTTP.
The client and server therefore interact one request at a time: each request is sent and processed individually.

Calling a batch interface generally performs better than calling a non-batch interface repeatedly, because it involves fewer IO interactions. Under high concurrency, when an interface is called very frequently, we can consider merging requests: hold incoming requests for a short delay and, once enough of them have accumulated, process them together as one batch.
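To make the IO argument concrete, here is a minimal sketch that counts simulated round trips. The class `RoundTripDemo` and the methods `findName` and `findNames` are hypothetical stand-ins for a remote service, not part of the tool described in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical in-memory "remote" service that counts simulated IO round trips. */
public class RoundTripDemo {
    static final AtomicInteger ROUND_TRIPS = new AtomicInteger();

    /** Non-batch interface: one round trip per id. */
    static String findName(int id) {
        ROUND_TRIPS.incrementAndGet();
        return "product-" + id;
    }

    /** Batch interface: one round trip for the whole list of ids. */
    static List<String> findNames(List<Integer> ids) {
        ROUND_TRIPS.incrementAndGet();
        List<String> names = new ArrayList<>(ids.size());
        for (int id : ids) {
            names.add("product-" + id);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);

        // Five separate calls cost five round trips.
        for (int id : ids) {
            findName(id);
        }
        System.out.println("single calls: " + ROUND_TRIPS.getAndSet(0) + " round trips");

        // One batch call costs one round trip for the same five ids.
        findNames(ids);
        System.out.println("batch call: " + ROUND_TRIPS.getAndSet(0) + " round trip");
    }
}
```

In a real system each round trip would carry network and serialization cost, which is what merging tries to amortize.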
2. Advantages and disadvantages of request merging
Request merging means combining multiple requests into a single batch request.

Advantages:
Requests are held until a time limit or a request count is reached and are then merged into one request, which reduces IO interactions.
Disadvantages:
Because a request has to wait for the time limit or the request count, the merged interface responds with a delay. Request merging is therefore only suitable for interfaces that do not need an immediate response and can tolerate some latency.
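As a rough way to size that delay (a back-of-the-envelope bound, not a measurement), assume a count threshold N, a timer period T, and requests arriving at a steady interval Δ. These symbols are introduced here only for illustration. A request then waits in the queue until whichever trigger fires first:

```latex
W_{\max} \approx \min\bigl(T,\ (N-1)\,\Delta\bigr)
```

For example, with N = 20, T = 5 s and Δ = 0.3 s, (N − 1)Δ = 5.7 s, so the timer fires first and a request waits at most about 5 s, not counting the time the batch handler itself takes.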
3. Technical implementation of request merging
Request merging is implemented with a scheduled thread pool, ScheduledExecutorService, and an in-memory queue, LinkedBlockingDeque.
The idea is to cache users' requests. When the number of cached requests reaches the specified count, or when the scheduled thread pool task fires, the cached single requests are merged into one batch and the batch interface is called.
Dependencies
The mechanism itself needs only the JDK; no third-party dependency is required. (The listings below additionally use Lombok for logging and Spring for the demo service.)
The batch request merging tool class is defined as follows:
The core principle is to put each request into the queue and check whether the queue size has reached the configured threshold. Independently, the scheduled thread pool task fires each time the time threshold elapses.
package com.leilei.support;

import lombok.extern.log4j.Log4j2;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Request merging tool class.
 *
 * @author lei
 * @create 2022-03-23 14:39
 */
@Log4j2
public class BatchCollapser<T, R> {
    /** One collapser instance per handler class. */
    private static final Map<Class<?>, BatchCollapser<?, ?>> BATCH_INSTANCE = new ConcurrentHashMap<>();
    /** Single timer thread shared by all collapsers. */
    private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);
    private final LinkedBlockingDeque<T> batchContainer = new LinkedBlockingDeque<>();
    private final BatchHandler<List<T>, R> handler;
    private final int countThreshold;

    /**
     * Constructor.
     *
     * @param handler        processor
     * @param countThreshold count threshold; the processor is triggered when the queue reaches this size
     * @param timeThreshold  time threshold in seconds; the processor is triggered each time this period elapses
     */
    private BatchCollapser(BatchHandler<List<T>, R> handler, int countThreshold, long timeThreshold) {
        this.handler = handler;
        this.countThreshold = countThreshold;
        SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
            try {
                this.popUpAndHandler(BatchHandlerType.BATCH_HANDLER_TYPE_TIME);
            } catch (Exception e) {
                // Catch everything: an uncaught exception would cancel the periodic task
                log.error("pop-up container exception", e);
            }
        }, timeThreshold, timeThreshold, TimeUnit.SECONDS);
    }

    /**
     * Add a request element to the queue.
     *
     * @param event request parameter
     */
    public void addRequestParam(T event) {
        batchContainer.add(event);
        if (batchContainer.size() >= countThreshold) {
            // Count trigger: the batch is handled on the caller's thread
            popUpAndHandler(BatchHandlerType.BATCH_HANDLER_TYPE_DATA);
        }
    }

    /**
     * Take requests from the queue and process them as one batch.
     *
     * @param handlerType what triggered this batch (count or time)
     */
    private void popUpAndHandler(BatchHandlerType handlerType) {
        // The list is local to this call, so a plain ArrayList is sufficient
        List<T> tryHandlerList = new ArrayList<>(countThreshold);
        batchContainer.drainTo(tryHandlerList, countThreshold);
        if (tryHandlerList.isEmpty()) {
            return;
        }
        try {
            R handle = handler.handle(tryHandlerList, handlerType);
            log.info("batch handler result: {}", handle);
        } catch (Exception e) {
            log.error("batch execute error, transferList:{}", tryHandlerList, e);
        }
    }

    /**
     * Get the collapser instance. An instance is created once per handler class;
     * later calls for the same class return the existing instance and ignore the thresholds.
     *
     * @param batchHandler   processor
     * @param countThreshold count threshold (queue size)
     * @param timeThreshold  time threshold in seconds (when the timer fires, at most
     *                       countThreshold requests are taken from the queue; adjust as needed)
     * @param <E>            request parameter type
     * @param <R>            batch result type
     * @return the collapser for this handler class
     */
    @SuppressWarnings("unchecked")
    public static <E, R> BatchCollapser<E, R> getInstance(BatchHandler<List<E>, R> batchHandler, int countThreshold, long timeThreshold) {
        // computeIfAbsent constructs at most one instance per key, so no stray timer task is scheduled
        return (BatchCollapser<E, R>) BATCH_INSTANCE.computeIfAbsent(batchHandler.getClass(),
                key -> new BatchCollapser<E, R>(batchHandler, countThreshold, timeThreshold));
    }

    /**
     * Request processing interface.
     *
     * @param <T> batch input type
     * @param <R> batch result type
     */
    public interface BatchHandler<T, R> {
        /**
         * Handle a batch of user requests.
         *
         * @param input       merged request parameters
         * @param handlerType what triggered this batch
         * @return batch result
         */
        R handle(T input, BatchHandlerType handlerType);
    }

    /** Trigger type of a merged execution. */
    public enum BatchHandlerType {
        /** Triggered by the count threshold */
        BATCH_HANDLER_TYPE_DATA,
        /** Triggered by the time threshold */
        BATCH_HANDLER_TYPE_TIME,
    }
}
It is used as follows:
package com.leilei.support;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * @author lei
 * @create 2022-03-22 15:22
 */
@Service
public class ProductService implements BatchCollapser.BatchHandler<List<Integer>, Integer> {
    private BatchCollapser<Integer, Integer> batchCollapser;

    @PostConstruct
    private void postConstructorInit() {
        // Merge and execute when 20 requests have accumulated, or every 5 seconds
        batchCollapser = BatchCollapser.getInstance(ProductService.this, 20, 5);
    }

    @Override
    public Integer handle(List<Integer> input, BatchCollapser.BatchHandlerType handlerType) {
        System.out.println("Trigger type: " + handlerType + ", received batch request parameters: " + input);
        return input.stream().mapToInt(x -> x).sum();
    }

    /** Simulate one request every 300 ms (scheduling must be enabled, e.g. with @EnableScheduling). */
    @Scheduled(fixedDelay = 300)
    public void aaa() {
        Integer requestParam = (int) (Math.random() * 100) + 1;
        batchCollapser.addRequestParam(requestParam);
        System.out.println("Current request parameter: " + requestParam);
    }
}

import lombok.Data;

/** Simple product entity; the demo service above does not reference it. */
@Data
public class Product {
    private Integer id;
    private String notes;
}
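The two triggers can also be tried without Spring or Lombok. The following is a condensed, JDK-only sketch of the same idea, not the author's class: the name `MiniCollapser`, the `Consumer`-based handler, and the millisecond period are assumptions made to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Condensed, JDK-only sketch of the count and time triggers. Names are illustrative. */
public class MiniCollapser<T> implements AutoCloseable {
    private final LinkedBlockingDeque<T> queue = new LinkedBlockingDeque<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Consumer<List<T>> batchHandler;
    private final int countThreshold;

    public MiniCollapser(Consumer<List<T>> batchHandler, int countThreshold, long periodMillis) {
        this.batchHandler = batchHandler;
        this.countThreshold = countThreshold;
        // Time trigger: flush whatever is queued once per period.
        timer.scheduleAtFixedRate(this::flush, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Queue one request; the count trigger flushes on the caller's thread. */
    public void add(T request) {
        queue.add(request);
        if (queue.size() >= countThreshold) {
            flush();
        }
    }

    /** Drain at most countThreshold requests and pass them on as one batch. */
    private void flush() {
        List<T> batch = new ArrayList<>(countThreshold);
        queue.drainTo(batch, countThreshold);
        if (batch.isEmpty()) {
            return;
        }
        try {
            batchHandler.accept(batch);
        } catch (RuntimeException e) {
            // An uncaught exception would cancel the periodic task, so report and continue.
            System.err.println("batch failed: " + e);
        }
    }

    /** Stop the timer thread so the JVM can exit. */
    @Override
    public void close() {
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        try (MiniCollapser<Integer> collapser =
                 new MiniCollapser<>(batch -> System.out.println("batch: " + batch), 3, 200)) {
            for (int i = 1; i <= 4; i++) {
                collapser.add(i);  // the third add reaches the count threshold
            }
            Thread.sleep(500);     // gives the timer a chance to flush the leftover request
        }
    }
}
```

Unlike the class above, this sketch gives each collapser its own timer thread and a `close()` method, which keeps the example self-contained but would not scale to many collapsers.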

Of course, the tool above is only a demo. You can improve it yourself and weigh the pros and cons of request merging to reduce server load under highly concurrent requests.
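One possible improvement, sketched here under assumptions rather than taken from the original tool, is to hand each caller its own result. The class `FutureCollapser` and its `submit` method are hypothetical, and the sketch assumes the batch function returns one result per input, in input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical variation: every caller gets a future completed from the batch result. */
public class FutureCollapser<T, R> implements AutoCloseable {
    /** A queued request together with the future its caller is waiting on. */
    private static final class Pending<T, R> {
        final T param;
        final CompletableFuture<R> future = new CompletableFuture<>();
        Pending(T param) { this.param = param; }
    }

    private final LinkedBlockingDeque<Pending<T, R>> queue = new LinkedBlockingDeque<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Function<List<T>, List<R>> batchFunction; // assumed to keep input order
    private final int countThreshold;

    public FutureCollapser(Function<List<T>, List<R>> batchFunction, int countThreshold, long periodMillis) {
        this.batchFunction = batchFunction;
        this.countThreshold = countThreshold;
        timer.scheduleAtFixedRate(this::flush, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Queue one request and return a future for its individual result. */
    public CompletableFuture<R> submit(T param) {
        Pending<T, R> pending = new Pending<>(param);
        queue.add(pending);
        if (queue.size() >= countThreshold) {
            flush();
        }
        return pending.future;
    }

    /** Run one batch and distribute the results to the waiting callers. */
    private void flush() {
        List<Pending<T, R>> batch = new ArrayList<>(countThreshold);
        queue.drainTo(batch, countThreshold);
        if (batch.isEmpty()) {
            return;
        }
        try {
            List<T> params = new ArrayList<>(batch.size());
            for (Pending<T, R> p : batch) {
                params.add(p.param);
            }
            List<R> results = batchFunction.apply(params);
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).future.complete(results.get(i));
            }
        } catch (RuntimeException e) {
            // Fail every unfinished caller in the batch instead of leaving it waiting forever.
            for (Pending<T, R> p : batch) {
                p.future.completeExceptionally(e);
            }
        }
    }

    /** Stop the timer thread; requests still queued are not flushed by this sketch. */
    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

A caller would then write something like `collapser.submit(id).thenAccept(...)` or block with `join()`, accepting the merge delay discussed in section 2.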