Integrating Kotlin + Spring Boot + Kafka (+ Spring Kafka and Reactor Kafka)
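Last time I only summarized the basic concepts of Apache Kafka.
This time I actually needed to use it, so I studied how to use Spring Boot + Kafka from Kotlin.
(My earlier blog post summarizing what I learned about Kafka)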
↓
https://dodop-blog.tistory.com/409
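Installing and running Kafka
First, install Kafka. These steps were done on an M1 (Apple Silicon) Mac.
# Installing kafka through brew meant no separate ZooKeeper install was needed.
$ brew install kafka
# Start ZooKeeper
$ zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties
# Start Kafka
$ kafka-server-start /opt/homebrew/etc/kafka/server.properties
I ran into problems here because I kept switching between installing Kafka through Homebrew and installing it from the tgz archive.
The first error said that Kafka could not acquire the lock on the log directory because another Kafka instance was already running.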
ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
org.apache.kafka.common.KafkaException: Failed to acquire lock on file .lock in /opt/homebrew/var/lib/kafka-logs. A Kafka instance in another process or thread is using this directory.
So I ran the following command to stop any Kafka instance that might still be running, then started Kafka again.
↓
https://stackoverflow.com/questions/53338221/kafka-server-does-not-start
$ brew services stop kafka
After that, the following error occurred.
ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
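So I went to the Homebrew log directory and tried deleting the meta.properties file, deleting the log files, and uninstalling through brew and wiping everything before retrying. In the end, running the commands below solved the problem!
The cause was that I had previously killed the Kafka server with kill -9 instead of shutting it down properly, and what was left behind in kafkaServer.log from that run broke the next startup. 🤦🏻♀️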
↓
https://gray-hat.tistory.com/94
$ cd /tmp/kafka-logs
$ rm -rf *
zsh: sure you want to delete all 5 files in /tmp/kafka-logs [yn]? y
If Kafka started successfully, you can create a topic as follows and check the messages from the producer and consumer consoles.
# Create a topic named 'foobar'
$ kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic foobar
Created topic foobar.
# If the command above fails with "Broker may not be available", edit the Kafka config file (so it listens on localhost)
$ vi /opt/homebrew/etc/kafka/server.properties
listeners=PLAINTEXT://localhost:9092
# Use the producer console (type foo and bar); newer Kafka releases take --bootstrap-server in place of --broker-list
$ kafka-console-producer --broker-list localhost:9092 --topic foobar
> foo
> bar
# Use the consumer console (the foo and bar typed into the producer show up here)
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic foobar --from-beginning
foo
bar
Kafka commands
# Create a topic
$ kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# List topics
$ kafka-topics --list --bootstrap-server localhost:9092
# Show topic details
$ kafka-topics --describe --bootstrap-server localhost:9092 --topic test
# Producer -> write data to the topic
$ kafka-console-producer --broker-list localhost:9092 --topic test
# Consumer -> read data from the topic
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
# Delete a topic => this did not work for me, still investigating 😭
# (likely cause: I originally passed localhost:2181, the ZooKeeper port, to --bootstrap-server, which expects the broker address)
$ vi /opt/homebrew/etc/kafka/server.properties
delete.topic.enable=true
$ kafka-topics --bootstrap-server localhost:9092 --delete --topic test
# List consumer groups
$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
# Show consumer group status
$ kafka-consumer-groups --bootstrap-server localhost:9092 --group test-consumer --describe
Configuring build.gradle.kts
Now add the Kafka setup to the project.
repositories {
mavenCentral()
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-web")
// add the Kafka dependency
implementation("org.springframework.kafka:spring-kafka")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
testImplementation("org.springframework.boot:spring-boot-starter-test")
}
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = "17"
}
}
tasks.withType<Test> {
useJUnitPlatform()
}
Creating a topic from Spring Boot
In Spring Boot you can create a topic as shown below. The KafkaAdmin bean is responsible for creating new topics on the broker, and Spring Boot registers a KafkaAdmin bean automatically.
package com.example.kafkatest
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder
import org.springframework.kafka.core.KafkaAdmin
@Configuration
class KafkaTopicConfig {
@Bean
fun topic1(): NewTopic {
return TopicBuilder.name("topic1").build()
}
}
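TopicBuilder can also set the partition and replica counts explicitly instead of leaving them to the broker defaults. The following is only a sketch, assuming a single local broker (hence one replica); the topic name topic2 and the class name ExplicitTopicConfig are made up for illustration, and like the class above it assumes the kotlin-spring plugin opens the configuration class.

```kotlin
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder

@Configuration
class ExplicitTopicConfig { // hypothetical class name
    // Declares "topic2" (hypothetical name) with 3 partitions and 1 replica.
    // The auto-registered KafkaAdmin creates it at startup if it does not exist yet.
    @Bean
    fun topic2(): NewTopic =
        TopicBuilder.name("topic2")
            .partitions(3)
            .replicas(1)
            .build()
}
```

With a single broker the replica count cannot exceed 1, so only the partition count is worth tuning in a local setup.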
In an application that does not use Spring Boot, you can register KafkaAdmin yourself as follows.
@Bean
open fun admin(): KafkaAdmin? {
val configs: MutableMap<String, Any> = HashMap()
configs[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
return KafkaAdmin(configs)
}
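Implementing message produce and consume
- Producer: example of sending data
Use KafkaTemplate to send messages to a specific topic ("foobar").
(No extra producer config is needed here because Spring Boot's auto-configured producer defaults to StringSerializer for both the key and the value. 🤔)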
@Component
class SampleTopicProducer {
@Bean
fun runner1(template: KafkaTemplate<String, String>): ApplicationRunner {
return ApplicationRunner { template.send("foobar", "test") }
}
@Bean
fun runner2(template: KafkaTemplate<String, String>): ApplicationRunner {
return ApplicationRunner { template.send("foobar", "failMessage") }
}
}
- Setting up the consumer config
Next, configure the Kafka consumer.
First, set the Kafka server host and the consumer offset policy in application.properties. auto-offset-reset accepts latest, earliest, or none: latest (the default) starts from the end of the log, earliest starts from the very first offset, and none throws an exception if the consumer group has no committed offset for the topic.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
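To make the three auto-offset-reset options concrete, here is a small plain-Kotlin model of the decision. It is not Kafka client code: ResetPolicy and startingOffset are hypothetical names, and the function only mirrors the rule that the policy is a fallback used when the group has no usable committed offset.

```kotlin
// Hypothetical model of the auto-offset-reset decision; not part of the Kafka client API.
enum class ResetPolicy { LATEST, EARLIEST, NONE }

fun startingOffset(committed: Long?, logStart: Long, logEnd: Long, policy: ResetPolicy): Long {
    // A committed offset that still exists in the log always wins.
    if (committed != null && committed in logStart..logEnd) return committed
    // Otherwise fall back to the configured policy.
    return when (policy) {
        ResetPolicy.EARLIEST -> logStart
        ResetPolicy.LATEST -> logEnd
        ResetPolicy.NONE -> throw IllegalStateException("no committed offset for this consumer group")
    }
}

fun main() {
    println(startingOffset(null, 0L, 42L, ResetPolicy.EARLIEST)) // prints 0
    println(startingOffset(null, 0L, 42L, ResetPolicy.LATEST))   // prints 42
    println(startingOffset(10L, 0L, 42L, ResetPolicy.LATEST))    // prints 10
}
```

This is why earliest is convenient while experimenting: a brand-new consumer group still sees messages that were produced before it started.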
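Then define the consumer config. Here the value deserializer is set to StringDeserializer. Note that this class builds its own consumer properties and sets auto.offset.reset to latest, so the earliest value from application.properties is not applied to this factory.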
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.KafkaTemplate
import java.net.InetAddress
import java.net.UnknownHostException
import java.util.*
@Configuration
class KafkaConsumerConfig {
@Value("\${spring.kafka.bootstrap-servers}")
lateinit var hosts: String
@Primary
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
containerFactory.consumerFactory = consumerFactory()
return containerFactory
}
private fun consumerFactory(): ConsumerFactory<in String, in String> {
return DefaultKafkaConsumerFactory(consumerProperties())
}
private fun consumerProperties(): Map<String, Any> {
val hostName: String = try {
InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
} catch (e: UnknownHostException) {
UUID.randomUUID().toString()
}
return hashMapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
ConsumerConfig.GROUP_ID_CONFIG to hostName,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
)
}
}
- Consumer (listener) example
You can subscribe to a specific topic with @KafkaListener as follows.
@Component
class SampleTopicListener {
val log = LoggerFactory.getLogger(SampleTopicListener::class.java)
@KafkaListener(topics = ["foobar"])
fun consume(@Payload data: String) {
log.info("Message $data")
}
}
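- Running the app
When you run the app, the configured topic is created and the messages are sent and received as set up above.
- Check that the topic was created
$ kafka-topics --list --bootstrap-server localhost:9092
- Check that the consumer receives the data
- Check that the log lines appear in the app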
Implementing message produce and consume with yml
The simple message sending above can also be configured through yml, without a separate config class.
First, write application.yml.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
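Next, create SampleMessageProducer. It is registered as a bean so that a message is sent when the app starts.
The ListenableFuture returned by send lets you post-process the result of the send. Here it logs both when the message was sent successfully and when it failed. (ListenableFuture is the return type in Spring for Apache Kafka 2.x; from 3.0, send returns a CompletableFuture.)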
@Component
class SampleMessageProducer() {
val log = LoggerFactory.getLogger(SampleMessageProducer::class.java)
@Bean
fun sendMessage(template: KafkaTemplate<String, String>): String {
val message: String = "testMessage"
val future: ListenableFuture<SendResult<String, String>> = template.send("foo", message)
future.addCallback(object : ListenableFutureCallback<SendResult<String, String>> {
override fun onSuccess(result: SendResult<String, String>?) {
log.info("Sent message=[$message] with offset=[" + result!!.recordMetadata.offset() + "]")
}
override fun onFailure(ex: Throwable) {
log.error("Unable to send message=[$message] due to: ${ex.message}")
}
})
return ""
}
}
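In Spring for Apache Kafka 3.x, KafkaTemplate.send returns a CompletableFuture rather than a ListenableFuture, so the callback is attached with the standard CompletableFuture methods instead of addCallback. The sketch below shows that pattern using only the JDK: FakeSendResult and describeSend are hypothetical stand-ins (no broker or KafkaTemplate is involved), and a hand-completed future plays the role of the value returned by send.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for the metadata carried by a real SendResult.
data class FakeSendResult(val offset: Long)

// Attaches success/failure handling and yields the line that would be logged.
fun describeSend(message: String, future: CompletableFuture<FakeSendResult>): CompletableFuture<String> =
    future.handle { result, ex ->
        if (ex == null) "Sent message=[$message] with offset=[${result.offset}]"
        else "Unable to send message=[$message] due to: ${ex.message}"
    }

fun main() {
    val ok = CompletableFuture.completedFuture(FakeSendResult(7L))
    val failed = CompletableFuture<FakeSendResult>().apply {
        completeExceptionally(IllegalStateException("broker unavailable"))
    }
    println(describeSend("testMessage", ok).join())     // Sent message=[testMessage] with offset=[7]
    println(describeSend("testMessage", failed).join()) // Unable to send message=[testMessage] due to: broker unavailable
}
```

With a real template the same lambda shape would go into whenComplete on the future returned by send, reading the offset from the record metadata.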
Now write the consumer that consumes the messages.
@Component
class SampleMessageConsumer {
val log = LoggerFactory.getLogger(SampleMessageConsumer::class.java)
@KafkaListener(topics = ["foo"], groupId = "foo")
fun listenGroupFoobar(message: String) {
log.info("Received Message in group foo: $message")
}
}
Running the app shows the following.
First, the producer logs that the message was sent successfully.
Then the consumer logs that the message arrived.
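Consuming with message filtering
As things stand, both test and failMessage are consumed. This time, let's configure the listener to filter what it receives.
Add a filter to the consumer config as follows. With RecordFilterStrategy, any record for which the strategy returns true is discarded before it reaches the listener.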
@Configuration
class KafkaConsumerConfig {
@Value("\${spring.kafka.bootstrap-servers}")
lateinit var hosts: String
// listen to everything except messages with specific content
@Bean
fun filterKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
containerFactory.consumerFactory = consumerFactory()
containerFactory.setRecordFilterStrategy { it.value().contains("fail") }
return containerFactory
}
private fun consumerFactory(): ConsumerFactory<in String, in String> {
return DefaultKafkaConsumerFactory(consumerProperties())
}
private fun consumerProperties(): Map<String, Any> {
val hostName: String = try {
InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
} catch (e: UnknownHostException) {
UUID.randomUUID().toString()
}
return hashMapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
ConsumerConfig.GROUP_ID_CONFIG to hostName,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
)
}
}
In the consumer, point the listener at this container factory to apply the filter. The payload is a String here, matching the String-based factory above.
@KafkaListener(topics = ["foobar"], containerFactory = "filterKafkaListenerContainerFactory")
fun consume(@Payload data: String) {
log.info("Message $data")
}
failMessage is dropped and only test is logged.
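The direction of the filter is easy to get backwards: returning true means "throw this record away". A plain-Kotlin model of that behaviour, using the two messages from this post, might look like the following. discardIfFail and deliver are hypothetical names, and this is not the Spring Kafka interface itself.

```kotlin
// Plain-Kotlin model of the filter; returning true means "discard this record".
val discardIfFail: (String) -> Boolean = { value -> value.contains("fail") }

// Mimics the container: only records the filter does NOT match reach the listener.
fun deliver(records: List<String>, discard: (String) -> Boolean): List<String> =
    records.filterNot(discard)

fun main() {
    println(deliver(listOf("test", "failMessage"), discardIfFail)) // prints [test]
}
```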
Consuming a message and sending a reply
Now let's set a reply template so that the listener consumes a message and then sends a message to a topic.
@Configuration
class KafkaConsumerConfig {
@Value("\${spring.kafka.bootstrap-servers}")
lateinit var hosts: String
// listener container factory that can send replies
@Bean
fun kafkaListenerContainerFactory(kafkaTemplate: KafkaTemplate<String, Any>): ConcurrentKafkaListenerContainerFactory<String, Any> {
val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
factory.consumerFactory = consumerFactory() as ConsumerFactory<in String, in Any>
factory.setReplyTemplate(kafkaTemplate)
return factory
}
private fun consumerFactory(): ConsumerFactory<in String, in String> {
return DefaultKafkaConsumerFactory(consumerProperties())
}
private fun consumerProperties(): Map<String, Any> {
val hostName: String = try {
InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
} catch (e: UnknownHostException) {
UUID.randomUUID().toString()
}
return hashMapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
ConsumerConfig.GROUP_ID_CONFIG to hostName,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
)
}
}
Annotate the listener with @SendTo("topic") so that its return value is sent to that topic.
@Component
class SampleTopicListener {
val log = LoggerFactory.getLogger(SampleTopicListener::class.java)
// receive a message and send a reply
@KafkaListener(topics = ["foobar"])
@SendTo("foobar")
fun consume2(@Payload data: String): String {
log.info("Message $data")
return "Listener got this message ${data}"
}
}
Because the listener replies to the same topic it consumes from, each reply is consumed again and triggers another reply, so the log output keeps repeating.
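One way to avoid that loop is to send the reply to a different topic from the one being consumed. The sketch below assumes the container factory with the reply template from this section is in place; the topic name foobar-reply and the class name ReplyingListener are made up for illustration.

```kotlin
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.messaging.handler.annotation.SendTo
import org.springframework.stereotype.Component

@Component
class ReplyingListener { // hypothetical class name
    private val log = LoggerFactory.getLogger(ReplyingListener::class.java)

    // Consumes from "foobar" but replies to "foobar-reply" (hypothetical topic),
    // so the reply is never fed back into this listener.
    @KafkaListener(topics = ["foobar"])
    @SendTo("foobar-reply")
    fun consumeAndReply(@Payload data: String): String {
        log.info("Message $data")
        return "Listener got this message $data"
    }
}
```

The reply topic would need to exist (or be auto-created by the broker) for the reply to be delivered.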
Implementing a producer and consumer with JSON serialization/deserialization using config classes
This time, instead of plain strings, let's send and receive data as JSON.
The User object used here looks like this.
data class User(
val name: String,
val age: Int
) {
constructor() : this("", 0)
}
First, configure the producer to use JsonSerializer as follows.
package com.example.kafkatest
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.support.serializer.JsonSerializer
@Configuration
class JsonKafkaProducerConfig {
@Value("\${spring.kafka.bootstrap-servers}")
lateinit var hosts: String
@Bean
fun userProducerFactory(): ProducerFactory<String, User> {
val configs: MutableMap<String, Any> = HashMap()
configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = hosts
configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
return DefaultKafkaProducerFactory(configs)
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, User> {
return KafkaTemplate(userProducerFactory())
}
}
Next, configure the consumer to use JsonDeserializer. I hit a "cannot trust class" error here, so I set the JsonDeserializer.TRUSTED_PACKAGES to "*" option.
package com.example.kafkatest
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer
import java.net.InetAddress
import java.net.UnknownHostException
import java.util.*
@Configuration
class JsonKafkaConsumerConfig {
@Value("\${spring.kafka.bootstrap-servers}")
lateinit var hosts: String
@Bean
fun userKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, User> {
val factory: ConcurrentKafkaListenerContainerFactory<String, User> =
ConcurrentKafkaListenerContainerFactory<String, User>()
factory.consumerFactory = userConsumerFactory()
return factory
}
@Bean
fun userConsumerFactory(): ConsumerFactory<String, User> {
return DefaultKafkaConsumerFactory(userConsumerProperties())
}
private fun userConsumerProperties(): Map<String, Any> {
val hostName: String = try {
InetAddress.getLocalHost().hostName + UUID.randomUUID().toString()
} catch (e: UnknownHostException) {
UUID.randomUUID().toString()
}
return hashMapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
ConsumerConfig.GROUP_ID_CONFIG to hostName,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
JsonDeserializer.TRUSTED_PACKAGES to "*"
)
}
}
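Trusting "*" lets the deserializer instantiate classes from any package, which is convenient locally but broader than needed. A narrower variant, sketched here as a standalone helper, lists only the package that holds User. The function name restrictedJsonConsumerProperties is hypothetical, and com.example.kafkatest is taken from the package declarations in this post.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.support.serializer.JsonDeserializer

// Hypothetical helper: same consumer properties as above, but only one trusted package.
fun restrictedJsonConsumerProperties(hosts: String, groupId: String): Map<String, Any> = mapOf(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to hosts,
    ConsumerConfig.GROUP_ID_CONFIG to groupId,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
    // Comma-delimited list of packages allowed for deserialization.
    JsonDeserializer.TRUSTED_PACKAGES to "com.example.kafkatest",
)
```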
Now produce a message with the sample producer.
package com.example.kafkatest
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.ApplicationRunner
import org.springframework.context.annotation.Bean
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
@Component
class JsonSampleProducer(
@Autowired kafkaTemplate: KafkaTemplate<String, User>
) {
@Bean
fun sendUserMessage(template: KafkaTemplate<String, User>): ApplicationRunner {
val user = User("testUser", 30)
return ApplicationRunner { template.send("foobar", user) }
}
}
Write a sample consumer that reads the produced message.
package com.example.kafkatest
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component
@Component
class JsonSampleConsumer {
val log = LoggerFactory.getLogger(JsonSampleConsumer::class.java)
@KafkaListener(topics = ["foobar"], containerFactory = "userKafkaListenerContainerFactory")
fun consume(user: User) {
log.info("Message ${user.name} + ${user.age}")
}
}
The data is now consumed as JSON.
Implementing a producer and consumer with JSON serialization/deserialization using yml
The configuration above can be expressed more simply in yml.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: "*"
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Write a producer that uses the configuration above.
package com.example.kafkatest.ymltest
import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.SendResult
import org.springframework.lang.Nullable
import org.springframework.stereotype.Component
import org.springframework.util.concurrent.ListenableFuture
import org.springframework.util.concurrent.ListenableFutureCallback
@Component
class SampleJsonProducer {
val log = LoggerFactory.getLogger(SampleJsonProducer::class.java)
@Bean
fun sendMessage(template: KafkaTemplate<String, User>): String {
val user = User("testUser", 30)
val future: ListenableFuture<SendResult<String, User>> = template.send("foo", user)
future.addCallback(object : ListenableFutureCallback<SendResult<String, User>> {
override fun onSuccess(result: SendResult<String, User>?) {
log.info("Sent message=[$user] with offset=[" + result!!.recordMetadata.offset() + "]")
}
override fun onFailure(ex: Throwable) {
log.error("Unable to send message=[$user] due to: ${ex.message}")
}
})
return ""
}
}
Write the consumer that reads it.
package com.example.kafkatest.ymltest
import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
@Component
class SampleJsonConsumer {
val log = LoggerFactory.getLogger(SampleJsonConsumer::class.java)
@KafkaListener(topics = ["foo"], groupId = "foo")
fun listenGroupFoobar(user: User) {
log.info("Received Message in group foo: $user")
}
}
Run the app to check that it works.
Configuring everything through application.yml
The settings that were wired up in code above can also be set more simply as follows.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foobar
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
The available options in detail are as follows.
Reference post
↓
https://blog.voidmainvoid.net/169
# APACHE KAFKA (KafkaProperties)
spring.kafka.admin.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.admin.fail-fast=false # Whether to fail fast if the broker is not available on startup.
spring.kafka.admin.properties.*= # Additional admin-specific properties used to configure the client.
spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.admin.ssl.key-store-location= # Location of the key store file.
spring.kafka.admin.ssl.key-store-password= # Store password for the key store file.
spring.kafka.admin.ssl.key-store-type= # Type of the key store.
spring.kafka.admin.ssl.protocol= # SSL protocol to use.
spring.kafka.admin.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.admin.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.admin.ssl.trust-store-type= # Type of the trust store.
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden.
spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true.
spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers.
spring.kafka.consumer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.enable-auto-commit= # Whether the consumer's offset is periodically committed in the background.
spring.kafka.consumer.fetch-max-wait= # Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch-min-size".
spring.kafka.consumer.fetch-min-size= # Minimum amount of data the server should return for a fetch request.
spring.kafka.consumer.group-id= # Unique string that identifies the consumer group to which this consumer belongs.
spring.kafka.consumer.heartbeat-interval= # Expected time between heartbeats to the consumer coordinator.
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
spring.kafka.consumer.properties.*= # Additional consumer-specific properties used to configure the client.
spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.consumer.ssl.key-store-location= # Location of the key store file.
spring.kafka.consumer.ssl.key-store-password= # Store password for the key store file.
spring.kafka.consumer.ssl.key-store-type= # Type of the key store.
spring.kafka.consumer.ssl.protocol= # SSL protocol to use.
spring.kafka.consumer.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.consumer.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.consumer.ssl.trust-store-type= # Type of the trust store.
spring.kafka.consumer.value-deserializer= # Deserializer class for values.
spring.kafka.jaas.control-flag=required # Control flag for login configuration.
spring.kafka.jaas.enabled=false # Whether to enable JAAS configuration.
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module.
spring.kafka.jaas.options= # Additional JAAS options.
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation.
spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property.
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
spring.kafka.listener.idle-event-interval= # Time between publishing idle consumer events (no data received).
spring.kafka.listener.log-container-config= # Whether to log the container configuration during initialization (INFO level).
spring.kafka.listener.monitor-interval= # Time between checks for non-responsive consumers. If a duration suffix is not specified, seconds will be used.
spring.kafka.listener.no-poll-threshold= # Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive.
spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
spring.kafka.listener.type=single # Listener type.
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
spring.kafka.producer.batch-size= # Default batch size.
spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for producers.
spring.kafka.producer.buffer-memory= # Total memory size the producer can use to buffer records waiting to be sent to the server.
spring.kafka.producer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
spring.kafka.producer.key-serializer= # Serializer class for keys.
spring.kafka.producer.properties.*= # Additional producer-specific properties used to configure the client.
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.producer.ssl.key-store-location= # Location of the key store file.
spring.kafka.producer.ssl.key-store-password= # Store password for the key store file.
spring.kafka.producer.ssl.key-store-type= # Type of the key store.
spring.kafka.producer.ssl.protocol= # SSL protocol to use.
spring.kafka.producer.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.producer.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.producer.ssl.trust-store-type= # Type of the trust store.
spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer.
spring.kafka.producer.value-serializer= # Serializer class for values.
spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.ssl.key-store-location= # Location of the key store file.
spring.kafka.ssl.key-store-password= # Store password for the key store file.
spring.kafka.ssl.key-store-type= # Type of the key store.
spring.kafka.ssl.protocol= # SSL protocol to use.
spring.kafka.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.ssl.trust-store-type= # Type of the trust store.
spring.kafka.streams.application-id= # Kafka streams application.id property; default spring.application.name.
spring.kafka.streams.auto-startup=true # Whether or not to auto-start the streams factory bean.
spring.kafka.streams.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for streams.
spring.kafka.streams.cache-max-size-buffering= # Maximum memory size to be used for buffering across all threads.
spring.kafka.streams.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.streams.properties.*= # Additional Kafka properties used to configure the streams.
spring.kafka.streams.replication-factor= # The replication factor for change log topics and repartition topics created by the stream processing application.
spring.kafka.streams.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.streams.ssl.key-store-location= # Location of the key store file.
spring.kafka.streams.ssl.key-store-password= # Store password for the key store file.
spring.kafka.streams.ssl.key-store-type= # Type of the key store.
spring.kafka.streams.ssl.protocol= # SSL protocol to use.
spring.kafka.streams.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.streams.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.streams.ssl.trust-store-type= # Type of the trust store.
spring.kafka.streams.state-dir= # Directory location for the state store.
spring.kafka.template.default-topic= # Default topic to which messages are sent.
Done ✨
Spring Kafka + Reactor Kafka
Additionally, if you want to go reactive by using Reactor Kafka together with Spring Kafka, you can set it up as follows.
↓
https://github.com/Kevded/example-reactive-spring-kafka-consumer-and-producer
1) Add the dependencies
// add the Kafka dependencies
implementation("org.springframework.kafka:spring-kafka")
implementation("io.projectreactor.kafka:reactor-kafka")
2) Write application.yml
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: "*"
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
3) Write the producer config
package com.example.kafkatest.reactive
import com.example.kafkatest.User
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import reactor.kafka.sender.SenderOptions
@Configuration
class ReactiveKafkaProducerConfig {
@Bean
fun reactiveKafkaProducerTemplate(properties: KafkaProperties): ReactiveKafkaProducerTemplate<String, User> {
val props = properties.buildProducerProperties()
return ReactiveKafkaProducerTemplate(SenderOptions.create<String, User>(props))
}
}
4) Write ConsumerConfig
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
import reactor.kafka.receiver.ReceiverOptions

@Configuration
class ReactiveKafkaConsumerConfig {
    // Receiver options built from the consumer settings in application.yml, subscribed to the "foo" topic
    @Bean
    fun kafkaReceiverOptions(
        kafkaProperties: KafkaProperties
    ): ReceiverOptions<String, User> {
        val basicReceiverOptions: ReceiverOptions<String, User> =
            ReceiverOptions.create<String, User>(kafkaProperties.buildConsumerProperties())
        return basicReceiverOptions.subscription(listOf("foo"))
    }

    @Bean
    fun reactiveKafkaConsumerTemplate(kafkaReceiverOptions: ReceiverOptions<String, User>): ReactiveKafkaConsumerTemplate<String, User> {
        return ReactiveKafkaConsumerTemplate<String, User>(kafkaReceiverOptions)
    }
}
5) Write the Producer
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import org.springframework.stereotype.Service

@Service
class ReactiveKafkaProducer(
    val reactiveKafkaProducerTemplate: ReactiveKafkaProducerTemplate<String, User>
) {
    private val log = LoggerFactory.getLogger(ReactiveKafkaProducer::class.java)

    fun send(user: User) {
        log.info("send to topic={}, {}={}", "foo", User::class.java.simpleName, user)
        reactiveKafkaProducerTemplate.send("foo", user)
            .doOnSuccess { senderResult ->
                log.info(
                    "sent {} offset : {}",
                    user,
                    senderResult.recordMetadata().offset()
                )
            }
            // nothing is sent until the Mono is subscribed
            .subscribe()
    }
}
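The producer above subscribes inside `send`, so the caller cannot react to a failed send. As an alternative, here is a minimal sketch that returns the `Mono` and leaves subscription to the caller. The class name `ComposableReactiveKafkaProducer` and the generic value type `V` are assumptions made for illustration and are not part of the original project; only `ReactiveKafkaProducerTemplate.send(topic, value)` is taken from the code above.

```kotlin
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import reactor.core.publisher.Mono

// Hypothetical variant: generic over the value type so the sketch does not depend on the User class.
class ComposableReactiveKafkaProducer<V : Any>(
    private val template: ReactiveKafkaProducerTemplate<String, V>
) {
    private val log = LoggerFactory.getLogger(ComposableReactiveKafkaProducer::class.java)

    // Returns a Mono that completes once the record has been sent; nothing happens until it is subscribed.
    fun send(topic: String, value: V): Mono<Void> =
        template.send(topic, value)
            .doOnSuccess { result ->
                log.info("sent {} offset : {}", value, result.recordMetadata().offset())
            }
            .doOnError { e -> log.error("failed to send {}", value, e) }
            .then()
}
```

In a WebFlux controller the returned `Mono` could be chained into the response, so a send failure would surface to the HTTP caller instead of only being logged.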
6) Write the Consumer
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.slf4j.LoggerFactory
import org.springframework.boot.CommandLineRunner
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux

@Service
class ReactiveKafkaConsumer(
    val reactiveKafkaConsumerTemplate: ReactiveKafkaConsumerTemplate<String, User>
) : CommandLineRunner {
    private val log = LoggerFactory.getLogger(ReactiveKafkaConsumer::class.java)

    private fun consumeUser(): Flux<User> {
        return reactiveKafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext { consumerRecord ->
                log.info(
                    "received key={}, value={} from topic={}, offset={}",
                    consumerRecord.key(),
                    consumerRecord.value(),
                    consumerRecord.topic(),
                    consumerRecord.offset()
                )
            }
            .map { it.value() }
            .doOnNext { user ->
                log.info(
                    "successfully consumed {}={}",
                    User::class.java.simpleName, user
                )
            }
            .doOnError { throwable ->
                log.error(
                    "something bad happened while consuming : {}",
                    throwable.message
                )
            }
    }

    override fun run(vararg args: String) {
        // we have to trigger consumption
        consumeUser().subscribe()
    }
}
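The consumer above uses `receiveAutoAck()`, which acknowledges records automatically. If you would rather acknowledge a record only after you have handled it, a sketch along these lines may work. It uses `receive()` and `receiverOffset().acknowledge()` from Reactor Kafka; the class name `ManualAckReactiveKafkaConsumer` and the generic value type `V` are assumptions made for illustration and are not part of the original project.

```kotlin
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
import reactor.core.publisher.Flux

// Hypothetical variant: generic over the value type so the sketch does not depend on the User class.
class ManualAckReactiveKafkaConsumer<V : Any>(
    private val template: ReactiveKafkaConsumerTemplate<String, V>
) {
    private val log = LoggerFactory.getLogger(ManualAckReactiveKafkaConsumer::class.java)

    fun consume(): Flux<V> =
        template
            .receive() // emits ReceiverRecord, which exposes the offset for manual acknowledgment
            .map { record ->
                val value = record.value()
                log.info("handling value={} offset={}", value, record.offset())
                // Mark the record as processed; acknowledged offsets are committed
                // periodically according to the commit settings in ReceiverOptions.
                record.receiverOffset().acknowledge()
                value
            }
}
```

As with the original consumer, nothing is consumed until the returned `Flux` is subscribed.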
7) Write a Controller for testing
package com.example.kafkatest.reactive

import com.example.kafkatest.User
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController

@RestController
class ReactiveKafkaController(
    val producer: ReactiveKafkaProducer
) {
    // GET / publishes a fixed test user to the "foo" topic
    @GetMapping
    fun produce(): String {
        producer.send(User("test", 1234))
        return "hello world"
    }
}
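To trigger the controller without a browser, you could call the endpoint from a small Kotlin snippet that uses the JDK HTTP client. This is only a sketch: it assumes the application listens on Spring Boot's default port 8080 and that the controller is mapped to the root path, and the helper names `buildProduceRequest` and `triggerProduce` are made up for illustration.

```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Assumption: the app runs locally on the default port 8080 and the controller is mapped to "/".
fun buildProduceRequest(baseUrl: String = "http://localhost:8080"): HttpRequest =
    HttpRequest.newBuilder(URI.create("$baseUrl/")).GET().build()

// Sends the request and returns the response body; requires the application to be running.
fun triggerProduce(): String =
    HttpClient.newHttpClient()
        .send(buildProduceRequest(), HttpResponse.BodyHandlers.ofString())
        .body()
```

If the application and the broker are both running, calling `triggerProduce()` should return the controller's response, and the producer and consumer log lines should appear in the application log.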
When you run the project, you can confirm that everything works correctly.
Done ✨