목차
0. 장을 시작하며
- KafkaConsumer
- 데이터를 읽는 애플리케이션은 토픽을 구독하고 구독한 토픽들로부터 메세지를 받기 위해
1. 카프카 컨슈머: 개념
컨슈머와 컨슈머 그룹
- 컨슈머 객체
- 객체 생성하고, 토픽 구독, 메세지를 받아 검사하고 결과를 작성
- 메세지가 빠르게 쓰이는 경우, 컨슈머가 하나뿐이라면, 메세지 속도 감당 불가
- → 여러 개의 컨슈머가 같은 토픽으로부터 데이터를 분할해 읽어올 수 있도록
- 컨슈머 그룹
- 동일한 컨슈머 그룹에 속한 여러 개의 컨슈머들이 동일한 토픽 구독할 경우,
- 각각의 컨슈머는 해당 토픽에서 서로 다른 파티션의 메세지를 받음
- 컨슈머 그룹에 컨슈머를 추가하는 것 = 카프카 토픽에서 읽어오는 데이터 양을 확장하는 방법
- 토픽에 설정된 파티션 수 이상으로 컨슈머 투입은 무의미
컨슈머 그룹과 파티션 리밸런스
- 리밸런스
- 컨슈머에 할당된 파티션을 다른 컨슈머에게 할당해주는 작업
- 컨슈머 그룹에 높은 가용성과 규모 가변성을 제공하는 기능
- 문제없이 작업이 수행될 때 진행되면 의미없는 기능
- 파티션 할당 전략 2가지
- 조급한 리밸런스
- 모든 파티션 할당을 해제, 읽기 작업 정지, 모두가 다시 그룹 참여 후 파티션 재할당
- 협력적 리밸런스 (점진적 리밸런스)
- 한 컨슈머에게 할당되어 있던 파티션만 다른 컨슈머에게 재할당
- 재할당되지 않은 파티션에서 레코드 처리 중인 컨슈머들은 계속 일 진행
- 컨슈머는 해당 컨슈머 그룹의 그룹 코디네이터 역할을 지정받은 카프카 브로커에 하트비트 전송
- 멤버십, 할당된 파티션에 대한 소유권 유지
- 일정 시간 이상 하트비트 전송하지 않는 경우, 세션 타임아웃 발생
- → 그룹 코디네이터: 해당 컨슈머 죽었다고 간주, 리밸런스 실행
정적 그룹 멤버십
- 컨슈머가 갖는 컨슈머 그룹의 멤버로서의 자격(멤버십)은 일시적
- 컨슈머에 group.instance.id 설정 시 컨슈머 그룹의 정적인 멤버됨
- 첫 참여 → 기존 해당 그룹이 사용 중이던 파티션 할당 전략에 따라 파티션 할당
- 꺼졌다가 다시 돌아올 경우, 멤버십 유지 중 → 리밸런스 필요X, 예전에 할당받았던 파티션 그대로 재할당
- 같은 group.instance.id값을 갖는 두 개의 컨슈머가 같은 그룹에 조인할 경우, 에러 발생
- 정적 그룹 멤버십 static group membership
- 애플리케이션이 각 컨슈머에 할당된 파티션의 내용물을 사용해서 로컬 상태나 캐시를 유지해야 할 때 편리
- 컨슈머 그룹의 정적 멤버는 종료할 때, 미리 컨슈머 그룹을 떠나지 않음
- 정적 멤버 종료는 session.timeout.ms 설정에 따름
2. 카프카 컨슈머 생성하기
- KafkaConsumer 인스턴스 생성
- 프로듀서 인스턴스 생성과 유사
- 필수 속성 3개
- boostrap.servers: 카프카 클러스터로의 연결 문자열
- key.deserializer, value.deserialier
- 바이트 배열 → 자바 객체로 변환
- group.id 속성: (필수x, 일반적으로 사용됨) 어떤 컨슈머 그룹에도 속하지 않는 컨슈머 생성 가능하기는 함. 일반적이진 않음
3. 토픽 구독하기
- 컨슈머 생성 → 토픽 구독: subscribe() 메서드
- 토픽 목록을 매개변수로 받음
- 정규식 사용 가능
4. 폴링 루프
- 컨슈머 API 핵심: 서버에 추가 데이터가 들어왔는지 폴링하는 단순한 루프
- 오랫동안 돌아가는 무한 루프. 루프 탈출하는 방법은 후에 다룸
- 계속 폴링하지 않으면 죽은 것으로 간주됨.
- 단순히 데이터를 가져오는 것보다 많은 일을 함
- 처음 poll(), 그룹 코디네이터를 찾아 컨슈머 그룹에 참가, 파티션 할당받음. 그 이후 리밸런스 처리도 함
스레드 안전성
- 하나의 스레드에서 동일한 그룹 내 여러 개의 컨슈머 생성 불가
- = “하나의 스레드당 하나의 컨슈머” 원칙
- 하나의 애플리케이션에서 동일한 그룹에 속하는 여러 개의 컨슈머 운용하고 싶다면
- 스레드를 여러 개 띄워서 각각에 컨슈머를 하나씩 돌려야 함
- 이벤트를 받아서 큐에 넣는 컨슈머 하나와 이 큐에서 이벤트를 처리하는 여러 개의 워커 스레드를 사용하는 방법
5. 컨슈머 설정하기
- 중요한 컨슈머의 속성들
- fetch.min.bytes
- 컨슈머가 브로커로부터 레코드를 얻어올 때 받는 데이터의 최소량(바이트) 지정
- fetch.max.wait.ms
- 카프카가 컨슈머가 응답하기 전까지 얼마나 오래 기다릴 것인지 결정
- fetch.min.bytes와 fetch.max.wait.ms 두 조건 중 하나가 만족되는 대로 리턴
- fetch.max.bytes
- 컨슈머가 브로커를 폴링할 때 카프카가 리턴하는 최대 바이트 수를 지정
- 브로커가 컨슈머에 레코드를 보낼 때, 배치 단위로 보냄
- 배치 크기 예외: 보내는 첫 번째 레코드 배치 크기가 이 설정값 넘길 경우, 제한값 무시하고 배치 그대로 전송
- fetch.max.bytes는 첫 번째 배치에 대한 제한을 무시할 수 있으나, 그 후의 데이터에 대해서는 이 제한을 적용하여 더 이상의 데이터를 보내지 않습니다.
- = 컨슈머가 읽기 작업 계속 진행할 수 있도록 보장
- 컨슈머가 브로커를 폴링할 때 카프카가 리턴하는 최대 바이트 수를 지정
- max.poll.records
- poll()을 호출할 때마다 리턴되는 최대 레코드 수 지정
- 레코드의 개수 (크기가 아님)
- max.partition.fetch.bytes
- 서버가 파티션별로 리턴하는 최대 바이트 수 결정
- 카프카 poll()이 ConsumerRecords 리턴 시, 메모리 상에 저장된 레코드 객체의 크기는 컨슈머에 할당된 파티션별로 최대 max.partition.fetch.bytes까지 차지 가능
- session.timeout.ms, 그리고 heartbeat.interval.ms
- 컨슈머가 브로커가 신호를 주고 받지 않아도 살아 있는 것으로 판정되는 최대 시간의 기본값은 10초
- 하트비트 보내지 않은 채로 session.timeout.ms 가 지나가면 그룹 코디네이터는 컨슈머가 죽은 것으로 간주
- 해당 컨슈머의 파티션을 다른 컨슈머에게 재할당하기 위해 리밸런스 실행
- heartbeat.interval.ms: 컨슈머가 얼마나 자주 그룹 코디네이터에게 하트비트 보내는지 결정
- session.timeout.ms 보다 낮은 값. 대체로 1/3
- 컨슈머가 브로커가 신호를 주고 받지 않아도 살아 있는 것으로 판정되는 최대 시간의 기본값은 10초
- max.poll.interval.ms
- 컨슈머가 폴링을 하지 않고도 죽은 것으로 판정되지 않을 수 있는 최대 시간 지정
- 하트비트는 백그라운드 스레드에 의해 전송됨
- = 카프카에서 레코드를 읽어오는 메인 스레드 데드락 걸려도 멀쩡히 하트비트 전송하고 있을 가능성 존재
- → 안전장치 내지 예방 조치로 max.poll.interval.ms 사용됨
- 타움아웃 발생 시, 백그라운드 스레드는 브로커로 하여금 컨슈머가 죽어서 리밸런스가 수행되어야 한다는 것을 알 수 있도록 leave group 요청 전송 및 하트비트 전송 중단
- default.api.timeout.ms
- API 호출 시, 명시적인 타임아웃 지정하지 않는 한, 거의 모든 컨슈머 API 호출에 적용되는 타임아웃 값
- poll 메서드는 이 값이 적용되지 않는 중요한 예외
- API 호출 시, 명시적인 타임아웃 지정하지 않는 한, 거의 모든 컨슈머 API 호출에 적용되는 타임아웃 값
- request.timeout.ms
- 컨슈머가 브로커로부터의 응답을 기다릴 수 있는 최대 시간
- auto.offset.reset
- 컨슈머가 예전에 오프셋을 커밋한 적 없거나, 커밋된 오프셋이 유효하지 않을 때, 파티션을 읽기 시작할 때의 작동 정의
- 어떤 레코드부터 읽기 시작할 것인지
- 컨슈머가 예전에 오프셋을 커밋한 적 없거나, 커밋된 오프셋이 유효하지 않을 때, 파티션을 읽기 시작할 때의 작동 정의
- enable.auto.commit
- 컨슈머가 자동으로 오프셋을 커밋할지의 여부를 결정
- partition.assignment.startegy
- PartitionAssignor 클래스: 컨슈머와 이들이 구독한 토픽들이 있을 때, 어느 컨슈머에게 어느 파티션이 할당될지를 결정하는 역할
- Range
- RoundRobin
- Sticky
- Cooperative Sticky
- client.id
- 브로커가 요청을 보낸 클라이언트를 식별하는 데 쓰임
- 로깅, 모니터링 지표, 쿼터에서도 사용됨
- client.rack
- 컨슈머는 각 파티션의 리더 레플리카로부터 메세지를 읽어 옴
- 하지만, 클러스터가 다수의 데이터 센터 혹은 다수의 클라우드 가용 영역에 걸쳐 설치되어 있는 경우
- 컨슈머와 같은 영역에 있는 레플리카로부터 메세지를 읽어 오는 것이 좋음
- 가장 가까운 레플리카로부터 읽어올 수 있게 설정 주는 변수
- group.instance.id
- 컨슈머에 정작 그룹 멤버십 기능을 적용하기 위해 사용되는 설정
- receive.buffer.bytes, send.buffer.bytes
- 데이터를 읽거나 쓸 때 소켓이 사용하는 TCP의 수신 및 수신 버퍼의 크기
- offsets.retention.minutes
- 브로커의 설정이지만,
- 컨슈머 작동에 큰 영향을 줌
- 그룹이 비게 된다면 카프카는 커밋된 오프셋을 이 설정 값에 지정된 기간 동안만 보관
6. 오프셋과 커밋
- 오프셋 커밋
- 카프카에서 파티션의 현재 위치를 업데이트하는 작업
- ≠ 전통적인 메세지 큐
- 카프카는 레코드를 개별적으로 커밋 X
- __consumer_offesets 토픽에 각 파티션별로 커밋된 오프셋을 업데이트하도록 메세지 전송
자동 커밋
- 오프셋을 커밋하는 가장 쉬운 방법은 컨슈머가 대신하도록 하는 것
- enable.auto.commit =true 설정
- 중복 메세지 방지 주의
현재 오프셋 커밋하기
- 오프셋 커밋되는 시각 제어
- 메세지 유실 가능성 제거, 리밸러스 발생 시 중복되는 메세지의 수를 줄이기 위해
- 컨슈머 API는 개발자가 원하는 시간에 현재 오프셋을 커밋하는 옵션 제공함
- enable.auto.commit =false 설정
- 명시적으로 커밋하려 할 때만 오프셋 커밋 가능
- commitSync()
- poll() 이 리턴한 마지막 오프셋을 커밋 → 성공적으로 완료되면 리턴
- 모든 레코드 처리 완료 전 commitSync() 호출 시, 메세지 누락 위험 감수 불가피
비동기적 커밋
- 수동 커밋의 단점 중 하나: 브로커가 커밋 요청에 응답할 때까지 애플리케이션이 블록됨
- = 애플리케이션 처리량 제한
- → 비동기적 커밋 API 사용
- 단점
- commitSync()는 성공하거나 불가능한 실패가 발생할 때까지 재시도
- commitAsync()는 재시도 X
동기적 커밋과 비동기적 커밋을 함께 사용하기
- 컨슈머를 닫기 전 or 리밸런스 전 마지막 커밋 → 성공 여부 추가 확인 필요
- → commitSync()와 commitAsync() 함께 사용
특정 오프셋 커밋하기
- 가장 최근 오프셋을 커밋하는 것은 메세지 배치의 처리가 끝날 때만 수행 가능
- 그보다 더 자주 커밋하고 싶다면 → 특정 오프셋 커밋
- commitSync()와 commitAsync()를 호출할 때 커밋하고자 하는 파티셧과 오프셋의 맵 전달 가능
7. 리밸런스 리스터
- 컨슈머는 종료하기 전이나 리밸런싱 시작 전 정리 cleanup 작업 필요
- 파티션 해제 → 마지막 처리한 이벤트의 오프셋 커밋
- 파일 핸들, 데베 연결 종료
- subscribe()호출 시 ConsumerRebalanceListener 전달
8. 특정 오프셋의 레코드 읽어오기
- 다른 오프셋에서부터 읽기를 시작하고 싶은 경우
- 맨 앞에서부터 메세지 읽기 seekToBeginning
- 파티션에 새로 들어온 메세지부터 읽기 seekToEnd
- 카프카 API를 사용하면 특정한 오프셋부터 탐색 가능
9. 폴링 루프를 벗어나는 방법
- 컨슈머를 종료하고자 할 때, 컨슈머가 poll()을 오랫동안 기다리고 있더라도 즉시 루프 탈출
- 다른 스레드에서 consumer.wakeup() 호출
- 메인 스레드에서 컨슈머 루프 돌고 있다면, ShutdownHook 사용
- 스레드 종료 전 consumer.close() 호출
10. 디시리얼라이저
- 카프카 컨슈머는 카프카로부터 받은 바이트 배열을 자바 객체로 변환하기 위해 디시리얼라이저 필요
- 카프카에 이벤트를 쓰기 위해 사용되는 시리얼라이저와 이벤트를 읽어올 때 사용되는 디시리얼라이저가 서로 맞아야 함
커스텀 디시리얼라이저
- 시리얼라이저와 마찬가지로 직접 구현하는 것은 권장하지 않음
- 깨지기도 쉽고 에러 발생 가능성 높음
Avro 디시리얼라이저 사용하기
- KafkaAvroDeserializer
11. 독립 실행 컨슈머 standalone consumer
: 컨슈머 그룹 없이 컨슈머를 사용해야 하는 이유와 방법
- 경우에 따라서 필요
- 하나의 컨슈머가 토픽의 모든 파티션으로부터 모든 데이터를 읽어와야 하거나
- 토픽의 특정 파티션으로부터 데이터를 읽어와야 함
- 이런 경우 컨슈머 그룹, 리밸런스 기능 필요 X
- 컨슈머가 어떤 파티션 읽어야 하는지 정확히 아는 경우
- 토픽 구독 필요 X, 파티션을 스스로 할당
- 토픽 구독하거나 스스로 파티션 할당 가능, 두 가지 동시에는 불가
- (단점) 리밸런싱 기능 사용 X → 직접 파티션 찾아야함
'Boaz > Real-time Data and Kafka' 카테고리의 다른 글
[카프카 핵심 가이드 #2] 3장 카프카 프로듀서: 카프카에 메시지 쓰기 (0) | 2024.08.22 |
---|---|
[카프카 핵심 가이드 #1] 1장 카프카 시작하기 (0) | 2024.08.22 |
[대규모 실시간 데이터 처리 #4] 대규모 시스템 설계 기초. 12장 채팅 시스템 설계 (0) | 2024.08.04 |
[대규모 실시간 데이터 처리 #3] 대규모 시스템 설계 기초. 9장 웹 크롤러 설계 (0) | 2024.08.04 |
[대규모 실시간 데이터 처리 #2] 대규모 시스템 설계 기초. 2장~3장 개략적인 규모 추정, 시스템 설계 면접 공략법 (0) | 2024.07.28 |