Android-Rxjava&retrofit&daggerAndroid联网那些事

史上超详细的okhttp使用和拦截器

2019-09-25  本文已影响0人  凌烟醉卧

okhttp是基于网络的通信的一个开源框架,在了解之如果对网络7层模型,5层模型,4层模型TCP三次握手和四次挥手清楚的话,那就更好了。

使用起来非常简单,但我们最好知其然,知其所以然

okhttp在github的地址:

https://github.com/square/okhttp

添加依赖

 implementation 'com.squareup.okhttp3:okhttp:3.14.2'

这篇文件是基于okhttp3.14.2的。

okhttp4.0以上的都是针对kotlin的,4.0以下的都是针对java的。

基本使用

okhttp可以同步也可以异步操作

同步操作

        String url = "http://www.baidu.com";
        OkHttpClient okHttpClient = new OkHttpClient();
        final Request request = new Request.Builder()
                .url(url)
                .build();
        final Call call = okHttpClient.newCall(request);
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Response response = call.execute();//同步 
                    Log.i("log", "" + response.code());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

异步操作

String url = "http://www.baidu.com";
        OkHttpClient okHttpClient = new OkHttpClient();
        final Request request = new Request.Builder()
                .url(url)
                .build();
        final Call call = okHttpClient.newCall(request);
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    call.enqueue(new Callback() {//异步
                        @Override
                        public void onFailure(Call call, IOException e) {
                        }

                        @Override
                        public void onResponse(Call call, Response response) throws IOException {

                        }
                    });
                    Log.i("log", "" + response.code());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

可以看到,okhttp的异步操作比同步操作多了一个回调,失败和成功。

不管是同步和异步,创建OkHttpClient的时候都使用了建造者模式,建造者模式在需要配置很多参数的时候可以使用。

同步的时候使用的是:

call.execute()

异步的时候使用的是:

call.enqueue()

我们先来看同步操作
execute方法的原型如下:

Response execute() throws IOException;

这时来到了 Call.java这个类。
没有任何实现,说明去交给子类来实现了,那它的子类是谁呢,进一步跟踪:

此时跳转到了RealCall.java这个类,说明RealCall是Call的一个子类。

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.timeoutEnter();
    transmitter.callStart();
    try {
      client.dispatcher().executed(this);
      return getResponseWithInterceptorChain();
    } finally {
      client.dispatcher().finished(this);
    }
  }

我们首先来看:

client.dispatcher().executed(this);

dispatcher是一个分发器,可以简单的理解为可以分发同步和异步。

接着查看executed这个方法,它是Dispatcher这个类中的一个方法:

synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
}

可以看到将RealCall添加进了一个runningSyncCalls(队列)中

private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

然后我们返回接着看RealCall中的execute()这个方法的另一句:

 return getResponseWithInterceptorChain();

这句代码执行完后,返回了一个Response,就是我们发送数据,服务器给到我们的相应,getResponseWithInterceptorChain这个方法里面进行了一些拦截器的操作,拦截器最后再看。

此时同步方法结束,已经得到了Response。

接着来看看异步操作是如何实现的,我们查看下enqueue这个方法:
异步和同步的大同小异

enqueue方法是在 Call.java中:

void enqueue(Callback responseCallback);//多了一个参数;回调

enqueue()方法也是在RealCall中实现的:

 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.callStart();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

看到这里,可以知道Dispather这个分发器分发了异步。
enqueue中的参数就是我们在外部传进来的参数。

AsyncCall(responseCallback)

紧着着我们进入AsyncCall.java这个类中

final class AsyncCall extends NamedRunnable

可以看到AsyncCall继承了NamedRunnable这个类,我们继续查看NamedRunnable这个类

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

至此,我们知道了okhttp追踪到了异步开启子线程的过程。

我们回到RealCall中的enqueue这个方法,通过下面代码

 client.dispatcher().enqueue(new AsyncCall(responseCallback));

查看enqueue这个方法,这个方法也是在Didpatcher这个类中实现的

void enqueue(AsyncCall call) {
    synchronized (this) {
      readyAsyncCalls.add(call);

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    promoteAndExecute();
  }

看比较关键的一句

 readyAsyncCalls.add(call);

可以看到把子线程放入到了一个队列中,readyAsyncCalls的定义:

private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

查看promoteAndExecute(),这里面有关键的一句代码,进行了创建线程池的操作:

 asyncCall.executeOn(executorService());

我们点击查看executorService这个方法:

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      //创建线程池
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

我们回过头来查看NamedRunnable这个类,来看execute()这个方法,这个方法由RealCall来实现:

@Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

在这个方法中我们得到了发起请求后,返回给我们的Response。

总结:
1)同步和异步都是通过Dispatch这个分发器来分发的。
2)异步操作比同步操作多了创建线程池的操作,开启了子线程。
3)同步和异步最后都走到了getResponseWithInterceptorChain这个方法。

