Storage Blob을 사용하는 Javascript용 Azure Event Hubs 검사점 저장소 라이브러리

검사점을 저장하고 @azure/event-hubs 라이브러리에서 사용할 EventHubConsumerClient 때 부하 분산을 지원하는 Azure Blob Storage 기반 솔루션

소스 코드 | 패키지(npm) | API 참조 설명서 | 샘플

시작

패키지 설치

npm을 사용하여 Azure Event Hubs 검사점 저장소 Blob 라이브러리 설치

npm install @azure/eventhubs-checkpointstore-blob

필수 구성 요소: 이 패키지를 사용하려면 Azure 구독, Event Hubs 네임스페이스Storage 계정이 있어야 합니다.

Node.js 애플리케이션에서 이 패키지를 사용하는 경우 Node.js 8.x 이상을 사용합니다.

Typescript 구성

TypeScript 사용자는 노드 형식 정의를 설치해야 합니다.

npm install @types/node

또한 tsconfig.json에서 를 사용하도록 설정 compilerOptions.allowSyntheticDefaultImports 해야 합니다. 를 사용하도록 설정한 compilerOptions.esModuleInteropallowSyntheticDefaultImports 경우 는 기본적으로 사용하도록 설정됩니다. 자세한 내용은 TypeScript의 컴파일러 옵션 핸드북 을 참조하세요.

주요 개념

  • 크기 조정: 몇몇 Event Hubs 파티션에서 읽기 소유권을 가진 각 소비자를 사용하여 여러 소비자를 만듭니다.

  • 부하 분산: 부하 분산을 지원하는 애플리케이션은 동일한 이벤트 허브 및 소비자 그룹 및 동일한 CheckpointStore의 이벤트를 사용하도록 구성된 하나 이상의 인스턴스 EventHubConsumerClient 로 구성됩니다. 서로 다른 인스턴스 간에 처리할 파티션을 배포하여 워크로드의 균형을 조정합니다.

  • 검사점: 판독기는 파티션 이벤트 시퀀스 내에서 위치를 표시하거나 커밋하는 프로세스입니다. 검사점은 소비자의 책임으로 소비자 그룹 내에서 파티션별로 발생합니다. 이러한 책임은 각 소비자 그룹에 대해 각 파티션 판독기는 이벤트 스트림의 현재 위치를 추적해야 하며 데이터 스트림이 완료된 것으로 간주되면 서비스를 알릴 수 있다는 것을 의미합니다.

    판독기가 파티션에서 연결을 끊은 경우 다시 연결하면 해당 소비자 그룹에서 해당 파티션의 마지막 판독기에서 이전에 제출한 검사점에서 읽기 시작합니다. 판독기가 연결하면, 오프셋을 이벤트 허브로 전달하여 읽기 시작할 위치를 지정합니다. 이러한 방식으로, 서로 다른 머신에서 실행되는 판독기 간의 장애 조치(failover)가 발생하는 경우 복원력을 제공하고 다운스트림 애플리케이션에서 이벤트를 "완료"로 표시하는 데 검사점을 사용할 수 있습니다. 이 검사점 프로세스에서 더 낮은 오프셋을 지정하면 이전 데이터로 돌아갈 수 있습니다. 이 메커니즘을 통해 검사점을 지정하면 장애 조치 복원력 및 제어된 이벤트 스트림 재생 모두를 사용할 수 있습니다.

    BlobCheckpointStore는 부하 분산 및 검사점 업데이트에 필요한 EventHubConsumerClient에 필요한 키 메서드를 구현하는 클래스입니다.

예제

CheckpointStore Azure Blob Storage 사용하여 만들기

아래 코드 조각을 사용하여 를 만듭니다 CheckpointStore. 스토리지 계정에 연결 문자열 제공해야 합니다.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Azure Blob Storage를 사용하는 검사점 이벤트

Azure Blob Storage 사용하여 수신된 검사점 이벤트는 메서드를 호출 updateCheckpoint() 하는 코드와 함께 SubscriptionEventHandlers 인터페이스와 호환되는 개체를 전달해야 합니다.

이 예제 SubscriptionHandlers 에서는 SubscriptionEventHandlers를 구현하고 검사점도 처리합니다.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

문제 해결

로그 사용 설정

로깅stderrAZURE_LOG_LEVEL 사용하도록 설정하려면 환경 변수를 다음 값 중 하나로 설정할 수 있습니다.

  • verbose
  • 정보
  • warning
  • error

@azure/로거 패키지를 가져오고 로그 수준 값 중 하나를 사용하여 함수를 호출 setLogLevel 하여 프로그래밍 방식으로 로그 수준을 설정할 수도 있습니다.

프로그래밍 방식으로 또는 환경 변수를 통해 AZURE_LOG_LEVEL 로그 수준을 설정할 때 선택한 로그 수준과 같거나 작은 로그를 사용하여 작성된 모든 로그가 내보내집니다. 예를 들어 로그 수준을 로 설정하면 수준에 infowarningerror 대해 작성되고 내보내도 되는 로그입니다. 이 SDK는 로그할 수준을 결정할 때 TypeScript용 Azure SDK 지침을 따릅니다.

또는 이 라이브러리를 사용할 때 로그를 DEBUG 가져올 환경 변수를 설정할 수 있습니다. 종속성에서 로그를 내보내려는 경우에도 유용할 수 있습니다 rhea-promiserhea .

참고: AZURE_LOG_LEVEL 설정된 경우 DEBUG보다 우선합니다. AZURE_LOG_LEVEL 지정하거나 setLogLevel을 호출할 때는 DEBUG를 통해 라이브러리를 지정 azure 하지 마세요.

다음 환경 변수를 설정하여 이 라이브러리를 사용할 때 디버그 로그를 볼 수 있습니다.

  • Eventhubs Checkpointstore Blob에서 정보 수준 디버그 로그만 가져옵니다.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

파일에 로깅

  • 위와 같이 로깅을 사용하도록 설정한 다음 다음과 같이 테스트 스크립트를 실행합니다.

    • 테스트 스크립트의 로깅 문은 로 out.log 이동하고 sdk의 로깅 문은 로 debug.log이동합니다.

      node your-test-script.js > out.log 2>debug.log
      
    • 테스트 스크립트와 sdk의 로깅 문은 stderr를 stdout(&1)로 리디렉션한 다음 stdout을 파일로 리디렉션하여 동일한 out.log 파일로 이동합니다.

      node your-test-script.js >out.log 2>&1
      
    • 테스트 스크립트 및 sdk의 로깅 문은 동일한 파일 out.log로 이동합니다.

      node your-test-script.js &> out.log
      

다음 단계

자세한 예제는 샘플 디렉터리를 참조하세요.

참여

이 라이브러리에 기여하려면 기여 가이드 를 참조하여 코드를 빌드하고 테스트하는 방법에 대해 자세히 알아보세요.

Impressions