Okhttp主流程源码浅析(2)

2018-07-04  本文已影响24人  wenou

上一篇Okhttp主流程源码浅析(1)分析到任务调度方面,接着把剩下的主流程分析.

当一个任务被执行起来,会调用getResponseWithInterceptorChain()来获取到响应结果 response

//TODO: 责任链模式,拦截器链  执行请求
//TODO: 拿到回调结果
Response response = getResponseWithInterceptorChain();

getResponseWithInterceptorChain()源码:

 Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        //TODO 责任链 
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors());
        //TODO 处理重试与重定向
        interceptors.add(retryAndFollowUpInterceptor);
        //TODO 处理 配置请求头等信息
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        //TODO 处理 缓存配置 根据条件(存在响应缓存并被设置为不变的或者响应在有效期内)返回缓存响应
        //TODO 设置请求头
        interceptors.add(new CacheInterceptor(client.internalCache()));
        //TODO 连接服务器
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
            interceptors.addAll(client.networkInterceptors());
        }
        //TODO 执行流操作(写出请求体、获得响应数据)
        //TODO 进行http请求报文的封装与请求报文的解析
        interceptors.add(new CallServerInterceptor(forWebSocket));

        Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
                originalRequest, this, eventListener, client.connectTimeoutMillis(),
                client.readTimeoutMillis(), client.writeTimeoutMillis());

        return chain.proceed(originalRequest);
    }

创建List<Interceptor> interceptors = new ArrayList<>()集合来保存拦截器,平时我们自己给OkHttpClient添加的拦截器也是在这里处理.

例如添加log 拦截器:

//添加log拦截器
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
OkHttpClient.Builder builder = new OkHttpClient.Builder()
              .connectTimeout(10, TimeUnit.SECONDS)
              .readTimeout(15, TimeUnit.SECONDS)
              .writeTimeout(15, TimeUnit.SECONDS)
              .retryOnConnectionFailure(true)
              .addInterceptor(interceptor); 

这些拦截器保存在OkHttpClient实例里面,到这里会调用interceptors.addAll(client.interceptors())把我们的拦截器先添加到集合里面,然后再添加一些默认的拦截器来处理请求与响应

责任链的拦截器集合内容如下:

1.自定义拦截器
2.retryAndFollowUpInterceptor: 重试与重定向拦截器
3.BridgeInterceptor: 处理配置请求头等信息
4.CacheInterceptor: 处理缓存信息
5.ConnectInterceptor: 连接服务器
6.CallServerInterceptor: http报文封装与解析,与服务器执行流操作

责任链调用顺序图:

责任链调用顺序图:

调用顺序:
当发起一个请求,首先会进入到自定义拦截器里面,然后再依次进入okhttp默认的各个拦截器里面,最终连接到服务器,进行流的读写, 拿到服务器返回的响应结果,再反过来,依次的回调到调用者.

在上面的getResponseWithInterceptorChain()方法里面,把拦截器添加完成之后,会执行如下代码:

Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
                originalRequest, this, eventListener, client.connectTimeoutMillis(),
                client.readTimeoutMillis(), client.writeTimeoutMillis());

        return chain.proceed(originalRequest);

创建RealInterceptorChain realInterceptorChain = new RealInterceptorChain,然后开始执行责任链,调用chain.proceed(originalRequest)把请求传入进去,看下RealInterceptorChain的proceed方法

 @Override public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
  }
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    //省略代码 
    ...  
    //TODO: 创建新的拦截链,链中的拦截器集合index+1
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    //TODO: 执行当前的拦截器 默认是:retryAndFollowUpInterceptor
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
    //省略代码 
    ...  
    return response;
  }

删除了部分判断的代码,这里看重点,在proceed()里面看到,会再创建一个新的对象RealInterceptorChain next = new RealInterceptorChain(...)并且把拦截器集合通过构造方法传入,还有一些其他信息.

注意: 这里的会把 index + 1再传入进去,下面会用到

然后再从拦截器集合里面拿到一个拦截器interceptors.get(index),并且去执行这个拦截器的方法拿到响应结果Response response = interceptor.intercept(next);

