본문 바로가기

안드로이드/Kotlin

[Kotlin] Introduction to Coroutines and Channel-1

개요

해당 개시글은 Welcome to Kotlin hands-on (kotlinlang.org)를 번역한 게시글입니다.

 

Introduction

이번 핸즈 온 튜토리얼을 통해 코루틴의 개념에 친숙해질 것이다. 코루틴은 비동기 및 넌블러킹 처리 과정에서 이점이 있으며 가독성도 해치지 않는다. 네트워크 요청에서 블러킹, 콜백 없이 어떻게 코루틴을 사용해야 하는지 확인할 수 있을 것이다.

 

배울 내용은:

  • 왜 그리고 어떻게 suspend function을 네트워크 요청에 사용하는가
  • 어떻게 코루틴을 이용해서 동시 요청을 보내는가
  • 어떻게 서로 다른 코루틴 사이에서 채널(channels)을 활용해서 정보를 공유하는가

또한 어떻게 코루틴이 다른 비동기 솔루션들과 다른 지 확인할 것이다.

 

네트워크 요청에는 코루틴을 서포팅하는 Retrofit을 활용할 것이지만 접근법 자체는 여러 다른 라이브러리에서도 동일하다. 

 

해당 튜토리얼은 Roman Elizarov의 "Asynchronous Programming with Kotlin"을 기반으로 한다.

 

 

Blocking Request

Retrofit을 통해 Github로 HTTP 요청을 보낼 것이다. 요청으로는  특정 organization에 대한 레포지토리 리스트 요청이 가능하고, 각 레포지토리 별로 contirbutors 리스트를 요청할 수 있다.

interface GitHubService {
    @GET("orgs/{org}/repos?per_page=100")
    fun getOrgReposCall(
        @Path("org") org: String
    ): Call<List<Repo>>

    @GET("repos/{owner}/{repo}/contributors?per_page=100")
    fun getRepoContributorsCall(
        @Path("owner") owner: String,
        @Path("repo") repo: String
    ): Call<List<User>>
}

 

loadContributorsBlocking 함수는 특정 organization에 대한 contributor 리스트를 가져오기 위해 활용된다.

fun loadContributorsBlocking(service: GitHubService, req: RequestData) : List<User> {
    val repos = service
        .getOrgReposCall(req.org)          // #1
        .execute()                         // #2
        .also { logRepos(req, it) }        // #3
        .body() ?: listOf()                // #4

    return repos.flatMap { repo ->
        service
            .getRepoContributorsCall(req.org, repo.name)      // #1
            .execute()                                        // #2
            .also { logUsers(repo, it) }                      // #3
            .bodyList()                                       // #4
    }.aggregate()
}

첫번째로 특정 organization에 대한 레포지토리 리스트를 얻어서 repos 리스트에 저장하게 된다. 그리고 각 레포지토리에 대해 contributor 리스트를 요청하여 해당 리스트를 모두 병합하여 하나의 contributors 불변 리스트를 반환한다.

 

#1 단계에서 getOrgReposCallgetRepoContributorsCall는 Call 클래스 객체를 반환한다. 이 때, 어떤 요청도 전송되지 않는다. #2 단계에서 Call.execute를 통해 실제 요청이 수행된다. excute는 스레드를 차단하는 동기적인 호출이다.

 

#3 단계에서 응답을 얻게 됐을 때, 그 결과를 logRepos()/logUser()를 통해 출력한다. 만약 에러가 잇을 경우 에러도 해당 함수에서 출력한다.

 

마지막으로 우리가 필요로 하는 응답에 대한 body를 얻는다. 이 튜토리얼의 단순성을 위해, #4 단계에서 에러가 발생할 경우 빈 리스트를 사용하고 상응하는 에러는 로깅한다. .body()?:listOf()를 반복하는 것을 막기 위해 bodyList라는 확장 함수를 정의한다.

fun <T> Response<List<T>>.bodyList(): List<T> {
    return body() ?: listOf()
}

 

logRepos와 logUsers는 받은 정보를 바로 로깅한다. 코드가 실해되는 동안 아래와 같은 출력을 볼 수 있다.

1770 [AWT-EventQueue-0] INFO Contributors - kotlin: loaded 40 repos
2025 [AWT-EventQueue-0] INFO Contributors - kotlin-examples: loaded 23 contributors
2229 [AWT-EventQueue-0] INFO Contributors - kotlin-koans: loaded 45 contributors

