본문 바로가기

안드로이드/Kotlin

[Kotlin] Introduction to Coroutines and Channels-6

개요

해당 게시글은 을 번역한 Welcome to Kotlin hands-on (kotlinlang.org)게시글 입니다.

 

Channels

가변적인 상태를 공유하는 코드를 작성하는 것은 매우 까다롭고 에러가 자주 발생하는 지점이다. 공통 가변 가능한 상태를 사용하는 대신 커뮤니케이션으로 정보를 공유하는 것은 이를 쉽게 해결할 수 있다. 코루틴은 Channel을 통해 서로가 통신할 수 있다.

 

Channel은 서로 다른 코루틴 사이에서 데이터를 전달하기 위한 통신 원시형이다. 하나의 코루틴에서 어떤 정보를 Channel로 전송하면 다른 코루틴은 Channel에서 해당 정보를 수신할 수 있다.

 

정보를 송신하는 코루틴을 생산자, 그리고 정보를 수신하는 코루틴을 소비자라고 부르기도 한다. 필요할 때는 많은 코루틴이 정보를 같은 Channel로 전송하고 많은 코루틴이 정보를 수신할 수도 있다.

많은 코루틴이 하나의 같은 Channel에서 정보를 수신할 때는 각 요소는 한 소비자에 의해 한 번만 처리된다. 이 요소를 처리하는 것은 Channel에서 이 요소를 자동으로 제거하는 것을 의미한다.

 

Channel을 요소의 컬렉션과 유사하다고 생각하는 것이 가능하고, 더 직접적으로 표현하면 하나의 Queue라고 보면 된다. 그러나 콜렉션과 다른 중요한 차이점은 동기화를 제공하는 버전에서 Channel은 송/수신 과정이 suspend/resume 되며 작동한다는 것이다. 이는 Channel이 비어 있거나 가득 찬 경우에 발생한다.

 

Channel은 sendChannel, ReceiveChannel과 앞선 둘의 상속인 Channel까지 총 3가지 인터페이스를 가진다. 보통 Channel을 생성하여 생산자에게 SendChannel 인스턴스로 제공하여 생산자만 Channel에 송신할 수 있고, 소비는 ReceiveChannel 인스턴스로만 수신할 수 있다.

interface SendChannel<in E> {
    suspend fun send(element: E)
    fun close(): Boolean
}

interface ReceiveChannel<out E> {
    suspend fun receive(): E
}    

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

 

생산자는 더 이상 요소가 없는 경우 Channel을 닫을 수 있다.

 

Channel의 여러 타입은 라이브러리에 정의되어있다. 이 타입은 얼마나 많은 요소가 Channel 내부에 저장되고, 어떤 Send 호출이 suspend 될 수 있느냐 없느냐에 따라 다르다. 공통적으로 receive 호출은 같은 방식으로 동작한다. Channel이 비어있지 않은 경우 한 요소를 수신하고, 그렇지 않다면 일시 중지(suspend)한다. 

 

  • Unlimited Channel

무제한 Channel은 Queue와 유사하다. 생산자는 요소를 Channel에 전송하고, 무한히 자라난다. Send 호출은 절대 일시중지(suspend)되지 않는다. 메모리가 부족할 경우, OutOfMemoryException이 발생할 것이다. Queue와 다른 점은 소비자가 비어 있는 Channel에서 수신하려고 할 때 나타나고, 이 Channel에 새로운 요소가 전송될 때 까지 일시 중지된다는 점이다.

 

  • Buffered Channel

Buffered Channel는 크기 제한이 존재한다. 생산자는 크기 제한에 도달하기 전까지만 송신할 수 있다. 모든 요소가 내부에 저장된다. 만약 Channel이 가득 찬 경우 다음 send 호출은 공간이 생길 때 까지 일시 중지(suspend) 된다.

 

  • "Rendezvous" Channel

