前言

现在 Android 中的网络框架底层基本都是 OkHttp, 像 retrofit 等框架也是对 OkHttp 的封装。 网络又是编程的基础也是重中之重。 所以了解其原理是非常有必要的, 当然可以学习到里面的优秀思想, 提高自己。 当然水平有限, 只能大致串下整体流程, 记录下来, 以便后续可以快速再次熟悉和快速走进来和回忆, 更多的细节学习日后也要补充进来。
博客参考:

简单使用

答非所问, OkHttp 是网络请求接受数据呢,那么我们分析就从发送网络入手。 其实 OkHttp 的同步请求和异步请求最终调用的方法都是一样的, 只不过同步请求 OkHttp 没有为我们切子线程, 而异步请求是 OkHttp 内部帮我们切换到了子线程执行而已, 其他并没有本质区别。

同步请求

1
2
3
4
5
6
val okhttpClient = OkHttpClient()
val request = Request.Builder()
.url("https:www.baidu.com")
.get()
.build()
okhttpClient.newCall(request).execute()

异步请求

1
2
3
4
5
6
7
8
9
10
11
val okhttpClient = OkHttpClient()
val request = Request.Builder()
.url("https:www.baidu.com")
.get()
.build()
okhttpClient.newCall(request).enqueue(object : Callback{
override fun onFailure(call: Call, e: IOException) {
}
override fun onResponse(call: Call, response: Response) {
}
})

看到上面同步和异步的请求, 仅仅是最后 newCall 后一个是 execute, 一个是 enqueue, 可想而知, 异步请求帮我们在子线程中执行了, 其肯定是需要一个回调呢。

源码分析

通过上面的同步请求和异步请求, 其大概分为构造一个 OkHttpClient, 构造一个请求(包括请求地址、 请求方式、 参数等), 最后调用 OKHttpClient 把这个请求发出去等待结果即可。 也对, 无非是输入、 和输出。 我们主要写过程, 简单的一带而过。 比如 Request 是我们的输入, 总要告知我们请求那个服务地址, 什么请求方式(get、 pust……), 参数吧。看到 Request 的写法可以看到是一种很经典的设计模式。 然后把请求参数塞给 OkHttpClient 去执行, 并拿到结果输出。
还记得我们刚才说的不管是同步还是异步请求, 最后都是走到了同一个方法, 异步仅仅为我们在内部在子线程运行了而已, 所以我们先看看这个。

同步和异步的实现

提到同步还是异步, 通过上面简单的请求, 我们可以看出来, 调用 newCall 出来的对象进行了处理。 所以看看 newCall。

1
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

很简单, 我们仅仅是构造了一个新的 RealCall 对象, 把 OkHttpClient 和 我们的请求传递过来了, 最后一个我们就不再分析了。 从这个简单的一行代码中, 我们还可能很清晰的知道, 一个请求对于一个 RealCall 哈。
那么好我们现在看看同步请求

同步

1
2
3
4
5
6
7
8
override fun execute(): Response {
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}

首先我们看到我们第一步先把当前的 RealCall 交给了 OkHttpClient 中的 Dispatcher, 然后在最后 finally 中, 又再次调用到了 Dispatcher 中的 finished 该 RealCall。
我们看看 Dispatcher 中的这两个的方法。

1
2
3
4
5
6
7
8
9
10
11
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}

internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}

private fun <T> finished(calls: Deque<T>, call: T) {
calls.remove(call)
}

是不是很简单, 仅仅是在成员变了中的队列中记录了一下下, 然后最后请求完成后又对其 remove 了。 为啥要这样干呢, 对了其一我们至少可以对其请求进行取消之类的操作。 异步请求也是通过 Dispatcher 进行分发处理的呢。
好现在让我们切回调 execute 中,我们就剩下没有分析的那行代码了, 这行代码就是进行了后续的请求等一系列操作, 异步也是最后到这里, 我们最后在分析这个方法。 我们先来看看异步请求的操作。

异步

1
2
3
override fun enqueue(responseCallback: Callback) {
client.dispatcher.enqueue(AsyncCall(responseCallback))
}

很简单, 也是交给了 Dispatcher, 我们接下来看看这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
}
promoteAndExecute()
}

private fun promoteAndExecute(): Boolean {
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()

if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}

for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}

return isRunning
}

