ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 인프런) 실습으로 배우는 선착순 이벤트 시스템
    JAVA 2024. 6. 5. 00:36

     

     

     

    이번에는 실습으로 배우는 선착순 이벤트 시스템 강의를 듣고 실습해보았다. 

    https://www.inflearn.com/course/%EC%84%A0%EC%B0%A9%EC%88%9C-%EC%9D%B4%EB%B2%A4%ED%8A%B8-%EC%8B%9C%EC%8A%A4%ED%85%9C-%EC%8B%A4%EC%8A%B5/dashboard

     

    실습으로 배우는 선착순 이벤트 시스템 | 최상용 - 인프런

    최상용 | 선착순 이벤트 시스템을 구현할 때 어떤 문제가 발생할 수 있고 어떻게 해결할 수 있는지 배워봅니다., 선착순 이벤트 시스템도 자신있게! 예제를 통해 실전 감각을 잡아보세요.  [임

    www.inflearn.com

     

     

     

     

     

    선착순 시스템 문제와 해결 

     

    실시간으로 100개의 쿠폰만을 발행해야하는 선착순 이벤트 시스템에서 발생할 수 있는 문제는 다음과 같다. 

    - 쿠폰이 100개보다 더 많이 발행될 수 있음 

    - 이벤트 페이지 접속이 안됨 

    - 이벤트랑 전혀 상관없는 페이지도 느려질 수 있음 

     

     

    이와 같은 문제를 해결하는 방법은 다음과 같다. 

    - 트래픽이 몰렸을 때 대처할 수 있는 방법 

    - redis를 활용하여 쿠폰 발급 개수 보장 

    - kafka를 활용하여 다른 페이지들에 대한 영향도를 줄임 

     

     

     

    이제 실제 서비스를 만들어보자. 

    먼저 도커 세팅이 되어있어야 한다 

     

    docker 설치한다. 

    % brew install docker
    % brew link docker
    % docker version

     

    docker mysql 실행 명령어를 통해 mysql을 실행한다. 

    % docker pull mysql
    % docker run -d -p 3306:3306 -e MYSQL_ROOT_PASSWORD=1234 --name mysql mysql
    % docker ps
    % docker exec -it mysql bash

     

    💡 참고 )
    docker: no matching manifest for linux/arm64/v8 in the manifest list entries. 오류가 발생하면 다음 명령어를 실행한다. 

    % docker pull --platform linux/x86_64 mysql

     

     

     

    mysql 명령어를 통해 데이터베이스를 만든다. 

    % mysql -u root -p
    % create database coupon_example;
    % use coupon_example;

     

     

    이제 dependency를 추가해준다. 

    내가 실습하는 프로젝트는 멀티모듈 구조라서 모듈을 생성하고 build에 다음과 같이 추가해주었다. 

    plugins {
    }
    
    dependencies {
        implementation("org.springframework.boot:spring-boot-starter-data-jpa")
        implementation("org.springframework.boot:spring-boot-starter-web")
        implementation("org.springframework.boot:spring-boot-starter-data-redis:3.2.4")
        implementation("org.redisson:redisson-spring-boot-starter:3.27.2")
        runtimeOnly("com.mysql:mysql-connector-j")
    }

     

     

    application.yaml 설정은 다음과 같이 해주었다. 

    spring:
      application:
        name: concurrency
      jpa:
        hibernate:
          ddl-auto: create
        show-sql: true
        properties:
          hibernate:
            dialect: org.hibernate.dialect.MySQLDialect
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:33306/coupon_example
        username: root
        password: root
        hikari:
          maximum-pool-size: 40
    
    logging:
      level:
        org:
          hibernate:
            SQL: DEBUG
            type:
              descriptor:
                sql:
                  BasicBinder: TRACE

     

     

     

    먼저 쿠폰 시스템을 만들어보자

    요구사항은 다음과 같다. 

    선착순 100명에게 할인쿠폰을 제공하는 이벤트를 진행하고자 한다.
    
    이 이벤트는 아래와 같은 조건을 만족하여야 한다.
    - 선착순 100명에게만 지급되어야한다.
    - 101개 이상이 지급되면 안된다.
    - 순간적으로 몰리는 트래픽을 버틸 수 있어야합니다.

     

     

     

    쿠폰 발급 로직을 만들어보자. 

    import jakarta.persistence.Entity;
    import jakarta.persistence.GeneratedValue;
    import jakarta.persistence.GenerationType;
    import jakarta.persistence.Id;
    
    @Entity
    public class Coupon {
    
        @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
        private Long id;
    
        private Long userId;
    
        public Coupon() {
        }
    
        public Coupon(Long userId) {
            this.userId = userId;
        }
    
        public Long getId() {
            return id;
        }
    }
    import com.yunhalee.concurrency_practice.domain.Coupon;
    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.stereotype.Repository;
    
    @Repository
    public interface CouponRepository extends JpaRepository<Coupon, Long> {
    
    }
    import com.yunhalee.concurrency_practice.domain.Coupon;
    import com.yunhalee.concurrency_practice.repository.CouponRepository;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ApplyService {
    
        private final CouponRepository couponRepository;
    
        public ApplyService(CouponRepository couponRepository) {
            this.couponRepository = couponRepository;
        }
    
        public void apply(Long userId) {
            long count = couponRepository.count();
    
            if (count > 100) {
                return;
            }
    
            couponRepository.save(new Coupon(userId));
    
        }
    }

     

    쿠폰발급 서비스 테스트 코드를 만들어보자. 

    import com.yunhalee.concurrency_practice.repository.CouponRepository;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import static org.junit.jupiter.api.Assertions.*;
    
    @SpringBootTest
    class ApplyServiceTest {
    
        @Autowired
        private ApplyService applyService;
    
        @Autowired
        private CouponRepository couponRepository;
    
        @Test
        public void 한번만응모() {
            applyService.apply(1L);
            long count = couponRepository.count();
            assertEquals(1, count);
        }
    }

     

     

     

    테스트에 정상적으로 성공한다. 그런데 여기에는 몇가지 문제점이 있다. 

    테스트에 성공한 케이스는 요청이 1개만 들어왔을 때이다. 

    동시에 요청이 여러개가 들어온다면, 예상대로 동작하지 않는다. 

    테스트 케이스를 만들어보자. 

        @Test
        public void 여러명응모() throws InterruptedException {
            int threadCount = 1000;
            ExecutorService executorService = Executors.newFixedThreadPool(32);
            CountDownLatch latch =  new CountDownLatch(threadCount);
            for (int i = 0; i < threadCount; i++) {
                long userId = i;
                executorService.submit(() -> {
                    try {
                        applyService.apply(userId);
                    } finally {
                        latch.countDown();
                    }
                });
            }
    
            latch.await();
            long count = couponRepository.count();
            assertEquals(100, count);
        }

     

     

     

     

    해당문제를 해결하기 위해서 redis를 이용한다. 

    먼저 레디스를 설치하고 실행한다. 

    % docker pull redis
    % docker run --name myredis -d -p 6379:6379 redis

     

    프로젝트에 redis dependency를 추가한다. 

        implementation("org.springframework.boot:spring-boot-starter-data-redis:3.2.4")

     

     

    레이스 컨디션이 일어나는 부분쿠폰 갯수를 가져오는 부분이었다. 

    레이스는 두개이상의 스레드에서 공유자원에 접근하고자 할때 발생하므로 싱글스레드의 경우 해당 문제가 발생하지 않는다. 

    하지만, 쿠폰 발급 전체를 싱글스레드로 하게 된다면, 회원의 모든 로직이 종료된 후에야 회원의 쿠폰 발급이 가능해지기 때문에 성능상 좋지 않을 것이다. 

     

    예를 들어 10:1분에 사용자가 쿠폰 발급을 요청하고 2분에 쿠폰발급이 완료된다면, 2,3번째 사용자는 그 이후부터 순차적으로 쿠폰발급이 이루어지게 된다. 

     

    자바에서 지원하는 synchronized는 서버가 여러대가된다면, 레이스 컨디션이 다시 발생하게 된다. 

     

    mysql의 락을 이용할 수 있는데, 만약 이를 사용하게 된다면, 쿠폰 발급부터 생성까지 모두 락을 걸게된다. 따라서, 락을 거는구간이 길어져서 성능이 낮아질 수 있다. 

     

    프로젝트의 핵심키는 쿠폰 갯수이므로, 쿠폰 갯수에 대한 정합성만 관리하면된다. 

    레디스는 싱글 스레드기반으로 동작하여 레이스 컨디션을 해결하고 incr 명령어는 성능도 빠르다. 

     

     

    incr 명령어를 입력하면 특정 키에 대한 값이 1씩 증가한다. 

    이를 이용해서 발급된 쿠폰의 갯수를 제어하자. 

    쿠폰 발급하기 전에 coupon_count가 100보다 크다면 그 이상 발급된 것이므로 발급하지 않도록 한다. 

    수를 증가시키는 increment를 작성한다. 

    import com.yunhalee.concurrency_practice.domain.Coupon;
    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Repository;
    
    @Repository
    public class CouponCountRepository {
    
        private final RedisTemplate<String, String> redisTemplate;
    
        public CouponCountRepository(RedisTemplate<String, String> redisTemplate) {
            this.redisTemplate = redisTemplate;
        }
    
        public Long increment() {
            return redisTemplate
                    .opsForValue()
                    .increment("coupon_count");
        }
    }
    import com.yunhalee.concurrency_practice.domain.Coupon;
    import com.yunhalee.concurrency_practice.repository.CouponCountRepository;
    import com.yunhalee.concurrency_practice.repository.CouponRepository;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ApplyService {
    
        private final CouponRepository couponRepository;
    
        private final CouponCountRepository couponCountRepository;
    
        public ApplyService(CouponRepository couponRepository, CouponCountRepository couponCountRepository) {
            this.couponRepository = couponRepository;
            this.couponCountRepository = couponCountRepository;
        }
    
        public void apply(Long userId) {
            Long count = couponCountRepository.increment();
    
            if (count > 100) {
                return;
            }
    
            couponRepository.save(new Coupon(userId));
    
        }
    }

     

    flushall명령어를 이용해서 먼저 카운트를 초기화하고 

    테스트 케이스를 다시 실행하면 성공하는 것을 확인할 수 있다. 

     

    카운트를 읽을때 모든 스레드가 기다렸다가 정합성이 맞는 데이터만을 읽기 때문에 100개 이상 처리되지 않도록 한다. 

     

     

    그런데 발급받는 쿠폰의 갯수가 많아질수록 rdb에 부하가 가해진다. 

    만약 쿠폰 DB가 아니라 다른 서비스와 동일한 DB를 사용하고 있었다면, 다른 서비스까지 장애가 전파되게 된다. 

     

     

    100분 이후에 모든 처리가 가능한데, 요청이 한번에 들어오게 된다면 대부분의 서비스의 경우 timeout 옵션이 설정되어있기 때문에 주문, 회원가입 뿐만아니라 일부분의 쿠폰도 생성되지 않는 문제가 발생할 수 있다. 

     

    두번째로, 짧은 시간내에 많은 요청이 들어오게 된다면, db 리소스에 많은 부하가 가해지게 되고 이는 곧 서비스 오류로 이어질 수 있다. 

    ngrinder와 aws로 많은 트래픽이 들어오는 테스트 해보자. 

    클라이언트가 쿠폰 요청을 한다면 로드밸런서가 적절히 트래픽을 분산해서 api에 요청하게 된다. 

     

     

    단기간에 많은 요청이 들어와 rdb에 cpu 부하가 증가하고 서비스 오류로 이어지는 것을 확인할 수 있다. 

     

     

     

     

    이번엔 카프카를 이용해서 문제를 해결해보자. 

    먼저 카프카를 설치한다

    먼저 docker-compose가 설치되어있는지 확인한다 

    % docker-compose -v

     

    프로젝트에 docker-compose.yml 파일을 만들어준다. 

    version: '2'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"
      kafka:
        image: wurstmeister/kafka:2.12-2.5.0
        container_name: kafka
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock

     

    파일 위치로 가서 docker -compose파일을 실행한다. 

    % docker-compose up -d

     

    중단은 다음의 명령어로 할 수 있다. 

    % docker-compose down

     

     

     

    잘 실행되었는지 확인한다. 

    % docker ps

     

     

     

     

     

    카프카란 ?

     

    분산 이벤트 스트리밍 플랫폼으로 이벤트 스트리밍이란 소스에서 목적지까지 이벤트를 실시간으로 스트리밍 하는 것을 말한다. 

    토픽은 큐로 이루어지고, 프로듀서는 토픽에 데이터를 넣고, consumer가 이를 소비하게된다. 

    테스트를 위해서 test라는 토픽을 만들어준다. 

     

    토픽생성

    % docker exec -it kafka [kafka-topics.sh](<http://kafka-topics.sh/>) --bootstrap-server localhost:9092 --create --topic testTopic

     

    토픽에 메세지를 보낼 수 있는 프로듀서를 띄워준다. 

     

    프로듀서 실행

    % docker exec -it kafka kafka-console-producer.sh --topic testTopic --broker-list 0.0.0.0:9092

     

    창을 하나 더 열고 테스트 토픽으로부터 데이터를 읽을 consumer를 만들어준다. 

     

    컨슈머 실행

    % docker exec -it kafka [kafka-console-consumer.sh](<http://kafka-console-consumer.sh/>) --topic testTopic --bootstrap-server localhost:9092

     

     

    프로듀서에서 생성한 메세지가 정상적으로 consumer에서 읽히는 것을 확인할 수 있다. 

     

     

     

    이를이용해서 토픽에  쿠폰을 생성할 사용자 아이디를 넣어서 이를 소비하여 쿠폰을 생성하도록 한다. 

     

     

    api 모듈에 kafka producer dependency를 추가한다. 

    https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka/3.1.3

    implementation("org.springframework.kafka:spring-kafka:3.1.3")

     

    토픽에 데이터를 넣을 프로듀서 설정을 해준다. 

    producer factory설정할 빈을 만들고 이를활용하는 kafkaTemplate을 만들어준다. 

     

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class KafkaProducerConfig {
    
        // key : String, value : Long
        @Bean
        public ProducerFactory<String, Long> producerFactory() {
            Map<String, Object> config = new HashMap();
            config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
            return new DefaultKafkaProducerFactory<>(config);
        }
    
        @Bean
        public KafkaTemplate<String, Long> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }

     

     

     

    이제 토픽에 메세지를 보내는 프로듀서를 만들어준다. 

    coupon_create라는 토픽으로 userId라는 메세지를 보낸다. 

    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    public class CouponCreateProducer {
    
        private final KafkaTemplate<String, Long> kafkaTemplate;
    
        public CouponCreateProducer(KafkaTemplate<String, Long> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }
    
        public void create(Long userId) {
            kafkaTemplate.send("coupon_create", userId);
        }
    }

     

     

    이제 applyService에서는 바로 쿠폰을 생성하지않고 컨슈머를 이용해서 메세지만 보내는 역할을 수행한다. 

    import com.yunhalee.concurrency_practice.producer.CouponCreateProducer;
    import com.yunhalee.concurrency_practice.repository.CouponCountRepository;
    import com.yunhalee.concurrency_practice.repository.CouponRepository;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ApplyService {
    
        private final CouponRepository couponRepository;
    
        private final CouponCountRepository couponCountRepository;
    
        private final CouponCreateProducer couponCreateProducer;
    
        public ApplyService(CouponRepository couponRepository, CouponCountRepository couponCountRepository, CouponCreateProducer couponCreateProducer) {
            this.couponRepository = couponRepository;
            this.couponCountRepository = couponCountRepository;
            this.couponCreateProducer = couponCreateProducer;
        }
    
        public void apply(Long userId) {
            Long count = couponCountRepository.increment();
    
            if (count > 100) {
                return;
            }
    
            couponCreateProducer.create(userId);
        }
    }

     

     

    이제 터미널을 열고 coupon_create라는 토픽을 만들어준다. 

    % docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic coupon_create

     

     

    토픽에 들어오는 데이터를 받아볼 수 있는 consumer를 실행한다. 

    % docker exec -it kafka [kafka-console-consumer.sh](http://kafka-console-consumer.sh/) --topic coupon_create --bootstrap-server localhost:9092 --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"

     

    이전에 만들어둔 테스트 케이스를 실행해서 메세지가 들어오는 것을 확인할 수 있다. 

    (그 전에 redis 테스트롤 인해서 100이 채워졌을 수 있으니 flushall 실행후에 한다)

    % redis-cli flushall
    % docker exec -it kafka kafka-console-consumer.sh --topic coupon_create --bootstrap-server localhost:9092 --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"

     

     

     

    이번엔 프로젝트에 컨슈머를 구현해보자. 

    consumer module을 새로 만들어서 서버를 구성해준다.

    새로운 모듈에서 api 서버와 동일하게 dependency와 yaml을 설정하고 consumerconfig를 만들어준다. 

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    class KafkaConsumerConfig {
    
        @Bean
        public ConsumerFactory<String, Long> consumerFactory() {
            Map<String, Object> config = new HashMap();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(config);
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    
    }

     

    이를 사용하는 컨슈머를 구현해준다. 

     

     

    userId를 받아와서 콘솔에 먼저 찍어본다. 

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class CreateCouponConsumer {
    
        @KafkaListener(topics = "coupon_create", groupId = "group_1")
        public void listener(Long userId) {
            System.out.println(userId);
        }
    }

     

     

    먼저 테스트 전에 나는 flushall 귀찮아서 after each로 테스트에서 clear하게 만들어줬다. 

    import com.yunhalee.concurrency_practice.domain.Coupon;
    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Repository;
    
    @Repository
    public class CouponCountRepository {
    
        private final RedisTemplate<String, String> redisTemplate;
    
        public CouponCountRepository(RedisTemplate<String, String> redisTemplate) {
            this.redisTemplate = redisTemplate;
        }
    
        public Long increment() {
            return redisTemplate
                    .opsForValue()
                    .increment("coupon_count");
        }
    
        public void clear() {
            redisTemplate.delete("coupon_count");
            redisTemplate.opsForValue().set("coupon_count", "0");
        }
    }

     

    import com.yunhalee.concurrency_practice.repository.CouponCountRepository;
    import com.yunhalee.concurrency_practice.repository.CouponRepository;
    import org.junit.jupiter.api.AfterAll;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import static org.junit.jupiter.api.Assertions.*;
    
    @SpringBootTest
    class ApplyServiceTest {
    
        @Autowired
        private ApplyService applyService;
    
        @Autowired
        private CouponRepository couponRepository;
    
        @Autowired
        private CouponCountRepository couponCountRepository;
    
        @AfterEach
        public void clear() {
            couponCountRepository.clear();
        }
    
        @Test
        public void 여러명응모() throws InterruptedException {
            int threadCount = 1000;
            ExecutorService executorService = Executors.newFixedThreadPool(32);
            CountDownLatch latch =  new CountDownLatch(threadCount);
            for (int i = 0; i < threadCount; i++) {
                long userId = i;
                executorService.submit(() -> {
                    try {
                        applyService.apply(userId);
                    } finally {
                        latch.countDown();
                    }
                });
            }
    
            latch.await();
            long count = couponRepository.count();
            assertEquals(100, count);
        }

     

     

    consumer 서비스를 실행시켜두고 테스트를 실행시키면 다음과 같이 메세지가 찍히는걸 확인할 수 있다. 

    2024-04-17T21:21:07.599+09:00  INFO 4047 --- [concurrency_practice_consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-1, groupId=group_1] Finished assignment for group at generation 5: {consumer-group_1-1-d8984385-fe6f-4c22-af8b-379ec96abbaa=Assignment(partitions=[coupon_create-0])}
    2024-04-17T21:21:07.606+09:00  INFO 4047 --- [concurrency_practice_consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-1, groupId=group_1] Successfully synced group in generation Generation{generationId=5, memberId='consumer-group_1-1-d8984385-fe6f-4c22-af8b-379ec96abbaa', protocol='range'}
    2024-04-17T21:21:07.607+09:00  INFO 4047 --- [concurrency_practice_consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-1, groupId=group_1] Notifying assignor about the new Assignment(partitions=[coupon_create-0])
    2024-04-17T21:21:07.609+09:00  INFO 4047 --- [concurrency_practice_consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-1, groupId=group_1] Adding newly assigned partitions: coupon_create-0
    2024-04-17T21:21:07.619+09:00  INFO 4047 --- [concurrency_practice_consumer] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-1, groupId=group_1] Setting offset for partition coupon_create-0 to the committed offset FetchPosition{offset=3100, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 1001 rack: null)], epoch=0}}
    2024-04-17T21:21:07.620+09:00  INFO 4047 --- [concurrency_practice_consumer] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group_1: partitions assigned: [coupon_create-0]
    2024-04-17T21:21:11.436+09:00  INFO 4047 --- [concurrency_practice_consumer] [sson-netty-1-22] org.redisson.connection.DNSMonitor       : Try Redisson PRO with Proxy mode to use all ip addresses: [redis://127.0.0.1:6379, redis://0:0:0:0:0:0:0:1:6379]
    25
    11
    28
    0
    17
    8
    3
    15
    13
    7
    24
    2
    16
    14
    18
    29
    10
    31
    5
    4
    22
    43
    38
    20
    1
    30
    9
    26
    19
    6
    12
    36
    27
    23
    21
    42
    33
    34
    39
    40
    37
    41
    32
    35
    61
    59
    57
    55
    63
    56
    54
    62
    58
    60
    53
    52
    50
    49
    51
    47
    46
    48
    44
    45
    65
    64
    66
    67
    68
    74
    75
    72
    69
    71
    73
    78
    80
    76
    70
    77
    79
    83
    82
    85
    84
    81
    86
    117
    118
    97
    94
    92
    96
    95
    91
    93
    87
    88
    89
    90

     

     

    이제 컨슈머에서 쿠폰을 생성하도록 하자. 

    이전에 만들어둔 coupon 도메인과 coupon repository를 현재 모듈로 옮겨주고, 

    컨슈머에서 이를 활용해서 쿠폰을 생성한다. 

    import com.yunhalee.concurrency_practice_consumer.domain.Coupon;
    import com.yunhalee.concurrency_practice_consumer.repository.CouponRepository;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class CreateCouponConsumer {
    
        private final CouponRepository couponRepository;
    
        public CreateCouponConsumer(CouponRepository couponRepository) {
            this.couponRepository = couponRepository;
        }
    
        @KafkaListener(topics = "coupon_create", groupId = "group_1")
        public void listener(Long userId) {
            couponRepository.save(new Coupon(userId));
        }
    }

     

    먼저 100개만 생성되는지 보기 위해서 테이블을 먼저 비워준다. 

    mysql> use coupon_example;
    Reading table information for completion of table and column names
    You can turn off this feature to get a quicker startup with -A
    
    Database changed
    mysql> show tables;
    +--------------------------+
    | Tables_in_coupon_example |
    +--------------------------+
    | coupon                   |
    +--------------------------+
    1 row in set (0.00 sec)
    
    mysql> truncate coupon;
    Query OK, 0 rows affected (0.02 sec)
    
    mysql> show tables;
    +--------------------------+
    | Tables_in_coupon_example |
    +--------------------------+
    | coupon                   |
    +--------------------------+
    1 row in set (0.01 sec)
    
    mysql> select * from coupon;
    Empty set (0.00 sec)
    
    mysql>

     

     

    이제 컨슈머를 다시 실행하고 테스트를 실행하면 다음과 같이 정확하게 100 개의 데이터가 쌓인것을 확인할 수 있다. 

    mysql> select count(*) from coupon;
    +----------+
    | count(*) |
    +----------+
    |      100 |
    +----------+
    1 row in set (0.00 sec)
    
    mysql>

     

     

    그런데 테스트 케이스 자체는 실패하는 것을 확인할 수 있는데, 

    Hibernate: select count(*) from coupon c1_0
    
    Expected :100
    Actual   :24
    <Click to see difference>
    
    org.opentest4j.AssertionFailedError: expected: <100> but was: <24>
    	at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    	at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    	at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
    	at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:166)
    	at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:161)
    	at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:632)
    	at com.yunhalee.concurrency_practice.service.ApplyServiceTest.여러명응모(ApplyServiceTest.java:59)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
    	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
    	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
    	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
    	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
    	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
    	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
    	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
    	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
    	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
    	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
    	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
    	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
    	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)

     

     

     

     

    그 이유는 이벤트 스트리밍의 데이터 처리방식이 실시간이 아니기 때문이다. 

    따라서 일시적으로 스레드 sleep을 주어서 시간차를 준 후에 다시 동일한 환경에서 테스트를 실행하면, 테스트 케이스가 성공하는 것을 확인할 수 있다. 

    mysql> truncate coupon;
    Query OK, 0 rows affected (0.03 sec)
        @Test
        public void 여러명응모() throws InterruptedException {
            int threadCount = 1000;
            ExecutorService executorService = Executors.newFixedThreadPool(32);
            CountDownLatch latch =  new CountDownLatch(threadCount);
            for (int i = 0; i < threadCount; i++) {
                long userId = i;
                executorService.submit(() -> {
                    try {
                        applyService.apply(userId);
                    } finally {
                        latch.countDown();
                    }
                });
            }
    
            latch.await();
            Thread.sleep(10000);
            long count = couponRepository.count();
            assertEquals(100, count);
        }

     

     

    카프카를 이용하면 데이터 처리량을 조절할 수 있고, 이로 인해서 데이터 베이스의 부하를 줄일 수 있다. 

    단, 쿠폰생성(데이터 실제 처리)까지 시간차가 발생할 수 있다. 

     

     

     

    그렇다면 1인당 하나의 쿠폰발급으로만 제어를 한다면 어떻게 할 수 있을까? 

    가장 간단한 방법으로는 database의 유니크 키를 사용하는 방법이 있다. 

    쿠폰 테이블에 userId coupon을 가지도록 하는 것이다. 

    하지만, 보통의 서비스는 유저가 여러개의 쿠폰을 가질 수 있기 때문에 실용적인 방법은 아니다 

     

    두번째 방법은 더미로 락을 잡고 쿠폰 발급을 확인하는 방식이다. 

        public void apply(Long userId) {
            // start lock
            // 쿠폰 발급 여부
            // If(발급됐다면) return 
            Long count = couponCountRepository.increment();
    
            if (count > 100) {
                return;
            }
    
            couponCreateProducer.create(userId);
            // end lock
        }

     

    하지만, 실제 쿠폰 발급 처리를 consumer가 하고 있기 때문에 시간차가 발생하여 한명이 여러개의 쿠폰 발급을 받을 수도 있다. 

     

     

    실제로 user1의 요청에 대해 consumer가 쿠폰을 발급하기 전에 user2가 요청을 하게 된다면, 쿠폰발급이 가능한것으로 판단하여 consumer는 메세지를 2개를 받아 쿠폰이 2개 발급된다. 

     

     

     

    그렇다고 다음과 같이 lock 범위안에서 쿠폰을 생성하게되면 lock 범위가 넓어져 성능 이슈가 발생할 수 있다. 

        public void apply(Long userId) {
            // start lock
            // 쿠폰 발급 여부
            // If(발급됐다면) return
            Long count = couponCountRepository.increment();
    
            if (count > 100) {
                return;
            }
    
            couponCreateProducer.create(userId);
            // CouponCreateConsumer.create(userId);
            // 쿠폰발급 
            // end lock
        }

     

     

     

    문제를 해결하기 위해 set을 사용해보자. 

    redis set은 값을 두번 저장해도 한개만, 남으며 요소의 존재유무를 빠르게 확인할 수 있다. 

     

    redis cli를 켜고 sadd 키 값를 하게 되면 레디스에 set을 추가할 수 있다. 

     

    한번 호출하면 1을 리턴하고 한번더 호출하면 이미 존재하므로 0을 리턴한다. 

    따라서 유저 아이디를 set에 추가하고 이미 존재하면 쿠폰을 발급하지 않고, 존재하지 않으면 쿠폰을 발급한다. 

     

    사용자의 등록 유무를 판단해주는 AppliedUserRepository를 만들어준다. 

    import org.springframework.data.redis.core.RedisTemplate;
    
    public class AppliedUserRepository {
    
        private final RedisTemplate<String, String> redisTemplate;
    
        public AppliedUserRepository(RedisTemplate<String, String> redisTemplate) {
            this.redisTemplate = redisTemplate;
        }
    
        public Long add(Long userId) {
            return redisTemplate.opsForSet()
                    .add("applied_user", userId.toString());
        }
    
    }

     

    쿠폰발급 요청 메세지를만들때 다음과 같이 확인 로직을 넣어준다. 

    import com.yunhalee.concurrency_practice.producer.CouponCreateProducer;
    import com.yunhalee.concurrency_practice.repository.AppliedUserRepository;
    import com.yunhalee.concurrency_practice.repository.CouponCountRepository;
    import com.yunhalee.concurrency_practice.repository.CouponRepository;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ApplyService {
    
        private final CouponRepository couponRepository;
    
        private final CouponCountRepository couponCountRepository;
    
        private final CouponCreateProducer couponCreateProducer;
    
        private final AppliedUserRepository appliedUserRepository;
    
        public ApplyService(CouponRepository couponRepository, CouponCountRepository couponCountRepository, CouponCreateProducer couponCreateProducer, AppliedUserRepository appliedUserRepository) {
            this.couponRepository = couponRepository;
            this.couponCountRepository = couponCountRepository;
            this.couponCreateProducer = couponCreateProducer;
            this.appliedUserRepository = appliedUserRepository;
        }
    
        public void apply(Long userId) {
            Long apply = appliedUserRepository.add(userId);
    
            if (apply !=1 ) {
                return;
            }
    
            Long count = couponCountRepository.increment();
    
            if (count > 100) {
                return;
            }
    
            couponCreateProducer.create(userId);
        }
    }

     

     

     

    새로운 테스트 케이스를 작성해서 테스트 해보자. 

    이떄, 1L라는 유저가 1000번의 요청을 보내지만, 결국 한개의 쿠폰만 생성이 될 것이다. 

    쿠폰 테이블을 비우고 컨슈머를 킨뒤 테스트를 실행해준다. 

    
        @Test
        public void 한명당_한개의_쿠폰만_발급() throws InterruptedException {
            int threadCount = 1000;
            ExecutorService executorService = Executors.newFixedThreadPool(32);
            CountDownLatch latch =  new CountDownLatch(threadCount);
            for (int i = 0; i < threadCount; i++) {
                long userId = i;
                executorService.submit(() -> {
                    try {
                        applyService.apply(1L);
                    } finally {
                        latch.countDown();
                    }
                });
            }
    
            latch.await();
            Thread.sleep(10000);
            long count = couponRepository.count();
            assertEquals(1, count);
        }

    테스트 케이스가 성공하는 것을 확인할 수 있다. 

     

     

     

    그런데 컨슈머에서 처리하다가 예외가 발생하면 어떻게 될까? 

    결과적으로 100개미만의 쿠폰이 발생한는 이슈가 발생할 수 있다. 

     

    이번에는 쿠폰을 발급하다가 예외가 발생하면 로그를 남기도록 해보자. 

    FailedEvent 엔티티와 레파지토리를 만들고  그리고  컨슈머에서 로그를 남기도록 한다. 

    import jakarta.persistence.Entity;
    import jakarta.persistence.GeneratedValue;
    import jakarta.persistence.GenerationType;
    import jakarta.persistence.Id;
    
    @Entity
    public class FailedEvent {
    
        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        private Long id;
        private Long userId;
    
        public FailedEvent() {
        }
    
        public FailedEvent(Long userId) {
            this.userId = userId;
        }
    
    }
    
    import com.yunhalee.concurrency_practice_consumer.domain.Coupon;
    import com.yunhalee.concurrency_practice_consumer.domain.FailedEvent;
    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.stereotype.Repository;
    
    @Repository
    public interface FailedEventRepository extends JpaRepository<FailedEvent, Long> {
    
    }
    
    import com.yunhalee.concurrency_practice_consumer.domain.Coupon;
    import com.yunhalee.concurrency_practice_consumer.domain.FailedEvent;
    import com.yunhalee.concurrency_practice_consumer.repository.CouponRepository;
    import com.yunhalee.concurrency_practice_consumer.repository.FailedEventRepository;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class CreateCouponConsumer {
    
        private final CouponRepository couponRepository;
    
        private final FailedEventRepository failedEventRepository;
    
        private final Logger logger = LoggerFactory.getLogger(CreateCouponConsumer.class);
    
        public CreateCouponConsumer(CouponRepository couponRepository, FailedEventRepository failedEventRepository) {
            this.couponRepository = couponRepository;
            this.failedEventRepository = failedEventRepository;
        }
    
        @KafkaListener(topics = "coupon_create", groupId = "group_1")
        public void listener(Long userId) {
    
            try {
                couponRepository.save(new Coupon(userId));
            } catch (Exception e) {
                logger.error("쿠폰 발급에 실패하여습니다. 사용자 아이디 : " + userId);
                failedEventRepository.save(new FailedEvent(userId));
            }
        }
    }

     

     

    전체 플로우는 다음과 같다. 

    api가 토픽에 메세지를 쏘고 컨슈머가 이를 처리하다가 문제가 발생하면 failedEvent데이터에 저장해놓고 배치 프로그램이 주기적으로 재시도할 수 있도록 하여 쿠폰 데이터의 정합성을 맞춰가는 것이다. 

     

Designed by Tistory.