Timed thread pool implements request merging
2022-07-02 09:21:00 【Protect our fat tiger】
1. What is request merging
In web projects we usually handle requests over HTTP, so the interaction with the server looks like this: one request, one round of processing.

As we know, calling a batch interface usually performs better than calling a non-batch interface many times, because there are fewer I/O round trips. Under high concurrency, if an interface is called very frequently, we can consider merging requests: hold incoming requests for a short delay and, once enough of them have accumulated, process them as one batch.
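To make the "fewer I/O round trips" point concrete, here is a minimal sketch. The `FakeBackend` class and its `getPrice` / `getPrices` methods are hypothetical and only count calls; they stand in for a real remote interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical demo; the backend and its methods are made up for illustration. */
class RoundTripDemo {

    /** Fake backend that counts how many calls (round trips) it receives. */
    static class FakeBackend {
        int roundTrips = 0;

        // Non-batch interface: one call per product.
        int getPrice(int productId) {
            roundTrips++;
            return productId * 10;
        }

        // Batch interface: one call for the whole list.
        List<Integer> getPrices(List<Integer> productIds) {
            roundTrips++;
            List<Integer> prices = new ArrayList<>();
            for (int id : productIds) {
                prices.add(id * 10);
            }
            return prices;
        }
    }

    static int roundTripsOneByOne(List<Integer> ids) {
        FakeBackend backend = new FakeBackend();
        for (int id : ids) {
            backend.getPrice(id);
        }
        return backend.roundTrips;
    }

    static int roundTripsBatched(List<Integer> ids) {
        FakeBackend backend = new FakeBackend();
        backend.getPrices(ids);
        return backend.roundTrips;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println("one by one: " + roundTripsOneByOne(ids) + " round trips");
        System.out.println("batched:    " + roundTripsBatched(ids) + " round trip");
    }
}
```

For five products, the one-by-one path makes five calls and the batched path makes one. Request merging tries to get the second behaviour even when callers arrive one at a time.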
2. Advantages and disadvantages of request merging
Request merging means combining multiple requests into one batch request.

Advantages:
Requests are held until a time limit or a request count is reached and are then handled as a single batch request, which reduces I/O round trips.
Disadvantages:
Because each request has to wait for the time limit or the request count, the merged interface adds latency. This restricts where merging can be used: the interface must not need an immediate response and has to tolerate some delay.
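The added delay can be estimated with simple arithmetic. The sketch below is an approximation under stated assumptions: requests arrive at evenly spaced intervals, the time the batch handler itself takes is ignored, and the class and method names are hypothetical.

```java
/** Hypothetical helper for estimating the extra wait introduced by merging. */
class MergeLatency {

    /**
     * Rough upper bound on how long a request sits in the buffer.
     * The first request of a batch waits until either (countThreshold - 1) more
     * requests have arrived or the timer fires, whichever comes first.
     */
    static long worstCaseWaitMillis(int countThreshold, long arrivalIntervalMillis, long timeThresholdMillis) {
        long untilCountTrigger = (long) (countThreshold - 1) * arrivalIntervalMillis;
        return Math.min(untilCountTrigger, timeThresholdMillis);
    }

    public static void main(String[] args) {
        // Count threshold 20, time threshold 5 s, one request every 300 ms:
        // 19 * 300 ms = 5700 ms, so the 5000 ms timer fires first.
        System.out.println(worstCaseWaitMillis(20, 300, 5000));
        // Same thresholds, one request every 100 ms:
        // 19 * 100 ms = 1900 ms, so the count trigger fires first.
        System.out.println(worstCaseWaitMillis(20, 100, 5000));
    }
}
```

With slow traffic the time threshold dominates, so the time threshold is effectively the latency budget the interface has to tolerate.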
3. Implementing request merging
We use a scheduled thread pool, ScheduledExecutorService, together with an in-memory queue, LinkedBlockingDeque, to implement request merging.
The idea is to buffer incoming requests. When the number of buffered requests reaches the configured count, or when the scheduled task fires, the buffered single requests are merged into one batch and the batch interface is called.
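Before the full utility class, the mechanism can be sketched in a few lines. This is a stripped-down illustration, not the article's class: `MiniCollapser`, `submit`, `flush` and `shutdown` are hypothetical names, and the handler is a plain `Consumer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical minimal collapser: flushes on a count threshold or on a timer tick. */
class MiniCollapser<T> {
    private final LinkedBlockingDeque<T> queue = new LinkedBlockingDeque<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Consumer<List<T>> batchHandler;
    private final int countThreshold;

    MiniCollapser(Consumer<List<T>> batchHandler, int countThreshold, long periodMillis) {
        this.batchHandler = batchHandler;
        this.countThreshold = countThreshold;
        // Time trigger: flush whatever has accumulated once per period.
        timer.scheduleAtFixedRate(() -> {
            try {
                flush();
            } catch (Exception e) {
                // Keep the schedule alive even if the handler fails.
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void submit(T request) {
        queue.add(request);
        // Count trigger: flush as soon as enough requests are buffered.
        if (queue.size() >= countThreshold) {
            flush();
        }
    }

    private void flush() {
        List<T> batch = new ArrayList<>(countThreshold);
        queue.drainTo(batch, countThreshold); // moves at most countThreshold elements
        if (!batch.isEmpty()) {
            batchHandler.accept(batch);
        }
    }

    void shutdown() {
        timer.shutdownNow();
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        // Long period so that only the count trigger fires in this demo.
        MiniCollapser<Integer> collapser = new MiniCollapser<Integer>(batches::add, 3, 60_000L);
        for (int i = 1; i <= 7; i++) {
            collapser.submit(i);
        }
        collapser.shutdown();
        System.out.println(batches); // request 7 is still buffered when we shut down
    }
}
```

In this demo the seven requests produce two batches of three. The seventh request would normally be picked up by the next timer tick, which is why both triggers are needed.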
Dependencies
The merging logic itself needs only the JDK. The listing below additionally uses Lombok's @Log4j2 annotation for logging, and the usage example further down runs inside Spring.
The batch request merging utility class is defined as follows.
The core idea: put each request into the queue, then process a batch either when the queue size reaches the count threshold or when the scheduled thread pool task fires at the time threshold.
package com.leilei.support;

import lombok.extern.log4j.Log4j2;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * @author lei
 * @create 2022-03-23 14:39
 * @desc Request-merging utility class
 */
@Log4j2
public class BatchCollapser<T, R> {
    /** One collapser instance per handler class. */
    private static final Map<Class, BatchCollapser> BATCH_INSTANCE = new ConcurrentHashMap<>();
    /** Single timer thread shared by all collapser instances. */
    private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);
    /** In-memory queue that buffers pending requests. */
    private final LinkedBlockingDeque<T> batchContainer = new LinkedBlockingDeque<>();
    private final BatchHandler<List<T>, R> handler;
    private final int countThreshold;

    /**
     * Constructor.
     *
     * @param handler        batch processor
     * @param countThreshold count threshold; the processor is triggered when the queue reaches this size
     * @param timeThreshold  time threshold in seconds; the processor is triggered each time this interval elapses
     */
    private BatchCollapser(BatchHandler<List<T>, R> handler, int countThreshold, long timeThreshold) {
        this.handler = handler;
        this.countThreshold = countThreshold;
        // Catch everything inside the task: an exception that escapes a
        // scheduleAtFixedRate task suppresses all of its later executions.
        SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
            try {
                this.popUpAndHandler(BatchHandlerType.BATCH_HANDLER_TYPE_TIME);
            } catch (Exception e) {
                log.error("pop-up container exception", e);
            }
        }, timeThreshold, timeThreshold, TimeUnit.SECONDS);
    }
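
    /**
     * Add a request element to the queue and trigger batch processing
     * once the count threshold is reached.
     *
     * @param event the request to buffer
     */
    public void addRequestParam(T event) {
        batchContainer.add(event);
        // The size check and the drain are not one atomic step, so concurrent callers
        // may occasionally trigger a batch smaller than countThreshold.
        if (batchContainer.size() >= countThreshold) {
            popUpAndHandler(BatchHandlerType.BATCH_HANDLER_TYPE_DATA);
        }
    }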

    /**
     * Take buffered requests from the queue and process them as one batch.
     *
     * @param handlerType what triggered this run (count or time)
     */
    private void popUpAndHandler(BatchHandlerType handlerType) {
        List<T> tryHandlerList = Collections.synchronizedList(new ArrayList<>(countThreshold));
        // Move at most countThreshold elements out of the queue.
        batchContainer.drainTo(tryHandlerList, countThreshold);
        if (tryHandlerList.isEmpty()) {
            return;
        }
        try {
            R handle = handler.handle(tryHandlerList, handlerType);
            log.info("batch handler result: {}", handle);
        } catch (Exception e) {
            log.error("batch execute error, transferList:{}", tryHandlerList, e);
        }
    }

    /**
     * Get the collapser instance for a handler.
     * Instances are cached per handler class, so the thresholds passed on the first call are the ones used.
     *
     * @param batchHandler   handler that processes each batch
     * @param countThreshold count threshold (queue size)
     * @param timeThreshold  time threshold in seconds (each trigger takes at most countThreshold requests; adjust as needed)
     * @param <E>            request type
     * @param <R>            batch result type
     * @return the shared collapser for this handler class
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static <E, R> BatchCollapser<E, R> getInstance(BatchHandler<List<E>, R> batchHandler, int countThreshold, long timeThreshold) {
        Class jobClass = batchHandler.getClass();
        // computeIfAbsent builds the collapser at most once per class, so no extra
        // timer task is scheduled by an instance that is then thrown away.
        return BATCH_INSTANCE.computeIfAbsent(jobClass,
                k -> new BatchCollapser<E, R>(batchHandler, countThreshold, timeThreshold));
    }
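
    /**
     * Request processing interface.
     *
     * @param <T> input type (a list of requests when used with BatchCollapser)
     * @param <R> result type
     */
    public interface BatchHandler<T, R> {
        /**
         * Handle the merged requests.
         *
         * @param input       the batch of requests
         * @param handlerType what triggered this batch (count or time)
         * @return the batch result
         */
        R handle(T input, BatchHandlerType handlerType);
    }

    /**
     * Enumeration of what triggered a merged execution.
     */
    public enum BatchHandlerType {
        /** Triggered by the count threshold. */
        BATCH_HANDLER_TYPE_DATA,
        /** Triggered by the time threshold. */
        BATCH_HANDLER_TYPE_TIME,
    }
}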
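One detail in the class above is easy to overlook: the timer task is wrapped in try/catch. According to the ScheduledExecutorService documentation, if any execution of a scheduleAtFixedRate task throws, subsequent executions are suppressed. The standalone sketch below shows the difference; `FixedRateGuardDemo` and its methods are hypothetical names, not part of the article's code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class; not part of the article's code. */
class FixedRateGuardDemo {

    /** Schedules a task that always throws and returns how many times it ran. */
    static int runsWithoutGuard() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            throw new IllegalStateException("boom");
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            future.get(); // returns only by throwing: the failed run ends the schedule
        } catch (ExecutionException expected) {
            // The task's exception surfaces here; no further runs will happen.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    /** Same failing work, but guarded with try/catch; reports whether it ran at least three times. */
    static boolean keepsRunningWithGuard() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);
        executor.scheduleAtFixedRate(() -> {
            try {
                threeRuns.countDown();
                throw new IllegalStateException("boom");
            } catch (Exception e) {
                // Swallowed here (the article's code logs it), so the schedule survives.
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            return threeRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs without guard: " + runsWithoutGuard());
        System.out.println("keeps running with guard: " + keepsRunningWithGuard());
    }
}
```

The unguarded task runs exactly once and then stops silently, which in a collapser would mean the time trigger never fires again. The guarded task keeps its schedule.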
Usage:
package com.leilei.support;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * @author lei
 * @create 2022-03-22 15:22
 * @desc
 */
@Service
public class ProductService implements BatchCollapser.BatchHandler<List<Integer>, Integer> {
    private BatchCollapser<Integer, Integer> batchCollapser;

    @PostConstruct
    private void postConstructorInit() {
        // Merge and execute once 20 requests have accumulated, or every 5 s.
        batchCollapser = BatchCollapser.getInstance(ProductService.this, 20, 5);
    }

    @Override
    public Integer handle(List<Integer> input, BatchCollapser.BatchHandlerType handlerType) {
        System.out.println("Trigger type: " + handlerType + ", received batch request parameters: " + input);
        return input.stream().mapToInt(x -> x).sum();
    }

    /**
     * Simulate one request every 300 ms.
     * Note that @Scheduled only runs when scheduling is enabled, for example with @EnableScheduling.
     */
    @Scheduled(fixedDelay = 300)
    public void aaa() {
        Integer requestParam = (int) (Math.random() * 100) + 1;
        batchCollapser.addRequestParam(requestParam);
        System.out.println("Current request parameter: " + requestParam);
    }
}
@Data
public class Product {
    private Integer id;
    private String notes;
}

Of course, the utility above is only a demo. You can improve it yourself, weigh the pros and cons of request merging, and use it to reduce the load on the server under highly concurrent requests.
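One possible improvement, sketched here only as an illustration: the demo class returns a single result for the whole batch, so individual callers never see their own result. A variant could hand each caller a CompletableFuture that is completed from the batch result. Everything in this sketch (`FutureCollapser`, `Pending`, `submit`, `flush`) is hypothetical, and it assumes the batch function returns results in the same order as its inputs. The time trigger is left out for brevity; `flush()` is what a scheduled task would call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;

/** Hypothetical variant of the collapser; all names here are illustrative. */
class FutureCollapser<T, R> {

    /** A buffered request together with the future its caller holds. */
    private static final class Pending<T, R> {
        final T request;
        final CompletableFuture<R> future = new CompletableFuture<>();

        Pending(T request) {
            this.request = request;
        }
    }

    private final LinkedBlockingDeque<Pending<T, R>> queue = new LinkedBlockingDeque<>();
    private final Function<List<T>, List<R>> batchFunction; // assumed to keep input order
    private final int countThreshold;

    FutureCollapser(Function<List<T>, List<R>> batchFunction, int countThreshold) {
        this.batchFunction = batchFunction;
        this.countThreshold = countThreshold;
    }

    /** Buffers the request and returns a future for its individual result. */
    CompletableFuture<R> submit(T request) {
        Pending<T, R> pending = new Pending<>(request);
        queue.add(pending);
        if (queue.size() >= countThreshold) {
            flush();
        }
        return pending.future;
    }

    /** Processes up to countThreshold buffered requests; a timer task could call this too. */
    void flush() {
        List<Pending<T, R>> batch = new ArrayList<>(countThreshold);
        queue.drainTo(batch, countThreshold);
        if (batch.isEmpty()) {
            return;
        }
        try {
            List<T> inputs = new ArrayList<>(batch.size());
            for (Pending<T, R> p : batch) {
                inputs.add(p.request);
            }
            List<R> results = batchFunction.apply(inputs);
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).future.complete(results.get(i));
            }
        } catch (RuntimeException e) {
            // Fail every caller that has not received a result yet.
            for (Pending<T, R> p : batch) {
                p.future.completeExceptionally(e);
            }
        }
    }

    public static void main(String[] args) {
        FutureCollapser<Integer, Integer> collapser = new FutureCollapser<Integer, Integer>(inputs -> {
            List<Integer> doubled = new ArrayList<>();
            for (int x : inputs) {
                doubled.add(x * 2);
            }
            return doubled;
        }, 2);
        CompletableFuture<Integer> first = collapser.submit(1);
        CompletableFuture<Integer> second = collapser.submit(2); // reaches the threshold
        System.out.println(first.join() + ", " + second.join());
    }
}
```

The trade-off is the same as before: a caller's future only completes after the count or time trigger fires, so the interface still has to tolerate that delay.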