각 라인의 첫번째 아이템은 프로그램이 시작된 이후 경과된 시간(밀리초)이고, 대괄호 내부는 [스레드명]을 의미한다. 마지막 아이템은 얼마나 많은 데이터가 왔는지 표시해준다.

 

이 로그는 모든 결과가 메인 스레드에서 기록된 것을 증명한다. 해당 코드를 Blocking 요청으로 실행할 때, 네트워킹 로딩이 완료될 때 까지 윈도우가 멈추고 유저의 인풋에 반응하지 않는 것을 확인할 수 있다. 모든 요청이 같은 스레드에서 수행되고 이는 메인 스레드이기 때문에 UI가 멈춘 것이다.

Contributor 리스트가 모두 로딩된 후, 결과가 갱신된다. 만약 loadContributorsBlocking 이 어떻게 호출되는지 보았다면, updateResults는 loadContributorsBlocking 이 호출된 후 수행될 것이다.

val users = loadContributorsBlocking(service, req)
updateResults(users, startTime)

updateResults는 받은 결과를 UI에 갱신하는 함수로 결과로 이는 항상 UI 스레드에서 호출해야된다.

 

 

Using Callbacks

이전 결과는 작동하긴 하지만 스레드를 차단하여 UI가 멈춘다. 이를 방지하는 전통적인 접근법은 콜백이다. 작업이 곧바로 완료된 후 호출해야하는 코드를 호출하는 대신 이를 콜백(종종 람다로 구현됨)으로 분리하여 이 람다를 호출자에게 전달한다.

 

UI가 잘 반응하게 만들기 위해 전체 계산 과정을 별도의 스레드로 이동하거나 Retrofit API로 전환하여 호출을 차단하는 대신 콜백을 사용할 수 있다.

 

Calling loadContributors in the background thread

첫번재로 전체 계산 과정을 다른 스레드로 옮기자. thread 함수를 활용하여 새로운 스레드를 시작한다.

thread {
    loadContributorsBlocking(service, req)
}

이제 모든 로딩 과정은 다른 스레드로 옮겨졌고 메인 스레드는 자유롭고 다른 작업을 수행할 수 있다.

 

loadContributors 함수의 특징이 바뀌는데 updateResults 콜백을 인자로 받아 로딩이 끝날 때 호출하게 된다:

fun loadContributorsBackground(service: GitHubService, req: RequestData, 
                               updateResults: (List<User>) -> Unit)

 

loadContributorsBackground가 호출되었을 때 updateResults는 이전과 달리 콜백에서 호출된다

loadContributorsBackground(req) { users ->
    SwingUtilities.invokeLater {
        updateResults(users, startTime)
    }
}

SwingUtilities.invokeLater를 호출함으로써 updateResults가 메인 스레드에서 결과에 대한 갱신을 호출하는 것을 보장한다.

 

그러나 만약 contributors를 BACKGROUND에서 가져오려면 해당 리스트가 갱신되는 것은 확인할 수 있지만 아무것도 변경되지 않는다.

 

 

Task

loadContributorsBackground를 수정하여 결과가 UI에서 보이도록 해라.

 

Solution

콜백을 호출하는 것을 잊었다. Contributors 리스트가 로딩되고 로그는 보았지만 결과가 화면에 표시되지 않았다. 이를 해결하기 위해 Contributors 리스트를 받는 updateResults를 호출해야한다.

thread {
    updateResults(loadContributorsBlocking(service, req))
}

콜백에 전달된 로직을 명시적으로 호출해야한다. 그렇지 않으면 아무 일도 일어나지 않는다.

 

 

Using Retrofit callback API

모든 로직은 백그라운드 스레드에서 처리하게 옮겼지만 아직 자원을 충분히 활용한건 아니다. 모든 요청 작업이 각 작업 후에 순차적으로 실행되고 그 결과를 기다리는 동안 백그라운드 스레드가 차단되지만 다른 작업으로 채워질 수 있다. 특히 전체 결과를 더 빨리 받을 수 있도록 다른 로딩 작업을 시작하는 것이 가능하다.

 

