목차
0. 장을 시작하며
- 카프카를 사용할 때
- 카프카에 데이터를 쓸 때 사용하는 프로듀서
- 읽어올 때 사용하는 컨슈머
- 두 가지 기능 모두를 수행하는 애플리케이션 생성
- 개발자들이 카프카와 상호작용하는 애플리케이션을 개발할 때 사용할 수 있는 클라이언트 API와 함께 배포
- 프로듀서
- 디자인, 주요 요소
- KafkaProducer, ProducterRecord 객체 생성
- 레코드 전송, 에러 처리
- 작동 제어를 위한 중요 설정 옵션
- 파티셔너와 시리얼라이저(객체의직렬화 방식 정의)
1. 프로듀서 개요
- 카프카에 메세지를 써야하는 상황(목적): 다양
- 사용자 행동 기록, 성능 메트릭 기록, 정보 수집
- 다른 애플리케이션과의 비동기적 통신 수행, 임의의 정보를 데이터베이스에 저장하기 전 버퍼링
- 요구 조건: 다양
- 메세지 유실 용납/허용
- 중복 허용
- 반드시 지켜야할 지연 latency, 처리률 throughtput
- 프로듀서 요소
- 카프카에 메세지 쓰는 작업: ProducerRecord 객체 생성
- 레코드가 저장될 토픽과 밸류 지정 필수
- 키와 파티션 지정은 선택
- 키와 값 객체가 네트워크 상에서 전송될 수 있도록, 직렬화, 바이트 배열로 변환
- (파티션 지정하지 않았다면) 데이터 → 파티셔너 → 파티션 결정 (기준: ProducerReocrd 객체의 키 값)
- 레코드를 같은 토픽 파티션으로 전송될 레코드들을 모은 레코드 배치 record batch 에 추가
- 별도의 스레드가 이 레코드 배치를 적절한 카프카 브로커에 전송
- 브로커가 메세지를 받으면 응답 돌려줌
- 메세지 저장 성공, 브로커는 토픽, 파티션, 파티션 안에서의 레코드 오프셋을 담은 RecordMetadata 객체 리턴
- 메세지 저장 실패, 에러 리턴 → 메세지 쓰기 포기 및 재전송 시도 등
- 카프카에 메세지 쓰는 작업: ProducerRecord 객체 생성
2. 카프카 프로듀서 생성하기
- 프로듀서 객체 생성 필요. 원하는 속성 지정
- 프로듀서의 필수 속성 값
- boostrap.servers
- 카프카 클러스터와 첫 연결을 생성하기 위해, 프로듀서가 사용할 브로커의 host:port 목록
- 모든 브로커 포함 필요 X, 첫 연결 후 추가 정보 받아옴
- 브로커 중 작동 정지하는 경우 존재 → 2개 이상 지정 권장
- key.serializer
- 카프카에 쓸 레코드의 키의 값을 직렬화하기 위해 사용하는 시리얼라이저serializer 클래스의 이름
- 자바 객체 → 바이트 배열
- 키값 없이 밸류값만 보낼 때도 key.serializer 설정 필요 → VoidSerializer를 사용해서 키 타입으로 Void 타입 설정 가능
- value.serializer
- 카프카에 쓸 레코드의 밸류값을 직렬화하기 위해 사용하는 시리얼라이저 클래스의 이름
- boostrap.servers
- 메세지 전송 방법
- 파이어 앤 포겟
- 메세지를 서버에 전송만 하고, 성공 혹은 실패 여부 신경X
- 카프카가 가용성이 높고 프로듀서는 자동으로 전송 실패한 메세지 재전송 → 대부분 성공
- 그럼에도 에러 발생, 실패 시, 메세지 유실, 예외 전달X
- 동기적 전송
- 카프카 프로듀서는 언제나 비동기적으로 작동
- 메세지를 보내면 send()메서드는 Future 객체 리턴
- 다음 메세지 전송 전 get() 메서드 호출 해 작업이 완료될 때까지 기다렸다가 실제 성공 여부 확인 필요
- 비동기적 전송
- 콜백 함수와 함께 send() 메서드를 호출하면 카프카 브로커로부터 응답 받는 시점에서 자동으로 콜백 함수 호출됨
- 파이어 앤 포겟
3. 카프카로 메세지 전달하기
동기적으로 메세지 전송하기
- 동기적으로 메세지 전송 시, 전송 요청 스레드는 아무것도 안 하면서 기다려야 함
- 실제로 애플리케이션에서는 잘 사용되지 않음 (코드 예제는 흔함)
- Future.get()
- 예외 처리
- KafkaProducer 두 종류의 에러
- 재시도 가능한 에러: 메세지 재전송하여 해결 가능 에러
- 메세지 전송받은 브로커가 해당 파티션의 리더가 아닐 경우 발생 에러
- → 해당 파티션에 새 리더가 선출되고 업데이트 되면 해결 가능
- 재시도해도 해결되지 않는 에러: 재시도 없이 바로 예외 발생
- ex. 메세지 크기가 너무 클 경우
- 재시도 가능한 에러: 메세지 재전송하여 해결 가능 에러
비동기적으로 메세지 전송하기
- 비동기적으로 메세지 전송 시 시간 단축 가능
- 카프카는 레코드를 쓴 뒤, 해당 레코드의 토픽, 파티션, 오프셋 리턴
- 대부분 이런 메타 데이터 필요 X
- 메세지 전송 실패 시 이런 메타 데이터 필요
- → 예외 발생, 에러 로그 작성, 사후 분석 등
- 메세지를 비동기적으로 전송하고, 에러를 처리하는 경우를 위해
- 레코드를 전송할 때 콜백을 지정
- Callback 객체 구현하여 ProducerRecord에 전달
4. 프로듀서 설정하기
- 프로듀서 설정 값(속성 값)은 굉장히 많은 수이다.
- 4-2장에서는 필수적인 속성만 살펴봄
- 이러한 몇몇 설정 값의 경우 메모리 사용량, 성능, 신뢰성 등에 상당한 영향
client.id
- 프로듀서와 그것을 사용하는 애플리케이션을 구분하기 위한 논리적 식별자, 임의의 문자열 사용
- 브로커는 프로듀서가 보내온 메세지를 서로 구분하기 위해 이 값을 사용
- 브로커가 로그 메세지 출력 시
- 성능 메트릭 값 집계 시
- 클라이언트별로 사용량 할당 시
- 문제 발생 시 트러블 슈팅
acks
- acks 매개변수: 프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야 하는지를 결정
- 기본값: 리더가 해당 레코드를 받은 뒤 쓰기 작업이 성공했다고 응답
- 설정 가능한 3가지 값
- acks=0: 프로듀서는 메세지 성공적으로 전달되었다고 간주, 브로커 응답 기다리지X
- acks=1: 프로듀서는 리더 레플리카가 메세지를 받는 순간 브로커로부터 성공했다는 응답 받음
- 리더 메세지 쓸 수 없다면, 프로듀서 에러 응답 받음. 유실을 피하기 위해 재전송 시도
- 리더가 크래시 난 상태, 복제가 안된 채로 리더가 산출될 경우, 메세지가 유실됨
- acks=all: 프로듀서는 메세지가 모든 인-싱크 레플리카 in-sync replica에 전달된 뒤에야 브로커로부터 성공했다는 응답 받음
- 가장 안전한 형태
- 신뢰성을 낮추면 그만큼 레코드 빠르게 보내기 가능
- → 신뢰성과 프로듀서 지연 사이에 트레이드 오프
메세지 전달 시간
- send() 호출 시, 성공 혹은 실패하기까지 얼마나 시간을 걸리는가
- ProducerRecord를 보낼 때 걸리는 시간
- send() 에 대한 비동기 호출이 이뤄진 시각부터 결과를 리턴할 때까지 걸리는 시간
- send() 에 대한 비동기 호출이 성공적으로 리턴한 시각부터 콜백이 호출될 때까지 걸리는 시간
- 두 구간에 영향을 미치는 다른 설정 매개변수
- max.block.ms
- 프로듀서가 얼마나 오랫동안 블록되는지를 결정
- send() 호출, partitionsFor를 호출하여 메타데이터를 요청 시, max.block.ms 만큼 시간이 흐르면 예외 발생
- delivery.timeout.ms
- 레코드 전송 준비가 완료된 시점에서 브로커의 응답을 받거나 전송을 포기하게 되는 시점까지의 제한시간 결정
- linger.ms, request.timeout.ms 보다 커야 함
- 프로듀서가 재시도를 하는 도중에 delivery.timeout.ms가 넘어가버리면, 리턴 에러 예외와 콜백 호출
- 레코드 배치가 전송을 기다리는 와중에 delivery.timeout.ms가 넘어가버리면, 타임아웃 예외와 콜백 호출
- request.timeout.ms
- 프로듀서가 데이터를 전송할 때 서버로부터 응답을 받기 위해 얼마나 기다릴 것인지 결정
- 쓰기 요청 후 전송을 포기하기까지 대기하는 시간
- 재시간 시간, 실제 전송 이전에 소요되는 시간 포함 X
- 타임 아웃 발생 시, 재전송 시도 or TimeoutException, 콜백 호출
- retries, retry.backoff.ms
- retries : 프로듀서가 메세지 전송을 포기하고 에러를 발생시킬 때까지 메세지를 재전송하는 횟수 결정
- retry.backoff.ms 재시도 사이의 대기 간격 조정
- 현재 버전의 카프카에서 이 값들 조정 권장 X
- → delivery.timeout.ms 매개변수 조정 권장
- 재전송 끄려면 retries=0 설정
- max.block.ms
linger.ms
- 현재 배치를 전송하기 전까지 대기하는 시간을 결정
- KafkaProducer는 현재 배치가 가득 차거나, linger.ms 에 설정된 제한 시간이 되었을 때 메세지 배치 전송
- 기본적으로는, 프로듀서는 메세지 전송 사용 스레드가 있을 때 곧바로 전송
- linger.ms는 0보다 큰 값 설정 시, 메세지 추가 가능
- = 지연 증가, 처리률 크게 증대
- 단위 메세지당 추가적으로 드는 시간은 매우 작지만 압축 설정이 되어 있는 경우 효율적
buffer.memory
- 프로듀서가 메세지를 전송하기 전에 메세지를 대기시키는 버퍼의 크기(메모리의 양)를 결정
compression.type
- 기본적으로 메세지는 압축되지 않은 상태로 전송
- 이 매개변수를 설정 시, 해당 압축 알고리즘을 사용해서 메세지 압축한 뒤 브로커로 전송됨
- snappy
- gzip
- lz4, ztsd 등
- 네트워크 대역폭 제한적일 때 사용하면 좋음
- 네트워크 사용량, 저장 공간 절약
batch.size
- 같은 파티션에 다수의 레코드가 전송될 경우, 프로듀서는 이것들을 배치 단위로 모아서 한꺼번에 전송
- batch.size는 각각의 배치에 사용될 메모리의 양을 결정
- 개수가 아니라 바이트 단위
- 지나치케 크게 유지한다해서 메세지 전송 지연이 발생하는 것이 아니며,
- 지나치게 작게 설정하면 오히려 너무 자주 전송해야 해서 오버헤드 발생 가능
max.in.flight.requests.per.connection
- 프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메세지의 수 결정
- 값을 올리면 메모리 사용량 증가, 처리량 증가
max.request.size
- 프로듀서가 전송하는 쓰기 요청의 크기 결정
- 메세지의 최대 크기 제한, 한 번의 요청에 보낼 수 있는 메세지 최대 개수 제한 (당연히 메세지 최대 크기도 영향을 받음 1MB=1KB*1024개)
- 브로커가 받아들일 수 있는 최대 메세지 크기 결정하는 매개변수 : message.max.bytes
receive.buffer.bytes, send.buffer.bytes
- 데이터를 읽거나 쓸 때 소켓이 사용하는 TCP 송수신 버퍼의 크기를 결정
- -1 의 경우 기본값 사용
enable.idempotence
- enable.idempotence=true
- 멱등적 프로듀서 기능 활성화
- 같은 메시지를 여러 번 보내더라도 데이터가 중복 저장되지 않도록 막아주는 기능
5. 시리얼라이저
- 레코드 직렬화
커스텀 시리얼 라이저
- 카프카로 전송해야 하는 객체가 단순한 문자열이나 정수값이 아닌 경우
- → 커스텀 시리얼라이저(직렬화 로직) 을 사용하거나
- → 에이브로와 같은 범용 시리얼라이저(직렬화 라이브러리) 사용
- 범용 시리얼라이저 사용 방안 원장
- 커스텀 시리얼라이저의 단점
- 기존 형식, 새 형식 사이의 호환성 유지
- 같은 로직 사용 필요, 동시에 코드 변경 상황 발생 가능
- → 범용 라이브러리 사용 권장
아파치 에이브로를 사용해서 직렬화하기
- 아파치 에이브로
- 언어 중립적인 데이터 직렬화 형식
- 더 범용적인 데이터 파일 공유 방식을 제공하는 것을 목표로 시작
- 에이브로 데이터
- 언어에 독립적인 스키마 형태로 기술
- 직렬화된 결과물이 저장된 파일을 읽거나 직렬화를 할 때 스키마 정보가 별도로 주어진다고 가정
- 에이브로 파일 자체에 스키마를 내장하는 방법을 사용
- 카프카에 적합한 이유
- 메세지를 쓰는 애플리케이션이 새로운 스키마로 전환하더라도
- 기존 스키마와 호환성을 유지하는 한,
- 데이터를 읽는 애플리케이션을 일체의 변경이나 업데이트 없이 계속해서 메세지 처리 가능
- 주의해야 할 점
- 데이터를 쓸 때 사용하는 스키마와 읽을 때 기대하는 스키마 호환 필요
- 역직렬화 시, 데이터를 쓸 때 사용했던 스키마에 접근 가능
카프카에서 에이브로 레코드 사용하기
- 스키마 레지스트리
- 카프카에 데이터를 쓰기 위해 사용되는 모든 스키마를 저장
- 레코드에 사용된 스키마의 고유 식별자 필요
- 모든 작업(스키마를 레지스트리에 저장하고 필요할 때 가져오는)이
- 주어진 객체를 직렬화하는 시리얼라이저
- 직렬화된 데이터를 객체로 복원하는 디시얼라이저
- 내부에서 수행됨
- 카프카에 데이터를 쓰기 위해 사용되는 모든 스키마를 저장
- 제네릭 에이브로 객체
6. 파티션
- 카프카 메세지는 키-밸류 순서쌍
- ProduceRecord 객체는 토픽, 키, 밸류 값 포함
- 같은 키값을 가진 모든 메세지는 같은 파티션에 저장됨
- 기본 파티셔너 사용 중, 키 값이 null인 레코드 → 사용 가능한 토픽의 파티션 중 하나에 랜덤하게 저장됨
- 메세지 개수의 균형을 맞추기 위해 라운드 로빈 알고리즘 사용됨
- 기본 파티셔너 사용 중, 키 값이 지정된 상황 → 키 값을 해시한 결과를 기준으로 메세지를 저장할 파티션 특정
- 기본 파티셔너 외에도 RoundRobinPartitioner와 UniformStickyPartitioner를 포함
- 메세지가 키 값 포함 시에도, 랜덤 파티션 할당 및 접착성 랜덤 파티션 할당 수행
- 기본 파티셔너 사용 중, 키 값이 null인 레코드 → 사용 가능한 토픽의 파티션 중 하나에 랜덤하게 저장됨
커스텀 파티셔너 구현하기
- 항상 키값을 해시 처리해서 파티션을 결정해야만 하는 것은 아님
- 다른 방식으로 파티션을 할당하는 것이 나은 경우도 존재
- → 커스텀 파티셔너 사용
7. 헤더
- 레코드는 키값, 밸류값 이외에도 헤더 포함 가능
- 추가 메타데이터를 심을 때 사용
- 용도: 메세지 전달 내역 기록
- 데이터가 생성된 곳의 정보를 헤더에 저장
- → 이 정보로 메세지를 라우팅 하거나 출처 추적 가능
- 헤더는 순서가 있는 키/밸류 쌍의 집합으로 구현됨
8. 인터셉터
- 카프카 클라이언트 코드 고치지 않으면서 그 작동 변경해야 하는 경우 사용
- 사용 사례
- 모니터링, 정보 추적, 표준 헤더 삽입 등
- 메세지가 생성된 위치에 대한 정보 심음 → 메세지 전달 경로 추적, 민감 정보 삭제 처리 등
9. 쿼터, 스로틀링
- 카프카 브로커에는 쓰기/읽기 속도 제한 기능
- 한도(쿼터 quota) 설정
- 쓰기 쿼터/읽기 쿼터/요청 쿼터
- 쓰기 쿼터, 읽기 쿼터: 클라이언트가 데이터 전송하거나 받는 속도를 초당 바이트 수 단위로 제한
- 요청 쿼터: 브로커 요청 처리 시간 비율 단위로 제한
- 한도(쿼터 quota) 설정
'Boaz > Real-time Data and Kafka' 카테고리의 다른 글
[카프카 핵심 가이드 #3] 4장 카프카 컨슈머: 카프카에서 데이터 읽기 (0) | 2024.08.22 |
---|---|
[카프카 핵심 가이드 #1] 1장 카프카 시작하기 (0) | 2024.08.22 |
[대규모 실시간 데이터 처리 #4] 대규모 시스템 설계 기초. 12장 채팅 시스템 설계 (0) | 2024.08.04 |
[대규모 실시간 데이터 처리 #3] 대규모 시스템 설계 기초. 9장 웹 크롤러 설계 (0) | 2024.08.04 |
[대규모 실시간 데이터 처리 #2] 대규모 시스템 설계 기초. 2장~3장 개략적인 규모 추정, 시스템 설계 면접 공략법 (0) | 2024.07.28 |