前言

kotlin 作为 Android 的第一开发语音, 其优势还是挺多的。 其中协程就是 JAVA 外的功能。 其实 kotlin 的协程内部也是使用的线程池, 不过写法超级简单, 切换线程很简单, 可读性很高。 rxjava 等也对切换线程等有好的封装, 但是相对于协程来说还是有点繁琐。 淘金内部新开发的内容已经不再使用线程池了, 全部切换到协程中。 所以了解 协程的基本远离还是很有必要的。

基本写法

1
2
3
4
5
6
7
lifecycleScope.launch {
Log.e(TAG, "1")
withContext(Dispatchers.IO) {
Log.e(TAG, "2")
}
Log.e(TAG, "3")
}

其中 1 是在主线程, 2 是在子线程, 3 是在主线程。 我们先看其流程原理。
我们看下代码编译成 Kotlin Bytecode 和 java 代码长什么样子。

1
2
3
// 主要关心的部分
final class cn/xiaoxige/androidtripartitestudy/MainActivity$xiaoxige$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2 {
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public final void helloWord(@NotNull View view) {
Intrinsics.checkNotNullParameter(view, "view");
EventBus.getDefault().post(new MyEvent());
BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;

@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
Log.e("MainActivity", "1");
CoroutineContext var10000 = (CoroutineContext)Dispatchers.getIO();
Function2 var10001 = (Function2)(new Function2((Continuation)null) {
int label;

@Nullable
public final Object invokeSuspend(@NotNull Object var1) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure(var1);
return Boxing.boxInt(Log.e("MainActivity", "2"));
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}

@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}

public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
});
this.label = 1;
if (BuildersKt.withContext(var10000, var10001, this) == var2) {
return var2;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}

Log.e("MainActivity", "3");
return Unit.INSTANCE;
}

@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}

public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
}

通过上面的代码我们可以理出来几个关键性的认知。
通过 kotlin bytecode 我们可以看出来, 我们最后的 {}, 回调是一个内部类, 继承了 SuspendLambda, 其中 SuspendLambda 又继承了 ContinuationImpl, ContinuationImpl 继承了 BaseContinuationImpl。
通过编译后的 JAVA 代码可以很明显的看出, 我们要执行的代码, 通过 label 控制, 肯定调用 invokeSuspend 方法, 然后一个一个的运行。 也可以想到肯定多次调用 invokeSuspend, 每调用一次就运行一块代码去执行相应的代码。
通过 kotlin bytecode 和 编译后的 java 代码关联起来。 我们发现, BaseContinuationImpl 的 resumeWith 方法中调用了 invokeSuspend 方法。 所以呢, 我们关键找到哪里去调用了 BaseContinuationImpl 的 resumeWith 方法。

源码分析

流程分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}

其中 newCoroutineContext 可以看出, 把 context 进行了合并, 并且如果没有手动传入 dispatcher 的话, 就默认添加一个 Dispatchers.Default。 这个 Default 是 DefaultScheduler。
继续往下看。 coroutine.start(start, coroutine, block), 开启了协程。

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Starts this coroutine with the given code [block] and [start] strategy.
* This function shall be invoked at most once on this coroutine.
*
* * [DEFAULT] uses [startCoroutineCancellable].
* * [ATOMIC] uses [startCoroutine].
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
* * [LAZY] does nothing.
*/
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}

可以看下注释, 我们可以看到默认的 DEFAULT 模式下是调用的 startCoroutineCancellable, 点进去。

1
2
3
4
5
6
7
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}

我们看下 createCoroutineUnintercepted 的返回值是啥了, 这个我们看不到, 但是我们传入了 receiver, 这个是啥, 这个就是协程那个 {}, 也就是那个自动生成的内部类哈。 我们先点 intercepted 方法。

1
2
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this

从 intercepted 可以看出, createCoroutineUnintercepted 返回的是 ContinuationImpl, 或者准确的说至少是以 ContinuationImpl 为父类的一个类。 还记得那个自动生成的内部类吗? 我们是否可以理解为是对内部类的一层包装呢。 不影响我们理解全局的流程哈。
我们再次看下 ContinuationImpl 的 intercepted 的方法。

1
2
3
4
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }

我们可以看到返回了 context[ContinuationInterceptor]?.interceptContinuation(this)。 其中 context[ContinuationInterceptor] 就是上面提到的 Dispatchers.Default, 也就是 DefaultScheduler。 我们看看 DefaultScheduler 里的 interceptContinuation 怎么样的逻辑。 注意 interceptContinuation 传入的参数是 this, 这个 this 是什么呀, 是 createCoroutineUnintercepted, 也就是是包装的那个内部类, 也就是调用了 this 的 resumeWith 就会运行到 invokeSuspend, 也就最终运行到了协程 {} 的那个内部类了哈。 OK , 很好, 我们现在看下 DefaultScheduler 中 的 interceptContinuation 这个家伙。

1
2
3
// DefaultScheduler -> ExperimentalCoroutineDispatcher -> ExecutorCoroutineDispatcher -> CoroutineDispatcher
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)