각 레포지토리에 대한 데이터를 핸들링하는 것은 두 부분으로 나뉜다: 첫번째는 로딩하고 응답 결과를 처리하는 것이다. 그리고 두번째는 "processing"으로 콜백으로 분리된다. 각 레포지토리에 대한 로딩은 이전 레포지토에 대한 응답이 도착하기 전에 호출할 수 있다. (아래 그림 참조)

 

Retrofit Callback API는 이를 돕는다. Call.enqueue를 활용하여 HTTP 요청을 시작하고 콜백을 인자로 받는다. 이 콜백에서 각 요청이 완료된 후 어떤 작업을 처리할 지 명시하면 된다.

 

loadContributorsCallbacks()는 해당 API를 사용한다. 편리함을 위해 onResponse를 확장 함수를 선언했다. 이는 객체가 아닌 람다를 인자로 받는다.

fun loadContributorsCallbacks(service: GitHubService, req: RequestData, 
                              updateResults: (List<User>) -> Unit) {
    service.getOrgReposCall(req.org).onResponse { responseRepos ->  // #1
        logRepos(req, responseRepos)
        val repos = responseRepos.bodyList()
        
        val allUsers = mutableListOf<User>()
        for (repo in repos) {
            service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->   // #2
                logUsers(repo, responseUsers)
                val users = responseUsers.bodyList()
                allUsers += users
            }
        }
        // TODO: Why this code doesn't work? How to fix that?
        updateResults(allUsers.aggregate())
    }
}

 

#1과 #2처럼 응답을 처리하는 로직을 콜백으로 처리했다.

 

그러나 해당 솔루션은 동작하지 않는다. 만약 해당 프로그램을 실행하고 로드된 contributors 선택하면 아무 것도 보이지 않는다. 그러나 즉시 결과를 반환하는 테스트는 통과한다.

 

여러 요청을 동시에 시작하면 전체 로딩 시간을 줄어든다. 하지만 해당 결과를 기다리지는 않는다. Callback 내에서 updateResults를 호출하는 것은 모든 로딩 요청이 시작한 직후 호출된다. 하지만 이는 아직 allUsers의 데이터가 채워지지 않은 상태이다.

 

Solution(First attempt)

이를 처리하기 위해 아래와 같이 코드를 변경했다:

val allUsers = mutableListOf<User>()
for ((index, repo) in repos.withIndex()) {   // #1
    service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
        logUsers(repo, responseUsers)
        val users = responseUsers.bodyList()
        allUsers += users
        if (index == repos.lastIndex) {      // #2
            updateResults(allUsers.aggregate())
        }
    }
}

#1에서 레포지토리에 해당하는 인덱스를 받고 #2에서 마지막 인덱스일 경우 결과를 갱신한다. 그럴듯해 보이지만 제대로 동작하지 않는다. 이는 결국 요청에 대한 응답이 정해진 순서대로 오는 것을 보장하지 않기 때문이다.

 

Solution(Second attempt)

로딩 요청이 동시에 시작되었기 때문에 어떤 요청이 먼저 도착하고 마지막에 도착하는지를 보장할 수 없다. 따라서 첫 번째 방법처럼 인덱스를 비교하는 것은 일부 결과를 잃어버릴 수도 있음을 의미한다. 만약 마지막 인덱스의 레포지토리가 이전 요청 보다 빨리 결과를 리턴할 경우 처리하는데 더 많은 시간이 걸리는 요청에 대한 모든 데이터는 손실되게 된다.

 

이를 극복하기 위한 방법은 모든 레포지토리가 처리되었는지 체크하는 것이다.

val allUsers = Collections.synchronizedList(mutableListOf<User>())
val numberOfProcessed = AtomicInteger()
for (repo in repos) {
    service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
        logUsers(repo, responseUsers)
        val users = responseUsers.bodyList()
        allUsers += users
        if (numberOfProcessed.incrementAndGet() == repos.size) {
            updateResults(allUsers.aggregate())
        }
    }
}

 

보통 다른 콜백 처리가 항상 동일한 스레드에서 호출된다는 보장이 없기 때문에 동기화를 제공하는 AtomicInteger를 활용한다.

 

콜백을 활용하여 바른 코드를 작성하는 것은 쉽지않고 오류의 가능성이 높은 것을 알 수 있다. 특히 여러 스레드가 존재하고 동기화를 제공해야하는 경우 더욱 문제가 된다. 다음으로는 suspend function을 통해 동일한 로직을 구현하는 방법에 대해 설명하겠다.