在明白了上述后,就来看看okhttp中的拦截器是如何使用的。

在此之前,我们要了解什么是拦截器和拦截器的作用是什么?
可以简单的把拦截器理解为Hook,Hook就是在源码中插入我们自己的代码,那我们就可以在客户端发起请求和服务器响应请求后返回数据中间的这一段时间进行对拦截器的操作。

拦截器可以分为内部拦截器和外部拦截器
内部拦截器:okhttp系统的拦截器,有5个。

外部拦截器:就是我们自己定义的拦截器

还可以分为应用拦截器和网络拦截器,二者都需要实现Interceptor接口
应用拦截器:关注的是发起请求,不能拦截发起请求到请求成功后返回数据的中间的这段时期。

 OkHttpClient.Builder okHttpClient2 = new OkHttpClient().newBuilder();
 okHttpClient2.addInterceptor(new TokenHeaderInterceptor());

 public class TokenHeaderInterceptor implements Interceptor {
        @Override
        public Response intercept(Chain chain) throws IOException {
            // get token
            String token = "123456";
            Request originalRequest = chain.request();
            // get new request, add request header
            Request updateRequest = originalRequest.newBuilder()
                    .header("token", token)
                    .build();
            return chain.proceed(updateRequest);
        }
    }

网络拦截器:关注的是发起请求和请求后获取的数据中间的这一过程。

 OkHttpClient okHttpClient2 = new OkHttpClient().newBuilder().addNetworkInterceptor(你自己定义的拦截器);

应用拦截器和网络拦截器的区别:网络拦截器可以重定向。

下面我们通过源码对拦截器说明。

我们来到刚才的getResponseWithInterceptorChain这个方法查看:

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    boolean calledNoMoreExchanges = false;
    try {
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }

这是okhttp拦截器创建的过程,它的创建过程非常复杂,拦截器采用的是责任链的模式。

通过上面的代码可以看出,首先将我们自己的拦截器添加到集合中:

interceptors.addAll(client.interceptors());

然后就是系统的拦截器,上面提到了,一共有5个,那这5个拦截器又有什么功能呢?


okhttp拦截器链

结合上图中的说明我们对OkHttp拦截器链上的各个拦截器的作用进行说明

RetryAndFollowUpInterceptor拦截器

retryAndFollowUpInterceptor此拦截器顾名思义就是主要负责失败重连工作,但是并不是所有的网络请求都会进行失败重连的,在此拦截器内部会进行网络请求的异常检测和响应码的判断,如果都在限制范围内,那么就可以进行失败重连。

主要执行工作:
1.创建StreamAllocation对象
2.调用RealInterceptorChainrealChain.proceed(request, streamAllocation, null, null); 通过chain对象调用下一个拦截器BridgeInterceptor
3.从下一个拦截器那里接收传递过来的response,根据异常和响应结果判断是否重连
4.处理完成之后将response返回给上一个拦截器

BridgeInterceptor拦截器
要负责设置内容长度、编码方式、设置gzip压缩、添加请求头、cookie等相关功能
那么它是怎么实现这种桥接转换的呢,因为我们知道除了请求Url之外,浏览器还需要请求头等信息。
我这里通过查看王者荣耀的数据来展示一些请求头的信息:
https://pvp.qq.com/web201605/herolist.shtml
打开网页后右键审查元素-Network-左边选择一个条目:

Request请求头
桥接拦截器是链接客户端代码和网络代码的桥梁,BridgeInterceptor会根据用户请求创建真正的Network Request。
主要功能:
1.负责将用户构建的一个Request请求转化为能够进行网络访问的请求(桥接)
2.将这个符合网络请求的Request继续向下传递进行网络请求
3.将网络请求回来的响应Response转化为用户可以使用的Response(支持gzip压缩/解压)

CacheInterceptor拦截器
为什么要使用缓存?
一个优点就是让客户端下一次的网络请求节省更多的时间,更快的展示数据。
当CacheInterceptor拦截到请求之后会调用到cache类的put方法,cache类实现了InternalCache接口,此接口定义了缓存的增删改查等方法。
HTTP的缓存的工作是通过CacheInterceptor拦截器来完成的。

如果当前未使用网络 并且缓存不可以使用,通过构建者模式创建一个Response响应 抛出504错误。

如果有缓存 但是不能使用网络 直接返回缓存结果。

这是在进行网络请求之前所做的事情,当网络请求完成,得到下一个拦截器返回的response之后,判断response的响应码是否是HTTP_NOT_MODIFIED = 304 (未改变)是则从缓存中读取数据

此拦截器负责主要负责打开服务器之间的TCP链接,正式开启okhttp的网络请求。

