前言 kotlin 作为 Android 的第一开发语音, 其优势还是挺多的。 其中协程就是 JAVA 外的功能。 其实 kotlin 的协程内部也是使用的线程池, 不过写法超级简单, 切换线程很简单, 可读性很高。 rxjava 等也对切换线程等有好的封装, 但是相对于协程来说还是有点繁琐。 淘金内部新开发的内容已经不再使用线程池了, 全部切换到协程中。 所以了解 协程的基本远离还是很有必要的。
基本写法 1 2 3 4 5 6 7 lifecycleScope.launch { Log.e(TAG, "1" ) withContext(Dispatchers.IO) { Log.e(TAG, "2" ) } Log.e(TAG, "3" ) }
其中 1 是在主线程, 2 是在子线程, 3 是在主线程。 我们先看其流程原理。 我们看下代码编译成 Kotlin Bytecode 和 java 代码长什么样子。
1 2 3 final class cn /xiaoxige /androidtripartitestudy /MainActivity $xiaoxige $1 extends kotlin /coroutines /jvm /internal /SuspendLambda implements kotlin /jvm /functions /Function2 {}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public final void helloWord(@NotNull View view) { Intrinsics.checkNotNullParameter(view, "view" ); EventBus.getDefault().post(new MyEvent()); BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this ), (CoroutineContext)null , (CoroutineStart)null , (Function2)(new Function2((Continuation)null ) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(this .label) { case 0 : ResultKt.throwOnFailure($result); Log.e("MainActivity" , "1" ); CoroutineContext var10000 = (CoroutineContext)Dispatchers.getIO(); Function2 var10001 = (Function2)(new Function2((Continuation)null ) { int label; @Nullable public final Object invokeSuspend(@NotNull Object var1) { Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(this .label) { case 0 : ResultKt.throwOnFailure(var1); return Boxing.boxInt(Log.e("MainActivity" , "2" )); default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine" ); } } @NotNull public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion" ); Function2 var3 = new <anonymous constructor >(completion); return var3; } public final Object invoke(Object var1, Object var2) { return ((<undefinedtype>)this .create(var1, (Continuation)var2)).invokeSuspend(Unit .INSTANCE); } }); this .label = 1 ; if (BuildersKt.withContext(var10000, var10001, this ) == var2) { return var2; } break ; case 1 : ResultKt.throwOnFailure($result); break ; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine" ); } Log.e("MainActivity" , "3" ); return Unit .INSTANCE; } @NotNull public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion" ); Function2 var3 = new <anonymous constructor >(completion); return var3; } public final Object invoke(Object var1, Object var2) { return ((<undefinedtype>)this .create(var1, (Continuation)var2)).invokeSuspend(Unit .INSTANCE); } }), 3 , (Object)null ); }
通过上面的代码我们可以理出来几个关键性的认知。 通过 kotlin bytecode 我们可以看出来, 我们最后的 {}, 回调是一个内部类, 继承了 SuspendLambda, 其中 SuspendLambda 又继承了 ContinuationImpl, ContinuationImpl 继承了 BaseContinuationImpl。 通过编译后的 JAVA 代码可以很明显的看出, 我们要执行的代码, 通过 label 控制, 肯定调用 invokeSuspend 方法, 然后一个一个的运行。 也可以想到肯定多次调用 invokeSuspend, 每调用一次就运行一块代码去执行相应的代码。 通过 kotlin bytecode 和 编译后的 java 代码关联起来。 我们发现, BaseContinuationImpl 的 resumeWith 方法中调用了 invokeSuspend 方法。 所以呢, 我们关键找到哪里去调用了 BaseContinuationImpl 的 resumeWith 方法。
源码分析 流程分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public fun CoroutineScope.launch ( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope .() -> Unit ) : Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true ) coroutine.start(start, coroutine, block) return coroutine } public actual fun CoroutineScope.newCoroutineContext (context: CoroutineContext ) : CoroutineContext { val combined = coroutineContext + context val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null ) debug + Dispatchers.Default else debug }
其中 newCoroutineContext 可以看出, 把 context 进行了合并, 并且如果没有手动传入 dispatcher 的话, 就默认添加一个 Dispatchers.Default。 这个 Default 是 DefaultScheduler。 继续往下看。 coroutine.start(start, coroutine, block), 开启了协程。
1 2 3 4 5 6 7 8 9 10 11 12 public fun <R> start (start: CoroutineStart , receiver: R , block: suspend R .() -> T ) { start(block, receiver, this ) }
可以看下注释, 我们可以看到默认的 DEFAULT 模式下是调用的 startCoroutineCancellable, 点进去。
1 2 3 4 5 6 7 internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable( receiver: R, completion: Continuation<T>, onCancellation: ((cause: Throwable) -> Unit )? = null ) = runSafely(completion) { createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit ), onCancellation) }
我们看下 createCoroutineUnintercepted 的返回值是啥了, 这个我们看不到, 但是我们传入了 receiver, 这个是啥, 这个就是协程那个 {}, 也就是那个自动生成的内部类哈。 我们先点 intercepted 方法。
1 2 public actual fun <T> Continuation<T> .intercepted () : Continuation<T> = (this as ? ContinuationImpl)?.intercepted() ?: this
从 intercepted 可以看出, createCoroutineUnintercepted 返回的是 ContinuationImpl, 或者准确的说至少是以 ContinuationImpl 为父类的一个类。 还记得那个自动生成的内部类吗? 我们是否可以理解为是对内部类的一层包装呢。 不影响我们理解全局的流程哈。 我们再次看下 ContinuationImpl 的 intercepted 的方法。
1 2 3 4 public fun intercepted () : Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this ) ?: this ) .also { intercepted = it }
我们可以看到返回了 context[ContinuationInterceptor]?.interceptContinuation(this)。 其中 context[ContinuationInterceptor] 就是上面提到的 Dispatchers.Default, 也就是 DefaultScheduler。 我们看看 DefaultScheduler 里的 interceptContinuation 怎么样的逻辑。 注意 interceptContinuation 传入的参数是 this, 这个 this 是什么呀, 是 createCoroutineUnintercepted, 也就是是包装的那个内部类, 也就是调用了 this 的 resumeWith 就会运行到 invokeSuspend, 也就最终运行到了协程 {} 的那个内部类了哈。 OK , 很好, 我们现在看下 DefaultScheduler 中 的 interceptContinuation 这个家伙。
1 2 3 public final override fun <T> interceptContinuation (continuation: Continuation <T >) : Continuation<T> = DispatchedContinuation(this , continuation)
通过继承关系, 我们发现最终返回了 DispatchedContinuation。 也就是 intercepted 最终返回了 DispatchedContinuation。 先看下参数加固下记忆哈, DispatchedContinuation 中传入了 this , 也就是 DefaultScheduler, continuation 也就是我们传过来的包装内部类哈。 我们回到最原先的代码中。
1 createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit ), onCancellation)
最后调用了 resumeCancellableWith 方法, 也就是调用了 DispatchedContinuation 的 resumeCancellableWith 方法。 废话不说, 直接上代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 inline fun resumeCancellableWith ( result: Result <T >, noinline onCancellation: ((cause : Throwable ) -> Unit )? ) { val state = result.toState(onCancellation) if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE dispatcher.dispatch(context, this ) } else { executeUnconfined(state, MODE_CANCELLABLE) { if (!resumeCancelled(state)) { resumeUndispatchedWith(result) } } } }
我们可以看到如果需要分发的话, 调用的是 dispatcher 的 dispatch。 否则调用 resumeUndispatchedWith 方法。 首先先明确先 dispatcher 是上面分析的 DefaultScheduler。 我们先看看 isDispatchNeeded 判断了什么。 你会发现 DefaultScheduler 中没有复写, 一致向上找父类。
1 2 public open fun isDispatchNeeded (context: CoroutineContext ) : Boolean = true
好吧, 我们看看 DefaultScheduler 的 dispatch 干了什么。 注意 DefaultScheduler 的 dispatch 传入的 this 是当前 DispatchedContinuation 哈。
1 2 3 4 5 6 7 8 9 10 11 12 13 override fun dispatch (context: CoroutineContext , block: Runnable ) : Unit = try { coroutineScheduler.dispatch(block) } catch (e: RejectedExecutionException) { DefaultExecutor.dispatch(context, block) } private var coroutineScheduler = createScheduler()private fun createScheduler () = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
通过 coroutineScheduler(线程池) 的 dispatch 调用 block。 也就是 CoroutineScheduler 的 dispatch。 等下, 还记得 block 是什么吧, 对, 是 DispatchedContinuation。
1 override fun execute (command: Runnable ) = dispatch(command)
往下继续分析没有必要了, 知道把 DispatchedContinuation 添加到线程池中进行运行了, 也就是 DispatchedContinuation 的 run 方法运行了。 我们直接看 DispatchedContinuation 的 run 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public final override fun run () { assert { resumeMode != MODE_UNINITIALIZED } val taskContext = this .taskContext var fatalException: Throwable? = null try { val delegate = delegate as DispatchedContinuation<T> val continuation = delegate.continuation withContinuationContext(continuation, delegate.countOrElement) { val context = continuation.context val state = takeState() val exception = getExceptionalResult(state) val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null if (job != null && !job.isActive) { val cause = job.getCancellationException() cancelCompletedResult(state, cause) continuation.resumeWithStackTrace(cause) } else { if (exception != null ) { continuation.resumeWithException(exception) } else { continuation.resume(getSuccessfulResult(state)) } } } } catch (e: Throwable) { fatalException = e } finally { val result = runCatching { taskContext.afterTask() } handleFatalException(fatalException, result.exceptionOrNull()) } }
我们看到中间运行的 continuation.resume 开头的方法, 我们挨个点进去, 不管运行哪个, 最终都会调用到 resumeWith, 然后就调用到了 invokeSuspend 方法。 到这里整个流程讲完了。 我们粗略的整理下:
我们写的协程需要运行的 {} 会编译成一个内部类, 这个内部类继承了 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl。 调用到 BaseContinuationImpl 中的 resumeWith 就会调用到生成内部类中的 invokeSuspend 方法进而运行我们的代码。
最终把内部类和 dispatcher 传入 DispatchedContinuation 类中, DispatchedContinuation 继承了 Runnable。 使用 dispatcher 去运行 Runable 的 run 方法, run 方法最终又调用到了内部类的 resumeWith 方法。
通过上面的分析我们可以隐约看出来什么呢?
dispatcher 里面运行的 runnable, 所以 dispatcher 是如果是线程池的话,那么就是子线程运行, 如果是 Handler(Loop.Main) 的话, 那么就是对于的 Android 中的主线程。
最终必须调用到包装内部类的 resumeWith 方法, 不然的话当前运行的结果就传递不下去, 就不会再运行内部类的 invokeSuspend 方法进而运行我们协程写的逻辑。
我们是不是可以中间插入自己的 Dispatcher 去运行然后逮着结果调用原来的 invokeSuspend 方法, 是不是就实现了顺序的插入我们的逻辑了, 而且插入的这个逻辑可以在任何线程, 因为呢, 因为我们不调用 invokeSuspend 方法就相当于是断的。
那么它到底是怎么切换线程的呢? 我们再往下看看。
切换线程分析 我们看下 withContext 是怎么搞的。
1 2 3 4 5 6 7 8 9 10 fun helloWord (view: View ) { EventBus.getDefault().post(MyEvent()) lifecycleScope.launch { Log.e(TAG, "1" ) withContext(Dispatchers.IO) { Log.e(TAG, "2" ) } Log.e(TAG, "3" ) } }
通过 kotlin bytecode 和编译后的 JAVA 代码, 我们可以看出 withContext 也是自动生成了外部 launch {} 内部类的内部类。 所以 withContext 内部类的可以拿到外部内部类的实例哈。 其中 uCont 就是外部内部类的实例, 也就是我们在流程分析中的那个内部类。 所以按照我们的想法, 我们直接在 withContext 中去 dispatcher 执行我们自己的逻辑, 然后最后调用外部内部类的 resumeWith 即可。 真的是这样的吗? 我们看下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public suspend fun <T> withContext ( context: CoroutineContext , block: suspend CoroutineScope .() -> T ) : T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } return suspendCoroutineUninterceptedOrReturn sc@ { uCont -> val oldContext = uCont.context val newContext = oldContext + context val coroutine = DispatchedCoroutine(newContext, uCont) block.startCoroutineCancellable(coroutine, coroutine) coroutine.getResult() } }
我们看看 DispatchedCoroutine。
1 2 3 4 5 6 DispatchedCoroutine -> ScopeCoroutine override fun afterResume (state: Any ?) { uCont.resumeWith(recoverResult(state, uCont)) }
还真是哈, 在 withContext 内部类执行完成后, 逮着结果调用了外部内部类的 resumeWith 方法。 从而又继续在外部方法体里运行了哈。 其中 block.startCoroutineCancellable(coroutine, coroutine) 就开始运行 withContext 协程了哈。
1 2 3 4 5 6 7 internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable( receiver: R, completion: Continuation<T>, onCancellation: ((cause: Throwable) -> Unit )? = null ) = runSafely(completion) { createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit ), onCancellation) }
代码跟我们之前的流程分析就一致了哈。 到此 kotlin 协程分析完毕。