Въведение в Kotlin Coroutines

1. Общ преглед

В тази статия ще разгледаме съпрограми от езика Kotlin. Най-просто казано, съпрограмите ни позволяват да създаваме асинхронни програми по много плавен начин и те се основават на концепцията за програмиране в стила на продължаване .

Езикът Kotlin ни дава основни конструкции, но може да получи достъп до по-полезни програми с библиотеката kotlinx-coroutines-core . Ще разгледаме тази библиотека, след като разберем основните градивни елементи на езика Kotlin.

2. Създаване на съпрограма с BuildSequence

Нека създадем първа съпрограма, използвайки функцията buildSequence .

И нека внедрим генератор на последователността на Фибоначи, използвайки тази функция:

val fibonacciSeq = buildSequence { var a = 0 var b = 1 yield(1) while (true) { yield(a + b) val tmp = a + b a = b b = tmp } }

Подписът на функция за добив е:

public abstract suspend fun yield(value: T)

Най временно прекратяване на средства за ключови думи, тази функция може да се блокират. Такава функция може да спре съпрограма на buildSequence .

Спиращите функции могат да бъдат създадени като стандартни функции на Kotlin, но трябва да сме наясно, че можем да ги извикваме само от вътрешна програма. В противен случай ще получим грешка в компилатора.

Ако сме спрели повикването в рамките на buildSequence, това обаждане ще бъде трансформирано в специално състояние в държавната машина. Програма може да бъде предадена и присвоена на променлива като всяка друга функция.

В съпрограмата fibonacciSeq имаме две точки на окачване. Първо, когато извикваме yield (1) и второ, когато извикваме yield (a + b).

Ако тази функция за добив доведе до някакво блокиращо повикване, текущата нишка няма да блокира върху нея. Той ще може да изпълни някой друг код. След като суспендираната функция завърши изпълнението си, нишката може да възобнови изпълнението на съпрограмата fibonacciSeq .

Можем да тестваме нашия код, като вземем някои елементи от последователността на Фибоначи:

val res = fibonacciSeq .take(5) .toList() assertEquals(res, listOf(1, 1, 2, 3, 5))

3. Добавяне на зависимостта на Maven за kotlinx-съпрограми

Нека да разгледаме библиотеката kotlinx-coroutines, която има полезни конструкции, изградени върху основните съпрограми.

Нека добавим зависимостта към библиотеката kotlinx-coroutines-core . Имайте предвид, че трябва да добавим и хранилището jcenter :

 org.jetbrains.kotlinx kotlinx-coroutines-core 0.16    central //jcenter.bintray.com  

4. асинхронно програмиране Използване на старта () C oroutine

В kotlinx-coroutines библиотеката добавя много полезни конструкции, които ни позволяват да се създаде асинхронни програми. Да кажем, че имаме скъпа изчислителна функция, която добавя String към входния списък:

suspend fun expensiveComputation(res: MutableList) { delay(1000L) res.add("word!") }

Можем да използваме програма за стартиране, която ще изпълни тази функция за спиране по неблокиращ начин - трябва да предадем пул от нишки като аргумент към нея.

Функцията за стартиране връща екземпляр на Job, в който можем да извикаме метод join () , за да изчакаме резултатите:

