当前位置:网站首页>利用线程通信、解决缓存穿透数据库雪崩
利用线程通信、解决缓存穿透数据库雪崩
2020-11-07 18:55:00 【lis1314】
业务场景:
有一个面向C端的查询接口,访问量很大,假设我们使用缓存技术进行了传统的优化,第一次查询数据时,查缓存->缓存没有->查数据库->写入缓存
但是可能会面临一个问题、在同一时刻有很多用户(假设1W)查询同一条数据(假设商品ID一致)、此时数据并没有在缓存、可能会造成数据库雪崩,原因是这个时刻可能因为同一条数据对数据库进行了1W次的SQL查询(这里不讨论预先写入缓存方案,同时这里说的1W次也是极端的情况)。
我们怎么进行优化,这是本篇文章讨论的重点:
思路效果:
无论有多少用户访问查询、假设他们查询商品数据携带的ID一致、那么只产生1条查询、其他用户线程共用同一个查询结果,那么也就从n(10000)变成了1。
这个方案适合任何多线程工作去除重复效果提高性能的场景,这里只是其中一个场景。
下面讨论实现:
A、B、C 3个线程并发查询商品ID为1的数据,传统的缓存优化缓存中没有数据时,会发起3个SQL查询访问数据库,例sql:select * from products where id = 1;
那么我们如何利用行级锁(用户ID维度)来达到A、B、C,3个线程只查询一次数据库得到对应的结果呢?
思路:首先我们需要一个通信维度(用户ID一致认为是同一行为),如果访问入参的用户ID一样,我们可以实现如下的一个效果:
第一个进入的线程进行查询操作(假设是A),那么B、C线程我们让他进入等待的状态,等待线程A的查询结果,最终达到只查询一次数据库的操作。
代码灵感来自阿里开源缓存框架jetcache,下面是改良后的公共实现,可以通用
package com.xxx.utils.sync;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import com.alibaba.fastjson.JSON;
/**
* 同步工具类<br/>
* 主要预防优化缓存雪崩、数据雪崩</br>
* 多线程访问的情况下、认为是同一操作的重复行为
*
*/
public class SyncUtil {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(SyncUtil.class);
private ConcurrentHashMap<Object, LoaderLock> loaderMap = new ConcurrentHashMap<>();
private Function<Object, Object> KeyConvertor;
public SyncUtil() {
this(null);
}
/**
* @param KeyConvertor 设置key转换规则<br/>
* 可以为空,默认使用fastJson
*
*/
public SyncUtil(Function<Object, Object> KeyConvertor) {
if(KeyConvertor == null) {
this.KeyConvertor = new Function<Object, Object>() {
@Override
public Object apply(Object originalKey) {
if (originalKey == null) {
return null;
}
if (originalKey instanceof String) {
return originalKey;
}
return JSON.toJSONString(originalKey);
}
};
}else {
this.KeyConvertor = KeyConvertor;
}
}
/**
* 同步加载函数<br/>
* 例如:A、B、C三个线程同一时刻查询执行某一个操作(查询数据库等),实际上查询的参数条件一样<br/>
* 这个时候我们可以对线程进行优化、防止数据雪崩缓存穿透,让其中的一个线程进行查库操作,其他线程等待具体工作线程的返回结果<br/>
* 如:A线程进行查库、B、C线程进行等待A线程的返回结果。
* @param <K>
* @param <V>
* @param timeout 等待时间(毫秒)(假设A线程查询超过等待时间、那么B、C线程放弃等待自己去执行业务查询)
* @param key 辨别是否是同一操作的key
* @param loader 加载数据的函数<br/>
* Function<String, String> loader = new Function<String, String>() {
@Override
public String apply(String t) {
return "dbData";
}
};
*
* @return
*/
@SuppressWarnings("unchecked")
public <K, V> V synchronizedLoad(Integer timeout, K key, Function<K, V> loader) {
Object lockKey = buildLoaderLockKey(key);
while (true) {
//有没有线程去做具体的获取业务数据的逻辑
boolean create[] = new boolean[1];
LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
create[0] = true;
LoaderLock loaderLock = new LoaderLock();
loaderLock.signal = new CountDownLatch(1);
loaderLock.loaderThread = Thread.currentThread();
return loaderLock;
});
if (create[0] || ll.loaderThread == Thread.currentThread()) {
try {
//第一个进入的线程进行真实的访问操作
V loadedValue = loader.apply(key);
ll.success = true;
ll.value = loadedValue;
return loadedValue;
} finally {
if (create[0]) {
ll.signal.countDown();
loaderMap.remove(lockKey);
}
}
} else {
//其他线程进行等待操作
try {
if (timeout == null) {
ll.signal.await();
} else {
boolean ok = ll.signal.await(timeout, TimeUnit.MILLISECONDS);
if (!ok) {
//如果等待超时,放弃等待,当前线程直接进行访问
logger.info("loader wait timeout:" + timeout);
return loader.apply(key);
}
}
} catch (InterruptedException e) {
logger.warn("loader wait interrupted");
return loader.apply(key);
}
//共用同一个返回结果
if (ll.success) {
return (V) ll.value;
} else {
continue;
}
}
}
}
private Object buildLoaderLockKey(Object key) {
return KeyConvertor.apply(key);
}
static class LoaderLock {
CountDownLatch signal;
Thread loaderThread;
volatile boolean success;
volatile Object value;
}
}
测试代码
package test;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.xxx.MessageServiceApplication;
import com.xxx.mapper.ActivityInfoMapper;
import com.xxx.model.ActivityInfo;
import com.xxx.utils.sync.SyncUtil;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { MessageServiceApplication.class })
public class QueryTest {
@Autowired
private ActivityInfoMapper activityInfoMapper;
SyncUtil syncUtil = new SyncUtil();
private AtomicInteger count = new AtomicInteger();
private CyclicBarrier barrier = new CyclicBarrier(100);
ExecutorService pool = Executors.newFixedThreadPool(100);
public void synSelect(Long id) throws Exception {
for (int i = 0; i < 100; i++) {
pool.execute(()->{
try {
barrier.await();//线程等待,触发多线程同步的效果
// realSelect1(id);//传统查询
realSelect2(id);//优化后
} catch (Exception e) {
e.printStackTrace();
}
});
}
pool.shutdown();
while(!pool.isTerminated()){
}
System.out.println();
}
public ActivityInfo realSelect2(Long id) {
//线程通信的访问
Function<Long,ActivityInfo> loader = (param)->{
//记录真实的访问数据库次数(1)
count.incrementAndGet();
return activityInfoMapper.selectById(param);
};
return syncUtil.synchronizedLoad(3000, id, loader);
}
public ActivityInfo realSelect1(Long id) {
//记录真实的访问数据库次数(100)
count.incrementAndGet();
return activityInfoMapper.selectById(id);
}
@Test
public void testSelect() throws Exception {
synSelect(1L);
System.out.println("真实调用次数="+count.get());
}
}
传统的查询结果
优化后的查询结果,只执行了1次SQL
版权声明
本文为[lis1314]所创,转载请带上原文链接,感谢
https://my.oschina.net/lis1314/blog/4707646
边栏推荐
- How to use Gantt chart layers and filters
- Chinese sub forum of | 2020 PostgreSQL Asia Conference: Pan Juan
- 【笔记】Error while loading PyV8 binary: exit code 1解决方法
- 条形码识别性能低,如何优化Dynamsoft Barcode Reader解码性能
- 【原創】ARM平臺記憶體和cache對xenomai實時性的影響
- Image processing toolkit imagexpresshow to view events
- In simple terms, the large front-end framework angular6 practical course (angular6 node.js 、keystonejs、
- 频收罚单的浦发银行:增收不增利,曾遭骗贷数亿元,内控缺位?
- C enumerates the differences between permissions |, and |
- [note] error while loading pyv8 binary: exit code 1 solution
猜你喜欢
Experiment one
如何使用甘特图图层和筛选器
使用RabbitMQ实现分布式事务
confd
Plug in bilibilibili new version 0.5.5
[note] error while loading pyv8 binary: exit code 1 solution
K-vim installation and the ycmd server shut down (restart with ': ycmrestartserver')
RECH8.0版本学习 days 12 rh134部分
HandlerMethodArgumentResolver使用和原理
条形码识别器Dynamsoft Barcode Reader v7.5全新上线!
随机推荐
REM: the solution of PC and mobile
傲視Kubernetes(一):Kubernetes簡介
测试攻城狮必备技能点!一文带你解读DevOps下的测试技术
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
插件Bilibili新版0.5.5
DOM node operation
ImageMagick - 添加水印
廬山真面目之二微服務架構NGINX版本實現
Mate 40 series launch with Huawei sports health service to bring healthy digital life
课堂练习
11. Service update
Git submission specification
Gantt chart grouping activities tutorial
如何利用PopupWindow实现弹出菜单并解决焦点获取以及与软键盘冲突问题
滴滴的分布式ID生成器(Tinyid),好用的一批
【涂鸦物联网足迹】物联网主流通信方式
gitlab 持续集成开发环境之CI/CD
Dynamsoft barcode reader v7.5!
How to create an interactive kernel density chart
Application and principle of handlermethodargumentresolver