当前位置:网站首页>Using thread communication to solve the problem of cache penetrating database avalanche
Using thread communication to solve the problem of cache penetrating database avalanche
2020-11-07 18:55:00 【lis1314】
Business scenario :
There is one aspect C The query interface of the end , A lot of visits , Suppose we use caching technology for traditional optimization , The first time you query data , Check cache -> The cache does not -> Database search -> Write cache
But there may be a problem 、 There are many users at the same time ( hypothesis 1W) Query the same data ( Suppose the commodity ID Agreement )、 At this point, the data is not being cached 、 May cause database avalanche , The reason is that at this moment, the same data may be used for the database 1W Time of SQL Inquire about ( The write ahead caching scheme is not discussed here , At the same time, it's said here 1W This is an extreme situation ).
How do we optimize , This is the focus of this article :
Thought effect :
No matter how many users access the query 、 Let's say they query the product data for ID Agreement 、 So it only produces 1 Queries 、 Other user threads share the same query result , So it's from n(10000) Turned into 1.
This scheme is suitable for any scenario where multithreading works to remove duplication and improve performance , This is just one of the scenes .
The implementation is discussed below :
A、B、C 3 Three threads simultaneously query products ID by 1 The data of , Traditional cache optimization when there is no data in the cache , By the church 3 individual SQL Query access database , example sql:select * from products where id = 1;
So how do we use row level locks ( user ID dimension ) In order to achieve A、B、C,3 Each thread only queries the database once to get the corresponding result ?
Ideas : First, we need a communication dimension ( user ID Consensus is the same behavior ), If the user accessing the parameter ID equally , We can achieve the following effect :
The first thread that enters performs the query operation ( The assumption is A), that B、C Thread we put him in a waiting state , Wait for thread A Query results of , Finally, we can query the database only once .
Code inspiration from Alibaba open source cache framework jetcache, Here is the improved public implementation , Can be used in general
package com.xxx.utils.sync;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import com.alibaba.fastjson.JSON;
/**
* Synchronization tool class <br/>
* Cache optimization mainly prevents avalanches 、 Data avalanche </br>
* In the case of multithreading access 、 Think of it as a repetition of the same operation
*
*/
public class SyncUtil {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(SyncUtil.class);
private ConcurrentHashMap<Object, LoaderLock> loaderMap = new ConcurrentHashMap<>();
private Function<Object, Object> KeyConvertor;
public SyncUtil() {
this(null);
}
/**
* @param KeyConvertor Set up key Conversion rules <br/>
* Can be null , By default fastJson
*
*/
public SyncUtil(Function<Object, Object> KeyConvertor) {
if(KeyConvertor == null) {
this.KeyConvertor = new Function<Object, Object>() {
@Override
public Object apply(Object originalKey) {
if (originalKey == null) {
return null;
}
if (originalKey instanceof String) {
return originalKey;
}
return JSON.toJSONString(originalKey);
}
};
}else {
this.KeyConvertor = KeyConvertor;
}
}
/**
* Synchronous loading function <br/>
* for example :A、B、C Three threads query at the same time to perform an operation ( Query database, etc ), In fact, the query parameters are the same <br/>
* At this time, we can optimize the thread 、 Prevent data avalanche, cache penetration , Let one of the threads check the database , Other threads wait for the return result of the specific worker thread <br/>
* Such as :A The thread searches the database 、B、C The thread waits A The return result of the thread .
* @param <K>
* @param <V>
* @param timeout Waiting time ( millisecond )( hypothesis A Thread query exceeded waiting time 、 that B、C Threads give up waiting for themselves to execute business queries )
* @param key Identify whether it is the same operation key
* @param loader Function to load data <br/>
* Function<String, String> loader = new Function<String, String>() {
@Override
public String apply(String t) {
return "dbData";
}
};
*
* @return
*/
@SuppressWarnings("unchecked")
public <K, V> V synchronizedLoad(Integer timeout, K key, Function<K, V> loader) {
Object lockKey = buildLoaderLockKey(key);
while (true) {
// Whether there are threads to do specific logic to obtain business data
boolean create[] = new boolean[1];
LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
create[0] = true;
LoaderLock loaderLock = new LoaderLock();
loaderLock.signal = new CountDownLatch(1);
loaderLock.loaderThread = Thread.currentThread();
return loaderLock;
});
if (create[0] || ll.loaderThread == Thread.currentThread()) {
try {
// The first incoming thread performs the actual access operation
V loadedValue = loader.apply(key);
ll.success = true;
ll.value = loadedValue;
return loadedValue;
} finally {
if (create[0]) {
ll.signal.countDown();
loaderMap.remove(lockKey);
}
}
} else {
// Other threads are waiting
try {
if (timeout == null) {
ll.signal.await();
} else {
boolean ok = ll.signal.await(timeout, TimeUnit.MILLISECONDS);
if (!ok) {
// If the wait times out , Give up waiting , The current thread accesses directly
logger.info("loader wait timeout:" + timeout);
return loader.apply(key);
}
}
} catch (InterruptedException e) {
logger.warn("loader wait interrupted");
return loader.apply(key);
}
// Share the same return result
if (ll.success) {
return (V) ll.value;
} else {
continue;
}
}
}
}
private Object buildLoaderLockKey(Object key) {
return KeyConvertor.apply(key);
}
static class LoaderLock {
CountDownLatch signal;
Thread loaderThread;
volatile boolean success;
volatile Object value;
}
}
Test code
package test;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.xxx.MessageServiceApplication;
import com.xxx.mapper.ActivityInfoMapper;
import com.xxx.model.ActivityInfo;
import com.xxx.utils.sync.SyncUtil;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { MessageServiceApplication.class })
public class QueryTest {
@Autowired
private ActivityInfoMapper activityInfoMapper;
SyncUtil syncUtil = new SyncUtil();
private AtomicInteger count = new AtomicInteger();
private CyclicBarrier barrier = new CyclicBarrier(100);
ExecutorService pool = Executors.newFixedThreadPool(100);
public void synSelect(Long id) throws Exception {
for (int i = 0; i < 100; i++) {
pool.execute(()->{
try {
barrier.await();// Thread waiting , The effect of triggering multithreading synchronization
// realSelect1(id);// Traditional query
realSelect2(id);// After optimization
} catch (Exception e) {
e.printStackTrace();
}
});
}
pool.shutdown();
while(!pool.isTerminated()){
}
System.out.println();
}
public ActivityInfo realSelect2(Long id) {
// Access to thread communication
Function<Long,ActivityInfo> loader = (param)->{
// Record the actual number of visits to the database (1)
count.incrementAndGet();
return activityInfoMapper.selectById(param);
};
return syncUtil.synchronizedLoad(3000, id, loader);
}
public ActivityInfo realSelect1(Long id) {
// Record the actual number of visits to the database (100)
count.incrementAndGet();
return activityInfoMapper.selectById(id);
}
@Test
public void testSelect() throws Exception {
synSelect(1L);
System.out.println(" The actual number of calls ="+count.get());
}
}
Traditional query results
Optimized query results , Only execution 1 Time SQL
版权声明
本文为[lis1314]所创,转载请带上原文链接,感谢
边栏推荐
- Let you have a deep understanding of gitlab CI / CD principle and process
- 2020-11-06:go中,谈一下调度器。
- Key points of C language -- index article (let you fully understand indicators) | understand indicators from memory | complete analysis of indicators
- 课堂练习
- Shanghai Pudong Development Bank, which frequently receives penalty tickets, has been cheated by hundreds of millions of yuan in loans, and lacks of internal control?
- How to create an interactive kernel density chart
- Three steps to understand Kerberos Protocol easily
- Idea activation to 2089 failure
- python3操作Jenkins模块api
- 谈了多年的数字化转型,为什么还有很多企业依然“口头管理”
猜你喜欢
7. Swarm builds clusters
ajax 载入html后不能执行其中的js解决方法
How does LeadTools detect, read and write barcodes
Exclusive interview with Yue Caibo
使用LWA和Lync模拟外部测试无边缘单前端环境
VARCHART XGantt如何在日历上表示工作日
The JS solution cannot be executed after Ajax loads HTML
Gantt chart grouping activities tutorial
南京标志设计,logo设计公司
LEADTOOLS如何检测,读取和写入条形码
随机推荐
Developing STM32 USB with cubemx
LEADTOOLS如何检测,读取和写入条形码
idea 激活到 2089 失效
快進來!花幾分鐘看一下 ReentrantReadWriteLock 的原理!
三步轻松理解Kerberos协议
8.Swarm创建维护和水平扩展Service
What should be considered in the promotion plan outside the station?
南京标志设计,logo设计公司
Gantt chart grouping activities tutorial
Win7 AppCrash (solution)
K-vim installation and the ycmd server shut down (restart with ': ycmrestartserver')
Benefits and functions of auto maintenance app development
聊聊先享後付
失眠一个整晚上
The official 1909 version of win10 cannot open the real-time protection solution of virus and threat protection in windows security center.
Plug in bilibilibili new version 0.5.5
JS array the usage of array is all here (array method reconstruction, array traversal, array de duplication, array judgment and conversion)
If you want to forget the WiFi network you used to connect to your Mac, try this!
Using rabbitmq to implement distributed transaction
Git SSH bad permissions