  • Integrating Kotlin + Spring Boot + Kafka (+ Spring Kafka and Reactor Kafka)
    Network & Infrastructure 2022. 11. 25. 01:40

     

     

     

     

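    Last time I only briefly summarized the basic concepts of Apache Kafka,

    but this time I actually had to use it, so I studied how to use Spring Boot + Kafka from Kotlin.

    (My earlier blog post summarizing what I learned about Kafka)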

    https://dodop-blog.tistory.com/409

     


     

    Kafka, which everyone else already knows... once again I'm the only one who doesn't, and once again I'm the only one taking it seriously...

     

     

     

     

     

     

     

    Installing and running Kafka

    First, install Kafka. The steps below were carried out on a Mac with an M1 processor.

    # When kafka is installed through brew, no separate zookeeper installation was needed.
    $ brew install kafka
    
    # Start zookeeper
    $ zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties
    
    # Start kafka
    $ kafka-server-start /opt/homebrew/etc/kafka/server.properties

     

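    I ran into trouble here because I kept switching back and forth between installing Kafka with Homebrew and installing it from the tgz archive.

    The first error, shown below, said that Kafka could not acquire the lock file in the log directory because another Kafka instance was already using it.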

    ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
    org.apache.kafka.common.KafkaException: Failed to acquire lock on file .lock in /opt/homebrew/var/lib/kafka-logs. A Kafka instance in another process or thread is using this directory.

     

    So I ran the following command to stop any Kafka instance that might already be running, and then started Kafka again.

    https://stackoverflow.com/questions/53338221/kafka-server-does-not-start

     


    $ brew services stop kafka

    After that, the following error occurred.

     ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
    org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists

     

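    So I went into the Homebrew log directory and tried deleting the meta.properties file, deleting the log files, and uninstalling with brew and wiping everything before running again. In the end, the commands below solved the problem!

    The cause was that I had previously killed the Kafka server with kill -9 instead of shutting it down properly, and the state left behind (in kafkaServer.log) caused the error when I started it again. 🤦🏻‍♀️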

    https://gray-hat.tistory.com/94

     


     $ cd /tmp/kafka-logs
     $ rm -rf *
     zsh: sure you want to delete all 5 files in /tmp/kafka-logs [yn]? y

     

    Once Kafka is running properly, you can create a topic and check its contents from the producer and consumer consoles as follows.

    # Create a topic named 'foobar'
    $ kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic foobar
    Created topic foobar.
    
    # If the command above fails with "Broker may not be available", change the kafka config file (to listen locally)
    $ vi /opt/homebrew/etc/kafka/server.properties
    listeners=PLAINTEXT://localhost:9092
    
    # Use the producer console (type foo and bar)
    $ kafka-console-producer --bootstrap-server localhost:9092 --topic foobar
    > foo
    > bar
    
    # Use the consumer console (the foo and bar typed into the producer show up)
    $ kafka-console-consumer --bootstrap-server localhost:9092 --topic foobar --from-beginning
    foo
    bar

     

     

     

     

    Kafka commands
    # Create a topic
    $ kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    
    # List topics
    $ kafka-topics --list --bootstrap-server localhost:9092
    
    # Show topic details
    $ kafka-topics --describe --bootstrap-server localhost:9092 --topic test
    
    # Producer -> put data into the topic
    $ kafka-console-producer --bootstrap-server localhost:9092 --topic test
    
    # Consumer -> check the data in the topic
    $ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
    
    # Delete a topic => this did not work for me at first 😭
    # (likely because I had pointed --bootstrap-server at localhost:2181, the ZooKeeper port; it expects the broker address, 9092)
    $ vi /opt/homebrew/etc/kafka/server.properties
    delete.topic.enable=true
    $ kafka-topics --bootstrap-server localhost:9092 --delete --topic test
    
    # List consumer groups
    $ kafka-consumer-groups --bootstrap-server localhost:9092 --list
    
    # Show the state of a consumer group
    $ kafka-consumer-groups --bootstrap-server localhost:9092 --group test-consumer --describe

     

     

     

     

    Configuring build.gradle.kts

    Now add the Kafka setup to the project.

    repositories {
        mavenCentral()
    }
    
    dependencies {
        implementation("org.springframework.boot:spring-boot-starter")
        implementation("org.springframework.boot:spring-boot-starter-web")
    
        // add the Kafka dependency
        implementation("org.springframework.kafka:spring-kafka")
    
        implementation("org.jetbrains.kotlin:kotlin-reflect")
        implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
        testImplementation("org.springframework.boot:spring-boot-starter-test")
    }
    
    tasks.withType<KotlinCompile> {
        kotlinOptions {
            freeCompilerArgs = listOf("-Xjsr305=strict")
            jvmTarget = "17"
        }
    }
    
    tasks.withType<Test> {
        useJUnitPlatform()
    }

     

     

     

     

     

    Creating a topic from Spring Boot

    In Spring Boot you can create a topic as follows. The KafkaAdmin bean is responsible for creating new topics on the broker, and Spring Boot registers a KafkaAdmin bean automatically.

    package com.example.kafkatest
    
    import org.apache.kafka.clients.admin.AdminClientConfig
    import org.apache.kafka.clients.admin.NewTopic
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.kafka.config.TopicBuilder
    import org.springframework.kafka.core.KafkaAdmin
    
    
    @Configuration
    class KafkaTopicConfig {
    
        @Bean
        fun topic1(): NewTopic? {
            return TopicBuilder.name("topic1").build()
        }
    }

     

    In an application that does not use Spring Boot, you can register KafkaAdmin yourself as follows.

    @Bean
    open fun admin(): KafkaAdmin? {
        val configs: MutableMap<String, Any> = HashMap()
        configs[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        return KafkaAdmin(configs)
    }

     

     

     

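    Implementing message produce and consume

     - Producer: example of sending data

    Use KafkaTemplate to send a message to a specific topic ("foobar").

    (This works without any extra producer config because Spring Boot's auto-configured producer defaults both the key and the value serializer to StringSerializer.)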

    @Component
    class SampleTopicProducer {
    
        @Bean
        fun runner1(template: KafkaTemplate<String, String>): ApplicationRunner {
            return ApplicationRunner { template.send("foobar", "test") }
        }
    
        @Bean
        fun runner2(template: KafkaTemplate<String, String>): ApplicationRunner {
            return ApplicationRunner { template.send("foobar", "failMessage") }
        }
    }

     

     

    - Configuring the consumer

    Next, configure the Kafka consumer as follows.

    First, specify the Kafka server host and the consumer offset policy in application.properties. auto-offset-reset accepts latest, earliest, or none: latest (the default) starts from the end of the log, earliest starts from the very first offset, and none throws an exception when the consumer group has no committed consumer offset for the topic it wants to read.

    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.auto-offset-reset=earliest
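
The three auto-offset-reset options can be sketched as a small decision function. This is only a conceptual model in plain Kotlin, not the Kafka client API: the names `OffsetReset` and `startingOffset` are made up for illustration, and the rule that a valid committed offset always wins is stated as an assumption of the sketch.

```kotlin
// Conceptual model only: NOT the Kafka client API. All names here are hypothetical.
enum class OffsetReset { LATEST, EARLIEST, NONE }

/**
 * Returns the offset a consumer group would start reading from.
 * committed: the group's committed offset, or null when none exists.
 * logStart / logEnd: first available offset and the next offset to be written.
 */
fun startingOffset(committed: Long?, logStart: Long, logEnd: Long, policy: OffsetReset): Long {
    // Assumption of this sketch: a committed offset that is still inside the log wins;
    // the policy only applies when it is missing or out of range.
    if (committed != null && committed in logStart..logEnd) return committed
    return when (policy) {
        OffsetReset.LATEST -> logEnd      // only records produced from now on
        OffsetReset.EARLIEST -> logStart  // replay everything still retained
        OffsetReset.NONE -> throw IllegalStateException("no committed offset for this group")
    }
}
```

For example, a brand-new group reading a log holding offsets 0 to 9 would start at 10 with LATEST and at 0 with EARLIEST, which is why a listener configured with latest does not see messages that were sent before it joined.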

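    Next, set up the consumer config. Here the value deserializer is set to StringDeserializer. Note that this factory builds its own property map, so the "latest" set in the code below, not the "earliest" in application.properties, applies to listeners that use this factory.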

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.springframework.beans.factory.annotation.Value
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.context.annotation.Primary
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
    import org.springframework.kafka.core.ConsumerFactory
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory
    import org.springframework.kafka.core.KafkaTemplate
    import java.net.InetAddress
    import java.net.UnknownHostException
    import java.util.*
    
    
    @Configuration
    class KafkaConsumerConfig {
    
        @Value("\${spring.kafka.bootstrap-servers}")
        lateinit var hosts: String
    
        @Primary
        @Bean
        fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
            val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
            containerFactory.consumerFactory = consumerFactory()
    
            return containerFactory
        }
    
        private fun consumerFactory(): ConsumerFactory<in String, in String> {
            return DefaultKafkaConsumerFactory(consumerProperties())
        }
    
        private fun consumerProperties(): Map<String, Any> {
            val hostName: String = try {
                InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
            } catch (e: UnknownHostException) {
                UUID.randomUUID().toString()
            }
            return hashMapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
                ConsumerConfig.GROUP_ID_CONFIG to hostName,
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            )
        }
    }

     

     

     

    Consumer (Listener) example

    You can subscribe to a specific topic with @KafkaListener as follows.

    @Component
    class SampleTopicListener {
    
        val log = LoggerFactory.getLogger(SampleTopicListener::class.java)
    
        @KafkaListener(topics = ["foobar"])
        fun consume(@Payload data: String) {
            log.info("Message $data")
        }
    }

     

     

     

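    Running the app

    When you actually run the app, you can confirm that the topic is created as configured and that data is sent and received.

     

    - Check that the topic was created

    $ kafka-topics --list --bootstrap-server localhost:9092

    - Check that the consumer receives the data

    - Check that the log is printed by the app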

     

     

     

     

     

    Implementing message produce and consume using yml

    The simple message-sending feature above can also be configured through yml, without a separate config class, as follows.

    First, write application.yml.

    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
          group-id: foo
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          bootstrap-servers: localhost:9092
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer

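    Next, create SampleMessageProducer. It is registered as a bean so that the message is sent when the app starts.

    Here ListenableFuture is used so that the result of the send can be post-processed. In this example a log line is written both when the message is sent successfully and when it fails. (In Spring for Apache Kafka 3.x, send returns a CompletableFuture instead of a ListenableFuture.)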

    @Component
    class SampleMessageProducer() {
    
        val log = LoggerFactory.getLogger(SampleMessageProducer::class.java)
    
        @Bean
        fun sendMessage(template: KafkaTemplate<String, String>): String {
            val message: String = "testMessage"
            val future: ListenableFuture<SendResult<String, String>> = template.send("foo", message)
            future.addCallback(object : ListenableFutureCallback<SendResult<String, String>> {
    
                override fun onSuccess(result: SendResult<String, String>?) {
                    log.info("Sent message=[$message] with offset=[${result?.recordMetadata?.offset()}]")
                }
    
                override fun onFailure(ex: Throwable) {
                    log.error("Unable to send message=[$message] due to: ${ex.message}")
                }
            })
            return ""
        }
    }
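
The same success/failure post-processing can be written against a CompletableFuture, which is what newer Spring Kafka versions return from send. The sketch below uses only the JDK: `FakeSendResult` and `describeOutcome` are hypothetical stand-ins introduced for illustration, so it shows the callback pattern rather than the real SendResult type.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for a send result; only the offset is modeled.
data class FakeSendResult(val offset: Long)

/** Turns the outcome of an asynchronous send into a log line. */
fun describeOutcome(message: String, future: CompletableFuture<FakeSendResult>): String =
    future.handle { result, ex ->
        // handle() receives either a result or an exception, never both.
        if (ex == null) "Sent message=[$message] with offset=[${result.offset}]"
        else "Unable to send message=[$message] due to: ${ex.message}"
    }.join()
```

In a real producer you would log inside the callback (for example with whenComplete) instead of calling join(), which blocks the calling thread; join() is used here only so the sketch returns a value that is easy to check.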

    Next, write the consumer that consumes the message.

    @Component
    class SampleMessageConsumer {
    
        val log = LoggerFactory.getLogger(SampleMessageConsumer::class.java)
    
        @KafkaListener(topics = ["foo"], groupId = "foo")
        fun listenGroupFoobar(message: String) {
            log.info("Received Message in group foo:  $message")
        }
    }

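    When you run the app, you can confirm the following.

    First, you can see that the producer sent the message successfully.

    Checking the consumer side, you can see that the message arrives correctly.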

     

     

     

     

     

     

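    Consuming messages with filtering

    As things stand, both test and failMessage are consumed. This time, let's configure the listener so that it filters the messages it receives.

    Add a filter to the consumer config as follows. With RecordFilterStrategy, records that satisfy the condition (those for which the strategy returns true) are discarded before they reach the listener.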

    @Configuration
    class KafkaConsumerConfig {
    
        @Value("\${spring.kafka.bootstrap-servers}")
        lateinit var hosts: String
    
        // Listen to messages while excluding those with specific content
        @Bean
        fun filterKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
            val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
            containerFactory.consumerFactory = consumerFactory()
            containerFactory.setRecordFilterStrategy { it.value().contains("fail") }
            return containerFactory
        }
    
        private fun consumerFactory(): ConsumerFactory<in String, in String> {
            return DefaultKafkaConsumerFactory(consumerProperties())
        }
    
        private fun consumerProperties(): Map<String, Any> {
            val hostName: String = try {
                InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
            } catch (e: UnknownHostException) {
                UUID.randomUUID().toString()
            }
            return hashMapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
                ConsumerConfig.GROUP_ID_CONFIG to hostName,
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            )
        }
    }

    In the consumer, specify the container factory as follows so that the filter is applied.

        @KafkaListener(topics = ["foobar"], containerFactory = "filterKafkaListenerContainerFactory")
        fun consume(@Payload data: String) {
            log.info("Message $data")
        }

    You can see that failMessage is discarded and only test is printed.
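
The direction of the filter is easy to get backwards: returning true means "drop this record", not "keep it". A minimal sketch in plain Kotlin, where an ordinary predicate stands in for RecordFilterStrategy (the names `discardIfFail` and `deliverable` are hypothetical):

```kotlin
// Conceptual model only: a plain predicate stands in for Spring Kafka's RecordFilterStrategy.
// Returning true means "discard this record".
val discardIfFail: (String) -> Boolean = { value -> value.contains("fail") }

/** Returns only the values that would still reach the listener. */
fun deliverable(values: List<String>, discard: (String) -> Boolean): List<String> =
    values.filterNot(discard)
```

Calling `deliverable(listOf("test", "failMessage"), discardIfFail)` keeps only "test", matching the behavior observed above.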

     

     

     

     

    Consuming a message and sending a reply

    Now let's apply a replyTemplate so that the listener consumes a message and then sends a message to a topic.

    @Configuration
    class KafkaConsumerConfig {
    
        @Value("\${spring.kafka.bootstrap-servers}")
        lateinit var hosts: String
    
        // Listener container factory that sends a reply after consuming a message
        @Bean
        fun kafkaListenerContainerFactory(kafkaTemplate: KafkaTemplate<String, Any>): ConcurrentKafkaListenerContainerFactory<String, Any> {
            val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
            factory.consumerFactory = consumerFactory() as ConsumerFactory<in String, in Any>
            factory.setReplyTemplate(kafkaTemplate)
            return factory
        }
    
        private fun consumerFactory(): ConsumerFactory<in String, in String> {
            return DefaultKafkaConsumerFactory(consumerProperties())
        }
    
        private fun consumerProperties(): Map<String, Any> {
            val hostName: String = try {
                InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
            } catch (e: UnknownHostException) {
                UUID.randomUUID().toString()
            }
            return hashMapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
                ConsumerConfig.GROUP_ID_CONFIG to hostName,
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            )
        }
    }

     

    In the listener, use @SendTo("topic") as follows so that the return value is sent to a topic.

    @Component
    class SampleTopicListener {
    
        val log = LoggerFactory.getLogger(SampleTopicListener::class.java)
    
        // Receive a message and send a reply
        @KafkaListener(topics = ["foobar"])
        @SendTo("foobar")
        fun consume2(@Payload data: String): String {
            log.info("Message $data")
            return "Listener got this message ${data}"
        }
    }

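    Because the listener consumes from the topic and immediately sends a message back to the same topic, which it then consumes again, the cycle repeats and output like the following is printed.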
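
Why the output never stops can be seen with a tiny in-memory simulation. This is a sketch under stated assumptions, not Kafka itself: a queue per topic stands in for the broker, `simulateReplies` is a hypothetical helper, and `maxDeliveries` exists only to cut the loop off.

```kotlin
// Conceptual model only: an in-memory queue per topic stands in for Kafka.
fun simulateReplies(listenTopic: String, replyTopic: String, maxDeliveries: Int): Int {
    val topics = mutableMapOf<String, ArrayDeque<String>>()
    topics.getOrPut(listenTopic) { ArrayDeque() }.addLast("test")
    var deliveries = 0
    while (deliveries < maxDeliveries) {
        // Stop when nothing is left to consume on the listened topic.
        val message = topics[listenTopic]?.removeFirstOrNull() ?: break
        deliveries++
        // The listener's return value is published to the reply topic.
        topics.getOrPut(replyTopic) { ArrayDeque() }.addLast("Listener got this message $message")
    }
    return deliveries
}
```

With `simulateReplies("foobar", "foobar", 5)` the listener is invoked until the cap is hit, while replying to a different topic (the name "foobar-reply" is made up here) ends after a single delivery. Pointing @SendTo at a separate reply topic is one way to avoid the loop.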

     

     

     

     

    Implementing a producer and consumer with JSON serialization/deserialization using config classes

    This time, instead of the string type used above, let's exchange data as JSON.

    The User object used here looks like this.

    data class User(
        val name: String,
        val age: Int
    ) {
        constructor() : this("", 0)
    }

     

    First, configure the producer to use JsonSerializer as follows.

    package com.example.kafkatest
    
    import org.apache.kafka.clients.producer.ProducerConfig
    import org.apache.kafka.common.serialization.StringSerializer
    import org.springframework.beans.factory.annotation.Value
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.kafka.core.DefaultKafkaProducerFactory
    import org.springframework.kafka.core.KafkaTemplate
    import org.springframework.kafka.core.ProducerFactory
    import org.springframework.kafka.support.serializer.JsonSerializer
    
    
    @Configuration
    class JsonKafkaProducerConfig {
    
        @Value("\${spring.kafka.bootstrap-servers}")
        lateinit var hosts: String
    
        @Bean
        fun userProducerFactory(): ProducerFactory<String, User> {
            val configs: MutableMap<String, Any> = HashMap()
            configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = hosts
            configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
            return DefaultKafkaProducerFactory(configs)
        }
    
        @Bean
        fun kafkaTemplate(): KafkaTemplate<String, User> {
            return KafkaTemplate(userProducerFactory())
        }
    }

     

     

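    Next, configure the consumer to use JsonDeserializer. A "cannot trust class" error occurred here, so I set the option JsonDeserializer.TRUSTED_PACKAGES to "*". The wildcard trusts every package; outside of a local test it is safer to list only the package that contains the payload class (here com.example.kafkatest).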

    package com.example.kafkatest
    
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.springframework.beans.factory.annotation.Value
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
    import org.springframework.kafka.core.ConsumerFactory
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory
    import org.springframework.kafka.support.serializer.JsonDeserializer
    import java.net.InetAddress
    import java.net.UnknownHostException
    import java.util.*
    
    @Configuration
    class JsonKafkaConsumerConfig {
    
        @Value("\${spring.kafka.bootstrap-servers}")
        lateinit var hosts: String
    
        @Bean
        fun userKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, User> {
            val factory: ConcurrentKafkaListenerContainerFactory<String, User> =
                ConcurrentKafkaListenerContainerFactory<String, User>()
            factory.consumerFactory = userConsumerFactory()
            return factory
        }
    
    
        @Bean
        fun userConsumerFactory(): ConsumerFactory<String, User> {
            return DefaultKafkaConsumerFactory(userConsumerProperties())
        }
    
    
        private fun userConsumerProperties(): Map<String, Any> {
            val hostName: String = try {
                InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
            } catch (e: UnknownHostException) {
                UUID.randomUUID().toString()
            }
    
            return hashMapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
                ConsumerConfig.GROUP_ID_CONFIG to hostName,
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
                JsonDeserializer.TRUSTED_PACKAGES to "*"
            )
        }
    }
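
The idea behind the trusted-packages setting can be sketched as an allowlist check on the class name carried with the message. This is a simplified model, not the JsonDeserializer implementation: `isTrusted` is a hypothetical helper, and exact package matching is an assumption of the sketch.

```kotlin
// Conceptual model only: NOT the JsonDeserializer implementation.
fun isTrusted(className: String, trustedPackages: Set<String>): Boolean {
    if ("*" in trustedPackages) return true // wildcard: trust every package
    // Everything before the last dot is treated as the package name.
    val packageName = className.substringBeforeLast('.', missingDelimiterValue = "")
    return packageName in trustedPackages
}
```

Under this model `com.example.kafkatest.User` is accepted when the set contains "*" or "com.example.kafkatest" and rejected otherwise, which mirrors why the error disappeared once the option was set.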

     

    Now use a sample producer to create a message.

    package com.example.kafkatest
    
    import org.springframework.beans.factory.annotation.Autowired
    import org.springframework.boot.ApplicationRunner
    import org.springframework.context.annotation.Bean
    import org.springframework.kafka.core.KafkaTemplate
    import org.springframework.stereotype.Component
    
    
    @Component
    class JsonSampleProducer(
        @Autowired kafkaTemplate: KafkaTemplate<String, User>
    ) {
    
        @Bean
        fun sendUserMessage(template: KafkaTemplate<String, User>): ApplicationRunner {
            val user = User("testUser", 30)
            return ApplicationRunner { template.send("foobar", user) }
        }
    }

     

    Then write a sample consumer that reads the produced message.

    package com.example.kafkatest
    
    import org.slf4j.LoggerFactory
    import org.springframework.kafka.annotation.KafkaListener
    import org.springframework.messaging.handler.annotation.Payload
    import org.springframework.stereotype.Component
    
    @Component
    class JsonSampleConsumer {
    
        val log = LoggerFactory.getLogger(JsonSampleConsumer::class.java)
    
        @KafkaListener(topics = ["foobar"], containerFactory = "userKafkaListenerContainerFactory")
        fun consume(user: User) {
            log.info("Message ${user.name} + ${user.age}")
        }
    }

     

    You can see that the data is consumed as a JSON-mapped object, as shown below.

     

     

     

     

     

    Implementing a producer and consumer with JSON serialization/deserialization using yml

    The configuration above can be expressed more simply through yml.

    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
          group-id: foo
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
          properties:
            spring:
              json:
                trusted:
                  packages: "*"
    
        producer:
          bootstrap-servers: localhost:9092
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

     

    Write a producer that uses the configuration above.

    package com.example.kafkatest.ymltest
    
    import com.example.kafkatest.User
    import org.slf4j.LoggerFactory
    import org.springframework.context.annotation.Bean
    import org.springframework.kafka.core.KafkaTemplate
    import org.springframework.kafka.support.SendResult
    import org.springframework.lang.Nullable
    import org.springframework.stereotype.Component
    import org.springframework.util.concurrent.ListenableFuture
    import org.springframework.util.concurrent.ListenableFutureCallback
    
    @Component
    class SampleJsonProducer {
    
        val log = LoggerFactory.getLogger(SampleJsonProducer::class.java)
    
    
        @Bean
        fun sendMessage(template: KafkaTemplate<String, User>): String {
            val user = User("testUser", 30)
            val future: ListenableFuture<SendResult<String, User>> = template.send("foo", user)
            future.addCallback(object : ListenableFutureCallback<SendResult<String, User>> {
    
                override fun onSuccess(result: SendResult<String, User>?) {
                    log.info("Sent message=[$user] with offset=[${result?.recordMetadata?.offset()}]")
                }
    
                override fun onFailure(ex: Throwable) {
                    log.error("Unable to send message=[$user] due to: ${ex.message}")
                }
            })
            return ""
        }
    }

     

     

    Write the consumer that reads it.

    package com.example.kafkatest.ymltest
    
    import com.example.kafkatest.User
    import org.slf4j.LoggerFactory
    import org.springframework.kafka.annotation.KafkaListener
    import org.springframework.stereotype.Component
    
    @Component
    class SampleJsonConsumer {
    
        val log = LoggerFactory.getLogger(SampleJsonConsumer::class.java)
    
        @KafkaListener(topics = ["foo"], groupId = "foo")
        fun listenGroupFoobar(user: User) {
            log.info("Received Message in group foo:  $user")
        }
    }

     

    Let's check that the app runs.

     

     

     

    Configuring through the application.yml file

    The settings that were configured by hand above can also be set up simply as follows.

    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
          group-id: foobar
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          bootstrap-servers: localhost:9092
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer

    The options are described in detail below.

    Reference post

    https://blog.voidmainvoid.net/169

     


    # APACHE KAFKA (KafkaProperties)
    spring.kafka.admin.client-id= # ID to pass to the server when making requests. Used for server-side logging.
    spring.kafka.admin.fail-fast=false # Whether to fail fast if the broker is not available on startup.
    spring.kafka.admin.properties.*= # Additional admin-specific properties used to configure the client.
    spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file.
    spring.kafka.admin.ssl.key-store-location= # Location of the key store file.
    spring.kafka.admin.ssl.key-store-password= # Store password for the key store file.
    spring.kafka.admin.ssl.key-store-type= # Type of the key store.
    spring.kafka.admin.ssl.protocol= # SSL protocol to use.
    spring.kafka.admin.ssl.trust-store-location= # Location of the trust store file.
    spring.kafka.admin.ssl.trust-store-password= # Store password for the trust store file.
    spring.kafka.admin.ssl.trust-store-type= # Type of the trust store.
    spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden.
    spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging.
    spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true.
    spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
    spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers.
    spring.kafka.consumer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
    spring.kafka.consumer.enable-auto-commit= # Whether the consumer's offset is periodically committed in the background.
    spring.kafka.consumer.fetch-max-wait= # Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch-min-size".
    spring.kafka.consumer.fetch-min-size= # Minimum amount of data the server should return for a fetch request.
    spring.kafka.consumer.group-id= # Unique string that identifies the consumer group to which this consumer belongs.
    spring.kafka.consumer.heartbeat-interval= # Expected time between heartbeats to the consumer coordinator.
    spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
    spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
    spring.kafka.consumer.properties.*= # Additional consumer-specific properties used to configure the client.
    spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
    spring.kafka.consumer.ssl.key-store-location= # Location of the key store file.
    spring.kafka.consumer.ssl.key-store-password= # Store password for the key store file.
    spring.kafka.consumer.ssl.key-store-type= # Type of the key store.
    spring.kafka.consumer.ssl.protocol= # SSL protocol to use.
    spring.kafka.consumer.ssl.trust-store-location= # Location of the trust store file.
    spring.kafka.consumer.ssl.trust-store-password= # Store password for the trust store file.
    spring.kafka.consumer.ssl.trust-store-type= # Type of the trust store.
    spring.kafka.consumer.value-deserializer= # Deserializer class for values.
    spring.kafka.jaas.control-flag=required # Control flag for login configuration.
    spring.kafka.jaas.enabled=false # Whether to enable JAAS configuration.
    spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module.
    spring.kafka.jaas.options= # Additional JAAS options.
    spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
    spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation.
    spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
    spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property.
    spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
    spring.kafka.listener.idle-event-interval= # Time between publishing idle consumer events (no data received).
    spring.kafka.listener.log-container-config= # Whether to log the container configuration during initialization (INFO level).
    spring.kafka.listener.monitor-interval= # Time between checks for non-responsive consumers. If a duration suffix is not specified, seconds will be used.
    spring.kafka.listener.no-poll-threshold= # Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive.
    spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
    spring.kafka.listener.type=single # Listener type.
    spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
    spring.kafka.producer.batch-size= # Default batch size.
    spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for producers.
    spring.kafka.producer.buffer-memory= # Total memory size the producer can use to buffer records waiting to be sent to the server.
    spring.kafka.producer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
    spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
    spring.kafka.producer.key-serializer= # Serializer class for keys.
    spring.kafka.producer.properties.*= # Additional producer-specific properties used to configure the client.
    spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
    spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
    spring.kafka.producer.ssl.key-store-location= # Location of the key store file.
    spring.kafka.producer.ssl.key-store-password= # Store password for the key store file.
    spring.kafka.producer.ssl.key-store-type= # Type of the key store.
    spring.kafka.producer.ssl.protocol= # SSL protocol to use.
    spring.kafka.producer.ssl.trust-store-location= # Location of the trust store file.
    spring.kafka.producer.ssl.trust-store-password= # Store password for the trust store file.
    spring.kafka.producer.ssl.trust-store-type= # Type of the trust store.
    spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer.
    spring.kafka.producer.value-serializer= # Serializer class for values.
    spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
    spring.kafka.ssl.key-password= # Password of the private key in the key store file.
    spring.kafka.ssl.key-store-location= # Location of the key store file.
    spring.kafka.ssl.key-store-password= # Store password for the key store file.
    spring.kafka.ssl.key-store-type= # Type of the key store.
    spring.kafka.ssl.protocol= # SSL protocol to use.
    spring.kafka.ssl.trust-store-location= # Location of the trust store file.
    spring.kafka.ssl.trust-store-password= # Store password for the trust store file.
    spring.kafka.ssl.trust-store-type= # Type of the trust store.
    spring.kafka.streams.application-id= # Kafka streams application.id property; default spring.application.name.
    spring.kafka.streams.auto-startup=true # Whether or not to auto-start the streams factory bean.
    spring.kafka.streams.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for streams.
    spring.kafka.streams.cache-max-size-buffering= # Maximum memory size to be used for buffering across all threads.
    spring.kafka.streams.client-id= # ID to pass to the server when making requests. Used for server-side logging.
    spring.kafka.streams.properties.*= # Additional Kafka properties used to configure the streams.
    spring.kafka.streams.replication-factor= # The replication factor for change log topics and repartition topics created by the stream processing application.
    spring.kafka.streams.ssl.key-password= # Password of the private key in the key store file.
    spring.kafka.streams.ssl.key-store-location= # Location of the key store file.
    spring.kafka.streams.ssl.key-store-password= # Store password for the key store file.
    spring.kafka.streams.ssl.key-store-type= # Type of the key store.
    spring.kafka.streams.ssl.protocol= # SSL protocol to use.
    spring.kafka.streams.ssl.trust-store-location= # Location of the trust store file.
    spring.kafka.streams.ssl.trust-store-password= # Store password for the trust store file.
    spring.kafka.streams.ssl.trust-store-type= # Type of the trust store.
    spring.kafka.streams.state-dir= # Directory location for the state store.
    spring.kafka.template.default-topic= # Default topic to which messages are sent.

     

     

     

    Done ✨

     

     

     

     

     

    Spring Kafka + Reactor Kafka

    If you also want to produce and consume reactively, you can use Reactor Kafka together with Spring Kafka and configure it as follows.

    https://github.com/Kevded/example-reactive-spring-kafka-consumer-and-producer

     


     

    1) Add the dependencies

        // add the Kafka dependencies
        implementation("org.springframework.kafka:spring-kafka")
        implementation("io.projectreactor.kafka:reactor-kafka")

     

     

     

     

    2) Write application.yml

    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
          group-id: foo
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
          properties:
            spring:
              json:
                trusted:
                  packages: "*"
    
        producer:
          bootstrap-servers: localhost:9092
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

     

     

     

    3) Write the ProducerConfig

    package com.example.kafkatest.reactive
    
    import com.example.kafkatest.User
    import org.springframework.boot.autoconfigure.kafka.KafkaProperties
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
    import reactor.kafka.sender.SenderOptions
    
    
    @Configuration
    class ReactiveKafkaProducerConfig {
    
        @Bean
        fun reactiveKafkaProducerTemplate(properties: KafkaProperties): ReactiveKafkaProducerTemplate<String, User> {
            val props = properties.buildProducerProperties()
            return ReactiveKafkaProducerTemplate(SenderOptions.create<String, User>(props))
        }
    
    }
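    The template above takes every setting from spring.kafka.producer.*. If you would rather override something in code, a minimal sketch could look like the following. The function name tunedProducerTemplate and the values "all" and 256 are my own assumptions for illustration, not something from the referenced project. It is deliberately a plain function rather than a @Bean so it does not clash with the bean defined above.

```kotlin
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.apache.kafka.clients.producer.ProducerConfig
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import reactor.kafka.sender.SenderOptions

// Hypothetical helper (not part of the original project).
fun tunedProducerTemplate(properties: KafkaProperties): ReactiveKafkaProducerTemplate<String, User> {
    val senderOptions = SenderOptions.create<String, User>(properties.buildProducerProperties())
        // Assumed value: wait for all in-sync replicas before a send counts as complete.
        .producerProperty(ProducerConfig.ACKS_CONFIG, "all")
        // Assumed value: upper bound on records sent but not yet acknowledged.
        .maxInFlight(256)
    return ReactiveKafkaProducerTemplate(senderOptions)
}
```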

     

     

     

     

     

    4) Write the ConsumerConfig

    package com.example.kafkatest.reactive
    
    import com.example.kafkatest.User
    import org.springframework.boot.autoconfigure.kafka.KafkaProperties
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
    import reactor.kafka.receiver.ReceiverOptions
    
    
    @Configuration
    class ReactiveKafkaConsumerConfig {
    
        @Bean
        fun kafkaReceiverOptions(kafkaProperties: KafkaProperties): ReceiverOptions<String, User> {
            // Build the receiver options from spring.kafka.consumer.* and subscribe to the "foo" topic.
            return ReceiverOptions.create<String, User>(kafkaProperties.buildConsumerProperties())
                .subscription(listOf("foo"))
        }
    
        @Bean
        fun reactiveKafkaConsumerTemplate(kafkaReceiverOptions: ReceiverOptions<String, User>): ReactiveKafkaConsumerTemplate<String, User> {
            return ReactiveKafkaConsumerTemplate<String, User>(kafkaReceiverOptions)
        }
    
    }
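    ReceiverOptions can carry more than the subscription. The sketch below adds commit tuning and a partition-assignment log. The function name tunedReceiverOptions, the logger name, and the values 2 seconds and 100 are assumptions for illustration only. It is a plain function, so it can be swapped into the @Bean above without creating a second bean.

```kotlin
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import reactor.kafka.receiver.ReceiverOptions
import java.time.Duration

private val log = LoggerFactory.getLogger("ReceiverOptionsSketch")

// Hypothetical helper (not part of the original project).
fun tunedReceiverOptions(kafkaProperties: KafkaProperties): ReceiverOptions<String, User> =
    ReceiverOptions.create<String, User>(kafkaProperties.buildConsumerProperties())
        // Assumed value: commit acknowledged offsets every 2 seconds.
        .commitInterval(Duration.ofSeconds(2))
        // Assumed value: also commit once 100 offsets have been acknowledged.
        .commitBatchSize(100)
        // Log which partitions this consumer was assigned.
        .addAssignListener { partitions -> log.info("assigned partitions: {}", partitions) }
        .subscription(listOf("foo"))
```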

     

     

     

     

     

    5) Write the Producer

    package com.example.kafkatest.reactive
    
    import com.example.kafkatest.User
    import org.slf4j.LoggerFactory
    import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
    import org.springframework.stereotype.Service
    
    @Service
    class ReactiveKafkaProducer(
        val reactiveKafkaProducerTemplate: ReactiveKafkaProducerTemplate<String, User>
    ){
        private val log = LoggerFactory.getLogger(ReactiveKafkaProducer::class.java)
    
    
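        fun send(user: User) {
            log.info("send to topic={}, {}={}", "foo", User::class.java.simpleName, user)
            reactiveKafkaProducerTemplate.send("foo", user)
                .doOnSuccess { senderResult ->
                    log.info(
                        "sent {} offset : {}",
                        user,
                        senderResult.recordMetadata().offset()
                    )
                }
                // Log send failures together with the record that failed.
                .doOnError { e -> log.error("failed to send {}", user, e) }
                // Fire-and-forget: nothing is sent until the Mono is subscribed.
                .subscribe()
        }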
    }
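    The producer above subscribes internally, so the caller cannot tell when (or whether) the send finished. If the caller needs that, one option is to return the Mono instead of subscribing. This is only a sketch: the class name ComposableKafkaProducer is hypothetical and not from the referenced project.

```kotlin
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono
import reactor.kafka.sender.SenderResult

// Hypothetical variant of ReactiveKafkaProducer that leaves subscription to the caller.
@Service
class ComposableKafkaProducer(
    private val template: ReactiveKafkaProducerTemplate<String, User>
) {
    private val log = LoggerFactory.getLogger(ComposableKafkaProducer::class.java)

    // Nothing is sent until the returned Mono is subscribed.
    fun send(user: User): Mono<SenderResult<Void>> =
        template.send("foo", user)
            .doOnSuccess { result ->
                log.info("sent {} offset={}", user, result.recordMetadata().offset())
            }
            .doOnError { e -> log.error("failed to send {}", user, e) }
}
```

    The caller can then chain on the result, for example to build a response only after the broker has acknowledged the record.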

     

     

     

     

    6) Write the Consumer

    package com.example.kafkatest.reactive
    
    import com.example.kafkatest.User
    import org.slf4j.LoggerFactory
    import org.springframework.boot.CommandLineRunner
    import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
    import org.springframework.stereotype.Service
    import reactor.core.publisher.Flux
    
    @Service
    class ReactiveKafkaConsumer(
        val reactiveKafkaConsumerTemplate: ReactiveKafkaConsumerTemplate<String, User>
    ) : CommandLineRunner {
    
        private val log = LoggerFactory.getLogger(ReactiveKafkaConsumer::class.java)
    
        private fun consumeUser(): Flux<User> {
            return reactiveKafkaConsumerTemplate
                .receiveAutoAck()
                .doOnNext { consumerRecord ->
                    log.info(
                        "received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset()
                    )
                }
                .map { it.value() }
                .doOnNext { user ->
                    log.info(
                        "successfully consumed {}={}",
                        User::class.java.simpleName, user
                    )
                }
                .doOnError { throwable ->
                    log.error(
                        "something bad happened while consuming : {}",
                        throwable.message
                    )
                }
        }
    
        override fun run(vararg args: String) {
            // we have to trigger consumption
            consumeUser().subscribe()
        }
    }
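    receiveAutoAck() acknowledges records for you. If you want to acknowledge a record only after your own processing has succeeded, receive() hands back records with an offset you acknowledge yourself. The sketch below assumes it replaces consumeUser() rather than running next to it, because as far as I know a single receiver does not support several subscribers at the same time. The function name consumeWithManualAck and the logger name are hypothetical.

```kotlin
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
import reactor.core.publisher.Flux

private val log = LoggerFactory.getLogger("ManualAckSketch")

// Hypothetical alternative to consumeUser(); subscribe to the returned Flux to start consuming.
fun consumeWithManualAck(template: ReactiveKafkaConsumerTemplate<String, User>): Flux<User> =
    template.receive()
        .doOnNext { record ->
            log.info("processing key={}, offset={}", record.key(), record.offset())
            // Mark the offset as processed; the actual commit follows the
            // commit interval / batch size configured on ReceiverOptions.
            record.receiverOffset().acknowledge()
        }
        .map { it.value() }
```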

     

     

     

     

    7) Write a test Controller

    package com.example.kafkatest.reactive
    
    import com.example.kafkatest.User
    import org.springframework.web.bind.annotation.GetMapping
    import org.springframework.web.bind.annotation.RestController
    
    @RestController
    class ReactiveKafkaController(
        val producer: ReactiveKafkaProducer
    ) {
    
        @GetMapping
        fun produce(): String {
            producer.send(User("test", 1234))
            return "hello world"
        }
    }
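    The controller above returns "hello world" right away, before the send has completed. If the response should reflect the send result, the handler can return a Mono built from the template's result. This is a sketch under assumptions: the class name ReactiveKafkaMonoController and the "/mono" path are hypothetical.

```kotlin
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Mono

// Hypothetical controller that responds only after the record has been acknowledged.
@RestController
class ReactiveKafkaMonoController(
    private val template: ReactiveKafkaProducerTemplate<String, User>
) {

    // "/mono" is an assumed path chosen to avoid clashing with the root mapping above.
    @GetMapping("/mono")
    fun produce(): Mono<String> =
        template.send("foo", User("test", 1234))
            .map { result -> "sent at offset ${result.recordMetadata().offset()}" }
}
```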

     

     

     

     

    Running the project confirms that everything works, as shown below.

    Calling the test API
    producer logging
    consumer logging
    Confirming that the data arrives correctly in the Kafka consumer

     

     

    Done ✨

     

     

     

     

     

     

     

    (Blogs referenced)

    https://medium.com/@taapasagrawal/installing-and-running-apache-kafka-on-macos-with-m1-processor-5238dda81d51
    Installing and running Apache Kafka on MacOS with M1 Processor

    https://reflectoring.io/spring-boot-kafka/  👍
    Using Kafka with Spring Boot

    https://www.baeldung.com/spring-kafka  👍

    https://docs.spring.io/spring-kafka/reference/html/#configuring-topics  👍
    Spring for Apache Kafka

    https://pinggoopark.tistory.com/6
    [kafka] Kafka Command Collection

    https://www.skyer9.pe.kr/wordpress/?p=1550
    Sending JSON data with Spring Boot Kafka – 상구리의 기술 블로그

    https://blog.ippon.tech/kafka-tutorial-3-json-serdes/
    Kafka tutorial #3 - JSON SerDes

    https://azurealstn.tistory.com/74
    "cannot deserialize from object value" error!

    https://asuraiv.tistory.com/11
    Kotlin + SpringBoot + Kafka integration

     
