当前位置:网站首页>定时线程池实现请求合并
定时线程池实现请求合并
2022-07-02 06:33:00 【保护我方胖虎】
一、什么是请求合并
在WEB项目中,我们一般会使用HTTP协议来处理请求
那么我们与服务器交互方式将会是这样的,一次请求,一次处理

我们都知道,调用批量接口相比调用非批量接口有更大的性能优势(因为减少了IO交互操作),在高并发情况下,如果有非常频繁的接口请求发生的话,我们则可以考虑请求合并了,将多个请求进行一定的等待延迟,当请求累计达到一定量级的时候,进行批量请求处理
二、请求合并的优缺点
所谓请求合并,就是讲多次请求合并为一次批量请求

优点:
将多次请求处理进行一定时间或请求数量的等待,使之合并成为一次请求,减少IO交互
缺点:
由于请求需要等待指定时间或指定请求数量,所以合并的接口存在延时,故对请求合并的接口有所限制,该接口不能对响应及时性有要求,支持一定时间的延迟
三、请求合并技术实现
采用定时线程池ScheduledExecutorService,与内存队列LinkedBlockingDeque进行实现请求合并
原理是将用户的请求进行缓存起来,缓存的请求数量达到指定数量或达到定时线程池执行时,将已有多个单请求处理合并为多处理,调用批量接口进行操作
依赖
只需要JDK,无需任何第三方依赖
批量请求合并工具类定义如下:
核心原理就是 将请求放入队列,放入时检测内存队列数量是否超过设置阈值,以及时间阈值到期触发定时线程池执行
package com.leilei.support;
import lombok.extern.log4j.Log4j2;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/** * @author lei * @create 2022-03-23 14:39 * @desc 请求合并工具类 **/
@Log4j2
public class BatchCollapser<T, R> {
private static final Map<Class, BatchCollapser> BATCH_INSTANCE =new ConcurrentHashMap<>();
private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);
private final LinkedBlockingDeque<T> batchContainer = new LinkedBlockingDeque<>();
private final BatchHandler<List<T>, R> handler;
private final int countThreshold;
/** * constructor * * @param handler 处理器 * @param countThreshold 数量阈值,达到此阈值后触发处理器 * @param timeThreshold 时间阈值,达到此时间后触发处理器 */
private BatchCollapser(BatchHandler<List<T>, R> handler, int countThreshold, long timeThreshold) {
this.handler = handler;
this.countThreshold = countThreshold;
SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
try {
this.popUpAndHandler(BatchHandlerType.BATCH_HANDLER_TYPE_TIME);
} catch (Exception e) {
log.error("pop-up container exception", e);
}
}, timeThreshold, timeThreshold, TimeUnit.SECONDS);
}
/** * 添加请求元素入队 * @param event */
public void addRequestParam(T event) {
batchContainer.add(event);
if (batchContainer.size() >= countThreshold) {
popUpAndHandler(BatchHandlerType.BATCH_HANDLER_TYPE_DATA);
}
}
/** * 从队列获取请求,并进行批量处理 * @param handlerType */
private void popUpAndHandler(BatchHandlerType handlerType) {
List<T> tryHandlerList = Collections.synchronizedList(new ArrayList<>(countThreshold));
batchContainer.drainTo(tryHandlerList, countThreshold);
if (tryHandlerList.size() < 1) {
return;
}
try {
R handle = handler.handle(tryHandlerList, handlerType);
log.info("批处理工具执行result:{}", handle);
} catch (Exception e) {
log.error("batch execute error, transferList:{}", tryHandlerList, e);
}
}
/** * 获取合并器实例 * * @param batchHandler 处理执行器 * @param countThreshold 阈值数量(队列数量) * @param timeThreshold 阈值时间 单位秒(目前设置是触发后获取阈值数量请求,可根据需要修改) * @param <E> * @param <R> * @return */
public static <E, R> BatchCollapser<E, R> getInstance(BatchHandler<List<E>, R> batchHandler, int countThreshold, long timeThreshold) {
Class jobClass = batchHandler.getClass();
if (BATCH_INSTANCE.get(jobClass) == null) {
synchronized (BatchCollapser.class) {
BATCH_INSTANCE.putIfAbsent(jobClass, new BatchCollapser<>(batchHandler, countThreshold, timeThreshold));
}
}
return BATCH_INSTANCE.get(jobClass);
}
/** * 请求处理接口 * * @param <T> * @param <R> */
public interface BatchHandler<T, R> {
/** * 处理用户具体请求 * * @param input * @param handlerType * @return */
R handle(T input, BatchHandlerType handlerType);
}
/** * 合并执行类型枚举 */
public enum BatchHandlerType {
/** * 数量类型 */
BATCH_HANDLER_TYPE_DATA,
/** * 时间类型 */
BATCH_HANDLER_TYPE_TIME,
}
}
使用方式如下:
package com.leilei.support;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.List;
/** * @author lei * @create 2022-03-22 15:22 * @desc **/
@Service
public class ProductService implements BatchCollapser.BatchHandler<List<Integer>, Integer> {
private BatchCollapser<Integer, Integer> batchCollapser;
@PostConstruct
private void postConstructorInit() {
// 当请求数量达到20个,或每过5s合并执行一次请求
batchCollapser = BatchCollapser.getInstance(ProductService.this, 20, 5);
}
@Override
public Integer handle(List<Integer> input, BatchCollapser.BatchHandlerType handlerType) {
System.out.println("处理类型:" + handlerType + ",接受到批量请求参数:" + input);
return input.stream().mapToInt(x -> x).sum();
}
/** * 假设我这里是300ms一次请求 */
@Scheduled(fixedDelay = 300)
public void aaa() {
Integer requestParam = (int) (Math.random() * 100) + 1;
batchCollapser.addRequestParam(requestParam);
System.out.println("当前请求参数:" + requestParam);
}
}
@Data
public class Product {
private Integer id;
private String notes;
}

