当前位置:网站首页>Hikari连接池源码解读
Hikari连接池源码解读
2022-08-02 08:16:00 【nginx】
public class ConcurrentBagimplements AutoCloseable {private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);// 所有连接:通过CopyOnWriteArrayList + State + cas 来避免了上锁private final CopyOnWriteArrayListsharedList;// threadList是否使用弱引用private final boolean weakThreadLocals;// 归还的时候缓存空闲连接到 ThreadLocal:requite()、borrow()private final ThreadLocal<1list> threadList;private final IBagStateListener listener;// 等待获取连接的线程数:调 borrow() 方法+1,调完-1private final AtomicInteger waiters;// 连接池关闭标识private volatile boolean closed;// 队列大小为0的阻塞队列:生产者消费者模式private final SynchronousQueuehandoffQueue;public interface IConcurrentBagEntry {int STATE_NOT_IN_USE = 0; // 空闲int STATE_IN_USE = 1; // 活跃int STATE_REMOVED = -1; // 移除int STATE_RESERVED = -2; // 不可用boolean compareAndSet(int expectState, int newState);void setState(int newState);int getState();}public interface IBagStateListener {void addBagItem(int waiting);}public ConcurrentBag(final IBagStateListener listener) {this.listener = listener;this.weakThreadLocals = useWeakThreadLocals();this.handoffQueue = new SynchronousQueue<>(true);this.waiters = new AtomicInteger();this.sharedList = new CopyOnWriteArrayList<>();if (weakThreadLocals) {this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));} else {this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));}}public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {// Try the thread-local list first// 先从 threadLocal 缓存中获取final List list = threadList.get();for (int i = list.size() - 1; i >= 0; i--) {// 从尾部读取:后缓存的优先用,细节!final Object entry = list.remove(i);@SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference) entry).get() : (T) entry;if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}}// Otherwise, scan the shared list ... then poll the handoff queue// 如果本地缓存获取不到,从 shardList 连接池中获取,等待连接数+1final int waiting = waiters.incrementAndGet();try {for (T bagEntry : sharedList) {if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {// If we may have stolen another waiter's connection, request another bag add.// 并发情况下,保证能够及时补充连接if (waiting > 1) {listener.addBagItem(waiting - 1);}return bagEntry;}}// 如果 shardList 连接池中也没获得连接,提交添加连接的异步任务,然后再从 handoffQueue 阻塞获取。listener.addBagItem(waiting);timeout = timeUnit.toNanos(timeout);do {final long start = currentTime();final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}timeout -= elapsedNanos(start);} while (timeout > 10_000);return null;} finally {// 等待连接数减 1waiters.decrementAndGet();}}public void requite(final T bagEntry) {bagEntry.setState(STATE_NOT_IN_USE);// 如果有线程正在获取链接,则优先通过 handoffQueue 阻塞队列归还给其他线程使用for (int i = 0; waiters.get() > 0; i++) {if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {return;} else if ((i & 0xff) == 0xff) {// 每遍历 255 个休眠 10 微妙parkNanos(MICROSECONDS.toNanos(10));} else {// 线程让步yield();}}// 没有其它线程用,就放入本地缓存final List threadLocalList = threadList.get();threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);}public void add(final T bagEntry) {if (closed) {LOGGER.info("ConcurrentBag has been closed, ignoring add()");throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");}sharedList.add(bagEntry);// spin until a thread takes it or none are waiting// 如果有线程等待获取连接,循环通过 handoffQueue 提交连接while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {yield();}}public boolean remove(final T bagEntry) {// 使用 CAS 将连接置为 STATE_REMOVED 状态if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);return false;}// CAS 成功后再删除连接final boolean removed = sharedList.remove(bagEntry);if (!removed && !closed) {LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);}return removed;}@Overridepublic void close() {closed = true;}public boolean reserve(final T bagEntry) {return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);}}
边栏推荐
- 商业智能平台BI 商业智能分析平台 如何选择合适的商业智能平台BI
- Biotin-C6-amine|N-biotinyl-1,6-hexanediamine|CAS: 65953-56-2
- MySQL 中 count() 和 count(1) 有什么区别?哪个性能最好?
- [ansible]playbook结合项目解释执行步骤
- 王学岗-编译出运行的文件
- 按键控制流水灯(计时器)
- [OC学习笔记]ARC与引用计数
- C Language Basics_Union
- Codeforces Round #811 (Div. 3)无DF
- PyCharm usage tutorial (detailed version - graphic and text combination)
猜你喜欢
![52. [Bool type input any non-0 value is not 1 version reason]](/img/73/c4e0048c504e0df073a6d07cfec3ab.png)
52. [Bool type input any non-0 value is not 1 version reason]

Database Plus 的云上之旅:SphereEx 正式开源 ShardingSphere on Cloud 解决方案

在 QT Creator 上配置 opencv 环境的一些认识和注意点
![[ansible] playbook explains the execution steps in combination with the project](/img/fe/82b8562075fef33490d5aae7e809f5.png)
[ansible] playbook explains the execution steps in combination with the project

BGP solves routing black hole through MPLS

Pycharm (1) the basic use of tutorial

PyQt5 (a) PyQt5 installation and configuration, read from the folder and display images, simulation to generate the sketch image

PyCharm usage tutorial (more detailed, picture + text)

MySQL Workbench 安装及使用

PyQt5(一) PyQt5安装及配置,从文件夹读取图片并显示,模拟生成素描图像
随机推荐
[OC学习笔记]ARC与引用计数
第3周学习:ResNet+ResNeXt
解决IDEA安装安装插件慢问题
类和对象【下】
What attributes and methods are available for page directives in JSP pages?
Shell becomes canonical and variable
day_05模块
积分商城商品供应商选择的三个要求
[OC学习笔记]weak的实现原理
【C】关于柔性数组.简要的谈谈柔性数组
小说里的编程 【连载之二十四】元宇宙里月亮弯弯
MySQL读写分离与主从延迟
【电子电路】长按键拉低电平,适用在有休眠机制的MCU但是没有看门狗,一个按键多个功能场景下使用
工程师如何对待开源 --- 一个老工程师的肺腑之言
Wang Xuegang - compiled shipment line file
抓包工具Charles修改Response步骤
PyCharm使用教程(较详细,图+文)
C语言_条件编译
下一个排列
【开源项目】X-TRACK源码分析