[Production issue] Troubleshooting "Timeout waiting for connection from pool"
2022-06-11 17:15:00 【new hilbert()】
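1. Discovering the problem
Background: when sending out-of-app push notifications, some scenarios push through the phone vendor's channel. When pushing through the Vivo SDK, the following exception appears once QPS gets too high.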
org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute$original$2s8CgZpA(InternalHttpClient.java:184)
at org.apache.http.impl.client.InternalHttpClient.doExecute$original$2s8CgZpA$accessor$3Fb5SRfG(InternalHttpClient.java)
at org.apache.http.impl.client.InternalHttpClient$auxiliary$MIrDHOOm.call(Unknown Source)
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:88)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
at com.vivo.push.sdk.common.HttpUtil.execute(HttpUtil.java:118)
at com.vivo.push.sdk.common.HttpUtil.doPost(HttpUtil.java:111)
at com.vivo.push.sdk.server.Sender.sendSingle(Sender.java:43)
2. Troubleshooting
org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
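This exception means that the wait to obtain a connection from the HttpClient connection pool timed out, which indicates the pool had no connection to hand out in time.
Check the parameters the Vivo SDK sets on the connection pool:
The Vivo SDK sets the total pool size to 10 and the maximum number of connections per HttpRoute to 5. When more than 5 requests to the same HttpRoute are in flight at once, the extra requests have to wait for a connection, and any request that cannot get one before the wait times out fails with Timeout waiting for connection from pool.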
manager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
manager.setMaxTotal(Constants.HTTP_MAX_CONNECTION);
manager.setDefaultMaxPerRoute(Constants.HTTP_MAX_ROUTE);
HTTP_MAX_CONNECTION = 10;
HTTP_MAX_ROUTE = 5;
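Why high QPS runs into this limit can be estimated with Little's law: the average number of requests in flight equals the request rate multiplied by the average response time. The sketch below is only a rough steady-state estimate and is not taken from the Vivo SDK; the class name, the method names, and the 100 ms latency used in the note afterwards are illustrative assumptions.

```java
/** Rough steady-state sizing estimate based on Little's law (hypothetical helper). */
class PoolSizing {

    /** Average connections in use = requests per second x average seconds per request. */
    static int requiredConnections(double qps, double avgLatencyMs) {
        return (int) Math.ceil(qps * avgLatencyMs / 1000.0);
    }

    /** Highest steady-state QPS one route can sustain with the given per-route limit. */
    static double maxSustainableQps(int maxPerRoute, double avgLatencyMs) {
        return maxPerRoute * 1000.0 / avgLatencyMs;
    }
}
```

If a push request took about 100 ms on average (an assumed figure), PoolSizing.maxSustainableQps(5, 100) would give 50, so a per-route limit of 5 would start queueing requests at roughly 50 QPS, and sustained load above that would eventually exceed the wait timeout.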
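3. How it works
(1) Setting the connection pool parameters
Key parameters:
- validateAfterInactivity: idle period in milliseconds after which a pooled connection is re-validated before it is leased again (this is a staleness check, not a keep-alive time for idle connections)
- maxTotal: total number of connections in the pool
- defaultMaxPerRoute: maximum number of connections per route (roughly, per target host and port)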
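How maxTotal and defaultMaxPerRoute interact can be sketched as a single check. It is a simplified restatement of the condition inside getPoolEntryBlocking shown in part (3), leaving out the eviction of idle connections; the class and method names are hypothetical.

```java
/** Simplified view of when the pool may open a new connection (hypothetical helper). */
class PoolLimitsSketch {

    /**
     * A new connection may be created only if the route is below its own limit
     * and the pool as a whole still has capacity left.
     */
    static boolean canCreateConnection(int allocatedForRoute, int maxPerRoute,
                                       int totalLeased, int maxTotal) {
        if (allocatedForRoute >= maxPerRoute) {
            return false; // per-route limit reached
        }
        int freeCapacity = Math.max(maxTotal - totalLeased, 0);
        return freeCapacity > 0; // total limit not yet reached
    }
}
```

With the Vivo SDK values, canCreateConnection(5, 5, 5, 10) is false even though half of the total pool is unused, because the single route is already at its limit of 5.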

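import java.util.concurrent.TimeUnit;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

public class SupplierHttpClientUtil {
    private static final CloseableHttpClient HTTP_CLIENT;
    private static final PoolingHttpClientConnectionManager HTTP_CLIENT_CONNECTION_MANAGER;
    // The original declared these as static fields annotated with @Value. Spring does not
    // inject static fields, and the static initializer below runs before any injection,
    // so they stayed 0, which HttpClient interprets as an infinite timeout.
    // The defaults from the original placeholders are used as constants here.
    private static final int CONNECT_TIMEOUT_MS = 1500;
    private static final int SOCKET_TIMEOUT_MS = 1500;
    private static final int CONNECTION_REQUEST_TIMEOUT_MS = 500;

    static {
        // Create the pooling connection manager; each connection has a time-to-live of 1 second
        HTTP_CLIENT_CONNECTION_MANAGER = new PoolingHttpClientConnectionManager(1L, TimeUnit.SECONDS);
        // Total pool size
        HTTP_CLIENT_CONNECTION_MANAGER.setMaxTotal(200);
        // Maximum number of connections per route (target host and port)
        HTTP_CLIENT_CONNECTION_MANAGER.setDefaultMaxPerRoute(100);
        // Re-validate a connection that has been idle for at least 1000 ms before leasing it
        HTTP_CLIENT_CONNECTION_MANAGER.setValidateAfterInactivity(1000);
        // Default request settings
        RequestConfig requestConfig = RequestConfig.custom()
                // Timeout for establishing the connection
                .setConnectTimeout(CONNECT_TIMEOUT_MS)
                // Timeout for reading data from the socket
                .setSocketTimeout(SOCKET_TIMEOUT_MS)
                // Timeout for obtaining a connection from the pool
                .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
                .build();
        // Create the HttpClient instance
        HTTP_CLIENT = HttpClients.custom()
                .evictExpiredConnections()
                .evictIdleConnections(10L, TimeUnit.SECONDS)
                .setDefaultRequestConfig(requestConfig)
                .setConnectionManager(HTTP_CLIENT_CONNECTION_MANAGER)
                .disableAutomaticRetries()
                .build();
    }

    public static CloseableHttpClient getHttpClient() {
        return HTTP_CLIENT;
    }
}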
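// Sending a request with the shared client
import java.io.IOException;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HwClientSender {
    public static final String CHARSET = "UTF-8";
    // Assumes SLF4J; the original used a `log` field without declaring it
    private static final Logger log = LoggerFactory.getLogger(HwClientSender.class);

    public static String post(String url, String accessToken, String postData) throws IOException {
        CloseableHttpClient httpClient = SupplierHttpClientUtil.getHttpClient();
        HttpPost post = new HttpPost(url);
        post.addHeader("Authorization", "Bearer " + accessToken);
        // ContentType.APPLICATION_JSON carries both the media type and the UTF-8 charset.
        // The original also called setContentEncoding("UTF-8"), which sets the
        // Content-Encoding header; that header is meant for codings such as gzip.
        post.setEntity(new StringEntity(postData, ContentType.APPLICATION_JSON));
        // try-with-resources closes the response, which releases the connection back to the pool
        try (CloseableHttpResponse response = httpClient.execute(post)) {
            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                return EntityUtils.toString(response.getEntity(), CHARSET);
            }
            // Consume the body of a non-200 response so the connection can be reused
            EntityUtils.consumeQuietly(response.getEntity());
            return null;
        } catch (Exception e) {
            log.error("doPost error data:" + postData + ", url:" + url + ", msg:" + e.getMessage(), e);
            return null;
        }
    }
}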
(2) Under the hood, httpClient.execute ends up calling requestConnection to request a connection
public ConnectionRequest requestConnection(
        final HttpRoute route,
        final Object state) {
    Args.notNull(route, "HTTP route");
    if (this.log.isDebugEnabled()) {
        this.log.debug("Connection request: " + format(route, state) + formatStats(route));
    }
    // Lease a connection for this route from the pool
    final Future<CPoolEntry> future = this.pool.lease(route, state, null);
    return new ConnectionRequest() {
        @Override
        public boolean cancel() {
            return future.cancel(true);
        }
        @Override
        public HttpClientConnection get(
                final long timeout,
                final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
            return leaseConnection(future, timeout, tunit);
        }
    };
}
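The exception message from the stack trace is produced at this step: leaseConnection waits on the future with the connection request timeout and, as far as the HttpClient 4.x sources show, turns a TimeoutException into a ConnectionPoolTimeoutException. The sketch below reproduces only that translation with JDK types; PoolTimeoutException and LeaseTimeoutSketch are hypothetical stand-ins, not HttpClient classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class LeaseTimeoutSketch {

    /** Hypothetical stand-in for ConnectionPoolTimeoutException. */
    static class PoolTimeoutException extends RuntimeException {
        PoolTimeoutException(String message) {
            super(message);
        }
    }

    /** Waits for the leased entry; a timeout on the future becomes a pool-timeout exception. */
    static <E> E leaseConnection(Future<E> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            throw new PoolTimeoutException("Timeout waiting for connection from pool");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a connection", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Lease failed", e.getCause());
        }
    }

    /** Returns the message produced when no connection becomes available in time. */
    static String demoTimeoutMessage() {
        Future<String> neverCompletes = new CompletableFuture<>();
        try {
            leaseConnection(neverCompletes, 20, TimeUnit.MILLISECONDS);
            return "leased";
        } catch (PoolTimeoutException e) {
            return e.getMessage();
        }
    }
}
```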
A closer look at pool.lease: it returns a Future, and the key call inside that future's get method is
final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
which obtains a connection from the pool for the given route.
public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
    Args.notNull(route, "Route");
    Asserts.check(!this.isShutDown, "Connection pool shut down");
    return new Future<E>() {
        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        private final AtomicBoolean done = new AtomicBoolean(false);
        private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
        @Override
        public boolean cancel(final boolean mayInterruptIfRunning) {
            if (done.compareAndSet(false, true)) {
                cancelled.set(true);
                lock.lock();
                try {
                    condition.signalAll();
                } finally {
                    lock.unlock();
                }
                if (callback != null) {
                    callback.cancelled();
                }
                return true;
            }
            return false;
        }
        @Override
        public boolean isCancelled() {
            return cancelled.get();
        }
        @Override
        public boolean isDone() {
            return done.get();
        }
        @Override
        public E get() throws InterruptedException, ExecutionException {
            try {
                return get(0L, TimeUnit.MILLISECONDS);
            } catch (final TimeoutException ex) {
                throw new ExecutionException(ex);
            }
        }
        @Override
        public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            for (;;) {
                synchronized (this) {
                    try {
                        final E entry = entryRef.get();
                        if (entry != null) {
                            return entry;
                        }
                        if (done.get()) {
                            throw new ExecutionException(operationAborted());
                        }
                        final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                        if (validateAfterInactivity > 0) {
                            if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
                                if (!validate(leasedEntry)) {
                                    leasedEntry.close();
                                    release(leasedEntry, false);
                                    continue;
                                }
                            }
                        }
                        if (done.compareAndSet(false, true)) {
                            entryRef.set(leasedEntry);
                            done.set(true);
                            onLease(leasedEntry);
                            if (callback != null) {
                                callback.completed(leasedEntry);
                            }
                            return leasedEntry;
                        } else {
                            release(leasedEntry, true);
                            throw new ExecutionException(operationAborted());
                        }
                    } catch (final IOException ex) {
                        if (done.compareAndSet(false, true)) {
                            if (callback != null) {
                                callback.failed(ex);
                            }
                        }
                        throw new ExecutionException(ex);
                    }
                }
            }
        }
    };
}
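The validateAfterInactivity branch in get above boils down to one condition: a leased entry is re-validated only when the setting is positive and the entry has not been updated for at least that many milliseconds. A minimal restatement of that condition, with a hypothetical class and method name:

```java
/** Restates the staleness condition used before handing out a pooled connection. */
class StaleCheckSketch {

    /**
     * @param lastUpdatedMs             timestamp of the entry's last update, in milliseconds
     * @param validateAfterInactivityMs configured idle threshold; 0 or less disables the check
     * @param nowMs                     current time in milliseconds
     */
    static boolean needsValidation(long lastUpdatedMs, int validateAfterInactivityMs, long nowMs) {
        return validateAfterInactivityMs > 0
                && lastUpdatedMs + validateAfterInactivityMs <= nowMs;
    }
}
```

If validation fails, the entry is closed and released, and the loop goes back to getPoolEntryBlocking for another connection.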
(3) getPoolEntryBlocking obtains a connection from the pool for the given route
- RouteSpecificPool<T, C, E> pool: each route has its own pool
private E getPoolEntryBlocking(
        final T route, final Object state,
        final long timeout, final TimeUnit timeUnit,
        final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
    Date deadline = null;
    if (timeout > 0) {
        deadline = new Date(System.currentTimeMillis() + timeUnit.toMillis(timeout));
    }
    this.lock.lock();
    try {
        // Get the pool that belongs to this route
        final RouteSpecificPool<T, C, E> pool = getPool(route);
        E entry;
        for (;;) {
            Asserts.check(!this.isShutDown, "Connection pool shut down");
            if (future.isCancelled()) {
                throw new ExecutionException(operationAborted());
            }
            for (;;) {
                entry = pool.getFree(state);
                if (entry == null) {
                    break;
                }
                if (entry.isExpired(System.currentTimeMillis())) {
                    entry.close();
                }
                if (entry.isClosed()) {
                    this.available.remove(entry);
                    pool.free(entry, false);
                } else {
                    break;
                }
            }
            // If a usable free connection exists, return it right away
            if (entry != null) {
                this.available.remove(entry);
                this.leased.add(entry);
                onReuse(entry);
                return entry;
            }
            // Otherwise check whether a new connection can be created
            final int maxPerRoute = getMax(route);
            // How far over the per-route limit the pool would be after adding one more
            final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
            // Close the excess connections; this can happen because maxPerRoute may be changed at runtime
            if (excess > 0) {
                for (int i = 0; i < excess; i++) {
                    final E lastUsed = pool.getLastUsed();
                    if (lastUsed == null) {
                        break;
                    }
                    lastUsed.close();
                    this.available.remove(lastUsed);
                    pool.remove(lastUsed);
                }
            }
            // If the route is below its limit, create a new connection
            if (pool.getAllocatedCount() < maxPerRoute) {
                final int totalUsed = this.leased.size();
                final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                if (freeCapacity > 0) {
                    final int totalAvailable = this.available.size();
                    if (totalAvailable > freeCapacity - 1) {
                        if (!this.available.isEmpty()) {
                            final E lastUsed = this.available.removeLast();
                            lastUsed.close();
                            final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                            otherpool.remove(lastUsed);
                        }
                    }
                    final C conn = this.connFactory.create(route);
                    entry = pool.add(conn);
                    this.leased.add(entry);
                    return entry;
                }
            }
            // No connection available: queue the request and wait for a release or the deadline
            boolean success = false;
            try {
                pool.queue(future);
                this.pending.add(future);
                if (deadline != null) {
                    success = this.condition.awaitUntil(deadline);
                } else {
                    this.condition.await();
                    success = true;
                }
                if (future.isCancelled()) {
                    throw new ExecutionException(operationAborted());
                }
            } finally {
                // In case of 'success', we were woken up by the
                // connection pool and should now have a connection
                // waiting for us, or else we're shutting down.
                // Just continue in the loop, both cases are checked.
                pool.unqueue(future);
                this.pending.remove(future);
            }
            // check for spurious wakeup vs. timeout
            if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                break;
            }
        }
        throw new TimeoutException("Timeout waiting for connection");
    } finally {
        this.lock.unlock();
    }
}
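The waiting part of getPoolEntryBlocking is a classic lock-plus-condition loop with a deadline. The sketch below keeps only that mechanism, for a single route and with a counter instead of real connections; BlockingPoolSketch and its helper methods are hypothetical and greatly simplified compared with the HttpClient code above.

```java
import java.util.Date;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BlockingPoolSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final int maxPerRoute;
    private int leased;

    BlockingPoolSketch(int maxPerRoute) {
        this.maxPerRoute = maxPerRoute;
    }

    /** Takes a slot, blocking until one is free or the deadline passes. */
    void lease(long timeoutMs) throws InterruptedException, TimeoutException {
        Date deadline = new Date(System.currentTimeMillis() + timeoutMs);
        lock.lock();
        try {
            for (;;) {
                if (leased < maxPerRoute) {
                    leased++;
                    return;
                }
                // awaitUntil returns false once the deadline has elapsed
                boolean signalled = condition.awaitUntil(deadline);
                if (!signalled && deadline.getTime() <= System.currentTimeMillis()) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            lock.unlock();
        }
    }

    /** Gives a slot back and wakes up waiting threads. */
    void release() {
        lock.lock();
        try {
            leased--;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Demo: with the only slot taken and nobody releasing, the next lease times out. */
    static boolean timesOutWhenFull() {
        BlockingPoolSketch pool = new BlockingPoolSketch(1);
        try {
            pool.lease(1000);
            pool.lease(50);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Demo: a waiting lease succeeds once another thread releases its slot. */
    static boolean succeedsAfterRelease() {
        BlockingPoolSketch pool = new BlockingPoolSketch(1);
        try {
            pool.lease(1000);
            Thread releaser = new Thread(() -> {
                try {
                    Thread.sleep(30);
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
                pool.release();
            });
            releaser.start();
            pool.lease(2000);
            releaser.join();
            return true;
        } catch (TimeoutException | InterruptedException e) {
            return false;
        }
    }
}
```

The second demo is the reason closing responses promptly matters: a waiting request can only proceed when another request releases its connection before the wait deadline.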