Spring Kafka Test

Spring Kafka 를 사용할 때, 외부 카프카 서버에 의존하지 않고 테스트하는 방법을 정리한다.

Setup

spring-kafka 를 의존성으로 추가하자. 그러면, spring-kafka-test 도 같이 추가된다.
spring-kafka-test 에는 테스트를 돕는 다양한 util 이 존재한다.

이제 간단한 Producer 를 추가하자.
KafkaTemplate 를 이용해서, 특정 토픽에 메세지를 전송한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class Producer(
private val kafkaTemplate: KafkaTemplate<String, String>
) {

private val logger = LoggerFactory.getLogger(this.javaClass.name)

fun produce(topic: String, message: String) {
logger.info("Produced, (message: $message), (topic: $topic)")

kafkaTemplate.send(topic, message)
}
}

Consumer 를 추가하자. “jko-topic” 을 listen 하는 consumer 이다.
CountDownLatch 는 테스트 할 때, producer 가 send 한 record 를 consumer thread 가 consume 을 완료할 때 까지 대기하기 위해 사용된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
import java.util.concurrent.CountDownLatch

@Component
class Consumer(
private val countDownLatch: CountDownLatch = CountDownLatch(1)
) {

private val logger = LoggerFactory.getLogger(this.javaClass.name)
private var message: String? = null

@KafkaListener(topics = ["jko-topic"])
fun consume(consumerRecord: ConsumerRecord<String, String>) {
logger.info("Consumed, (record: $consumerRecord)")

message = consumerRecord.value()
countDownLatch.countDown()
}

fun await() = countDownLatch.await()

fun equalsConsumedMessageWith(message: String) = this.message == message
}

Test

테스트를 작성할 때의 핵심은, 외부 카프카 서버에 의존하지 않고 테스트하는 것이다.
이를 위해, spring-kafka-test 에서 지원하는 @EmbeddedKafka 를 사용한다.
@EmbeddedKafka 는, Spring for Apache Kafka 기반 테스트를 실행하는 테스트 클래스에 지정할 수 있는 어노테이션이다.
테스트를 실행할 때, in-memory kafka instance 를 사용하게 된다.

Read more

Spring Kafka Listener Container BATCH AckMode

Spring Kafka Listener Container 의 AckMode 가 BATCH 일 때,
어떻게 records 를 consume 하고 commit 하는지 정리한다.

Offset

commit 을 정확히 이해하기 위해, offset 의 개념부터 정리하자.
Kafka 는 partition 의 각각의 record 에 대해 numerical offset 을 가지고 있다.
offset 은 두 가지 역할을 한다.

  1. partition 의 record 에 대한 unique identifier
  2. partition 에 대한 consumer 의 위치

그래서, commit 된 position 은 마지막 offset 을 가리킨다.

AckMode

Spring Kafka 에는 ContainerProperties 로 AckMode 일곱 가지가 있다.
offset commit 을 어떻게 할지에 대한 설정이다.
일곱 가지 중에 BATCH AckMode 는 어떻게 동작하는지 알아보자.

우선, BATCH AckMode 는 문서에 따르면 다음과 같이 정의되어 있다.

Commit the offsets of all records returned by the previous poll after they all have been processed by the listener.

번역하면 다음과 같다.
“이전 poll 에 의해 반환된 모든 레코드가” 리스너에 의해 모두 처리가 끝난 뒤에, offset 을 커밋한다.

Read more

CMAK

CMAK 를 어떤 목적으로 사용하는지 정리하고, 설치하고 실행해본다.

CMAK

CMAK (Cluster Manager for Apache Kafka) 는, 야후에서 만든 Kafka Cluster 를 managing 하기 위한 tool 이다. CMAK 를 이용해서 다음과 같은 것들을 할 수 있다.

  1. Create, Delete topic
  2. Add partitions to existing topic
  3. Update config for existing topic
  4. Inspection of cluster state

Requirements

Kafka package 설치하고, Zookeeper 를 아래와 같이 start 한다.

1
bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka 를 아래와 같이 start 한다.

1
bin/kafka-server-start.sh config/server01.properties
Read more

[카프카] 8장_카프카 스트림즈 API

카프카 스트림즈 API 를 통해 스트림을 처리하는 방법을 정리한다.

기본 개념

스트림 프로세싱과 배치 프로세싱

오늘날 데이터 분석 시스템은 스트림 처리 시스템과 배치 처리 시스템을 모두 갖추어서 실시간과 정확성을 보장한다.

  1. 스트림 프로세싱
    데이터들이 지속적으로 유입되고 나가는 과정에서 데이터에 대한 분석이나 질의를 수행하는 것이다.
    데이터가 분석 시스템이나 프로그램에 도달하자마자 처리를 해서 실시간 분석이라고도 한다.

  2. 배치 처리
    이미 저장된 데이터를 기반으로 분석이나 질의를 수행하고 특정 시간에 처리하는 것이다.

스티름 프로세싱의 장점은,

  1. 이벤트 발생, 분석, 조치에 지연시간이 없기 때문에, 최신의 데이터를 반영한다.
  2. 데이터 저장 후 분석을 하지 않으므로, 정적 분석보다 더 많은 데이터를 분석할 수 있다.
  3. 시간에 따라 지속적으로 유입되는 데이터 분석에 최적화되어 있다.
  4. 대규모 공유 데이터베이스에 대한 요구를 줄일 수 있어 인프라에 독립적으로 수행될 수 있다.

