IT狗

Android八门神器(一): OkHttp框架源码解析

HTTP是我们交换数据和媒体流的现代应用网络,有效利用HTTP可以使我们节省带宽和更快地加载数据,Square公司开源的OkHttp网络请求是有效率的HTTP客户端。之前的知识面仅限于框架API的调用,接触到实际的工作之后深知自己知识的不足,故而深挖框架源码尽力吸取前辈的设计经验。关于此框架的源码解析网上的教程多不胜数,此文名为源码解析,实则是炒冷饭之作,如有错误之处还望各位看官指出。

拦截器

拦截器是OkHttp框架设计的精髓所在,拦截器所定义的是Request的所通过的责任链而不管Request的具体执行过程,并且可以让开发人员自定义自己的拦截器功能并且插入到责任链中

  1. 用户自定义的拦截器位于 OkHttpClient.addInterceptor() 添加到interceptors责任链中

  2. RealCall.execute()执行的时候调用RealCall.getResponseWithInterceptorChain()将 来自 OkHttpClient的interceptors以及默认的拦截器一并加入到RealInterceptorChain责任链中并调用, 代码并没有对originalRequest进行封装, InterceptorChain和originalRequest一并流转到 RealInterceptorChain类中处理

    CustomInterceptorRetryAndFollowUpInterceptorBridgeInterceptorCacheInterceptorConnectInterceptorNetworkInterceptorsCallServerInterceptor
  3. RealInterceptorChain.proceed()

  4. EventListener.callStart()也是在RealCall.execute()嵌入到Request调用过程, EventListener.callEnd()位于StreamAllocation中调用

  5. Request.Builder

    • url (String/URL/HttpUrl)
    • header
    • CacheControl
    • Tag (Use this API to attach timing, debugging, or other application data to a request so that you may read it in interceptors, event listeners, or callbacks.)

BridgeInterceptor

Bridges from application code to network code. First it builds a network request from a user request. Then it proceeds to call the network. Finally it builds a user response from the network response.

此拦截器是应用码到网络码的桥接。它会将用户请求封装成一个网络请求并且执行请求,同时它还完成从网络响应到用户响应的转化. 最后Chain.proceed() 方法启动拦截器责任链, RealInterceptorChain中通过递归调用将网络请求以及响应的任务分别分配到各个拦截器中, 然后通过ResponseBuilder.build()方法将网络响应封装, 然后递归调用责任链模式使得调用以及Response处理的过程可以一并写入BridgeInterceptor中

public final class RealInterceptorChain implements Interceptor.Chain {   public Response proceed(Request request, StreamAllocation streamAllocation,    HttpCodec httpCodec, RealConnection connection) throws IOException {       if (index >= interceptors.size()) throw new AssertionError();       calls++;              ...       // Call the next interceptor in the chain.       RealInterceptorChain next = new RealInterceptorChain(interceptors,           streamAllocation, httpCodec,connection, index + 1, request, call,           eventListener, connectTimeout, readTimeout,writeTimeout);       Interceptor interceptor = interceptors.get(index);       Response response = interceptor.intercept(next);       ...       return response;     }}

CallServerInterceptor

Interceptor的逻辑均在intercept()方法中实现, 在通过Chain实体类获取到请求主题之后,通过BufferedSink接口将请求转发到Okio接口,在拦截过程中通过EventListener接口将拦截器处理状态(主要是RequestBodyStart和RequestBodyEnd两个状态)发送出去

public final class CallServiceInterceptor implements Interceptor {    @Override public Response intercept(Chain chain) throws IOException {          Response.Builder responseBuilder = null;          if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {          // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100          // Continue" response before transmitting the request body. If we don't get that, return          // what we did get (such as a 4xx response) without ever transmitting the request body.          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {            httpCodec.flushRequest();            realChain.eventListener().responseHeadersStart(realChain.call());            responseBuilder = httpCodec.readResponseHeaders(true);          }          if (responseBuilder == null) {            // Write the request body if the "Expect: 100-continue" expectation was met.            realChain.eventListener().requestBodyStart(realChain.call());            long contentLength = request.body().contentLength();            CountingSink requestBodyOut =                new CountingSink(httpCodec.createRequestBody(request, contentLength));            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);            request.body().writeTo(bufferedRequestBody);            bufferedRequestBody.close();            realChain.eventListener()                .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);          } else if (!connection.isMultiplexed()) {            // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection            // from being reused. Otherwise we're still obligated to transmit the request body to            // leave the connection in a consistent state.            streamAllocation.noNewStreams();          }        }    }}

CacheInterceptor

Dispatcher

Dispatcher(分离器或者复用器)是异步网络请求调用时执行的策略方法, 复用器的概念十分常见,它主要的作用是输入的各路信号进行卷积运算,最大可能压榨通信的带宽,提高信息传输的效率。 OkHttp在每个分离器使用一个ExecutorService内部调用请求, Dispatcher内部主要并不涉及执行的具体。

复用器

