Kafka 用 Azure Event Hubs に対してパスワードレス接続を使用するようにアプリケーションを移行する

この記事では、従来の認証方法から、Kafka 用 Azure Event Hubs とのより安全なパスワードレス接続に移行する方法について説明します。

Kafka 用の Azure Event Hubs へのアプリケーション要求を認証する必要があります。 Kafka 用 Azure Event Hubs には、アプリが安全に接続するためのさまざまな方法が用意されています。 方法の 1 つは、接続文字列を使用することです。 ただしアプリケーションには、可能であればパスワードレス接続を優先的に使用することをお勧めします。

パスワードレス接続は、Spring Cloud Azure 4.3.0 以降でサポートされています。 この記事は、Spring Cloud Stream Kafka アプリケーションから資格情報を削除するための移行ガイドです。

認証オプションを比較する

アプリケーションが Kafka 用の Azure Event Hubs で認証されると、Event Hubs 名前空間に接続するための承認されたエンティティが提供されます。 Apache Kafka プロトコルでは、認証用の複数の単純な認証およびセキュリティ層 (SASL) メカニズムが提供されます。 SASL メカニズムによると、セキュリティで保護されたリソースへのアクセスを承認するために使用できる認証オプションは、Microsoft Entra 認証と Shared Access Signature (SAS) 認証の 2 つです。

Microsoft Entra 認証

Microsoft Entra 認証は、Microsoft Entra ID で定義された ID を使用して、Kafka 用 Azure Event Hubs に接続するメカニズムです。 Microsoft Entra 認証を使用すると、サービス プリンシパル ID やその他のMicrosoft サービスを中央の場所で管理できるため、アクセス許可の管理が簡素化されます。

認証に Microsoft Entra ID を使用すると、次の利点があります。

  • Azure サービス全体でのユーザー認証の一元化。
  • パスワード ポリシーとパスワード ローテーションの一元管理。
  • Microsoft Entra ID でサポートされる複数の形式の認証により、パスワードを格納する必要がなくなります。
  • お客様は、外部 (Microsoft Entra ID) グループを使用して Event Hubs のアクセス許可を管理できます。
  • Kafka 用 Azure Event Hubs に接続するアプリケーションのトークンベース認証のサポート。

SAS 認証

Event Hubs では、Kafka リソースの Event Hubs への委任されたアクセス用の Shared Access Signature (SAS) も提供されます。

SAS を使用して Kafka 用 Azure Event Hubs に接続することはできますが、注意して使用する必要があります。 セキュリティで保護されていない場所で接続文字列を公開することは決してありません。 接続文字列にアクセスできるユーザーは誰でも認証できます。 たとえば、悪意のあるユーザーが、接続文字列が誤ってソース管理にチェックインされたり、セキュリティで保護されていないメールを通じて送信されたり、間違ったチャットに貼り付けたり、アクセス許可を持たないユーザーによって表示されたりした場合に、アプリケーションにアクセスするリスクがあります。 代わりに、OAuth 2.0 トークン ベースのメカニズムを使用してアクセスを承認すると、SAS よりも優れたセキュリティと使いやすさが提供されます。 パスワードレス接続を使用するようにアプリケーションを更新することを検討してください。

パスワードレス接続の概要

パスワードなしの接続を使用すると、アプリケーション コード、その構成ファイル、または環境変数に資格情報を格納することなく、Azure サービスに接続できます。

多くの Azure サービスでは、たとえば Azure マネージド ID を使用したパスワードレス接続がサポートされています。 これらの手法により、Azure Identity クライアント ライブラリから DefaultAzureCredential を使用して実装できる堅牢なセキュリティ機能が提供されます。 このチュートリアルでは、接続文字列などの代替手段ではなく、既存のアプリケーションを更新して使用DefaultAzureCredentialする方法について説明します。

DefaultAzureCredential は複数の認証方法をサポートしており、どの方法が使用されるかは実行時に決定されます。 このアプローチを採用すると、環境固有のコードを実装することなく、異なる環境 (ローカル開発環境と運用環境) で異なる認証方法をアプリに使用できます。

資格情報を検索するDefaultAzureCredential順序と場所については、Azure ID ライブラリの概要参照してください。 たとえば、ローカルで作業する場合は、通常、 DefaultAzureCredential 開発者が Visual Studio へのサインインに使用したアカウントを使用して認証を行います。 アプリを Azure にデプロイすると、DefaultAzureCredential が自動的に切り替わってマネージド ID が使用されるようになります。 この移行のためにコードを変更する必要はありません。

