본문 바로가기

안드로이드/Kotlin

[Kotlin] Introduction to Coroutines and Channels-3

개요

해당 게시글은 Welcome to Kotlin hands-on (kotlinlang.org)을 번역한 게시글입니다.

 

Concurrency

코루틴은 스레드에 비해 매우 비용이 저렴하다. 새로운 연산을 비동기적으로 시작하기 원할 때마다 코루틴을 만들 수 있다.

 

새로운 코루틴을 시작하기 위해 launch, async, runBlocking 같은 "Coroutine Builder"를 사용한다. 코틀린의 코루틴 외 다른 라이브러리는 다른 코루틴 빌더가 존재할 수도 있다.

 

async는 새로운 코루틴을 시작하고 Deferred 객체를 반환한다. Deferred는 다른 이름으로 Future/Promise 등의 개념으로 알려져있다. 이는 연산을 저장하지만 최종적인 결과를 얻는 순간을 지연시킨다. 즉 미래(future) 언젠가의 결과를 약속(promise)한다.

 

asynclaunch의 주요 차이점은 launch는 새로운 연산을 시작하는데 특정 결과를 기대하지 않을 때 사용한다. launchJob을 반환하고 이는 코루틴을 의미한다. Job.join()을 통해 Job이 완전히 끝날 때 까지 기다릴 수 있다.

 

Deferred는 Job을 상속한 제네릭 타입이다. async 호출은 람다가 무엇을 반환하냐에 따라 Deferred<Int>나 Deferred<CustomType> 등을 반환할 수 있다.

 

코루틴의 결과를 획득하기 위해 Deferred 인스턴스에서 await() 호출해야 한다. 결과를 기다리는 동안 await이 호출된 코루틴은 일시 중지된다.

fun main() = runBlocking {
    val deferred: Deferred<Int> = async {
        loadData()
    }
    println("waiting...")
    println(deferred.await())
}

suspend fun loadData(): Int {
    println("loading...")
    delay(1000L)
    println("loaded!")
    return 42
}

runBlocking은 일반함수와 suspend 함수, blocking과 non-blocking 사이에서 가교 역할을 한다. runBlocking은 최상위 레벨 메인 코루틴을 시작하기 위한 어댑터로 사용하거나 주요 함수 및 테스트에 활용된다.

 

Deferred 리스트가 존재할 경우 awaitAll을 통해 전체 결과를 기다릴 수도 있다.

fun main() = runBlocking {
    val deferreds: List<Deferred<Int>> = (1..3).map {
        async {
            delay(1000L * it)
            println("Loading $it")
            it
        }
    }
    val sum = deferreds.awaitAll().sum()
    println("$sum")
}

결과

Loading 1
Loading 2
Loading 3
6

 

다시 github예제로 돌아와서, 새로운 코루틴에서 각 contributors를 요청할 때 모든 요청은 비동기적으로 시작된다. 새로운 요청은 이전 요청에 대한 응답의 결과가 도착하기 전에 전송될 수도 있다.

 

이는 대략적으로 이전의 Callback 예제와 거의 동일한 전체 로딩 시간이 소요된다. 하지만 어떤 callback도 필요없다. 또한 비동기는 코드에서 동시에 실행되는 부분을 명시적으로 강조하게 된다.

 

Solution

각 "contributors" 요청을 async로 둘러싼다. 이는 레포지토리의 수 만큼의 코루틴을 생성할 것이다. 그러나 새로운 코루틴을 생성하는 건 매우 적은 비용만 소요되기 때문에 문제가 되지 않는다. 필요한 만큼 충분히 생성할 수 있다.

 

async는 Deferred<List<User>>를 반환한다. 이제 더 이상 flatmap을 사용하지 않아도 된다. Deffered 객체의 리스트가 map의 반환값이다. awaitAll()을 통해 List<List<Users>를 반환하고 이를 통해 간단히 flatten().aggregate()로 결과를 얻을 수 있다.

suspend fun loadContributorsConcurrent(service: GitHubService, req: RequestData): List<User> = coroutineScope {
    val repos = service
        .getOrgRepos(req.org)
        .also { logRepos(req, it) }
        .bodyList()

    val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
        async {
            service.getRepoContributors(req.org, repo.name)
                .also { logUsers(repo, it) }
                .bodyList()
        }
    }
    deferreds.awaitAll().flatten().aggregate()
}

이 코드를 실행하고 로그를 확인하면 모든 코루틴이 메인 스레드에서 동작하는걸 확인할 수 있을 것이다. 어쨋든 아직 멀티스레등 처리가 아닌 것이지만 이미 코루틴을 동시에 실행한다는 이점이 존재한다.

 

