Apache Kafka HDInsight 클러스터의 성능 최적화

이 문서에서는 HDInsight에서 Apache Kafka 워크로드의 성능을 최적화하기 위한 몇 가지 제안 사항을 제공합니다. 프로듀서, 브로커, 소비자 구성을 조정하는 데 중점을 둡니다. 워크로드가 많은 성능을 조정하려면 OS 설정을 조정해야 하는 경우도 있습니다. 성능을 측정하는 방법은 다양하며 적용하는 최적화는 비즈니스 요구 사항에 따라 다릅니다.

아키텍처 개요

Kafka 토픽은 레코드를 구성하는 데 사용됩니다. 생산자는 레코드를 생성하고 소비자는 레코드를 소비합니다. 생산자는 Kafka 브로커에 레코드를 보내고 데이터를 저장합니다. HDInsight 클러스터의 각 작업자 노드는 Kafka broker입니다.

토픽은 브로커 간에 레코드를 분할합니다. 레코드를 소비할 때 데이터의 병렬 처리를 위해 파티션당 최대 1개의 소비자를 사용할 수 있습니다.

복제는 노드 간에 파티션을 복제하는 데 사용됩니다. 이 파티션은 노드(broker) 중단을 방지합니다. 복제본 그룹 중 단일 파티션을 파티션 리더로 지정합니다. 생산자 트래픽은 ZooKeeper에서 관리하는 상태를 사용하여 각 노드의 선행부로 라우팅됩니다.

시나리오 식별

Apache Kafka 성능에는 처리량과 대기 시간의 두 가지 주요 측면이 있습니다. 처리량은 데이터를 처리할 수 있는 최대 속도입니다. 처리량이 높을수록 좋습니다. 대기 시간은 데이터를 저장하거나 검색하는 데 걸리는 시간입니다. 대기 시간이 짧을수록 좋습니다. 처리량, 대기 시간 및 애플리케이션 인프라 비용 간의 적절한 균형을 찾는 것은 어려울 수 있습니다. 성능 요구 사항은 높은 처리량, 짧은 대기 시간 또는 둘 다 필요한지 여부에 따라 다음 세 가지 일반적인 상황 중 하나와 일치해야 합니다.

  • 높은 처리량, 짧은 대기 시간. 이 시나리오에는 높은 처리량과 짧은 대기 시간(~100밀리초)이 모두 필요합니다. 이러한 유형의 애플리케이션의 예는 서비스 가용성 모니터링입니다.
  • 높은 처리량, 높은 대기 시간. 이 시나리오는 높은 처리량(~1.5GBps)이 필요하지만 더 긴 대기 시간(< 250ms)을 허용할 수 있습니다. 이러한 유형의 애플리케이션의 예로는 보안 및 침입 탐지 애플리케이션과 같은 실시간에 가까운 프로세스를 위한 원격 분석 데이터 수집이 있습니다.
  • 낮은 처리량, 짧은 대기 시간. 이 시나리오는 실시간으로 처리하기 위해 짧은 대기 시간(< 10ms)이 필요하지만 더 낮은 처리량을 허용할 수 있습니다. 이러한 유형의 애플리케이션의 예로는 온라인 맞춤법 및 문법 검사가 있습니다.

생산자 구성

다음 섹션에서는 Kafka 프로듀서의 성능을 최적화하는 데 가장 중요한 일반 구성 속성 중 일부를 강조 표시합니다. 모든 구성 속성에 대한 자세한 설명은 생산자 구성에 대한 Apache Kafka 설명서를 참조하세요.

Batch 크기

Apache Kafka 생산자는 단일 스토리지 파티션에 저장될 단위로 전송되는 메시지 그룹(일괄 처리라고 함)을 조합합니다. 일괄 처리의 크기는 해당 그룹이 전송되기 전에 있어야 하는 바이트 수를 의미합니다. batch.size 매개 변수를 늘리면 네트워크 및 IO 요청의 처리 오버헤드가 줄어들기 때문에 처리량이 증가할 수 있습니다. 가벼운 로드에서 일괄 처리 크기가 증가하면 생산자가 일괄 처리가 준비될 때까지 기다릴 때 Kafka 전송 대기 시간이 증가할 수 있습니다. 로드가 많은 경우 처리량과 대기 시간을 개선하기 위해 일괄 처리 크기를 늘리는 것이 좋습니다.

