2025.05.18 TIL
[ infra ]

Kafka

카프카 프로듀서

메세지 쓰는 순서

  1. ProducerRecord 객체 생성
    • 필수: 레코드가 저장될 토픽, 밸류 지정
    • 선택: 키와 파티션 지정
  2. 전송 API 호출
    • 프로듀서는 키와 값 개체를 네트워크로 전송할 수 있도록 직렬화
    • 파티션을 지정하지 않았다면 파티셔너에게 보냄
    • 파티셔너에서 ProducerRecord 객체의 키 값을 기준으로 파티션 결정
  3. 같은 토픽, 파티션으로 전송될 레코드들 모음인 레코드 배치(record batch)에 추가
  4. 별도의 스레드가 레코드 배치를 적절한 카프카 브로커에게 전송
  5. 브로커가 메세지를 받으면 응답을 돌려줌
    • 성공적으로 저장: RecordMetadata(토픽, 파티션, 레코드 오프셋을 담음) 객체를 리턴
    • 실패: 에러 리턴

카프카 프로듀서 생성

  1. bootstrap.servers
    • 카프카 클러스터와 첫 연결을 생성하기 위해 프로듀서가 사용할 브로커의 host:port 목록
    • 브로커 중 하나가 작동을 정지하는 경우에도 프로듀서가 클러스터에 연결할 수 있도록 최소 2개 이상 지정 권장
  2. key.serializer
    • 카프카에 쓸 레코드의 키 값을 직렬화하기 위해 사용하는 시리얼라이저(serializer) 클래스의 이름
  3. value.serializer
    • 카프카에 쓸 레코드의 밸류값을 직렬화하기 위해 사용하는 시리얼라이즈 클래스의 이름

메시지 전송 방법

  1. 파이어 앤 포겟 (Fire and Forget)
    • 메시지를 서버에 전송마나하고 성공 혹은 실패 여부에는 신경 쓰지 않는다.
  2. 동기적 전송 (Synchronous Send)
    • 다음 메시지를 전송하기 전 get() 메서드를 호출해서 작업이 완료될 때까지 기다렸다가 실제 성공 여부를 확인
  3. 비동기적 전송 (Asynchronous send)
    • 콜백 함수와 함께 send() 메서드를 호출하면 응답을 받는 시점에서 자동으로 콜백 함수 호출
    • org.apache.kafka.clients.producer.Callback 인터페이스를 구현하는 클래스가 필요함
    • 콜백은 프로듀서의 메인 스레드에서 실행 -> 콜백이 충분히 빨라야함

프로듀서 설정하기

  1. client.id
    • 프로듀서와 그것을 사용하는 애플리케이션을 구분하기 위한 논리적 식별자
  2. acks
    • 프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야 하는지 결정
    • 신뢰성과 프로듀서 지연 사이에는 트레이드 오프 관계가 있음
    • acks=0 : 프로듀서는 메시지가 성공적으로 전달되었다고 간주하고 브로커의 응답을 기다리지 않음
    • acks=1 : 프로듀서는 리더 레클리카가 메시지를 받는 순간 브로커로부터 성공했다는 응답을 받는다.
    • acks=all : 프로듀서는 메시지가 모든 인-싱크 레플리카 (in-sync replica)에 전달된 뒤에야 브로커로부터 성공했다는 응답을 받는다.
  3. 메시지 전달 시간
    • send()를 호출했을 때 성공 혹은 실패하기까지 시간
    • 현재 버전의 카프카에서 이 값들을 조정하는 것을 권장하지 않음
    • 크래시 난 브로커가 정상으로 돌아오기까지의 시간을 테스트한 뒤 delivery.timeout.ms 매개변수를 잡아주는 것을 권장
    1. max.block.ms: 메타데이터를 요청했을 때 프로듀서가 얼마나 오랫동안 블록되는지를 결정
    2. delivery.timeout.ms: 레코드 전송 준비가 완료된 시점에서부터 브로커의 응답을 받거나, 전송을 포기하게 되는 시점까지의 제한시간
    3. request.timeout.ms: 프로듀서가 데이터를 전송할 때 서버로부터 응답을 받기 위해 얼마나 기다릴 것인지를 결정
    4. retries: 메시지 재전송 횟수
    5. retry.backoff.ms: 재전송 간격 조정
  4. linger.ms: 현재 배치를 전송하기 전까지 대기하는 시간
  5. buffer.memory: 메시지를 대기시키는 버퍼의 크기
  6. compression.type: 매개변수 snappy, gzip, lz4, zstd 중 하나로 설정하면 해당 압축 알고리즘 사용
  7. batch.size: 각각의 배치에 사용될 메모리 양 결정
  8. max.in.flight.requests.per.connection: 프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지의 수
    • 원래 메시지 순서는 보존하지만 재시도하면서 순서가 뒤집어질 수 있음
  9. max.request.size: 전송하는 쓰기 요청의 크기
  10. receive.buffer.bytes, send.buffer.bytes: 데이터를 읽거나 쓸 때 소켓이 사용하는 TCP 송수신 버퍼의 크기를 결정
  11. enable.idempotence: 메시지 중복 저장을 방지

시리얼라이저

파티션

헤더

인터셉터

쿼터, 스로틀링