当前位置:网站首页>利用线程通信、解决缓存穿透数据库雪崩
利用线程通信、解决缓存穿透数据库雪崩
2020-11-07 18:55:00 【lis1314】
业务场景:
有一个面向C端的查询接口,访问量很大,假设我们使用缓存技术进行了传统的优化,第一次查询数据时,查缓存->缓存没有->查数据库->写入缓存
但是可能会面临一个问题、在同一时刻有很多用户(假设1W)查询同一条数据(假设商品ID一致)、此时数据并没有在缓存、可能会造成数据库雪崩,原因是这个时刻可能因为同一条数据对数据库进行了1W次的SQL查询(这里不讨论预先写入缓存方案,同时这里说的1W次也是极端的情况)。
我们怎么进行优化,这是本篇文章讨论的重点:
思路效果:
无论有多少用户访问查询、假设他们查询商品数据携带的ID一致、那么只产生1条查询、其他用户线程共用同一个查询结果,那么也就从n(10000)变成了1。
这个方案适合任何多线程工作去除重复效果提高性能的场景,这里只是其中一个场景。
下面讨论实现:
A、B、C 3个线程并发查询商品ID为1的数据,传统的缓存优化缓存中没有数据时,会发起3个SQL查询访问数据库,例sql:select * from products where id = 1;
那么我们如何利用行级锁(用户ID维度)来达到A、B、C,3个线程只查询一次数据库得到对应的结果呢?
思路:首先我们需要一个通信维度(用户ID一致认为是同一行为),如果访问入参的用户ID一样,我们可以实现如下的一个效果:
第一个进入的线程进行查询操作(假设是A),那么B、C线程我们让他进入等待的状态,等待线程A的查询结果,最终达到只查询一次数据库的操作。
代码灵感来自阿里开源缓存框架jetcache,下面是改良后的公共实现,可以通用
package com.xxx.utils.sync;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import com.alibaba.fastjson.JSON;
/**
* 同步工具类<br/>
* 主要预防优化缓存雪崩、数据雪崩</br>
* 多线程访问的情况下、认为是同一操作的重复行为
*
*/
public class SyncUtil {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(SyncUtil.class);
private ConcurrentHashMap<Object, LoaderLock> loaderMap = new ConcurrentHashMap<>();
private Function<Object, Object> KeyConvertor;
public SyncUtil() {
this(null);
}
/**
* @param KeyConvertor 设置key转换规则<br/>
* 可以为空,默认使用fastJson
*
*/
public SyncUtil(Function<Object, Object> KeyConvertor) {
if(KeyConvertor == null) {
this.KeyConvertor = new Function<Object, Object>() {
@Override
public Object apply(Object originalKey) {
if (originalKey == null) {
return null;
}
if (originalKey instanceof String) {
return originalKey;
}
return JSON.toJSONString(originalKey);
}
};
}else {
this.KeyConvertor = KeyConvertor;
}
}
/**
* 同步加载函数<br/>
* 例如:A、B、C三个线程同一时刻查询执行某一个操作(查询数据库等),实际上查询的参数条件一样<br/>
* 这个时候我们可以对线程进行优化、防止数据雪崩缓存穿透,让其中的一个线程进行查库操作,其他线程等待具体工作线程的返回结果<br/>
* 如:A线程进行查库、B、C线程进行等待A线程的返回结果。
* @param <K>
* @param <V>
* @param timeout 等待时间(毫秒)(假设A线程查询超过等待时间、那么B、C线程放弃等待自己去执行业务查询)
* @param key 辨别是否是同一操作的key
* @param loader 加载数据的函数<br/>
* Function<String, String> loader = new Function<String, String>() {
@Override
public String apply(String t) {
return "dbData";
}
};
*
* @return
*/
@SuppressWarnings("unchecked")
public <K, V> V synchronizedLoad(Integer timeout, K key, Function<K, V> loader) {
Object lockKey = buildLoaderLockKey(key);
while (true) {
//有没有线程去做具体的获取业务数据的逻辑
boolean create[] = new boolean[1];
LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
create[0] = true;
LoaderLock loaderLock = new LoaderLock();
loaderLock.signal = new CountDownLatch(1);
loaderLock.loaderThread = Thread.currentThread();
return loaderLock;
});
if (create[0] || ll.loaderThread == Thread.currentThread()) {
try {
//第一个进入的线程进行真实的访问操作
V loadedValue = loader.apply(key);
ll.success = true;
ll.value = loadedValue;
return loadedValue;
} finally {
if (create[0]) {
ll.signal.countDown();
loaderMap.remove(lockKey);
}
}
} else {
//其他线程进行等待操作
try {
if (timeout == null) {
ll.signal.await();
} else {
boolean ok = ll.signal.await(timeout, TimeUnit.MILLISECONDS);
if (!ok) {
//如果等待超时,放弃等待,当前线程直接进行访问
logger.info("loader wait timeout:" + timeout);
return loader.apply(key);
}
}
} catch (InterruptedException e) {
logger.warn("loader wait interrupted");
return loader.apply(key);
}
//共用同一个返回结果
if (ll.success) {
return (V) ll.value;
} else {
continue;
}
}
}
}
private Object buildLoaderLockKey(Object key) {
return KeyConvertor.apply(key);
}
static class LoaderLock {
CountDownLatch signal;
Thread loaderThread;
volatile boolean success;
volatile Object value;
}
}
测试代码
package test;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.xxx.MessageServiceApplication;
import com.xxx.mapper.ActivityInfoMapper;
import com.xxx.model.ActivityInfo;
import com.xxx.utils.sync.SyncUtil;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { MessageServiceApplication.class })
public class QueryTest {
@Autowired
private ActivityInfoMapper activityInfoMapper;
SyncUtil syncUtil = new SyncUtil();
private AtomicInteger count = new AtomicInteger();
private CyclicBarrier barrier = new CyclicBarrier(100);
ExecutorService pool = Executors.newFixedThreadPool(100);
public void synSelect(Long id) throws Exception {
for (int i = 0; i < 100; i++) {
pool.execute(()->{
try {
barrier.await();//线程等待,触发多线程同步的效果
// realSelect1(id);//传统查询
realSelect2(id);//优化后
} catch (Exception e) {
e.printStackTrace();
}
});
}
pool.shutdown();
while(!pool.isTerminated()){
}
System.out.println();
}
public ActivityInfo realSelect2(Long id) {
//线程通信的访问
Function<Long,ActivityInfo> loader = (param)->{
//记录真实的访问数据库次数(1)
count.incrementAndGet();
return activityInfoMapper.selectById(param);
};
return syncUtil.synchronizedLoad(3000, id, loader);
}
public ActivityInfo realSelect1(Long id) {
//记录真实的访问数据库次数(100)
count.incrementAndGet();
return activityInfoMapper.selectById(id);
}
@Test
public void testSelect() throws Exception {
synSelect(1L);
System.out.println("真实调用次数="+count.get());
}
}
传统的查询结果
优化后的查询结果,只执行了1次SQL
版权声明
本文为[lis1314]所创,转载请带上原文链接,感谢
https://my.oschina.net/lis1314/blog/4707646
边栏推荐
- Chinese sub forum of | 2020 PostgreSQL Asia Conference: Pan Juan
- 三步轻松理解Kerberos协议
- yum [Errno 256] No more mirrors to try 解决方法
- 谈了多年的数字化转型,为什么还有很多企业依然“口头管理”
- After pulling four message queues into a group, they quarreled
- Benefits and functions of auto maintenance app development
- Logo design company, Nanjing
- 把 4个消息队列都拉到一个群里后,他们吵起来了
- Do you really know how to use search engines?
- 带你深入了解 GitLab CI/CD 原理及流程
猜你喜欢
How to solve the problem of blank page in Google Chrome browser
MongoDB下,启动服务时,出现“服务没有响应控制功能”解决方法
8. Swarm creates maintenance and horizontal extension service
REM: the solution of PC and mobile
How does LeadTools detect, read and write barcodes
2020-11-06:go中,谈一下调度器。
OpenCV計算機視覺學習(10)——影象變換(傅立葉變換,高通濾波,低通濾波)
Git SSH bad permissions
JS string - string string object method
Knowledge competition of garbage classification
随机推荐
Knowledge competition of garbage classification
【涂鸦物联网足迹】物联网主流通信方式
Jenkins pipline stage setting timeout
CI / CD of gitlab continuous integrated development environment
条形码识别器Dynamsoft Barcode Reader v7.5全新上线!
使用RabbitMQ实现分布式事务
8.Swarm创建维护和水平扩展Service
失眠一个整晚上
南京标志设计,logo设计公司
PHP安全:变量的前世今生
python3操作Jenkins模块api
三步轻松理解Kerberos协议
嘉宾专访|2020 PostgreSQL亚洲大会中文分论坛:岳彩波
Gantt chart grouping activities tutorial
20 XR projects roadshows, nearly 20 capital institutions attended! We sincerely invite you to attend the 2020 qcomm XR eco Partner Conference
11.Service更新
甘特图对活动进行分组教程
Chinese sub forum of | 2020 PostgreSQL Asia Conference: Pan Juan
Vue: Axios uses this pointer
11. Service update