我们可以看到 Dispatcher 中的 enqueue 方法, 先把请求放到了 readyAsyncCalls 队列中等待请求,然后调用到 promoteAndExecute, 把等在执行队列的 Call 放到了 executableCalls 中的队列中, 最后再调用 call 的 executeOn 方法。 我们看看 AsyncCall 的 executeOn 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
fun executeOn(executorService: ExecutorService) {
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
} finally {
if (!success) {
client.dispatcher.finished(this)
}
}
}


override fun run() {
threadName("OkHttp ${redactedUrl()}") {
try {
val response = getResponseWithInterceptorChain()
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
responseCallback.onFailure(this@RealCall, e)
} catch (t: Throwable) {
responseCallback.onFailure(this@RealCall, canceledException)
} finally {
client.dispatcher.finished(this)
}
}
}

通过上面的方法可以看到 executeOn 在调用执行后就到 run 了, 说明下这个 executorService 是线程池哈, AsyncCall 是 Runnable 这里就不再分析了, 所以 run 里面是在子线程中执行的, 可以看到 最后还是执行了 getResponseWithInterceptorChain 方法。 跟同步是一致的。 在最后的最后还是请求完成后还是调用到了 Dispatcher 的 finished 哈, 跟同步的一致, 只不过是不再同一个队列中移出而已。

1
2
3
internal fun finished(call: AsyncCall) {
finished(runningAsyncCalls, call)
}

OK, 现在可以进行下一步了, 看看 getResponseWithInterceptorChain 中发生了什么。

关键流程

通过上面的分析, 不管同步还是异步我们都锁定在了 getResponseWithInterceptorChain 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)

val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)

var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
return response
} catch (e: IOException) {
} finally {
}
}
}

首先我们主要分析网络请求的过程, 不再详细分析拦截器链的这个设计了哈。不过不得不说这个设计非常好, 值得我们去借鉴。 我这里就简单的说下, 拦截器链的运行大致流程。
我们可以看到上面我们有很多拦截器, 每一个拦截器都有各自的功能, 通常是这样的哈, 除非你自己设计的拦截器不规范就不说了。 简单说每个拦截器会对参数等进行拦截, 然后调用下一个拦截, 下个拦截器会根据自己的需要进行修改, 最后传递到最后去链接网络发送请求, 最后一个拿到网络数据结果后, 然后再把结果一层一层的原路返回, 这样构成一个链, 当然我们也可以在拦截器中对结果进行各自的处理, 最后返回出去。
是的拦截器就这样工作的, 我们通过上面方法可以看出我们可以定义我们自己的拦截器, 其中从名字看一个是拦截器, 一个是网络拦截器, 有什么区别呢,因为还没有分析各个拦截器的作用, 这里直接说 , 拦截器是在最前面呢, 还未经过相关网络的任何请求, 网络拦截器是在与服务端建立好链接后的拦截器。
好了现在我们对每个拦截器进行简单的分析。

RetryAndFollowUpInterceptor

通过名称我们可以看出来, 这个拦截器的作用有重试、 还有重定向。
我们就简单的看下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
call.enterNetworkInterceptorExchange(request, newExchangeFinder)

var response: Response
var closeActiveExchange = true
try {
try {
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
continue
} catch (e: IOException) {
continue
}

val exchange = call.interceptorScopedExchange
val followUp = followUpRequest(response, exchange)

if (followUp == null) {
return response
}

val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}

其中 call.enterNetworkInterceptorExchange(request, newExchangeFinder) 可以专项记下哈, 因为一个请求的开始, 就会创建一个 ExchangeFinder。

1
2
3
4
5
6
7
8
9
10
fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {    
if (newExchangeFinder) {
this.exchangeFinder = ExchangeFinder(
connectionPool,
createAddress(request.url),
this,
eventListener
)
}
}

可以看出 newExchangeFinder 控制了是否要重新建立一个 ExchangeFinder, 我们也可以看到 ExchangeFinder 构造的时候我们传入了 connectionPool、 Address、 Call 还有一个监听。 我们可以大胆的猜测, 我们后面的网络请求应该跟 ExchangeFinder 有关。
好, 我们现在看以下代码:

1
2
3
4
5
6
7
8
9
10
while (true){
try {
response = realChain.proceed(request)
} catch (e: RouteException) {
continue
} catch (e: IOException) {
continue
}
}
}