  synchronized void enqueue(AsyncCall call) {    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {      runningAsyncCalls.add(call);      executorService().execute(call);    } else {      readyAsyncCalls.add(call);    }  }    ...  /** Used by {@code Call#execute} to signal it is in-flight. */  synchronized void executed(RealCall call) {    runningSyncCalls.add(call);  }

ExecutorSevice.execute(AsyncCall)执行代码位于AsyncCall内部复写的execute()方法, 方法内定义一些Callback回调节点运行逻辑,包括用户主动取消执行(使用retryAndFollowUpInterceptor)以及执行请求成功或者失败时的回调方法

final class AsyncCall extends NamedRunnable {    ...    @Override protected void execute() {      boolean signalledCallback = false;      try {        Response response = getResponseWithInterceptorChain();        if (retryAndFollowUpInterceptor.isCanceled()) {          signalledCallback = true;          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));        } else {          signalledCallback = true;          responseCallback.onResponse(RealCall.this, response);        }      } catch (IOException e) {        if (signalledCallback) {          // Do not signal the callback twice!          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);        } else {          eventListener.callFailed(RealCall.this, e);          responseCallback.onFailure(RealCall.this, e);        }      } finally {        client.dispatcher().finished(this);      }    }  }
  1. Created Lazily 成员

    • ExecutorService
    • CacheControl

WebSocket

  1. WebSocket 异步非堵塞的web socket接口 (通过Enqueue方法来实现)

    • OkHttpClient 通过实现 WebSocket.Factory.newWebSocket 接口实现工厂构造, 通常是由 OkHttpClient来构造

    • WebSocket生命周期:

      • Connecting状态: 每个websocket的初始状态, 此时Message可能位于入队状态但是还没有被Dispatcher处理
      • Open状态: WebSocket已经被服务器端接受并且Socket位于完全开放状态, 所有Message入队之后会即刻被处理
      • Closing状态: WebSocket进入优雅的关闭状态,WebSocket继续处理已入队的Message但拒绝新的Message入队
      • Closed状态: WebSocket已完成收发Message的过程, 进入完全关闭状态
        WebSocket受到网络等各种因素影响, 可能会断路而提前进入关闭流程
      • Canceled状态: 被动WebSocket失败连接为非优雅的过程, 而主动则是优雅短路过程
  2. RealWebSocket

    RealWebSocket管理着Request队列内容所占的空间大小以及关闭Socket之后留给优雅关闭的时间,默认为16M和60秒,在RealWebSocket.connect()方法中RealWebSocket对OkHttpClient以及Request封装成Call的形式,然后通过Call.enqueue()方法定义调用成功和失败时的Callback代码

