HttpClient连接池及重试机制解析

网友投稿 1730 2022-10-23

HttpClient连接池及重试机制解析

HttpClient连接池及重试机制解析

目录一、HttpClient简介功能介绍使用方法二、HttpClientUtil2.1HttpClient版本2.2项目中用到的工具类如下2.3笔者着重说一下http连接池三、HttpClient的重试机制3.1.那么问题来了HttpClient有没有重试策略?3.2执行流程3.3关闭重试四、总结4.1重试发生的条件4.2不发生重试的异常4.3实践中遇到的异常

一、HttpClient

简介

HttpClient 是Apache Jakarta Common 下的子项目,可以用来提供高效的、最新的、功能丰富的支持 HTTP 协议的客户端编程工具包,基于标准的java语言。

功能介绍

支持HTTP和HTTPS协议实现了HTTP的方法,GET,POST,PUT,DELETE等方法。连接管理器支持多线程的应用。可以设置连接超时

使用方法

使用HttpClient发送请求,接收响应可以分为一下几步:

创建HttpClient对象创建请求方法的实例,并且指定URL发送请求参数,GET请求和POST请求发送参数的方式有所不同调用HttpClient对象的execute方法,返回HttpResponse对象调用HttpResponse的getAllHeaders()、getHeaders(String name)等方法可获取服务器的响应头;调用HttpResponse的getEntity()方法可获取HttpEntity对象,该对象包装了服务器的响应内容连接释放。无论成功与否,必须释放连接

二、HttpClientUtil

2.1 HttpClient版本

笔者用到的版本是4.5.5,由于是maven工程,需要在pom文件引入对应的坐标。

org.apache.httpcomponents

httpclient

4.5.5

2.2 项目中用到的工具类如下

package cn.htjc.customer.util;

import lombok.extern.slf4j.Slf4j;

import org.apache.http.HttpResponse;

import org.apache.http.NameValuePair;

import org.apache.http.client.ServiceUnavailableRetryStrategy;

import org.apache.http.client.config.RequestConfig;

import org.apache.http.client.entity.UrlEncodedFormEntity;

import org.apache.http.client.methods.CloseableHttpResponse;

import org.apache.http.client.methods.HttpGet;

import org.apache.http.client.methods.HttpPost;

import org.apache.http.client.utils.URIBuilder;

import org.apache.http.config.Registry;

import org.apache.http.config.RegistryBuilder;

import org.apache.http.conn.socket.ConnectionSocketFactory;

import org.apache.http.conn.socket.PlainConnectionSocketFactory;

import org.apache.http.conn.ssl.SSLConnectionSocketFactory;

import org.apache.http.conn.ssl.TrustSelfSignedStrategy;

import org.apache.http.entity.ContentType;

import org.apache.http.entity.StringEntity;

import org.apache.http.impl.client.CloseableHttpClient;

import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;

import org.apache.http.impl.client.HttpClients;

import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

import org.apache.http.message.BasicNameValuePair;

import org.apache.http.protocol.HttpContext;

import org.apache.http.ssl.SSLContextBuilder;

import org.apache.http.util.EntityUtils;

import java.io.IOException;

import java-.SocketTimeoutException;

import java-.URI;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

@Slf4j