생산자 필수 승인

생산자 필수 acks 구성은 쓰기 요청이 완료된 것으로 간주되기 전에 파티션 리더에 필요한 승인 수를 결정합니다. 이 설정은 데이터 안정성에 영향을 미치며 0, 1 또는 -1 값을 사용합니다. 값 -1은 쓰기가 완료되기 전에 모든 복제본에서 승인을 받아야 함을 의미합니다. acks = -1을 설정하면 데이터 손실에 대한 더 강력한 보장이 제공되지만 대기 시간이 길어지고 처리량이 낮아집니다. 애플리케이션 요구 사항이 더 높은 처리량을 요구하는 경우 acks = 0 또는 acks = 1을 설정해 보세요. 모든 복제본을 확인하지 않으면 데이터 안정성이 저하될 수 있습니다.

압축

Kafka 생산자는 메시지를 브로커에 보내기 전에 압축하도록 구성할 수 있습니다. compression.type 설정은 사용할 압축 코덱을 지정합니다. 지원되는 압축 코덱은 “gzip”, “snappy” 및 “lz4”입니다. 압축은 유익하며 디스크 용량에 제한이 있는 경우 고려해야 합니다.

일반적으로 사용되는 두 가지 압축 코덱(gzipsnappy) 중에서 gzip은 압축률이 높기 때문에 CPU 로드가 높아지면서 디스크 사용량이 줄어듭니다. snappy 코덱은 더 적은 CPU 오버헤드로 더 적은 압축을 제공합니다. 브로커 디스크 또는 생산자 CPU 제한에 따라 사용할 코덱을 결정할 수 있습니다. gzipsnappy보다 5배 빠른 속도로 데이터를 압축할 수 있습니다.

데이터를 압축하면 디스크에 저장할 수 있는 레코드 수가 늘어납니다. 생산자와 브로커가 사용하는 압축 형식이 일치하지 않는 경우 CPU 오버헤드가 증가할 수도 있습니다. 데이터는 보내기 전에 압축해야 하며 처리하기 전에 압축을 풀어야 하기 때문입니다.

브로커 설정

다음 섹션에서는 Kafka 브로커의 성능을 최적화하기 위한 가장 중요한 설정 중 일부를 강조 표시합니다. 모든 브로커 설정에 대한 자세한 설명은 프로듀서 구성에 대한 Apache Kafka 설명서를 참조하세요.

디스크 수

스토리지 디스크에는 IOPS(초당 입력/출력 작업) 및 초당 읽기/쓰기 바이트가 제한되어 있습니다. 새 파티션을 만들 때 Kafka는 기존 파티션이 가장 적은 디스크에 각각의 새 파티션을 저장하여 사용 가능한 다른 디스크와 균형을 맞춥니다. 스토리지 전략에도 불구하고 각 디스크에서 수백 개의 파티션 복제본을 처리할 때 Kafka는 사용 가능한 디스크 처리량을 쉽게 포화시킬 수 있습니다. 여기서 처리량과 비용 간의 트레이드 오프가 작용합니다. 애플리케이션에 더 많은 처리량이 필요한 경우 브로커당 더 많은 관리 디스크가 있는 클러스터를 만듭니다. HDInsight는 현재 실행 중인 클러스터에 관리 디스크 추가를 지원하지 않습니다. 관리 디스크 수를 구성하는 방법에 대한 자세한 내용은 HDInsight에서 Apache Kafka의 스토리지 및 확장성 구성을 참조하세요. 클러스터의 노드에 대한 스토리지 공간 증가의 비용 면에서의 영향을 이해합니다.

토픽 및 파티션 수

