Spring Kafka 를 사용할 때, 외부 카프카 서버에 의존하지 않고 테스트하는 방법을 정리한다.
Setup spring-kafka 를 의존성으로 추가하자. 그러면, spring-kafka-test 도 같이 추가된다. spring-kafka-test 에는 테스트를 돕는 다양한 util 이 존재한다.
이제 간단한 Producer 를 추가하자. KafkaTemplate 를 이용해서, 특정 토픽에 메세지를 전송한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import org.slf4j.LoggerFactoryimport org.springframework.kafka.core.KafkaTemplateimport org.springframework.stereotype.Component@Component class Producer ( private val kafkaTemplate: KafkaTemplate<String, String> ) { private val logger = LoggerFactory.getLogger(this .javaClass.name) fun produce (topic: String , message: String ) { logger.info("Produced, (message: $message ), (topic: $topic )" ) kafkaTemplate.send(topic, message) } }
Consumer 를 추가하자. “jko-topic” 을 listen 하는 consumer 이다. CountDownLatch 는 테스트 할 때, producer 가 send 한 record 를 consumer thread 가 consume 을 완료할 때 까지 대기하기 위해 사용된다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import org.apache.kafka.clients.consumer.ConsumerRecordimport org.slf4j.LoggerFactoryimport org.springframework.kafka.annotation .KafkaListenerimport org.springframework.stereotype.Componentimport java.util.concurrent.CountDownLatch@Component class Consumer ( private val countDownLatch: CountDownLatch = CountDownLatch(1 ) ) { private val logger = LoggerFactory.getLogger(this .javaClass.name) private var message: String? = null @KafkaListener(topics = ["jko-topic" ]) fun consume (consumerRecord: ConsumerRecord <String , String>) { logger.info("Consumed, (record: $consumerRecord )" ) message = consumerRecord.value() countDownLatch.countDown() } fun await () = countDownLatch.await() fun equalsConsumedMessageWith (message: String ) = this .message == message }
Test 테스트를 작성할 때의 핵심은, 외부 카프카 서버에 의존하지 않고 테스트하는 것이다. 이를 위해, spring-kafka-test 에서 지원하는 @EmbeddedKafka 를 사용한다. @EmbeddedKafka 는, Spring for Apache Kafka 기반 테스트를 실행하는 테스트 클래스에 지정할 수 있는 어노테이션이다. 테스트를 실행할 때, in-memory kafka instance 를 사용하게 된다.