当前位置:网站首页>Using thread communication to solve the problem of cache penetrating database avalanche
Using thread communication to solve the problem of cache penetrating database avalanche
2020-11-07 18:55:00 【lis1314】
Business scenario :
There is one aspect C The query interface of the end , A lot of visits , Suppose we use caching technology for traditional optimization , The first time you query data , Check cache -> The cache does not -> Database search -> Write cache
But there may be a problem 、 There are many users at the same time ( hypothesis 1W) Query the same data ( Suppose the commodity ID Agreement )、 At this point, the data is not being cached 、 May cause database avalanche , The reason is that at this moment, the same data may be used for the database 1W Time of SQL Inquire about ( The write ahead caching scheme is not discussed here , At the same time, it's said here 1W This is an extreme situation ).
How do we optimize , This is the focus of this article :
Thought effect :
No matter how many users access the query 、 Let's say they query the product data for ID Agreement 、 So it only produces 1 Queries 、 Other user threads share the same query result , So it's from n(10000) Turned into 1.
This scheme is suitable for any scenario where multithreading works to remove duplication and improve performance , This is just one of the scenes .
The implementation is discussed below :
A、B、C 3 Three threads simultaneously query products ID by 1 The data of , Traditional cache optimization when there is no data in the cache , By the church 3 individual SQL Query access database , example sql:select * from products where id = 1;
So how do we use row level locks ( user ID dimension ) In order to achieve A、B、C,3 Each thread only queries the database once to get the corresponding result ?
Ideas : First, we need a communication dimension ( user ID Consensus is the same behavior ), If the user accessing the parameter ID equally , We can achieve the following effect :
The first thread that enters performs the query operation ( The assumption is A), that B、C Thread we put him in a waiting state , Wait for thread A Query results of , Finally, we can query the database only once .
Code inspiration from Alibaba open source cache framework jetcache, Here is the improved public implementation , Can be used in general
package com.xxx.utils.sync;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import com.alibaba.fastjson.JSON;
/**
* Synchronization tool class <br/>
* Cache optimization mainly prevents avalanches 、 Data avalanche </br>
* In the case of multithreading access 、 Think of it as a repetition of the same operation
*
*/
public class SyncUtil {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(SyncUtil.class);
private ConcurrentHashMap<Object, LoaderLock> loaderMap = new ConcurrentHashMap<>();
private Function<Object, Object> KeyConvertor;
public SyncUtil() {
this(null);
}
/**
* @param KeyConvertor Set up key Conversion rules <br/>
* Can be null , By default fastJson
*
*/
public SyncUtil(Function<Object, Object> KeyConvertor) {
if(KeyConvertor == null) {
this.KeyConvertor = new Function<Object, Object>() {
@Override
public Object apply(Object originalKey) {
if (originalKey == null) {
return null;
}
if (originalKey instanceof String) {
return originalKey;
}
return JSON.toJSONString(originalKey);
}
};
}else {
this.KeyConvertor = KeyConvertor;
}
}
/**
* Synchronous loading function <br/>
* for example :A、B、C Three threads query at the same time to perform an operation ( Query database, etc ), In fact, the query parameters are the same <br/>
* At this time, we can optimize the thread 、 Prevent data avalanche, cache penetration , Let one of the threads check the database , Other threads wait for the return result of the specific worker thread <br/>
* Such as :A The thread searches the database 、B、C The thread waits A The return result of the thread .
* @param <K>
* @param <V>
* @param timeout Waiting time ( millisecond )( hypothesis A Thread query exceeded waiting time 、 that B、C Threads give up waiting for themselves to execute business queries )
* @param key Identify whether it is the same operation key
* @param loader Function to load data <br/>
* Function<String, String> loader = new Function<String, String>() {
@Override
public String apply(String t) {
return "dbData";
}
};
*
* @return
*/
@SuppressWarnings("unchecked")
public <K, V> V synchronizedLoad(Integer timeout, K key, Function<K, V> loader) {
Object lockKey = buildLoaderLockKey(key);
while (true) {
// Whether there are threads to do specific logic to obtain business data
boolean create[] = new boolean[1];
LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
create[0] = true;
LoaderLock loaderLock = new LoaderLock();
loaderLock.signal = new CountDownLatch(1);
loaderLock.loaderThread = Thread.currentThread();
return loaderLock;
});
if (create[0] || ll.loaderThread == Thread.currentThread()) {
try {
// The first incoming thread performs the actual access operation
V loadedValue = loader.apply(key);
ll.success = true;
ll.value = loadedValue;
return loadedValue;
} finally {
if (create[0]) {
ll.signal.countDown();
loaderMap.remove(lockKey);
}
}
} else {
// Other threads are waiting
try {
if (timeout == null) {
ll.signal.await();
} else {
boolean ok = ll.signal.await(timeout, TimeUnit.MILLISECONDS);
if (!ok) {
// If the wait times out , Give up waiting , The current thread accesses directly
logger.info("loader wait timeout:" + timeout);
return loader.apply(key);
}
}
} catch (InterruptedException e) {
logger.warn("loader wait interrupted");
return loader.apply(key);
}
// Share the same return result
if (ll.success) {
return (V) ll.value;
} else {
continue;
}
}
}
}
private Object buildLoaderLockKey(Object key) {
return KeyConvertor.apply(key);
}
static class LoaderLock {
CountDownLatch signal;
Thread loaderThread;
volatile boolean success;
volatile Object value;
}
}
Test code
package test;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.xxx.MessageServiceApplication;
import com.xxx.mapper.ActivityInfoMapper;
import com.xxx.model.ActivityInfo;
import com.xxx.utils.sync.SyncUtil;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { MessageServiceApplication.class })
public class QueryTest {
@Autowired
private ActivityInfoMapper activityInfoMapper;
SyncUtil syncUtil = new SyncUtil();
private AtomicInteger count = new AtomicInteger();
private CyclicBarrier barrier = new CyclicBarrier(100);
ExecutorService pool = Executors.newFixedThreadPool(100);
public void synSelect(Long id) throws Exception {
for (int i = 0; i < 100; i++) {
pool.execute(()->{
try {
barrier.await();// Thread waiting , The effect of triggering multithreading synchronization
// realSelect1(id);// Traditional query
realSelect2(id);// After optimization
} catch (Exception e) {
e.printStackTrace();
}
});
}
pool.shutdown();
while(!pool.isTerminated()){
}
System.out.println();
}
public ActivityInfo realSelect2(Long id) {
// Access to thread communication
Function<Long,ActivityInfo> loader = (param)->{
// Record the actual number of visits to the database (1)
count.incrementAndGet();
return activityInfoMapper.selectById(param);
};
return syncUtil.synchronizedLoad(3000, id, loader);
}
public ActivityInfo realSelect1(Long id) {
// Record the actual number of visits to the database (100)
count.incrementAndGet();
return activityInfoMapper.selectById(id);
}
@Test
public void testSelect() throws Exception {
synSelect(1L);
System.out.println(" The actual number of calls ="+count.get());
}
}
Traditional query results
Optimized query results , Only execution 1 Time SQL
版权声明
本文为[lis1314]所创,转载请带上原文链接,感谢
边栏推荐
- Experiment one
- 7.Swarm搭建集群
- 想要忘记以前连接到Mac的WiFi网络,试试这个方法!
- Web API interface design: swaggerui document / unified response format / unified exception handling / unified authority verification
- How to add modules to nginx image?
- 高级并发编程系列九(Lock接口分析)
- 快進來!花幾分鐘看一下 ReentrantReadWriteLock 的原理!
- The advantages and functions of psychological counseling app
- LEADTOOLS如何检测,读取和写入条形码
- PHP backdoor hiding skills
猜你喜欢
Exception calling 'downloadstring' with '1' arguments: 'operation timed out'
What should be considered in the promotion plan outside the station?
关于DevOps的七大误解,99%的人都曾中过招!
聊聊先享後付
python3操作gitlab
嘉宾专访|2020 PostgreSQL亚洲大会阿里云数据库专场:王旭
Yum [errno 256] no more mirrors to try solution
想要忘记以前连接到Mac的WiFi网络,试试这个方法!
11.Service更新
测试攻城狮必备技能点!一文带你解读DevOps下的测试技术
随机推荐
【原創】ARM平臺記憶體和cache對xenomai實時性的影響
PHP backdoor hiding skills
11.Service更新
嘉宾专访|2020 PostgreSQL亚洲大会阿里云数据库专场:王旭
甘特图对活动进行分组教程
Vue: Axios uses this pointer
The official 1909 version of win10 cannot open the real-time protection solution of virus and threat protection in windows security center.
2020-11-06: go, let's talk about the scheduler.
Application layer software development Godfather teaches you how to refactor, senior programmers must professional skills
Business Facade 与 Business Rule
vue踩坑:axios使用this指针
PHP安全:变量的前世今生
想要忘记以前连接到Mac的WiFi网络,试试这个方法!
Shanghai Pudong Development Bank, which frequently receives penalty tickets, has been cheated by hundreds of millions of yuan in loans, and lacks of internal control?
使用“1”个参数调用“DownloadString”时发生异常:“操作超时”
Exception calling 'downloadstring' with '1' arguments: 'operation timed out'
REM: the solution of PC and mobile
抽絲剝繭——門面和調停者設計模式
git 提交规范
JS array the usage of array is all here (array method reconstruction, array traversal, array de duplication, array judgment and conversion)