Kotlin coroutines: Part 1
I have been learning Kotlin recently with a special focus on concurrency with coroutines. In this post, I will describe how I took advantage of concurrency to solve a specific problem.
What are coroutines (from the Kotlin documentation[1])?
A coroutine is an instance of suspendable computation. It is conceptually similar to a thread, in the sense that it takes a block of code to run that works concurrently with the rest of the code. However, a coroutine is not bound to any particular thread. It may suspend its execution in one thread and resume in another one.
Why use coroutines instead of doing things the usual way (thread pools)?
Thread pools are the usual and recommended way of writing concurrent programs in Java. Coroutines have the following advantages over thread pools:
- Coroutines are a light-weight abstraction over threads and, unlike Java `Thread` instances, don't map directly to OS threads. This means that you can launch many more coroutines than there are threads in a thread pool. This works because coroutines are suspendable (suspending a coroutine doesn't block its thread), so many coroutines can take turns running on a single thread.
- Coroutines support the model of structured concurrency, which has advantages such as fewer memory leaks and built-in cancellation support. Structured concurrency also ensures that errors in the code are properly reported and never lost.
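The following minimal sketch illustrates both points. It assumes the kotlinx.coroutines library is on the classpath. The function names `launchManyCoroutines` and `structuredFailureDemo`, the coroutine count, and the 100 ms delay are illustrative choices and do not come from the original post. The first function starts many coroutines that are all suspended at the same time on `runBlocking`'s single thread. The second shows a failing child cancelling its sibling, with the exception surfacing at the caller of `coroutineScope`:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Light-weight (illustrative): launches `count` coroutines that each suspend for
// 100 ms, then returns how many completed. runBlocking runs them all on the calling thread.
fun launchManyCoroutines(count: Int): Int {
    val finished = AtomicInteger(0)
    runBlocking {
        repeat(count) {
            launch {
                delay(100L) // suspends the coroutine without blocking the thread
                finished.incrementAndGet()
            }
        }
    } // runBlocking returns only after every child coroutine has completed
    return finished.get()
}

// Structured concurrency (illustrative): a failing child cancels its sibling,
// and the exception is rethrown by coroutineScope instead of being lost.
fun structuredFailureDemo(): String = runBlocking {
    try {
        coroutineScope {
            launch { awaitCancellation() } // would never finish on its own
            launch { error("boom") }       // throws IllegalStateException("boom")
        }
        "no error"
    } catch (e: IllegalStateException) {
        "caught: ${e.message}"
    }
}
```

Calling `launchManyCoroutines(100_000)` returns `100000`, and `structuredFailureDemo()` returns `"caught: boom"`. Starting the same number of platform threads that each sleep would typically cost far more memory.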
Problem: How to merge k sorted lists into a single sorted list?
Let’s solve the problem of merging k sorted lists such that the resulting list is sorted. For example:
When given these 3 lists as input:
[2, 3, 5]
[-1, 1, 2, 30, 239]
[-5, -3, -1, 8, 22, 39]
We can expect an output as follows:
[-5, -3, -1, -1, 1, 2, 2, 3, 5, 8, 22, 30, 39, 239]
Let’s first look at a simple solution to this problem (without concurrency):
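```kotlin
fun mergeKSortedLists(lists: List<List<Int>>): List<Int> {
    // With no input lists there is nothing to merge (removeFirst() would throw)
    if (lists.isEmpty()) return emptyList()
    val queue = ArrayDeque<List<Int>>()
    lists.forEach { queue.add(it) }
    // Merge the two lists at the front and append the result until one list remains
    while (queue.size > 1) {
        queue.addLast(
            mergeTwoSortedLists(
                queue.removeFirst(),
                queue.removeFirst()
            )
        )
    }
    return queue.removeFirst()
}
```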
What we did above is as follows:
1. Put all k input lists in a queue.
2. Pop two lists from the front of the queue and call the `mergeTwoSortedLists(list1, list2)` function on them. We will look at the `mergeTwoSortedLists` function in a little bit.
3. Push the merged list to the end of the queue.
4. Repeat steps 2 and 3 until only 1 list remains in the queue. Because each iteration replaces two lists with one, this takes k-1 iterations.
5. Return the single list remaining in the queue as the answer.
Now let’s look at the mergeTwoSortedLists() function:
```kotlin
fun mergeTwoSortedLists(list1: List<Int>, list2: List<Int>): List<Int> {
    var index1 = 0
    var index2 = 0
    val result = mutableListOf<Int>()
    // Repeatedly take the smaller head element until one of the lists is exhausted
    while (index1 < list1.size && index2 < list2.size) {
        if (list1[index1] < list2[index2]) {
            result.add(list1[index1])
            index1 += 1
        } else {
            result.add(list2[index2])
            index2 += 1
        }
    }
    // At most one list still has elements left, and they are already sorted
    result.addAll(list1.slice(index1 until list1.size))
    result.addAll(list2.slice(index2 until list2.size))
    return result.toList()
}
```
Time complexity:
If the k lists combined are of size N, the time complexity of the above solution is O(N log(k)).
Reason: Each pass over all N items roughly halves the number of lists, and we stop when a single list remains. That gives about log2(k) passes, each doing O(N) work.
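Written out as a rough sum, under the simplifying assumption that every round of merges touches each item once and halves the number of lists:

$$
T(N, k) \approx \underbrace{N + N + \cdots + N}_{\lceil \log_2 k \rceil \text{ rounds}} = N \lceil \log_2 k \rceil = O(N \log k)
$$

For the example above (k = 3, N = 14), this estimate gives 14 · 2 = 28. The actual merges produce 8 + 14 = 22 output elements: [2, 3, 5] merged with [-1, 1, 2, 30, 239] yields 8 elements, and that result merged with the third list yields 14.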
Solution with coroutines
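```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch

suspend fun concurrentlyMergeKSortedLists(lists: List<List<Int>>): List<Int> = coroutineScope {
    // Without input lists, the final receive() below would suspend forever
    if (lists.isEmpty()) return@coroutineScope emptyList<Int>()
    // The buffer holds at most one list; further sends suspend until a receiver takes one
    val channel = Channel<List<Int>>(1)
    val jobs = mutableListOf<Job>()
    // Producers: one coroutine per input list
    repeat(lists.size) {
        jobs.add(
            launch {
                channel.send(lists[it])
            }
        )
    }
    // Mergers: k - 1 coroutines, each turning two lists into one
    repeat(lists.size - 1) {
        jobs.add(
            launch {
                channel.send(
                    mergeTwoSortedLists(
                        channel.receive(), channel.receive()
                    )
                )
            }
        )
    }
    // Wait for every producer and merger before reading the final result
    jobs.joinAll()
    channel.receive()
}
```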
Explanation
Let’s try to understand a few keywords in the above code:
- `suspend`: The `suspend` modifier marks a function that can suspend its execution without blocking a thread. Such a function can only be called from a coroutine or from another suspend function.
- `coroutineScope`: Creates a `CoroutineScope` and calls the given suspend block with this scope. It returns only after the block and all coroutines launched inside it have completed. We use `launch` (a coroutine builder) inside the function to start child coroutines. In Kotlin, `launch` must be called on a `CoroutineScope`, which is why we need `coroutineScope`.
- `Channel`: A `Channel` is conceptually very similar to Java's `BlockingQueue`. One key difference is that channels use suspending operations (`send` and `receive`) instead of blocking operations (`put` and `take`). In this problem, the regular queue used in my non-concurrent solution above maps very nicely to `Channel`/`BlockingQueue`. Channels and their different types are documented in detail here and here.
- `launch` and `Job`: `launch` starts a new coroutine without blocking the current thread and returns a reference to the coroutine as a `Job`.
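The sketch below makes the suspending behaviour of a capacity-1 channel concrete. It assumes kotlinx.coroutines is available, and the function name `bufferedChannelDemo` and the values 10 and 20 are illustrative. It uses `trySend`, the non-suspending counterpart of `send`. When the buffer is full, `trySend` reports failure instead of suspending, much as `BlockingQueue.offer` returns `false` on a full bounded queue:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Illustrative demo of a channel with capacity 1. trySend never suspends:
// it fails when a regular send would have had to suspend.
fun bufferedChannelDemo(): Triple<Boolean, Boolean, Int> = runBlocking {
    val channel = Channel<Int>(1)               // buffer holds at most one element
    val first = channel.trySend(10).isSuccess   // true: the buffer was empty
    val second = channel.trySend(20).isSuccess  // false: buffer full and no receiver waiting
    val received = channel.receive()            // suspending receive takes 10 out of the buffer
    Triple(first, second, received)
}
```

`bufferedChannelDemo()` returns `Triple(true, false, 10)`. A plain `channel.send(20)` in place of the second `trySend` would suspend the coroutine until some other coroutine received from the channel.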
What we are doing in the concurrent code is analogous to what we did in the non-concurrent implementation earlier. Here are the equivalent steps:
- The first `repeat` block sends all lists to `channel`, similar to how we put all lists in a queue earlier. Each `send` operation, however, can run asynchronously because it's called inside `launch`. Also notice that we set the channel's capacity to `1`. This means that the channel buffers at most 1 list, and any further `send` operations are suspended until a receiver takes an element out.
- The second `repeat` block launches k-1 coroutines. Each of these coroutines is suspended until it can receive two lists from `channel`. Once it has received two lists, it merges them using `mergeTwoSortedLists` and sends the result back to the channel.
- All the coroutines launched above are referenced from the `jobs` list. `jobs.joinAll()` suspends the coroutine running `concurrentlyMergeKSortedLists` until all the child coroutines in `jobs` finish execution. This wait is essential: without it, the final `channel.receive()` could take one of the input lists or an intermediate result before the merging is done. It's important to note that we are talking in terms of suspending rather than blocking, which is a major advantage: the underlying thread stays free to run the child coroutines.
- In the end, we know that exactly 1 list is left in the channel after all the coroutines launched above have completed. Because `channel.receive()` is the last expression inside the `coroutineScope` block, its result is automatically returned from the function.
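The claim that exactly one list remains can be checked by counting channel operations for k input lists. There are k initial sends and one send per merge, and each merge performs two receives:

$$
\underbrace{k}_{\text{initial sends}} + \underbrace{(k-1)}_{\text{one send per merge}} - \underbrace{2(k-1)}_{\text{two receives per merge}} = 1
$$

For the three-list example, this is 3 + 2 - 4 = 1.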
Closing thoughts and next steps
I really enjoyed learning about the powerful concurrency primitives in Kotlin! Learning about suspendable functions and coroutines forced me to think about concurrency in a completely different way. As a next step, I am planning to run some performance tests to see how the two implementations compare in practice.
All the code that I described in this blog post can be accessed here.