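Integrating Kotlin + Spring Boot + Kafka (+ Spring Kafka and Reactor Kafka)
Network & Infrastructure 2022. 11. 25. 01:40
Last time I only summarized the basic concepts of Apache Kafka.
This time I actually needed to use it, so I studied how to use Spring Boot + Kafka from Kotlin.
(My earlier blog post summarizing what I learned about Kafka)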
↓
https://dodop-blog.tistory.com/409
Installing and running Kafka
First, install Kafka. These steps were done on a Mac with an M1 processor.
# Installing kafka through brew did not require a separate zookeeper installation.
$ brew install kafka
# Start zookeeper
$ zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties
# Start kafka
$ kafka-server-start /opt/homebrew/etc/kafka/server.properties
I ran into problems here because I kept switching between installing Kafka with Homebrew and installing it from the tgz archive.
The first error said that the lock on the log directory could not be acquired because another Kafka instance was already running.
ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
org.apache.kafka.common.KafkaException: Failed to acquire lock on file .lock in /opt/homebrew/var/lib/kafka-logs. A Kafka instance in another process or thread is using this directory.
So I ran the following command to stop any Kafka instance that might still be running, and then started Kafka again.
↓
https://stackoverflow.com/questions/53338221/kafka-server-does-not-start
$ brew services stop kafka
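After that, the following error occurred.
ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
So I went to the Homebrew log directory and tried deleting the meta.properties file, deleting the log files, and uninstalling with brew and wiping everything before rerunning. In the end, the command below solved the problem!
The cause was stale state left behind (in kafkaServer.log) after I had killed the Kafka server with kill -9 instead of shutting it down properly and then started it again. 🤦🏻♀️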
↓
https://gray-hat.tistory.com/94
$ cd /tmp/kafka-logs
$ rm -rf *
zsh: sure you want to delete all 5 files in /tmp/kafka-logs [yn]? y
Once Kafka is running properly, you can create a topic and check the messages from the console producer and consumer as follows.
# Create a topic named 'foobar'
$ kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic foobar
Created topic foobar.
# If the command above fails with "Broker may not be available", change the Kafka config file (to run locally)
$ vi /opt/homebrew/etc/kafka/server.properties
listeners=PLAINTEXT://localhost:9092
# Use the console producer (type foo and bar)
$ kafka-console-producer --broker-list localhost:9092 --topic foobar
> foo
> bar
# Use the console consumer (the foo and bar typed into the producer show up)
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic foobar --from-beginning
foo
bar
Kafka commands
# Create a topic
$ kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# List topics
$ kafka-topics --list --bootstrap-server localhost:9092
# Show topic details
$ kafka-topics --describe --bootstrap-server localhost:9092 --topic test
# Producer -> put data into a topic
$ kafka-console-producer --broker-list localhost:9092 --topic test
# Consumer -> read the data in a topic
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
# Delete a topic => this did not work for me at first, still checking 😭
# Note: delete.topic.enable=true is a line inside server.properties, not a shell command,
# and --bootstrap-server must point at the broker (9092); 2181 is the ZooKeeper port.
$ vi /opt/homebrew/etc/kafka/server.properties
delete.topic.enable=true
$ kafka-topics --bootstrap-server localhost:9092 --delete --topic test
# List consumer groups
$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
# Show the state of a consumer group
$ kafka-consumer-groups --bootstrap-server localhost:9092 --group test-consumer --describe
Configuring build.gradle.kts
Now add the Kafka setup to the project.
repositories {
    mavenCentral()
}
dependencies {
    implementation("org.springframework.boot:spring-boot-starter")
    implementation("org.springframework.boot:spring-boot-starter-web")
    // add the kafka dependency
    implementation("org.springframework.kafka:spring-kafka")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}
tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "17"
    }
}
tasks.withType<Test> {
    useJUnitPlatform()
}
Creating a topic from Spring Boot
In Spring Boot a topic can be created as follows. The KafkaAdmin bean is responsible for creating new topics on the broker, and Spring Boot registers a KafkaAdmin bean automatically.
package com.example.kafkatest
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder
import org.springframework.kafka.core.KafkaAdmin
@Configuration
class KafkaTopicConfig {
    // Every NewTopic bean is created on the broker at startup if it does not exist yet.
    @Bean
    fun topic1(): NewTopic {
        return TopicBuilder.name("topic1").build()
    }
}
In an application that is not a Spring Boot application, you can register a KafkaAdmin yourself as follows.
@Bean
open fun admin(): KafkaAdmin {
    val configs: MutableMap<String, Any> = HashMap()
    configs[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
    return KafkaAdmin(configs)
}
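The topic bean above relies on the broker defaults for partitions and replication. As a minimal sketch of setting them explicitly, TopicBuilder also offers partitions and replicas. The class name, the topic name "topic2", and the values 3 and 1 are illustrative assumptions chosen for a single local broker, not something from the original setup.

```kotlin
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder

// Hypothetical configuration class; only the TopicBuilder calls are real API.
@Configuration
class KafkaTopicSizingConfig {

    // "topic2", 3 partitions and 1 replica are assumed example values.
    // With a single local broker the replica count cannot exceed 1.
    @Bean
    fun topic2(): NewTopic =
        TopicBuilder.name("topic2")
            .partitions(3)
            .replicas(1)
            .build()
}
```

After startup, the result can be checked with the kafka-topics --describe command from the command list earlier in the post.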
Implementing message produce and consume
- Example of a producer sending data
KafkaTemplate is used to send messages to a specific topic ("foobar").
(StringSerializer is what gets used here: it is Spring Boot's default key and value serializer for the auto-configured producer, which is why publishing works without any extra producer config. 🤔)
@Component
class SampleTopicProducer {
    // Each ApplicationRunner runs once after startup and sends one message.
    @Bean
    fun runner1(template: KafkaTemplate<String, String>): ApplicationRunner {
        return ApplicationRunner { template.send("foobar", "test") }
    }
    @Bean
    fun runner2(template: KafkaTemplate<String, String>): ApplicationRunner {
        return ApplicationRunner { template.send("foobar", "failMessage") }
    }
}
- Configuring the consumer
Next, set up the Kafka consumer.
First, specify the Kafka server host and the consumer offset policy in application.properties. auto-offset-reset has three options: latest (the default) starts from the end of the log, earliest starts from the very first offset, and none throws an exception when there is no consumer offset information for the topic that the consumer group wants to read.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
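The three auto-offset-reset options described above can be sketched as a small decision function. This is a plain-Kotlin model, not Kafka client code: OffsetReset, NoCommittedOffsetException, and startingOffset are hypothetical names introduced only for illustration.

```kotlin
// Hypothetical model of how a consumer picks its starting offset.
// None of these names exist in the Kafka client; they only mirror the rule in the text.
enum class OffsetReset { LATEST, EARLIEST, NONE }

class NoCommittedOffsetException(message: String) : RuntimeException(message)

// committed: the group's committed offset for the partition, or null if there is none
// logStart:  offset of the oldest record still in the partition
// logEnd:    offset that the next produced record will receive
fun startingOffset(committed: Long?, policy: OffsetReset, logStart: Long, logEnd: Long): Long {
    // A committed offset that still exists always wins; the policy is only a fallback.
    if (committed != null && committed in logStart..logEnd) return committed
    return when (policy) {
        OffsetReset.LATEST -> logEnd
        OffsetReset.EARLIEST -> logStart
        OffsetReset.NONE -> throw NoCommittedOffsetException("no committed offset for this group")
    }
}
```

The point of the sketch is that the setting only matters for a group without a usable committed offset; a group that has already committed resumes from where it stopped regardless of the policy.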
Then set up the consumer config. Here the value deserializer is set to StringDeserializer.
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import java.net.InetAddress
import java.net.UnknownHostException
import java.util.*
@Configuration
class KafkaConsumerConfig {
    @Value("\${spring.kafka.bootstrap-servers}")
    lateinit var hosts: String
    @Primary
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
        containerFactory.consumerFactory = consumerFactory()
        return containerFactory
    }
    private fun consumerFactory(): ConsumerFactory<in String, in String> {
        return DefaultKafkaConsumerFactory(consumerProperties())
    }
    private fun consumerProperties(): Map<String, Any> {
        // Host name plus a random UUID, so every app instance gets its own consumer group.
        val hostName: String = try {
            InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
        } catch (e: UnknownHostException) {
            UUID.randomUUID().toString()
        }
        return hashMapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
            ConsumerConfig.GROUP_ID_CONFIG to hostName,
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )
    }
}
- Consumer (listener) example
A specific topic can be subscribed to with @KafkaListener as follows.
@Component
class SampleTopicListener {
    val log = LoggerFactory.getLogger(SampleTopicListener::class.java)
    @KafkaListener(topics = ["foobar"])
    fun consume(@Payload data: String) {
        log.info("Message $data")
    }
}
- Running the app
When the app is actually run, the configured topic is created and data is sent and received.
- Checking that the topic was created
$ kafka-topics --list --bootstrap-server localhost:9092
- Checking that the consumer received the data
- Checking that the log is printed by the app
Implementing message produce and consume with yml
The simple message sending above can also be configured through yml, without a separate config class.
First, write application.yml.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Then create SampleMessageProducer. It is registered as a bean so that it sends a message when the app starts.
The ListenableFuture returned by send makes it possible to post-process the result once the message has been sent. Here it logs whether the message was sent successfully or failed.
@Component
class SampleMessageProducer {
    val log = LoggerFactory.getLogger(SampleMessageProducer::class.java)
    @Bean
    fun sendMessage(template: KafkaTemplate<String, String>): String {
        val message = "testMessage"
        val future: ListenableFuture<SendResult<String, String>> = template.send("foo", message)
        future.addCallback(object : ListenableFutureCallback<SendResult<String, String>> {
            override fun onSuccess(result: SendResult<String, String>?) {
                log.info("Sent message=[$message] with offset=[${result?.recordMetadata?.offset()}]")
            }
            override fun onFailure(ex: Throwable) {
                log.error("Unable to send message=[$message] due to: ${ex.message}")
            }
        })
        return ""
    }
}
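One caveat about the callback style above: in Spring for Apache Kafka 3.x, KafkaTemplate.send returns a CompletableFuture instead of a ListenableFuture, so addCallback is no longer available there. The same success/failure handling can be sketched with the JDK's CompletableFuture alone. describeOutcome is a hypothetical helper, and the type parameter T stands in for SendResult<String, String>.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical helper, not part of Spring Kafka: turns the outcome of an
// asynchronous send into a log line. offsetOf extracts the offset from the result.
fun <T> describeOutcome(
    message: String,
    future: CompletableFuture<T>,
    offsetOf: (T) -> Long
): CompletableFuture<String> =
    future.handle { result, ex ->
        if (ex == null) {
            "Sent message=[$message] with offset=[${offsetOf(result)}]"
        } else {
            "Unable to send message=[$message] due to: ${ex.message}"
        }
    }
```

With the real template on 3.x, the equivalent would be a whenComplete { result, ex -> ... } call on the future returned by template.send("foo", message), with the offset read from result.recordMetadata.offset().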
Next, write the consumer that consumes the message.
@Component
class SampleMessageConsumer {
    val log = LoggerFactory.getLogger(SampleMessageConsumer::class.java)
    @KafkaListener(topics = ["foo"], groupId = "foo")
    fun listenGroupFoobar(message: String) {
        log.info("Received Message in group foo: $message")
    }
}
Run the app to check the result.
First, the producer log shows that the message was sent successfully.
The consumer log then shows that the message arrives.
Filtering messages before consuming them
As things stand, both test and failMessage are consumed. This time, let's configure the listener so that it only receives filtered messages.
Add a filter to the consumer config as follows. With a RecordFilterStrategy, every record for which the strategy returns true is discarded before it reaches the listener.
@Configuration
class KafkaConsumerConfig {
    @Value("\${spring.kafka.bootstrap-servers}")
    lateinit var hosts: String
    // Listen to every message except those with specific content
    @Bean
    fun filterKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
        containerFactory.consumerFactory = consumerFactory()
        // true means "discard": records whose value contains "fail" never reach the listener
        containerFactory.setRecordFilterStrategy { it.value().contains("fail") }
        return containerFactory
    }
    private fun consumerFactory(): ConsumerFactory<in String, in String> {
        return DefaultKafkaConsumerFactory(consumerProperties())
    }
    private fun consumerProperties(): Map<String, Any> {
        val hostName: String = try {
            InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
        } catch (e: UnknownHostException) {
            UUID.randomUUID().toString()
        }
        return hashMapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
            ConsumerConfig.GROUP_ID_CONFIG to hostName,
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )
    }
}
In the consumer, point the listener at this container factory so that the filter is applied.
@KafkaListener(topics = ["foobar"], containerFactory = "filterKafkaListenerContainerFactory")
fun consume(@Payload data: String) {
    log.info("Message $data")
}
failMessage is discarded and only test is printed.
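The direction of the filter is easy to get backwards: returning true drops the record. A minimal plain-Kotlin sketch of that rule follows, assuming a hypothetical DiscardWhen interface that stands in for Spring Kafka's RecordFilterStrategy and a hypothetical deliver function that stands in for the listener container.

```kotlin
// Hypothetical stand-in for RecordFilterStrategy: returning true means "discard".
fun interface DiscardWhen<V> {
    fun filter(value: V): Boolean
}

// Passes on to the listener only the values that the strategy does not discard.
fun <V> deliver(values: List<V>, strategy: DiscardWhen<V>): List<V> =
    values.filterNot { strategy.filter(it) }
```

With the two messages sent by SampleTopicProducer, a strategy of { it.contains("fail") } leaves only "test", which matches the log described above.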
Consuming a message and sending a reply
Now let's apply a replyTemplate so that the listener consumes a message and then sends a message to a topic.
@Configuration
class KafkaConsumerConfig {
    @Value("\${spring.kafka.bootstrap-servers}")
    lateinit var hosts: String
    // Listener container factory that can send a reply
    @Bean
    fun kafkaListenerContainerFactory(kafkaTemplate: KafkaTemplate<String, Any>): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory() as ConsumerFactory<in String, in Any>
        // The template used to publish whatever the listener method returns
        factory.setReplyTemplate(kafkaTemplate)
        return factory
    }
    private fun consumerFactory(): ConsumerFactory<in String, in String> {
        return DefaultKafkaConsumerFactory(consumerProperties())
    }
    private fun consumerProperties(): Map<String, Any> {
        val hostName: String = try {
            InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
        } catch (e: UnknownHostException) {
            UUID.randomUUID().toString()
        }
        return hashMapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
            ConsumerConfig.GROUP_ID_CONFIG to hostName,
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )
    }
}
In the listener, use @SendTo("topic") so that the return value is sent to a topic.
@Component
class SampleTopicListener {
    val log = LoggerFactory.getLogger(SampleTopicListener::class.java)
    // Receive a message and send a reply
    @KafkaListener(topics = ["foobar"])
    @SendTo("foobar")
    fun consume2(@Payload data: String): String {
        log.info("Message $data")
        return "Listener got this message $data"
    }
}
Because the listener consumes from the topic and immediately sends a message back to the same topic, which it then consumes again, the cycle repeats endlessly, and the log shows the message growing on every round.
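The feedback loop described above can be reproduced without a broker. The sketch below is a plain-Kotlin simulation, not Spring Kafka code: simulateReplies and its in-memory queues are hypothetical, and "foobar-reply" is an assumed name for a separate reply topic.

```kotlin
// Hypothetical in-memory stand-in for topics; no broker is involved.
// listenTopic plays the role of @KafkaListener(topics = ...), replyTopic the role of @SendTo.
fun simulateReplies(listenTopic: String, replyTopic: String, first: String, maxDeliveries: Int): List<String> {
    val queues = HashMap<String, ArrayDeque<String>>()
    fun queue(topic: String) = queues.getOrPut(topic) { ArrayDeque() }

    queue(listenTopic).addLast(first)
    val delivered = mutableListOf<String>()
    // Stop after maxDeliveries so that the looping case terminates.
    while (delivered.size < maxDeliveries) {
        val message = queue(listenTopic).removeFirstOrNull() ?: break
        delivered += message
        // The listener's return value is published to the reply topic.
        queue(replyTopic).addLast("Listener got this message $message")
    }
    return delivered
}
```

When both names are "foobar", every reply is delivered to the listener again and the text keeps growing until the cap is hit. With a distinct reply topic such as "foobar-reply", only the first message is delivered, which suggests that pointing @SendTo at a different topic is one way to avoid the loop.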
Implementing a producer and consumer with JSON serialization/deserialization through config classes
This time, instead of the string type used above, let's exchange data as JSON.
The User object used here is as follows.
data class User(
    val name: String,
    val age: Int
) {
    // No-argument constructor
    constructor() : this("", 0)
}
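The secondary constructor gives User a no-argument constructor, presumably so that the JSON deserializer can instantiate the class; that motive is an assumption, since the post does not state it. As a sketch of an alternative, giving every primary-constructor parameter a default value makes the Kotlin compiler generate a parameterless constructor on the JVM as well. UserWithDefaults is a hypothetical name used to keep it apart from the original class.

```kotlin
// Hypothetical variant of User: defaults on every parameter make the compiler
// emit a parameterless JVM constructor, so no explicit secondary constructor is needed.
data class UserWithDefaults(
    val name: String = "",
    val age: Int = 0
)
```

Calling UserWithDefaults() then yields the same empty value as User() does in the original class.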
First, configure the producer to use JsonSerializer as follows.
package com.example.kafkatest
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.support.serializer.JsonSerializer
@Configuration
class JsonKafkaProducerConfig {
    @Value("\${spring.kafka.bootstrap-servers}")
    lateinit var hosts: String
    @Bean
    fun userProducerFactory(): ProducerFactory<String, User> {
        val configs: MutableMap<String, Any> = HashMap()
        configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = hosts
        configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        // Values are written as JSON instead of plain strings
        configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
        return DefaultKafkaProducerFactory(configs)
    }
    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, User> {
        return KafkaTemplate(userProducerFactory())
    }
}
Next, configure the consumer to use JsonDeserializer. A "cannot trust class" error occurred here, so I set the option JsonDeserializer.TRUSTED_PACKAGES to "*".
package com.example.kafkatest
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer
import java.net.InetAddress
import java.net.UnknownHostException
import java.util.*
@Configuration
class JsonKafkaConsumerConfig {
    @Value("\${spring.kafka.bootstrap-servers}")
    lateinit var hosts: String
    @Bean
    fun userKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, User> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, User>()
        factory.consumerFactory = userConsumerFactory()
        return factory
    }
    @Bean
    fun userConsumerFactory(): ConsumerFactory<String, User> {
        return DefaultKafkaConsumerFactory(userConsumerProperties())
    }
    private fun userConsumerProperties(): Map<String, Any> {
        val hostName: String = try {
            InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
        } catch (e: UnknownHostException) {
            UUID.randomUUID().toString()
        }
        return hashMapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
            ConsumerConfig.GROUP_ID_CONFIG to hostName,
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
            // "*" trusts every package; this was set to get past the "cannot trust class" error
            JsonDeserializer.TRUSTED_PACKAGES to "*"
        )
    }
}
Now produce a message with a sample producer.
package com.example.kafkatest
import org.springframework.boot.ApplicationRunner
import org.springframework.context.annotation.Bean
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
@Component
class JsonSampleProducer {
    // Sends one User as JSON to "foobar" after startup
    @Bean
    fun sendUserMessage(template: KafkaTemplate<String, User>): ApplicationRunner {
        val user = User("testUser", 30)
        return ApplicationRunner { template.send("foobar", user) }
    }
}
Write a sample consumer that reads the produced message.
package com.example.kafkatest
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
@Component
class JsonSampleConsumer {
    val log = LoggerFactory.getLogger(JsonSampleConsumer::class.java)
    @KafkaListener(topics = ["foobar"], containerFactory = "userKafkaListenerContainerFactory")
    fun consume(user: User) {
        log.info("Message ${user.name} + ${user.age}")
    }
}
The log confirms that the data is consumed as JSON and mapped to a User.
Implementing a producer and consumer with JSON serialization/deserialization through yml
The configuration above can be specified more simply through yml.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: "*"
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Write a producer that uses this configuration.
package com.example.kafkatest.ymltest
import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.SendResult
import org.springframework.stereotype.Component
import org.springframework.util.concurrent.ListenableFuture
import org.springframework.util.concurrent.ListenableFutureCallback
@Component
class SampleJsonProducer {
    val log = LoggerFactory.getLogger(SampleJsonProducer::class.java)
    @Bean
    fun sendMessage(template: KafkaTemplate<String, User>): String {
        val user = User("testUser", 30)
        val future: ListenableFuture<SendResult<String, User>> = template.send("foo", user)
        future.addCallback(object : ListenableFutureCallback<SendResult<String, User>> {
            override fun onSuccess(result: SendResult<String, User>?) {
                log.info("Sent message=[$user] with offset=[${result?.recordMetadata?.offset()}]")
            }
            override fun onFailure(ex: Throwable) {
                log.error("Unable to send message=[$user] due to: ${ex.message}")
            }
        })
        return ""
    }
}
Write the consumer that reads it.
package com.example.kafkatest.ymltest
import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
@Component
class SampleJsonConsumer {
    val log = LoggerFactory.getLogger(SampleJsonConsumer::class.java)
    @KafkaListener(topics = ["foo"], groupId = "foo")
    fun listenGroupFoobar(user: User) {
        log.info("Received Message in group foo: $user")
    }
}
Run the app to check that it works.
Configuring everything through the application.yml file
The settings that took a fair amount of code above can also be specified concisely as follows.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foobar
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
The available options in detail are listed below.
Reference post
↓
https://blog.voidmainvoid.net/169
# APACHE KAFKA (KafkaProperties)
spring.kafka.admin.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.admin.fail-fast=false # Whether to fail fast if the broker is not available on startup.
spring.kafka.admin.properties.*= # Additional admin-specific properties used to configure the client.
spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.admin.ssl.key-store-location= # Location of the key store file.
spring.kafka.admin.ssl.key-store-password= # Store password for the key store file.
spring.kafka.admin.ssl.key-store-type= # Type of the key store.
spring.kafka.admin.ssl.protocol= # SSL protocol to use.
spring.kafka.admin.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.admin.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.admin.ssl.trust-store-type= # Type of the trust store.
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden.
spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true.
spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers.
spring.kafka.consumer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.enable-auto-commit= # Whether the consumer's offset is periodically committed in the background.
spring.kafka.consumer.fetch-max-wait= # Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch-min-size".
spring.kafka.consumer.fetch-min-size= # Minimum amount of data the server should return for a fetch request.
spring.kafka.consumer.group-id= # Unique string that identifies the consumer group to which this consumer belongs.
spring.kafka.consumer.heartbeat-interval= # Expected time between heartbeats to the consumer coordinator.
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
spring.kafka.consumer.properties.*= # Additional consumer-specific properties used to configure the client.
spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.consumer.ssl.key-store-location= # Location of the key store file.
spring.kafka.consumer.ssl.key-store-password= # Store password for the key store file.
spring.kafka.consumer.ssl.key-store-type= # Type of the key store.
spring.kafka.consumer.ssl.protocol= # SSL protocol to use.
spring.kafka.consumer.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.consumer.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.consumer.ssl.trust-store-type= # Type of the trust store.
spring.kafka.consumer.value-deserializer= # Deserializer class for values.
spring.kafka.jaas.control-flag=required # Control flag for login configuration.
spring.kafka.jaas.enabled=false # Whether to enable JAAS configuration.
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module.
spring.kafka.jaas.options= # Additional JAAS options.
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation.
spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property.
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
spring.kafka.listener.idle-event-interval= # Time between publishing idle consumer events (no data received).
spring.kafka.listener.log-container-config= # Whether to log the container configuration during initialization (INFO level).
spring.kafka.listener.monitor-interval= # Time between checks for non-responsive consumers. If a duration suffix is not specified, seconds will be used.
spring.kafka.listener.no-poll-threshold= # Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive.
spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
spring.kafka.listener.type=single # Listener type.
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
spring.kafka.producer.batch-size= # Default batch size.
spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for producers.
spring.kafka.producer.buffer-memory= # Total memory size the producer can use to buffer records waiting to be sent to the server.
spring.kafka.producer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
spring.kafka.producer.key-serializer= # Serializer class for keys.
spring.kafka.producer.properties.*= # Additional producer-specific properties used to configure the client.
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.producer.ssl.key-store-location= # Location of the key store file.
spring.kafka.producer.ssl.key-store-password= # Store password for the key store file.
spring.kafka.producer.ssl.key-store-type= # Type of the key store.
spring.kafka.producer.ssl.protocol= # SSL protocol to use.
spring.kafka.producer.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.producer.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.producer.ssl.trust-store-type= # Type of the trust store.
spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer.
spring.kafka.producer.value-serializer= # Serializer class for values.
spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.ssl.key-store-location= # Location of the key store file.
spring.kafka.ssl.key-store-password= # Store password for the key store file.
spring.kafka.ssl.key-store-type= # Type of the key store.
spring.kafka.ssl.protocol= # SSL protocol to use.
spring.kafka.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.ssl.trust-store-type= # Type of the trust store.
spring.kafka.streams.application-id= # Kafka streams application.id property; default spring.application.name.
spring.kafka.streams.auto-startup=true # Whether or not to auto-start the streams factory bean.
spring.kafka.streams.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for streams.
spring.kafka.streams.cache-max-size-buffering= # Maximum memory size to be used for buffering across all threads.
spring.kafka.streams.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.streams.properties.*= # Additional Kafka properties used to configure the streams.
spring.kafka.streams.replication-factor= # The replication factor for change log topics and repartition topics created by the stream processing application.
spring.kafka.streams.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.streams.ssl.key-store-location= # Location of the key store file.
spring.kafka.streams.ssl.key-store-password= # Store password for the key store file.
spring.kafka.streams.ssl.key-store-type= # Type of the key store.
spring.kafka.streams.ssl.protocol= # SSL protocol to use.
spring.kafka.streams.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.streams.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.streams.ssl.trust-store-type= # Type of the trust store.
spring.kafka.streams.state-dir= # Directory location for the state store.
spring.kafka.template.default-topic= # Default topic to which messages are sent.
Done ✨
Spring Kafka + Reactor Kafka
In addition, if you want to use Reactor Kafka together with Spring Kafka in a reactive style, it can be set up as follows.
↓
https://github.com/Kevded/example-reactive-spring-kafka-consumer-and-producer
1) Add the dependencies
// add the kafka dependencies
implementation("org.springframework.kafka:spring-kafka")
implementation("io.projectreactor.kafka:reactor-kafka")
2) Write application.yml
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: "*"
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
3) Write the producer config
package com.example.kafkatest.reactive
import com.example.kafkatest.User
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import reactor.kafka.sender.SenderOptions
@Configuration
class ReactiveKafkaProducerConfig {
    // Builds the reactive template from the producer settings in application.yml
    @Bean
    fun reactiveKafkaProducerTemplate(properties: KafkaProperties): ReactiveKafkaProducerTemplate<String, User> {
        val props = properties.buildProducerProperties()
        return ReactiveKafkaProducerTemplate(SenderOptions.create<String, User>(props))
    }
}
4) Write the consumer config
package com.example.kafkatest.reactive
import com.example.kafkatest.User
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
import reactor.kafka.receiver.ReceiverOptions
@Configuration
class ReactiveKafkaConsumerConfig {
    // Consumer settings from application.yml, subscribed to the "foo" topic
    @Bean
    fun kafkaReceiverOptions(kafkaProperties: KafkaProperties): ReceiverOptions<String, User> {
        val basicReceiverOptions: ReceiverOptions<String, User> =
            ReceiverOptions.create<String, User>(kafkaProperties.buildConsumerProperties())
        return basicReceiverOptions.subscription(listOf("foo"))
    }
    @Bean
    fun reactiveKafkaConsumerTemplate(kafkaReceiverOptions: ReceiverOptions<String, User>): ReactiveKafkaConsumerTemplate<String, User> {
        return ReactiveKafkaConsumerTemplate<String, User>(kafkaReceiverOptions)
    }
}
5) Write the producer
package com.example.kafkatest.reactive
import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import org.springframework.stereotype.Service
@Service
class ReactiveKafkaProducer(
    val reactiveKafkaProducerTemplate: ReactiveKafkaProducerTemplate<String, User>
) {
    private val log = LoggerFactory.getLogger(ReactiveKafkaProducer::class.java)
    fun send(user: User) {
        log.info("send to topic={}, {}={}", "foo", User::class.java.simpleName, user)
        reactiveKafkaProducerTemplate.send("foo", user)
            .doOnSuccess { senderResult ->
                log.info("sent {} offset : {}", user, senderResult.recordMetadata().offset())
            }
            // Nothing is sent until the Mono is subscribed to
            .subscribe()
    }
}
6) Write the consumer
package com.example.kafkatest.reactive
import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.boot.CommandLineRunner
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
@Service
class ReactiveKafkaConsumer(
    val reactiveKafkaConsumerTemplate: ReactiveKafkaConsumerTemplate<String, User>
) : CommandLineRunner {
    private val log = LoggerFactory.getLogger(ReactiveKafkaConsumer::class.java)
    private fun consumeUser(): Flux<User> {
        return reactiveKafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext { consumerRecord ->
                log.info(
                    "received key={}, value={} from topic={}, offset={}",
                    consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), consumerRecord.offset()
                )
            }
            .map { it.value() }
            .doOnNext { user ->
                log.info("successfully consumed {}={}", User::class.java.simpleName, user)
            }
            .doOnError { throwable ->
                log.error("something bad happened while consuming : {}", throwable.message)
            }
    }
    override fun run(vararg args: String) {
        // we have to trigger consumption
        consumeUser().subscribe()
    }
}
7) Write a controller for testing
package com.example.kafkatest.reactive
import com.example.kafkatest.User
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController
@RestController
class ReactiveKafkaController(
    val producer: ReactiveKafkaProducer
) {
    // Calling this endpoint sends one test User through the reactive producer
    @GetMapping
    fun produce(): String {
        producer.send(User("test", 1234))
        return "hello world"
    }
}
Running the project and calling the endpoint shows the message being sent and consumed.
Done ✨