回到上面看下调用顺序图,如果我们有给okhttpClient加入自定义拦截器,这里就先调用自定义拦截器,否则就开始执行第一个默认的拦截器retryAndFollowUpInterceptor

看下retryAndFollowUpInterceptor.intercept(next)方法

 @Override
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Call call = realChain.call();
        EventListener eventListener = realChain.eventListener();
        //TODO 核心 协调连接、请求/响应以及复用
        StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
                createAddress(request.url()), call, eventListener, callStackTrace);
        this.streamAllocation = streamAllocation;

        int followUpCount = 0;
        Response priorResponse = null;
        while (true) {
            if (canceled) {
                streamAllocation.release();
                throw new IOException("Canceled");
            }

            Response response;
            boolean releaseConnection = true;
            try {
                //TODO 执行责任链 实际上就是下一个拦截器
                response = realChain.proceed(request, streamAllocation, null, null);
                releaseConnection = false;
            } catch (RouteException e) {
            } catch (IOException e) {
            } finally {
            }
            ...删除部分代码
            return response;
      }
  }

这里会StreamAllocation streamAllocation = new StreamAllocation(...)创建一个StreamAllocation对象,并且把连接池,和请求地址信息,call等等通过构造传入,这里只关心主流程,StreamAllocation的作用不说了

然后开了个while (true) {}循环,在里面会调用response = realChain.proceed(request, streamAllocation, null, null); 继续执行责任链的下一个拦截器,这个realChain就是上面retryAndFollowUpInterceptor.intercept(next)传入的这个next,就是上面提到的那个RealInterceptorChain对象,注意这个时候的index = 1然后继续执行proceed()方法

这里又回到了前面RealInterceptorChain.proceed()

 @Override public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
  }
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    //省略代码 
    ...  
    //TODO: 创建新的拦截链,链中的拦截器集合index+1
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    //TODO: 执行当前的拦截器 默认是:retryAndFollowUpInterceptor
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
    //省略代码 
    ...  
    return response;
  }

这里的代码有点绕,第一次看有点晕头转向的感觉~~

没错,这里又回到上面的地方,不过有两点不同了
1.第一次进来的时候index = 0,streamAllocation = null
2.这一次index = 1,streamAllocation刚刚被 new 出来

然后再次创建一个新的RealInterceptorChain对象,把信息传入构造.

注意: 这个时候 index = 1, 所以传入next 里面的 index = 1+1

 RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);

然后再调用Interceptor interceptor = interceptors.get(index);从拦截器里头取下一个拦截器,因为index=1,这个时候取出的是BridgeInterceptor拦截器(处理配置请求头等信息),执行Response response = interceptor.intercept(next);

看下BridgeInterceptor拦截器的intercept()方法

@Override
    public Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        Request.Builder requestBuilder = userRequest.newBuilder();

        RequestBody body = userRequest.body();
        if (body != null) {
            MediaType contentType = body.contentType();
            if (contentType != null) {
                requestBuilder.header("Content-Type", contentType.toString());
            }

            long contentLength = body.contentLength();
            if (contentLength != -1) {
                requestBuilder.header("Content-Length", Long.toString(contentLength));
                requestBuilder.removeHeader("Transfer-Encoding");
            } else {
                requestBuilder.header("Transfer-Encoding", "chunked");
                requestBuilder.removeHeader("Content-Length");
            }
        }

        if (userRequest.header("Host") == null) {
            requestBuilder.header("Host", hostHeader(userRequest.url(), false));
        }

        if (userRequest.header("Connection") == null) {
            requestBuilder.header("Connection", "Keep-Alive");
        }

        // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
        // the transfer stream.
        boolean transparentGzip = false;
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
            transparentGzip = true;
            requestBuilder.header("Accept-Encoding", "gzip");
        }

        List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
        if (!cookies.isEmpty()) {
            requestBuilder.header("Cookie", cookieHeader(cookies));
        }

        if (userRequest.header("User-Agent") == null) {
            requestBuilder.header("User-Agent", Version.userAgent());
        }
        //TODO 执行下一个拦截器
        Response networkResponse = chain.proceed(requestBuilder.build());

        HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

        Response.Builder responseBuilder = networkResponse.newBuilder()
                .request(userRequest);

        if (transparentGzip
                && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
                && HttpHeaders.hasBody(networkResponse)) {
            GzipSource responseBody = new GzipSource(networkResponse.body().source());
            Headers strippedHeaders = networkResponse.headers().newBuilder()
                    .removeAll("Content-Encoding")
                    .removeAll("Content-Length")
                    .build();
            responseBuilder.headers(strippedHeaders);
            String contentType = networkResponse.header("Content-Type");
            responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
        }

        return responseBuilder.build();
    }