@Test fun givenAsyncCoroutine_whenStartIt_thenShouldExecuteItInTheAsyncWay() { // given val res = mutableListOf() // when runBlocking { val promise = launch(CommonPool) { expensiveComputation(res) } res.add("Hello,") promise.join() } // then assertEquals(res, listOf("Hello,", "word!")) }

За да можем да тестваме нашия код, ние предаваме цялата логика в съпрограмата runBlocking - което е блокиращо повикване. Следователно нашите assertEquals () могат да бъдат изпълнени синхронно след кода вътре в метода runBlocking () .

Имайте предвид, че в този пример, въпреки че методът launch () се задейства първо, това е забавено изчисление. Основната нишка ще продължи, като добави низа “Hello,” към списъка с резултати.

След една секунда закъснение, което е въведено във функцията на скъпатаКомпютация () , думата! Низът ще бъде добавен към резултата.

5. Програмите са много леки

Нека си представим ситуация, в която искаме да извършим 100000 операции асинхронно. Хвърлянето на хайвера на толкова голям брой нишки ще бъде много скъпо и вероятно ще доведе до OutOfMemoryException.

За щастие, когато се използват съпрограмите, това не е случай. Можем да изпълняваме колкото се може повече блокиращи операции. Под капака тези операции ще се обработват от фиксиран брой нишки без прекомерно създаване на нишки:

@Test fun givenHugeAmountOfCoroutines_whenStartIt_thenShouldExecuteItWithoutOutOfMemory() { runBlocking { // given val counter = AtomicInteger(0) val numberOfCoroutines = 100_000 // when val jobs = List(numberOfCoroutines) { launch(CommonPool) { delay(1000L) counter.incrementAndGet() } } jobs.forEach { it.join() } // then assertEquals(counter.get(), numberOfCoroutines) } }

Имайте предвид, че изпълняваме 100 000 съпрограми и всяко изпълнение добавя значително забавяне. Въпреки това не е необходимо да се създават твърде много нишки, тъй като тези операции се изпълняват по асинхронен начин, като се използва нишка от CommonPool.

6. Анулиране и изчаквания

Понякога, след като задействаме някакво продължително асинхронно изчисление, искаме да го отменим, защото вече не се интересуваме от резултата.

Когато започнем нашето асинхронно действие с подпрограмата launch () , можем да разгледаме флага isActive . Този флаг е зададен на false, когато основната нишка извиква метода cancel () в екземпляра на Job:

@Test fun givenCancellableJob_whenRequestForCancel_thenShouldQuit() { runBlocking { // given val job = launch(CommonPool) { while (isActive) { println("is working") } } delay(1300L) // when job.cancel() // then cancel successfully } }

Това е много елегантен и лесен начин за използване на механизма за анулиране . В асинхронното действие трябва само да проверим дали флагът isActive е равен на false и да отменим обработката ни.

Когато искаме някаква обработка и не сме сигурни колко време ще отнеме това изчисление, препоръчително е да зададете времето за изчакване за такова действие. Ако обработката не приключи в рамките на дадения таймаут, ще получим изключение и можем да реагираме по подходящ начин.

Например, можем да опитаме отново действието:

@Test(expected = CancellationException::class) fun givenAsyncAction_whenDeclareTimeout_thenShouldFinishWhenTimedOut() { runBlocking { withTimeout(1300L) { repeat(1000) { i -> println("Some expensive computation $i ...") delay(500L) } } } }

Ако не дефинираме време за изчакване, възможно е нишката ни да бъде блокирана завинаги, защото това изчисление ще увисне. Не можем да се справим с този случай в нашия код, ако времето за изчакване не е определено.

7. Изпълнение на асинхронни действия едновременно

Let's say that we need to start two asynchronous actions concurrently and wait for their results afterward. If our processing takes one second and we need to execute that processing twice, the runtime of synchronous blocking execution will be two seconds.

It would be better if we could run both those actions in separate threads and wait for those results in the main thread.

We can leverage the async() coroutine to achieve this by starting processing in two separate threads concurrently:

@Test fun givenHaveTwoExpensiveAction_whenExecuteThemAsync_thenTheyShouldRunConcurrently() { runBlocking { val delay = 1000L val time = measureTimeMillis { // given val one = async(CommonPool) { someExpensiveComputation(delay) } val two = async(CommonPool) { someExpensiveComputation(delay) } // when runBlocking { one.await() two.await() } } // then assertTrue(time < delay * 2) } }

After we submit the two expensive computations, we suspend the coroutine by executing the runBlocking() call. Once results one and two are available, the coroutine will resume, and the results are returned. Executing two tasks in this way should take around one second.

We can pass CoroutineStart.LAZY as the second argument to the async() method, but this will mean the asynchronous computation will not be started until requested. Because we are requesting computation in the runBlocking coroutine, it means the call to two.await() will be made only once the one.await() has finished:

@Test fun givenTwoExpensiveAction_whenExecuteThemLazy_thenTheyShouldNotConcurrently() { runBlocking { val delay = 1000L val time = measureTimeMillis { // given val one = async(CommonPool, CoroutineStart.LAZY) { someExpensiveComputation(delay) } val two = async(CommonPool, CoroutineStart.LAZY) { someExpensiveComputation(delay) } // when runBlocking { one.await() two.await() } } // then assertTrue(time > delay * 2) } }

The laziness of the execution in this particular example causes our code to run synchronously. That happens because when we call await(), the main thread is blocked and only after task one finishes task two will be triggered.

We need to be aware of performing asynchronous actions in a lazy way as they may run in a blocking way.

8. Conclusion

In this article, we looked at basics of Kotlin coroutines.

We saw that buildSequence is the main building block of every coroutine. We described how the flow of execution in this Continuation-passing programming style looks.

Finally, we looked at the kotlinx-coroutines library that ships a lot of very useful constructs for creating asynchronous programs.

Внедряването на всички тези примери и кодови фрагменти може да се намери в проекта GitHub.