public class HttpClientUtil {

// utf-8字符编码

private static final String CHARSET_UTF_8 = "utf-8";

// HTTP内容类型。相当于form表单的形式,提交数据

private static final String CONTENT_TYPE_FORM_URL = "application/x-www-form-urlencoded";

// 连接管理器

private static PoolingHttpClientConnectionManager pool;

// 请求配置

private static RequestConfig requestConfig;

static {

try {

log.info("初始自定义HttpClient......开始");

SSLContextBuilder builder = new SSLContextBuilder();

builder.loadTrustMaterial(null, new TrustSelfSignedStrategy());

SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(builder.build());

// 配置同时支持 HTTP 和 HTPPS

Registry socketFactoryRegistry = RegistryBuilder.create()

.register("http", PlainConnectionSocketFactory.getSocketFactory())

.register("https", sslsf).build();

// 初始化连接管理器

pool = new PoolingHttpClientConnectionManager(

socketFactoryRegistry);

// 设置连接池的最大连接数

pool.setMaxTotal(200);

// 设置每个路由上的默认连接个数

pool.setDefaultMaxPerRoute(20);

// 根据默认超时限制初始化requestConfig

// 客户端从服务器读取数据的timeout

int socketTimeout = 1000;

// 客户端和服务器建立连接的timeout

int connectTimeout = 10000;

// 从连接池获取连接的timeout

int connectionRequestTimeout = 10000;

//设置请求超时时间

requestConfig = RequestConfig.custom().setConnectionRequestTimeout(

connectionRequestTimeout).setSocketTimeout(socketTimeout).setConnectTimeout(

connectTimeout).build();

log.info("初始自定义HttpClient......结束");

} catch (Exception e) {

log.error("初始自定义HttpClient......失败");

}

}

private HttpClientUtil() {

}

private static CloseableHttpClient getHttpClient() {

// 状态码是503的时候,该策略生效

ServiceUnavailableRetryStrategy serviceUnavailableRetryStrategy = new ServiceUnavailableRetryStrategy() {

@Override

public boolean retryRequest(HttpResponse httpResponse, int i, HttpContext httpContext) {

if (i < 3) {

log.info("ServiceUnavailableRetryStrategy========================"+i);

return true;

}

return false;

}

@Override

public long getRetryInterval() {

return 2000L;

}

};

CloseableHttpClient httpClient = HttpClients.custom()

// 设置连接池管理

.setConnectionManager(pool)

// 设置请求配置

.setDefaultRequestConfig(requestConfig)

// 设置重试次数

.setRetryHandler(new DefaultHttpRequestRetryHandler())

.setServiceUnavailableRetryStrategy(serviceUnavailableRetryStrategy)

.build();

return httpClient;

}

public static String doGet(String url, Map param) {

// 创建Httpclient对象

CloseableHttpClient httpClient = getHttpClient();

String resultString = "";

CloseableHttpResponse response = null;

try {

// 创建uri

URIBuilder builder = new URIBuilder(url);

if (param != null) {

for (String key : param.keySet()) {

builder.addParameter(key, param.get(key));

}

}

URI uri = builder.build();

// 创建http GET请求

HttpGet httpGet = new HttpGet(uri);

// 执行请求

response = httpClient.execute(httpGet);

// 判断返回状态是否为200

if (response.getStatusLine().getStatusCode() == 200) {

resultString = EntityUtils.toString(response.getEntity(), CHARSET_UTF_8);

}

} catch (Exception e) {

e.printStackTrace();

} finally {

try {

if (response != null) {

response.close();

}

} catch (IOException e) {

e.printStackTrace();

}

}

return resultString;

}

public static String doGet(String url) {

return doGet(url, null);

}

public static String doPost(String url, Map param) {

// 创建Httpclient对象

CloseableHttpClient httpClient = getHttpClient();

CloseableHttpResponse response = null;

String resultString = "";

try {

// 创建Http Post请求

HttpPost httpPost = new HttpPost(url);

// 创建参数列表

if (param != null) {

List paramList = new ArrayList<>();

for (String key : param.keySet()) {

paramList.add(new BasicNameValuePair(key, param.get(key)));

}

// 模拟表单

UrlEncodedFormEntity entity = new UrlEncodedFormEntity(paramList, CHARSET_UTF_8);

entity.setContentType(CONTENT_TYPE_FORM_URL);

httpPost.setEntity(entity);

}

// 执行http请求main

response = httpClient.execute(httpPost);

resultString = EntityUtils.toString(response.getEntity(), CHARSET_UTF_8);

} catch (Exception e) {

e.printStackTrace();

} finally {

try {

if (response != null) {

response.close();

}

} catch (IOException e) {

e.printStackTrace();

}

}

return resultString;

}

public static String doPost(String url) {

return doPost(url, null);

}

public static String doPostjson(String url, String json) {

// 创建Httpclient对象

CloseableHttpClient httpClient = getHttpClient();

CloseableHttpResponse response = null;

String resultString = "";

try {

// 创建Http Post请求

HttpPost httpPost = new HttpPost(url);

// 创建请求内容

StringEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);

httpPost.setEntity(entity);

// 执行http请求

response = httpClient.execute(httpPost);

resultString = EntityUtils.toString(response.getEntity(), CHARSET_UTF_8);

} catch (Exception e) {

e.printStackTrace();

} finally {

try {

if (response != null) {

response.close();

}

} catch (IOException e) {

e.printStackTrace();

}

}

return resultString;

}

}

