Spring Kafka 를 사용할 때, 외부 카프카 서버에 의존하지 않고 테스트하는 방법을 정리한다. 
Setup spring-kafka 를 의존성으로 추가하자. 그러면, spring-kafka-test 도 같이 추가된다. spring-kafka-test 에는 테스트를 돕는 다양한 util 이 존재한다.
이제 간단한 Producer 를 추가하자. KafkaTemplate 를 이용해서, 특정 토픽에 메세지를 전송한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import  org.slf4j.LoggerFactoryimport  org.springframework.kafka.core.KafkaTemplateimport  org.springframework.stereotype.Component@Component class  Producer  (    private  val  kafkaTemplate: KafkaTemplate<String, String> ) {     private  val  logger = LoggerFactory.getLogger(this .javaClass.name)     fun  produce (topic: String , message: String )   {         logger.info("Produced, (message: $message ), (topic: $topic )" )         kafkaTemplate.send(topic, message)     } } 
 
Consumer 를 추가하자. “jko-topic” 을 listen 하는 consumer 이다. CountDownLatch 는 테스트 할 때, producer 가 send 한 record 를 consumer thread 가 consume 을 완료할 때 까지 대기하기 위해 사용된다. 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import  org.apache.kafka.clients.consumer.ConsumerRecordimport  org.slf4j.LoggerFactoryimport  org.springframework.kafka.annotation .KafkaListenerimport  org.springframework.stereotype.Componentimport  java.util.concurrent.CountDownLatch@Component class  Consumer  (    private  val  countDownLatch: CountDownLatch = CountDownLatch(1 ) ) {     private  val  logger = LoggerFactory.getLogger(this .javaClass.name)     private  var  message: String? = null      @KafkaListener(topics = ["jko-topic" ])      fun  consume (consumerRecord: ConsumerRecord <String , String>)   {         logger.info("Consumed, (record: $consumerRecord )" )         message = consumerRecord.value()         countDownLatch.countDown()     }     fun  await ()   = countDownLatch.await()     fun  equalsConsumedMessageWith (message: String )   = this .message == message } 
 
Test 테스트를 작성할 때의 핵심은, 외부 카프카 서버에 의존하지 않고 테스트하는 것이다. 이를 위해, spring-kafka-test 에서 지원하는 @EmbeddedKafka 를 사용한다. @EmbeddedKafka 는, Spring for Apache Kafka 기반 테스트를 실행하는 테스트 클래스에 지정할 수 있는 어노테이션이다. 테스트를 실행할 때, in-memory kafka instance 를 사용하게 된다.