ConnectInterceptor执行完请求之后接着做的就是读取服务器响应的数据,构建response.builder对象
主要做了两个工作:
1、调用HttpCodec的readResponseHeaders方法读取服务器响应的数据,构建Response.Builder对象(以Hppt1Codec)
2、通过ResopnseBuilder对象来最终创建Response对象并返回。主要是调用Http1Codec对象的openResponseBody方法,此方法将Socket的输入流InputStream对象交给OkIo的Source对象,然后封装成RealResponseBody(该类是ResponseBody的子类)作为Response的body。

CallServerInterceptor拦截器,主要做了如下工作:
1、获取HttpCodec对象,对Http1.1或者http/2不同协议的http请求进行处理
2、发送http请求数据,构建Resposne.Builder对象,然后构建Response并返回。

上面简单说了下okhttp内部的5个拦截器的功能,下面来研究下okhttp具体是如何连接的,以及看看它是不是通过socket实现的连接?
我们来看ConnectInterceptor这个拦截器,可以看到ConnectInterceptor里面有个方法intercept:

//Chain chain 参数是上一个拦截器传过来的,上一个拦截器是缓存拦截器
@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    Transmitter transmitter = realChain.transmitter();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

    return realChain.proceed(request, transmitter, exchange);
}

跟踪里面的newExchange方法:

//Transmitter类   newExchange方法
  Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    synchronized (connectionPool) {//同步操作
      if (noMoreExchanges) {
        throw new IllegalStateException("released");
      }
      if (exchange != null) {
        throw new IllegalStateException("cannot make a new request because the previous response "
            + "is still open: please call response.close()");
      }
    }
    
    //解码器   find方法
    ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
    Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

    synchronized (connectionPool) {
      this.exchange = result;
      this.exchangeRequestDone = false;
      this.exchangeResponseDone = false;
      return result;
    }
  }

跟踪这个find方法

//ExchangeFinder类  find方法
 public ExchangeCodec find(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      return resultConnection.newCodec(client, chain);
    } catch (RouteException e) {
      trackFailure();
      throw e;
    } catch (IOException e) {
      trackFailure();
      throw new RouteException(e);
    }
  }

继续跟踪findHealthyConnection这个方法:

//ExchangeFinder类的作用:主要是拿到连接对象 findHealthyConnection方法
  
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
      boolean doExtensiveHealthChecks) throws IOException {
      while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) { //connectionPool线程池,这里涉及到连接池和复用池
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        candidate.noNewExchanges();
        continue;
      }

      return candidate;
    }
  }

继续跟踪findConnection这个方法:

//ExchangeFinder类
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    RealConnection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");
      hasStreamFailure = false; // This is a fresh attempt.

      // Attempt to use an already-allocated connection. We need to be careful here because our
      // already-allocated connection may have been restricted from creating new exchanges.
      releasedConnection = transmitter.connection;
      toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
          ? transmitter.releaseConnectionNoEvents()
          : null;

      if (transmitter.connection != null) {
        // We had an already-allocated connection and it's good.
        result = transmitter.connection;
        releasedConnection = null;
      }

      if (result == null) {
        // Attempt to get a connection from the pool.
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry;
          nextRouteToTry = null;
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection.route();
        }
      }
    }
    closeQuietly(toClose);

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
    }
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result;
    }

    // If we need a route selection, make one. This is a blocking operation.
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    List<Route> routes = null;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        routes = routeSelection.getAll();
        if (connectionPool.transmitterAcquirePooledConnection(
            address, transmitter, routes, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = new RealConnection(connectionPool, selectedRoute);
        connectingConnection = result;
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    connectionPool.routeDatabase.connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      connectingConnection = null;
      // Last attempt at connection coalescing, which only occurs if we attempted multiple
      // concurrent connections to the same host.
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        result.noNewExchanges = true;
        socket = result.socket();
        result = transmitter.connection;
      } else {
        connectionPool.put(result);
        transmitter.acquireConnectionNoEvents(result);
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
  }

接着查看connect方法(在RealConnection类 ),
这个方法里有这么一句:

connectSocket(connectTimeout, readTimeout, call, eventListener);

继续跟踪connectSocket:

//RealConnection类   connectSocket方法
  private void connectSocket(int connectTimeout, int readTimeout, Call call,
      EventListener eventListener) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    eventListener.connectStart(call, route.socketAddress(), proxy);
    rawSocket.setSoTimeout(readTimeout);
    try {
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }

    // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
    // More details:
    // https://github.com/square/okhttp/issues/3245
    // https://android-review.googlesource.com/#/c/271775/
    try {
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
  }

里面有关键性的一句:

Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);

这个rawSocket就是一个Socket:

private Socket rawSocket;

点击查看connectSocket方法:

//Platform类
public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout)
      throws IOException {
    socket.connect(address, connectTimeout);//socket连接
}

通过InetSocketAddress就可以得到IP和端口号。

当然,这个Socket是Android提供的API,真正实现了功能的socket是在底层完成的。

至此,okhttp基于socket连接到此结束。

上一篇 下一篇

猜你喜欢

热点阅读