当前位置:网站首页>Hikari连接池源码解读
Hikari连接池源码解读
2022-08-02 08:16:00 【nginx】
public class ConcurrentBagimplements AutoCloseable {private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);
// 所有连接:通过CopyOnWriteArrayList + State + cas 来避免了上锁
private final CopyOnWriteArrayListsharedList;
// threadList是否使用弱引用
private final boolean weakThreadLocals;
// 归还的时候缓存空闲连接到 ThreadLocal:requite()、borrow()
private final ThreadLocal<1list> threadList;
private final IBagStateListener listener;
// 等待获取连接的线程数:调 borrow() 方法+1,调完-1
private final AtomicInteger waiters;
// 连接池关闭标识
private volatile boolean closed;
// 队列大小为0的阻塞队列:生产者消费者模式
private final SynchronousQueuehandoffQueue;
public interface IConcurrentBagEntry {
int STATE_NOT_IN_USE = 0; // 空闲
int STATE_IN_USE = 1; // 活跃
int STATE_REMOVED = -1; // 移除
int STATE_RESERVED = -2; // 不可用
boolean compareAndSet(int expectState, int newState);
void setState(int newState);
int getState();
}
public interface IBagStateListener {
void addBagItem(int waiting);
}
public ConcurrentBag(final IBagStateListener listener) {
this.listener = listener;
this.weakThreadLocals = useWeakThreadLocals();
this.handoffQueue = new SynchronousQueue<>(true);
this.waiters = new AtomicInteger();
this.sharedList = new CopyOnWriteArrayList<>();
if (weakThreadLocals) {
this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
} else {
this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
}
}
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
// Try the thread-local list first
// 先从 threadLocal 缓存中获取
final List list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
// 从尾部读取:后缓存的优先用,细节!
final Object entry = list.remove(i);
@SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
// Otherwise, scan the shared list ... then poll the handoff queue
// 如果本地缓存获取不到,从 shardList 连接池中获取,等待连接数+1
final int waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
// 并发情况下,保证能够及时补充连接
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
// 如果 shardList 连接池中也没获得连接,提交添加连接的异步任务,然后再从 handoffQueue 阻塞获取。
listener.addBagItem(waiting);
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
} finally {
// 等待连接数减 1
waiters.decrementAndGet();
}
}
public void requite(final T bagEntry) {
bagEntry.setState(STATE_NOT_IN_USE);
// 如果有线程正在获取链接,则优先通过 handoffQueue 阻塞队列归还给其他线程使用
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
} else if ((i & 0xff) == 0xff) {
// 每遍历 255 个休眠 10 微妙
parkNanos(MICROSECONDS.toNanos(10));
} else {
// 线程让步
yield();
}
}
// 没有其它线程用,就放入本地缓存
final List threadLocalList = threadList.get();
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
public void add(final T bagEntry) {
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
sharedList.add(bagEntry);
// spin until a thread takes it or none are waiting
// 如果有线程等待获取连接,循环通过 handoffQueue 提交连接
while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
yield();
}
}
public boolean remove(final T bagEntry) {
// 使用 CAS 将连接置为 STATE_REMOVED 状态
if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
return false;
}
// CAS 成功后再删除连接
final boolean removed = sharedList.remove(bagEntry);
if (!removed && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
}
return removed;
}
@Override
public void close() {
closed = true;
}
public boolean reserve(final T bagEntry) {
return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
}
}
边栏推荐
- PyCharm usage tutorial (more detailed, picture + text)
- Axial Turbine Privacy Policy
- 普林斯顿微积分读本03第二章--编程实现函数图像绘制、三角学回顾
- In a recent build figure SLAM, and locate the progress
- Biotin hydrazide HCl|CAS:66640-86-6|生物素-酰肼盐酸盐
- 文章解读 -- FlowNet3D:Learning Scene Flow in 3D Point Clouds
- (Note) AXIS ACASIS DT-3608 Dual-bay Hard Disk Array Box RAID Setting
- Technology Cloud Report: To realize the metaverse, NVIDIA starts from building an infrastructure platform
- USACO美国信息学奥赛竞赛12月份开赛,中国学生备赛指南
- 2022-7-31 12点 程序爱生活 恒指底背离中,有1-2周反弹希望
猜你喜欢
MySQL Workbench 安装及使用
postman下载安装汉化及使用
Biotin-C6-amine|N-biotinyl-1,6-hexanediamine|CAS: 65953-56-2
Write a small game in C (three chess)
MySQL 中 count() 和 count(1) 有什么区别?哪个性能最好?
BGP solves routing black hole through MPLS
HCIP笔记十六天
pnpm: Introduction
PostgreSQL learning summary (11) - PostgreSQL commonly used high-availability cluster solutions
HCIP9_BGP增加实验
随机推荐
那些年我们踩过的 Flink 坑系列
R language plotly visualization: plotly visualizes the scatter plot of the actual value of the regression model and the predicted value of the regression, analyzes the prediction performance of the re
QT web development - Notes - 3
Biotin - LC - Hydrazide | CAS: 109276-34-8 | Biotin - LC - Hydrazide
unity pdg 设置隐藏不需要的节点以及实现自动勾选自动加载项
Redisson的看门狗机制
力扣:第 304 场周赛
redis-desktop-manager下载安装
积分商城商品供应商选择的三个要求
Biotin-EDA|CAS:111790-37-5| 乙二胺生物素
A little bit of knowledge - why do not usually cook with copper pots
oracle的sql改成mysql版本
next permutation
【电子电路】长按键拉低电平,适用在有休眠机制的MCU但是没有看门狗,一个按键多个功能场景下使用
第3周学习:ResNet+ResNeXt
What is the function of the import command of the page directive in JSP?
小说里的编程 【连载之二十五】元宇宙里月亮弯弯
[ansible]playbook结合项目解释执行步骤
ip地址那点事(二)
LeetCode第三题(Longest Substring Without Repeating Characters)三部曲之一:解题思路