代码中出现了@Slf4j,作用是引入log,手动打印日志。这个注解是lombok的注解。

解释一下,什么是Route?

Route的概念可以理解为客户端机器到目标机器的一条线路,例如使用HttpClient的实现来分别请求 163.com 的资源和 sina.com 的资源就会产生两个route。缺省条件下对于每个Route,HttpClient仅维护2个连接,总数不超过20个连接。

2.3 笔者着重说一下http连接池

1 为什么要使用http连接池?

延迟降低,如果不使用连接池,每次发起的http请求都会重新建立tcp连接(三次握手),用完就会关闭连接(4次握手),采用连接池则会减少这不是分时间的消耗。连接池管理的对象都是长连接。

支持更大的并发,由于连接池只适用于请求经常访问同一主机(或同一端口的情况),连接池避免了反复建立连接,抢占端口资源的情况,如果没用连接池,可能导致连接建立不了。

2 设置超时时间

首先要明白三个概念:socketTimeout,connectTimeout,connectionRequestTimeout。

socketTimeout:客户端和服务器读取数据的timeoutconnectTimeout:客户端和服务器建立连接的timeoutconnectionRequestTimeout:从连接池获取连接的timeout

3 解释:一次http请求

一次http请求,必定会有三个阶段,一:建立连接;二:数据传送;三,断开连接。

当建立连接在规定的时间内(ConnectionTimeOut )没有完成,那么此次连接就结束了。后续的SocketTimeOutException就一定不会发生。只有当连接建立起来后,

也就是没有发生ConnectionTimeOutException ,才会开始传输数据,如果数据在规定的时间内(SocketTimeOut)传输完毕,则断开连接。否则,触发SocketTimeOutException。

三、HttpClient的重试机制

上面说了这么多,就是为了引出下面的重试问题。由于项目中要访问外部接口,访问接口的时候,偶尔会出现SocketTimeOutException:Read timed out,其实就是客户端读取服务器的数据超时了。

3.1. 那么问题来了HttpClient有没有重试策略?

使用PoolingHttpClientConnectionManager得到的InternalHttpClient实例,是抽象类CloseableHttpClient的一个实现。

看一下ClientExecChain接口的实现类

简单看一下build()方法

public CloseableHttpClient build() {

// 省略一些代码

// 添加MainClientExec

ClientExecChain execChain = this.createMainExec(requestExecCopy, (HttpClientConnectionManager)connManagerCopy, (ConnectionReuseStrategy)reuseStrategyCopy, (ConnectionKeepAliveStrategy)keepAliveStrategyCopy, new ImmutableHttpProcessor(new HttpRequestInterceptor[]{new RequestTargetHost(), new RequestUserAgent(userAgentCopy)}), (AuthenticationStrategy)targetAuthStrategyCopy, (AuthenticationStrategy)proxyAuthStrategyCopy, (UserTokenHandler)userTokenHandlerCopy);

execChain = this.decorateMainExec(execChain);

// 添加ProtocolExec

ClientExecChain execChain = new ProtocolExec(execChain, httpprocessorCopy);

ClientExecChain execChain = this.decorateProtocolExec(execChain);

// Add request retry executor, if not disabled

if (!automaticRetriesDisabled) {

HttpRequestRetryHandler retryHandlerCopy = this.retryHandler;

if (retryHandlerCopy == null) {

retryHandlerCopy = DefaultHttpRequestRetryHandler.INSTANCE;

}

execChain = new RetryExec(execChain, retryHandlerCopy);

}

// 省去部分代码

// 如果不为空,添加ServiceUnavailableRetryExec

ServiceUnavailableRetryStrategy serviceUnavailStrategyCopy = this.serviceUnavailStrategy;

if (serviceUnavailStrategyCopy != null) {

execChain = new ServiceUnavailableRetryExec((ClientExecChain)execChain, serviceUnavailStrategyCopy);

}

// 添加RedirectExec

if (!this.redirectHandlingDisabled) {

authSchemeRegistryCopy = this.redirectStrategy;

if (authSchemeRegistryCopy == null) {

authSchemeRegistryCopy = DefaultRedirectStrategy.INSTANCE;

}

execChain = new RedirectExec((ClientExecChain)execChain, (HttpRoutePlanner)routePlannerCopy, (RedirectStrategy)authSchemeRegistryCopy);

}

// 省去部分代码

return new InternalHttpClient((ClientExecChain)execChain, (HttpClientConnectionManager)connManagerCopy, (HttpRoutePlanner)routePlannerCopy, cookieSpecRegistryCopy, (Lookup)authSchemeRegistryCopy, (CookieStore)defaultCookieStore, (CredentialsProvider)defaultCredentialsProvider, this.defaultRequestConfig != null ? this.defaultRequestConfig : RequestConfig.DEFAULT, closeablesCopy);

}

