본문 바로가기
Boaz/Real-time Data and Kafka

[카프카 핵심 가이드 #2] 3장 카프카 프로듀서: 카프카에 메시지 쓰기

by 남디윤 2024. 8. 22.

보아즈 멘멘 스터디 중 기록한 내용입니다.

 

 

 

목차

0. 장을 시작하며

1. 프로듀서 개요

2. 카프카 프로듀서 생성하기

3. 카프카로 메세지 전달하기

4. 프로듀서 설정하기

5. 시리얼라이저

6. 파티션

7. 헤더

8. 인터셉터

9. 쿼터, 스로틀링

 

 

 

 

0. 장을 시작하며

  • 카프카를 사용할 때
    • 카프카에 데이터를 쓸 때 사용하는 프로듀서
    • 읽어올 때 사용하는 컨슈머
    • 두 가지 기능 모두를 수행하는 애플리케이션 생성
    • 개발자들이 카프카와 상호작용하는 애플리케이션을 개발할 때 사용할 수 있는 클라이언트 API와 함께 배포
  • 프로듀서
    • 디자인, 주요 요소
    • KafkaProducer, ProducterRecord 객체 생성
    • 레코드 전송, 에러 처리
    • 작동 제어를 위한 중요 설정 옵션
    • 파티셔너와 시리얼라이저(객체의직렬화 방식 정의)

 

 

 

1. 프로듀서 개요

  • 카프카에 메세지를 써야하는 상황(목적): 다양
    • 사용자 행동 기록, 성능 메트릭 기록, 정보 수집
    • 다른 애플리케이션과의 비동기적 통신 수행, 임의의 정보를 데이터베이스에 저장하기 전 버퍼링
  • 요구 조건: 다양
    • 메세지 유실 용납/허용
    • 중복 허용
    • 반드시 지켜야할 지연 latency, 처리률 throughtput
  • 프로듀서 요소
    • 카프카에 메세지 쓰는 작업: ProducerRecord 객체 생성
      • 레코드가 저장될 토픽과 밸류 지정 필수
      • 키와 파티션 지정은 선택
    • 키와 값 객체가 네트워크 상에서 전송될 수 있도록, 직렬화, 바이트 배열로 변환
    • (파티션 지정하지 않았다면) 데이터 → 파티셔너 → 파티션 결정 (기준: ProducerReocrd 객체의 키 값)
    • 레코드를 같은 토픽 파티션으로 전송될 레코드들을 모은 레코드 배치 record batch 에 추가
    • 별도의 스레드가 이 레코드 배치를 적절한 카프카 브로커에 전송
    • 브로커가 메세지를 받으면 응답 돌려줌
      • 메세지 저장 성공, 브로커는 토픽, 파티션, 파티션 안에서의 레코드 오프셋을 담은 RecordMetadata 객체 리턴
      • 메세지 저장 실패, 에러 리턴 → 메세지 쓰기 포기 및 재전송 시도 등

 

 

 

2. 카프카 프로듀서 생성하기

  • 프로듀서 객체 생성 필요. 원하는 속성 지정
  • 프로듀서의 필수 속성 값
    • boostrap.servers
      • 카프카 클러스터와 첫 연결을 생성하기 위해, 프로듀서가 사용할 브로커의 host:port 목록
      • 모든 브로커 포함 필요 X, 첫 연결 후 추가 정보 받아옴
      • 브로커 중 작동 정지하는 경우 존재 → 2개 이상 지정 권장
    • key.serializer
      • 카프카에 쓸 레코드의 키의 값을 직렬화하기 위해 사용하는 시리얼라이저serializer 클래스의 이름
      • 자바 객체 → 바이트 배열
      • 키값 없이 밸류값만 보낼 때도 key.serializer 설정 필요 → VoidSerializer를 사용해서 키 타입으로 Void 타입 설정 가능
    • value.serializer
      • 카프카에 쓸 레코드의 밸류값을 직렬화하기 위해 사용하는 시리얼라이저 클래스의 이름
  • 메세지 전송 방법
    • 파이어 앤 포겟
      • 메세지를 서버에 전송만 하고, 성공 혹은 실패 여부 신경X
      • 카프카가 가용성이 높고 프로듀서는 자동으로 전송 실패한 메세지 재전송 → 대부분 성공
      • 그럼에도 에러 발생, 실패 시, 메세지 유실, 예외 전달X
    • 동기적 전송
      • 카프카 프로듀서는 언제나 비동기적으로 작동
      • 메세지를 보내면 send()메서드는 Future 객체 리턴
      • 다음 메세지 전송 전 get() 메서드 호출 해 작업이 완료될 때까지 기다렸다가 실제 성공 여부 확인 필요
    • 비동기적 전송
      • 콜백 함수와 함께 send() 메서드를 호출하면 카프카 브로커로부터 응답 받는 시점에서 자동으로 콜백 함수 호출됨

 

 

 

3. 카프카로 메세지 전달하기

동기적으로 메세지 전송하기

  • 동기적으로 메세지 전송 시, 전송 요청 스레드는 아무것도 안 하면서 기다려야 함
  • 실제로 애플리케이션에서는 잘 사용되지 않음 (코드 예제는 흔함)
    • Future.get()
    • 예외 처리
  • KafkaProducer 두 종류의 에러
    • 재시도 가능한 에러: 메세지 재전송하여 해결 가능 에러
      • 메세지 전송받은 브로커가 해당 파티션의 리더가 아닐 경우 발생 에러
      • → 해당 파티션에 새 리더가 선출되고 업데이트 되면 해결 가능
    • 재시도해도 해결되지 않는 에러: 재시도 없이 바로 예외 발생
      • ex. 메세지 크기가 너무 클 경우

 

비동기적으로 메세지 전송하기

  • 비동기적으로 메세지 전송 시 시간 단축 가능
  • 카프카는 레코드를 쓴 뒤, 해당 레코드의 토픽, 파티션, 오프셋 리턴
    • 대부분 이런 메타 데이터 필요 X
    • 메세지 전송 실패 시 이런 메타 데이터 필요
    • → 예외 발생, 에러 로그 작성, 사후 분석 등
  • 메세지를 비동기적으로 전송하고, 에러를 처리하는 경우를 위해
    • 레코드를 전송할 때 콜백을 지정
    • Callback 객체 구현하여 ProducerRecord에 전달

 

 

 

4. 프로듀서 설정하기

  • 프로듀서 설정 값(속성 값)은 굉장히 많은 수이다.
    • 4-2장에서는 필수적인 속성만 살펴봄
    • 이러한 몇몇 설정 값의 경우 메모리 사용량, 성능, 신뢰성 등에 상당한 영향

 

client.id

  • 프로듀서와 그것을 사용하는 애플리케이션을 구분하기 위한 논리적 식별자, 임의의 문자열 사용
  • 브로커는 프로듀서가 보내온 메세지를 서로 구분하기 위해 이 값을 사용
  • 브로커가 로그 메세지 출력 시
  • 성능 메트릭 값 집계 시
  • 클라이언트별로 사용량 할당 시
  • 문제 발생 시 트러블 슈팅

 

acks

  • acks 매개변수: 프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야 하는지를 결정
    • 기본값: 리더가 해당 레코드를 받은 뒤 쓰기 작업이 성공했다고 응답
    • 설정 가능한 3가지 값
      • acks=0: 프로듀서는 메세지 성공적으로 전달되었다고 간주, 브로커 응답 기다리지X
      • acks=1: 프로듀서는 리더 레플리카가 메세지를 받는 순간 브로커로부터 성공했다는 응답 받음
        • 리더 메세지 쓸 수 없다면, 프로듀서 에러 응답 받음. 유실을 피하기 위해 재전송 시도
        • 리더가 크래시 난 상태, 복제가 안된 채로 리더가 산출될 경우, 메세지가 유실됨
      • acks=all: 프로듀서는 메세지가 모든 인-싱크 레플리카 in-sync replica에 전달된 뒤에야 브로커로부터 성공했다는 응답 받음
        • 가장 안전한 형태
      • 신뢰성을 낮추면 그만큼 레코드 빠르게 보내기 가능
      • → 신뢰성과 프로듀서 지연 사이에 트레이드 오프

 

메세지 전달 시간

image1

  • send() 호출 시, 성공 혹은 실패하기까지 얼마나 시간을 걸리는가
  • ProducerRecord를 보낼 때 걸리는 시간
    • send() 에 대한 비동기 호출이 이뤄진 시각부터 결과를 리턴할 때까지 걸리는 시간
    • send() 에 대한 비동기 호출이 성공적으로 리턴한 시각부터 콜백이 호출될 때까지 걸리는 시간
  • 두 구간에 영향을 미치는 다른 설정 매개변수
    • max.block.ms
      • 프로듀서가 얼마나 오랫동안 블록되는지를 결정
      • send() 호출, partitionsFor를 호출하여 메타데이터를 요청 시, max.block.ms 만큼 시간이 흐르면 예외 발생
    • delivery.timeout.ms
      • 레코드 전송 준비가 완료된 시점에서 브로커의 응답을 받거나 전송을 포기하게 되는 시점까지의 제한시간 결정
      • linger.ms, request.timeout.ms 보다 커야 함
      • 프로듀서가 재시도를 하는 도중에 delivery.timeout.ms가 넘어가버리면, 리턴 에러 예외와 콜백 호출
      • 레코드 배치가 전송을 기다리는 와중에 delivery.timeout.ms가 넘어가버리면, 타임아웃 예외와 콜백 호출
    • request.timeout.ms
      • 프로듀서가 데이터를 전송할 때 서버로부터 응답을 받기 위해 얼마나 기다릴 것인지 결정
      • 쓰기 요청 후 전송을 포기하기까지 대기하는 시간
      • 재시간 시간, 실제 전송 이전에 소요되는 시간 포함 X
      • 타임 아웃 발생 시, 재전송 시도 or TimeoutException, 콜백 호출
    • retries, retry.backoff.ms
      • retries : 프로듀서가 메세지 전송을 포기하고 에러를 발생시킬 때까지 메세지를 재전송하는 횟수 결정
      • retry.backoff.ms 재시도 사이의 대기 간격 조정
      • 현재 버전의 카프카에서 이 값들 조정 권장 X
      • → delivery.timeout.ms 매개변수 조정 권장
      • 재전송 끄려면 retries=0 설정

 

linger.ms

  • 현재 배치를 전송하기 전까지 대기하는 시간을 결정
  • KafkaProducer는 현재 배치가 가득 차거나, linger.ms 에 설정된 제한 시간이 되었을 때 메세지 배치 전송
    • 기본적으로는, 프로듀서는 메세지 전송 사용 스레드가 있을 때 곧바로 전송
    • linger.ms는 0보다 큰 값 설정 시, 메세지 추가 가능
    • = 지연 증가, 처리률 크게 증대
    • 단위 메세지당 추가적으로 드는 시간은 매우 작지만 압축 설정이 되어 있는 경우 효율적

 

buffer.memory

  • 프로듀서가 메세지를 전송하기 전에 메세지를 대기시키는 버퍼의 크기(메모리의 양)를 결정

 

compression.type

  • 기본적으로 메세지는 압축되지 않은 상태로 전송
  • 이 매개변수를 설정 시, 해당 압축 알고리즘을 사용해서 메세지 압축한 뒤 브로커로 전송됨
    • snappy
    • gzip
    • lz4, ztsd 등
  • 네트워크 대역폭 제한적일 때 사용하면 좋음
  • 네트워크 사용량, 저장 공간 절약

 

batch.size

  • 같은 파티션에 다수의 레코드가 전송될 경우, 프로듀서는 이것들을 배치 단위로 모아서 한꺼번에 전송
  • batch.size는 각각의 배치에 사용될 메모리의 양을 결정
  • 개수가 아니라 바이트 단위
  • 지나치케 크게 유지한다해서 메세지 전송 지연이 발생하는 것이 아니며,
  • 지나치게 작게 설정하면 오히려 너무 자주 전송해야 해서 오버헤드 발생 가능

 

max.in.flight.requests.per.connection

  • 프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메세지의 수 결정
  • 값을 올리면 메모리 사용량 증가, 처리량 증가

 

max.request.size

  • 프로듀서가 전송하는 쓰기 요청의 크기 결정
  • 메세지의 최대 크기 제한, 한 번의 요청에 보낼 수 있는 메세지 최대 개수 제한 (당연히 메세지 최대 크기도 영향을 받음 1MB=1KB*1024개)
  • 브로커가 받아들일 수 있는 최대 메세지 크기 결정하는 매개변수 : message.max.bytes

 

receive.buffer.bytes, send.buffer.bytes

  • 데이터를 읽거나 쓸 때 소켓이 사용하는 TCP 송수신 버퍼의 크기를 결정
  • -1 의 경우 기본값 사용

 

enable.idempotence

  • enable.idempotence=true
    • 멱등적 프로듀서 기능 활성화
    • 같은 메시지를 여러 번 보내더라도 데이터가 중복 저장되지 않도록 막아주는 기능

 

 

 

5. 시리얼라이저

  • 레코드 직렬화

 

커스텀 시리얼 라이저

  • 카프카로 전송해야 하는 객체가 단순한 문자열이나 정수값이 아닌 경우
    • → 커스텀 시리얼라이저(직렬화 로직) 을 사용하거나
    • → 에이브로와 같은 범용 시리얼라이저(직렬화 라이브러리) 사용
  • 범용 시리얼라이저 사용 방안 원장
  • 커스텀 시리얼라이저의 단점
    • 기존 형식, 새 형식 사이의 호환성 유지
    • 같은 로직 사용 필요, 동시에 코드 변경 상황 발생 가능
    • → 범용 라이브러리 사용 권장

 

아파치 에이브로를 사용해서 직렬화하기

  • 아파치 에이브로
    • 언어 중립적인 데이터 직렬화 형식
    • 더 범용적인 데이터 파일 공유 방식을 제공하는 것을 목표로 시작
  • 에이브로 데이터
    • 언어에 독립적인 스키마 형태로 기술
    • 직렬화된 결과물이 저장된 파일을 읽거나 직렬화를 할 때 스키마 정보가 별도로 주어진다고 가정
    • 에이브로 파일 자체에 스키마를 내장하는 방법을 사용
  • 카프카에 적합한 이유
    • 메세지를 쓰는 애플리케이션이 새로운 스키마로 전환하더라도
    • 기존 스키마와 호환성을 유지하는 한,
    • 데이터를 읽는 애플리케이션을 일체의 변경이나 업데이트 없이 계속해서 메세지 처리 가능
  • 주의해야 할 점
    • 데이터를 쓸 때 사용하는 스키마와 읽을 때 기대하는 스키마 호환 필요
    • 역직렬화 시, 데이터를 쓸 때 사용했던 스키마에 접근 가능

 

카프카에서 에이브로 레코드 사용하기

image2

  • 스키마 레지스트리
    • 카프카에 데이터를 쓰기 위해 사용되는 모든 스키마를 저장
      • 레코드에 사용된 스키마의 고유 식별자 필요
    • 모든 작업(스키마를 레지스트리에 저장하고 필요할 때 가져오는)이
      • 주어진 객체를 직렬화하는 시리얼라이저
      • 직렬화된 데이터를 객체로 복원하는 디시얼라이저
      • 내부에서 수행됨
  • 제네릭 에이브로 객체

 

 

 

6. 파티션

  • 카프카 메세지는 키-밸류 순서쌍
    • ProduceRecord 객체는 토픽, 키, 밸류 값 포함
  • 같은 키값을 가진 모든 메세지는 같은 파티션에 저장됨
    • 기본 파티셔너 사용 중, 키 값이 null인 레코드 → 사용 가능한 토픽의 파티션 중 하나에 랜덤하게 저장됨
      • 메세지 개수의 균형을 맞추기 위해 라운드 로빈 알고리즘 사용됨
    • 기본 파티셔너 사용 중, 키 값이 지정된 상황 → 키 값을 해시한 결과를 기준으로 메세지를 저장할 파티션 특정
    • 기본 파티셔너 외에도 RoundRobinPartitioner와 UniformStickyPartitioner를 포함
      • 메세지가 키 값 포함 시에도, 랜덤 파티션 할당 및 접착성 랜덤 파티션 할당 수행

 

커스텀 파티셔너 구현하기

  • 항상 키값을 해시 처리해서 파티션을 결정해야만 하는 것은 아님
  • 다른 방식으로 파티션을 할당하는 것이 나은 경우도 존재
  • → 커스텀 파티셔너 사용

 

 

 

7. 헤더

  • 레코드는 키값, 밸류값 이외에도 헤더 포함 가능
    • 추가 메타데이터를 심을 때 사용
    • 용도: 메세지 전달 내역 기록
      • 데이터가 생성된 곳의 정보를 헤더에 저장
      • → 이 정보로 메세지를 라우팅 하거나 출처 추적 가능
    • 헤더는 순서가 있는 키/밸류 쌍의 집합으로 구현됨

 

 

 

8. 인터셉터

  • 카프카 클라이언트 코드 고치지 않으면서 그 작동 변경해야 하는 경우 사용
  • 사용 사례
    • 모니터링, 정보 추적, 표준 헤더 삽입 등
    • 메세지가 생성된 위치에 대한 정보 심음 → 메세지 전달 경로 추적, 민감 정보 삭제 처리 등

 

 

 

9. 쿼터, 스로틀링

  • 카프카 브로커에는 쓰기/읽기 속도 제한 기능
    • 한도(쿼터 quota) 설정
      • 쓰기 쿼터/읽기 쿼터/요청 쿼터
      • 쓰기 쿼터, 읽기 쿼터: 클라이언트가 데이터 전송하거나 받는 속도를 초당 바이트 수 단위로 제한
      • 요청 쿼터: 브로커 요청 처리 시간 비율 단위로 제한