Kotlin coroutines: Part 1
I have been learning Kotlin recently with a special focus on concurrency with coroutines. In this post, I will describe how I took advantage of concurrency to solve a specific problem.
What are coroutines (from the Kotlin documentation)?
A coroutine is an instance of suspendable computation. It is conceptually similar to a thread, in the sense that it takes a block of code to run that works concurrently with the rest of the code. However, a coroutine is not bound to any particular thread. It may suspend its execution in one thread and resume in another one.
Why use coroutines instead of doing things the usual way (thread pools)?
Thread pools are the usual and recommended way of writing concurrent programs in Java. Coroutines have the following advantages over thread pools:
- Coroutines are a lightweight abstraction over threads and don’t map directly to OS threads, unlike Java Thread instances. This means that you can launch far more coroutines than threads in a thread pool. This is possible because coroutines are ‘suspendable’ (they don’t block threads), so many coroutines can run on a single thread.
- Coroutines support the model of structured concurrency, which has advantages like fewer memory leaks and built-in cancellation support. Structured concurrency also ensures that any errors in the code are properly reported and are never lost.
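To make the "lightweight" claim concrete, here is a minimal sketch, assuming the kotlinx-coroutines-core library is on the classpath. The helper name launchManyCoroutines and the 100 ms delay are illustrative choices of mine, not something taken from the Kotlin documentation.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: launches `count` coroutines that each suspend briefly,
// then returns how many of them ran to completion.
fun launchManyCoroutines(count: Int): Int = runBlocking {
    val completed = AtomicInteger(0)
    coroutineScope {
        repeat(count) {
            launch {
                delay(100) // suspends this coroutine; the underlying thread stays free
                completed.incrementAndGet()
            }
        }
    } // coroutineScope returns only after all of its children have finished
    completed.get()
}
```

Calling launchManyCoroutines(100_000) runs all of these coroutines on the single thread that runBlocking occupies. Doing the same with 100,000 Thread instances would typically be far more expensive.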
Problem: How to merge k sorted lists into a single sorted list?
Let’s solve the problem of merging k sorted lists such that the resulting list is sorted. For example:
When given these 3 lists as input:
[2, 3, 5]
[-1, 1, 2, 30, 239]
[-5, -3, -1, 8, 22, 39]
We can expect an output as follows:
[-5, -3, -1, -1, 1, 2, 2, 3, 5, 8, 22, 30, 39, 239]
Let’s first look at a simple solution to this problem (without concurrency):
fun mergeKSortedLists(lists: List<List<Int>>): List<Int> {
    val queue = ArrayDeque<List<Int>>()
    lists.forEach { queue.add(it) }
    while (queue.size > 1) {
        queue.addLast(
            mergeTwoSortedLists(
                queue.removeFirst(),
                queue.removeFirst()
            )
        )
    }
    return queue.removeFirst()
}
What we did above is as follows:
- Put all k input lists in a queue.
- Pop two lists from the start of the queue and call the mergeTwoSortedLists(list1, list2) function on them. We will look at the mergeTwoSortedLists function in a little bit.
- Push the merged list to the end of the queue.
- Repeat the previous two steps until only 1 list remains in the queue. This should happen in k-1 iterations.
- Return the single list remaining in the queue as the answer.
Now let’s look at the mergeTwoSortedLists() function:
fun mergeTwoSortedLists(list1: List<Int>, list2: List<Int>): List<Int> {
    var index1 = 0
    var index2 = 0
    val result = mutableListOf<Int>()
    while (index1 < list1.size && index2 < list2.size) {
        if (list1[index1] < list2[index2]) {
            result.add(list1[index1])
            index1 += 1
        } else {
            result.add(list2[index2])
            index2 += 1
        }
    }
    result.addAll(list1.slice(index1 until list1.size))
    result.addAll(list2.slice(index2 until list2.size))
    return result.toList()
}
Time complexity:
If the k lists combined are of size N, the time complexity of the above solution is O(N log(k)).
Reason: Because merged lists go to the back of the queue, the lists are merged in rounds. Each round passes through all N items once and reduces the number of lists k by half, so we reach a single list after about log(k) rounds.
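One way to sanity-check this bound is to count how many elements the two-way merges write in total. The sketch below is not the solution itself: countCopiedElements is a hypothetical helper that simulates the queue on list sizes only, assuming each two-way merge writes every element of both inputs exactly once.

```kotlin
// Hypothetical helper: simulates the queue-based merge on list sizes only and
// returns the total number of elements written across all two-way merges.
fun countCopiedElements(sizes: List<Int>): Long {
    val queue = ArrayDeque<Long>()
    sizes.forEach { queue.addLast(it.toLong()) }
    var copied = 0L
    while (queue.size > 1) {
        // Merging two lists writes every element of both into a new list.
        val merged = queue.removeFirst() + queue.removeFirst()
        copied += merged
        queue.addLast(merged)
    }
    return copied
}
```

For 8 lists of 10 elements each (N = 80, k = 8), this returns 240, which is exactly N times log2(k). For the three example lists above (sizes 3, 5 and 6), it returns 22, which stays below 14 times 2.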
Solution with coroutines
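import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch

suspend fun concurrentlyMergeKSortedLists(lists: List<List<Int>>): List<Int> = coroutineScope {
    // Buffered channel with capacity 1; it plays the role of the queue.
    val channel = Channel<List<Int>>(1)
    val jobs = mutableListOf<Job>()
    // One coroutine per input list: send the list to the channel.
    repeat(lists.size) {
        jobs.add(
            launch {
                channel.send(lists[it])
            }
        )
    }
    // k-1 coroutines: each receives two lists, merges them and sends the result back.
    repeat(lists.size - 1) {
        jobs.add(
            launch {
                channel.send(
                    mergeTwoSortedLists(
                        channel.receive(), channel.receive()
                    )
                )
            }
        )
    }
    // Wait for every sender and merger before taking the final merged list.
    jobs.joinAll()
    channel.receive()
}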
Explanation
Let’s try to understand a few keywords in the above code:
- suspend: The suspend keyword marks a function as suspendable: it can pause without blocking its thread, and it can only be called from a coroutine or from another suspend function.
- coroutineScope: Creates a CoroutineScope and calls the given suspending block with this scope. You can see that we are using launch (which is a coroutine builder) inside the function to launch child coroutines. In Kotlin, every coroutine must be launched within a CoroutineScope, which is why we need to call coroutineScope.
- Channel: A Channel is conceptually very similar to the Java BlockingQueue. One key difference is that channels use suspending operations (send and receive) instead of blocking operations (put and take). In this problem, the usage of a regular Queue (used in my non-concurrent solution above) maps very nicely to Channel/BlockingQueue. Channels and their different types are documented in detail here and here.
- launch and Job: launch launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a Job.
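The buffering behaviour of a Channel can be seen in a small sketch like the one below. It assumes kotlinx-coroutines-core version 1.5 or later, where trySend is available, and channelCapacityDemo is a hypothetical name of mine. trySend is the non-suspending counterpart of send: instead of suspending when the buffer is full, it reports failure, which makes the point where send would suspend visible.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical demo: shows how a channel with capacity 1 behaves.
fun channelCapacityDemo(): List<Boolean> = runBlocking {
    val channel = Channel<Int>(1)
    val first = channel.trySend(1).isSuccess  // buffer is empty, so this succeeds
    val second = channel.trySend(2).isSuccess // buffer is full; send() would suspend here
    channel.receive()                         // takes the buffered item and frees the slot
    val third = channel.trySend(2).isSuccess  // succeeds again
    listOf(first, second, third)
}
```

The function returns [true, false, true]: only one item fits in the buffer until a receiver takes it.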
What we are doing in the concurrent code is analogous to what we did in the non-concurrent implementation earlier. Here are the equivalent steps:
- The first repeat block sends all lists to channel, similar to how we put all lists in a queue earlier. Each send operation, however, can run asynchronously because it is wrapped in launch. Also notice that we defined the channel’s capacity to be 1. That means the channel buffers at most 1 item, and any further send operations are suspended until a receiver takes an item.
- The second repeat block launches k-1 coroutines. Each of these coroutines is suspended until it has received two lists from the channel. When two lists are received, they are merged using mergeTwoSortedLists and the result is sent to the channel.
- All the coroutines launched above can be referenced from the jobs list. jobs.joinAll() suspends the coroutine corresponding to the concurrentlyMergeKSortedLists function until all the child coroutines (referenced in jobs) finish execution. It’s important to note here that we are talking in terms of suspending rather than blocking, which is a major advantage.
- In the end, we know that there will be exactly 1 list left in the channel after all coroutines launched above have completed execution. The result of the last expression inside the coroutineScope block (channel.receive()) is automatically returned.
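To try the concurrent approach on the example input from the start of the post, a small driver like the following should work. It is a sketch under a few assumptions: kotlinx-coroutines-core is on the classpath; mergeTwo and mergeConcurrently are condensed restatements of the two functions above, included only so that the block is self-contained; and runExample is a hypothetical entry point. I also added an early return for empty input, because with zero lists the final receive() would suspend forever.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Two-way merge, restated from the earlier section.
fun mergeTwo(a: List<Int>, b: List<Int>): List<Int> {
    val result = ArrayList<Int>(a.size + b.size)
    var i = 0
    var j = 0
    while (i < a.size && j < b.size) {
        if (a[i] < b[j]) result.add(a[i++]) else result.add(b[j++])
    }
    while (i < a.size) result.add(a[i++])
    while (j < b.size) result.add(b[j++])
    return result
}

// Same structure as the concurrent solution above, plus an assumed guard:
// empty input returns early instead of waiting for a list that is never sent.
suspend fun mergeConcurrently(lists: List<List<Int>>): List<Int> = coroutineScope {
    if (lists.isEmpty()) return@coroutineScope emptyList<Int>()
    val channel = Channel<List<Int>>(1)
    val senders = lists.map { list -> launch { channel.send(list) } }
    val mergers = List(lists.size - 1) {
        launch { channel.send(mergeTwo(channel.receive(), channel.receive())) }
    }
    // Wait for all senders and mergers before taking the final list.
    (senders + mergers).joinAll()
    channel.receive()
}

// Hypothetical driver: runBlocking bridges regular code and the suspend function.
fun runExample(): List<Int> = runBlocking {
    mergeConcurrently(
        listOf(
            listOf(2, 3, 5),
            listOf(-1, 1, 2, 30, 239),
            listOf(-5, -3, -1, 8, 22, 39)
        )
    )
}
```

runExample() returns [-5, -3, -1, -1, 1, 2, 2, 3, 5, 8, 22, 30, 39, 239], the same list as the expected output shown earlier.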
Closing thoughts and next steps
I really enjoyed learning about the powerful concurrency primitives in Kotlin! Learning about suspendable functions and coroutines forced me to think about concurrency in a completely different way. As a next step, I am planning to run some performance tests to see how the two implementations compare in practice.
All the code that I described in this blog post can be accessed here.