当前位置:网站首页>【线上问题】Timeout waiting for connection from pool 问题排查
【线上问题】Timeout waiting for connection from pool 问题排查
2022-06-11 17:15:00 【new hilbert()】
1.问题发现
背景:进行app端外推送的时候,某些场景会用到手机厂商进行推送,在用Vivo的sdk进行推送的时候,qps 过高的时候会出现以下这个问题。
org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute$original$2s8CgZpA(InternalHttpClient.java:184)
at org.apache.http.impl.client.InternalHttpClient.doExecute$original$2s8CgZpA$accessor$3Fb5SRfG(InternalHttpClient.java)
at org.apache.http.impl.client.InternalHttpClient$auxiliary$MIrDHOOm.call(Unknown Source)
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:88)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
at com.vivo.push.sdk.common.HttpUtil.execute(HttpUtil.java:118)
at com.vivo.push.sdk.common.HttpUtil.doPost(HttpUtil.java:111)
at com.vivo.push.sdk.server.Sender.sendSingle(Sender.java:43)
2.问题排查
org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
这是一个获取HttpClient连接池的连接超时异常,说明此时连接池不够用。
查看vivo SDK 对连接线程池设置的参数:
ViVo SDK设置连接池大小为10,HttpRoute 路由连接支持的最大连接数为5,这样在同一个HttpRoute 路由并发大于5 的情况下,超时获取不到连接,会发生 Timeout waiting for connection from pool
manager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
manager.setMaxTotal(Constants.HTTP_MAX_CONNECTION);
manager.setDefaultMaxPerRoute(Constants.HTTP_MAX_ROUTE);
HTTP_MAX_CONNECTION = 10;
HTTP_MAX_ROUTE = 5;
3.原理解析
(1)设置连接池参数
关键参数:
- validateAfterInactivity: 空闲连接存活时间
- maxTotal: 连接池大小
- defaultMaxPerRoute: 每个ip+port或者域名最多支持连接数

public class SupplierHttpClientUtil {
private static final CloseableHttpClient HTTP_CLIENT;
private static final PoolingHttpClientConnectionManager HTTP_CLIENT_CONNECTION_MANAGER;
@Value("${supplier.connection-timeout.default:1500}")
private static int connectTimeout;
@Value("${supplier.socket-timeout.default:1500}")
private static int socketTimeout;
@Value("${supplier.connection-request-timeout.default:500}")
private static int connectionRequestTimeout;
static {
// 创建连接池管理器
HTTP_CLIENT_CONNECTION_MANAGER = new PoolingHttpClientConnectionManager(1L, TimeUnit.SECONDS);
// 连接池大小
HTTP_CLIENT_CONNECTION_MANAGER.setMaxTotal(200);
// 每个ip+port或者域名最多支持连接数
HTTP_CLIENT_CONNECTION_MANAGER.setDefaultMaxPerRoute(100);
// 空闲连接存活时间
HTTP_CLIENT_CONNECTION_MANAGER.setValidateAfterInactivity(1000);
// 默认请求设置
RequestConfig configBuilder = RequestConfig.custom()
// 设置连接超时
.setConnectTimeout(connectTimeout)
// 设置读取超时
.setSocketTimeout(socketTimeout)
// 设置从连接池获取连接实例的超时
.setConnectionRequestTimeout(connectionRequestTimeout)
.build();
// 创建httpclient对象
HTTP_CLIENT = HttpClients.custom()
.evictExpiredConnections()
.evictIdleConnections(10L, TimeUnit.SECONDS)
.setDefaultRequestConfig(configBuilder)
.setConnectionManager(HTTP_CLIENT_CONNECTION_MANAGER)
.disableAutomaticRetries()
.build();
}
public static CloseableHttpClient getHttpClient() {
return HTTP_CLIENT;
}
}
//设置客户端发送
public class HwClientSender {
public static final String CHARSET = "UTF-8";
public static String post(String url, String accessToken, String postData) throws IOException {
String result = null;
CloseableHttpClient httpClient = SupplierHttpClientUtil.getHttpClient();
CloseableHttpResponse response = null;
try {
HttpPost post = new HttpPost(url);
post.addHeader("Authorization", "Bearer " + accessToken);
post.addHeader("Content-Type", "application/json; charset=UTF-8");
StringEntity s = new StringEntity(postData, CHARSET);
s.setContentEncoding(CHARSET);
s.setContentType("application/json");
post.setEntity(s);
response = httpClient.execute(post);
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
result = EntityUtils.toString(response.getEntity());
}
return result;
} catch (Exception e) {
log.error(" doPost error data:" + postData + " ,url:" + url + " ,msg:" + e.getMessage(), e);
} finally {
try {
if (response != null) {
response.close();
}
} catch (IOException e) {
}
}
return null;
}
}
(2)httpClient.execute 最终底层调用的是 requestConnection进行请求
public ConnectionRequest requestConnection(
final HttpRoute route,
final Object state) {
Args.notNull(route, "HTTP route");
if (this.log.isDebugEnabled()) {
this.log.debug("Connection request: " + format(route, state) + formatStats(route));
}
//根据route从连接池里面获取连接
final Future<CPoolEntry> future = this.pool.lease(route, state, null);
return new ConnectionRequest() {
@Override
public boolean cancel() {
return future.cancel(true);
}
@Override
public HttpClientConnection get(
final long timeout,
final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
return leaseConnection(future, timeout, tunit);
}
};
}
pool.lease 方法详解 返回是一个Future,future 里面有一个关键方法是
final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
根据route 从连接池里面获取连接
public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
Args.notNull(route, "Route");
Asserts.check(!this.isShutDown, "Connection pool shut down");
return new Future<E>() {
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final AtomicBoolean done = new AtomicBoolean(false);
private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
if (done.compareAndSet(false, true)) {
cancelled.set(true);
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
if (callback != null) {
callback.cancelled();
}
return true;
}
return false;
}
@Override
public boolean isCancelled() {
return cancelled.get();
}
@Override
public boolean isDone() {
return done.get();
}
@Override
public E get() throws InterruptedException, ExecutionException {
try {
return get(0L, TimeUnit.MILLISECONDS);
} catch (final TimeoutException ex) {
throw new ExecutionException(ex);
}
}
@Override
public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
for (;;) {
synchronized (this) {
try {
final E entry = entryRef.get();
if (entry != null) {
return entry;
}
if (done.get()) {
throw new ExecutionException(operationAborted());
}
final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
if (validateAfterInactivity > 0) {
if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
if (!validate(leasedEntry)) {
leasedEntry.close();
release(leasedEntry, false);
continue;
}
}
}
if (done.compareAndSet(false, true)) {
entryRef.set(leasedEntry);
done.set(true);
onLease(leasedEntry);
if (callback != null) {
callback.completed(leasedEntry);
}
return leasedEntry;
} else {
release(leasedEntry, true);
throw new ExecutionException(operationAborted());
}
} catch (final IOException ex) {
if (done.compareAndSet(false, true)) {
if (callback != null) {
callback.failed(ex);
}
}
throw new ExecutionException(ex);
}
}
}
}
};
}
(3)getPoolEntryBlocking 根据route从连接池获取连接
- RouteSpecificPool<T, C, E> pool: 每个route 有对应的pool连接池
private E getPoolEntryBlocking(
final T route, final Object state,
final long timeout, final TimeUnit timeUnit,
final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
Date deadline = null;
if (timeout > 0) {
deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
}
this.lock.lock();
try {
//获取对应route 的连接池
final RouteSpecificPool<T, C, E> pool = getPool(route);
E entry;
for (;;) {
Asserts.check(!this.isShutDown, "Connection pool shut down");
if (future.isCancelled()) {
throw new ExecutionException(operationAborted());
}
for (;;) {
entry = pool.getFree(state);
if (entry == null) {
break;
}
if (entry.isExpired(System.currentTimeMillis())) {
entry.close();
}
if (entry.isClosed()) {
this.available.remove(entry);
pool.free(entry, false);
} else {
break;
}
}
//如果有空闲的能用直接返回
if (entry != null) {
this.available.remove(entry);
this.leased.add(entry);
onReuse(entry);
return entry;
}
// 看一下能不能新建一个连接
final int maxPerRoute = getMax(route);
// 看一下超了多少个
final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
//把超过的干掉,因为可以动态设置这个maxPerRoute
if (excess > 0) {
for (int i = 0; i < excess; i++) {
final E lastUsed = pool.getLastUsed();
if (lastUsed == null) {
break;
}
lastUsed.close();
this.available.remove(lastUsed);
pool.remove(lastUsed);
}
}
//没有超过的话,就新建一个新的
if (pool.getAllocatedCount() < maxPerRoute) {
final int totalUsed = this.leased.size();
final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
if (freeCapacity > 0) {
final int totalAvailable = this.available.size();
if (totalAvailable > freeCapacity - 1) {
if (!this.available.isEmpty()) {
final E lastUsed = this.available.removeLast();
lastUsed.close();
final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
otherpool.remove(lastUsed);
}
}
final C conn = this.connFactory.create(route);
entry = pool.add(conn);
this.leased.add(entry);
return entry;
}
}
boolean success = false;
try {
pool.queue(future);
this.pending.add(future);
if (deadline != null) {
success = this.condition.awaitUntil(deadline);
} else {
this.condition.await();
success = true;
}
if (future.isCancelled()) {
throw new ExecutionException(operationAborted());
}
} finally {
// In case of 'success', we were woken up by the
// connection pool and should now have a connection
// waiting for us, or else we're shutting down.
// Just continue in the loop, both cases are checked.
pool.unqueue(future);
this.pending.remove(future);
}
// check for spurious wakeup vs. timeout
if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
break;
}
}
throw new TimeoutException("Timeout waiting for connection");
} finally {
this.lock.unlock();
}
}
边栏推荐
- Jinte Net Foundation will participate in the global strategy conference of dimension chain through online live broadcast
- 信息安全数学基础 Chapter 2——同余
- 自动化测试-Selenium
- Oracle analysis function over and MySQL achieve similar effects
- How to become an optimist organization?
- String to numeric value
- Format eslint automatique lorsque vscode enregistre le Code
- 信息安全数学基础 Chapter 3——有限域(二)
- 信息安全数学基础 Chapter 1——整除
- Haas, which integrates relevant technologies of Alibaba cloud, Dharma Institute and pingtouge, has officially announced the book
猜你喜欢

