当前位置:网站首页>Hikari连接池源码解读

Hikari连接池源码解读

2022-08-02 08:16:00 nginx

  
 
 
  1. public class ConcurrentBagimplements AutoCloseable {private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);
  2. // 所有连接:通过CopyOnWriteArrayList + State + cas 来避免了上锁
  3. private final CopyOnWriteArrayListsharedList;
  4. // threadList是否使用弱引用
  5. private final boolean weakThreadLocals;
  6. // 归还的时候缓存空闲连接到 ThreadLocal:requite()、borrow()
  7. private final ThreadLocal<1list> threadList;
  8. private final IBagStateListener listener;
  9. // 等待获取连接的线程数:调 borrow() 方法+1,调完-1
  10. private final AtomicInteger waiters;
  11. // 连接池关闭标识
  12. private volatile boolean closed;
  13. // 队列大小为0的阻塞队列:生产者消费者模式
  14. private final SynchronousQueuehandoffQueue;
  15. public interface IConcurrentBagEntry {
  16. int STATE_NOT_IN_USE = 0; // 空闲
  17. int STATE_IN_USE = 1; // 活跃
  18. int STATE_REMOVED = -1; // 移除
  19. int STATE_RESERVED = -2; // 不可用
  20. boolean compareAndSet(int expectState, int newState);
  21. void setState(int newState);
  22. int getState();
  23. }
  24. public interface IBagStateListener {
  25. void addBagItem(int waiting);
  26. }
  27. public ConcurrentBag(final IBagStateListener listener) {
  28. this.listener = listener;
  29. this.weakThreadLocals = useWeakThreadLocals();
  30. this.handoffQueue = new SynchronousQueue<>(true);
  31. this.waiters = new AtomicInteger();
  32. this.sharedList = new CopyOnWriteArrayList<>();
  33. if (weakThreadLocals) {
  34. this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
  35. } else {
  36. this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
  37. }
  38. }
  39. public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
  40. // Try the thread-local list first
  41. // 先从 threadLocal 缓存中获取
  42. final List list = threadList.get();
  43. for (int i = list.size() - 1; i >= 0; i--) {
  44. // 从尾部读取:后缓存的优先用,细节!
  45. final Object entry = list.remove(i);
  46. @SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference) entry).get() : (T) entry;
  47. if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
  48. return bagEntry;
  49. }
  50. }
  51. // Otherwise, scan the shared list ... then poll the handoff queue
  52. // 如果本地缓存获取不到,从 shardList 连接池中获取,等待连接数+1
  53. final int waiting = waiters.incrementAndGet();
  54. try {
  55. for (T bagEntry : sharedList) {
  56. if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
  57. // If we may have stolen another waiter's connection, request another bag add.
  58. // 并发情况下,保证能够及时补充连接
  59. if (waiting > 1) {
  60. listener.addBagItem(waiting - 1);
  61. }
  62. return bagEntry;
  63. }
  64. }
  65. // 如果 shardList 连接池中也没获得连接,提交添加连接的异步任务,然后再从 handoffQueue 阻塞获取。
  66. listener.addBagItem(waiting);
  67. timeout = timeUnit.toNanos(timeout);
  68. do {
  69. final long start = currentTime();
  70. final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
  71. if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
  72. return bagEntry;
  73. }
  74. timeout -= elapsedNanos(start);
  75. } while (timeout > 10_000);
  76. return null;
  77. } finally {
  78. // 等待连接数减 1
  79. waiters.decrementAndGet();
  80. }
  81. }
  82. public void requite(final T bagEntry) {
  83. bagEntry.setState(STATE_NOT_IN_USE);
  84. // 如果有线程正在获取链接,则优先通过 handoffQueue 阻塞队列归还给其他线程使用
  85. for (int i = 0; waiters.get() > 0; i++) {
  86. if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
  87. return;
  88. } else if ((i & 0xff) == 0xff) {
  89. // 每遍历 255 个休眠 10 微妙
  90. parkNanos(MICROSECONDS.toNanos(10));
  91. } else {
  92. // 线程让步
  93. yield();
  94. }
  95. }
  96. // 没有其它线程用,就放入本地缓存
  97. final List threadLocalList = threadList.get();
  98. threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
  99. }
  100. public void add(final T bagEntry) {
  101. if (closed) {
  102. LOGGER.info("ConcurrentBag has been closed, ignoring add()");
  103. throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
  104. }
  105. sharedList.add(bagEntry);
  106. // spin until a thread takes it or none are waiting
  107. // 如果有线程等待获取连接,循环通过 handoffQueue 提交连接
  108. while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
  109. yield();
  110. }
  111. }
  112. public boolean remove(final T bagEntry) {
  113. // 使用 CAS 将连接置为 STATE_REMOVED 状态
  114. if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
  115. LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
  116. return false;
  117. }
  118. // CAS 成功后再删除连接
  119. final boolean removed = sharedList.remove(bagEntry);
  120. if (!removed && !closed) {
  121. LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
  122. }
  123. return removed;
  124. }
  125. @Override
  126. public void close() {
  127. closed = true;
  128. }
  129. public boolean reserve(final T bagEntry) {
  130. return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
  131. }
  132. }

原网站

版权声明
本文为[nginx]所创,转载请带上原文链接,感谢
https://www.openjq.com/thread-35293-1-1.html