很清晰的看到再收到这两种异常的时候, 会 continue, 然后再次重试。
我们看看重定向怎么做的, 看以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val exchange = call.interceptorScopedExchange
val followUp = followUpRequest(response, exchange)

if (followUp == null) {
return response
}

val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
request = followUp
priorResponse = response

其中 followUpRequest 就是根据各种返回 code 等来构造请求参数及判断是否重定向, 然后再次进入循环进行下次请求。

BridgeInterceptor

这个拦截器, 直接复制代码了, 给请求头部等进行初始化或者赋值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()

val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}

val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}

if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}

if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}

val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}

if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}

val networkResponse = chain.proceed(requestBuilder.build())

cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

val responseBuilder = networkResponse.newBuilder()
.request(userRequest)

if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}

return responseBuilder.build()
}

这个没有什么好分析的, 我们直接看下一个拦截器吧

CacheInterceptor

这个主要是进行缓存呢, 如果我们开启了缓存, 且当前链接存储的缓存没有失效的话, 我们就直接返回结果。 这个也不多说了, 可以看下面的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
val cacheCandidate = cache?.get(chain.request())

val now = System.currentTimeMillis()

val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse

cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body?.closeQuietly()
}

// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}

// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}

if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}

var networkResponse: Response? = null
try {
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}

// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()

networkResponse.body!!.close()

// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body?.closeQuietly()
}
}

val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()

if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
// This will log a conditional cache miss only.
listener.cacheMiss(call)
}
}
}

if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}

return response
}

我们直接跳到下一个拦截器, 当然也是最重要的一个拦截器。

ConnectInterceptor

1
2
3
4
5
6
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}

看着这个代码很少, 其实最核心的链接等都在这个里面。
关键在于我们怎么找到当前的 exchange。 所以我们着重看下 RealCall 的 initExchange 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
internal fun initExchange(chain: RealInterceptorChain): Exchange {
val exchangeFinder = this.exchangeFinder!!
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this) {
this.requestBodyOpen = true
this.responseBodyOpen = true
}

if (canceled) throw IOException("Canceled")
return result
}

写到这里, 我们需要提前了解两个概念, 一个是 exchange、 一个是 codec。
其中 exchange 其实就是一次数据交换, 一次网络请求。 codec 是编码解码器, 这个跟 ffmepg 中的差不多。 网络请求可能会使用 http1.1 或者 http2.0, 其中他们各个协议的格式不同, codec 就是返回统一的处理接口, 当然实现就是对于各自的编解码逻辑了哈。
根据名称可知, exchangeFinder 就是去找到一个合适的编解码器。 我们来看看 find 它是怎么找的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
throw e
} catch (e: IOException) {
throw RouteException(e)
}
}

可以看通过 findHealthyConnection 找到一个健康可用的 connection, 然后通过 connection 的 newCodec 去返回对应的编解码器。 其中 connection 是什么呢, 通过名称我们也可以明显的了解, 它是一个连接。 这个连接肯定是已经明确 http1.1 还是 http2.0 等一些操作了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
val socket = this.socket!!
val source = this.source!!
val sink = this.sink!!
val http2Connection = this.http2Connection

return if (http2Connection != null) {
Http2ExchangeCodec(client, this, chain, http2Connection)
} else {
socket.soTimeout = chain.readTimeoutMillis()
source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)
sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)
Http1ExchangeCodec(client, this, source, sink)
}
}

看到了吗? 如果是 http2.0 的话就返回 Http2ExchangeCodec, 如果是 http1.1 的话, 就返回 Http1ExchangeCodec。 多说一句哈, 两个版本的区别是啥呢, 这个可以详细的看下这两方面的知识哈。 其中 http2.0 支持多路复用。
继续往下看吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)

if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}

candidate.noNewExchanges()

if (nextRouteToTry != null) continue

val routesLeft = routeSelection?.hasNext() ?: true
if (routesLeft) continue

val routesSelectionLeft = routeSelector?.hasNext() ?: true
if (routesSelectionLeft) continue

throw IOException("exhausted all routes")
}
}