상태 기반 스트림 처리, 무상태 스트림 처리

  1. 상태 기반 스트림 처리
    이전 스트림을 처리한 결과를 참조하는 방식의 처리이다.
    애플리케이션에서 각각의 이벤트를 처리하고 결과를 저장할 상태 저장소가 필요하다.

  2. 무상태 스트림 처리
    이전 스트림의 처리 결과와 관계 없이, 현재 애플리케이션에 도달한 스트림만을 기준으로 처리한다.

카프카 스트림즈의 특징과 개념

Read more

[카프카] 5장_카프카 컨슈머

컨슈머란, 프로듀서가 메세지를 생산해서 카프카의 토픽으로 보내면 그 토픽의 메세지를 가져와 소비하는 애플리케이션, 서버이다.
주요 기능은, 특정 파티션을 관리하고 있는 파티션 리더에게 메세지를 가져오기 요청을 하는 것이다.

파티션과 메세지 순서

파티션 3 개로 구성한 토픽과 메세지 순서

프로듀서 다음 순서로 보낸다.

1
2
3
4
5
a
b
c
d
e

컨슈머에서 –from-beginning 옵션으로 받는다.
메세지의 순서가 프로듀서가 보낸 순서가 아니다.

1
2
3
4
5
a
d
b
e
c

다음으로, 프로듀서서가 숫자를 보낸다.

1
2
3
4
5
1
2
3
4
5

컨슈머에서 –from-beginning 옵션으로 받는다.

Read more

[카프카] 4장_카프카 프로듀서

프로듀서란, 메세지를 생산해서 카프카의 토픽으로 보내는 역할을 하는 애플리케이션 or 서버이다.
주요 기능은, 각각의 메세지를 토픽 파티션에 매핑하고 파티션의 리더에 요청을 보내는 것이다.
프로듀서 옵션 중에 acks 옵션 설정에 따라
카프카로 메세지를 전송할 때, 메세지 손실 여부와 메세지 전송 속도 및 처리량이 달라진다.

  1. 메세지 보내고 확인하지 않기
    메세지 손실 가능성이 있다.

  2. 동기 전송
    카프카의 응답을 기다린다.

  3. 비동기 전송
    응답을 기다리지 않기 때문에 빠른 전송이 가능하다.

메세지 손실 가능성 높음 && 빠른 전송 속도

acks = 0
카프카 서버의 응답을 기다리지 않고 메세지 보낼 준비가 되면 즉시 다음 요청을 보낸다.

메세지 손실 가능성 낮음 && 적당한 전송 속도

asks = 1
프로듀서는 메세지를 보내고 Leader 는 잘 받았으면, 바로 ack 를 한다.
팔로워들은 주기적으로 리더를 확인하고 새로운 메세지가 확인되면 팔로워들에도 저장한다.
메세지 손실이 발생하는 경우는 리더에 장애가 발생하는 경우이다.
즉, 프로듀서가 리더에게 메세지를 보내고 리더는 메시지를 저장한 후에 바로 장애가 발생하는 경우이다.

메세지 손실 없음 && 느린 전송 속도

acks = all
메세지를 보내고 잘 받았는지 확인하고 추가적으로 팔로워들까지 메세지를 잘 받았는지 확인한다.
이 때, 프로듀서 설정 뿐만 아니라 브로커 설정도 같이 해줘야한다.

프로듀서 acks = all, 브로커 min.insync.replicas = 1

Read more

[카프카] 3장_카프카 디자인

카프카 디자인의 특징

분산 시스템

분산 시스템이란, 같은 역할을 하는 여러 대의 서버로 이뤄진 서버 그룹이다.
장점은,

  1. 단일 시스템 보다 높은 성능
  2. 하나의 서버가 장애가 발생해도 다른 서버가 대신 처리
  3. 시스템 확장 용이

페이지 캐시

OS 는 물리적 메모리에 애플리케이션이 사용하는 부분을 할당하고, 남은 잔여 메모리 일부를 페이지 케시로 이용한다.
즉, 디스크에 읽고 쓰기를 하지 않고 페이지 케시를 읽고 쓰는 방식으로 처리 속도가 빠르고 전체적인 성능을 향상 시킨다.

배치 전송 처리

서버와 클라이언트 사이, 또는 서버 내부적으로 데이터를 주고 받는 과정에서 I/O 가 발생한다.
작은 I/O 가 빈번하게 발생하는 것을 막기 위해, 작은 I/O 들을 묶어러 처리할 수 있도록 배치 작업으로 처리한다.

카프카 데이터 모델

토픽

Read more

[카프카] 1장_카프카란 무엇인가

카프카 : 대용량, 대규모 메세지 데이터를 빠르게 처리하도록 개발된 메시징 플랫폼

탄생 배경

end-to-end 아키텍쳐의 문제점은,

  1. 통합된 전송 영역이 없어서 복잡도 증가하고
  2. 데이터 파이프라인의 관리가 어렵다.

하지만 카프카는 아래 그림 처럼,

  1. 모든 시스템으로 데이터 전송이 가능하고
  2. 실시간 처리 가능하고
  3. 확장이 용이하다.

동작 방식과 원리

메시징 시스템을 먼저 살펴보자.
중앙에 메시징 시스템 서버를 두고 메시지를 Publish 하고 Subscribe 하는 형태의 통신을 Pub/Sub 모델이라고 한다.

Read more