Integrating Kotlin + Spring Boot + Kafka (plus spring-kafka and reactor-kafka)
Network & Infrastructure | 2022. 11. 25. 01:40
Last time I only summarized the basic concepts of Apache Kafka, but now I actually need it, so I studied how to use Kafka with Spring Boot in Kotlin.
(my earlier post summarizing what I learned about Kafka)
↓
https://dodop-blog.tistory.com/409
Kafka, the thing everyone else already knows... once again I'm the only one who doesn't, and the only one this invested...

Installing and running Kafka
First, install Kafka. These steps were done on an M1-processor Mac.
# Installing Kafka via brew also pulls in ZooKeeper, so no separate install was needed.
$ brew install kafka

# Start ZooKeeper
$ zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties

# Start Kafka
$ kafka-server-start /opt/homebrew/etc/kafka/server.properties
I ran into problems here because I kept switching back and forth between installing Kafka via Homebrew and via the tgz archive.
The first error said another Kafka instance was running, so the lock on the log directory could not be acquired:
ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
org.apache.kafka.common.KafkaException: Failed to acquire lock on file .lock in /opt/homebrew/var/lib/kafka-logs. A Kafka instance in another process or thread is using this directory.
So I ran the following command to stop any Kafka instance that might still be running, and tried again.
↓
https://stackoverflow.com/questions/53338221/kafka-server-does-not-start
$ brew services stop kafka
Then the next error appeared:
ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
I went to the Homebrew log directory and deleted meta.properties, deleted the log files, uninstalled with brew and wiped everything, and finally solved it by running the commands below!
The root cause: I had previously killed the Kafka server with kill -9 instead of shutting it down cleanly, and the leftover state (recorded in kafkaServer.log) broke the next startup. 🤦🏻♀️
↓
https://gray-hat.tistory.com/94
$ cd /tmp/kafka-logs
$ rm -rf *
zsh: sure you want to delete all 5 files in /tmp/kafka-logs [yn]? y
Once Kafka is running properly, you can create a topic and verify the messages from the producer and consumer consoles as follows.
# Create a topic named 'foobar'
$ kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic foobar
Created topic foobar.

# If the command above fails with "Broker may not be available", change the Kafka config to listen locally
$ vi /opt/homebrew/etc/kafka/server.properties
listeners=PLAINTEXT://localhost:9092

# Use the producer console (type foo and bar)
$ kafka-console-producer --broker-list localhost:9092 --topic foobar
> foo
> bar

# Use the consumer console (the foo and bar entered in the producer show up)
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic foobar --from-beginning
foo
bar
Kafka commands
# Create a topic
$ kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

# List topics
$ kafka-topics --list --bootstrap-server localhost:9092

# Describe a topic
$ kafka-topics --describe --bootstrap-server localhost:9092 --topic test

# Producer -> write data to a topic
$ kafka-console-producer --broker-list localhost:9092 --topic test

# Consumer -> read data from a topic
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

# Delete a topic (this didn't work for me at first 😭; note that delete.topic.enable=true is a server.properties setting,
# not a shell command, and --bootstrap-server must point at the broker on 9092, not ZooKeeper on 2181)
$ vi /opt/homebrew/etc/kafka/server.properties
delete.topic.enable=true
$ kafka-topics --bootstrap-server localhost:9092 --delete --topic test

# List consumer groups
$ kafka-consumer-groups --bootstrap-server localhost:9092 --list

# Describe a consumer group
$ kafka-consumer-groups --bootstrap-server localhost:9092 --group test-consumer --describe
Configuring build.gradle.kts
Now add the Kafka setup to the project.
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter")
    implementation("org.springframework.boot:spring-boot-starter-web")
    // Kafka dependency
    implementation("org.springframework.kafka:spring-kafka")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "17"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}
Creating a topic from Spring Boot
In Spring Boot you can create topics as shown below. The KafkaAdmin bean is responsible for creating new topics on the broker, and Spring Boot registers a KafkaAdmin bean automatically, so any NewTopic beans you declare are created at startup.
package com.example.kafkatest

import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder

@Configuration
class KafkaTopicConfig {

    @Bean
    fun topic1(): NewTopic {
        return TopicBuilder.name("topic1").build()
    }
}
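TopicBuilder can also set the partition count, replication factor, and topic-level configs. A small sketch (the topic name and numbers here are made up for illustration):

@Bean
fun topic2(): NewTopic {
    // hypothetical topic: 3 partitions, replication factor 1, log compaction enabled
    return TopicBuilder.name("topic2")
        .partitions(3)
        .replicas(1)
        .compact()
        .build()
}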
In a non-Boot application you can register the KafkaAdmin bean yourself:
@Bean
open fun admin(): KafkaAdmin {
    val configs: MutableMap<String, Any> = HashMap()
    configs[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
    return KafkaAdmin(configs)
}
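If several topics are needed, spring-kafka (2.7 and later) also lets you declare them in a single bean via KafkaAdmin.NewTopics; a sketch with made-up topic names:

@Bean
fun topics(): KafkaAdmin.NewTopics {
    return KafkaAdmin.NewTopics(
        TopicBuilder.name("topicA").build(),
        TopicBuilder.name("topicB").partitions(2).build()
    )
}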
Implementing message produce and consume
- Producer example: sending data
Send messages to a specific topic ("foobar") using KafkaTemplate.
No extra producer config is needed here because Spring Boot's auto-configured KafkaTemplate defaults both the key and value serializers to StringSerializer, which is why publishing plain strings just works.
@Component
class SampleTopicProducer {

    @Bean
    fun runner1(template: KafkaTemplate<String, String>): ApplicationRunner {
        return ApplicationRunner { template.send("foobar", "test") }
    }

    @Bean
    fun runner2(template: KafkaTemplate<String, String>): ApplicationRunner {
        return ApplicationRunner { template.send("foobar", "failMessage") }
    }
}
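For reference, a producer factory that spells out what Boot wires up implicitly might look like the sketch below. It is not required for this example; it is shown only to make the defaults visible:

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory

@Configuration
class KafkaProducerConfig {

    @Bean
    fun producerFactory(): ProducerFactory<String, String> {
        val configs: MutableMap<String, Any> = HashMap()
        configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        // These serializers match Spring Boot's defaults for KafkaTemplate<String, String>
        configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory(configs)
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }
}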
- Configuring the consumer
Next, set up the Kafka consumer configuration as follows.
First, set the Kafka server host and the consumer offset behavior in application.properties. auto-offset-reset has three options: latest (the default) starts from the most recent offset, earliest starts from the very first offset, and none throws an exception if the consumer group has no committed offset for the topic it wants to consume.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
Next, the consumer config. The value deserializer here is set to StringDeserializer. Note that the group id is the hostname plus a random UUID, so each app instance ends up in its own consumer group and therefore receives every message.
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import java.net.InetAddress
import java.net.UnknownHostException
import java.util.*

@Configuration
class KafkaConsumerConfig {

    @Value("\${spring.kafka.bootstrap-servers}")
    lateinit var hosts: String

    @Primary
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
        containerFactory.consumerFactory = consumerFactory()
        return containerFactory
    }

    private fun consumerFactory(): ConsumerFactory<in String, in String> {
        return DefaultKafkaConsumerFactory(consumerProperties())
    }

    private fun consumerProperties(): Map<String, Any> {
        val hostName: String = try {
            InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
        } catch (e: UnknownHostException) {
            UUID.randomUUID().toString()
        }
        return hashMapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
            ConsumerConfig.GROUP_ID_CONFIG to hostName,
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )
    }
}
- Consumer (listener) example
You can subscribe to a specific topic with @KafkaListener, as follows.
@Component
class SampleTopicListener {

    val log = LoggerFactory.getLogger(SampleTopicListener::class.java)

    @KafkaListener(topics = ["foobar"])
    fun consume(@Payload data: String) {
        log.info("Message $data")
    }
}
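If the listener also needs record metadata, @KafkaListener methods can take the full ConsumerRecord instead of just the payload. A minimal sketch (requires importing org.apache.kafka.clients.consumer.ConsumerRecord):

@KafkaListener(topics = ["foobar"])
fun consumeRecord(record: ConsumerRecord<String, String>) {
    // the record carries partition/offset/key alongside the value
    log.info("partition=${record.partition()} offset=${record.offset()} value=${record.value()}")
}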
- Running the app
When you actually run the app, the topic is created as configured and the messages flow back and forth.
- Check that the topic was created
$ kafka-topics --list --bootstrap-server localhost:9092
- Check that the consumer received the data
- Check the log output in the app
Implementing message produce and consume via yml
The simple message sending above can be configured through yml alone, with no separate config class.
First, write application.yml.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Next, create SampleMessageProducer; it is registered as a bean so that a message is sent when the app starts.
The ListenableFuture returned by send() lets you post-process the result of the send. Here we log when the message is sent successfully and when it fails.
@Component
class SampleMessageProducer {

    val log = LoggerFactory.getLogger(SampleMessageProducer::class.java)

    @Bean
    fun sendMessage(template: KafkaTemplate<String, String>): String {
        val message = "testMessage"
        val future: ListenableFuture<SendResult<String, String>> = template.send("foo", message)
        future.addCallback(object : ListenableFutureCallback<SendResult<String, String>> {
            override fun onSuccess(result: SendResult<String, String>?) {
                log.info("Sent message=[$message] with offset=[${result!!.recordMetadata.offset()}]")
            }

            override fun onFailure(ex: Throwable) {
                log.info("Unable to send message=[$message] due to : ${ex.message}")
            }
        })
        return ""
    }
}
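Note that ListenableFuture was deprecated later on: from Spring Kafka 3.0 (Spring Boot 3.x), KafkaTemplate.send() returns a CompletableFuture instead, so on those versions the same callbacks would look roughly like this sketch:

import java.util.concurrent.CompletableFuture

val future: CompletableFuture<SendResult<String, String>> = template.send("foo", message)
future.whenComplete { result, ex ->
    if (ex == null) {
        log.info("Sent message=[$message] with offset=[${result.recordMetadata.offset()}]")
    } else {
        log.info("Unable to send message=[$message] due to : ${ex.message}")
    }
}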
Now write the consumer that reads the message.
@Component
class SampleMessageConsumer {

    val log = LoggerFactory.getLogger(SampleMessageConsumer::class.java)

    @KafkaListener(topics = ["foo"], groupId = "foo")
    fun listenGroupFoobar(message: String) {
        log.info("Received Message in group foo: $message")
    }
}
Running the app confirms both sides: the producer logs that the message was sent, and the consumed message arrives as expected.
Filtering messages before consuming
As things stand, both test and failMessage get consumed. Let's now filter messages in the subscription instead.
Add a filter to the consumer config as follows. With a RecordFilterStrategy, any record matching the predicate is discarded before it ever reaches the listener.
@Configuration
class KafkaConsumerConfig {

    @Value("\${spring.kafka.bootstrap-servers}")
    lateinit var hosts: String

    // Listen to messages, excluding those with specific content
    @Bean
    fun filterKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
        containerFactory.consumerFactory = consumerFactory()
        // Records whose value contains "fail" are dropped before reaching the listener
        containerFactory.setRecordFilterStrategy { it.value().contains("fail") }
        return containerFactory
    }

    private fun consumerFactory(): ConsumerFactory<in String, in String> {
        return DefaultKafkaConsumerFactory(consumerProperties())
    }

    private fun consumerProperties(): Map<String, Any> {
        val hostName: String = try {
            InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
        } catch (e: UnknownHostException) {
            UUID.randomUUID().toString()
        }
        return hashMapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
            ConsumerConfig.GROUP_ID_CONFIG to hostName,
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )
    }
}
Point the consumer at this container factory to apply the filter. Since the factory consumes plain strings, the listener takes the String payload:
@KafkaListener(topics = ["foobar"], containerFactory = "filterKafkaListenerContainerFactory")
fun consume(@Payload data: String) {
    log.info("Message $data")
}
You can see that failMessage is dropped and only test is printed.
Consuming a message and sending a reply
Now let's apply a reply template so that consuming a message sends a message back to a topic.
@Configuration
class KafkaConsumerConfig {

    @Value("\${spring.kafka.bootstrap-servers}")
    lateinit var hosts: String

    // Listener container that replies with a new message
    @Bean
    fun kafkaListenerContainerFactory(kafkaTemplate: KafkaTemplate<String, Any>): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory() as ConsumerFactory<in String, in Any>
        factory.setReplyTemplate(kafkaTemplate)
        return factory
    }

    private fun consumerFactory(): ConsumerFactory<in String, in String> {
        return DefaultKafkaConsumerFactory(consumerProperties())
    }

    private fun consumerProperties(): Map<String, Any> {
        val hostName: String = try {
            InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
        } catch (e: UnknownHostException) {
            UUID.randomUUID().toString()
        }
        return hashMapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
            ConsumerConfig.GROUP_ID_CONFIG to hostName,
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )
    }
}
Implement the listener with @SendTo("topic") to send the reply to a topic:
@Component
class SampleTopicListener {

    val log = LoggerFactory.getLogger(SampleTopicListener::class.java)

    // Receive a message and send a reply
    @KafkaListener(topics = ["foobar"])
    @SendTo("foobar")
    fun consume2(@Payload data: String): String {
        log.info("Message $data")
        return "Listener got this message $data"
    }
}
Because the listener consumes a topic and immediately replies to that same topic, the reply is consumed again and replied to again, so the output below repeats endlessly.
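If the echo loop is unwanted, reply to a different topic than the one being consumed. A sketch, using a hypothetical foobar.replies topic:

@KafkaListener(topics = ["foobar"])
@SendTo("foobar.replies") // hypothetical reply topic, distinct from the consumed one
fun consumeAndReply(@Payload data: String): String {
    log.info("Message $data")
    return "Listener got this message $data"
}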
Implementing a producer and consumer with JSON serialization/deserialization via config classes
This time, instead of the plain strings above, let's exchange data as JSON.
The User class used here is shown below; the extra no-argument constructor is there so Jackson has a way to instantiate the class during deserialization.
data class User(
    val name: String,
    val age: Int
) {
    constructor() : this("", 0)
}
First, configure the producer to use JsonSerializer:
package com.example.kafkatest

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.support.serializer.JsonSerializer

@Configuration
class JsonKafkaProducerConfig {

    @Value("\${spring.kafka.bootstrap-servers}")
    lateinit var hosts: String

    @Bean
    fun userProducerFactory(): ProducerFactory<String, User> {
        val configs: MutableMap<String, Any> = HashMap()
        configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = hosts
        configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
        return DefaultKafkaProducerFactory(configs)
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, User> {
        return KafkaTemplate(userProducerFactory())
    }
}
Next, configure the consumer to use JsonDeserializer. A "cannot trust class" error occurred here, so the JsonDeserializer.TRUSTED_PACKAGES to "*" option was added.
package com.example.kafkatest

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer
import java.net.InetAddress
import java.net.UnknownHostException
import java.util.*

@Configuration
class JsonKafkaConsumerConfig {

    @Value("\${spring.kafka.bootstrap-servers}")
    lateinit var hosts: String

    @Bean
    fun userKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, User> {
        val factory: ConcurrentKafkaListenerContainerFactory<String, User> =
            ConcurrentKafkaListenerContainerFactory<String, User>()
        factory.consumerFactory = userConsumerFactory()
        return factory
    }

    @Bean
    fun userConsumerFactory(): ConsumerFactory<String, User> {
        return DefaultKafkaConsumerFactory(userConsumerProperties())
    }

    private fun userConsumerProperties(): Map<String, Any> {
        val hostName: String = try {
            InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
        } catch (e: UnknownHostException) {
            UUID.randomUUID().toString()
        }
        return hashMapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
            ConsumerConfig.GROUP_ID_CONFIG to hostName,
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
            JsonDeserializer.TRUSTED_PACKAGES to "*"
        )
    }
}
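Trusting "*" is convenient for local testing, but it allows the deserializer to instantiate any class named in the record's type headers. A narrower setting is safer; assuming the User class lives in the com.example.kafkatest package, that would be:

JsonDeserializer.TRUSTED_PACKAGES to "com.example.kafkatest"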
Now use a sample producer to publish a message.
package com.example.kafkatest

import org.springframework.boot.ApplicationRunner
import org.springframework.context.annotation.Bean
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class JsonSampleProducer {

    @Bean
    fun sendUserMessage(template: KafkaTemplate<String, User>): ApplicationRunner {
        val user = User("testUser", 30)
        return ApplicationRunner { template.send("foobar", user) }
    }
}
Write a sample consumer that reads the produced message.
package com.example.kafkatest

import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class JsonSampleConsumer {

    val log = LoggerFactory.getLogger(JsonSampleConsumer::class.java)

    @KafkaListener(topics = ["foobar"], containerFactory = "userKafkaListenerContainerFactory")
    fun consume(user: User) {
        log.info("Message ${user.name} + ${user.age}")
    }
}
You can confirm that the data is consumed as JSON.
Implementing JSON serialization/deserialization via yml
The configuration above can be expressed simply in yml.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: "*"
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Write a producer that uses this configuration:
package com.example.kafkatest.ymltest

import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.SendResult
import org.springframework.stereotype.Component
import org.springframework.util.concurrent.ListenableFuture
import org.springframework.util.concurrent.ListenableFutureCallback

@Component
class SampleJsonProducer {

    val log = LoggerFactory.getLogger(SampleJsonProducer::class.java)

    @Bean
    fun sendMessage(template: KafkaTemplate<String, User>): String {
        val user = User("testUser", 30)
        val future: ListenableFuture<SendResult<String, User>> = template.send("foo", user)
        future.addCallback(object : ListenableFutureCallback<SendResult<String, User>> {
            override fun onSuccess(result: SendResult<String, User>?) {
                log.info("Sent message=[$user] with offset=[${result!!.recordMetadata.offset()}]")
            }

            override fun onFailure(ex: Throwable) {
                log.info("Unable to send message=[$user] due to : ${ex.message}")
            }
        })
        return ""
    }
}
And the consumer that reads it:
package com.example.kafkatest.ymltest

import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class SampleJsonConsumer {

    val log = LoggerFactory.getLogger(SampleJsonConsumer::class.java)

    @KafkaListener(topics = ["foo"], groupId = "foo")
    fun listenGroupFoobar(user: User) {
        log.info("Received Message in group foo: $user")
    }
}
Run the app to verify.
Configuring via the application.yml file
The settings that were wired up through config classes above can also be specified concisely like this:
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foobar
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
The options are documented in detail below.
Referenced article:
↓
https://blog.voidmainvoid.net/169
# APACHE KAFKA (KafkaProperties)
spring.kafka.admin.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.admin.fail-fast=false # Whether to fail fast if the broker is not available on startup.
spring.kafka.admin.properties.*= # Additional admin-specific properties used to configure the client.
spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.admin.ssl.key-store-location= # Location of the key store file.
spring.kafka.admin.ssl.key-store-password= # Store password for the key store file.
spring.kafka.admin.ssl.key-store-type= # Type of the key store.
spring.kafka.admin.ssl.protocol= # SSL protocol to use.
spring.kafka.admin.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.admin.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.admin.ssl.trust-store-type= # Type of the trust store.
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden.
spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true.
spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers.
spring.kafka.consumer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.enable-auto-commit= # Whether the consumer's offset is periodically committed in the background.
spring.kafka.consumer.fetch-max-wait= # Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch-min-size".
spring.kafka.consumer.fetch-min-size= # Minimum amount of data the server should return for a fetch request.
spring.kafka.consumer.group-id= # Unique string that identifies the consumer group to which this consumer belongs.
spring.kafka.consumer.heartbeat-interval= # Expected time between heartbeats to the consumer coordinator.
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
spring.kafka.consumer.properties.*= # Additional consumer-specific properties used to configure the client.
spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.consumer.ssl.key-store-location= # Location of the key store file.
spring.kafka.consumer.ssl.key-store-password= # Store password for the key store file.
spring.kafka.consumer.ssl.key-store-type= # Type of the key store.
spring.kafka.consumer.ssl.protocol= # SSL protocol to use.
spring.kafka.consumer.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.consumer.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.consumer.ssl.trust-store-type= # Type of the trust store.
spring.kafka.consumer.value-deserializer= # Deserializer class for values.
spring.kafka.jaas.control-flag=required # Control flag for login configuration.
spring.kafka.jaas.enabled=false # Whether to enable JAAS configuration.
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module.
spring.kafka.jaas.options= # Additional JAAS options.
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation.
spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property.
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
spring.kafka.listener.idle-event-interval= # Time between publishing idle consumer events (no data received).
spring.kafka.listener.log-container-config= # Whether to log the container configuration during initialization (INFO level).
spring.kafka.listener.monitor-interval= # Time between checks for non-responsive consumers. If a duration suffix is not specified, seconds will be used.
spring.kafka.listener.no-poll-threshold= # Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive.
spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
spring.kafka.listener.type=single # Listener type.
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
spring.kafka.producer.batch-size= # Default batch size.
spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for producers.
spring.kafka.producer.buffer-memory= # Total memory size the producer can use to buffer records waiting to be sent to the server.
spring.kafka.producer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
spring.kafka.producer.key-serializer= # Serializer class for keys.
spring.kafka.producer.properties.*= # Additional producer-specific properties used to configure the client.
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.producer.ssl.key-store-location= # Location of the key store file.
spring.kafka.producer.ssl.key-store-password= # Store password for the key store file.
spring.kafka.producer.ssl.key-store-type= # Type of the key store.
spring.kafka.producer.ssl.protocol= # SSL protocol to use.
spring.kafka.producer.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.producer.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.producer.ssl.trust-store-type= # Type of the trust store.
spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer.
spring.kafka.producer.value-serializer= # Serializer class for values.
spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.ssl.key-store-location= # Location of the key store file.
spring.kafka.ssl.key-store-password= # Store password for the key store file.
spring.kafka.ssl.key-store-type= # Type of the key store.
spring.kafka.ssl.protocol= # SSL protocol to use.
spring.kafka.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.ssl.trust-store-type= # Type of the trust store.
spring.kafka.streams.application-id= # Kafka streams application.id property; default spring.application.name.
spring.kafka.streams.auto-startup=true # Whether or not to auto-start the streams factory bean.
spring.kafka.streams.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for streams.
spring.kafka.streams.cache-max-size-buffering= # Maximum memory size to be used for buffering across all threads.
spring.kafka.streams.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.streams.properties.*= # Additional Kafka properties used to configure the streams.
spring.kafka.streams.replication-factor= # The replication factor for change log topics and repartition topics created by the stream processing application.
spring.kafka.streams.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.streams.ssl.key-store-location= # Location of the key store file.
spring.kafka.streams.ssl.key-store-password= # Store password for the key store file.
spring.kafka.streams.ssl.key-store-type= # Type of the key store.
spring.kafka.streams.ssl.protocol= # SSL protocol to use.
spring.kafka.streams.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.streams.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.streams.ssl.trust-store-type= # Type of the trust store.
spring.kafka.streams.state-dir= # Directory location for the state store.
spring.kafka.template.default-topic= # Default topic to which messages are sent.
Done ✨
Spring Kafka + Reactor Kafka
Additionally, if you want to use Reactor Kafka together with Spring Kafka for a reactive setup, it can be configured as follows.
↓
https://github.com/Kevded/example-reactive-spring-kafka-consumer-and-producer
1) Add the dependencies
// Kafka dependencies
implementation("org.springframework.kafka:spring-kafka")
implementation("io.projectreactor.kafka:reactor-kafka")
2) Write application.yml
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: "*"
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
3) Write the producer config
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import reactor.kafka.sender.SenderOptions

@Configuration
class ReactiveKafkaProducerConfig {

    @Bean
    fun reactiveKafkaProducerTemplate(properties: KafkaProperties): ReactiveKafkaProducerTemplate<String, User> {
        val props = properties.buildProducerProperties()
        return ReactiveKafkaProducerTemplate(SenderOptions.create<String, User>(props))
    }
}
4) Write the consumer config
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
import reactor.kafka.receiver.ReceiverOptions

@Configuration
class ReactiveKafkaConsumerConfig {

    @Bean
    fun kafkaReceiverOptions(kafkaProperties: KafkaProperties): ReceiverOptions<String, User> {
        val basicReceiverOptions: ReceiverOptions<String, User> =
            ReceiverOptions.create<String, User>(kafkaProperties.buildConsumerProperties())
        return basicReceiverOptions.subscription(listOf("foo"))
    }

    @Bean
    fun reactiveKafkaConsumerTemplate(kafkaReceiverOptions: ReceiverOptions<String, User>): ReactiveKafkaConsumerTemplate<String, User> {
        return ReactiveKafkaConsumerTemplate<String, User>(kafkaReceiverOptions)
    }
}
5) Write the producer
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import org.springframework.stereotype.Service

@Service
class ReactiveKafkaProducer(
    val reactiveKafkaProducerTemplate: ReactiveKafkaProducerTemplate<String, User>
) {
    private val log = LoggerFactory.getLogger(ReactiveKafkaProducer::class.java)

    fun send(user: User) {
        log.info("send to topic={}, {}={},", "foo", User::class.java.simpleName, user)
        reactiveKafkaProducerTemplate.send("foo", user)
            .doOnSuccess { senderResult ->
                log.info("sent {} offset : {}", user, senderResult.recordMetadata().offset())
            }
            .subscribe()
    }
}
6) Write the consumer
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.boot.CommandLineRunner
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux

@Service
class ReactiveKafkaConsumer(
    val reactiveKafkaConsumerTemplate: ReactiveKafkaConsumerTemplate<String, User>
) : CommandLineRunner {

    private val log = LoggerFactory.getLogger(ReactiveKafkaConsumer::class.java)

    private fun consumeUser(): Flux<User> {
        return reactiveKafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext { consumerRecord ->
                log.info(
                    "received key={}, value={} from topic={}, offset={}",
                    consumerRecord.key(),
                    consumerRecord.value(),
                    consumerRecord.topic(),
                    consumerRecord.offset()
                )
            }
            .map { it.value() }
            .doOnNext { user ->
                log.info("successfully consumed {}={}", User::class.java.simpleName, user)
            }
            .doOnError { throwable ->
                log.error("something bad happened while consuming : {}", throwable.message)
            }
    }

    override fun run(vararg args: String) {
        // we have to trigger consumption
        consumeUser().subscribe()
    }
}
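receiveAutoAck() acknowledges offsets automatically as records are emitted. If acknowledgement should happen only after processing succeeds, receive() exposes ReceiverRecords whose offsets are acknowledged by hand; a rough sketch under that assumption:

private fun consumeUserWithManualAck(): Flux<User> {
    return reactiveKafkaConsumerTemplate
        .receive()
        .doOnNext { receiverRecord ->
            // process the record first, then mark its offset as ready to commit
            receiverRecord.receiverOffset().acknowledge()
        }
        .map { it.value() }
}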
7) Write a test controller
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController

@RestController
class ReactiveKafkaController(
    val producer: ReactiveKafkaProducer
) {
    @GetMapping
    fun produce(): String {
        producer.send(User("test", 1234))
        return "hello world"
    }
}
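Assuming the app runs on Spring Boot's default port 8080, hitting the endpoint triggers a send:

$ curl http://localhost:8080/
hello world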
Running the project shows everything working as expected.
Calling the test API produces the producer logging, the consumer logging, and the data arriving in the Kafka console consumer. Done ✨
(Referenced blogs)
Installing and running Apache Kafka on MacOS with M1 Processor (medium.com)
Using Kafka with Spring Boot: https://reflectoring.io/spring-boot-kafka/ 👍
https://www.baeldung.com/spring-kafka 👍
Spring for Apache Kafka reference (configuring topics): https://docs.spring.io/spring-kafka/reference/html/#configuring-topics 👍
[kafka] 카프카 명령어 모음 Kafka Command Collection: https://pinggoopark.tistory.com/6
Spring Boot Kafka Json 데이타 전송하기: https://www.skyer9.pe.kr/wordpress/?p=1550
Kafka tutorial #3 - JSON SerDes: https://blog.ippon.tech/kafka-tutorial-3-json-serdes/
cannot deserialize from object value 에러!: https://azurealstn.tistory.com/74
Kotlin + SpringBoot + Kafka 연동: https://asuraiv.tistory.com/11