-
동시 삭제 요청으로 인한 StaleObjectStateException 해결 - redisson lock 적용기 (feat. Spring AOP, applicationEventListener)Spring 2024. 10. 1. 16:58
최근에 회사에서 동시성 이슈가 발생하여 해결하기 위한 작업을 진행했는데,
이때, redisson을 사용해볼 기회가 생겨서 어떤 방식으로 redisson lock을 구현했는지 기록해본다.
이전에 동시성 이슈를 해결하기 위한 강의를 듣고 정리했었다.
↓
https://dodop-blog.tistory.com/464
문제 상황 및 발생 재현
redisson lock은 동시성 이슈를 해결하기 위한 pub sub 기반의 락 구현을 제공한다.
자세한 내용은 이전 블로그 글에서 확인할 수 있다.
먼저 redisson lock을 사용하기 위해 dependency를 추가해준다.
implementation("org.redisson:redisson-spring-boot-starter:3.27.2")
먼저 동시성 이슈가 발생한 상황을 정리하자면, 다음과 같다.
- 하나의 데이터에 대해 삭제가 필요하며, 이때 삭제할 데이터를 히스토리 테이블에 적재하고 삭제가 필요함.
- 하나의 데이터에 동시에 삭제 요청이 발생
- 삭제 데이터를 조회하는 과정에서 두가지 요청에 모두 데이터 조회
- 히스토리 테이블에 적재후 삭제처리 과정에서 이미 하나의 요청에서 삭제처리가 완료된 상태에서 삭제 시도하여 OptimisticLockingFailureException 이슈 발생
- 예외가 OptimisticLockingFailureException으로 발생하지만, stackTrace를 따라가보면 StaleObjectStateException이 발생하고 있으며, 이는 이미 다른 트랜잭션에 의해 삭제되거나 수정된 행을 다시 삭제하려고 시도했기 때문에 발생했다.
- HibernateJpaDialect가 해당 예외를 OptimisticLockingFailureException으로 감싸서 던지도록 구현되어있다.
if (ex instanceof StaleObjectStateException hibEx) { return new ObjectOptimisticLockingFailureException(hibEx.getEntityName(), getIdentifier(hibEx), ex.getMessage(), ex); }
해당 케이스를 재현하기 위해서는 데이터 베이스, redis 연결이 되어있어야 한다.
상황을 재현해보자.
먼저 테스트를 위한 유저 클래스와 레파지토리를 만든다.
문제 재현을 위해서 jpa를 사용한다.
@Entity class User( var email: String, var name: String, var phone: String, ) : BaseEntity() @Repository interface UserRepository : JpaRepository<User, Long>
간단한 문제 재현을 위한 유저삭제 처리 서비스를 만든다.
@Service class UserService( private val userRepository: UserRepository ) { @Transactional fun deleteUser(userId: Long) { userRepository.findById(userId).ifPresent { println("delete user: ${it.id}") Thread.sleep(3) userRepository.delete(it) } } }
문제 재현 테스트 코드를 만든다.
@SpringBootTest class UserServiceTest { @Autowired private lateinit var sut: UserService @Autowired private lateinit var userRepo: UserRepository private lateinit var user: User @BeforeEach fun setUp() { user = userRepo.save(User("test@gmail.com", "홍길동", "010-1234-5678")) } @DisplayName("RedissonLock을 적용하지 않은 케이스 테스트 입니다.") @Test fun `동시에 사용자 삭제 요청하면 OptimisticLockingFailureException이 발생한다`() { // given val numberOfThreads = 2 val executorService = Executors.newFixedThreadPool(numberOfThreads) val futures = mutableListOf<Future<*>>() // when repeat(numberOfThreads) { futures.add(executorService.submit { sut.deleteUser(user.id) }) } executorService.shutdown() // then val exception = assertThrows<ExecutionException> { futures.forEach { it.get() } } assertTrue(exception.cause is OptimisticLockingFailureException) assertTrue(exception.message!!.contains("Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)")) } }
테스트 코드를 실행해보면 OptimisticLockingFailureException 예외가 발생해서 테스트코드가 성공하는걸 확인할 수 있다.
redisson lock AOP 적용
나는 이 문제를 해결하기 위해서 redisson 락을 사용하면서, 최대한 코드에 영향 없이 문제를 해결하고 싶었기 때문에 redisson lock적용로직을 AOP를 이용한 어노테이션을 활용해서 문제를 해결했다.
이를 활용하기 위해서 다음과 같이 어노테이션과 AOP를 만들어준다.
여기서, lock을 적용하기 위해서 필요한 데이터를 어노테이션의 파라미터로 넣어줄 수 있도록 선택하고 싶었다.
@Retention(AnnotationRetention.RUNTIME) @Target(AnnotationTarget.FUNCTION) annotation class ExecuteWithLock( val key: String, val timeUnit: TimeUnit, val waitTimeAmount: Long = 3000, val leaseTimeAmount: Long = 1000 )
이제 Aspect를 작성해준다.
여기서는 파라미터로 넘겨받은 객체를 redisCache의 @Cacheable에서 사용하는 것처럼 속성을 사용할 수 있도록 하고 싶었다.
이를 위해서 파라미터로 받은 데이터를 expression을 이용해서 가져올 수 있도록 구현했다.
@Aspect @Component private class ExecuteWithLockAspect( private val redissonClient: RedissonClient ) { private val parser: ExpressionParser = SpelExpressionParser() @Around("@annotation(com.yunhalee.concurrency_redisson.infrastructure.annotation.ExecuteWithLock)") fun executeWithRedissonLock(proceedingJoinPoint: ProceedingJoinPoint): Any? { val annotation = getAnnotation(proceedingJoinPoint) val key = getKey(proceedingJoinPoint, annotation) println("------------------------------$key -------------------------------") val lock = redissonClient.getLock(key.toString()) try { if (!lock.tryLock(annotation.waitTimeAmount, annotation.leaseTimeAmount, annotation.timeUnit)) { println("Lock 획득 실패") throw RuntimeException("Lock 획득에 실패하여 삭제를 중단합니다.") } println("----------------lock 획득 ----------------------------") return proceedingJoinPoint.proceed() // 함수 실행 } catch (e: InterruptedException) { Thread.currentThread().interrupt() throw RuntimeException("Lock 획득 중 InterruptedException이 발생하여 삭제를 중단합니다.", e) } finally { if (lock.isHeldByCurrentThread) { lock.unlock() println("--------------lock 해제됨 --------------------------") } } } private fun getKey(proceedingJoinPoint: ProceedingJoinPoint, annotation: ExecuteWithLock): Any { val methodSignature: MethodSignature = proceedingJoinPoint.signature as MethodSignature val keyExpression = annotation.key val args = proceedingJoinPoint.args val keyParameterName = getKeyParameterName(keyExpression) val idx = methodSignature.parameterNames.indexOf(keyParameterName) if (args.size < idx || idx < 0) { throw RuntimeException("올바르지 않은 키 값입니다. 파라미터와 키를 확인해주세요.") } val expression = parser.parseExpression(keyExpression) val context = StandardEvaluationContext() context.setVariable(keyParameterName, args[idx]) return expression.getValue(context) ?: throw RuntimeException("올바르지 않은 키값입니다.") } private fun getAnnotation(proceedingJoinPoint: ProceedingJoinPoint): ExecuteWithLock { val methodSignature: MethodSignature = proceedingJoinPoint.signature as MethodSignature return methodSignature.method.getAnnotation(ExecuteWithLock::class.java) } private fun acquireLock(lock: RLock, executeWithLockAnnotation: ExecuteWithLock) { if (!lock.tryLock(executeWithLockAnnotation.waitTimeAmount, executeWithLockAnnotation.leaseTimeAmount, executeWithLockAnnotation.timeUnit)) { println("Lock 획득 실패") throw RuntimeException("Lock 획득에 실패하여 삭제를 중단합니다.") } } private fun getKeyParameterName(keyExpression: String): String { val prefix = "#" val appender = "." return keyExpression.split(prefix) .getOrNull(1) ?.substringBefore(appender) ?: throw RuntimeException("Invalid key expression") } }
Spel을 이용해서 객체 속성을 가져올 수 있게 해두었기 때문에 다음과 같이 어노테이션을 사용할 수도 있다.
@Transactional @ExecuteWithLock(key = "#user.id", waitTimeAmount = 5 leaseTimeAmount = 5, timeUnit = TimeUnit.SECONDS) fun deleteUser(user: User) { userRepository.findById(user.id).ifPresent { println("delete user: ${it.id}") Thread.sleep(3) userRepository.delete(it) } }
여기서는 파라미터로 id 자체를 넣어주기 때문에 다음과 같이 넣어준다.
@Transactional @ExecuteWithLock(key = "#userId", waitTimeAmount = 5, leaseTimeAmount = 5, timeUnit = TimeUnit.SECONDS) fun deleteUser(userId: Long) { userRepository.findById(userId).ifPresent { println("delete user: ${it.id}") Thread.sleep(3) userRepository.delete(it) } }
그런데 테스트를 다시 실행해면, 실패할 것이라고 생각했던 테스트가 성공하는 것을 보게 될것이다...! 왜일까?
트랜잭션
바로 서비스 메서드에 @Transactional 어노테이션이 걸려있기 때문이다!
메서드에 @Transactional어노테이션이 걸려있기 때문에 메서드 종료후에 커밋이 되어 데이터가 반영되는데, 커밋이 되기 전에 락의 해제가 이루어지게되고, 삭제 커밋이 나가기 전에 락의 해제가 이루어져 다른 요청에서 동시에 데이터를 읽어올 수 있게되어 동일한 동시성문제가 발생한 것이다.
이를 위해서는, 락의 해제를 명확하게 커밋 이후 시점에 수행할 수 있도록 지정해주어야 한다.
나는 이를 해결하기 위해서 eventListener를 이용해서 커밋 이후에 락을 해제할 수 있도록 구현했다.
다시 어노테이션과 AOP를 만들어보자.
락의 해제 처리를 다음과 같이 After commit으로 처리할 수 있게 EventListner를 이용해서 이벤트 처리를 통해 구현했다.
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Retention(AnnotationRetention.RUNTIME) @Target(AnnotationTarget.FUNCTION) annotation class ExecuteWithLockV2( val key: String, val waitTimeAmount: Long, val leaseTimeAmount: Long, val timeUnit: TimeUnit ) @Aspect @Component class ExecuteWithLockAspectV2( private val redissonClient: RedissonClient, private val applicationEventPublisher: ApplicationEventPublisher ) { private val parser: ExpressionParser = SpelExpressionParser() @Around("@annotation(com.yunhalee.concurrency_redisson.component.ExecuteWithLockV2)") fun executeWithRedissonLock(proceedingJoinPoint: ProceedingJoinPoint): Any? { val annotation = getAnnotation(proceedingJoinPoint) val key = getKey(proceedingJoinPoint, annotation) val lock = redissonClient.getLock(key.toString()) try { acquireLock(lock, annotation) return proceedingJoinPoint.proceed() } catch (e: InterruptedException) { Thread.currentThread().interrupt() throw RuntimeException("Lock 획득 중 InterruptedException이 발생하여 삭제를 중단합니다.", e) } finally { if (lock.isLocked && lock.isHeldByCurrentThread) { applicationEventPublisher.publishEvent(lock) } } } private fun getKey(proceedingJoinPoint: ProceedingJoinPoint, annotation: ExecuteWithLockV2): Any { val methodSignature: MethodSignature = proceedingJoinPoint.signature as MethodSignature val keyExpression = annotation.key val args = proceedingJoinPoint.args val keyParameterName = getKeyParameterName(keyExpression) val idx = methodSignature.parameterNames.indexOf(keyParameterName) if (args.size < idx || idx < 0) { throw RuntimeException("올바르지 않은 키 값입니다. 파라미터와 키를 확인해주세요.") } val expression = parser.parseExpression(keyExpression) val context = StandardEvaluationContext() context.setVariable(keyParameterName, args[idx]) return expression.getValue(context) ?: throw RuntimeException("올바르지 않은 키값입니다.") } private fun getAnnotation(proceedingJoinPoint: ProceedingJoinPoint): ExecuteWithLockV2 { val methodSignature: MethodSignature = proceedingJoinPoint.signature as MethodSignature return methodSignature.method.getAnnotation(ExecuteWithLockV2::class.java) } private fun acquireLock(lock: RLock, executeWithLockAnnotation: ExecuteWithLockV2) { if (!lock.tryLock(executeWithLockAnnotation.waitTimeAmount, executeWithLockAnnotation.leaseTimeAmount, executeWithLockAnnotation.timeUnit)) { throw RuntimeException("Lock 획득에 실패하여 삭제를 중단합니다.") } } private fun getKeyParameterName(keyExpression: String): String { val prefix = "#" val appender = "." return keyExpression.split(prefix) .getOrNull(1) ?.substringBefore(appender) ?: throw RuntimeException("Invalid key expression") } @EventListener @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) fun unLockRedissonLock(rLock: RLock) { rLock.unlock() } }
이제 서비스를 다시 작성해보자. 위에서 정의한 Lock Aspect를 사용하는 deleteUserV2를 만들어주었다.
@Transactional @ExecuteWithLockV2(key = "#userId", waitTimeAmount = 5, leaseTimeAmount = 5, timeUnit = TimeUnit.SECONDS) fun deleteUserV2(userId: Long) { userRepository.findById(userId).ifPresent { println("delete user: ${it.id}") Thread.sleep(3) userRepository.delete(it) } }
이제 테스트 코드를 통해서 성공하는지 확인해보자.
@SpringBootTest class UserServiceTest { @Autowired private lateinit var sut: UserService @Autowired private lateinit var userRepo: UserRepository private lateinit var user: User @BeforeEach fun setUp() { user = userRepo.save(User("test@gmail.com", "홍길동", "010-1234-5678")) } @DisplayName("RedissonLock을 적용하지 않은 케이스 테스트 입니다.") @Test fun `동시에 사용자 삭제 요청하면 OptimisticLockingFailureException이 발생한다`() { // given val numberOfThreads = 2 val executorService = Executors.newFixedThreadPool(numberOfThreads) val futures = mutableListOf<Future<*>>() // when repeat(numberOfThreads) { futures.add(executorService.submit { sut.deleteUser(user.id) }) } executorService.shutdown() // then val exception = assertThrows<ExecutionException> { futures.forEach { it.get() } } assertTrue(exception.cause is OptimisticLockingFailureException) assertTrue(exception.message!!.contains("Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)")) } @DisplayName("RedissonLock을 적용하여 동시성 이슈를 해결한 테스트 케이스 입니다.") @Test fun `동시에 사용자 삭제 요청에도 동시성 이슈가 발생하지 않는다`() { // given val numberOfThreads = 2 val executorService = Executors.newFixedThreadPool(numberOfThreads) val futures = mutableListOf<Future<*>>() // when repeat(numberOfThreads) { futures.add(executorService.submit { sut.deleteUserV2(user.id) }) } executorService.shutdown() // then assertDoesNotThrow { futures.forEach { it.get() } } } }
테스트를 실행해보면 모두 성공하는 것을 확인할 수 있다!
실제 운영 코드에서는 해당 로직이 락을 사용해서 문제를 해결할 만큼은 필요하지 않다고 판단되어, transaction을 세분화하고 트랜잭션 외부에서 try catch로 스킵하는 코드를 대체되었지만, 작업을 진행해보면서 문제를 파볼 수 있어서 재밌는 시간을 가질 수 있었다!
끝-! ✨✨✨✨✨
'Spring' 카테고리의 다른 글
Grpc Service의 호출오류가 Grpc Client 헬스체크 실패를 야기하는 문제 해결 (0) 2024.11.19 Grpc + Spring : 예외 처리 구현 (0) 2024.09.22 Grpc Spring Security - 3) Grpc Client에서 header를 포함한 grpc 호출하기 (3) 2024.09.21 Grpc Spring Security - 2) Grpc Service에 인증, 인가 구현하기 (0) 2024.09.21 Grpc Spring Security - 1) GrpcSpringSecurity의 인증, 인가 (0) 2024.09.21