[
infra
]
Kafka
카프카 프로듀서
- 목적, 요구 조건이 다양함 -> 프로듀서 API를 사용하는 방식과 설정에 영향을 미침
메세지 쓰는 순서
- ProducerRecord 객체 생성
- 필수: 레코드가 저장될 토픽, 밸류 지정
- 선택: 키와 파티션 지정
- 전송 API 호출
- 프로듀서는 키와 값 개체를 네트워크로 전송할 수 있도록 직렬화
- 파티션을 지정하지 않았다면 파티셔너에게 보냄
- 파티셔너에서 ProducerRecord 객체의 키 값을 기준으로 파티션 결정
- 같은 토픽, 파티션으로 전송될 레코드들 모음인 레코드 배치(record batch)에 추가
- 별도의 스레드가 레코드 배치를 적절한 카프카 브로커에게 전송
- 브로커가 메세지를 받으면 응답을 돌려줌
- 성공적으로 저장: RecordMetadata(토픽, 파티션, 레코드 오프셋을 담음) 객체를 리턴
- 실패: 에러 리턴
카프카 프로듀서 생성
- 3개의 필수 속성 값을 가짐
- bootstrap.servers
- 카프카 클러스터와 첫 연결을 생성하기 위해 프로듀서가 사용할 브로커의 host:port 목록
- 브로커 중 하나가 작동을 정지하는 경우에도 프로듀서가 클러스터에 연결할 수 있도록 최소 2개 이상 지정 권장
- key.serializer
- 카프카에 쓸 레코드의 키 값을 직렬화하기 위해 사용하는 시리얼라이저(serializer) 클래스의 이름
- value.serializer
- 카프카에 쓸 레코드의 밸류값을 직렬화하기 위해 사용하는 시리얼라이즈 클래스의 이름
메시지 전송 방법
- 파이어 앤 포겟 (Fire and Forget)
- 메시지를 서버에 전송마나하고 성공 혹은 실패 여부에는 신경 쓰지 않는다.
- 동기적 전송 (Synchronous Send)
- 다음 메시지를 전송하기 전 get() 메서드를 호출해서 작업이 완료될 때까지 기다렸다가 실제 성공 여부를 확인
- 비동기적 전송 (Asynchronous send)
- 콜백 함수와 함께 send() 메서드를 호출하면 응답을 받는 시점에서 자동으로 콜백 함수 호출
org.apache.kafka.clients.producer.Callback
인터페이스를 구현하는 클래스가 필요함- 콜백은 프로듀서의 메인 스레드에서 실행 -> 콜백이 충분히 빨라야함
프로듀서 설정하기
- client.id
- 프로듀서와 그것을 사용하는 애플리케이션을 구분하기 위한 논리적 식별자
- acks
- 프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야 하는지 결정
- 신뢰성과 프로듀서 지연 사이에는 트레이드 오프 관계가 있음
- acks=0 : 프로듀서는 메시지가 성공적으로 전달되었다고 간주하고 브로커의 응답을 기다리지 않음
- acks=1 : 프로듀서는 리더 레클리카가 메시지를 받는 순간 브로커로부터 성공했다는 응답을 받는다.
- acks=all : 프로듀서는 메시지가 모든 인-싱크 레플리카 (in-sync replica)에 전달된 뒤에야 브로커로부터 성공했다는 응답을 받는다.
- 메시지 전달 시간
- send()를 호출했을 때 성공 혹은 실패하기까지 시간
- 현재 버전의 카프카에서 이 값들을 조정하는 것을 권장하지 않음
- 크래시 난 브로커가 정상으로 돌아오기까지의 시간을 테스트한 뒤 delivery.timeout.ms 매개변수를 잡아주는 것을 권장
- max.block.ms: 메타데이터를 요청했을 때 프로듀서가 얼마나 오랫동안 블록되는지를 결정
- delivery.timeout.ms: 레코드 전송 준비가 완료된 시점에서부터 브로커의 응답을 받거나, 전송을 포기하게 되는 시점까지의 제한시간
- request.timeout.ms: 프로듀서가 데이터를 전송할 때 서버로부터 응답을 받기 위해 얼마나 기다릴 것인지를 결정
- retries: 메시지 재전송 횟수
- retry.backoff.ms: 재전송 간격 조정
- linger.ms: 현재 배치를 전송하기 전까지 대기하는 시간
- buffer.memory: 메시지를 대기시키는 버퍼의 크기
- compression.type: 매개변수 snappy, gzip, lz4, zstd 중 하나로 설정하면 해당 압축 알고리즘 사용
- batch.size: 각각의 배치에 사용될 메모리 양 결정
- max.in.flight.requests.per.connection: 프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지의 수
- 원래 메시지 순서는 보존하지만 재시도하면서 순서가 뒤집어질 수 있음
- max.request.size: 전송하는 쓰기 요청의 크기
- receive.buffer.bytes, send.buffer.bytes: 데이터를 읽거나 쓸 때 소켓이 사용하는 TCP 송수신 버퍼의 크기를 결정
- enable.idempotence: 메시지 중복 저장을 방지
시리얼라이저
- 모두 같은 로직을 가지고 있어야하고 변경이 힘들기 때문에 범용 라이브러리 사용을 권장
- 아파치 에이브로
- 데이터를 읽는 쪽 애플리케이션을 전부 변경하지 않고 스키마를 변경하더라도 예외나 에러가 발생하지 않음
- 데이터를 쓸 때 사용하는 스키마와 읽을 떄 기대하는 스키마가 호환되어야 함
- 역직렬화를 할 때는 데이터를 쓸 때 사용했던 스키마에 접근이 가능해야함
파티션
- 접착성 처리: 요청의 수를 줄임
헤더
- 레코드의 키/밸류를 건드리지 않고 추가 메타데이터를 심을 때 사용함
인터셉터
- 클라이언트의 코드를 고치지 않으면서, 그 작동을 변경해야 하는 경우 사용
- ProducerInterceptor
- onSend, onAcknowledgment
- 모니터링, 정보 추적, 표준 헤더 삽입
- kafka-console-producer.sh 툴과 함께 사용하여 클라이언트 코드를 고치지 않고 인터셉터 적용
쿼터, 스로틀링
- 쿼터(quota): 한도
- 쓰기 쿼터 (produce quota), 읽기 쿼터 (consume quota), 요청 쿼터 (request quota)
- 브로커 설정 파일에서 지정 가능, kafka-configs.sh
- JMX 메트릭을 통해서 스로틀링 작동 여부를 확인
- produce-throttle-time-avg
- produce-throttle-time-max
- fetch-throttle-time-avg
- fetch-throttle-time-max