自上而下,创建了不同的ClientExecChain实例。注意:创建对象的顺序就是执行器链的顺序

在构造CloseableHttpClient实例的时候,判断是否关闭了自动重试功能,automaticRetriesDisabled默认是false。如果没有指定执行器链,就用RetryExec。默认的重试策略是DefaultHttpRequestRetryHandler。

如果重写了ServiceUnavailableRetryStrategy接口,或者使用了DefaultServiceUnavailableRetryStrategy,ServiceUnavailableRetryExec也会加入到执行器链里。

同理,redirectHandlingDisabled默认是false,RedirectExec也会加入到执行器链,并且会最先执行。

3.2 执行流程

前面已经看到我们使用的HttiClient本质上是InternalHttpClient,这里看下他的执行发送数据的方法。

@Override

protected CloseableHttpResponse doExecute(

final HttpHost target,

final HttpRequest request,

final HttpContext context) throws IOException, ClientProtocolException {

//省略一些代码

return this.execChain.execute(route, wrapper, localcontext, execAware);

}

}

首先经过RedirectExec,RedirectExec里面调用ServiceUnavailableRetryExec的excute(),进入ServiceUnavailableRetryExec后,调用RetryExec的excute(),进入发到RetryExec后,调用ProtocolExec的execute(),最后调用MainClientExec的excute()。

执行器链结束后,执行HttpRequestExecutor的excute(),excute()方法调用了自己的doSendRequest()。

之后一步一步的返回,遇到异常进行处理。

下面是RetryExec发送请求的部分

publiHOlEdmc CloseableHttpResponse execute(HttpRoute route,

HttpRequestWrapper request,

HttpClientContext context,

HttpExecutionAware execAware) throws IOException, HttpException {

// 参数检验

Args.notNull(route, "HTTP route");

Args.notNull(request, "HTTP request");

Args.notNull(context, "HTTP context");

// 获取请求头的全部信息

Header[] origheaders = request.getAllHeaders();

// 初始化请求次数为1

int execCount = 1;

while(true) {

try {

// 调用基础executor执行http请求

return this.requestExecutor.execute(route, request, context, execAware);

} catch (IOException var9) {

// 发生IO异常的时候,判断上下文是否已经中断,如果中断则抛异常退出

if (execAware != null && execAware.isAborted()) {

this.log.debug("Request has been aborted");

throw var9;

}

// 根据重试策略,判断当前执行状况是否要重试,如果是则进入下面逻辑

if (!this.retryHandler.retryRequest(var9, execCount, context)) {

if (var9 instanceof NoHttpResponseException) {

NoHttpResponseException updatedex = new NoHttpResponseException(route.getTargetHost().toHostString() + " failed to respond");

updatedex.setStackTrace(var9.getStackTrace());

throw updatedex;

}

throw var9;

}

// 日志

if (this.log.isInfoEnabled()) {

this.log.info("I/O exception (" + var9.getClass().getName() + ") caught when processing request to " + route + ": " + var9.getMessage());

}

// 日志

if (this.log.isDebugEnabled()) {

this.log.debug(var9.getMessage(), var9);

}

// 判断当前请求是否可以重复发起

if (!RequestEntityProxy.isRepeatable(request)) {

this.log.debug("Cannot retry non-repeatable request");

throw new NonRepeatableRequestException("Cannot retry request with a non-repeatable request entity", var9);

}

// 设置请求头

request.setHeaders(origheaders);

// 日志

if (this.log.isInfoEnabled()) {

this.log.info("Retrying request to " + route);

}

++execCount;

}

}

}

