-
코틀린 기초 문법 ④ ( + DSL 학습중 ⌛️)KOTLIN 2022. 10. 18. 22:40
참고한 강의와 책은 이전 블로그 글에 올려두었다.
https://dodop-blog.tistory.com/391
예외처리
- 체크예외와 언체크예외를 구별하지 않음
- try를 식으로 사용이 가능
fun readNumber(reader: BufferedReader) { val number = try { // 예외가 발생하지 않으면 이 값을 사용 Integer.parseInt(reader.readLine()) } catch (e: NumberFormatException) { // 예외 발생 시 null 값 사용 null } finally { // 예외 발생과 관계없이 무조건 실행되는 구문 println("Put important code") } }
require문 과 check 문
- 조건을 만족시키지 않을 때 예외를 발생 시킴
- require : 조건을 만족시키지 않으면 IllegalArgumentException 을 발생
- check: 조건을 만족시키지 않으면 IllegalStateException 을 발생
fun validateName(name: String) { require(name.isNotEmpty()) { "Name cannot be empty" } check(name.isNotBlank()) { "Name cannot be blank" } } fun main() { validateName("") // IllegalArgumentException 발생 validateName(" ") // IllegalStateException 발생 }
리플렉션
- 실행시점에(동적) 객체의 프로퍼티와 메서드에 접근할 수 있게 해주는 방법
- 타입과 관계없이 객체를 다뤄야 하거나 객체가 제공하는 메서드나 프로퍼티 이름을 오직 실행 시점에만 알 수 있는 경우
- ex) JSON 직렬화 라이브러리
- 애노테이션에 저장된 데이터에 접근할 수 있음
- 코틀린에서 리플렉션을 구현하는 방법에는 자바의 표준 리플렉션, 코틀린의 리플렉션 API 두가지 방법이 존재
- 참고 문서 → https://kotlinlang.org/docs/reflection.html#class-references
1.자바 표준 리플렉션 API
- 자바가 java.lang.reflect 패키지를 통해 제공
- 코틀린 클래스는 일반 자바 바이트 코드로 컴파일 되므로, 자바 리플렉션 API도 코틀린 클래스를 컴파일한 바이트 코드를 완벽히 지원
- 즉, 리플렉션을 사용하는 자바 라이브러리와 코틀린 코드가 완전히 호환
2.코틀린 리플렉션 API
- 코틀린이 kotlin.reflect 패키지를 통해 제공
- 자바에는 없는 프로퍼티나 nullable타입과 같은 코틀린 고유 개념에 대한 리플렉션 제공
- 자바 리플렉션 API의 복잡한 기능을 제공하지 않아 완전히 대체할 수는 없음
- 코틀린 뿐만 아니라 다른 JVM 언어에서 생성한 바이트 코드를 충분히 다룰 수 있음
- 최상위 수준이나 클래스 안에 정의된 프로퍼티만 리플렉션으로 접근이 가능 ( 함수의 로컬 변수에는 접근할 수 없음 )
dependencies { implementation("org.jetbrains.kotlin:kotlin-reflect:1.7.20") } val myList = listOf(1, 2, 3, 4, 5, 6, 7) println(myList::class.qualifiedName) // java.util.Arrays.ArrayList // 자바 버전 // String var1 = Reflection.getOrCreateKotlinClass(myList.getClass()).getQualifiedName(); val functions = myList::class.declaredFunctions println(functions) // [fun java.util.Arrays.ArrayList<E>.iterator(): kotlin.collections.MutableIterator<E!>, fun java.util.Arrays.ArrayList<E>.contains(E!): kotlin.Boolean, fun java.util.Arrays.ArrayList<E>.spliterator(): java.util.Spliterator<E!>, fun java.util.Arrays.ArrayList<E>.forEach(java.util.function.Consumer<in E!>!): kotlin.Unit, fun java.util.Arrays.ArrayList<E>.toArray(kotlin.Array<(out) T!>!): kotlin.Array<(out) T!>!, fun java.util.Arrays.ArrayList<E>.toArray(): kotlin.Array<(out) kotlin.Any!>!, fun java.util.Arrays.ArrayList<E>.set(kotlin.Int, E!): E!, fun java.util.Arrays.ArrayList<E>.get(kotlin.Int): E!, fun java.util.Arrays.ArrayList<E>.indexOf(E!): kotlin.Int, fun java.util.Arrays.ArrayList<E>.replaceAll(java.util.function.UnaryOperator<E!>): kotlin.Unit, fun java.util.Arrays.ArrayList<E>.sort(java.util.Comparator<in E!>!): kotlin.Unit] // 자바버전 // Collection functions = KClasses.getDeclaredFunctions(Reflection.getOrCreateKotlinClass(myList.getClass())); fun isOdd(x: Int) = x % 2 != 0 println(myList.filter(::isOdd)) // [1, 3, 5, 7]
// 자바와의 호환성 import kotlin.reflect.KClass import kotlin.reflect.jvm.* class A(val p: Int) fun main() { println(A::p.javaGetter) // public final int A.getP() println(A::p.javaField) // private final int A.p println(getKClass(A(1))) // class A } fun getKClass(o: Any): KClass<Any> = o.javaClass.kotlin
비교 연산자 오버로딩
동등성 연산자 equals
- == 연산자 호출
- ≠ 식별자 비교 ( === ) → 오버로딩 불가
- equals의 경우 Any에 정의된 메서드로 override가 필요
순서연산자 compareTo
- 자바의 Comparable 인터페이스와 비슷
- equals와 마찬가지로 상위에서 compareTo에 operator 변경자가 붙어있어 하위 클래스의 오버라이딩 함수에 operator 붙일 필요가 없음
컬렉션 연산자
- 인덱스로 원소에 접근하는 get, set 연산자
- operator 변경자와 함께 구현된 get, set
data class Point(var x: Int, var y: Int) operator fun Point.get(index: Int): Int { return when(index) { 0 -> x 1 -> y else -> throw IndexOutOfBoundsException("Invalid coordinate $Index") } } operator fun Point.set(index: Int, value: Int) { when(index) { 0 -> x = value 1 -> y = value else -> throw IndexOutOfBoundsException("Invalid coordinate $Index") } }
- in 관례
- 객체가 컬렉션에 들어있는지 검사 ( contains )
- rangeTo 관례
- Comparable에 대한 확장 함수
- ..
- for 루프를 위한 Iterator 관례
- : Iterator<>
위임 프로퍼티
- 객체가 직접 작업을 수행하지 않고 다른 도우미 객체(위임 객체)가 그 작업을 처리하도록 맡기는 디자인 패턴
- 참고 영상 → https://www.youtube.com/watch?v=wwz2uPvbfLg
// 위임프로퍼티를 사용하지 않은 예제 (원래 방식) interface Downloader { fun download() } interface Player { fun play() } class FileDownloader(private val file: String) : Downloader { override fun download() { println("$file Downloaded") } } class FilePlayer(private val file: String) : Player { override fun play() { println("$file playing") } } class MediaFile( private val downloader: Downloader, private val player: Player ) : Downloader, Player { override fun download() { downloader.download() } override fun play() { player.play() } } fun main(args: Array<String>) { val file:String = "File1.mkv" val mediaFile = MediaFile(FileDownloader(file), FilePlayer(file)) mediaFile.download() // File1.mkv Downloaded mediaFile.play() // File1.mkv playing } // 위임 프로퍼티를 사용한 예제 class MediaFile( private val downloader: Downloader, private val player: Player ) : Downloader by downloader, Player by player
- 이번엔 get, set 기능을 위임 객체에게 위임해보자
- 참고 영상 → https://www.youtube.com/watch?v=rWPqjr1ZmQU
// 기존의 Student 객체 (set을 가지고 있음) class Student { var firstName: String? = null set(value) { if (value != null && value.length > 5) { field = value.trim().uppercase(Locale.getDefault()) } } var lastName: String? = null set(value) { if (value != null && value.length > 5) { field = value.trim().uppercase(Locale.getDefault()) } } } // 위임 객체를 이용한 구현 class Student { var firstName: String? by NameDelegate() var lastName: String? by NameDelegate() override fun toString(): String { return "$firstName $lastName" } } class NameDelegate { var formattedValue: String? = null operator fun setValue(thisRef: Any?, property: KProperty<*>, value: String?) { if (value != null && value.length > 5) { formattedValue = value.trim().uppercase(Locale.getDefault()) } } operator fun getValue(thisRef: Any?, property: KProperty<*>): String? { return formattedValue } } fun main() { val student = Student() student.firstName = "Kimberly" student.lastName = "Doe" println(student) // KIMBERLY null }
코루틴
- 컴퓨터 프로그램 구성 요소 중 하나로 비선점형 멀티태스킹을 수행하는 일반화한 서브루틴
- 실행을 일시 중단하고 재개 할 수 있는 여러 진입 지점을 허용
- kolinx.coroutines 패키지
- 플랫폼에 따른 제어범위, 실행범위(scope)
- Global Scope : 프로그램 어디서나 제어동작이 가능한 기본 범위
- Coroutine Scope : 특정 목적의 Dispatcher를 지정, 제어 및 동작이 가능한 범위
- Dispatchers.Default : 기본적 백그라운드 동작
- Dispatchers.IO : I/O에 최적화
- Dispatchers.Main : 메인(UI) 스레드에서 동작
kotlinx.coroutines.CoroutineScope.launch
- 반환값이 없는 job 객체 반환
- 만들어진 코루틴은 기본적으로 즉시 실행
- launch를 작동하기 위해서 CoroutineScope객체가 블록의 this로 지정되어야 하는데, 다른 suspend 함수 내부라면 해당 함수가 사용 중인 CoroutineScope가 있겠지만, 그렇지 않은 경우에는 GlobalScope를 사용
- runBlocking : 코루틴이 모두 실행되고 종료되도록 main을 대기시키고 일정시간 응답이 없으면 ANR(Application Not Responding) 오류 발생 후 앱 강제 종료
- runBlocking과 같이 대기가 가능한 구문 안에서 실행되는 함수(join, await은 실행이 끝날때 까지, delay는 일정 시간동안 대기하는 함수)
- job.join()
- Deferred.await() → Deferred의 결과도 반환
- delay() → 시간이 지날 때 까지 다른 코루틴에게 실행을 양보
- cancel() : launch가 반환한 Job의 코루틴 실행을 중단 가능
-
- 코루틴 내부의 delay() 함수 또는 yield() 함수가 사용된 위치 까지 수행된 뒤 종료
-
- cancel()로 인해 속성인 isActive가 false되고 이를 확인하여 수동 종료
-
kotlinx.coroutines.CoroutineScope.aync
- 반환 값이 있는 Deffered 객체
- launch와 함께 둘다 람다 함수 형태로 같은 일을 수행
- 단, launch는 job을 반환하는 반면 async는 Deffered를 반환
일시 중단 함수 (kotlinx-coroutines-core 모듈의 최상위)
- withContext
- withTimeout
- withTimeoutOrNull
- join.await처럼 blocking 함수로 코루틴이 정해진 시간(ms)안에 실행되지 않으면 null을 결과로 반환
- awaitAll
- 모든 작업의 성공을 기다리며 어느 하나라도 예외로 실패하면 예외로 실패함
- joinAll
- 모든 작업이 끝날 때까지 현재 작업을 일시 중단
-> 예제를 이용한 코루틴 이해 🥖
- 코루틴
- 비동기 라이브러리
- 우수한 가독성
- 에러 핸들링
- 동시성 처리
- flow
- channel
- 참고 영상 → https://www.youtube.com/watch?v=eJF60hcz3EU
-일반적인 동기 코드
- 주문 생성을 예로 들어 확인해보자
- 구매자를 조회 → 이를 이용해 주소 조회 → 이를 이용해 상품 조회 → 이를 이용한 스토어 조회 → 상위 모든 정보들을 이용한 주문 생성의 과정
fun execute(inputValues: InputValues): Order { val (userId, productIds) = inputValues // 1. 구매자 조회 val buyer = userRepository.findUserByIdSync(userId) // 2. 주소 조회 및 유효성 체크 val address = addressRepository.findAddressByUserSync(buyer).last() checkValidRegion(address) // 3. 상품들 조회 val products = productRepository.findAllProductsByIdsSync(productIds) check(products.isNotEmpty()) // 4. 스토어 조회 val stores = storeRepository.findStroesByProductsSync(products) check(stores.isNotEmpty()) // 5. 주문 생성 val order = orderRepository.createOrderSync(buyer, products, stores, address) return order }
- 여러가지 라이브러리를 이용해 비동기 리팩토링 해보기
1) 구매자 조회
- rxjava3의 Maybe를 사용
- 없거나 1개가 아니면 Error를 발생
import io.reactive.rxjava3.core.Maybe interface UserAsyncRepository { fun findUserByIdAsMaybe(userId: String): Maybe<User> } class UserRepository : UserRepositoryBase(), UserAsyncRepository { override fun findUserByIdAsMaybe(userId: String): Maybe<User> { val user = prepareUser(userId) return Maybe.just(user) .delay(TIME_DELAY_MS, TimeUnit.MILLISECONDS) } }
2) 주소 조회
- jdk9의 flow(리액티브 스트림)을 사용
- item(유저가 주어졌을 때의 사용자 주소)을 publish → 정해진 개수 만큼 주소가 publish 되면 complete 이벤트로 flow 종료
import java.util.concurrent.Flow interface AddressAsyncRepository { fun findAddressByUserAsPublisher(user: User): Flow.Publisher<Address> }
3) 상품 조회
- reactor의 flux 사용
- 0과 n개 사이의 데이터가 존재하지 않으면 Error 발생
import reactor.core.publisher.Flux interface ProductAsyncRepository { fun findAllProductsByIdsAsFlux(productIds: List<String>): Flux<Product> }
4) 스토어 조회
- mutiny의 Multi 사용
- 0과 n개 사이의 데이터가 존재하지 않으면 Error 발생
- Multi를 이용하여 리액트 flux 처럼 상품들을 가지고 각각의 스토어를 조회
import io.smallrye.mutiny.Multi interface StoreAsyncRepository { fun findStoresByProductsAsMulti(products: List<Product>): Multi<Store> }
5) 주문 생성
- jdk8의 CompletableFuture 사용
- complete되는 시점에 결과를 반환
import java.util.concurrent.CompletableFuture interface OrderAsyncRepository { fun createOrderAsFuture( buyer: User, products: List<Product>, stores: List<Store>, address: Address, ): CompletableFuture<Order> }
— 앞에서 작성한 비동기 코드를 조합한 결과 ?
- subscribe hell 발생
- subscribe : 결과를 얻은 시점에 주어진 subscriber(consumer)를 실행하는 일종의 call back
- 반환 값들이 다른 subscribe에서 필요하고 위에서 찾은 내용들이 주문 생성까지 쭉 이어지기 때문에 subscribe가 중첩되고 콜백과 유사한 과정처럼 느껴지게 됨
- 각각의 어댑터로 변경한다고 해도 똑같이 hell이 됨
- 예를 들어 각각의 비동기 함수를 Reactor로 변경해봐도 (RxJava3Adaptor, JdkFlowAdapter, Flux.collectList, Flux.from, Mono.fromFuture) 위에서 계속해서 flatMap hell 처럼 중첩되는 모습을 보일 것
fun execute(inputValues: InputValues): Order { val (userId, productIds) = inputValues return Mono.create { emitter -> userRepository.findUserByIdAsMaybe(userId) .subscribe { buyer -> addressRepository.findAddressByUserAsPublisher { address -> checkValidRegion(address) productRepository.findAllProductsByIdsAsFlux(productIds) .collectList() .subscribe { products -> check(products.isNotEmpty()) storeRepository.findStoresByProductsAsMulti(products) .collect().asList() .subscribe().with { stores -> check(stores.isNotEmpty()) orderRepository.createOrderAsFuture(buyer, products, stores, address) .whenComplete { order, _ -> emitter.success(order) } } } } } } }
-코루틴을 이용하여 비동기 구현해보기
- suspend 키워드를 사용
- Maybe<T>.awaitSingle
- Publisher<T>.awaitLast
- Flow<T>.toList
- CompletableFuture<T>.await
suspend fun execute(inputValues: InputValues): Order { val (userId, productIds) = inputValues // 1. 구매자 조회 val buyer = userRepository.findUserByIdAsMaybe(userId).awaitSingle() // 2. 주소 조회 및 유효성 체크 val address = addressRepository.findAddressByUserAsPublisher(buyer).awaitLast() checkValidRegion(address) // 3. 상품들 조회 val products = productRepository.findAllProductsByIdsAsFlux(productIds).asFlow().toList() check(products.isNotEmpty()) // 4. 스토어 조회 val stores = storeRepository.findStoresByProductsAsMulti(products).asFlow().toList() check(stores.isNotEmpty()) // 5. 주문 생성 val order = orderRepository.createOrderAsFuture(buyer, products, stores, address).await() return order }
- 테스트 코드는 다음과 같다.
@Test fun `should return a createdOrder in coroutine`() = runBlocking { // given val userId = "user1" val productsId = listOf("product1", "product2", "product3") // when val watch = StopWatch().also { it.start() } val inputValues = CreateOrderCoroutineUserCase.InputValues(userId, productIds) val createdOrder = createOrderUseCase.execute(inputValues) watch.stop() println("Time Elapsed: ${watch.time}ms") // then println(createdOrder) }
— 그렇다면 suspend 함수의 실행은 어떻게 되는 것인가?
- suspend 함수의 실행은 또 다른 suspend 함수 내부에서만 실행이 가능한데, runBlocking은 동기 코드에서 coroutine을 실행할 수 있게 bridge역할을 수행하여 일반적인 동기 코드처럼 찍어볼 수 있게 해줌
- 코루틴이 경량화된 쓰레드 혹은 특정지점(awaitSingle)에서 정지했다가 재개할 수 있는 쓰레드라는 말이 있지만 내부구조를 보면 완전히 다르다
- 코루틴의 내부 구조 (Kotlin Compiler - Finite State Machine ⇒ FSM)
- kotlin 컴파일러가 suspend 붙은 함수를 호출할 때 함수 내부에서 FSM 기반의 재귀 함수로 변환하여 함수 내부에서 state(shared data)를 가지고 계속 recursive call → 끝에 다 달았을 때야 flow 한테 호출
- kotlin 컴파일러가 suspend 가 붙은 함수에 추가적인 코드를 추가
- Continuation 인자를 타겟 함수에 추가 하고 Continuation 구현체를 생성
- 타겟 함수 내의 모든 suspend 함수에 생성한 continuation 객체를 패스
- 코드를 분리해서 switch case(when) 안에 넣고 label을 이용해서 state 변경
— FSM 기반의 재귀 함수 (Finite State Machine)
- execute 함수가 실행되면 재귀 호출을 이용해서 스스로(execute 함수)를 실행하면서 state 변경 → state가 최종에 도달하면 값을 caller 에 반환
- sharedData를 통해서 여러가지 context를 저장
- label : state machine의 현재 state 값 ( 현재는 0 , space가 지남에 따라서(buyer 찾기 등등)
- 이전 state에서 찾은 값들을 buyer, address, products, stores, order에 저장
- resumeWith로 재귀 호출을 하고 결과를 result에 저장
- 인자의 sharedData가 null 이라면 생성하고 아니면 있는 sharedData를 사용
// FSM 기반의 동기 코드 class SharedData { var label: Int = 0 lateinit var result: Any lateinit var buyer: User lateinit var address: Address lateinit var products: List<Product> lateinit var stores: List<Store> lateinit var order: Order lateinit var resumeWith: (result: Any) -> Unit } fun execute( inputValues: InputValues, sharedData: SharedData? = null, ) { val (userId, productIds) = inputValues val that = this val shared = sharedData ?: SharedData().apply { this.resumeWith = fun(result: Any): Order { // 스스로를 재귀 콜 this.result = result return that.execute(inputValues, this) // 만약 스스로 재귀 콜이 아니라 바깥에서 execute를 실행하는 경우 , shared Data에 값을 주지 않으면 디폴트 값인 null이 들어가 inputValues만 넘기게 된다 } } } return when (shared.label) { // 가장먼저 shared label 의 기본값은 0이기 때문에 첫번째 블록이 실행된다 0 -> { shared.label = 1 userRepository.findUserByIdSync(userId) .let { user -> // this.resumeWith는 함수로 result에 결과값을 저장하고 다시 execute를 실행함(재귀) shared.resumeWith(user) } } // findUserByIdSync 블록이 실행된 수 넘어왔기 때문에 이미 label1 블록이 실행된다 // buyer에는 위에서 저장한 값이 들어가게 되어 실행됨 1 -> { shared.label = 2 shared.buyer = shared.result as User addressRepository.findAddressByUserSync(shared.buyer).last() .let { address -> shared.resumeWith(address) } } 2 -> { shared.label = 3 shared.address = shared.result as Address checkValidRegion(shared.address) productRepository.findAllProductsByIdsSync(productIds) .let { products -> shared.resumeWith(products) } } 3 -> { shared.label = 4 shared.products = shared.result as List<Product> check(shared.products.isNotEmpty()) storeRepository.findStoresByProductsSync(shared.products) .let { stores -> shared.resumeWith(stores) } } 4 -> { shared.label = 5 shared.stores = shared.result as List<Store> check(shared.stores.isNotEmpty()) orderRepository.createOrderSync( shared.buyer, shared.products, shared.stores, shared.address ).let { order -> shared.resumeWith(order) } } // 이제 5번까지 실행되면 더이상 recursive 콜 하지 않고 반환한다 // 앞의 일반적의 동기 코드와 같은 코드가 된 것 이다 5 -> { shared.order = shared.result as Order shared.order } else -> throw IllegalAccessException() } // 테스트 코드 // execute를 실행할 때, 두번째 인자를 넘기지 않아서 default value인 null로 제공 @Test fun `should return a createdOrder in sync with state machine`() { // given val userId = "user1" val productIds = listOf("product1", "product2", "product3") // when val watch = StopWatch().also { it.start() } val inputValues = CreateOrderSyncStateMachineUseCase.InputValues(userId, productIds) val createdOrder = createOrderUseCase.execute(inputValues) watch.stop() println("Time Elapsed: ${watch.time}ms") // then println(createdOrder) }
— FSM 기반의 비동기 코드 (Finite State Machine)
- sharedDataContinuation을 통해서 여러가지 context를 저장
- label은 state machine의 현재 state 값을 의미
- 이전 state에서 찾은 값들을 buyer, address, products, stores, order에 저장
- resumeWith로 재귀 호출을 하여 결과를 result에 저장
- 인자의 sharedData가 SharedDataContinuation 타입이 아니라면 생성
class SharedDataContinuation( val completion: Continuation<Any>, ) : Continuation<Any> { // Continuation 인터페이스는 context, resumeWith 값을 가진다 var label: Int = 0 lateinit var result: Any lateinit var buyer: User lateinit var address: Address lateinit var products: List<Product> lateinit var stores: List<Store> lateinit var order: Order lateinit var resume: () -> Unit override val context: CoroutineContext = completion.context override fun resumeWith(result: Result<Any>) { this.result = result this.resume() } } // 더이상 sharedData라는 두번째 인자값을 사용하지 않고 continuation 인터페이스를 인자로 받게끔 수정 fun execute(inputValues: InputValues, completion: Continuation<Any>) { val (userId, productIds) = inputValues val that = this val cont = completion as? SharedDataContinuation // 클래스를 구현하지 않고 다른 클래스를 구현한 객체라면 직접 생성해서 집어 넣게끔 구조되어있음 ?: SharedDataContinuation(completion).apply { resume = fun() { // recursive self that.execute(inputValues, this) } } // 동기 코드와 동일하게 subscribe 구문을 when문으로 // 동기 코드와 별로 다를게 없다 when (cont.label) { // 처음에 기본 label 값 0 이라서 첫번째 구문 실행 0 -> { cont.label = 1 userRepository.findUserByIdAsMaybe(userId) .subscribe { user -> // subscribe로 (비동기->결과 나오는 시점 다를 수 있음) 결과가 돌아오면 읽어서 상단의 resumeWIth 코드 실행됨 (result에 결과 저장, 자기 자신 호출) cont.resumeWith(Result.success(user)) } } 1 -> { cont.label = 2 cont.buyer = (cont.result as Result<User>).getOrThrow() addressRepository.findAddressByUserAsPublisher(cont.buyer) .subscribe(LastItemSubscriber { address -> cont.resumeWith(Result.success(address)) }) } 2 -> { cont.label = 3 cont.address = (cont.result as Result<Address>).getOrThrow() checkValidRegion(cont.address) productRepository.findAllProductsByIdsAsFlux(productIds) .collectList() .subscribe { products -> cont.resumeWith(Result.success(products)) } } 3 -> { cont.label = 4 cont.products = (cont.result as Result<List<Product>>).getOrThrow() check(cont.products.isNotEmpty()) storeRepository.findStoresByProductsAsMulti(cont.products) .collect().asList() .subscribe().with { stores -> cont.resumeWith(Result.success(stores)) } } 4 -> { cont.label = 5 cont.stores = (cont.result as Result<List<Store>>).getOrThrow() check(cont.stores.isNotEmpty()) orderRepository.createOrderAsFuture( cont.buyer, cont.products, cont.stores, cont.address ).whenComplete { order, _ -> cont.resumeWith(Result.success(order)) } } // completion의 resumeWith로 종료 (completion resumeWIth) => 외부에서 콜백이 실행됨을 의미 5 -> { cont.order = (cont.result as Result<Order>).getOrThrow() cont.completion.resumeWith(Result.success(cont.order)) } else -> throw IllegalAccessException() } } // FMS 기반의 비동기 코드 실행 // testContinuation을 생성해서 execute 함수에 주입 @Test fun `should return a createdOrder in async with state machine`() { // given val userId = "user1" val productIds = listOf("product1", "product2", "product3") // when val watch = StopWatch().also { it.start() } val lock = CountDownLatch(1) val testContinuation = object : Continuation<Any> { override val context = EmptyCoroutineContext override fun resumeWith(result: Result<Any>) { // resumeWith에는 테스트 코드 종료 코드를 넣어놓았음 (fsm이 동작을 완료하고 complete되는 시점에 결과 반환하게끔 ) watch.stop() lock.countDown() println("Time Elapsed: ${watch.time}ms") println(result.getOrThrow()) } } val inputValues = CreateOrderAsyncStateMachine2UseCase.InputValues(userId, productIds) createOrderUseCase.execute(inputValues, testContinuation) // then lock.await(3000, TimeUnit.MILLISECONDS) }
— FSM 기반의 코드를 코루틴화 시키기
- FSM기반의 코드는 코루틴 코드로 작성했을 때 kotlin 컴파일러가 바이트 코드로 만들었던 결과와 같은 것
- 만든 것에서 덜고 코루틴의 확장함수(maybe, publisher등)를 이용한 코루틴 코드로 바꾸자
— 1) FSM 기반의 코루틴의 확장함수
- 각각의 비동기 라이브러리에서 사용하는 객체에 대한 extension function 생성
- 각각의 비동기 라이브러리 subscribe 하는 코드
- (Flux.toList, Multi.toList, CompletionStage.awaitSingle은 실제와 다르긴하다)
package com.karrot.example.util import com.karrot.example.repository.shipment.LastItemSubscriber import io.reactivex.rxjava3.core.Maybe import io.smallrye.mutiny.Multi import reactor.core.publisher.Flux import java.util.concurrent.CompletionStage import java.util.concurrent.Flow import kotlin.coroutines.Continuation fun <T: Any> Maybe<T>.awaitSingle(cont: Continuation<Any>) { this.subscribe { user -> cont.resumeWith(Result.success(user)) } } fun <T: Any> Flow.Publisher<T>.awaitLast(cont: Continuation<Any>) { this.subscribe(LastItemSubscriber { address -> cont.resumeWith(Result.success(address)) }) } fun <T: Any> Flux<T>.toList(cont: Continuation<Any>) { this.collectList() .subscribe { products -> cont.resumeWith(Result.success(products)) } } fun <T: Any> Multi<T>.toList(cont: Continuation<Any>) { this.collect() .asList() .subscribeAsCompletionStage() .whenComplete { stores, _ -> cont.resumeWith(Result.success(stores)) } } fun <T: Any> CompletionStage<T>.awaitSingle(cont: Continuation<Any>) { this.whenComplete { order, _ -> cont.resumeWith(Result.success(order)) } } // 확장함수를 적용한 결과 class SharedDataContinuation( private val continuation: Continuation<Any>, ) : Continuation<Any> { var label: Int = 0 lateinit var result: Any lateinit var buyer: User lateinit var address: Address lateinit var products: List<Product> lateinit var stores: List<Store> lateinit var order: Order lateinit var resume: () -> Unit override val context: CoroutineContext = continuation.context override fun resumeWith(result: Result<Any>) { this.result = result this.resume() } fun complete(result: Result<Any>) { this.continuation.resumeWith(result) } } fun execute(inputValues: InputValues, continuation: Continuation<Any>) { val (userId, productIds) = inputValues val that = this val cont = continuation as? SharedDataContinuation ?: SharedDataContinuation(continuation).apply { resume = fun() { that.execute(inputValues, this) } } when (cont.label) { 0 -> { cont.label = 1 userRepository.findUserByIdAsMaybe(userId).awaitSingle(cont) } 1 -> { cont.label = 2 cont.buyer = (cont.result as Result<User>).getOrThrow() addressRepository.findAddressByUserAsPublisher(cont.buyer).awaitLast(cont) } 2 -> { cont.label = 3 cont.address = (cont.result as Result<Address>).getOrThrow() checkValidRegion(cont.address) productRepository.findAllProductsByIdsAsFlux(productIds).toList(cont) } 3 -> { cont.label = 4 cont.products = (cont.result as Result<List<Product>>).getOrThrow() check(cont.products.isNotEmpty()) storeRepository.findStoresByProductsAsMulti(cont.products).toList(cont) } 4 -> { cont.label = 5 cont.stores = (cont.result as Result<List<Store>>).getOrThrow() check(cont.stores.isNotEmpty()) orderRepository.createOrderAsFuture( cont.buyer, cont.products, cont.stores, cont.address ).awaitSingle(cont) } 5 -> { cont.order = (cont.result as Result<Order>).getOrThrow() cont.complete(Result.success(cont.order)) } else -> throw IllegalAccessException() } } // 추가로 when을 제거 // 코틀린은 컴파일러가 continuation 객체를 알아서 해주기 때문에 앞에서 말한 코루틴 순서와 같은 코드가 되었다 suspend fun execute(inputValues: InputValues): Order { val (userId, productIds) = inputValues // 1. 구매자 조회 val buyer = userRepository.findUserByIdAsMaybe(userId).awaitSingle() // 2. 주소 조회 및 유효성 체크 val address = addressRepository.findAddressByUserAsPublisher(buyer) .awaitLast() checkValidRegion(address) // 3. 상품들 조회 val products = productRepository.findAllProductsByIdsAsFlux(productIds).asFlow().toList() check(products.isNotEmpty()) // 4. 스토어 조회 val stores = storeRepository.findStoresByProductsAsMulti(products).asFlow().toList() check(stores.isNotEmpty()) // 5. 주문 생성 val order = orderRepository.createOrderAsFuture(buyer, products, stores, address).await() return order } // 테스트 코드 @Test fun `should return a createdOrder in coroutine`() = runBlocking { // given val userId = "user1" val productIds = listOf("product1", "product2", "product3") // when val watch = StopWatch().also { it.start() } val inputValues = CreateOrderCoroutineUseCase.InputValues(userId, productIds) val createdOrder = createOrderUseCase.execute(inputValues) watch.stop() println("Time Elapsed: ${watch.time}ms") // then println(createdOrder) }
— 2) 코루틴 async를 이용한 동시성 처리
- CoroutineDispatcher
- 여러 쓰레드를 오고 가며 로직을 처리 가능
- CoroutineScope(Dispatchers.IO).async는 완전 별개의 쓰레드에서 동작
- buyer 찾고 실행은 되지만 블로킹 되지 않은 상황에서 address 찾고 아래의 await() 지점에서 조인되어 결과가 찾아와져서 아래 코드(order 생성)실행에는 문제가 발생하지 않음
suspend fun execute(inputValues: InputValues): Order { val (userId, productIds) = inputValues // 1. 구매자 조회 val buyer = userRepository.findUserByIdAsMaybe(userId).awaitSingle() // 2. 주소 조회 및 유효성 체크 val addressDeferred = CoroutineScope(Dispatchers.IO).async { addressRepository.findAddressByUserAsPublisher(buyer) .awaitLast() } // 3. 상품들 조회 val products = productRepository.findAllProductsByIdsAsFlux(productIds).asFlow().toList() check(products.isNotEmpty()) // 4. 스토어 조회 val storesDeferred = CoroutineScope(Dispatchers.IO).async { storeRepository.findStoresByProductsAsMulti(products).asFlow().toList() } val address = addressDeferred.await() val stores = storesDeferred.await() checkValidRegion(address) check(stores.isNotEmpty()) // 5. 주문 생성 val order = orderRepository.createOrderAsFuture(buyer, products, stores, address).await() return order }
— 3) try/catch를 이용한 일관성 있는 에러 핸들링
- 일반적으로 sync/blocking 처럼 try가 감싸서 처리
suspend fun execute(inputValues: InputValues): Order { val (userId, productIds) = inputValues // 1. 구매자 조회 val buyer = try { userRepository.findUserByIdAsMaybe(userId).awaitSingle() } catch (e: Exception) { throw NoSuchElementException("no such user") } // 2. 주소 조회 및 유효성 체크 val address = addressRepository.findAddressByUserAsPublisher(buyer) .awaitLast() checkValidRegion(address) // 3. 상품들 조회 val products = productRepository.findAllProductsByIdsAsFlux(productIds).asFlow().toList() check(products.isNotEmpty()) // 4. 스토어 조회 val stores = storeRepository.findStoresByProductsAsMulti(products).asFlow().toList() check(stores.isNotEmpty()) // 5. 주문 생성 val order = orderRepository.createOrderAsFuture(buyer, products, stores, address).await() return order }
+ ) 그 외에 Structured concurrency, Channer, Flow
'KOTLIN' 카테고리의 다른 글
코프링(코틀린 + 스프링부트) + 구글 스프레드 시트로 슬랙봇 만들기 - ② 슬랙으로 요청받기 (0) 2022.10.22 코프링(코틀린 + 스프링부트) + 구글 스프레드 시트로 슬랙봇 만들기 - ① 슬랙앱 생성 및 사용 설정하기 (0) 2022.10.22 코틀린 기초 문법 ③ (0) 2022.09.28 코틀린 기초 문법 ② (0) 2022.09.27 코틀린 기초 문법 ① (0) 2022.09.26