Timed thread pool implements request merging
2022-07-02 09:21:00 【Protect our fat tiger】
1. What is request merging?
In web projects, we usually handle requests over the HTTP protocol.
The interaction with the server then looks like this: one request, one round of processing.

As is well known, calling a batch interface generally performs better than calling a single-item interface repeatedly, because there are fewer I/O round trips. Under high concurrency, if an interface is called very frequently, we can consider merging requests: hold incoming requests for a short delay, and once enough requests have accumulated (or the delay expires), process them as one batch.
2. Advantages and disadvantages of request merging
Request merging means combining multiple requests into a single batch request.

Advantage:
Multiple requests are held for a certain time, or until a certain number has accumulated, and are then processed as one request, which reduces I/O round trips.
Disadvantage:
Because each request has to wait for the specified time or the specified number of requests, the merged interface responds with a delay. This limits which interfaces can be merged: the interface must not require an immediate response and must tolerate a certain delay.
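To get a feel for how large that delay can be, here is a rough estimate, assuming evenly spaced arrivals, a timer that ticks at a fixed period, and ignoring the time the batch call itself takes. The class name, method name and sample numbers are illustrative only.

```java
// Rough worst-case extra wait for one request, under the assumptions stated above.
final class MergeDelayEstimate {

    static long maxAddedDelayMillis(int countThreshold, long timeThresholdMillis, long arrivalIntervalMillis) {
        // The first request of a batch waits for (countThreshold - 1) more arrivals ...
        long waitForCount = (long) (countThreshold - 1) * arrivalIntervalMillis;
        // ... unless a timer tick comes first, which is at most one full period away.
        return Math.min(waitForCount, timeThresholdMillis);
    }

    public static void main(String[] args) {
        // 19 * 300 ms = 5700 ms, so the 5000 ms timer fires first
        System.out.println(maxAddedDelayMillis(20, 5_000, 300)); // prints 5000
        // 19 * 100 ms = 1900 ms, so the count threshold is reached first
        System.out.println(maxAddedDelayMillis(20, 5_000, 100)); // prints 1900
    }
}
```

If the resulting number is larger than the interface's callers can tolerate, the thresholds need to be lowered or the interface is not a good candidate for merging.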
3. Implementing request merging
The implementation uses a scheduled thread pool, ScheduledExecutorService, together with an in-memory queue, LinkedBlockingDeque.
The principle is to buffer user requests. When the number of buffered requests reaches the specified threshold, or when the scheduled thread pool fires, the buffered single requests are merged into one batch and the batch interface is called.
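Stripped to its essentials, the principle can be sketched with JDK classes only. This is a simplified illustration, not the tool class from this article: MiniCollapser, add, startTimer and flush are hypothetical names, and the Consumer stands in for a real batch interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, stripped-down collapser: illustrates the two triggers only.
final class MiniCollapser<T> {
    private final LinkedBlockingDeque<T> queue = new LinkedBlockingDeque<>();
    private final Consumer<List<T>> batchCall; // stands in for the real batch interface
    private final int countThreshold;

    MiniCollapser(Consumer<List<T>> batchCall, int countThreshold) {
        this.batchCall = batchCall;
        this.countThreshold = countThreshold;
    }

    // Trigger 1: flush as soon as enough requests have accumulated.
    void add(T request) {
        queue.add(request);
        if (queue.size() >= countThreshold) {
            flush();
        }
    }

    // Trigger 2: a timer flushes periodically so that stragglers are not stuck.
    void startTimer(ScheduledExecutorService scheduler, long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                flush();
            } catch (RuntimeException e) {
                // swallowed here so the periodic task keeps running
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Moves at most countThreshold queued requests into one batch call.
    void flush() {
        List<T> batch = new ArrayList<>(countThreshold);
        queue.drainTo(batch, countThreshold);
        if (!batch.isEmpty()) {
            batchCall.accept(batch);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        MiniCollapser<Integer> collapser =
                new MiniCollapser<Integer>(batch -> System.out.println("batch: " + batch), 3);
        collapser.startTimer(scheduler, 200);
        for (int i = 1; i <= 5; i++) {
            collapser.add(i); // 1, 2, 3 go out together via the count trigger
        }
        Thread.sleep(500);    // 4, 5 go out on a timer tick
        scheduler.shutdown();
    }
}
```

Both triggers end in the same flush step; drainTo with a maximum element count keeps each batch bounded even if the queue has grown beyond the threshold.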
Dependencies
The merging logic itself needs only the JDK. The code below additionally uses Lombok's @Log4j2 annotation for logging, and the usage example runs inside Spring.
The core idea is to put each request into the queue and check whether the queue size has reached the count threshold; independently, the time threshold triggers the scheduled thread pool to flush the queue.
The batch request-merging tool class is defined as follows:
package com.leilei.support;

import lombok.extern.log4j.Log4j2;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Request-merging tool class.
 *
 * @author lei
 * @create 2022-03-23 14:39
 */
@Log4j2
public class BatchCollapser<T, R> {

    // One collapser instance per handler class
    private static final Map<Class<?>, BatchCollapser<?, ?>> BATCH_INSTANCE = new ConcurrentHashMap<>();
    // Single scheduler thread shared by all collapsers; it drives the time trigger
    private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);
    // In-memory queue that buffers pending requests
    private final LinkedBlockingDeque<T> batchContainer = new LinkedBlockingDeque<>();
    private final BatchHandler<List<T>, R> handler;
    private final int countThreshold;

    /**
     * Constructor.
     *
     * @param handler        processor
     * @param countThreshold count threshold; the processor is triggered when the queue reaches this size
     * @param timeThreshold  time threshold in seconds; the processor is triggered each time this period elapses
     */
    private BatchCollapser(BatchHandler<List<T>, R> handler, int countThreshold, long timeThreshold) {
        this.handler = handler;
        this.countThreshold = countThreshold;
        SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
            try {
                this.popUpAndHandler(BatchHandlerType.BATCH_HANDLER_TYPE_TIME);
            } catch (Exception e) {
                // An uncaught exception would suppress all later runs of this periodic task
                log.error("pop-up container exception", e);
            }
        }, timeThreshold, timeThreshold, TimeUnit.SECONDS);
    }

    /**
     * Adds a request element to the queue.
     *
     * @param event the request parameter
     */
    public void addRequestParam(T event) {
        batchContainer.add(event);
        if (batchContainer.size() >= countThreshold) {
            popUpAndHandler(BatchHandlerType.BATCH_HANDLER_TYPE_DATA);
        }
    }

    /**
     * Takes requests from the queue and processes them as one batch.
     *
     * @param handlerType which trigger (count or time) caused this run
     */
    private void popUpAndHandler(BatchHandlerType handlerType) {
        // The list is local to this call, so no synchronized wrapper is needed
        List<T> tryHandlerList = new ArrayList<>(countThreshold);
        // drainTo is thread-safe and moves at most countThreshold elements
        batchContainer.drainTo(tryHandlerList, countThreshold);
        if (tryHandlerList.isEmpty()) {
            return;
        }
        try {
            R handle = handler.handle(tryHandlerList, handlerType);
            log.info("batch tool execution result:{}", handle);
        } catch (Exception e) {
            log.error("batch execute error, transferList:{}", tryHandlerList, e);
        }
    }

    /**
     * Gets the collapser instance for the given handler.
     *
     * @param batchHandler   the handler that executes the batch
     * @param countThreshold count threshold (queue size)
     * @param timeThreshold  time threshold in seconds (each trigger currently takes at most
     *                       countThreshold requests from the queue; adjust as needed)
     * @param <E>            request element type
     * @param <R>            batch result type
     * @return the collapser bound to the handler's class
     */
    @SuppressWarnings("unchecked")
    public static <E, R> BatchCollapser<E, R> getInstance(BatchHandler<List<E>, R> batchHandler, int countThreshold, long timeThreshold) {
        // computeIfAbsent constructs (and schedules) at most one collapser per handler class;
        // the earlier check-then-putIfAbsent could build a second instance whose timer kept running
        return (BatchCollapser<E, R>) BATCH_INSTANCE.computeIfAbsent(batchHandler.getClass(),
                key -> new BatchCollapser<>(batchHandler, countThreshold, timeThreshold));
    }

    /**
     * Request processing interface.
     *
     * @param <T> batch input type
     * @param <R> batch result type
     */
    public interface BatchHandler<T, R> {
        /**
         * Handles a batch of user requests.
         *
         * @param input       the merged request parameters
         * @param handlerType which trigger caused this batch
         * @return the batch result
         */
        R handle(T input, BatchHandlerType handlerType);
    }

    /**
     * Enumeration of merge trigger types.
     */
    public enum BatchHandlerType {
        /** Triggered by the count threshold */
        BATCH_HANDLER_TYPE_DATA,
        /** Triggered by the time threshold */
        BATCH_HANDLER_TYPE_TIME,
    }
}
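The try/catch around the scheduled task in the constructor is not just defensive style. The ScheduledExecutorService documentation states that if any execution of a periodic task throws, subsequent executions are suppressed. The following standalone demo shows the difference; the class and method names are illustrative and not part of the tool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FixedRateExceptionDemo {

    // Schedules a task that always throws; returns how often it ran within waitMillis.
    static int runsWithoutCatch(long waitMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        scheduler.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            throw new IllegalStateException("handler failed");
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return runs.get();
    }

    // Same failing work, but caught inside the task; true if it ran at least 3 times in time.
    static boolean keepsRunningWithCatch(long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch threeRuns = new CountDownLatch(3);
        scheduler.scheduleAtFixedRate(() -> {
            try {
                threeRuns.countDown();
                throw new IllegalStateException("handler failed");
            } catch (RuntimeException e) {
                // swallowed (a real task would log it), so the next tick still fires
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        boolean reached = false;
        try {
            reached = threeRuns.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return reached;
    }

    public static void main(String[] args) {
        System.out.println("runs without catch: " + runsWithoutCatch(300));
        System.out.println("keeps running with catch: " + keepsRunningWithCatch(2_000));
    }
}
```

Without the catch, a single failing batch would silently disable the time trigger for that collapser, and requests below the count threshold would stay in the queue.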
Usage is as follows:
package com.leilei.support;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * @author lei
 * @create 2022-03-22 15:22
 */
@Service
public class ProductService implements BatchCollapser.BatchHandler<List<Integer>, Integer> {

    private BatchCollapser<Integer, Integer> batchCollapser;

    @PostConstruct
    private void postConstructorInit() {
        // Merge and execute once 20 requests have accumulated, or every 5 s
        batchCollapser = BatchCollapser.getInstance(ProductService.this, 20, 5);
    }

    @Override
    public Integer handle(List<Integer> input, BatchCollapser.BatchHandlerType handlerType) {
        System.out.println("Trigger type: " + handlerType + ", batch request parameters received: " + input);
        // The "batch interface" of this demo simply sums the parameters
        return input.stream().mapToInt(x -> x).sum();
    }

    /**
     * Simulates one request every 300 ms.
     * Note: @Scheduled only takes effect when scheduling is enabled (e.g. via @EnableScheduling).
     */
    @Scheduled(fixedDelay = 300)
    public void aaa() {
        Integer requestParam = (int) (Math.random() * 100) + 1;
        batchCollapser.addRequestParam(requestParam);
        System.out.println("Current request parameter: " + requestParam);
    }
}
import lombok.Data;

@Data
public class Product {
    private Integer id;
    private String notes;
}

Of course, the tool above is only a demo. You can improve it yourself, weighing the pros and cons of request merging, to reduce the load on the server under highly concurrent requests.
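One possible improvement, sketched here as an assumption rather than as part of the original tool: in the demo, callers that add a request never receive a result. Pairing each request with a CompletableFuture lets every caller get its own answer back once the batch has run. FutureCollapser, submit and flush are hypothetical names, and the batch function is assumed to return results in the same order as its inputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;

// Hypothetical extension: each caller gets a future for its own result.
final class FutureCollapser<T, R> {

    // Pairs a request with the future its caller is waiting on.
    private static final class Pending<T, R> {
        final T request;
        final CompletableFuture<R> result = new CompletableFuture<>();

        Pending(T request) {
            this.request = request;
        }
    }

    private final LinkedBlockingDeque<Pending<T, R>> queue = new LinkedBlockingDeque<>();
    private final Function<List<T>, List<R>> batchCall; // assumed to keep input order
    private final int countThreshold;

    FutureCollapser(Function<List<T>, List<R>> batchCall, int countThreshold) {
        this.batchCall = batchCall;
        this.countThreshold = countThreshold;
    }

    // Queues the request and returns a future that completes when its batch has run.
    CompletableFuture<R> submit(T request) {
        Pending<T, R> pending = new Pending<>(request);
        queue.add(pending);
        if (queue.size() >= countThreshold) {
            flush();
        }
        return pending.result;
    }

    // Count trigger calls this directly; a periodic timer could call it as well.
    void flush() {
        List<Pending<T, R>> batch = new ArrayList<>(countThreshold);
        queue.drainTo(batch, countThreshold);
        if (batch.isEmpty()) {
            return;
        }
        try {
            List<T> requests = new ArrayList<>(batch.size());
            for (Pending<T, R> p : batch) {
                requests.add(p.request);
            }
            List<R> results = batchCall.apply(requests);
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).result.complete(results.get(i));
            }
        } catch (RuntimeException e) {
            // Fail every caller still waiting; already completed futures are unaffected
            for (Pending<T, R> p : batch) {
                p.result.completeExceptionally(e);
            }
        }
    }

    public static void main(String[] args) {
        FutureCollapser<Integer, Integer> collapser = new FutureCollapser<Integer, Integer>(xs -> {
            List<Integer> squares = new ArrayList<>();
            for (Integer x : xs) {
                squares.add(x * x);
            }
            return squares;
        }, 2);
        CompletableFuture<Integer> a = collapser.submit(3);
        CompletableFuture<Integer> b = collapser.submit(4); // reaches the threshold, batch runs
        System.out.println(a.join() + " " + b.join());      // prints "9 16"
    }
}
```

Whether this is worth the extra complexity depends on the interface: fire-and-forget writes work fine with the original void method, while merged queries usually need the per-request result.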