    public void connect(OkHttpClient client) {    client = client.newBuilder()        .eventListener(EventListener.NONE)        .protocols(ONLY_HTTP1)        .build();    final Request request = originalRequest.newBuilder()        .header("Upgrade", "websocket")        .header("Connection", "Upgrade")        .header("Sec-WebSocket-Key", key)        .header("Sec-WebSocket-Version", "13")        .build();    call = Internal.instance.newWebSocketCall(client, request);    call.enqueue(new Callback() {      @Override public void onResponse(Call call, Response response) {        try {          checkResponse(response);        } catch (ProtocolException e) {          failWebSocket(e, response);          closeQuietly(response);          return;        }        // Promote the HTTP streams into web socket streams.        StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);        streamAllocation.noNewStreams(); // Prevent connection pooling!        Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);        // Process all web socket messages.        try {          listener.onOpen(RealWebSocket.this, response);          String name = "OkHttp WebSocket " + request.url().redact();          initReaderAndWriter(name, streams);          streamAllocation.connection().socket().setSoTimeout(0);          loopReader();        } catch (Exception e) {          failWebSocket(e, null);        }      }      @Override public void onFailure(Call call, IOException e) {        failWebSocket(e, null);      }    });  }

    当Call请求被服务端响应的时候就将HTTP流导入到Web Socket流中,并且调用WebSocketListener相对应的状态方法, WebSocketListener状态如下:

    onOpen()

    onMessage()

    onClosing()

    onClosed()

    onFailure()

    • WebSocket -> RealWebSocket
    • Connection -> RealConnection
    • Interceptor -> RealInterceptorChain
    • Call -> RealCall
    • ResponseBody -> RealResponseBody

Gzip压缩机制

处理Gzip压缩的代码在BridgeInterceptor中,默认情况下为gzip压缩状态,可以从下面的源码片段中获知。如果header中没有Accept-Encoding,默认自动添加 ,且标记变量transparentGziptrue

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing    // the transfer stream.    boolean transparentGzip = false;    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {      transparentGzip = true;      requestBuilder.header("Accept-Encoding", "gzip");    }

BridgeInterceptor解压缩的过程调用了okio.GzipSource()方法并调用Okio.buffer()缓存解压过程,源码如下

if (transparentGzip        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))        && HttpHeaders.hasBody(networkResponse)) {      GzipSource responseBody = new GzipSource(networkResponse.body().source());      Headers strippedHeaders = networkResponse.headers().newBuilder()          .removeAll("Content-Encoding")          .removeAll("Content-Length")          .build();      responseBuilder.headers(strippedHeaders);      String contentType = networkResponse.header("Content-Type");      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));    }

RealCall构造方法

在RealCall构造方法上面,早期版本的RealCall构造方法中将EventListener.Factory以及EventListenerFactory.Create()分开处理导致RealCall构造方法非线程安全. 现在版本的RealCall的构造函数使用OkHttpClient.eventListenerFactory().create()

早期版本如下:

RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {    final EventListener.Factory eventListenerFactory = client.eventListenerFactory();    this.client = client;    this.originalRequest = originalRequest;    this.forWebSocket = forWebSocket;    //重试和跟进拦截器    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);    // TODO(jwilson): this is unsafe publication and not threadsafe.      // 这是不安全的发布,不是线程安全的。    this.eventListener = eventListenerFactory.create(this);  }

现在 OkHttp 3.11.0 的RealCall源代码如下

final class RealCall implements Call {   private EventListener eventListener;    ...   private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {        this.client = client;        this.originalRequest = originalRequest;        this.forWebSocket = forWebSocket;        this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);   }  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {    // Safely publish the Call instance to the EventListener.    RealCall call = new RealCall(client, originalRequest, forWebSocket);    call.eventListener = client.eventListenerFactory().create(call);    return call;  }

CacheStrategy

此文由 IT狗 编辑,本网站所发布展示的作品/文章版权归原作者所有,任何商业用途均须联系作者!

相关推荐

评论 暂无评论