본문 바로가기
Boaz/Real-time Data and Kafka

[카프카 핵심 가이드 #3] 4장 카프카 컨슈머: 카프카에서 데이터 읽기

by 남디윤 2024. 8. 22.

보아즈 멘멘 스터디 중 기록한 내용입니다.

 

 

 

 

목차

0. 장을 시작하며

1. 카프카 컨슈머: 개념

2. 카프카 컨슈머 생성하기

3. 토픽 구독하기

4. 폴링 루프

5. 컨슈머 설정하기

6. 오프셋과 커밋

7. 리밸런스 리스터

8. 특정 오프셋의 레코드 읽어오기

9. 폴링 루프를 벗어나는 방법

10. 디시리얼라이저

11. 독립 실행 컨슈머 standalone consumer

 

 

 

 

0. 장을 시작하며

  • KafkaConsumer
    • 데이터를 읽는 애플리케이션은 토픽을 구독하고 구독한 토픽들로부터 메세지를 받기 위해

 

 

 

1. 카프카 컨슈머: 개념

컨슈머와 컨슈머 그룹

  • 컨슈머 객체
    • 객체 생성하고, 토픽 구독, 메세지를 받아 검사하고 결과를 작성
    • 메세지가 빠르게 쓰이는 경우, 컨슈머가 하나뿐이라면, 메세지 속도 감당 불가
    • → 여러 개의 컨슈머가 같은 토픽으로부터 데이터를 분할해 읽어올 수 있도록
  • 컨슈머 그룹
    • 동일한 컨슈머 그룹에 속한 여러 개의 컨슈머들이 동일한 토픽 구독할 경우,
    • 각각의 컨슈머는 해당 토픽에서 서로 다른 파티션의 메세지를 받음
  • 컨슈머 그룹에 컨슈머를 추가하는 것 = 카프카 토픽에서 읽어오는 데이터 양을 확장하는 방법
  • 토픽에 설정된 파티션 수 이상으로 컨슈머 투입은 무의미

 

컨슈머 그룹과 파티션 리밸런스

  • 리밸런스
    • 컨슈머에 할당된 파티션을 다른 컨슈머에게 할당해주는 작업
    • 컨슈머 그룹에 높은 가용성과 규모 가변성을 제공하는 기능
    • 문제없이 작업이 수행될 때 진행되면 의미없는 기능
    • 파티션 할당 전략 2가지
  • 조급한 리밸런스
    • 모든 파티션 할당을 해제, 읽기 작업 정지, 모두가 다시 그룹 참여 후 파티션 재할당
  • 협력적 리밸런스 (점진적 리밸런스)
    • 한 컨슈머에게 할당되어 있던 파티션만 다른 컨슈머에게 재할당
    • 재할당되지 않은 파티션에서 레코드 처리 중인 컨슈머들은 계속 일 진행
  • 컨슈머는 해당 컨슈머 그룹의 그룹 코디네이터 역할을 지정받은 카프카 브로커에 하트비트 전송
    • 멤버십, 할당된 파티션에 대한 소유권 유지
    • 일정 시간 이상 하트비트 전송하지 않는 경우, 세션 타임아웃 발생
    • → 그룹 코디네이터: 해당 컨슈머 죽었다고 간주, 리밸런스 실행

 

정적 그룹 멤버십

  • 컨슈머가 갖는 컨슈머 그룹의 멤버로서의 자격(멤버십)은 일시적
  • 컨슈머에 group.instance.id 설정 시 컨슈머 그룹의 정적인 멤버됨
    • 첫 참여 → 기존 해당 그룹이 사용 중이던 파티션 할당 전략에 따라 파티션 할당
    • 꺼졌다가 다시 돌아올 경우, 멤버십 유지 중 → 리밸런스 필요X, 예전에 할당받았던 파티션 그대로 재할당
    • 같은 group.instance.id값을 갖는 두 개의 컨슈머가 같은 그룹에 조인할 경우, 에러 발생
  • 정적 그룹 멤버십 static group membership
    • 애플리케이션이 각 컨슈머에 할당된 파티션의 내용물을 사용해서 로컬 상태나 캐시를 유지해야 할 때 편리
  • 컨슈머 그룹의 정적 멤버는 종료할 때, 미리 컨슈머 그룹을 떠나지 않음
    • 정적 멤버 종료는 session.timeout.ms 설정에 따름

 

 

 

2. 카프카 컨슈머 생성하기

  • KafkaConsumer 인스턴스 생성
    • 프로듀서 인스턴스 생성과 유사
  • 필수 속성 3개
  • boostrap.servers: 카프카 클러스터로의 연결 문자열
  • key.deserializer, value.deserialier
    • 바이트 배열 → 자바 객체로 변환
  • group.id 속성: (필수x, 일반적으로 사용됨) 어떤 컨슈머 그룹에도 속하지 않는 컨슈머 생성 가능하기는 함. 일반적이진 않음

 

 

 

3. 토픽 구독하기

  • 컨슈머 생성 → 토픽 구독: subscribe() 메서드
    • 토픽 목록을 매개변수로 받음
    • 정규식 사용 가능

 

 

 

4. 폴링 루프

  • 컨슈머 API 핵심: 서버에 추가 데이터가 들어왔는지 폴링하는 단순한 루프
    • 오랫동안 돌아가는 무한 루프. 루프 탈출하는 방법은 후에 다룸
    • 계속 폴링하지 않으면 죽은 것으로 간주됨.
    • 단순히 데이터를 가져오는 것보다 많은 일을 함
      • 처음 poll(), 그룹 코디네이터를 찾아 컨슈머 그룹에 참가, 파티션 할당받음. 그 이후 리밸런스 처리도 함

 

스레드 안전성

  • 하나의 스레드에서 동일한 그룹 내 여러 개의 컨슈머 생성 불가
    • = “하나의 스레드당 하나의 컨슈머” 원칙
  • 하나의 애플리케이션에서 동일한 그룹에 속하는 여러 개의 컨슈머 운용하고 싶다면
    • 스레드를 여러 개 띄워서 각각에 컨슈머를 하나씩 돌려야 함
    • 이벤트를 받아서 큐에 넣는 컨슈머 하나와 이 큐에서 이벤트를 처리하는 여러 개의 워커 스레드를 사용하는 방법

 

 

 

5. 컨슈머 설정하기

  • 중요한 컨슈머의 속성들
  • fetch.min.bytes
    • 컨슈머가 브로커로부터 레코드를 얻어올 때 받는 데이터의 최소량(바이트) 지정
  • fetch.max.wait.ms
    • 카프카가 컨슈머가 응답하기 전까지 얼마나 오래 기다릴 것인지 결정
    • fetch.min.bytes와 fetch.max.wait.ms 두 조건 중 하나가 만족되는 대로 리턴
  • fetch.max.bytes
    • 컨슈머가 브로커를 폴링할 때 카프카가 리턴하는 최대 바이트 수를 지정
      • 브로커가 컨슈머에 레코드를 보낼 때, 배치 단위로 보냄
      • 배치 크기 예외: 보내는 첫 번째 레코드 배치 크기가 이 설정값 넘길 경우, 제한값 무시하고 배치 그대로 전송
      • fetch.max.bytes는 첫 번째 배치에 대한 제한을 무시할 수 있으나, 그 후의 데이터에 대해서는 이 제한을 적용하여 더 이상의 데이터를 보내지 않습니다.
      • = 컨슈머가 읽기 작업 계속 진행할 수 있도록 보장
  • max.poll.records
    • poll()을 호출할 때마다 리턴되는 최대 레코드 수 지정
    • 레코드의 개수 (크기가 아님)
  • max.partition.fetch.bytes
    • 서버가 파티션별로 리턴하는 최대 바이트 수 결정
    • 카프카 poll()이 ConsumerRecords 리턴 시, 메모리 상에 저장된 레코드 객체의 크기는 컨슈머에 할당된 파티션별로 최대 max.partition.fetch.bytes까지 차지 가능
  • session.timeout.ms, 그리고 heartbeat.interval.ms
    • 컨슈머가 브로커가 신호를 주고 받지 않아도 살아 있는 것으로 판정되는 최대 시간의 기본값은 10초
      • 하트비트 보내지 않은 채로 session.timeout.ms 가 지나가면 그룹 코디네이터는 컨슈머가 죽은 것으로 간주
      • 해당 컨슈머의 파티션을 다른 컨슈머에게 재할당하기 위해 리밸런스 실행
      • heartbeat.interval.ms: 컨슈머가 얼마나 자주 그룹 코디네이터에게 하트비트 보내는지 결정
        • session.timeout.ms 보다 낮은 값. 대체로 1/3
  • max.poll.interval.ms
    • 컨슈머가 폴링을 하지 않고도 죽은 것으로 판정되지 않을 수 있는 최대 시간 지정
    • 하트비트는 백그라운드 스레드에 의해 전송됨
      • = 카프카에서 레코드를 읽어오는 메인 스레드 데드락 걸려도 멀쩡히 하트비트 전송하고 있을 가능성 존재
      • → 안전장치 내지 예방 조치로 max.poll.interval.ms 사용됨
      • 타움아웃 발생 시, 백그라운드 스레드는 브로커로 하여금 컨슈머가 죽어서 리밸런스가 수행되어야 한다는 것을 알 수 있도록 leave group 요청 전송 및 하트비트 전송 중단
  • default.api.timeout.ms
    • API 호출 시, 명시적인 타임아웃 지정하지 않는 한, 거의 모든 컨슈머 API 호출에 적용되는 타임아웃 값
      • poll 메서드는 이 값이 적용되지 않는 중요한 예외
  • request.timeout.ms
    • 컨슈머가 브로커로부터의 응답을 기다릴 수 있는 최대 시간
  • auto.offset.reset
    • 컨슈머가 예전에 오프셋을 커밋한 적 없거나, 커밋된 오프셋이 유효하지 않을 때, 파티션을 읽기 시작할 때의 작동 정의
      • 어떤 레코드부터 읽기 시작할 것인지
  • enable.auto.commit
    • 컨슈머가 자동으로 오프셋을 커밋할지의 여부를 결정
  • partition.assignment.startegy
    • PartitionAssignor 클래스: 컨슈머와 이들이 구독한 토픽들이 있을 때, 어느 컨슈머에게 어느 파티션이 할당될지를 결정하는 역할
    • Range
    • RoundRobin
    • Sticky
    • Cooperative Sticky
  • client.id
    • 브로커가 요청을 보낸 클라이언트를 식별하는 데 쓰임
    • 로깅, 모니터링 지표, 쿼터에서도 사용됨
  • client.rack
    • 컨슈머는 각 파티션의 리더 레플리카로부터 메세지를 읽어 옴
    • 하지만, 클러스터가 다수의 데이터 센터 혹은 다수의 클라우드 가용 영역에 걸쳐 설치되어 있는 경우
      • 컨슈머와 같은 영역에 있는 레플리카로부터 메세지를 읽어 오는 것이 좋음
      • 가장 가까운 레플리카로부터 읽어올 수 있게 설정 주는 변수
  • group.instance.id
    • 컨슈머에 정작 그룹 멤버십 기능을 적용하기 위해 사용되는 설정
  • receive.buffer.bytes, send.buffer.bytes
    • 데이터를 읽거나 쓸 때 소켓이 사용하는 TCP의 수신 및 수신 버퍼의 크기
  • offsets.retention.minutes
    • 브로커의 설정이지만,
    • 컨슈머 작동에 큰 영향을 줌
    • 그룹이 비게 된다면 카프카는 커밋된 오프셋을 이 설정 값에 지정된 기간 동안만 보관

 

 

 

6. 오프셋과 커밋

  • 오프셋 커밋
    • 카프카에서 파티션의 현재 위치를 업데이트하는 작업
    • ≠ 전통적인 메세지 큐
    • 카프카는 레코드를 개별적으로 커밋 X
  • __consumer_offesets 토픽에 각 파티션별로 커밋된 오프셋을 업데이트하도록 메세지 전송

 

자동 커밋

  • 오프셋을 커밋하는 가장 쉬운 방법은 컨슈머가 대신하도록 하는 것
  • enable.auto.commit =true 설정
  • 중복 메세지 방지 주의

 

현재 오프셋 커밋하기

  • 오프셋 커밋되는 시각 제어
    • 메세지 유실 가능성 제거, 리밸러스 발생 시 중복되는 메세지의 수를 줄이기 위해
    • 컨슈머 API는 개발자가 원하는 시간에 현재 오프셋을 커밋하는 옵션 제공함
  • enable.auto.commit =false 설정
    • 명시적으로 커밋하려 할 때만 오프셋 커밋 가능
  • commitSync()
    • poll() 이 리턴한 마지막 오프셋을 커밋 → 성공적으로 완료되면 리턴
    • 모든 레코드 처리 완료 전 commitSync() 호출 시, 메세지 누락 위험 감수 불가피

 

비동기적 커밋

  • 수동 커밋의 단점 중 하나: 브로커가 커밋 요청에 응답할 때까지 애플리케이션이 블록됨
    • = 애플리케이션 처리량 제한
    • → 비동기적 커밋 API 사용
  • 단점
    • commitSync()는 성공하거나 불가능한 실패가 발생할 때까지 재시도
    • commitAsync()는 재시도 X

 

동기적 커밋과 비동기적 커밋을 함께 사용하기

  • 컨슈머를 닫기 전 or 리밸런스 전 마지막 커밋 → 성공 여부 추가 확인 필요
  • → commitSync()와 commitAsync() 함께 사용

 

특정 오프셋 커밋하기

  • 가장 최근 오프셋을 커밋하는 것은 메세지 배치의 처리가 끝날 때만 수행 가능
  • 그보다 더 자주 커밋하고 싶다면 → 특정 오프셋 커밋
  • commitSync()와 commitAsync()를 호출할 때 커밋하고자 하는 파티셧과 오프셋의 맵 전달 가능

 

 

 

7. 리밸런스 리스터

  • 컨슈머는 종료하기 전이나 리밸런싱 시작 전 정리 cleanup 작업 필요
    • 파티션 해제 → 마지막 처리한 이벤트의 오프셋 커밋
    • 파일 핸들, 데베 연결 종료
  • subscribe()호출 시 ConsumerRebalanceListener 전달

 

 

 

8. 특정 오프셋의 레코드 읽어오기

  • 다른 오프셋에서부터 읽기를 시작하고 싶은 경우
    • 맨 앞에서부터 메세지 읽기 seekToBeginning
    • 파티션에 새로 들어온 메세지부터 읽기 seekToEnd
    • 카프카 API를 사용하면 특정한 오프셋부터 탐색 가능

 

 

 

9. 폴링 루프를 벗어나는 방법

  • 컨슈머를 종료하고자 할 때, 컨슈머가 poll()을 오랫동안 기다리고 있더라도 즉시 루프 탈출
    • 다른 스레드에서 consumer.wakeup() 호출
    • 메인 스레드에서 컨슈머 루프 돌고 있다면, ShutdownHook 사용
    • 스레드 종료 전 consumer.close() 호출

 

 

 

10. 디시리얼라이저

  • 카프카 컨슈머는 카프카로부터 받은 바이트 배열을 자바 객체로 변환하기 위해 디시리얼라이저 필요
  • 카프카에 이벤트를 쓰기 위해 사용되는 시리얼라이저와 이벤트를 읽어올 때 사용되는 디시리얼라이저가 서로 맞아야 함

 

커스텀 디시리얼라이저

  • 시리얼라이저와 마찬가지로 직접 구현하는 것은 권장하지 않음
  • 깨지기도 쉽고 에러 발생 가능성 높음

 

Avro 디시리얼라이저 사용하기

  • KafkaAvroDeserializer

 

 

 

11. 독립 실행 컨슈머 standalone consumer

: 컨슈머 그룹 없이 컨슈머를 사용해야 하는 이유와 방법

  • 경우에 따라서 필요
    • 하나의 컨슈머가 토픽의 모든 파티션으로부터 모든 데이터를 읽어와야 하거나
    • 토픽의 특정 파티션으로부터 데이터를 읽어와야 함
    • 이런 경우 컨슈머 그룹, 리밸런스 기능 필요 X
  • 컨슈머가 어떤 파티션 읽어야 하는지 정확히 아는 경우
    • 토픽 구독 필요 X, 파티션을 스스로 할당
    • 토픽 구독하거나 스스로 파티션 할당 가능, 두 가지 동시에는 불가
    • (단점) 리밸런싱 기능 사용 X → 직접 파티션 찾아야함