当发生IOException,判断是否要重试。如果重试则记录相应的次数,如果不重试,就抛出异常并且退出。

//单例模式 final 不可变的对象,线程安全

public static final DefaultHttpRequestRetryHandler INSTANCE = new DefaultHttpRequestRetryHandler();

//重试次数

private final int retryCount;

//如果一个请求发送成功过,是否还会被再次发送

private final boolean requestSentRetryEnabled;

// 不允许重试的异常类

private final Set> nonRetriableClasses;

// 默认重试3次,请求发送成功,不在发送

public DefaultHttpRequestRetryHandler() {

this(3, false);

}

public DefaultHttpRequestRetryHandler(final int retryCount, final boolean requestSentRetryEnabled) {

this(retryCount, requestSentRetryEnabled, Arrays.asList(

InterruptedIOException.class,

UnknownHostException.class,

ConnectException.class,

SSLException.class));

}

protected DefaultHttpRequestRetryHandler(

final int retryCount,

final boolean requestSentRetryEnabled,

final Collection> clazzes) {

super();

this.retryCount = retryCount;

this.requestSentRetryEnabled = requestSentRetryEnabled;

this.nonRetriableClasses = new HashSet>();

for (final Class extends IOException> clazz: clazzes) {

this.nonRetriableClasses.add(clazz);

}

}

通过构造函数,可以看出:

重试3次请求成功,就不再重试InterruptedIOException、UnknoHOlEdmwnHostException、ConnectException、SSLException,发生这4种异常不重试

重试3次请求成功,就不再重试InterruptedIOException、UnknownHostException、ConnectException、SSLException,发生这4种异常不重试

​ 关于默认的重试策略

如果超过三次不进行重试以上4中异常及其子类不进行重试同一个请求在异步任务中已经停止,不进行重试幂等的方法可以进行重试,比如get,含有http body都可以认为是非幂等请求没有发送成功,可以进行重试

问题来了,发送成功的请求是怎么样的?

下面的代码在HttpCoreContext里面,HttpCoreContext是HttpContext的实现类

public static final String HTTP_REQ_SENT = "http.request_sent";

public boolean isRequestSent() {

final Boolean b = getAttribute(HTTP_REQ_SENT, Boolean.class);

return b != null && b.booleanValue();

}

当前httpContext中的http.request_sent设置为true,则认为已经发送成功。

HttpRequestExecutor的excute(),调用了自己的doSendRequest()。

protected HttpResponse doSendRequest(HttpRequest request,

HttpClientConnection conn,

HttpContext context) throws IOException, HttpException {

// 参数检验

Args.notNull(request, "HTTP request");

Args.notNull(conn, "Client connection");

Args.notNull(context, "HTTP context");

HttpResponse response = null;

// 将连接放入上下文

context.setAttribute("http.connection", conn);

// 在请求发送之前,将http.request_sent放入上下文context的属性中,值为false

context.setAttribute("http.request_sent", Boolean.FALSE);

// 将request的header放入连接中

conn.sendRequestHeader(request);

// 如果是post/put这种有body的请求,要先进行判断

if (request instanceof HttpEntityEnclosingRequest) {

boolean sendentity = true;

// 获取http协议版本号

ProtocolVersion ver = request.getRequestLine().getProtocolVersion();

// 满足100-continue,并且http协议不是1.0

if (((HttpEntityEnclosingRequest)request).expectContinue() && !ver.lessEquals(HttpVersion.HTTP_1_0)) {

// 刷新当前连接,发送数据

conn.flush();

// Checks if response data is available from the connection

if (conn.isResponseAvailable(this.waitForContinue)) {

// Receives the request line and headers of the next response available from this connection.

response = conn.receiveResponseHeader();

// 判断相应是否携带实体(是否有body)

if (this.canResponseHaveBody(request, response)) {

// Receives the next response entity available from this connection and attaches it to an existing HttpResponse object.

conn.receiveResponseEntity(response);

}

// 获取请求状态码

int status = response.getStatusLine().getStatusCode();

if (status < 200) {

if (status != 100) {

throw new ProtocolException("Unexpected response: " + response.getStatusLine());

}

response = null;

} else {

sendentity = false;

}

}

}

if (sendentity) {

// 通过连接发送请求实体

conn.sendRequestEntity((HttpEntityEnclosingRequest)request);

}

}

// Writes out all pending buffered data over the open connection.

conn.flush();

// 将http.request_sent置为true

context.setAttribute("http.request_sent", Boolean.TRUE);

return response;

}

