[
infra
]
Kafka
카프카에서 데이터를 읽는 것은 다른 메시지 전달 시트템에서 데이터를 읽는 것과 조금 다름
카프카 컨슈머
컨슈머, 컨슈머 그룹
- 처리 속도로 인해 밀리지 않기 위해 데이터를 읽어 오는 작업을 확장할 수 있어야 함
- 카프카 컨슈머는 보통 컨슈머 그룹의 일부로서 동작함
- 일반적인 규모 확장 방식
- 하나의 컨슈머로 토픽에 들어오는 데이터의 속동를 감당할 수 없는 상황
- 컨슈머를 추가
- 단위 컨슈머가 처리하는 파티션과 메시지의 수를 분산
- 여러 애플리케이션에서 동일한 토픽에서 데이터를 읽어와야 하는 경우
- 애플리케이션 별로 컨슈머 그룹 생성
- 컨슈머 추가
컨슈머 그룹과 파티션 리밸런스
- 컨슈머 그룹에 속한 컨슈머들은 토픽의 파티션들에 대한 소유권을 공유
- 리밸런스(rebalance): 컨슈머에 할당된 파티션을 다른 컨슈머에게 할당해주는 작업
- 조급한 리밸런스 (eager rebalance)
- 모든 컨슈머는 읽기 작업을 멈춤, 할당되었던 파티션 소유권 포기
- 컨슈머 그룹에 다시 참여, 새로운 파티션 할당
- 협력적 리밸런스 (cooperative rebalance)
- 한 컨슈머에게 할당되어있던 파티션만 다른 컨슈머에게 재할당
- 다른 컨슈머들은 하던일 방해받지 않음
- 재할당할 파티션에 대한 읽기 작업을 멈춤
- group coordinator 역할을 지정받은 브로커에 heartbeat를 전송하여 소유권 유지
정적 그룹 멤버십
- 기본적으로 컨슈머의 그룹 멤버로서의 자격은 일시적임
- 컨슈머에 고유한 group.instance.id 값을 부여하면 정적 멤버가됨
- 컨슈머가 끝나면 자동으로 그룹을 떠나지 않고, 다시 조인하면 이전에 할당받았던 파티션 그대로 재할당
- 파티션 할당을 캐시해두고 있어 리밸런싱을 발생시키지 않음
컨슈머 생성하기
- bootstrap.servers: 카프카 클러스터로의 연결 문자열
- key.deserialize, value.deserializer: 바이트 배열을 자바 객체로 변환
- group.id: 컨슈머 그룹 지정, 필수는 아니지만 그룹에 속하지 않는 컨슈머를 생성하는 것은 일반적이지 않음
토픽 구독하기
- 정규식을 이용하여 다수의 토픽을 구독하려하면 리밸런스가 발생
폴링 루프
- 컨슈머 API의 핵심은 서버에 추가 데이터가 들어왔는지 폴링하는 단순한 루프
- 일정시간(
max.poll.interval.ms
) 폴링되지 않으면 죽은 것으로 간주 - 하나의 스레드당 하나의 컨슈머가 원칙
컨슈머 설정하기
fetch.min.bytes
: 컨슈머가 브로커로부터 레코드를 얻어올 때 받는 데이터의 최소량fetch.max.wait.ms
: 얼마나 오래 기다릴 것인지 결정fetch.max.bytes
: 컨슈머가 브로커를 폴링할 때 카프카가 리턴하는 최대 바이트 수max.poll.records
: poll을 호출할 때마다 리턴되는 최대 레코드 수max.partition.fetch.bytes
: 서버가 파티션별로 리턴하는 최대 바이트 수session.timeout.ms
,heartbeat.interval.ms
max.poll.interval.ms
: 컨슈머가 폴링 하지 않고도 죽은 것으로 판정되지 않을 수 있는 최대 시간