接続がパスワードレスであることを確認するには、ローカル開発と運用環境の両方を考慮する必要があります。 いずれかの場所で接続文字列が必要な場合、アプリケーションはパスワードレスになりません。

ローカル開発環境では、Visual Studio Code または IntelliJ 用の Azure CLI、Azure PowerShell、Visual Studio、または Azure プラグインを使用して認証できます。 この場合、プロパティを構成する代わりに、アプリケーションでその資格情報を使用できます。

仮想マシンなどの Azure ホスティング環境にアプリケーションをデプロイする場合は、その環境でマネージド ID を割り当てることができます。 その後、Azure サービスに接続するために資格情報を指定する必要はありません。

Note

マネージド ID は、アプリまたはサービスを表すセキュリティ ID を提供します。 ID は Azure プラットフォームによって管理され、ユーザーがシークレットをプロビジョニングまたはローテーションする必要はありません。 マネージド ID の詳細については、概要ドキュメントを参照してください。

パスワードレス接続を使用するように既存のアプリケーションを移行する

次の手順では、SAS ソリューションではなくパスワードレス接続を使用するように既存のアプリケーションを移行する方法について説明します。

0) ローカル開発認証のための作業環境を準備する

まず、次のコマンドを使用して、いくつかの環境変数を設定します。

export AZ_RESOURCE_GROUP=<YOUR_RESOURCE_GROUP>
export AZ_EVENTHUBS_NAMESPACE_NAME=<YOUR_EVENTHUBS_NAMESPACE_NAME>
export AZ_EVENTHUB_NAME=<YOUR_EVENTHUB_NAME>

プレースホルダーは、この記事全体で使用される次の値に置き換えてください。

  • <YOUR_RESOURCE_GROUP>: 使用するリソース グループの名前。
  • <YOUR_EVENTHUBS_NAMESPACE_NAME>: 使用する Azure Event Hubs 名前空間の名前。
  • <YOUR_EVENTHUB_NAME>: 使用するイベント ハブの名前。

1) Azure Event Hubs のアクセス許可を付与する