메인 스레드에서 다른 스레드 바꾸는 방법은 매우 간단하다. async 함수의 context의 인자로 Dispatcher.Default를 넘겨주면 된다.

 

CoroutineDispatcher는 어느 스레드에서 코루틴을 동작시킬 지 결정할 수 있다. 만약 명시하지 않는다면 async는 바깥 scope의 dispatch을 활용하게 된다.

 

Dispatcher.Default는 JVM의 공유 스레드 풀을 의미한다. 이 풀은 병렬 처리를 제공한다는 것을 의미한다. 사용 가능한 CPU의 코어 수 만큼 스레드가 존재하지만 코어가 한개인 환경에서도 두개의 스레드가 존재할 수 있다.

 

위의 코드를 수정하여 공통 스레드 풀의 다른 스레드에서 새로운 코루틴을 수행할 수 있다. 또한 요청을 보내기 전에 로그 작업을 추가했다.

async(Dispatchers.Default) {
    log("starting loading for ${repo.name}")
    service.getRepoContributors(req.org, repo.name)
        .also { logUsers(repo, it) }
        .bodyList()
}

결과

1946 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans
1946 [DefaultDispatcher-worker-3 @coroutine#5] INFO Contributors - starting loading for dokka
1946 [DefaultDispatcher-worker-1 @coroutine#3] INFO Contributors - starting loading for ts2kt
...
2178 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors
2569 [DefaultDispatcher-worker-1 @coroutine#5] INFO Contributors - dokka: loaded 36 contributors
2821 [DefaultDispatcher-worker-2 @coroutine#3] INFO Contributors - ts2kt: loaded 11 contributors

이런 변화 이후 각 코루틴은 스레드 풀의 한 스레드에서 수행되는 것을 확인할 수 있고 다시 코루틴은 다른 스레드에서 재개(resume)될 수 있다. 예를 들어, 위 로그에서 코루틴#4는 스레드-2에서 시작되고 스레드-2에서 재개(resume)된다.

 

코루틴을 메인 스레드에서만 동작시키고 싶다면 Dispatcher.Main을 인자로 명시하면 된다. 만약 메인스레드가 새로운 코루틴을 시작할 때 바쁘다면 해당 코루틴은 일시 중지(suspend)되고 메인 스레드에서 스케쥴되길 기다린다. 그리고 코루틴은 메인 스레드가 여유로워질 때 다시 재개(resume)될 수 있다.

 

Dispatcher는 각 end-point에서 지정하는 것 보다 외부 scope에서 명시하는 것이 좋다. 이 예시에서 loadContributorsConcurrent를 Dispatcher.Default를 인자로 전달하지 않고 정의했을 때, context 내부에서 custom dispatcher 혹은 메인 스레드와 함께 이 함수를 어떤 방식으로든 다시 호출 할 수 있다. 나중에 보겠지만, loadContributorsConcurrent를 테스팅 할 때 테스트를 간단화하기 위해 생성한 TestCoroutineDispatcher를 사용하여 context 내부에서 호출할 수 있다.

 

아래는 호출한 쪽에서 dispatcher를 어떻게 명시하는지 확인할 수 있는 예시이다.

launch(Dispatchers.Default) {
    val users = loadContributorsConcurrent(service, req)
    withContext(Dispatchers.Main) {
        updateResults(users, startTime)
    }
}

이 변화를 프로젝트에 적용하고 loadContributorsConcurrent가 상속된 컨텍스트에서 코루틴을 시작하도록 하자. updateResults는 메인 스레드에서 호출되어야 하기 때문에 Dispatcher.Main의 컨텍스트로 호출한다. withContext는 코드가 명시된 코루틴 context를 사용하여 코드를 호출하고 완료될 때 까지 일시 중단하고 결과를 반환한다. 이를 더 자세히 표현하는 대안은 새로운 코루틴을 시작하고 완료될 때 까지 명시적으로 기다리는 것이다: launch(context){ ... }.json()

 

"외부 scope에서 dispatcher를 사용하는 것"은 정확히 어떻게 작동할까? 이를 더 정확이 표현하는 것은 "외부 scope의 context에서 dispatcher를 사용하는 것"이다. CoroutineScope과 CoroutineCotext의 차이점은 무엇일까? CoroutineScope 내에서 새로운 "contributors" async 코루틴을 시작하는 이유는 무엇일까? 이는 다음 포스팅에 이어진다.

 

 

참고: flatMap vs map

  • map: 단일 스트림 내의 요소를 특정 현태로 변환.
  • flatMap: 리스트의 모든 원소를 하나의 스트림으로 반환.  Collection<Collection<T>>가 Collection<T>로 바뀌는 flatten 과정이 존재.