通过继承关系, 我们发现最终返回了 DispatchedContinuation。 也就是 intercepted 最终返回了 DispatchedContinuation。 先看下参数加固下记忆哈, DispatchedContinuation 中传入了 this , 也就是 DefaultScheduler, continuation 也就是我们传过来的包装内部类哈。
我们回到最原先的代码中。

1
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)

最后调用了 resumeCancellableWith 方法, 也就是调用了 DispatchedContinuation 的 resumeCancellableWith 方法。 废话不说, 直接上代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}

我们可以看到如果需要分发的话, 调用的是 dispatcher 的 dispatch。 否则调用 resumeUndispatchedWith 方法。 首先先明确先 dispatcher 是上面分析的 DefaultScheduler。 我们先看看 isDispatchNeeded 判断了什么。
你会发现 DefaultScheduler 中没有复写, 一致向上找父类。

1
2
// CoroutineDispatcher
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

好吧, 我们看看 DefaultScheduler 的 dispatch 干了什么。 注意 DefaultScheduler 的 dispatch 传入的 this 是当前 DispatchedContinuation 哈。

1
2
3
4
5
6
7
8
9
10
11
12
13
// ExperimentalCoroutineDispatcher
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatch(context, block)
}

private var coroutineScheduler = createScheduler()

private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

通过 coroutineScheduler(线程池) 的 dispatch 调用 block。 也就是 CoroutineScheduler 的 dispatch。 等下, 还记得 block 是什么吧, 对, 是 DispatchedContinuation。

1
override fun execute(command: Runnable) = dispatch(command)

往下继续分析没有必要了, 知道把 DispatchedContinuation 添加到线程池中进行运行了, 也就是 DispatchedContinuation 的 run 方法运行了。 我们直接看 DispatchedContinuation 的 run 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// DispatchedContinuation -> DispatchedTask
public final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
val exception = getExceptionalResult(state)
/*
* Check whether continuation was originally resumed with an exception.
* If so, it dominates cancellation, otherwise the original exception
* will be silently lost.
*/
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
}
}

我们看到中间运行的 continuation.resume 开头的方法, 我们挨个点进去, 不管运行哪个, 最终都会调用到 resumeWith, 然后就调用到了 invokeSuspend 方法。
到这里整个流程讲完了。 我们粗略的整理下:

  • 我们写的协程需要运行的 {} 会编译成一个内部类, 这个内部类继承了 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl。 调用到 BaseContinuationImpl 中的 resumeWith 就会调用到生成内部类中的 invokeSuspend 方法进而运行我们的代码。
  • 最终把内部类和 dispatcher 传入 DispatchedContinuation 类中, DispatchedContinuation 继承了 Runnable。 使用 dispatcher 去运行 Runable 的 run 方法, run 方法最终又调用到了内部类的 resumeWith 方法。

通过上面的分析我们可以隐约看出来什么呢?

  • dispatcher 里面运行的 runnable, 所以 dispatcher 是如果是线程池的话,那么就是子线程运行, 如果是 Handler(Loop.Main) 的话, 那么就是对于的 Android 中的主线程。
  • 最终必须调用到包装内部类的 resumeWith 方法, 不然的话当前运行的结果就传递不下去, 就不会再运行内部类的 invokeSuspend 方法进而运行我们协程写的逻辑。
  • 我们是不是可以中间插入自己的 Dispatcher 去运行然后逮着结果调用原来的 invokeSuspend 方法, 是不是就实现了顺序的插入我们的逻辑了, 而且插入的这个逻辑可以在任何线程, 因为呢, 因为我们不调用 invokeSuspend 方法就相当于是断的。

那么它到底是怎么切换线程的呢? 我们再往下看看。

切换线程分析

我们看下 withContext 是怎么搞的。

1
2
3
4
5
6
7
8
9
10
fun helloWord(view: View) {
EventBus.getDefault().post(MyEvent())
lifecycleScope.launch {
Log.e(TAG, "1")
withContext(Dispatchers.IO) {
Log.e(TAG, "2")
}
Log.e(TAG, "3")
}
}

通过 kotlin bytecode 和编译后的 JAVA 代码, 我们可以看出 withContext 也是自动生成了外部 launch {} 内部类的内部类。 所以 withContext 内部类的可以拿到外部内部类的实例哈。 其中 uCont 就是外部内部类的实例, 也就是我们在流程分析中的那个内部类。 所以按照我们的想法, 我们直接在 withContext 中去 dispatcher 执行我们自己的逻辑, 然后最后调用外部内部类的 resumeWith 即可。 真的是这样的吗? 我们看下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val oldContext = uCont.context
val newContext = oldContext + context
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}

我们看看 DispatchedCoroutine。

1
2
3
4
5
6
DispatchedCoroutine -> ScopeCoroutine

override fun afterResume(state: Any?) {
// Resume direct because scope is already in the correct context
uCont.resumeWith(recoverResult(state, uCont))
}

还真是哈, 在 withContext 内部类执行完成后, 逮着结果调用了外部内部类的 resumeWith 方法。 从而又继续在外部方法体里运行了哈。
其中 block.startCoroutineCancellable(coroutine, coroutine) 就开始运行 withContext 协程了哈。

1
2
3
4
5
6
7
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}

代码跟我们之前的流程分析就一致了哈。
到此 kotlin 协程分析完毕。