このサンプルを Microsoft Entra 認証でローカルに実行する場合は、Azure Toolkit for IntelliJ、Visual Studio Code Azure Account プラグインまたは Azure CLI でユーザー アカウントが認証されていることを確認します。 また、アカウントに十分なアクセス許可が付与されていることを確認します。

  1. Azure portal で、メインの検索バーまたは左側のナビゲーションを使用して Event Hubs 名前空間を見つけます。

  2. Event Hubs の概要ページで、左側のメニューから [アクセス制御 (IAM)] を選択します。

  3. [アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。

  4. 上部のメニューから [追加] を選択、結果のドロップダウン メニューから [ロールの割り当ての追加] を選択します。

    [ロールの割り当ての追加] が強調表示されている Event Hubs 名前空間リソースの Azure portal アクセス制御 (IAM) ページのスクリーンショット。

  5. 検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、Azure Event Hubs Data SenderAzure Event Hubs Data Receiver を検索し、一致する結果を選択して、[次へ] を選択します。

  6. [アクセスの割り当て] で、[ユーザー、グループ、またはサービス プリンシパル] を選択し、[メンバーの選択] を選択します

  7. ダイアログで、自分の Microsoft Entra ユーザー名 (通常は user@domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。

  8. [レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。

アクセス ロールの付与の詳細については、「Microsoft Entra ID を使用して Event Hubs リソースへのアクセスを承認する」を参照してください

2) サインインしてアプリ コードを移行してパスワードレス接続を使用する

ローカル開発の場合は、Event Hubs でロールを割り当てたのと同じ Microsoft Entra アカウントで認証されていることを確認します。 認証には、Azure CLI、Visual Studio、Azure PowerShell のほか、IntelliJ などのツールを使用できます。

次のコマンドを使用して、Azure CLI を使用して Azure にサインインします。

az login

次に、次の手順を使用して、パスワードレス接続を使用するように Spring Kafka アプリケーションを更新します。 概念的には似ていますが、各フレームワークは異なる実装の詳細を使用します。

  1. プロジェクト内で、pom.xml ファイルを開き、次の参照を追加します。

    <dependency>
       <groupId>com.azure</groupId>
       <artifactId>azure-identity</artifactId>
       <version>1.6.0</version>
    </dependency>
    
  2. 移行後、次の例に示すように、OAuth2 認証用に AuthenticateCallbackHandlerOAuthBearerToken をプロジェクトに実装します。

    public class KafkaOAuth2AuthenticateCallbackHandler implements AuthenticateCallbackHandler {
    
       private static final Duration ACCESS_TOKEN_REQUEST_BLOCK_TIME = Duration.ofSeconds(30);
       private static final String TOKEN_AUDIENCE_FORMAT = "%s://%s/.default";
    
       private Function<TokenCredential, Mono<OAuthBearerTokenImp>> resolveToken;
       private final TokenCredential credential = new DefaultAzureCredentialBuilder().build();
    
       @Override
       public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
          TokenRequestContext request = buildTokenRequestContext(configs);
          this.resolveToken = tokenCredential -> tokenCredential.getToken(request).map(OAuthBearerTokenImp::new);
       }
    
       private TokenRequestContext buildTokenRequestContext(Map<String, ?> configs) {
          URI uri = buildEventHubsServerUri(configs);
          String tokenAudience = buildTokenAudience(uri);
    
          TokenRequestContext request = new TokenRequestContext();
          request.addScopes(tokenAudience);
          return request;
       }
    
       @SuppressWarnings("unchecked")
       private URI buildEventHubsServerUri(Map<String, ?> configs) {
          String bootstrapServer = Arrays.asList(configs.get(BOOTSTRAP_SERVERS_CONFIG)).get(0).toString();
          bootstrapServer = bootstrapServer.replaceAll("\\[|\\]", "");
          URI uri = URI.create("https://" + bootstrapServer);
          return uri;
       }
    
       private String buildTokenAudience(URI uri) {
          return String.format(TOKEN_AUDIENCE_FORMAT, uri.getScheme(), uri.getHost());
       }
    
       @Override
       public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
          for (Callback callback : callbacks) {
             if (callback instanceof OAuthBearerTokenCallback) {
                OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback;
                this.resolveToken
                        .apply(credential)
                        .doOnNext(oauthCallback::token)
                        .doOnError(throwable -> oauthCallback.error("invalid_grant", throwable.getMessage(), null))
                        .block(ACCESS_TOKEN_REQUEST_BLOCK_TIME);
             } else {
                throw new UnsupportedCallbackException(callback);
             }
          }
       }
    
       @Override
       public void close() {
          // NOOP
       }
    }
    
    public class OAuthBearerTokenImp implements OAuthBearerToken {
        private final AccessToken accessToken;
        private final JWTClaimsSet claims;
    
        public OAuthBearerTokenImp(AccessToken accessToken) {
            this.accessToken = accessToken;
            try {
                claims = JWTParser.parse(accessToken.getToken()).getJWTClaimsSet();
            } catch (ParseException exception) {
                throw new SaslAuthenticationException("Unable to parse the access token", exception);
            }
        }
    
        @Override
        public String value() {
            return accessToken.getToken();
        }
    
        @Override
        public Long startTimeMs() {
            return claims.getIssueTime().getTime();
        }
    
        @Override
        public long lifetimeMs() {
            return claims.getExpirationTime().getTime();
        }
    
        @Override
        public Set<String> scope() {
            // Referring to https://docs.microsoft.com/azure/active-directory/develop/access-tokens#payload-claims, the scp
            // claim is a String, which is presented as a space separated list.
            return Optional.ofNullable(claims.getClaim("scp"))
                    .map(s -> Arrays.stream(((String) s)
                    .split(" "))
                    .collect(Collectors.toSet()))
                    .orElse(null);
        }
    
        @Override
        public String principalName() {
            return (String) claims.getClaim("upn");
        }
    
        public boolean isExpired() {
            return accessToken.isExpired();
        }
    }
    
  3. Kafka プロデューサーまたはコンシューマーを作成するときに、SASL/OAUTHBEARER メカニズムをサポートするために必要な構成を追加します。 次の例は、移行前と移行後のコードの外観を示しています。 どちらの例でも、プレースホルダーを <eventhubs-namespace> Event Hubs 名前空間の名前に置き換えます。

    移行前のコードは、次の例のようになります。

    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<eventhubs-namespace>.servicebus.windows.net:9093");
    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    properties.put(SaslConfigs.SASL_JAAS_CONFIG,
            String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString));
    return new KafkaProducer<>(properties);
    

    移行後、コードは次の例のようになります。 この例では、プレースホルダーを <path-to-your-KafkaOAuth2AuthenticateCallbackHandler> 実装した KafkaOAuth2AuthenticateCallbackHandlerクラスの完全な名前に置き換えます。

    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<eventhubs-namespace>.servicebus.windows.net:9093");
    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
    properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required");
    properties.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "<path-to-your-KafkaOAuth2AuthenticateCallbackHandler>");
    return new KafkaProducer<>(properties);
    