Kafka 생산자는 토픽에 글을 씁니다. Kafka 소비자는 토픽을 읽습니다. 토픽은 디스크의 데이터 구조인 로그와 연결됩니다. Kafka는 생산자의 레코드를 토픽 로그 끝에 추가합니다. 토픽 로그는 여러 파일에 분산된 많은 파티션으로 구성됩니다. 이러한 파일은 차례로 여러 Kafka 클러스터 노드에 분산됩니다. 소비자는 자신의 속도로 Kafka 토픽을 읽고 토픽 로그에서 위치(오프셋)를 선택할 수 있습니다.

각 Kafka 파티션은 시스템의 로그 파일이며 생산자 스레드는 동시에 여러 로그에 쓸 수 있습니다. 마찬가지로, 각 소비자 스레드는 하나의 파티션에서 메시지를 읽기 때문에 여러 파티션에서 소비하는 것도 병렬로 처리됩니다.

파티션 밀도(브로커당 파티션 수)를 늘리면 메타데이터 작업 및 파티션 리더와 해당 팔로워 간의 파티션 요청/응답당 오버헤드가 추가됩니다. 데이터 흐름이 없는 경우에도 파티션 복제본은 여전히 리더에서 데이터를 가져오므로 네트워크를 통한 전송 및 수신 요청에 대한 추가 처리가 발생합니다.

Apache Kafka 클러스터 2.1 및 2.4의 경우 HDInsight에서 앞서 언급한 대로 복제본을 포함하여 브로커당 최대 2000개의 파티션을 갖는 것이 좋습니다. 브로커당 파티션 수를 늘리면 처리량이 감소하고 토픽을 사용할 수 없게 될 수도 있습니다. Kafka 파티션 지원에 대한 자세한 내용은 버전 1.1.0에서 지원되는 파티션 수 증가에 대한 공식 Apache Kafka 블로그 게시물을 참조하세요. 토픽 수정에 대한 자세한 내용은 Apache Kafka: 토픽 수정을 참조하세요.

복제본 수

복제 계수가 높을수록 파티션 리더와 팔로워 간에 추가 요청이 발생합니다. 결과적으로 복제 계수가 높을수록 추가 요청을 처리하기 위해 더 많은 디스크와 CPU를 사용하므로 쓰기 대기 시간이 늘어나고 처리량이 감소합니다.

Azure HDInsight에서 Kafka에 대해 3배 이상의 복제를 사용하는 것이 좋습니다. 대부분의 Azure 지역에는 3개의 장애 도메인이 있지만 2개의 장애 도메인만 있는 지역에서는 사용자가 4배 복제를 사용해야 합니다.

복제에 대한 자세한 내용은 Apache Kafka: 복제Apache Kafka: 복제 계수 증가를 참조하세요.

소비자 구성

다음 섹션에서는 Kafka 소비자의 성능을 최적화하는 데 중요한 일반 구성 중 일부를 강조 표시합니다. 모든 구성에 대한 자세한 설명은 소비자 구성에 대한 Apache Kafka 설명서를 참조하세요.

소비자 수

파티션 수를 소비자 수와 동일하게 하는 것이 좋습니다. 소비자 수가 파티션 수보다 적으면 소비자 중 일부가 여러 파티션에서 읽으므로 소비자 대기 시간이 늘어납니다.

소비자 수가 파티션 수보다 크면 해당 소비자가 유휴 상태이므로 소비자 리소스를 낭비하는 것입니다.

소비자의 잦은 리밸런스 방지

소비자 리밸런스는 파티션 소유권 변경(즉, 소비자가 스케일 아웃 또는 스케일 다운), 브로커 크래시(브로커가 소비자 그룹의 그룹 코디네이터이기 때문), 소비자 충돌, 새 토픽 추가 또는 새 파티션 추가에 의해 트리거됩니다. 리밸런싱하는 동안에는 소비자가 소비할 수 없으므로 대기 시간이 증가합니다.

소비자는 session.timeout.ms 내 브로커에 하트비트를 보낼 수 있는 경우 활성 상태인 것으로 간주됩니다. 그렇지 않으면 소비자는 죽었거나 실패한 것으로 간주됩니다. 이러한 지연은 소비자 재조정으로 이어집니다. 소비자 session.timeout.ms를 낮추면 이러한 오류를 더 빨리 감지할 수 있습니다.

