当前位置:网站首页>Hikari连接池源码解读
Hikari连接池源码解读
2022-08-02 08:16:00 【nginx】
public class ConcurrentBagimplements AutoCloseable {private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);// 所有连接:通过CopyOnWriteArrayList + State + cas 来避免了上锁private final CopyOnWriteArrayListsharedList;// threadList是否使用弱引用private final boolean weakThreadLocals;// 归还的时候缓存空闲连接到 ThreadLocal:requite()、borrow()private final ThreadLocal<1list> threadList;private final IBagStateListener listener;// 等待获取连接的线程数:调 borrow() 方法+1,调完-1private final AtomicInteger waiters;// 连接池关闭标识private volatile boolean closed;// 队列大小为0的阻塞队列:生产者消费者模式private final SynchronousQueuehandoffQueue;public interface IConcurrentBagEntry {int STATE_NOT_IN_USE = 0; // 空闲int STATE_IN_USE = 1; // 活跃int STATE_REMOVED = -1; // 移除int STATE_RESERVED = -2; // 不可用boolean compareAndSet(int expectState, int newState);void setState(int newState);int getState();}public interface IBagStateListener {void addBagItem(int waiting);}public ConcurrentBag(final IBagStateListener listener) {this.listener = listener;this.weakThreadLocals = useWeakThreadLocals();this.handoffQueue = new SynchronousQueue<>(true);this.waiters = new AtomicInteger();this.sharedList = new CopyOnWriteArrayList<>();if (weakThreadLocals) {this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));} else {this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));}}public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {// Try the thread-local list first// 先从 threadLocal 缓存中获取final List list = threadList.get();for (int i = list.size() - 1; i >= 0; i--) {// 从尾部读取:后缓存的优先用,细节!final Object entry = list.remove(i);@SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference) entry).get() : (T) entry;if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}}// Otherwise, scan the shared list ... then poll the handoff queue// 如果本地缓存获取不到,从 shardList 连接池中获取,等待连接数+1final int waiting = waiters.incrementAndGet();try {for (T bagEntry : sharedList) {if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {// If we may have stolen another waiter's connection, request another bag add.// 并发情况下,保证能够及时补充连接if (waiting > 1) {listener.addBagItem(waiting - 1);}return bagEntry;}}// 如果 shardList 连接池中也没获得连接,提交添加连接的异步任务,然后再从 handoffQueue 阻塞获取。listener.addBagItem(waiting);timeout = timeUnit.toNanos(timeout);do {final long start = currentTime();final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}timeout -= elapsedNanos(start);} while (timeout > 10_000);return null;} finally {// 等待连接数减 1waiters.decrementAndGet();}}public void requite(final T bagEntry) {bagEntry.setState(STATE_NOT_IN_USE);// 如果有线程正在获取链接,则优先通过 handoffQueue 阻塞队列归还给其他线程使用for (int i = 0; waiters.get() > 0; i++) {if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {return;} else if ((i & 0xff) == 0xff) {// 每遍历 255 个休眠 10 微妙parkNanos(MICROSECONDS.toNanos(10));} else {// 线程让步yield();}}// 没有其它线程用,就放入本地缓存final List threadLocalList = threadList.get();threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);}public void add(final T bagEntry) {if (closed) {LOGGER.info("ConcurrentBag has been closed, ignoring add()");throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");}sharedList.add(bagEntry);// spin until a thread takes it or none are waiting// 如果有线程等待获取连接,循环通过 handoffQueue 提交连接while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {yield();}}public boolean remove(final T bagEntry) {// 使用 CAS 将连接置为 STATE_REMOVED 状态if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);return false;}// CAS 成功后再删除连接final boolean removed = sharedList.remove(bagEntry);if (!removed && !closed) {LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);}return removed;}@Overridepublic void close() {closed = true;}public boolean reserve(final T bagEntry) {return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);}}
边栏推荐
猜你喜欢

C Language Basics_Union

IO process thread -> process -> day4

如何做好项目管理

二分类和多分类

Pycharm (1) the basic use of tutorial

Application and case analysis of CASA model and CENTURY model

PostgreSQL learning summary (11) - PostgreSQL commonly used high-availability cluster solutions

2022-7-31 12点 程序爱生活 恒指底背离中,有1-2周反弹希望

抓包工具Charles修改Response步骤

PyQt5 (a) PyQt5 installation and configuration, read from the folder and display images, simulation to generate the sketch image
随机推荐
day——05 迭代器,生成器
redis的安装与应用
十大免费cms建站系统介绍推荐
JSP页面中page指令contentPage/pageEncoding具有什么功能呢?
Seleniu screenshots code and assign name to the picture
Database Plus 的云上之旅:SphereEx 正式开源 ShardingSphere on Cloud 解决方案
PyCharm usage tutorial (detailed version - graphic and text combination)
shell中计算命令详解(expr、(())、 $[]、let、bc )
houdini 求出曲线的法向 切线以及副法线
[ansible] playbook explains the execution steps in combination with the project
R语言plotly可视化:使用plotly可视化模型预测真阳性率(True positive)TPR和假阳性率(False positive)FPR在不同阈值(threshold)下的曲线
按键控制流水灯(计时器)
积分商城商品供应商选择的三个要求
Redisson的看门狗机制
pnpm:简介
mysql 中 in 的用法
【特别提醒】订阅此专栏的用户请先阅读本文再决定是否需要购买此专栏
Redisson distributed lock source code analysis for high-level use of redis
postman使用方法
Axial Turbine Privacy Policy