2022-08-02 08:16:00 【nginx】
public class ConcurrentBagimplements AutoCloseable {private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);
// 所有连接:通过CopyOnWriteArrayList + State + cas 来避免了上锁
private final CopyOnWriteArrayListsharedList;
// threadList是否使用弱引用
private final boolean weakThreadLocals;
// 归还的时候缓存空闲连接到 ThreadLocal:requite()、borrow()
private final ThreadLocal<1list> threadList;
private final IBagStateListener listener;
// 等待获取连接的线程数:调 borrow() 方法+1,调完-1
private final AtomicInteger waiters;
// 连接池关闭标识
private volatile boolean closed;
// 队列大小为0的阻塞队列:生产者消费者模式
private final SynchronousQueuehandoffQueue;
public interface IConcurrentBagEntry {
int STATE_NOT_IN_USE = 0; // 空闲
int STATE_IN_USE = 1; // 活跃
int STATE_REMOVED = -1; // 移除
int STATE_RESERVED = -2; // 不可用
boolean compareAndSet(int expectState, int newState);
void setState(int newState);
int getState();
public interface IBagStateListener {
void addBagItem(int waiting);
public ConcurrentBag(final IBagStateListener listener) {
this.listener = listener;
this.weakThreadLocals = useWeakThreadLocals();
this.handoffQueue = new SynchronousQueue<>(true);
this.waiters = new AtomicInteger();
this.sharedList = new CopyOnWriteArrayList<>();
if (weakThreadLocals) {
this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
} else {
this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
// Try the thread-local list first
// 先从 threadLocal 缓存中获取
final List list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
// 从尾部读取:后缓存的优先用,细节!
final Object entry = list.remove(i);
@SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
// Otherwise, scan the shared list ... then poll the handoff queue
// 如果本地缓存获取不到,从 shardList 连接池中获取,等待连接数+1
final int waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
// 并发情况下,保证能够及时补充连接
if (waiting > 1) {
listener.addBagItem(waiting - 1);
return bagEntry;
// 如果 shardList 连接池中也没获得连接,提交添加连接的异步任务,然后再从 handoffQueue 阻塞获取。
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
} finally {
// 等待连接数减 1
public void requite(final T bagEntry) {
// 如果有线程正在获取链接,则优先通过 handoffQueue 阻塞队列归还给其他线程使用
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
} else if ((i & 0xff) == 0xff) {
// 每遍历 255 个休眠 10 微妙
} else {
// 线程让步
// 没有其它线程用,就放入本地缓存
final List threadLocalList = threadList.get();
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
public void add(final T bagEntry) {
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
// spin until a thread takes it or none are waiting
// 如果有线程等待获取连接,循环通过 handoffQueue 提交连接
while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
public boolean remove(final T bagEntry) {
// 使用 CAS 将连接置为 STATE_REMOVED 状态
if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
return false;
// CAS 成功后再删除连接
final boolean removed = sharedList.remove(bagEntry);
if (!removed && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
return removed;
public void close() {
closed = true;
public boolean reserve(final T bagEntry) {
return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
- PyCharm usage tutorial (more detailed, picture + text)
- Axial Turbine Privacy Policy
- 普林斯顿微积分读本03第二章--编程实现函数图像绘制、三角学回顾
- In a recent build figure SLAM, and locate the progress
- Biotin hydrazide HCl|CAS:66640-86-6|生物素-酰肼盐酸盐
- 文章解读 -- FlowNet3D:Learning Scene Flow in 3D Point Clouds
- (Note) AXIS ACASIS DT-3608 Dual-bay Hard Disk Array Box RAID Setting
- Technology Cloud Report: To realize the metaverse, NVIDIA starts from building an infrastructure platform
- USACO美国信息学奥赛竞赛12月份开赛,中国学生备赛指南
- 2022-7-31 12点 程序爱生活 恒指底背离中,有1-2周反弹希望
MySQL Workbench 安装及使用
Biotin-C6-amine|N-biotinyl-1,6-hexanediamine|CAS: 65953-56-2
Write a small game in C (three chess)
MySQL 中 count() 和 count(1) 有什么区别?哪个性能最好?
BGP solves routing black hole through MPLS
pnpm: Introduction
PostgreSQL learning summary (11) - PostgreSQL commonly used high-availability cluster solutions
那些年我们踩过的 Flink 坑系列
R language plotly visualization: plotly visualizes the scatter plot of the actual value of the regression model and the predicted value of the regression, analyzes the prediction performance of the re
QT web development - Notes - 3
Biotin - LC - Hydrazide | CAS: 109276-34-8 | Biotin - LC - Hydrazide
unity pdg 设置隐藏不需要的节点以及实现自动勾选自动加载项
力扣:第 304 场周赛
Biotin-EDA|CAS:111790-37-5| 乙二胺生物素
A little bit of knowledge - why do not usually cook with copper pots
next permutation
What is the function of the import command of the page directive in JSP?
小说里的编程 【连载之二十五】元宇宙里月亮弯弯
LeetCode第三题(Longest Substring Without Repeating Characters)三部曲之一:解题思路