判断是否携带实体的方法

protected boolean canResponseHaveBody(HttpRequest request, HttpResponse response) {

// 如果是head请求,返回false HEAD:只请求页面首部

if ("HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {

return false;

} else {

int status = response.getStatusLine().getStatusCode();

return status >= 200 && status != 204 && status != 304 && status != 205;

}

}

注:HttpEntityEnclosingRequest是一个接口

public interface HttpEntityEnclosingRequest extends HttpRequest {

// 询问Server是否愿意接收数据

boolean expectContinue();

// 设置httpEntity

void setEntity(HttpEntity entity);

// 获取httpEntity

HttpEntity getEntity();

}

HttpEntityEnclosingRequestBase是实现HttpEntityEnclosingRequest的抽象类

public abstract class HttpEntityEnclosingRequestBase extends HttpRequestBase implements HttpEntityEnclosingRequest {

// HttpEntity其实相当于一个消息实体,内容是http传送的报文,有多个实现类,常用StringEntity

private HttpEntity entity;

public HttpEntityEnclosingRequestBase() {

}

public HttpEntity getEntity() {

return this.entity;

}

public void setEntity(HttpEntity entity) {

this.entity = entity;

}

// 判断此请求是否应使用expect-continue

public boolean expectContinue() {

// 从请求头获取Except键值对

Header expect = this.getFirstHeader("Expect");

// 如果except不为空,并且内容是 100-continue时返回true

return expect != null && "100-continue".equalsIgnoreCase(expect.getValue());

}

public Object clone() throws CloneNotSupportedException {

HttpEntityEnclosingRequestBase clone = (HttpEntityEnclosingRequestBase)super.clone();

if (this.entity != null) {

clone.entity = (HttpEntity)CloneUtils.cloneObject(this.entity);

}

return clone;

}

}

下图可以看出,HttpPost和HttpPut是HttpEntityEnclosingRequestBase的子类

简要分析一下,上述的操作过程

开始将http.request_sent设置为false通过流flush数据到客户端然后将http.request_sent设置为true

显然conn.flush()是可以发生异常的。注意:conn都是从连接池获取的。

3.3 关闭重试

默认是开启重试的,可以在创建HttpClientBuilder的时候,调用下面的方法关闭。

public final HttpClientBuilder disableAutomaticRetries() {

this.automaticRetriesDisabled = true;

return this;

}

四、总结

4.1重试发生的条件

只有发生IOException才会发生重试

InterruptedIOException、UnknownHostException、ConnectException、SSLException,发生这4种异常不重试

get方法可以重试3次,post方法对应的socket流没有被flush成功时可以重试3次

4.2不发生重试的异常

InterruptedIOException,线程中断异常UnknownHostException,找不到对应hostConnectException,找到了host但是建立连接失败。SSLException,https认证异常

4.3 实践中遇到的异常

另外,我们还经常会提到两种超时,连接超时与读超时:

1. java-.SocketTimeoutException: Read timed out

2. java-.SocketTimeoutException: connect timed out

这两种超时都是SocketTimeoutException,继承自InterruptedIOException,属于线程中断异常,不会进行重试。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:pprofile + matplotlib = 对Python程序进行分析形成一张热图
下一篇:typora 基本使用和更换typora的主题样式
相关文章

 发表评论

暂时没有评论,来抢沙发吧~