二十五、OkHttp原理分析(一)

2021-07-09  本文已影响0人  大虾啊啊啊

一、使用方式

OkHttp的使用分为了同步请求和异步请求,分别通过调用execute和enqueue方法,在异步的时候需要传入一个CallBack回调。当使用Get请求的时候,直接传入URL,而在Post请求的时候需要构建RequestBody。

package com.jilian.pinzi.base;

import java.io.IOException;

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;

public class OkHttpTest {
    OkHttpClient okHttpClient = new OkHttpClient();

    /**
     * 同步请求
     *
     * @param url
     */
    void getSyn(String url) throws IOException {
        Request request = new Request.Builder().url(url).build();
        Call call = okHttpClient.newCall(request);
        Response response = call.execute();
        ResponseBody body = response.body();
    }

    /**
     * 异步请求
     *
     * @param url
     */
    void getSyc(String url) throws IOException {
        RequestBody body = new FormBody.Builder().add("name","allen").build()
        Request request = new Request.Builder().url(url).
                post(body).
                build();
        Call call = okHttpClient.newCall(request);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                ResponseBody body = response.body();
            }
        });


    }
}

二、源码流程分析

1、OkHttp中的分发器Dispatcher

我们来看下异步的流程

    OkHttpClient okHttpClient = new OkHttpClient();
  RequestBody body = new FormBody.Builder().add("name","allen").build()
        Request request = new Request.Builder().url(url).
                post(body).
                build();
        Call call = okHttpClient.newCall(request);
  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.transmitter = new Transmitter(client, call);
    return call;
  }
   call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                ResponseBody body = response.body();
            }
        });

接着会执行到RealCall的如下代码,核心代码在最后一行,通过OkHttpClient的dispatcher()执行enqueue方法,并且将我们的Callback封装成了AsyncCall作为参数

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.callStart();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

接着就执行到了Dispatcher中的enqueue方法,Dispatcher也就是我们说的分发器,用来分发请求的任务。第一行将我们的Call,也就是请求任务添加到一个准备队列去。

  void enqueue(AsyncCall call) {
    synchronized (this) {
//请求任务添加到一个准备队列去
      readyAsyncCalls.add(call);

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    promoteAndExecute();
  }

最后一行调用promoteAndExecute方法
(1)遍历准备队列
(2)如果正在运行的任务队列大于设置的最大等于请求数maxRequests 64,直接跳出循环
(3)如果同一个相同Host的请求数量,也就是请求同一个主机的请求数量大于等于设置的最大数量maxRequestsPerHost 5,执行continue,遍历下一个请求任务
(4)如果满足运行队列的任务没有满,并且里面同一个主机的任务数没有超过最大数量5个,就将当前请求任务从准备队列readyAsyncCalls中移除
(5)将当前请求任务添加到运行队列
(6)同时给每一个请求任务设置一个线程池对象去执行。该线程池对象是单例的
总的来说就是分发器当收到请求任务的时候会根据最大运行任务数和同一个主机的最大任务数来判断是否要加入到运行队列去执行任务还是将任务放到准备队列等待执行。

 private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
//遍历准备队列
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
      //如果正在运行的任务队列大于设置的最大请求数maxRequests,直接跳出循环
        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      //如果同一个相同Host的请求数量,也就是请求同一个主机的请求数量大于等于设置的最大数量maxRequestsPerHost,直接跳出循环
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
    //如果以上不满足,就将当前遍历的请求任务移除队列
        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }
  //同时给每一个请求任务设置一个线程池对象。改线程池对象是单例的
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }
    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

AsyncCall是RealCall的一个内部类,不仅持有了我们传入的Callback同时也持有了传入的Request。到这里我们就知道我们的请求任务封装成了一个Request并封装成AsyncCall。最终通过线程池去执行我们的请求任务。在run方法中执行了execute方法,我们的任务最终执行到了execute方法:

@Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
      //执行我们的网络请求
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
//回调成功
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
//回调失败
          responseCallback.onFailure(RealCall.this, e);
        }
      } catch (Throwable t) {
        cancel();
        if (!signalledCallback) {
          IOException canceledException = new IOException("canceled due to " + t);
          canceledException.addSuppressed(t);
//回调失败
          responseCallback.onFailure(RealCall.this, canceledException);
        }
        throw t;
      } finally {
//最终完成请求,分发器结束执行
        client.dispatcher().finished(this);
      }
    }
  }

(1)执行我们的网络请求
Response response = getResponseWithInterceptorChain();
(2)回调成功
responseCallback.onResponse(RealCall.this, response);
(3)回调失败
responseCallback.onFailure(RealCall.this, e);
(4)在finally部分表示请求完成,通过分发器调用finished方法,而在finished方法中,我们就去更新同步分发器中的两个队列,准备队列和运行队列。去决定是否要执行新的任务。

  void finished(AsyncCall call) {
    call.callsPerHost().decrementAndGet();
    finished(runningAsyncCalls, call);
  }
  private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

    boolean isRunning = promoteAndExecute();

    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }

2、OkHttp中的拦截器

我们以上分析了请求任务是通过分发器分发给了线程池去执行接着我们来看任务是如何执行的?
在OkHttp中的请求任务是通过五大拦截器去执行的,每个拦截器负责各自的任务,在这里采用了责任链的设计模式,来看下具体执行任务的方法getResponseWithInterceptorChain
主要包含了五大拦截器:

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    boolean calledNoMoreExchanges = false;
    try {
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }

因此我们知道在责任链社交模式中我们的一次网络请求任务通过五大拦截器往下执行,每个拦截器负责处理自己的任务然后交给下一个任务执行,并且也可以直接返回,不交给下面的拦截器,比如缓存拦截器,当需要读取缓存的时候直接读取缓存返回给上一个拦截器。并且最开始执行的拦截器总是最后收到请求结果,最后执行的拦截器比如请求服务器拦截器最先拿到结果,然后依次反馈给上面的拦截器。最终返回到我们的接受值。
我们通过一张图来了解拦截器的工作流程:


image.png

三、小结

image.png

四、责任链设计模式

我们知道在Okhttp的设计模式中五大拦截器使用了责任链的设计模式,每一个拦截器处理各自的任务然后把Request交给下一个拦截器。最后一个拦截器负责和服务器通信拿到响应结果,然后依次往上传递,因此最先拿到Request的拦截器总是最后拿到结果。最后拿到Request的拦截器最先拿到结果。下面我们通过代码演示

package com.example.flowlayout;

import java.io.IOException;

/**
 * 拦截器接口
 */
public interface Interceptor {

    Response intercept(Chain chain) throws IOException;

    interface Chain{
        Request request();

        Response proceed(Request request) throws IOException;

    }
}

package com.example.flowlayout;

import java.io.IOException;

/**
 * 重试和重定向拦截器
 */
public class RetryAndFollowUpInterceptor  implements Interceptor {
    @Override
    public Response intercept(Chain chain) throws IOException {
        System.out.println("RetryAndFollowUpInterceptor拿到request = "+chain.request().getParam());
        Response response =chain.proceed(chain.request());
        System.out.println("RetryAndFollowUpInterceptor拿到Response = "+response.getData());
        return response;
    }
}
package com.example.flowlayout;

import android.util.Log;

import java.io.IOException;

/**
 * 桥接拦截器
 */
public class BridgeInterceptor implements Interceptor {
    @Override
    public Response intercept(Chain chain) throws IOException {
        System.out.println("BridgeInterceptor拿到request = "+chain.request().getParam());
        Response response =chain.proceed(chain.request());
        System.out.println("BridgeInterceptor拿到Response = "+response.getData());
        return response;
    }
}
package com.example.flowlayout;

import java.io.IOException;