代码挺多,但是逻辑也清晰,就是为我们的请求,按照http协议添加一些请求头信息,为后面使用 socket 写出数据做准备

配置一些请求头信息 ,例如:

1.Content-Type 请求中的媒体类型信息
2.Content-Length 请求体body的长度
3.Host 请求域名
4.Connection: "Keep-Alive" 是否保持长连接
5.Cookie 等等....

关于http的内容,大伙可以上网去搜一下对应的文章了解一下..

然后调用Response networkResponse = chain.proceed(requestBuilder.build());跟上面一样,还是会回到RealInterceptorChain.proceed()方法里面,再次创建一个新的RealInterceptorChain对象,并且把 index+1,然后继续执行一下个拦截的intercept(next)方法,这里不贴出相同的源码了.然后会执行到CacheInterceptor拦截器

看一下CacheInterceptor.intercept(next)

 @Override public Response intercept(Chain chain) throws IOException {
    //TODO request对应缓存的Response
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;
    //TODO 执行响应缓存策略
    long now = System.currentTimeMillis();
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }
    //TODO 缓存无效 关闭资源
    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    //TODO 如果禁止使用网络,并且没有缓存,就返回失败
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // If we don't need the network, we're done.
    //TODO 不使用网络请求直接返回响应
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }
    //TODO 执行下一个拦截器
    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    //TODO 如果存在缓存 更新
    if (cacheResponse != null) {
      //TODO 304响应码 自从上次请求后,请求需要响应的内容未发生改变
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
    //TODO 缓存Response
    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }

这个拦截器是处理缓存相关,具体细节这里不管,只分析主流程...

不过从注释可以清晰的看出一些大概的逻辑
1.从缓存里面取出响应结果Response cacheCandidate
2.判断缓存是否失效,如果失效就关闭资源
3.判断如果禁止使用网络,并且没有缓存,就返回失败
等等....

然后又继续调用networkResponse = chain.proceed(networkRequest);执行下一个拦截器ConnectInterceptor

ConnectInterceptor.intercept(Chain chain)

 @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    //TODO 连接服务器/复用socket
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    //TODO 找到复用的 socket或者创建新的 socket,并且连接服务器,但是这里还没有把请求写出去
    //TODO 然后执行下一个拦截器
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

这个拦截器负责连接服务器,会调用streamAllocation.newStream

 public HttpCodec newStream(
            OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
        int connectTimeout = chain.connectTimeoutMillis();
        int readTimeout = chain.readTimeoutMillis();
        int writeTimeout = chain.writeTimeoutMillis();
        int pingIntervalMillis = client.pingIntervalMillis();
        boolean connectionRetryEnabled = client.retryOnConnectionFailure();

        try {
            RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
                    writeTimeout, pingIntervalMillis, connectionRetryEnabled,
                    doExtensiveHealthChecks);
            //TODO HttpCodec 处理解析请求与响应的工具类
            HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

            synchronized (connectionPool) {
                codec = resultCodec;
                return resultCodec;
            }
        } catch (IOException e) {
            throw new RouteException(e);
        }
    }

newStream()方法里面会调用findHealthyConnection()去寻找一个健康的可用的连接来进行socket连接

有关sockter连接部分还请大伙去查看对应的资料,这里只需要知道:

1.TPC/IP协议是传输层协议,主要规范数据如何在网络中传输
2.Socket则是对TCP/IP协议的封装和应用(程序员层面上)
3.Http 是应用层协议,解决如何包装数据,然后通过Socket发送到服务器
4.这里的RealConnection类是对Socket的封装,里面持有Socket的引用
5.这里的HttpCodec是处理解析请求与响应的工具类

回到上面,通过调用streamAllocation.newStream(...)连接之后,拿到一个HttpCodec,然后继续执行下一个拦截器,也就是最后的拦截器CallServerInterceptor,通过socket和服务器进行I/O流读写操作

CallServerInterceptor.intercept()

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();
    ...
    //TODO 把请求通过 socket 以http的协议格式,通过I/O流的形式写出去
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);
    
    ...

    //TODO 请求写出完成
    httpCodec.finishRequest();

    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    //TODO 处理响应
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    ...
    return response;
  }

删除了很多代码.具体实现先不去看,如果大伙想知道,可以去搜索对应方面的具体细节文章...

这里了解到:
1.在CallServerInterceptor这个拦截器,实现Socket与服务器进行TCP连接,然后进行I/O数据流的通信.
2.发送请求, Socket是以http协议的格式,把请求拆分,请求行,请求头,请求体等等,一行一行写出去的
3.接受响应, Socket也是以http协议的格式,把响应结果一行一行的读出来,然后再解析

Socket通信例子:

例如 : 用浏览器模拟 GET 去请求获取天气的接口,然后拿到结果


image.png
image.png

怎样用Socket来实现请求呢?

java的jdk为我们提供了具体的实现Socket,直接创建使用就行

static void doHttp() throws Exception {
        //创建一个Socket ,传入 域名和端口,http默认是80
        Socket socket = new Socket("restapi.amap.com", 80);
        //接受数据的输入流
        final BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        //发送数据 输出流
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
        new Thread() {
            @Override
            public void run() {
                while (true) {
                    String line = null;
                    try {
                        while ((line = br.readLine()) != null) {
                            System.out.println("recv :" + line);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
        bw.write("GET /v3/weather/weatherInfo?city=%E9%95%BF%E6%B2%99&key=13cb58f5884f9749287abbead9c658f2 HTTP/1.1\r\n");
        bw.write("Host: restapi.amap.com\r\n\r\n");
        bw.flush();
    }

1.创建一个Socket ,传入域名和端口,http默认是80
2.通过Socket 拿到接受数据的输入流和发送数据的输出流
3.按http协议规范,把请求写出去bw.write("GET /v3/weathe....bw.write("Host: restapi.amap.com\r\n\r\n");
4.接受服务器响应结果

打印结果:

recv :HTTP/1.1 200 OK
recv :Server: Tengine
recv :Date: Thu, 05 Jul 2018 09:47:14 GMT
recv :Content-Type: application/json;charset=UTF-8
recv :Content-Length: 253
recv :Connection: close
recv :X-Powered-By: ring/1.0.0
recv :gsid: 011178122113153078403472700229732459707
recv :sc: 0.010
recv :Access-Control-Allow-Origin: *
recv :Access-Control-Allow-Methods: *
recv :Access-Control-Allow-Headers: DNT,X-CustomHeader,Keep-Alive,User-Agent,X-Requested-With,
If-Modified-Since,Cache-Control,Content-Type,key,x-biz,x-info,platinfo,encr,enginever,gzipped,poiid
recv :
recv :{"status":"1","count":"1","info":"OK","infocode":"10000","lives":
[{"province":"广东","city":"深圳市","adcode":"440300","weather":
"云","temperature":"29","winddirection":"南","windpower":"5","humidity":"77",
"reporttime":"2018-07-05 17:00:00"}]}

响应也是一行一行读写出来

最后一个拦截器拿到服务器响应结果,然后再依次往上面的拦截器返回response,这个流程就不多说了,到此一次通信就完成...

小结:

okhttp采用责任链模式,每一个拦截器负责具体的每一块的功能,降低每个功能模块的耦合度,让整体框架更加灵活,我们可以轻松的加入自己自定义的拦截器.

okhttp维护着一个连接池,能做到相同Host的socket复用

上一篇下一篇

猜你喜欢

热点阅读