session.timeout.ms가 너무 낮으면 메시지 일괄 처리가 처리되는 데 시간이 오래 걸리거나 JVM GC 일시 중지 시간이 너무 오래 걸리는 경우 등의 시나리오로 인해 소비자가 불필요한 리밸런싱을 반복할 수 있습니다. 메시지를 처리하는 데 너무 많은 시간을 보내는 소비자가 있는 경우, 더 많은 레코드를 max.poll.interval.ms로 가져오기 전에 소비자가 유휴 상태가 될 수 있는 시간의 상한을 늘리거나 max.poll.records 구성 매개 변수로 반환되는 일괄 처리의 최대 크기를 줄여서 이 문제를 해결할 수 있습니다.

일괄 처리

프로듀서와 마찬가지로, 소비자를 위한 일괄 처리를 추가할 수 있습니다. 각 페치 요청에서 소비자가 가져올 수 있는 데이터의 양은 fetch.min.bytes 구성을 변경하여 구성할 수 있습니다. 이 매개 변수는 소비자의 페치 응답에서 예상되는 최소 바이트를 정의합니다. 이 값을 늘리면 브로커에 대한 페치 요청 수가 줄어들어서 추가 오버헤드가 감소합니다. 기본적으로 이 값은 1입니다. 마찬가지로, 다른 fetch.max.wait.ms 구성이 있습니다. 페치 요청에 fetch.min.bytes 크기만큼 메시지가 충분하지 않으면 이 fetch.max.wait.ms 구성에 따라 대기 시간이 만료될 때까지 대기합니다.

참고 항목

일부 시나리오에서는 메시지를 처리하지 못하면 소비자는 속도가 느리다고 느낄 수 있습니다. 예외 후 오프셋을 커밋하지 않으면 소비자가 무한 루프의 특정 오프셋에 고정되고 앞으로 이동하지 않으므로 결과적으로 소비자 쪽에서 지연 시간이 길어집니다.

워크로드가 많은 Linux OS 튜닝

메모리 맵

vm.max_map_count는 프로세스에서 사용할 수 있는 최대 mmap 수를 정의합니다. 기본적으로 HDInsight Apache Kafka 클러스터 Linux VM의 값은 65535입니다.

Apache Kafka에서 각 로그 세그먼트에는 index/timeindex 파일 쌍이 필요하며 이러한 각 파일은 하나의 mmap을 사용합니다. 즉, 각 로그 세그먼트는 두 개의 mmap을 사용합니다. 따라서 각 파티션이 단일 로그 세그먼트를 호스트하는 경우 최소 두 개의 mmap이 필요합니다. 파티션당 로그 세그먼트 수는 세그먼트 크기, 부하 강도, 보존 정책, 롤링 기간에 따라 다르며, 일반적으로 둘 이상인 경향이 있습니다. Mmap value = 2*((partition size)/(segment size))*(partitions)

필요한 mmap 값이 vm.max_map_count를 초과하면 브로커에서 ‘맵 실패’ 예외가 발생합니다.

이 예외를 방지하려면, 아래의 명령을 사용하여 vm의 mmap 크기를 확인하고 각 작업자 노드에서 필요한 경우 크기를 늘립니다.

# command to find number of index files:
find . -name '*index' | wc -l

# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l

# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>

# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p

참고 항목

VM에서 메모리를 사용하므로, 이 값을 너무 높게 설정하는 것에 주의합니다. 메모리 맵의 JVM에서 사용할 수 있는 메모리의 양은 MaxDirectMemory 설정에 따라 결정됩니다. 기본값은 64MB입니다. 이 값에 도달할 수 있습니다. Ambari를 통해 JVM 설정에 -XX:MaxDirectMemorySize=amount of memory used를 추가하여 이 값을 늘릴 수 있습니다. 노드에서 사용되는 메모리의 양과 이를 지원하는 데 사용할 수 있는 RAM이 충분한지 여부를 인지하도록 합니다.

다음 단계