アプリをローカルで実行する

これらの変更をコードに対して行った後、アプリケーションをローカルで実行します。 Azure CLI、Visual Studio、IntelliJ などの互換性のある IDE またはコマンド ライン ツールにログインしていることを前提として、新しい構成でローカル資格情報を取得する必要があります。 アプリは、Azure のローカル開発ユーザーに割り当てられたロールを使用して、Azure サービスにローカルから接続できます。

3) Azure ホスティング環境を構成する

アプリケーションがパスワードレス接続を使用するように構成され、ローカルで実行されると、Azure にデプロイされた後、同じコードを Azure サービスに対して認証できます。 たとえば、マネージド ID が割り当てられている Azure Spring Apps インスタンスにデプロイされたアプリケーションは、Kafka 用 Azure Event Hubs に接続できます。

このセクションでは、2 つの手順を実行して、アプリケーションをパスワードなしの方法で Azure ホスティング環境で実行できるようにします。

  • Azure ホスティング環境のマネージド ID を割り当てます。
  • マネージド ID にロールを割り当てます。

Note

Azure には Service Connector も用意されています。これは、ホスティング サービスを Event Hubs に接続するのに役立ちます。 Service Connector を使用してホスティング環境を構成すると、Service Connector によって自動的に行われるため、マネージド ID にロールを割り当てる手順を省略できます。 次のセクションでは、2 つの方法で Azure ホスティング環境を構成する方法について説明します。1 つは Service Connector 経由で、もう 1 つは各ホスティング環境を直接構成することです。

重要

Service Connector のコマンドには、Azure CLI 2.41.0 以降が必要です。

Azure ホスティング環境のマネージド ID を割り当てる

次の手順では、さまざまな Web ホスティング サービスにシステム割り当てマネージド ID を割り当てる方法を示します。 このマネージド ID は、先ほど設定したアプリ構成を使用して、他の Azure サービスに安全に接続できます。

  1. Azure アプリ サービス インスタンスのメイン概要ページで、ナビゲーション ウィンドウから [ID] を選択します。

  2. [システム割り当て済み] タブで、[ステータス] フィールドをオン設定します。 システム割り当て ID は Azure によって内部的に管理され、この ID によって管理タスクが自動的に処理されます。 ID の詳細と ID がコードで公開されることはありません。

Azure CLI を使用して、Azure ホスティング環境でマネージド ID を割り当てることもできます。

次の例に示すように、az webapp identity assign コマンドを使用して、マネージド ID を Azure アプリ Service インスタンスに割り当てることができます。

export AZURE_MANAGED_IDENTITY_ID=$(az webapp identity assign \
    --resource-group $AZ_RESOURCE_GROUP \
    --name <app-service-name> \
    --query principalId \
    --output tsv)

マネージド ID にロールを割り当てる

次に、Event Hubs 名前空間にアクセスするために作成したマネージド ID にアクセス許可を付与します。 ローカル開発ユーザーと同じように、マネージド ID にロールを割り当てることでアクセス許可を付与できます。

Service Connector を使用してサービスを接続した場合は、この手順を完了する必要はありません。 次の必要な構成が自動的に処理されました。

  • 接続の作成時にマネージド ID を選択した場合、システム割り当てマネージド ID がアプリ用に作成され、Event Hubs 上に Azure Event Hubs データ送信者ロールと Azure Event Hubs データ レシーバー ロールが割り当てられます

  • 接続文字列を使用することを選択した場合、接続文字列はアプリ環境変数として追加されました。

アプリをテストする

これらのコードに変更を加えた後、ホストされているアプリケーションにブラウザーでアクセスします。 アプリは、Kafka 用 Azure Event Hubs に正常に接続できる必要があります。 Azure 環境にロールの割り当てが反映されるまでに数分かかる場合があることに留意してください。 これでローカル環境と運用環境のどちらでも動作するようにアプリケーションが構成されました。開発者がアプリケーション自体でシークレットを管理する必要はありません。

次の手順

このチュートリアルでは、アプリケーションをパスワードレス接続に移行する方法について説明しました。

この記事で説明されている概念の詳細については、次のリソースを参照してください。