Katalon Studio Enterprise

Tornado environment construction and basic framework construction -- familiar Hello World

04_特征工程—特征选择

Science popularization genius on the left, madman on the right

Biden ordered to enforce the zero trust structure

论文阅读 dyngraph2vec: Capturing Network Dynamics using Dynamic Graph Representation Learning

^32 execution context stack interview questions

API management artifact that allows you to call wow directly

子类继承了什么、多态、 向上转型

Dynamic: capturing network dynamics using dynamic graph representation learning
随机推荐
Weekly recommended short video: rookie CEO talks about the new logistics track in the future
从制造到“智造”,探索制造企业破局之道
error:指针作为函数参数错误总结
Is it safe for Xiaobai to open an account directly on the flush?
ShellBrowser . NET Crack
Typescript learning notes (II)
满k叉树编号为 i 的节点的孩子编号公式推导
Oracle database merge row records, wmsys WM_ Use of the concat function and group in MySQL_ Use and comparison of concat (ID).
05_ Feature Engineering - dimension reduction
7 life tools: SWOT, PDCA, 6w2h, smart, WBS, time management, and the 28 principles
Exception handling and exception usage in golang
LeetCode-384. 打乱数组
RecyclerView缓存复用解析,源码解读
A journey of database full SQL analysis and audit system performance optimization
Biden ordered to enforce the zero trust structure
vscode保存代碼時自動eslint格式化
Association relationship
Authing 背后的计算哲学
DFS and BFS notes (I) breadth first search based on C language
Cocoapod only updates the specified library (does not update the index)