/**
 * 缓存拦截器
 */
public class CacheInterceptor implements Interceptor {
    @Override
    public Response intercept(Chain chain) throws IOException {
        System.out.println("CacheInterceptor拿到request = "+chain.request().getParam());
        Response response =chain.proceed(chain.request());
        System.out.println("CacheInterceptor拿到Response = "+response.getData());
        return response;
    }
}
package com.example.flowlayout;

import java.io.IOException;

/**
 * 连接拦截器
 */
public class ConnectInterceptor implements Interceptor {
    @Override
    public Response intercept(Chain chain) throws IOException {
        System.out.println("ConnectInterceptor拿到request = "+chain.request().getParam());
        Response response =chain.proceed(chain.request());
        System.out.println("ConnectInterceptor拿到Response = "+response.getData());
        return response;
    }
}
package com.example.flowlayout;

import java.io.IOException;

/**
 * 请求服务器拦截器
 */
public class CallServerInterceptor  implements Interceptor{
    @Override
    public Response intercept(Chain chain) throws IOException {
        System.out.println("CallServerInterceptor拿到request = "+chain.request().getParam());
        Response response = new Response();
        response.setData("我从服务器拿到结果了");
        System.out.println("CallServerInterceptor拿到Response = "+response.getData());
        return response;
    }
}

package com.example.flowlayout;

import java.io.IOException;
import java.util.List;

/**
 * RealInterceptorChain类
 * 封装了Request的传递
 * Response的返回
 *
 */
public class RealInterceptorChain implements Interceptor.Chain {
    private Request request;
    private int index;
    private List<Interceptor> interceptors;

    public RealInterceptorChain(Request request, int index, List<Interceptor> interceptors) {
        this.request = request;
        this.index = index;
        this.interceptors = interceptors;
    }

    @Override
    public Request request() {
        return this.request;
    }
    /**
     * 通过proceed方法拦截器把任务往下传递
     * 通过Response返回值响应结果
     */
    @Override
    public Response proceed(Request request) throws IOException {
        //将任务交给下一个拦截器
        RealInterceptorChain chaine = new RealInterceptorChain(request, index + 1, interceptors);
        Response response = interceptors.get(index + 1).intercept(chaine);
        return response;
    }
}


package com.example.flowlayout;

/**
 * 请求类
 */
public class Request {
    private String param;

    public String getParam() {
        return param;
    }

    public void setParam(String param) {
        this.param = param;
    }
}

package com.example.flowlayout;

/**
 * 响应类
 */
public class Response {
    private String data;

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }
}

package com.example.flowlayout;

import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ExampleUnitTest {
    @Test
    public void myTest() throws IOException {
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.add(new RetryAndFollowUpInterceptor());
        interceptors.add(new BridgeInterceptor());
        interceptors.add(new CacheInterceptor());
        interceptors.add(new ConnectInterceptor());
        interceptors.add(new CallServerInterceptor());
        //请求任务
        Request request = new Request();
        request.setParam("我是请求服务器的参数");
        RealInterceptorChain chain = new RealInterceptorChain(request, -1, interceptors);
        chain.proceed(request);


    }
}
RetryAndFollowUpInterceptor拿到request = 我是请求服务器的参数
BridgeInterceptor拿到request = 我是请求服务器的参数
CacheInterceptor拿到request = 我是请求服务器的参数
ConnectInterceptor拿到request = 我是请求服务器的参数
CallServerInterceptor拿到request = 我是请求服务器的参数
CallServerInterceptor拿到Response = 我从服务器拿到结果了
ConnectInterceptor拿到Response = 我从服务器拿到结果了
CacheInterceptor拿到Response = 我从服务器拿到结果了
BridgeInterceptor拿到Response = 我从服务器拿到结果了
RetryAndFollowUpInterceptor拿到Response = 我从服务器拿到结果了
上一篇下一篇

猜你喜欢

热点阅读