当然以上工具类仅仅只是DEMO,各位大佬可自行完善,权衡请求合并利弊,降低服务器在高并发请求时的压力
边栏推荐
- AMQ6126问题解决思路
- 洞见云原生|微服务及微服务架构浅析
- 【Go实战基础】gin 如何验证请求参数
- How to realize asynchronous programming in a synchronous way?
- Talk about the secret of high performance of message queue -- zero copy technology
- Mysql安装时mysqld.exe报`应用程序无法正常启动(0xc000007b)`
- 京东高级工程师开发十年,编写出:“亿级流量网站架构核心技术”
- Pyspark de duplication dropduplicates, distinct; withColumn、lit、col; unionByName、groupBy
- Don't spend money, spend an hour to build your own blog website
- 微服务实战|手把手教你开发负载均衡组件
猜你喜欢

Chrome用户脚本管理器-Tampermonkey 油猴

十年开发经验的程序员告诉你,你还缺少哪些核心竞争力?

Matplotlib剑客行——初相识Matplotlib

The channel cannot be viewed when the queue manager is running

盘点典型错误之TypeError: X() got multiple values for argument ‘Y‘

C language implementation of mine sweeping game
![[go practical basis] how to set the route in gin](/img/23/f38d68c4fd238d453b9a7670483002.png)
[go practical basis] how to set the route in gin

Number structure (C language) -- Chapter 4, compressed storage of matrices (Part 2)

Troubleshooting and handling of an online problem caused by redis zadd

机器学习之数据类型案例——基于朴素贝叶斯法,用数据辩男女
随机推荐
From concept to method, the statistical learning method -- Chapter 3, k-nearest neighbor method
win10使用docker拉取redis镜像报错read-only file system: unknown
oracle删除表空间及用户
Jd.com interviewer asked: what is the difference between using on or where in the left join association table and conditions
Matplotlib swordsman line - layout guide and multi map implementation (Updated)
概率还不会的快看过来《统计学习方法》——第四章、朴素贝叶斯法
Matplotlib swordsman - a stylist who can draw without tools and code
我服了,MySQL表500W行,居然有人不做分区?
Mirror protocol of synthetic asset track
Programmers with ten years of development experience tell you, what core competitiveness do you lack?
Shengshihaotong and Guoao (Shenzhen) new energy Co., Ltd. build the charging pile industry chain
Jingdong senior engineer has developed for ten years and compiled "core technology of 100 million traffic website architecture"
WSL installation, beautification, network agent and remote development
2022/2/14 summary
使用IBM MQ远程连接时报错AMQ 4043解决思路
Gocv boundary fill
Matplotlib swordsman Tour - an artist tutorial to accommodate all rivers
Cloudrev self built cloud disk practice, I said that no one can limit my capacity and speed
QT -- how to set shadow effect in QWidget
Essay: RGB image color separation (with code)