2019年6月27日 星期四

理解 Kotlin 的 Coroutines 的運作行為

參考文章: https://kotlinlang.org/docs/reference/coroutines/basics.html
簡單來說, Kotlin 的 Coroutines 是一個資源共享的程序區塊簡稱窟程序(一段程序走著走著跑到窟窿去了, 後續可以命令它再鑽出來), 它可以將一條執行緒(Thread)分裂(一個 launch 或 async 區塊只會分裂一次)成不同的程序路徑, 經多次分裂(launchs 或是 asyncs)後就像是(但不是)多條執行緒同時運行, 先了解一下 runBlocking {   } 的運作, 顧名思義,它就是一個阻礙式(blocking)函數 λ (lambda), 區塊內如同一般的函式, 等到區塊內的程序走完才能繼續運行下一步, 但在 λ 區塊 { } 作用(scope)下可以建立出同時進行的程序區塊(Coroutine), 也就是利用像是 async {  } 或是 launch { } 去分裂出 Coroutine,  讓原本循序的程式碼一分為二又不阻塞(block)程式繼續運行, launch 建立出程序區塊後會傳回一個在背景運作的工作代號(job), 後續透過方法 join( ) 可以同步確認程序的完成. 它適用於不需回傳結果的程序,或者稱之為射後不理程序區塊, 至於 async 建立的程序區塊則是從發動的 suspend fun 中獲取一個延遲物件 Deferred <T> , 其中的 <T> 視函式(function)的傳回值而定, 後續再透過延遲物件的方法 await( ) 去等待(suspend)回傳輸出結果, 這是 async 與 launch 區塊應用時最大的區別, 每一個 launch 或 async 區塊都是獨立並行運作的, 一經發動就在背景依序運行, Coroutine 區塊內每條程序也就循序的(sequential)運作起來. 我們可以把 runBlocking 看成是循序程式碼與程序區塊間的中介運作橋樑,如果想要在主程序發動 Coroutine, 另外可以用  GlobalScope.launch { } 去建立與主程序同時進行的程序區塊讓它在背景同時運行, 與 runBlocking 不同的是 GlobalScope.launch 直接回傳一個背景代碼就繼續運行而不會阻塞. 一旦產生 GlobalScope.launch 的 job 後, 就有責任需要呼叫其方法 join( ), 等待該 job 同步完成, 否則有可能程序未完成前就被終結掉, 但 join( ) 也只能放在 Coroutine 區塊內(scope)呼叫. 至於 runBlocking 本身就會等待內部 Coroutine 完成, 正如同程式名所言.因此不用呼叫對應的 join( ).以下範例程式可以貼到 https://play.kotlinlang.org  執行  Playground 看看:

import kotlinx.coroutines.*
fun main( ) {
     println("main start")
     val     job = GlobalScope.launch {
             println("Global Job 1 start")
             delay(2000L)   // suspend for 2 seconds
             println("Global Job 1 stop")
     }
    println("main running")
    runBlocking {
            launch {
                   println("Job 2 start")
                   delay(500L)   // suspend for  500ms = 0.5 sec
                   println("Job 2 stop")
            }        
            launch {
                   println("Job 3 start")
                   delay(1000L) // suspend for 1 sec
                   println("Job 3 stop")
            }  
            job.join()  // wait job in GlobalScope.launch to finish
      }
     println("main stop")
}
看輸出結果可以想像: 上述程序建構了(launch) 了 3 個並行的工作, 就好像從主程序分裂出 3 條執行緒同時運行, 主程序走到 runBlocking 會一直等待直到它結束, 最後主程序執行最後一行列印指令終結任務:
main start
main running

Global Job 1 start
Job 2 start
Job 3 start

Job 2 stop
Job 3 stop

Global Job 1 stop
main stop

Coroutine 運作的行為方式很像是(但不是)執行緒(Thread), 每一個 coroutine 都會攜帶 Context (文本), 在此架構下去運作一個輕量級執行緒, 透過執行緒派任工 Dispatchers 將任務(Task  or job)派發至專屬文本的執行緒去運行,換句話說, 他並不一定要產生執行緒, 而是將任務分配給執行緒,定義可供 coroutine 呼叫的函式, 之前要加上 suspend 關鍵字讓 Kotlin compiler 把它轉變成可以暫停或持續運作的函式.  Kotlin 幾個重要的派任工:
1. Dispatchers.Main: 通常是派任至前景執行緒
2. Dispatchers.IO: 通常是派任至背景,需長時間運作的執行緒
3. Dispatchers.Default: 通常是派任至背景, 短時間加速完成的執行緒
4. Dispatchers.Unconfined: 派任至目前呼叫的 Thread, 開始執行一直到第一個暫停點結束.
透過內建 withContext 及派任工, 可以將任務交給專屬執行緒在其所屬文本底下運行, 或是創建一群共同生命期(coutineScope) coroutines, 他既不組塞目前 Thread 運作, 又可以非同步操作函式, 等到裏面全數 coroutine結束才會終結生命:
suspend fun mySuspend( ) =   withContext(Dispatcher.Main) { // dispatch to Thread   
      //  dispatch to main UI thread
}
suspend fun myScope( ) = coroutineScope { // create a group of coroutines to live together
      async {
         // ...
      } 
      launch {

         // ...
      }
} // live until finishing all of coroutines (async or launch)
一個 Coroutine 是一個有生命周期(scope)的任務(Job),透過 coroutineContext[Job]  就可以獲取目前的任務. CoroutineScope 介面只含唯一屬性 coroutineContext,  至於 async 與 launch 是 CoroutineScope 介面的從屬函數, coroutineScope 創建工則是另一個 suspend 的函式, 它根據 coroutineContext 去建立 coroutine 上下父子關係(承上啟下). 參考文章:
         https://medium.com/@elizarov/coroutine-context-and-scope-c8b255d59055:
或許可以這樣理解: coutineScope { } 是 Kotlin 的一群 coroutines 區塊的生命期創建工(builder), 而 async { } 或 launch { } 僅是單一 coroutine 區塊生命期創建工.
Task, Context, Thread 都是作業系統的專有名詞, Task 是一個獨立作業的前景或背景任務簡稱任務, Context 是執行任務時的特定參數資料庫(database)簡稱文本, Thread 則是運行任務時可以走的程序路徑(program path)簡稱執行緒, 當 CPU 支援硬體的 multiThread (多工緒)或是透過軟體摹擬出 Thread pool(工緒池塘, 一整池的執行緒)時就可以將程式碼分配到不同路徑去執行,主要目地是增加運算效率並改善使用者體驗. 過多的 Threads 對記憶體需求將是一大考驗, 但對 Coroutines 卻不是問題, 這是他神奇的地方. 查了網路理解了一下 Coroutine 的運作原理: 首先透過 suspend 函式宣告後, 編譯器(compiler) 將 suspend 函式轉變成狀態機(state machine), 呼叫時再傳一個叫作 Continuation 的物件進入該函式, 之後透過 Continuation 物件去掌控函式暫停(suspend)或重啟(resume)的狀態, 有點類似Javascript yield 的運行方式, 暫停時傳回值並保留運作狀態, 後續回頭重啟時再從暫停的地方繼續執行,其壓根就不是 Thread, 因此可以透過派任的方式讓 Coroutines 在其他 Thread 上運行,最後來看一個例子:

    import kotlin.coroutines.*
    import kotlinx.coroutines.*
    lateinit var continuation1: Continuation < Unit >
    lateinit var continuation2: Continuation < Unit >
    fun main(args: Array <String> ) {
        println("Stat of main")
        GlobalScope.launch(Dispatchers.Unconfined) {
             println("Coroutine Start")   
             suspendCoroutine<Unit> { // it is a Continuation object
                println("Coroutine seq#1 running")
                continuation1 = it   // save the continuation object to be resume later
                println("Coroutine seq#1 suspend") // will suspend here
             }                
             println("Coroutine running") 
             suspendCoroutine<Unit> { // it is a Continuation object
                println("Coroutine seq#2 running")
                continuation2 = it   // save the continuation object to be resume later
                println("Coroutine seq#2 suspend") // will suspend here
             }   
             println("Coroutine stop")
         }
         println("Resume 1 in main")
         continuation1.resume(Unit) // resume through continuation
         println("Resume 2 in main")
         continuation2.resume(Unit) // resume through continuation
         println("End of main")
    }
輸出結果:
Stat of main
Coroutine Start
Coroutine seq#1 running
Coroutine seq#1 suspend
Resume 1 in main
Coroutine running
Coroutine seq#2 running
Coroutine seq#2 suspend
Resume 2 in main
Coroutine stop
End of main

可以看到兩個 suspend 函式放在一個 launch 區塊內時, 他們是接續(sequential)運行, 並非想像中同時(parallel)運作, 若真要同時(運作兩個程序區塊, 那就要把他放在不同的 launch 或是 async 區塊去運行, 像是這樣就可以了:
    import kotlin.coroutines.*
    import kotlinx.coroutines.*
    lateinit var continuation1: Continuation < Unit >
    lateinit var continuation2: Continuation < Unit >
    fun main(args: Array < String > ) {
        println("Stat of main")
        GlobalScope.launch(Dispatchers.Unconfined) {
             println("Coroutine Start") 
             val seq1 = async {
                 suspendCancellableCoroutine < Unit > { // it is a Continuation object
                     println("Coroutine seq#1 running")
                     continuation1 = it   // save the continuation object to be resume later
                     println("Coroutine seq#1 suspend") // will suspend here
                 }
                 "Coroutine seq#1 stop" // return a string
             }
             "Coroutine running"
             val seq2 = async {
                 suspendCancellableCoroutine < Unit > { // it is a Continuation object
                     println("Coroutine seq#2 running")
                     continuation2 = it   // save the continuation object to be resume later
                     println("Coroutine seq#2 suspend") // will suspend here
                 }
                 "Coroutine seq#2 stop" //  return a string
             }             
             println(seq1.await()) // will suspend until seq1 return
             println(seq2.await()) // will suspend until seq2 return
             println("Coroutine stop")
         }
         println("Resume 1 in main")
         continuation1.resume(Unit) // resume through continuation
         println("Resume 2 in main")
         continuation2.resume(Unit) // resume through continuation
         println("End of main")
    }
執行結果:
Stat of main
Coroutine Start
Coroutine running
Coroutine seq#1 running
Coroutine seq#1 suspend
Coroutine seq#2 running
Coroutine seq#2 suspend
Resume 1 in main
Coroutine seq#1 stop
Resume 2 in main
Coroutine seq#2 stop
Coroutine stop
End of main

沒有留言: