揭开Kotlin协程的神秘面纱

banq 18-08-08
                   

Kotlin协程提供了一种新的异步执行方式,但直接查看库函数可能会有点混乱,本文中尝试揭开协程的神秘面纱。

让我们从基础开始吧,假设有一个名为launch可以用来启动协程


private fun myHeavyFunction() {
Log.e("Thread Running ", Thread.currentThread().name)
}
val job = launch { myHeavyFunction() }


上面的代码是使用launch一种非常简单的方法,返回Job一个异步执行函数,Job代表一个协程coroutine作业,可以取消或查询它的状态。


override fun onStop() {
if (job.isActive) {
job.cancel()
}
}



现在,如果查看我们的日志,检查我们的函数实际运行的是哪个线程?我们就会得到类似的结果

E / Thread运行:ForkJoinPool.commonPool-worker-2

我们的代码是在一个线程中运行的,让我们稍微了解一下launch本身:


public fun launch(
context:CoroutineContext =DefaultDispatcher,
start:CoroutineStart CoroutineStart.DEFAULT,
parent:Job?=null,
onComp1etion:CompletionHand1er? =null,
block:suspend CoroutineScope.()->Unit
):Job{




再看看DefaultDispatcher的值是什么?


@Suppress("PropertyName ")
public actual val DefaultDispatcher: CoroutineDispatcher = CommonPool

object CommonPool:CoroutineDispatcher()



launch是将CoroutineContext作为第一个参数,这个参数值默认为代表一个CommonPool线程池类的DefaultDispatcher,这个线程池类根据当前CPU处理器总数创建一个带有Executors的CoroutineContext。完整代码在这里


launch是一种协程构建器,可以接受一个协程分配器CoroutineDispatcher,分配器实际上负责在单独的线程中运行代码。

我们可以轻松创建自己的分配器:


val singleThreadDispatcher = newSingleThreadContext("singleThreadDispatcher")


newSingleThreadContext 由Kotlin协同程序库本身提供,用于创建仅在单个线程上运行的上下文。我们可以在此基础上创建自己的函数:


fun <T> singleThreadAsync(block: () -> T): Job = launch(singleThreadDispatcher) { block.invoke() }

job = singleThreadAsync { myHeavyFunction() }


下面是运行后的日志

E / Thread运行:singleThreadDispatcher

所以我们用我们自己的线程方案创建了我们自己的简单协程:)

让我们看看我们可以通过Dispatchers做更多事情:


object MyDispatcher : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
thread {
block.run()
}
}
}

object RxDispatcher : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
Observable.fromCallable { block.run() }
.subscribeOn(Schedulers.io())
.subscribe {}

}
}

object UIDispatcher : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
Handler(Looper.getMainLooper()).post {
block.run()
}
}
}


在这里,我们创建了三个不同的分配器程序并重载了dispatch方法, 我们在每个dispatch方法中以不同的方式执行Runnable块,也就是一个简单的线程,这个异步线程是使用RxJava实现,而Android主线程是使用Handler完成。

如果我们用这些分配器程序执行我们的函数,我们会得到这些日志

E / Thread Running:Thread-582
E / Thread Running:RxCachedThreadScheduler-1
E / Thread Running:main

这真的显示了协同程序的强大功能,因为Coroutines只是语言语法,它们与运行它们的平台无关。不同线程的职责分配只需开发人员使用一组函数就能实现,他可以在Rx线程或主线程上执行他喜欢的协同程序。

协同程序就像空的冰淇淋甜筒,你可以选择你想要冰淇淋的填入。

无线程Thread-less异步
编写异步代码传统上被认为是一种线程工作,其实并不总是如此,让我们看看如何使用Coroutines解决这个问题

让我们看看一系列函数执行


mySmallFunction1()
myHeavyFunction() // Takes 3 seconds to execute
mySmallFunction2()
//Order运行顺序
E/mySmallFunction1 running on: main
E/myHeavyFunction running on: main
E/mySmallFunction2 running on: main


现在因为myHeavyFunction()函数需要很长时间才能执行,所以我们可能想要异步执行它。


mySmallFunction1()
thread { myHeavyFunction() } //Execution in a separate thread.
mySmallFunction2()
//Order顺序
E/mySmallFunction1 running on: main
E/mySmallFunction2 running on: main
E/myHeavyFunction running on: Thread-697


这里我们将myHeavyFunction()迁移到一个单独的线程并异步执行它,但是如果我们这样做:

mySmallFunction1()
launch(UI) { myHeavyFunction() }
mySmallFunction2()
//Order
E/mySmallFunction1 running on: main
E/mySmallFunction2 running on: main
E/myHeavyFunction running on: main


这里我们在主线程上运行的Coroutine上下文(UI:由coroutine-android库提供)中执行重量函数,执行仍然是异步的,因为Coroutines是通过暂停这部分函数处理,但函数执行仍然发生在主线程上,而不创建额外的线程。

实战协程
在大多数情况下,我们需要来自一个异步执行的回调,这样我们就可以通过回调函数来更新UI等,这里就可以使用Deferred语法:


Deferred本身继承扩展了Job,但增加一个额外的功能,它可以在函数完成执行后返回未来的值。


让我们看看我们在这里做了什么:

fun <T> asyncExecutor(block: () -> T, response: (T) -> Unit): Job {
return launch(UI) {
val deferred = async(singleThreadDispatcher) { block.invoke() }
response.invoke(deferred.await())
}
}

让我们分析一下:

1. launch(UI)使用Android的UI所在的线程上下文创建一个协同Job。
2. 我们通过async异步创建了另一个协同程序,其中包含我们需要调用的函数,唯一的区别是:这个协程返回一个Deferred值,async是协程库的一部分。
3. 我们调用await()函数来捕获Deferred的未来值。这是在UI所在线程上下文中捕获的。

总而言之,我们创建了一个异步执行程序,我们可以在其中传递函数并让它们异步执行,然后将值返回给UI线程。

现在我们在哪里可以使用它 ? 数据库查询


// Insert into DB without callback
singleThreadAsync { movieDataBase.movieDao().insert(movieObject) }

// Get List of movies from DB and filter it
asyncExecutor({ movieDataBase.movieDao().getAll() }, { movieList ->
movieList
.filter { it.isFavorite }
.map { it.originalLanguage =
"English" }
//Dispatch to UI
})


我们将插入到DB的请求变成了一个发射就可以忘记不用等待结果的异步请求,这是使用singleThreadAsync实现的 。

当我们从DB检索数据时,我们可以使用我们的asyncExecutor来检索对象列表,然后使用Collection Framework中的运算符发挥所有kotlin优点啦!

Demystifying Kotlin Coroutines – ProAndroidDev