"Rendezvous" Channel은 버퍼 크기가 없는 디폴트로 생성되는 Channel이다. 버퍼 크기를 0으로 생성하는 Buffered Channel과 동일하다. send, receive 어느 것을 호출해도 다른 하나가 호출될 때 까지 일시 중지(suspend) 된다. 만약 send 가 호출되었지만 요소를 처리하기 위해 준비된 receive 호출이 없는 경우 send 는 일시 중지(suspend) 된다. 비슷하게 receive 가 비어있는 Channel에 호출될 경우나 요소를 전송하기 위해 준비된 일시 중지된 send 호출이 없는 경우에 receive 호출은 일시 중지(suspend)된다. "Rendezvous"의 의미는 "약속된 시간/장소의 모임"이라는 의미에서 추론할 수 있듯 sendreceive는 "제 시간에 만나야 된다."

 

  • Conflated Channel

새로운 요소가 conflated channel에 전송되면 이전에 전송된 요소가 덮어써져 수신자는 항상 최신 요소만 전달받게 된다. send 호출은 절대 일시 중지(suspend)되지 않는다.

 

Channel을 생성할 때 위와 같은 타입이나 크기를 명시해주어야 한다.

val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)

디폴트로 생성되는 Channel은 "Rendezvous" Channel이다. 

 

이후의 예제에서 "Rendezvous" Channel과 두 개의 생산자 코루틴과 하나의 소비자 코루틴을 만들 것이다.

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch { //Producer#1
        channel.send("A1")
        channel.send("A2")
        log("A done")
    }
    launch { //Producer#2
        channel.send("B1")
        log("B done")
    }
    launch { //Consumer#1
        repeat(3) {
            val x = channel.receive()
            log(x)
        }
    }
}

fun log(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}

 

결과:

[main @coroutine#4] A1
[main @coroutine#4] B1
[main @coroutine#2] A done
[main @coroutine#3] B done
[main @coroutine#4] A2

 

 

Task

모든 Github contributors를 동시에 요청함과 동시에 중간 진행 결과를 보이는 loadContributorsChannels을 구현하라.

 

Tip

동시에 서로 다른 레포지토리에 대한 contributors 리스트를 수신하는 서로 다른 코루틴은 같은 Channel에 모든 응답 결과를 송신한다.

 

Solution

이전의 loadContributorsProgress 함수에서 중간 결과를 저장하는 allUsers 변수를 생성할 수 있었다. Channel에서 각각의 새로운 리스트를 수신할 때 이를 allUsers에 추가하고 결과를 집계하고 updateResuls 콜백으로 상태를 갱신한다.

suspend fun loadContributorsChannels(
    service: GitHubService,
    req: RequestData,
    updateResults: suspend (List<User>, completed: Boolean) -> Unit
) = coroutineScope {

    val repos = service
        .getOrgRepos(req.org)
        .also { logRepos(req, it) }
        .bodyList()

    val channel = Channel<List<User>>()
    for (repo in repos) {
        launch {
            val users = service.getRepoContributors(req.org, repo.name)
                .also { logUsers(repo, it) }
                .bodyList()
            channel.send(users)
        }
    }
    var allUsers = emptyList<User>()
    repeat(repos.size) {
        val users = channel.receive()
        allUsers = (allUsers + users).aggregate()
        updateResults(allUsers, it == repos.lastIndex)
    }
}

서로 다른 레포지토리에 대한 결과는 준비가 될 때마다 Channel에 추가된다. 첫번째로 모든 요청이 전송되고 어느 데이터도 수신하지 않는 상황에서 receive 호출은 일시 중지(suspend)된다. 이 경우에 "load Contributors"를 수행하는 모든 코루틴이 suspend된다. 그럼으로 유저 리스트가 Channel로 송신되면 "load Contributors" 코루틴이 재개(resume)되어 receive 호출은 리스트를 반환하고 결과가 즉시 갱신된다.

 

Suspend 함수, 코루틴의 동시 수행, Channel을 이용한 코루틴 간의 정보 공유를 어떻게 사용하는지 배웠다.

 

코루틴과 Channel 모두 동시성에로 오는 복잡성을 완전히 근절하긴 어려울 것이다. 하지만 분명한 건 어떻게 동작하는 지 이해하는지는 조금 더 쉬워질 것이다. 다음은 사용할 코루틴을 어떻게 테스트하는지 확인해볼 것이다.