Troubleshooting an Apache HttpClient Connection Pool Leak

2021-09-05  hdfg159


Background

Where the problem came from

Sync interface

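public interface SyncHelper {

    Order syncOrder();

    void syncAllAccount();

    void syncSingleAccount(Long accountId);

    // Timeout in seconds for one sync task, used by SyncAccountResourceListener below
    // (the original does not show this declaration; the signature is assumed)
    long getTimeOut();

    default boolean enableSync() {
        return true;
    }
}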

Full-sync implementation

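@Slf4j
@Component
public class SyncAccountResourceListener {
    private final List<SyncHelper> helpers;
    // single-thread executor: sync tasks run one after another on one named thread
    private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
            .setDaemon(false)
            .setNameFormat("h3c-sync-resource-%d")
            .build()
    );

    // constructor injection: a final field cannot be filled by @Autowired field injection
    @Autowired
    public SyncAccountResourceListener(List<SyncHelper> helpers) {
        this.helpers = helpers;
    }

    // accountId: the account being synced
    public void sync(Long accountId) {
        for (SyncHelper helper : helpers) {
            String className = helper.getClass().getSimpleName();
            if (Thread.currentThread().isInterrupted()) {
                log.error("[{}] sync task interrupted,account:[{}]", className, accountId);
                continue;
            }
            Future<?> future = EXECUTOR.submit(() -> helper.syncSingleAccount(accountId));
            try {
                // wait at most getTimeOut() seconds for this helper
                future.get(helper.getTimeOut(), TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException | TimeoutException e) {
                log.error("[{}] sync error,account:[{}]", className, accountId, e);
            } finally {
                // interrupt the worker if the task is still running
                future.cancel(true);
            }
        }
    }
}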
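The following minimal JDK-only sketch shows what `future.get(timeout)` followed by `future.cancel(true)` does when a helper hangs. The blocking task is a hypothetical stand-in for a sync helper whose HTTP call never returns. The name `runWithTimeout` and its string results are illustrative, not from the project. Note that the timeout only frees the caller. `cancel(true)` interrupts the worker, but it gives back nothing the hung call was waiting for.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Minimal sketch of the "submit, wait with a timeout, cancel" pattern used by
// SyncAccountResourceListener.sync. runWithTimeout is an illustrative name.
class TimeoutCancelDemo {

    /** Runs task on executor; returns "ok", "timeout", "error" or "interrupted". */
    static String runWithTimeout(ExecutorService executor, Runnable task, long timeoutMillis) {
        Future<?> future = executor.submit(task);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "ok";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the caller's interrupt flag
            return "interrupted";
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return "error"; // the task itself threw
        } finally {
            // Interrupts the worker if the task is still running; no effect once it is done.
            future.cancel(true);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch gotInterrupt = new CountDownLatch(1);
        Runnable hangs = () -> {
            try {
                new CountDownLatch(1).await(); // parks forever, like a thread waiting for a pooled connection
            } catch (InterruptedException e) {
                gotInterrupt.countDown(); // delivered by cancel(true)
            }
        };
        System.out.println(runWithTimeout(executor, hangs, 200)); // timeout
        System.out.println(gotInterrupt.await(1, TimeUnit.SECONDS)); // true once the worker was interrupted
        System.out.println(runWithTimeout(executor, () -> { }, 1000)); // ok
        executor.shutdownNow();
    }
}
```

Because the executor has a single thread, the next helper can only start once the hung one has reacted to the interrupt.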

Troubleshooting steps

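I wanted to fix the problem as quickly as possible. The sync progress list in the system showed every run stuck at the same sync class. I skimmed that class's code but found nothing that could cause an infinite loop.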

Trying to reproduce

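At this point things got even more puzzling. The problem could not be reproduced in the test environment or in local unit tests, so why did it show up in production?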

Deadlock

At first I did not look for a deadlock, because most of the sync implementations do not use multiple threads.

Possible cause: some sync implementations do run work on a thread pool, for example:

        List<SysDept> deptList = ......
        List<CompletableFuture<CmdbUsageReport>> futureList = new ArrayList<>();
        deptList.forEach(t -> futureList.add(
                CompletableFuture.supplyAsync(() -> {
                    // 耗时任务
                    return report;
                }, ioPool)));
        CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{})).join();
        List<CmdbUsageReport> reportList = futureList.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(toList());
        // ......
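Below is a runnable version of the fan-out fragment above. It assumes `String` stand-ins for `SysDept` and `CmdbUsageReport` and a fixed thread pool as `ioPool`; the original types and pool are not shown. It makes clear why this kind of code was a suspect. `allOf(...).join()` blocks the calling thread (here, the single sync thread) until every task has finished.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Runnable version of the fan-out/join fragment. Department names and "report"
// strings are hypothetical stand-ins for SysDept and CmdbUsageReport.
class FanOutDemo {

    static List<String> buildReports(List<String> deptList, ExecutorService ioPool) {
        List<CompletableFuture<String>> futureList = new ArrayList<>();
        deptList.forEach(dept -> futureList.add(
                CompletableFuture.supplyAsync(() -> {
                    // the slow per-department work would go here; null means "no report"
                    return dept.isEmpty() ? null : "report:" + dept;
                }, ioPool)));
        // Blocks the calling (sync) thread until every task has finished:
        // a single task that never returns keeps the whole sync stuck here.
        CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
        return futureList.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(buildReports(Arrays.asList("ops", "", "dev"), ioPool));
            // [report:ops, report:dev]
        } finally {
            ioPool.shutdown();
        }
    }
}
```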

Since the question is now about threads, the quickest route is to use proper analysis tools and see what is actually going on.

Analyzing thread states

Use Alibaba's Arthas or VisualVM to inspect the state of the sync task's thread
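If neither tool is at hand, the JDK's own `ThreadMXBean` can produce a similar listing from inside the JVM. This is a swapped-in technique, not what the author used. Unlike Arthas or VisualVM, it only sees threads of the JVM it runs in. `ThreadStateDump`, the name prefix and the frame count are illustrative.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// JDK-only sketch: describe live threads whose name starts with a prefix,
// with their state and top stack frames (in-process only, no attach).
class ThreadStateDump {

    static List<String> describe(String namePrefix, int maxFrames) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> lines = new ArrayList<>();
        // dumpAllThreads(lockedMonitors, lockedSynchronizers) includes stack traces
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (!info.getThreadName().startsWith(namePrefix)) {
                continue;
            }
            StringBuilder sb = new StringBuilder()
                    .append('"').append(info.getThreadName()).append("\" id=")
                    .append(info.getThreadId()).append(' ').append(info.getThreadState());
            StackTraceElement[] stack = info.getStackTrace();
            for (int i = 0; i < Math.min(maxFrames, stack.length); i++) {
                sb.append("\n    at ").append(stack[i]);
            }
            lines.add(sb.toString());
        }
        return lines;
    }

    public static void main(String[] args) {
        Object lock = new Object();
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    lock.wait(); // parks the thread in WAITING, like the stuck sync thread
                } catch (InterruptedException ignored) {
                }
            }
        }, "h3c-sync-resource-0");
        waiter.setDaemon(true);
        waiter.start();
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        describe("h3c-sync-resource-", 3).forEach(System.out::println);
    }
}
```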

Start Arthas and attach it to the target process

java -jar arthas-boot.jar

`thread --all` lists brief information about every thread

(Screenshot: Arthas `thread --all` output)
The sync tasks run on a single-thread executor whose thread has a custom name, `h3c-sync-resource-0` (thread ID 250, state `WAITING`)

`thread 250` shows the details of the sync thread

(Screenshot: Arthas `thread 250` output)
"h3c-sync-resource-0" - Thread t@195
        java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <66bb3d00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking(AbstractConnPool.java:393)
        at org.apache.http.pool.AbstractConnPool.access$300(AbstractConnPool.java:70)
        at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:253)
        - locked <637b83f5> (a org.apache.http.pool.AbstractConnPool$2)
        at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:198)
        at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:306)
        at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:282)
        at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
        at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
        at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
        at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
        at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
        at feign.httpclient.ApacheHttpClient.execute(ApacheHttpClient.java:83)
        at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:119)
        at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:89)
        at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:100)
        at com.sun.proxy.$Proxy360.osAggregates(Unknown Source)

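The thread info points to the key frames. The thread is stuck inside a Feign request, and looking closer, it is Apache HttpClient that is blocked, which at first glance looks like an HttpClient deadlock.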

`AbstractConnPool.java:393` is inside `org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking`

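Reading the code at that line shows what is really happening: the thread is blocked waiting to lease an idle connection from the HttpClient connection pool.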
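A toy bounded pool built on `java.util.concurrent.Semaphore` shows why a leaked response freezes the next caller. It is a model, not HttpClient's `AbstractConnPool`. Each permit plays one pooled connection, and `ToyConnectionPool` is a hypothetical name.

One difference is worth noting. The toy `lease` always uses a timeout. In HttpClient 4.x, as far as I can tell from the 4.5 sources, a non-positive `connectionRequestTimeout` (the `RequestConfig` default is -1) makes the lease wait with no deadline. That fits the plain `ConditionObject.await` frame in the stack above. Setting `RequestConfig.custom().setConnectionRequestTimeout(...)` should turn such a hang into a `ConnectionPoolTimeoutException` instead.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Toy model of a bounded connection pool (NOT HttpClient's implementation):
// each permit is one pooled connection. Leasing without releasing "leaks" it;
// once all permits are gone, the next lease has to wait.
class ToyConnectionPool {
    private final Semaphore permits;

    ToyConnectionPool(int maxTotal) {
        this.permits = new Semaphore(maxTotal, true);
    }

    /** Leases a connection, waiting at most timeoutMillis; false means the pool is exhausted. */
    boolean lease(long timeoutMillis) {
        try {
            return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns a connection to the pool (what closing the response ultimately does). */
    void release() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        ToyConnectionPool pool = new ToyConnectionPool(1); // like setMaxConnTotal(1)
        System.out.println(pool.lease(100)); // true: the first request gets the only connection
        // ... the response is never closed, so release() is never called ...
        System.out.println(pool.lease(100)); // false: without a timeout this call would wait forever
        pool.release(); // what closing the first response would have done
        System.out.println(pool.lease(100)); // true
    }
}
```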

Tracing the source

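First, look at the simple example on the HttpClient website, on the quickstart page: httpcomponents-client-quickstart

It shows a simple usage example. This one is written against the HttpClient 5.x API (for example `getCode()`). The 4.x client in the stack trace above (package `org.apache.http`) uses `getStatusLine().getStatusCode()` instead, but the rule about closing the response is the same: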

try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
    HttpGet httpGet = new HttpGet("http://httpbin.org/get");
    // The underlying HTTP connection is still held by the response object
    // to allow the response content to be streamed directly from the network socket.
    // In order to ensure correct deallocation of system resources
    // the user MUST call CloseableHttpResponse#close() from a finally clause.
    // Please note that if response content is not fully consumed the underlying
    // connection cannot be safely re-used and will be shut down and discarded
    // by the connection manager.
    try (CloseableHttpResponse response1 = httpclient.execute(httpGet)) {
        System.out.println(response1.getCode() + " " + response1.getReasonPhrase());
        HttpEntity entity1 = response1.getEntity();
        // do something useful with the response body
        // and ensure it is fully consumed
        EntityUtils.consume(entity1);
    }
}

Tracing the Feign source

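Next, here is roughly how Feign turns an Apache HttpClient response into its own response type, with the relevant source. The method that converts the HttpClient response into a Feign Response body is `feign.httpclient.ApacheHttpClient.toFeignBody`: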

  Response.Body toFeignBody(HttpResponse httpResponse) {
    final HttpEntity entity = httpResponse.getEntity();
    if (entity == null) {
      return null;
    }
    return new Response.Body() {

      @Override
      public Integer length() {
        // with Transfer-Encoding: chunked the content length is unknown (negative), so this always returns null
        return entity.getContentLength() >= 0 && entity.getContentLength() <= Integer.MAX_VALUE
            ? (int) entity.getContentLength()
            : null;
      }

      @Override
      public boolean isRepeatable() {
        return entity.isRepeatable();
      }

      @Override
      public InputStream asInputStream() throws IOException {
        // simply passes the underlying InputStream through
        return entity.getContent();
      }

      @SuppressWarnings("deprecation")
      @Override
      public Reader asReader() throws IOException {
        return new InputStreamReader(asInputStream(), UTF_8);
      }

      @Override
      public Reader asReader(Charset charset) throws IOException {
        Util.checkNotNull(charset, "charset should not be null");
        return new InputStreamReader(asInputStream(), charset);
      }

      @Override
      public void close() throws IOException {
        // releases the resources
        EntityUtils.consume(entity);
      }
    };
  }

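Because the response body must be fully consumed before its connection can go back to the pool and be reused, `org.apache.http.util.EntityUtils.consume` looks roughly like this: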

    public static void consume(final HttpEntity entity) throws IOException {
        if (entity == null) {
            return;
        }
        if (entity.isStreaming()) {
            // content is still streaming: get the stream
            final InputStream inStream = entity.getContent();
            // closing it releases the underlying resources
            if (inStream != null) {
                inStream.close();
            }
        }
    }

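EntityUtils.consume consumes the response body and lets the connection be safely reused. The call path is listed below; if you are interested, you can read through it yourself, as it will not be discussed at length here: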

org.apache.http.util.EntityUtils.consume

org.apache.http.impl.execchain.ResponseEntityProxy.getContent (wraps the content in an EofSensorInputStream that releases the connection automatically)

org.apache.http.conn.EofSensorInputStream.close

org.apache.http.conn.EofSensorInputStream.checkClose

org.apache.http.impl.execchain.ResponseEntityProxy.streamClosed

org.apache.http.impl.execchain.ResponseEntityProxy.releaseConnection
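The chain above boils down to one rule: release the connection when the body reaches EOF or is closed. Below is a simplified, JDK-only model of that idea. `ReleasingInputStream` and its `Runnable` releaser are hypothetical names standing in for `EofSensorInputStream` and `ResponseEntityProxy.releaseConnection`. Unlike HttpClient, the model does not distinguish a reusable connection from an aborted one.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Simplified model of the EofSensorInputStream idea, not HttpClient code:
// the "connection" is released exactly once, either at EOF or on close()
// (close() drains the rest of the body first so the connection could be reused).
class ReleasingInputStream extends FilterInputStream {
    private final Runnable releaser; // hypothetical stand-in for releaseConnection
    private boolean released;

    ReleasingInputStream(InputStream in, Runnable releaser) {
        super(in);
        this.releaser = releaser;
    }

    boolean isReleased() {
        return released;
    }

    private void releaseOnce() {
        if (!released) {
            released = true;
            releaser.run();
        }
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b == -1) {
            releaseOnce(); // EOF: the body has been fully consumed
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len);
        if (n == -1) {
            releaseOnce();
        }
        return n;
    }

    @Override
    public void close() throws IOException {
        try {
            byte[] skip = new byte[256];
            while (in.read(skip, 0, skip.length) != -1) {
                // discard the unread remainder of the body
            }
        } finally {
            super.close();
            releaseOnce();
        }
    }

    public static void main(String[] args) throws IOException {
        int[] releases = {0};
        byte[] body = "{\"ok\":true}".getBytes(StandardCharsets.UTF_8);
        try (InputStream in = new ReleasingInputStream(new ByteArrayInputStream(body), () -> releases[0]++)) {
            System.out.println((char) in.read()); // { (one byte read, body not consumed yet)
        }
        System.out.println(releases[0]); // 1: close() drained the rest and released once
    }
}
```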

feign.AsyncResponseHandler#handleResponse

  void handleResponse(CompletableFuture<Object> resultFuture,
                      String configKey,
                      Response response,
                      Type returnType,
                      long elapsedTime) {
    // copied fairly liberally from SynchronousMethodHandler
    boolean shouldClose = true;

    try {
      if (logLevel != Level.NONE) {
        // log only when the Feign log level is not NONE
        response = logger.logAndRebufferResponse(configKey, logLevel, response,
            elapsedTime);
      }
      if (Response.class == returnType) {
        if (response.body() == null) {
          resultFuture.complete(response);
        } else if (response.body().length() == null
            || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
          // with Transfer-Encoding: chunked, length() is always null, so shouldClose = false and the finally block below does not close the body
          shouldClose = false;
          resultFuture.complete(response);
        } else {
          // Ensure the response body is disconnected
          // read the InputStream into a byte[] (which closes it) and store the bytes back into the response
          final byte[] bodyData = Util.toByteArray(response.body().asInputStream());
          resultFuture.complete(response.toBuilder().body(bodyData).build());
        }
      } else if (response.status() >= 200 && response.status() < 300) {
        if (isVoidType(returnType)) {
          resultFuture.complete(null);
        } else {
          final Object result = decode(response, returnType);
          shouldClose = closeAfterDecode;
          resultFuture.complete(result);
        }
      } else if (decode404 && response.status() == 404 && !isVoidType(returnType)) {
        final Object result = decode(response, returnType);
        shouldClose = closeAfterDecode;
        resultFuture.complete(result);
      } else {
        resultFuture.completeExceptionally(errorDecoder.decode(configKey, response));
      }
    } catch (final IOException e) {
      if (logLevel != Level.NONE) {
        logger.logIOException(configKey, logLevel, e, elapsedTime);
      }
      resultFuture.completeExceptionally(errorReading(response.request(), response, e));
    } catch (final Exception e) {
      resultFuture.completeExceptionally(e);
    } finally {
      if (shouldClose) {
        ensureClosed(response.body());
      }
    }

  }

feign.slf4j.Slf4jLogger.logAndRebufferResponse

  protected Response logAndRebufferResponse(String configKey,
                                            Level logLevel,
                                            Response response,
                                            long elapsedTime)
      throws IOException {
    if (logger.isDebugEnabled()) {
      // only when DEBUG is enabled: delegate to the parent class, which logs and rebuffers
      return super.logAndRebufferResponse(configKey, logLevel, response, elapsedTime);
    }
    return response;
  }

feign.Logger.logAndRebufferResponse

  protected Response logAndRebufferResponse(String configKey,
                                            Level logLevel,
                                            Response response,
                                            long elapsedTime)
      throws IOException {
    String reason =
        response.reason() != null && logLevel.compareTo(Level.NONE) > 0 ? " " + response.reason()
            : "";
    int status = response.status();
    log(configKey, "<--- HTTP/1.1 %s%s (%sms)", status, reason, elapsedTime);
    if (logLevel.ordinal() >= Level.HEADERS.ordinal()) {
      // log headers (and body) at HEADERS level or above
      for (String field : response.headers().keySet()) {
        for (String value : valuesOrEmpty(response.headers(), field)) {
          log(configKey, "%s: %s", field, value);
        }
      }

      int bodyLength = 0;
      if (response.body() != null && !(status == 204 || status == 205)) {
        // HTTP 204 No Content "...response MUST NOT include a message-body"
        // HTTP 205 Reset Content "...response MUST NOT include an entity"
        if (logLevel.ordinal() >= Level.FULL.ordinal()) {
          log(configKey, ""); // CRLF
        }
        // key point: read the InputStream into a byte[]; toByteArray closes the InputStream, which releases the resources
        byte[] bodyData = Util.toByteArray(response.body().asInputStream());
        bodyLength = bodyData.length;
        if (logLevel.ordinal() >= Level.FULL.ordinal() && bodyLength > 0) {
          log(configKey, "%s", decodeOrDefault(bodyData, UTF_8, "Binary data"));
        }
        log(configKey, "<--- END HTTP (%s-byte body)", bodyLength);
        // response.body() has been consumed, so store the bytes back into a new response
        return response.toBuilder().body(bodyData).build();
      } else {
        log(configKey, "<--- END HTTP (%s-byte body)", bodyLength);
      }
    }
    return response;
  }

feign.Util.toByteArray

  public static byte[] toByteArray(InputStream in) throws IOException {
    checkNotNull(in, "in");
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      copy(in, out);
      return out.toByteArray();
    } finally {
      // close the Closeable, releasing the resources
      ensureClosed(in);
    }
  }

Tracing Feign's Slf4jLogger at `feign/Logger.java:84`, we find that `feign.Logger#logAndRebufferResponse` reads the `body` once.

At `HEADERS` level or above (`logLevel.ordinal() >= Level.HEADERS.ordinal()`), the later `byte[] bodyData = Util.toByteArray(response.body().asInputStream());` copies the data once and then closes the original `InputStream`.
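The effect of that rebuffering on the body can be sketched with plain JDK streams. This is not Feign code. The `debugEnabled` flag is a hypothetical stand-in for the `isDebugEnabled()` check in `Slf4jLogger`, and `TrackingInputStream` is a helper that only records whether `close()` ran. When the flag is false, nothing reads or closes the body.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Sketch of what logAndRebufferResponse effectively does to the body, not Feign code:
// copy it into memory, close the original stream, return an in-memory copy.
class RebufferDemo {

    static InputStream maybeRebuffer(InputStream body, boolean debugEnabled) throws IOException {
        if (!debugEnabled) {
            return body; // untouched: the network stream (and its connection) stays open
        }
        try (InputStream in = body) { // the original stream is closed when this block exits
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new ByteArrayInputStream(out.toByteArray());
        }
    }

    /** Helper that records whether close() was called. */
    static class TrackingInputStream extends ByteArrayInputStream {
        boolean closed;

        TrackingInputStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "ok".getBytes(StandardCharsets.UTF_8);
        TrackingInputStream atInfo = new TrackingInputStream(data);
        maybeRebuffer(atInfo, false);
        System.out.println(atInfo.closed); // false
        TrackingInputStream atDebug = new TrackingInputStream(data);
        InputStream copy = maybeRebuffer(atDebug, true);
        System.out.println(atDebug.closed + " " + (char) copy.read()); // true o
    }
}
```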

Thoughts after reading the source

Having followed the source this far, it turns out that under normal conditions the response content is released automatically.

Verifying the problem

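In production, the Feign log level is FULL (above HEADERS), but the SLF4J logger is set to INFO. `Slf4jLogger.logAndRebufferResponse` only delegates to the rebuffering code when DEBUG is enabled, so the logger never reads and closes the body. I tried to reproduce the problem with a client whose pool holds a single connection: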

CLIENT = new ApacheHttpClient(
                HttpClientBuilder.create()
                        .setSSLSocketFactory(
                                // trustAllSslSocketFactory() is a helper not shown in the original
                                new SSLConnectionSocketFactory(trustAllSslSocketFactory(),
                                        (hostname, session) -> true)
                        )
                        // at most 1 connection per route
                        .setMaxConnPerRoute(1)
                        // at most 1 connection in total
                        .setMaxConnTotal(1)
                        .setDefaultRequestConfig(
                                RequestConfig.custom()
                                        .setConnectTimeout(CONNECT_TIMEOUT)
                                        .setSocketTimeout(REQ_TIMEOUT)
                                        .build()
                        )
                        .useSystemProperties()
                        .build()
);

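After the unit test and the source analysis, the cause was obvious. The response was not fully consumed and released, so the pooled connection could not be safely reused.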

Tracing the root cause

Go back to the feign.AsyncResponseHandler#handleResponse source analyzed above. closeAfterDecode defaults to true, so the response body is left open only in this case:
`Response.class == returnType && (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE)`
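The leak condition can also be written as a small pure function, which makes the individual cases easy to check. This restates the condition above and is not Feign code. `bodyLeftOpen` is a hypothetical name, and the buffer limit is passed in rather than copied from Feign's `MAX_RESPONSE_BUFFER_SIZE`.

```java
// Restatement of the handleResponse leak condition as a pure function (not Feign code).
class LeakCondition {

    /**
     * True when Feign hands the Response back to the caller without closing its body.
     *
     * @param returnsResponse       the Feign method's return type is feign.Response
     * @param hasBody               response.body() != null
     * @param length                response.body().length(); null when unknown (e.g. chunked)
     * @param maxResponseBufferSize illustrative stand-in for MAX_RESPONSE_BUFFER_SIZE
     */
    static boolean bodyLeftOpen(boolean returnsResponse, boolean hasBody, Integer length, long maxResponseBufferSize) {
        if (!returnsResponse || !hasBody) {
            return false; // decoded return types are closed after decoding (closeAfterDecode = true)
        }
        return length == null || length > maxResponseBufferSize;
    }

    public static void main(String[] args) {
        long max = 8192; // illustrative limit, not taken from Feign
        System.out.println(bodyLeftOpen(true, true, null, max)); // true: chunked body, length unknown
        System.out.println(bodyLeftOpen(true, true, 512, max)); // false: small body is buffered and closed
        System.out.println(bodyLeftOpen(false, true, null, max)); // false: not a Response return type
    }
}
```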

Check the project code for Feign methods that return Response and whose responses have no known body length.

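Then I remembered that requests to the relevant API asynchronously verify and refresh the token. Token verification does not need the response body, so that method had been declared to return Response and only checked the status code. The key parts of the code are below.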

Feign interface for token verification

    @RequestLine("GET /sys/identity/v2/tokens")
    @Headers({"X-Subject-Token: {token}"})
    Response verifyToken(@Param("token") String token);

Token verification code

    @Override
    public boolean verifyToken(H3CClientConfig config, String token) {
        IdentityApi identityApi = Feigns.h3c(IdentityApi.class, config);
        Response response = identityApi.verifyToken(token);
        return response.status() == HttpStatus.OK.value();
    }

Request/response log

GET https://19.50.81.200:8100/sys/identity/v2/tokens

HTTP/1.1 200 OK
Date: Sun, 05 Sep 2021 02:55:09 GMT
Content-Type: application/json; charset=UTF-8
Transfer-Encoding: chunked
Connection: keep-alive
X-Subject-Token: eyJjdHkiOiJKV1QiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiZGlyIn0..yWaex8RVEvBvw1D-kk_LMQ.IwiMpoRWSPsUZiEpr09tL7WDZ1-vIRZqFsGqQq2CV4wBp6S8mBIhICI3Ce2sE_TLA_A2oX_NnMpAf5D4C_DwunJaiJ3lnD51Sg1bxWao_gXnPS7JdfpyaRXY-rtPMaxs-0FisUuyVlKfQh3Ab8t3WsCLzU9Yz7sQ367CKtW1z32ttafrWRlotLN0y7XX3ZRz7Ttznm2cZ5Ae79MEPQF1-hbKiGoz4B8kR1NRgeL-arlpa8qtgERYEEtr-VtJgydDpylusItc_uOtPqwEh0HAgYQjJovF75pej5WlCgdzYVQMr08OGT0JnBrReWYxl0h2P0xxZQtNcM2d0T54TebvvRhQKRyywvasQ064FS4B4mGN-8E3TZkxSfSfr4OWZ1Nmwpr3xFGBOSVpKf5-AufCoXPW3yGu3vFSpCahoKq01n9_gd4UbKLE82Cwou4uZf4VMZ7A7hOAdWYo_geb1bTzLUyTdDSUgbS8XiiYCOpaA4euv409ELE22U77F940M2DO2y8lbaDuk4iAv3QIp5gCGg.9pzTvRPM-FAMa-17a2J5kQ

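The code above does nothing with the Response except check the status code. However, the request/response log shows that this response is different: it has no Content-Length header and carries Transfer-Encoding: chunked.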

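About HTTP chunked transfer encoding
A chunked response carries the header Transfer-Encoding: chunked. Its length is not known in advance, so there is no Content-Length header. The body is sent as a series of chunks and the last chunk must have length 0, so (when there are no trailer fields) the message ends with `0\r\n\r\n`.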
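To make the framing concrete, here is a minimal decoder for a chunked body (framing as in RFC 7230, section 4.1). It is an illustration, not HttpClient's `ChunkedInputStream`. Chunk extensions are skipped, and trailer lines are read and discarded.

The last step is what matters for this article. The reader only knows the body has ended once it reads the zero-size chunk, which is why the length is unknown up front (`length() == null` in Feign). For the same reason, the connection is not ready for the next response until that terminator has been consumed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Minimal chunked-body decoder for illustration: hex size line, CRLF, data, CRLF, ...
// ending with a zero-size chunk. Chunk extensions are skipped; trailers are discarded.
class ChunkedDecoder {

    static byte[] decode(InputStream in) throws IOException {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        while (true) {
            String sizeLine = readLine(in);
            int semi = sizeLine.indexOf(';'); // drop chunk extensions, if any
            String hex = (semi >= 0 ? sizeLine.substring(0, semi) : sizeLine).trim();
            int size = Integer.parseInt(hex, 16);
            if (size == 0) {
                while (!readLine(in).isEmpty()) {
                    // skip trailer fields up to the final empty line
                }
                return body.toByteArray(); // stream now sits at the start of the next message
            }
            for (int i = 0; i < size; i++) {
                int b = in.read();
                if (b == -1) {
                    throw new EOFException("truncated chunk");
                }
                body.write(b);
            }
            if (!readLine(in).isEmpty()) {
                throw new IOException("missing CRLF after chunk data");
            }
        }
    }

    /** Reads one line terminated by LF, dropping CR characters. */
    private static String readLine(InputStream in) throws IOException {
        StringBuilder sb = new StringBuilder();
        int b;
        while ((b = in.read()) != '\n') {
            if (b == -1) {
                throw new EOFException("unexpected end of stream");
            }
            if (b != '\r') {
                sb.append((char) b);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        byte[] wire = "4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(new String(decode(new ByteArrayInputStream(wire)), StandardCharsets.US_ASCII)); // Wikipedia
    }
}
```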

This matches the leak condition `Response.class == returnType && (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE)` exactly. If the Response is not released promptly, the pooled connection cannot be safely reused.

Summary

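The root cause was a leaked InputStream that kept the pooled connection from being released. The Feign method returned Response, and the call site did not wrap it in try-with-resources, so close() was never called.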

Keys to the investigation:

Solution

When a Feign method returns the Response type, release it promptly with Java 7 try-with-resources or with try-finally.
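In the project code, the fix amounts to `try (Response response = identityApi.verifyToken(token)) { return response.status() == HttpStatus.OK.value(); }`, since `feign.Response` implements `Closeable`. The JDK-only model below shows the effect without Feign on the classpath. `FakeResponse` is a hypothetical stand-in for `feign.Response` that holds one pooled "connection" (a `Semaphore` permit) until it is closed. The model's lease gives up after 100 ms instead of waiting forever.

```java
import java.io.Closeable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Model of the fix, not Feign code: FakeResponse stands in for feign.Response
// and keeps one pooled "connection" (a semaphore permit) until close() is called.
class TryWithResourcesFix {

    static final class FakeResponse implements Closeable {
        private final Semaphore pool;
        private final int status;
        private boolean closed;

        FakeResponse(Semaphore pool, int status) {
            this.pool = pool;
            this.status = status;
        }

        int status() {
            return status;
        }

        @Override
        public void close() { // idempotent: releases the connection only once
            if (!closed) {
                closed = true;
                pool.release();
            }
        }
    }

    /** Leases a connection, failing after 100 ms instead of waiting forever. */
    static FakeResponse call(Semaphore pool, int status) {
        try {
            if (!pool.tryAcquire(100, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("timeout waiting for connection from pool");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new FakeResponse(pool, status);
    }

    /** Leaky version: reads the status but never closes the response. */
    static boolean verifyLeaky(Semaphore pool) {
        FakeResponse response = call(pool, 200);
        return response.status() == 200;
    }

    /** Fixed version: try-with-resources returns the connection even on an early return. */
    static boolean verifyFixed(Semaphore pool) {
        try (FakeResponse response = call(pool, 200)) {
            return response.status() == 200;
        }
    }

    public static void main(String[] args) {
        Semaphore pool = new Semaphore(1); // one connection, like setMaxConnTotal(1)
        for (int i = 0; i < 1000; i++) {
            verifyFixed(pool);
        }
        System.out.println(pool.availablePermits()); // 1
        verifyLeaky(pool);
        try {
            verifyLeaky(pool);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // timeout waiting for connection from pool
        }
    }
}
```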
