Kotest Kafka (EmbeddedKafkaListener) 확장
임베디드 Kafka 확장
Kotest는 임베디드 Kafka 인스턴스를 실행하는 확장 기능을 제공한다. 이는 Kafka 도커 이미지 사용이 문제가 되는 상황에서 유용하게 사용될 수 있다.
일반적으로 Apache Kafka를 사용하여 메시지 큐 기반의 시스템을 개발하거나 테스트할 때, 실제 Kafka 서버를 설치하고 설정하는 과정이 번거롭고 시간이 많이 소요될 수 있다. EmbeddedKafkaListener는 이러한 문제를 해결하기 위해 테스트 환경에서 Apache Kafka를 내장 형태로 사용할 수 있게 해준다.
의존성 추가
임베디드 Kafka 확장 기능을 사용하려면 테스트 컴파일 경로에 io.kotest.extensions:kotest-extensions-embedded-kafka
모듈을 의존성에 추가해야 한다. 최신 버전은 maven central에서 확인할 수 있다.
testImplementation("io.kotest.extensions:kotest-extensions-embedded-kafka:<version>")
Configuration
테스트 클래스에 embeddedKafkaListener 리스너를 등록한다:
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener
class EmbeddedKafkaListenerTest : FunSpec({
listener(embeddedKafkaListener)
})
또는
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener
class EmbeddedKafkaListenerTest : FunSpec() {
init {
listener(embeddedKafkaListener)
}
}
그리고 브로커는 스펙이 생성되면 시작되고 사양이 완료되면 중지된다.
EmbeddedKafkaListener
를 사용하면 테스트가 실행되는 동안 내장된 Kafka 서버를 시작하고 중지된다. 이를 통해 테스트를 실행하는 동안 실제 Kafka 서버에 의존하지 않고도 Kafka 기반 애플리케이션을 테스트할 수 있다.
NOTE: 기본 임베디드 kafka 라이브러리는 state에 전역 객체를 사용한다. 여러 개의 kafka 인스턴스를 동시에 시작하지 말아야 한다.
Consumer / Producer
소비자와 생산자를 생성하기 위해 리스너에서 편리한 방법을 사용할 수 있다:
import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener
import io.kotest.matchers.shouldBe
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import java.time.temporal.ChronoUnit
import kotlin.time.Duration.Companion.seconds
class EmbeddedKafkaListenerTest : FunSpec({
listener(embeddedKafkaListener)
test("send / receive") {
val producer = embeddedKafkaListener.stringStringProducer()
producer.send(ProducerRecord("foo", "a"))
producer.close()
val consumer = embeddedKafkaListener.stringStringConsumer("foo")
eventually(10.seconds) {
consumer.poll(Duration.of(1, ChronoUnit.SECONDS)).first().value() shouldBe "a"
}
consumer.close()
}
})
stringStringProducer
와 stringStringConsumer
메서드는 키와 값에 대한 문자열을 허용하는 생산자/소비자를 반환한다. 바이트 쌍에도 비슷한 메서드가 있다.
또는, Kafka 인스턴스가 배포된 호스트/포트에 액세스하여 직접 클라이언트를 생성할 수도 있다:
class EmbeddedKafkaListenerTest : FunSpec({
listener(embeddedKafkaListener)
val props = Properties().apply {
put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "${embeddedKafkaListener.host}:${embeddedKafkaListener.port}")
}
val producer = KafkaProducer<String, String>(props)
})
사용자 포트 지정
포트를 지정하는 리스너의 새 인스턴스를 만든 다음 기본 인스턴스 대신에 해당 인스턴스를 사용할 수 있다.
import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.EmbeddedKafkaListener
import io.kotest.matchers.shouldBe
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import java.time.temporal.ChronoUnit
import kotlin.time.Duration.Companion.seconds
class EmbeddedKafkaCustomPortTest : FunSpec({
val listener = EmbeddedKafkaListener(5678)
listener(listener)
test("send / receive") {
val producer = listener.stringStringProducer()
producer.send(ProducerRecord("foo", "a"))
producer.close()
val consumer = listener.stringStringConsumer("foo")
eventually(10.seconds) {
consumer.poll(Duration.of(1, ChronoUnit.SECONDS)).first().value() shouldBe "a"
}
consumer.close()
}
})
그리고 zookeeper 포트도 추가로 지정할 도 있다.
val listener = EmbeddedKafkaListener(kafkaPort = 6005, zookeeperPort = 9005)