Kotest Kafka (EmbeddedKafkaListener) 확장

Kotest의 EmbeddedKafkaListener는 Apache Kafka를 내장된 형태로 테스트 환경에서 사용할 수 있게 해주는 확장 기능이다.

임베디드 Kafka 확장

Kotest는 임베디드 Kafka 인스턴스를 실행하는 확장 기능을 제공한다. 이는 Kafka 도커 이미지 사용이 문제가 되는 상황에서 유용하게 사용될 수 있다.

일반적으로 Apache Kafka를 사용하여 메시지 큐 기반의 시스템을 개발하거나 테스트할 때, 실제 Kafka 서버를 설치하고 설정하는 과정이 번거롭고 시간이 많이 소요될 수 있다. EmbeddedKafkaListener는 이러한 문제를 해결하기 위해 테스트 환경에서 Apache Kafka를 내장 형태로 사용할 수 있게 해준다.

의존성 추가

임베디드 Kafka 확장 기능을 사용하려면 테스트 컴파일 경로에 io.kotest.extensions:kotest-extensions-embedded-kafka 모듈을 의존성에 추가해야 한다. 최신 버전은 maven central에서 확인할 수 있다.

testImplementation("io.kotest.extensions:kotest-extensions-embedded-kafka:<version>")

Configuration

테스트 클래스에 embeddedKafkaListener 리스너를 등록한다:

import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener

class EmbeddedKafkaListenerTest : FunSpec({
    listener(embeddedKafkaListener)
})

또는

import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener

class EmbeddedKafkaListenerTest : FunSpec() {
    init {
        listener(embeddedKafkaListener)
    }
}

그리고 브로커는 스펙이 생성되면 시작되고 사양이 완료되면 중지된다.

EmbeddedKafkaListener를 사용하면 테스트가 실행되는 동안 내장된 Kafka 서버를 시작하고 중지된다. 이를 통해 테스트를 실행하는 동안 실제 Kafka 서버에 의존하지 않고도 Kafka 기반 애플리케이션을 테스트할 수 있다.

NOTE: 기본 임베디드 kafka 라이브러리는 state에 전역 객체를 사용한다. 여러 개의 kafka 인스턴스를 동시에 시작하지 말아야 한다.

Consumer / Producer

소비자와 생산자를 생성하기 위해 리스너에서 편리한 방법을 사용할 수 있다:

import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.embeddedKafkaListener
import io.kotest.matchers.shouldBe
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import java.time.temporal.ChronoUnit
import kotlin.time.Duration.Companion.seconds

class EmbeddedKafkaListenerTest : FunSpec({

    listener(embeddedKafkaListener)

    test("send / receive") {

        val producer = embeddedKafkaListener.stringStringProducer()
        producer.send(ProducerRecord("foo", "a"))
        producer.close()

        val consumer = embeddedKafkaListener.stringStringConsumer("foo")
        eventually(10.seconds) {
            consumer.poll(Duration.of(1, ChronoUnit.SECONDS)).first().value() shouldBe "a"
        }
        consumer.close()
    }
})

stringStringProducerstringStringConsumer 메서드는 키와 값에 대한 문자열을 허용하는 생산자/소비자를 반환한다. 바이트 쌍에도 비슷한 메서드가 있다.

또는, Kafka 인스턴스가 배포된 호스트/포트에 액세스하여 직접 클라이언트를 생성할 수도 있다:

class EmbeddedKafkaListenerTest : FunSpec({

  listener(embeddedKafkaListener)
   
  val props = Properties().apply {
      put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "${embeddedKafkaListener.host}:${embeddedKafkaListener.port}")
  }
   
  val producer = KafkaProducer<String, String>(props)
   
})

사용자 포트 지정

포트를 지정하는 리스너의 새 인스턴스를 만든 다음 기본 인스턴스 대신에 해당 인스턴스를 사용할 수 있다.

import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.embedded.kafka.EmbeddedKafkaListener
import io.kotest.matchers.shouldBe
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import java.time.temporal.ChronoUnit
import kotlin.time.Duration.Companion.seconds

class EmbeddedKafkaCustomPortTest : FunSpec({

    val listener = EmbeddedKafkaListener(5678)
    listener(listener)

    test("send / receive") {

        val producer = listener.stringStringProducer()
        producer.send(ProducerRecord("foo", "a"))
        producer.close()

        val consumer = listener.stringStringConsumer("foo")
        eventually(10.seconds) {
            consumer.poll(Duration.of(1, ChronoUnit.SECONDS)).first().value() shouldBe "a"
        }
        consumer.close()
    }
})

그리고 zookeeper 포트도 추가로 지정할 도 있다.

val listener = EmbeddedKafkaListener(kafkaPort = 6005, zookeeperPort = 9005)

참조




최종 수정 : 2024-04-14