简单的看上面的代码可以分析出, 找到一个 connection, 如果是健康可用的直接返回, 如果不可用, 那么把当前链接设置成坏的链接, 然后通过下面条件判断重试还是异常。 这里对 routeSelection 和 routeSelector 多说一句, 其中 selector 包含了多个 selection, 一个 selection 又包含了多个 router, 所以呢当前没有找到好的连接, 那么如果还存在其他 router 可以尝试, 就要切到下一个继续尝试查找 connection。
最重要的代码 findConnection 比较复杂, 我这里就简单的过下主流程。 因为第一次进入和第二次由于重试或者重定向进入有些不同, 所以我们两种情况进行分析大致流程。

第一次进入 findConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// 1. 
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}

// 2.
val routes: List<Route>?
val route: Route
if (nextRouteToTry != null) {
// Use a route from a preceding coalesced connection.
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
} else if (routeSelection != null && routeSelection!!.hasNext()) {
// Use a route from an existing route selection.
routes = null
route = routeSelection!!.next()
} else {
// Compute a new route selection. This is a blocking operation!
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes

if (call.isCanceled()) throw IOException("Canceled")

// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. We have a better chance of matching thanks to connection coalescing.
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}

route = localRouteSelection.next()
}

// 3.
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())

// If we raced another call connecting to this host, coalesce the connections. This makes for 3
// different lookups in the connection pool!
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}

synchronized(newConnection) {
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}

eventListener.connectionAcquired(call, newConnection)
return newConnection

可以看到我上面编写的编号 1、 2、 3。 其中编号 1 是直接在连接池中查找, 查找到了就直接返回。 编号 2 是,当编号 1 没有找到合适的连接的话, 就通过 router 再次查找。 如果前两次都找不到, 我们就直接来一个新的连接。 详情我就不再多说了, 仅仅说下需要注意的地方。
最后我们手动去创建链接时, 还又一次去对连接池里找, 找到了再次关闭我们创建的, 可能大家有疑问, 那么我们可以看下参数, 第三次我们查找的是 http2.0 的, 因为 http2.0 是多路复用的, 如果我们发现已经存在连接了我们直接用即可。
当然刚才说的把我们连接好的关闭了, 但是也不直接扔掉, 可以看到 nextRouteToTry 这个变量, 下次再次进来的话, 直接就可以用这次这个 route 了哈。
连接中我们还可能需要分 http、 https。 当然也包含 ssl、 tls。 这些就不再过多分析了。
我们可以关注下这三此重连接池找链接的参数

1
2
3
4
5
6
// 1. 
connectionPool.callAcquirePooledConnection(address, call, null, false)
// 2.
connectionPool.callAcquirePooledConnection(address, call, routes, false)
// 3.
connectionPool.callAcquirePooledConnection(address, call, routes, true)

可以看出来, 最后两个参数不一样。 最后一个最明显, 只找多路复用的 http2.0 的链接。 那么第一次和第二次呢, 第一次我们没有 routers, 我们只通过 address 等查找, 当然这里只能是 http1,1, 第二次呢我们传入了 routers, 那么这次我们即找 http1.1, 也找 http2.0。

第二次进入 findConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}

// If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
// because we already acquired it.
if (call.connection != null) {
check(toClose == null)
return callConnection
}

// The call's connection was released.
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}

第二次进入, 包括重试、 或者重定向。 那么 connection 可能不为空了, 因为第一次已经经历过了。 如果不为空, 我们就对其判断是否可以使用, 如果可以使用的话, 我们就直接返回了哈。
我们下来看最后一个拦截器吧, 这个就比较简单了, 主要是通过我们在 ConnectInterceptor 中的 codec 去写入或者接受数据。

CallServerInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()

exchange.writeRequestHeaders(request)

var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}

if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// Server sent a 100-continue even though we did not request one. Try again to read the actual
// response status.
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}

exchange.responseHeadersEnd(response)

response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
return response
}

这里就不多说了, 就是通过上个拦截器的连接选择通后根据不同的协议进行数据的处理。

请求对于关系的整理

一个请求对于一个 Request, 也就是一个请求对于一个 RealCall, 一个 RealCall 对于一个 Connection, 一个 Connection 对于一个 socket。 但是 socket 根据版本可能会进行多路复用。
反过来多个 Call 可能对应一个 socket, 因为可能会有多路复用。 一个 socket 对于一个 connection。
在这里插入图片描述

结束

OK, 主要的流程我们已经讲完了。 很多细节没有讲述彻底。 当然水平有限, 也有些本人也不甚了解太多。 后面有时间一定要扩宽下知识面及学习能力。