ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 코틀린 기초 문법 ④ ( + DSL 학습중 ⌛️)
    KOTLIN 2022. 10. 18. 22:40

    참고한 강의와 책은 이전 블로그 글에 올려두었다.

     

    https://dodop-blog.tistory.com/391

     

    코틀린 기초 문법 ①

    요즘 코드가 Java -> Kotlin으로 넘어가고 있고, 사용이 많고 동기 분들을 보니 모두 코틀린 공부를 하고 계셔서 Kotlin 공부를 시작했다. 노션에 따로 적으면서 공부하긴 했지만, 블로그에도 함께 적

    dodop-blog.tistory.com

     

     

     

     

     

    예외처리
    • 체크예외와 언체크예외를 구별하지 않음
    • try를 식으로 사용이 가능
    fun readNumber(reader: BufferedReader) {
    	val number = try {
    // 예외가 발생하지 않으면 이 값을 사용 
    		Integer.parseInt(reader.readLine())
    	} catch (e: NumberFormatException) {
    // 예외 발생 시 null 값 사용 
    		null
    	} finally {
    // 예외 발생과 관계없이 무조건 실행되는 구문 
        println("Put important code")
      }
    }

     

     

    require문 과 check 문

    • 조건을 만족시키지 않을 때 예외를 발생 시킴
    • require : 조건을 만족시키지 않으면 IllegalArgumentException 을 발생
    • check: 조건을 만족시키지 않으면 IllegalStateException 을 발생
    fun validateName(name: String) {
        require(name.isNotEmpty()) {
            "Name cannot be empty"
        }
    
        check(name.isNotBlank()) {
            "Name cannot be blank"
        }
    }
    
    
    fun main() {
        validateName("")
    // IllegalArgumentException 발생 
    		validateName(" ")
    // IllegalStateException 발생 
    }

     

     

     

     

     

     

     

     

     

     

    리플렉션
    • 실행시점에(동적) 객체의 프로퍼티와 메서드에 접근할 수 있게 해주는 방법
    • 타입과 관계없이 객체를 다뤄야 하거나 객체가 제공하는 메서드나 프로퍼티 이름을 오직 실행 시점에만 알 수 있는 경우
      • ex) JSON 직렬화 라이브러리
    • 애노테이션에 저장된 데이터에 접근할 수 있음
    • 코틀린에서 리플렉션을 구현하는 방법에는 자바의 표준 리플렉션, 코틀린의 리플렉션 API 두가지 방법이 존재
    • 참고 문서 → https://kotlinlang.org/docs/reflection.html#class-references

     

     

     

     

    1.자바 표준 리플렉션 API

    • 자바가 java.lang.reflect 패키지를 통해 제공
    • 코틀린 클래스는 일반 자바 바이트 코드로 컴파일 되므로, 자바 리플렉션 API도 코틀린 클래스를 컴파일한 바이트 코드를 완벽히 지원
    • 즉, 리플렉션을 사용하는 자바 라이브러리와 코틀린 코드가 완전히 호환

     

     

     

     

    2.코틀린 리플렉션 API

    • 코틀린이 kotlin.reflect 패키지를 통해 제공
    • 자바에는 없는 프로퍼티나 nullable타입과 같은 코틀린 고유 개념에 대한 리플렉션 제공
    • 자바 리플렉션 API의 복잡한 기능을 제공하지 않아 완전히 대체할 수는 없음
    • 코틀린 뿐만 아니라 다른 JVM 언어에서 생성한 바이트 코드를 충분히 다룰 수 있음
    • 최상위 수준이나 클래스 안에 정의된 프로퍼티만 리플렉션으로 접근이 가능 ( 함수의 로컬 변수에는 접근할 수 없음 )
    dependencies {
        implementation("org.jetbrains.kotlin:kotlin-reflect:1.7.20")
    }
    
    val myList = listOf(1, 2, 3, 4, 5, 6, 7)
    println(myList::class.qualifiedName)
    // java.util.Arrays.ArrayList
    // 자바 버전 
    // String var1 = Reflection.getOrCreateKotlinClass(myList.getClass()).getQualifiedName();
    
    val functions = myList::class.declaredFunctions
    println(functions)
    // [fun java.util.Arrays.ArrayList<E>.iterator(): kotlin.collections.MutableIterator<E!>, fun java.util.Arrays.ArrayList<E>.contains(E!): kotlin.Boolean, fun java.util.Arrays.ArrayList<E>.spliterator(): java.util.Spliterator<E!>, fun java.util.Arrays.ArrayList<E>.forEach(java.util.function.Consumer<in E!>!): kotlin.Unit, fun java.util.Arrays.ArrayList<E>.toArray(kotlin.Array<(out) T!>!): kotlin.Array<(out) T!>!, fun java.util.Arrays.ArrayList<E>.toArray(): kotlin.Array<(out) kotlin.Any!>!, fun java.util.Arrays.ArrayList<E>.set(kotlin.Int, E!): E!, fun java.util.Arrays.ArrayList<E>.get(kotlin.Int): E!, fun java.util.Arrays.ArrayList<E>.indexOf(E!): kotlin.Int, fun java.util.Arrays.ArrayList<E>.replaceAll(java.util.function.UnaryOperator<E!>): kotlin.Unit, fun java.util.Arrays.ArrayList<E>.sort(java.util.Comparator<in E!>!): kotlin.Unit]
    // 자바버전
    // Collection functions = KClasses.getDeclaredFunctions(Reflection.getOrCreateKotlinClass(myList.getClass()));
    
    fun isOdd(x: Int) = x % 2 != 0
    println(myList.filter(::isOdd))
    // [1, 3, 5, 7]
    // 자바와의 호환성 
    import kotlin.reflect.KClass
    import kotlin.reflect.jvm.*
    
    class A(val p: Int)
    
    fun main() {
        println(A::p.javaGetter) 
    // public final int A.getP()
        println(A::p.javaField)  
    // private final int A.p
        println(getKClass(A(1)))
    // class A
    }
    
    fun getKClass(o: Any): KClass<Any> = o.javaClass.kotlin

     

     

     

     

     

    비교 연산자 오버로딩

    동등성 연산자 equals

    • == 연산자 호출
    • ≠ 식별자 비교 ( === ) → 오버로딩 불가
    • equals의 경우 Any에 정의된 메서드로 override가 필요

     

    순서연산자 compareTo

    • 자바의 Comparable 인터페이스와 비슷
    • equals와 마찬가지로 상위에서 compareTo에 operator 변경자가 붙어있어 하위 클래스의 오버라이딩 함수에 operator 붙일 필요가 없음

     

    컬렉션 연산자

    • 인덱스로 원소에 접근하는 get, set 연산자
      • operator 변경자와 함께 구현된 get, set
    data class Point(var x: Int, var y: Int)
    
    operator fun Point.get(index: Int): Int {
    		return when(index) {
    				0 -> x
    				1 -> y
    				else -> throw IndexOutOfBoundsException("Invalid coordinate $Index")
    		}
    }
    
    operator fun Point.set(index: Int, value: Int) {
    		when(index) {
    				0 -> x = value
    				1 -> y = value
    				else -> throw IndexOutOfBoundsException("Invalid coordinate $Index")
    		}
    }

     

     

     

    • in 관례
      • 객체가 컬렉션에 들어있는지 검사 ( contains )
    • rangeTo 관례
      • Comparable에 대한 확장 함수
      • ..
    • for 루프를 위한 Iterator 관례
      • : Iterator<>

     

     

     

     

    위임 프로퍼티
    // 위임프로퍼티를 사용하지 않은 예제 (원래 방식)
    interface Downloader {
        fun download()
    }
    
    interface Player {
        fun play()
    }
    
    class FileDownloader(private val file: String) : Downloader {
        override fun download() {
            println("$file Downloaded")
        }
    }
    
    class FilePlayer(private val file: String) : Player {
        override fun play() {
            println("$file playing")
        }
    }
    
    class MediaFile(
        private val downloader: Downloader,
        private val player: Player
    ) : Downloader, Player {
        override fun download() {
            downloader.download()
        }
    
        override fun play() {
            player.play()
        }
    }
    
    fun main(args: Array<String>) {
        val file:String = "File1.mkv"
        val mediaFile = MediaFile(FileDownloader(file), FilePlayer(file))
        mediaFile.download()
    // File1.mkv Downloaded
        mediaFile.play()
    // File1.mkv playing
    }
    
    
    // 위임 프로퍼티를 사용한 예제 
    class MediaFile(
        private val downloader: Downloader,
        private val player: Player
    ) : Downloader by downloader, Player by player

     

     

     

     

     

    // 기존의 Student 객체 (set을 가지고 있음)
    class Student {
        var firstName: String? = null
            set(value) {
                if (value != null && value.length > 5) {
                    field = value.trim().uppercase(Locale.getDefault())
                }
            }
        var lastName: String? = null
            set(value) {
                if (value != null && value.length > 5) {
                    field = value.trim().uppercase(Locale.getDefault())
                }
            }
    }
    
    // 위임 객체를 이용한 구현 
    class Student {
        var firstName: String? by NameDelegate()
        var lastName: String? by NameDelegate()
    
        override fun toString(): String {
            return "$firstName $lastName"
        }
    }
    
    class NameDelegate {
        var formattedValue: String? = null
    
        operator fun setValue(thisRef: Any?, property: KProperty<*>, value: String?) {
            if (value != null && value.length > 5) {
                formattedValue = value.trim().uppercase(Locale.getDefault())
            }
        }
    
        operator fun getValue(thisRef: Any?, property: KProperty<*>): String? {
            return formattedValue
        }
    
    
    }
    
    fun main() {
        val student = Student()
        student.firstName = "Kimberly"
        student.lastName = "Doe"
        println(student)
    // KIMBERLY null
    }

     

     

     

     

     

    코루틴
    • 컴퓨터 프로그램 구성 요소 중 하나로 비선점형 멀티태스킹을 수행하는 일반화한 서브루틴
    • 실행을 일시 중단하고 재개 할 수 있는 여러 진입 지점을 허용
    • kolinx.coroutines 패키지
    • 플랫폼에 따른 제어범위, 실행범위(scope)
      • Global Scope : 프로그램 어디서나 제어동작이 가능한 기본 범위
      • Coroutine Scope : 특정 목적의 Dispatcher를 지정, 제어 및 동작이 가능한 범위
        • Dispatchers.Default : 기본적 백그라운드 동작
        • Dispatchers.IO : I/O에 최적화
        • Dispatchers.Main : 메인(UI) 스레드에서 동작

     

     

     

    kotlinx.coroutines.CoroutineScope.launch

    • 반환값이 없는 job 객체 반환
    • 만들어진 코루틴은 기본적으로 즉시 실행
    • launch를 작동하기 위해서 CoroutineScope객체가 블록의 this로 지정되어야 하는데, 다른 suspend 함수 내부라면 해당 함수가 사용 중인 CoroutineScope가 있겠지만, 그렇지 않은 경우에는 GlobalScope를 사용
    • runBlocking : 코루틴이 모두 실행되고 종료되도록 main을 대기시키고 일정시간 응답이 없으면 ANR(Application Not Responding) 오류 발생 후 앱 강제 종료
    • runBlocking과 같이 대기가 가능한 구문 안에서 실행되는 함수(join, await은 실행이 끝날때 까지, delay는 일정 시간동안 대기하는 함수)
      • job.join()
      • Deferred.await() → Deferred의 결과도 반환
      • delay() → 시간이 지날 때 까지 다른 코루틴에게 실행을 양보
    • cancel() : launch가 반환한 Job의 코루틴 실행을 중단 가능
        1. 코루틴 내부의 delay() 함수 또는 yield() 함수가 사용된 위치 까지 수행된 뒤 종료
        1. cancel()로 인해 속성인 isActive가 false되고 이를 확인하여 수동 종료

     

     

     

     

     

    kotlinx.coroutines.CoroutineScope.aync

    • 반환 값이 있는 Deffered 객체
    • launch와 함께 둘다 람다 함수 형태로 같은 일을 수행
    • 단, launch는 job을 반환하는 반면 async는 Deffered를 반환

     

     

     

    일시 중단 함수 (kotlinx-coroutines-core 모듈의 최상위)

    • withContext
    • withTimeout
    • withTimeoutOrNull
      • join.await처럼 blocking 함수로 코루틴이 정해진 시간(ms)안에 실행되지 않으면 null을 결과로 반환
    • awaitAll
      • 모든 작업의 성공을 기다리며 어느 하나라도 예외로 실패하면 예외로 실패함
    • joinAll
      • 모든 작업이 끝날 때까지 현재 작업을 일시 중단

     

     

     

     

     

    -> 예제를 이용한 코루틴 이해 🥖

     

     

     

     

     

     

    -일반적인 동기 코드

    • 주문 생성을 예로 들어 확인해보자
    • 구매자를 조회 → 이를 이용해 주소 조회 → 이를 이용해 상품 조회 → 이를 이용한 스토어 조회 → 상위 모든 정보들을 이용한 주문 생성의 과정
    fun execute(inputValues: InputValues): Order {
        val (userId, productIds) = inputValues
    
        // 1. 구매자 조회
        val buyer = userRepository.findUserByIdSync(userId)
    
        // 2. 주소 조회 및 유효성 체크
        val address = addressRepository.findAddressByUserSync(buyer).last()
        checkValidRegion(address)
    
        // 3. 상품들 조회
        val products = productRepository.findAllProductsByIdsSync(productIds)
        check(products.isNotEmpty())
    
        // 4. 스토어 조회
        val stores = storeRepository.findStroesByProductsSync(products)
        check(stores.isNotEmpty())
    
        // 5. 주문 생성
        val order = orderRepository.createOrderSync(buyer, products, stores, address)
    
        return order
    }

     

     

     

     

    - 여러가지 라이브러리를 이용해 비동기 리팩토링 해보기

    1) 구매자 조회

    • rxjava3의 Maybe를 사용
    • 없거나 1개가 아니면 Error를 발생
    import io.reactive.rxjava3.core.Maybe
    
    interface UserAsyncRepository {
        fun findUserByIdAsMaybe(userId: String): Maybe<User>
    }
    
    class UserRepository : UserRepositoryBase(), UserAsyncRepository {
        override fun findUserByIdAsMaybe(userId: String): Maybe<User> {
            val user = prepareUser(userId)
            return Maybe.just(user)
                .delay(TIME_DELAY_MS, TimeUnit.MILLISECONDS)
        }
    }

     

     

     

    2) 주소 조회

    • jdk9의 flow(리액티브 스트림)을 사용
    • item(유저가 주어졌을 때의 사용자 주소)을 publish → 정해진 개수 만큼 주소가 publish 되면 complete 이벤트로 flow 종료
    import java.util.concurrent.Flow
    
    interface AddressAsyncRepository {
        fun findAddressByUserAsPublisher(user: User): Flow.Publisher<Address>
    }

     

     

     

    3) 상품 조회

    • reactor의 flux 사용
    • 0과 n개 사이의 데이터가 존재하지 않으면 Error 발생
    import reactor.core.publisher.Flux
    
    interface ProductAsyncRepository {
        fun findAllProductsByIdsAsFlux(productIds: List<String>): Flux<Product>
    }

     

     

     

    4) 스토어 조회

    • mutiny의 Multi 사용
    • 0과 n개 사이의 데이터가 존재하지 않으면 Error 발생
    • Multi를 이용하여 리액트 flux 처럼 상품들을 가지고 각각의 스토어를 조회
    import io.smallrye.mutiny.Multi
    
    interface StoreAsyncRepository {
        fun findStoresByProductsAsMulti(products: List<Product>): Multi<Store>
    }

     

     

     

    5) 주문 생성

    • jdk8의 CompletableFuture 사용
    • complete되는 시점에 결과를 반환
    import java.util.concurrent.CompletableFuture
    
    interface OrderAsyncRepository {
        fun createOrderAsFuture(
            buyer: User,
            products: List<Product>,
            stores: List<Store>,
            address: Address,
        ): CompletableFuture<Order>
    }

     

     

     

     

     

    — 앞에서 작성한 비동기 코드를 조합한 결과 ?

    • subscribe hell 발생
      • subscribe : 결과를 얻은 시점에 주어진 subscriber(consumer)를 실행하는 일종의 call back
      • 반환 값들이 다른 subscribe에서 필요하고 위에서 찾은 내용들이 주문 생성까지 쭉 이어지기 때문에 subscribe가 중첩되고 콜백과 유사한 과정처럼 느껴지게 됨
      • 각각의 어댑터로 변경한다고 해도 똑같이 hell이 됨
      • 예를 들어 각각의 비동기 함수를 Reactor로 변경해봐도 (RxJava3Adaptor, JdkFlowAdapter, Flux.collectList, Flux.from, Mono.fromFuture) 위에서 계속해서 flatMap hell 처럼 중첩되는 모습을 보일 것
    fun execute(inputValues: InputValues): Order {
        val (userId, productIds) = inputValues
    
        return Mono.create { emitter ->
            userRepository.findUserByIdAsMaybe(userId)
                .subscribe { buyer ->
                    addressRepository.findAddressByUserAsPublisher { address ->
                        checkValidRegion(address)
                        productRepository.findAllProductsByIdsAsFlux(productIds)
                            .collectList()
                            .subscribe { products ->
                                check(products.isNotEmpty())
                                storeRepository.findStoresByProductsAsMulti(products)
                                    .collect().asList()
                                    .subscribe().with { stores ->
                                        check(stores.isNotEmpty())
                                        orderRepository.createOrderAsFuture(buyer, products, stores, address)
                                            .whenComplete { order, _ -> emitter.success(order) }
                                    }
                            }
                    }
                }
        }
    }

     

     

     

     

     

     

    -코루틴을 이용하여 비동기 구현해보기

    • suspend 키워드를 사용
    • Maybe<T>.awaitSingle
    • Publisher<T>.awaitLast
    • Flow<T>.toList
    • CompletableFuture<T>.await
    suspend fun execute(inputValues: InputValues): Order {
        val (userId, productIds) = inputValues
    
        // 1. 구매자 조회
        val buyer = userRepository.findUserByIdAsMaybe(userId).awaitSingle()
    
        // 2. 주소 조회 및 유효성 체크
        val address = addressRepository.findAddressByUserAsPublisher(buyer).awaitLast()
        checkValidRegion(address)
    
        // 3. 상품들 조회
        val products = productRepository.findAllProductsByIdsAsFlux(productIds).asFlow().toList()
        check(products.isNotEmpty())
    
        // 4. 스토어 조회
        val stores = storeRepository.findStoresByProductsAsMulti(products).asFlow().toList()
        check(stores.isNotEmpty())
    
        // 5. 주문 생성
        val order = orderRepository.createOrderAsFuture(buyer, products, stores, address).await()
    
        return order
    }

     

     

     

    • 테스트 코드는 다음과 같다.
    @Test
    fun `should return a createdOrder in coroutine`() = runBlocking {
        // given
        val userId = "user1"
        val productsId = listOf("product1", "product2", "product3")
    
        // when
        val watch = StopWatch().also { it.start() }
    
        val inputValues = CreateOrderCoroutineUserCase.InputValues(userId, productIds)
        val createdOrder = createOrderUseCase.execute(inputValues)
    
        watch.stop()
        println("Time Elapsed: ${watch.time}ms")
    
        // then
        println(createdOrder)
    }

     

     

     

     

    — 그렇다면 suspend 함수의 실행은 어떻게 되는 것인가?

    • suspend 함수의 실행은 또 다른 suspend 함수 내부에서만 실행이 가능한데, runBlocking은 동기 코드에서 coroutine을 실행할 수 있게 bridge역할을 수행하여 일반적인 동기 코드처럼 찍어볼 수 있게 해줌
    • 코루틴이 경량화된 쓰레드 혹은 특정지점(awaitSingle)에서 정지했다가 재개할 수 있는 쓰레드라는 말이 있지만 내부구조를 보면 완전히 다르다
    • 코루틴의 내부 구조 (Kotlin Compiler - Finite State Machine ⇒ FSM)
      • kotlin 컴파일러가 suspend 붙은 함수를 호출할 때 함수 내부에서 FSM 기반의 재귀 함수로 변환하여 함수 내부에서 state(shared data)를 가지고 계속 recursive call → 끝에 다 달았을 때야 flow 한테 호출
      • kotlin 컴파일러가 suspend 가 붙은 함수에 추가적인 코드를 추가
        • Continuation 인자를 타겟 함수에 추가 하고 Continuation 구현체를 생성
        • 타겟 함수 내의 모든 suspend 함수에 생성한 continuation 객체를 패스
        • 코드를 분리해서 switch case(when) 안에 넣고 label을 이용해서 state 변경

     

     

     

     

    — FSM 기반의 재귀 함수 (Finite State Machine)

    • execute 함수가 실행되면 재귀 호출을 이용해서 스스로(execute 함수)를 실행하면서 state 변경 → state가 최종에 도달하면 값을 caller 에 반환
    • sharedData를 통해서 여러가지 context를 저장
    • label : state machine의 현재 state 값 ( 현재는 0 , space가 지남에 따라서(buyer 찾기 등등)
    • 이전 state에서 찾은 값들을 buyer, address, products, stores, order에 저장
    • resumeWith로 재귀 호출을 하고 결과를 result에 저장
    • 인자의 sharedData가 null 이라면 생성하고 아니면 있는 sharedData를 사용
    // FSM 기반의 동기 코드
    class SharedData {
        var label: Int = 0
        lateinit var result: Any
        lateinit var buyer: User
        lateinit var address: Address
        lateinit var products: List<Product>
        lateinit var stores: List<Store>
        lateinit var order: Order
        lateinit var resumeWith: (result: Any) -> Unit
    }
    
    fun execute(
        inputValues: InputValues,
        sharedData: SharedData? = null,
    ) {
        val (userId, productIds) = inputValues
    
        val that = this
        val shared = sharedData ?: SharedData().apply {
            this.resumeWith = fun(result: Any): Order { // 스스로를 재귀 콜
                this.result = result
                return that.execute(inputValues, this) // 만약 스스로 재귀 콜이 아니라 바깥에서 execute를 실행하는 경우 , shared Data에 값을 주지 않으면 디폴트 값인 null이 들어가 inputValues만 넘기게 된다
            }
        }
    }
    
    return when (shared.label) {
        // 가장먼저 shared label 의 기본값은 0이기 때문에 첫번째 블록이 실행된다
        0 -> {
            shared.label = 1
            userRepository.findUserByIdSync(userId)
                .let { user ->
                    // this.resumeWith는 함수로 result에 결과값을 저장하고 다시 execute를 실행함(재귀)
                    shared.resumeWith(user)
                }
        }
    
        // findUserByIdSync 블록이 실행된 수 넘어왔기 때문에 이미 label1 블록이 실행된다
        // buyer에는 위에서 저장한 값이 들어가게 되어 실행됨
        1 -> {
            shared.label = 2
            shared.buyer = shared.result as User
            addressRepository.findAddressByUserSync(shared.buyer).last()
                .let { address ->
                    shared.resumeWith(address)
                }
        }
        2 -> {
            shared.label = 3
            shared.address = shared.result as Address
            checkValidRegion(shared.address)
            productRepository.findAllProductsByIdsSync(productIds)
                .let { products ->
                    shared.resumeWith(products)
                }
        }
        3 -> {
            shared.label = 4
            shared.products = shared.result as List<Product>
            check(shared.products.isNotEmpty())
            storeRepository.findStoresByProductsSync(shared.products)
                .let { stores ->
                    shared.resumeWith(stores)
                }
        }
        4 -> {
            shared.label = 5
            shared.stores = shared.result as List<Store>
            check(shared.stores.isNotEmpty())
            orderRepository.createOrderSync(
                shared.buyer, shared.products, shared.stores, shared.address
            ).let { order ->
                shared.resumeWith(order)
            }
        }
        // 이제 5번까지 실행되면 더이상 recursive 콜 하지 않고 반환한다
        // 앞의 일반적의 동기 코드와 같은 코드가 된 것 이다
        5 -> {
            shared.order = shared.result as Order
            shared.order
        }
        else -> throw IllegalAccessException()
    }
    
    
    // 테스트 코드 
    // execute를 실행할 때, 두번째 인자를 넘기지 않아서 default value인 null로 제공
    @Test
    fun `should return a createdOrder in sync with state machine`() {
        // given
        val userId = "user1"
        val productIds = listOf("product1", "product2", "product3")
    
        // when
        val watch = StopWatch().also { it.start() }
    
        val inputValues = CreateOrderSyncStateMachineUseCase.InputValues(userId, productIds)
        val createdOrder = createOrderUseCase.execute(inputValues)
    
        watch.stop()
        println("Time Elapsed: ${watch.time}ms")
    
        // then
        println(createdOrder)
    }

     

     

     

     

     

     

    FSM 기반의 비동기 코드 (Finite State Machine)

    • sharedDataContinuation을 통해서 여러가지 context를 저장
    • label은 state machine의 현재 state 값을 의미
    • 이전 state에서 찾은 값들을 buyer, address, products, stores, order에 저장
    • resumeWith로 재귀 호출을 하여 결과를 result에 저장
    • 인자의 sharedData가 SharedDataContinuation 타입이 아니라면 생성
    class SharedDataContinuation(
        val completion: Continuation<Any>,
    ) : Continuation<Any> { // Continuation 인터페이스는 context, resumeWith 값을 가진다
        var label: Int = 0
        lateinit var result: Any
        lateinit var buyer: User
        lateinit var address: Address
        lateinit var products: List<Product>
        lateinit var stores: List<Store>
        lateinit var order: Order
        lateinit var resume: () -> Unit
    
        override val context: CoroutineContext = completion.context
        override fun resumeWith(result: Result<Any>) {
            this.result = result
            this.resume()
        }
    }
    
    // 더이상 sharedData라는 두번째 인자값을 사용하지 않고 continuation 인터페이스를 인자로 받게끔 수정
    fun execute(inputValues: InputValues, completion: Continuation<Any>) {
        val (userId, productIds) = inputValues
    
        val that = this
        val cont = completion as? SharedDataContinuation // 클래스를 구현하지 않고 다른 클래스를 구현한 객체라면 직접 생성해서 집어 넣게끔 구조되어있음
            ?: SharedDataContinuation(completion).apply {
                resume = fun() {
                    // recursive self
                    that.execute(inputValues, this)
                }
            }
    
    
        // 동기 코드와 동일하게 subscribe 구문을 when문으로
        // 동기 코드와 별로 다를게 없다
        when (cont.label) {
            // 처음에 기본 label 값 0 이라서 첫번째 구문 실행
            0 -> {
                cont.label = 1
                userRepository.findUserByIdAsMaybe(userId)
                    .subscribe { user -> // subscribe로 (비동기->결과 나오는 시점 다를 수 있음) 결과가 돌아오면 읽어서 상단의 resumeWIth 코드 실행됨 (result에 결과 저장, 자기 자신 호출)
                        cont.resumeWith(Result.success(user))
                    }
            }
            1 -> {
                cont.label = 2
                cont.buyer = (cont.result as Result<User>).getOrThrow()
                addressRepository.findAddressByUserAsPublisher(cont.buyer)
                    .subscribe(LastItemSubscriber { address ->
                        cont.resumeWith(Result.success(address))
                    })
            }
            2 -> {
                cont.label = 3
                cont.address = (cont.result as Result<Address>).getOrThrow()
                checkValidRegion(cont.address)
                productRepository.findAllProductsByIdsAsFlux(productIds)
                    .collectList()
                    .subscribe { products ->
                        cont.resumeWith(Result.success(products))
                    }
            }
            3 -> {
                cont.label = 4
                cont.products = (cont.result as Result<List<Product>>).getOrThrow()
                check(cont.products.isNotEmpty())
                storeRepository.findStoresByProductsAsMulti(cont.products)
                    .collect().asList()
                    .subscribe().with { stores ->
                        cont.resumeWith(Result.success(stores))
                    }
            }
            4 -> {
                cont.label = 5
                cont.stores = (cont.result as Result<List<Store>>).getOrThrow()
                check(cont.stores.isNotEmpty())
                orderRepository.createOrderAsFuture(
                    cont.buyer, cont.products, cont.stores, cont.address
                ).whenComplete { order, _ ->
                    cont.resumeWith(Result.success(order))
                }
            }
            // completion의 resumeWith로 종료 (completion resumeWIth) => 외부에서 콜백이 실행됨을 의미
            5 -> {
                cont.order = (cont.result as Result<Order>).getOrThrow()
                cont.completion.resumeWith(Result.success(cont.order))
            }
            else -> throw IllegalAccessException()
        }
    }
    
    
    // FMS 기반의 비동기 코드 실행
    // testContinuation을 생성해서 execute 함수에 주입
    @Test
    fun `should return a createdOrder in async with state machine`() {
        // given
        val userId = "user1"
        val productIds = listOf("product1", "product2", "product3")
    
        // when
        val watch = StopWatch().also { it.start() }
        val lock = CountDownLatch(1)
        val testContinuation = object : Continuation<Any> {
            override val context = EmptyCoroutineContext
            override fun resumeWith(result: Result<Any>) { // resumeWith에는 테스트 코드 종료 코드를 넣어놓았음 (fsm이 동작을 완료하고 complete되는 시점에 결과 반환하게끔 )
                watch.stop()
                lock.countDown()
    
                println("Time Elapsed: ${watch.time}ms")
                println(result.getOrThrow())
            }
        }
    
        val inputValues = CreateOrderAsyncStateMachine2UseCase.InputValues(userId, productIds)
    
        createOrderUseCase.execute(inputValues, testContinuation)
    
        // then
        lock.await(3000, TimeUnit.MILLISECONDS)
    }

     

     

     

     

     

     

     

     

    — FSM 기반의 코드를 코루틴화 시키기

    • FSM기반의 코드는 코루틴 코드로 작성했을 때 kotlin 컴파일러가 바이트 코드로 만들었던 결과와 같은 것
    • 만든 것에서 덜고 코루틴의 확장함수(maybe, publisher등)를 이용한 코루틴 코드로 바꾸자

     

    — 1) FSM 기반의 코루틴의 확장함수

    • 각각의 비동기 라이브러리에서 사용하는 객체에 대한 extension function 생성
    • 각각의 비동기 라이브러리 subscribe 하는 코드
    • (Flux.toList, Multi.toList, CompletionStage.awaitSingle은 실제와 다르긴하다)
    package com.karrot.example.util
    
    import com.karrot.example.repository.shipment.LastItemSubscriber
    import io.reactivex.rxjava3.core.Maybe
    import io.smallrye.mutiny.Multi
    import reactor.core.publisher.Flux
    import java.util.concurrent.CompletionStage
    import java.util.concurrent.Flow
    import kotlin.coroutines.Continuation
    
    fun <T: Any> Maybe<T>.awaitSingle(cont: Continuation<Any>) {
        this.subscribe { user ->
            cont.resumeWith(Result.success(user))
        }
    }
    
    fun <T: Any> Flow.Publisher<T>.awaitLast(cont: Continuation<Any>) {
        this.subscribe(LastItemSubscriber { address ->
            cont.resumeWith(Result.success(address))
        })
    }
    
    fun <T: Any> Flux<T>.toList(cont: Continuation<Any>) {
        this.collectList()
            .subscribe { products ->
                cont.resumeWith(Result.success(products))
            }
    }
    
    fun <T: Any> Multi<T>.toList(cont: Continuation<Any>) {
        this.collect()
            .asList()
            .subscribeAsCompletionStage()
            .whenComplete { stores, _ ->
                cont.resumeWith(Result.success(stores))
            }
    }
    
    fun <T: Any> CompletionStage<T>.awaitSingle(cont: Continuation<Any>) {
        this.whenComplete { order, _ ->
            cont.resumeWith(Result.success(order))
        }
    }
    
    
    
    // 확장함수를 적용한 결과
    class SharedDataContinuation(
        private val continuation: Continuation<Any>,
    ) : Continuation<Any> {
        var label: Int = 0
        lateinit var result: Any
        lateinit var buyer: User
        lateinit var address: Address
        lateinit var products: List<Product>
        lateinit var stores: List<Store>
        lateinit var order: Order
        lateinit var resume: () -> Unit
    
        override val context: CoroutineContext = continuation.context
    
        override fun resumeWith(result: Result<Any>) {
            this.result = result
            this.resume()
        }
    
        fun complete(result: Result<Any>) {
            this.continuation.resumeWith(result)
        }
    }
    
    fun execute(inputValues: InputValues, continuation: Continuation<Any>) {
        val (userId, productIds) = inputValues
    
        val that = this
        val cont = continuation as? SharedDataContinuation
            ?: SharedDataContinuation(continuation).apply {
                resume = fun() {
                    that.execute(inputValues, this)
                }
            }
    
        when (cont.label) {
            0 -> {
                cont.label = 1
                userRepository.findUserByIdAsMaybe(userId).awaitSingle(cont)
            }
            1 -> {
                cont.label = 2
                cont.buyer = (cont.result as Result<User>).getOrThrow()
                addressRepository.findAddressByUserAsPublisher(cont.buyer).awaitLast(cont)
            }
            2 -> {
                cont.label = 3
                cont.address = (cont.result as Result<Address>).getOrThrow()
                checkValidRegion(cont.address)
                productRepository.findAllProductsByIdsAsFlux(productIds).toList(cont)
            }
            3 -> {
                cont.label = 4
                cont.products = (cont.result as Result<List<Product>>).getOrThrow()
                check(cont.products.isNotEmpty())
                storeRepository.findStoresByProductsAsMulti(cont.products).toList(cont)
            }
            4 -> {
                cont.label = 5
                cont.stores = (cont.result as Result<List<Store>>).getOrThrow()
                check(cont.stores.isNotEmpty())
                orderRepository.createOrderAsFuture(
                    cont.buyer, cont.products, cont.stores, cont.address
                ).awaitSingle(cont)
            }
            5 -> {
                cont.order = (cont.result as Result<Order>).getOrThrow()
                cont.complete(Result.success(cont.order))
            }
            else -> throw IllegalAccessException()
        }
    }
    
    
    // 추가로 when을 제거
    // 코틀린은 컴파일러가 continuation 객체를 알아서 해주기 때문에 앞에서 말한 코루틴 순서와 같은 코드가 되었다
    suspend fun execute(inputValues: InputValues): Order {
        val (userId, productIds) = inputValues
    
        // 1. 구매자 조회
        val buyer = userRepository.findUserByIdAsMaybe(userId).awaitSingle()
    
        // 2. 주소 조회 및 유효성 체크
        val address = addressRepository.findAddressByUserAsPublisher(buyer)
            .awaitLast()
        checkValidRegion(address)
    
        // 3. 상품들 조회
        val products = productRepository.findAllProductsByIdsAsFlux(productIds).asFlow().toList()
        check(products.isNotEmpty())
    
        // 4. 스토어 조회
        val stores = storeRepository.findStoresByProductsAsMulti(products).asFlow().toList()
        check(stores.isNotEmpty())
    
        // 5. 주문 생성
        val order = orderRepository.createOrderAsFuture(buyer, products, stores, address).await()
    
        return order
    }
    
    
    // 테스트 코드 
    @Test
    fun `should return a createdOrder in coroutine`() = runBlocking {
        // given
        val userId = "user1"
        val productIds = listOf("product1", "product2", "product3")
    
        // when
        val watch = StopWatch().also { it.start() }
    
        val inputValues = CreateOrderCoroutineUseCase.InputValues(userId, productIds)
        val createdOrder = createOrderUseCase.execute(inputValues)
    
        watch.stop()
        println("Time Elapsed: ${watch.time}ms")
    
        // then
        println(createdOrder)
    }

     

     

     

     

    — 2) 코루틴 async를 이용한 동시성 처리

    • CoroutineDispatcher
    • 여러 쓰레드를 오고 가며 로직을 처리 가능
    • CoroutineScope(Dispatchers.IO).async는 완전 별개의 쓰레드에서 동작
      • buyer 찾고 실행은 되지만 블로킹 되지 않은 상황에서 address 찾고 아래의 await() 지점에서 조인되어 결과가 찾아와져서 아래 코드(order 생성)실행에는 문제가 발생하지 않음
    suspend fun execute(inputValues: InputValues): Order {
        val (userId, productIds) = inputValues
    
        // 1. 구매자 조회
        val buyer = userRepository.findUserByIdAsMaybe(userId).awaitSingle()
    
        // 2. 주소 조회 및 유효성 체크
        val addressDeferred = CoroutineScope(Dispatchers.IO).async {
            addressRepository.findAddressByUserAsPublisher(buyer)
                .awaitLast()
        }
    
        // 3. 상품들 조회
        val products = productRepository.findAllProductsByIdsAsFlux(productIds).asFlow().toList()
        check(products.isNotEmpty())
    
        // 4. 스토어 조회
        val storesDeferred = CoroutineScope(Dispatchers.IO).async {
            storeRepository.findStoresByProductsAsMulti(products).asFlow().toList()
        }
    
        val address = addressDeferred.await()
        val stores = storesDeferred.await()
    
        checkValidRegion(address)
        check(stores.isNotEmpty())
    
        // 5. 주문 생성
        val order = orderRepository.createOrderAsFuture(buyer, products, stores, address).await()
    
        return order
    }

     

     

    — 3) try/catch를 이용한 일관성 있는 에러 핸들링

    • 일반적으로 sync/blocking 처럼 try가 감싸서 처리
    suspend fun execute(inputValues: InputValues): Order {
        val (userId, productIds) = inputValues
    
        // 1. 구매자 조회
        val buyer = try {
            userRepository.findUserByIdAsMaybe(userId).awaitSingle()
        } catch (e: Exception) {
            throw NoSuchElementException("no such user")
        }
    
        // 2. 주소 조회 및 유효성 체크
        val address = addressRepository.findAddressByUserAsPublisher(buyer)
            .awaitLast()
        checkValidRegion(address)
    
        // 3. 상품들 조회
        val products = productRepository.findAllProductsByIdsAsFlux(productIds).asFlow().toList()
        check(products.isNotEmpty())
    
        // 4. 스토어 조회
        val stores = storeRepository.findStoresByProductsAsMulti(products).asFlow().toList()
        check(stores.isNotEmpty())
    
        // 5. 주문 생성
        val order = orderRepository.createOrderAsFuture(buyer, products, stores, address).await()
    
        return order
    }

     

     

     

     

     

    + ) 그 외에 Structured concurrency, Channer, Flow

Designed by Tistory.