Apache Kafka と Azure Databricks を使用したストリーム処理

この記事では、Azure Databricks で構造化ストリーミング ワークロードを実行するときに、Apache Kafka をソースまたはシンクとして使用する方法について説明します。

Kafka の詳細については、Kafka のドキュメントを参照してください。

Kafka からデータを読み取る

Kafka から読み取られたストリーミングの例を次に示します。

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Azure Databricks では、次の例に示すように、Kafka データ ソースのバッチ読み取りセマンティクスもサポートされています。

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

増分バッチ読み込みの場合、Databricks では、Trigger.AvailableNow で Kafka を使用することをお勧めします。 「増分バッチ処理の構成」を参照してください。

Databricks Runtime 13.3 LTS 以降では、Azure Databricks には Kafka データを読み取るための SQL 関数が用意されています。 SQL を使用したストリーミングは、Delta Live Tables または Databricks SQL のストリーミング テーブルでのみサポートされます。 「read_kafka テーブル値関数」を参照してください。

Kafka 構造化ストリーミング リーダーを構成する

Azure Databricks には、Kafka 0.10 以降への接続を構成するためのデータ形式として kafka キーワードが用意されています。

Kafka の最も一般的な構成を次に示します。

サブスクライブするトピックを指定するには、複数の方法があります。 次のパラメーターのいずれか 1 つのみを指定する必要があります。

オプション 説明
subscribe トピックのコンマ区切りリスト。 サブスクライブするトピックの一覧。
subscribePattern Java 正規表現文字列。 トピックのサブスクライブに使用するパターン。
assign JSON 文字列 {"topicA":[0,1],"topic":[2,4]} 使用する特定の topicPartition。

その他の注目すべき構成:

オプション Default value 説明
kafka.bootstrap.servers "ホスト:ポート" のコンマ区切りリストです。 empty [必須] Kafka bootstrap.servers の構成。 Kafka からのデータがないことがわかった場合は、まず、ブローカー アドレスの一覧を確認します。 ブローカー アドレスの一覧が正しくない場合、エラーは発生しないことがあります。 これは、Kafka クライアントによって、ブローカーは最終的に利用可能になると想定され、ネットワーク エラーが発生した場合は永久に再試行されるためです。
failOnDataLoss true または false true [省略可能] データが失われる恐れがある場合にクエリを失敗させるかどうか。 クエリは、トピックの削除、処理前のトピックの切り捨てなど、多くのシナリオが原因で、Kafka からのデータの読み取りに永続的に失敗する可能性があります。 データが失われた可能性があるかどうかについて控えめな推定を試みます。 これにより、誤ってアラームが発生する場合があります。 予期したとおりに動作しない場合、またはデータ損失にかかわらずクエリの処理を続行する場合は、このオプションを false に設定します。
minPartitions >= 0 の整数、0 = 無効。 0 (無効) [省略可能] Kafka から読み取るパーティションの最小数。 minPartitions オプションを使用して、Kafka から読み取るパーティションの最小数に任意の数を指定して Spark を構成することができます。 通常、Spark では、Kafka の topicPartitions と、Kafka から使用する Spark パーティションが 1 対 1 でマッピングされます。 minPartitions オプションを、Kafka の topicPartitions よりも大きな値に設定すると、Spark では、大きな Kafka パーティションが小さな部分に分割されます。 負荷のピーク時、データ スキュー時、ストリームが遅れているときに、このオプションを設定すると、処理速度を上げることができます。 トリガーごとに Kafka コンシューマーを初期化すると、Kafka に接続するときに SSL を使用する場合、パフォーマンスに影響を与える可能性があります。
kafka.group.id Kafka コンシューマー グループ ID。 設定しない [省略可能] Kafka から読み取り中に使用するグループ ID。 注意して使用する必要があります。 既定では、各クエリによって、データを読み取るための一意のグループ ID が生成されます。 これにより、各クエリでは、独自のコンシューマー グループが使用され、他のコンシューマーによる干渉を受けないため、サブスクライブされたトピックのすべてのパーティションを確実に読み取ることができます。 一部のシナリオ (たとえば、Kafka グループ ベースの承認) では、データを読み取るために特定の承認済みグループ ID を使用することが必要な場合があります。 必要に応じて、グループ ID を設定できます。 ただし、予期しない動作が発生する可能性があるため、細心の注意を払って設定してください。

- クエリ (バッチとストリーミングの両方) を同じグループ ID で同時実行すると、相互に干渉し合い、各クエリでデータの一部しか読み取れなくなる可能性があります。
- これは、クエリを立て続けに起動または再起動した場合にも発生する可能性があります。 このような問題を最小限に抑えるには、Kafka コンシューマー構成 session.timeout.ms が非常に小さくなるように設定します。
startingOffsets earliest、latest latest [省略可能] クエリが開始されたときの開始点。最も古いオフセットの "earliest" か、各 TopicPartition の開始オフセットを指定する JSON 文字列。 JSON では、オフセットとして -2 を使用して earliest を、-1 で latest を示すことができます。 注: バッチ クエリの場合、latest (暗黙的に、または JSON で -1 を使用して) は許可されません。 ストリーミング クエリの場合、これは新しいクエリが開始されるときにのみ適用され、再開では常にクエリが中止された時点の情報が引き継がれます。 クエリ中に新しく検出されたパーティションは、earliest で開始されます。

他の省略可能な構成については、「構造化ストリーミング + Kafka 統合ガイド」を参照してください。

Kafka レコードのスキーマ

Kafka レコードのスキーマは次のとおりです。

Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

keyvalue は、ByteArrayDeserializer を使用して常にバイト配列として逆シリアル化されます。 キーと値を明示的に逆シリアル化するには、DataFrame 操作 (cast("string") など) を使用します。

Kafka にデータを書き込む

Kafka へのストリーミング書き込みの例を次に示します。

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Azure Databricks では、次の例に示すように、Kafka データ シンクへのバッチ書き込みセマンティクスもサポートされています。

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Kafka 構造化ストリーミング ライターを構成する

重要

Databricks Runtime 13.3 LTS 以降には、既定でべき等の書き込みを有効にする新しいバージョンの kafka-clients ライブラリが含まれています。 Kafka シンクでバージョン 2.8.0 以下を使用していて、ACL が構成されているが IDEMPOTENT_WRITE が有効になっていない場合、書き込みはエラー メッセージ org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state で失敗します。

このエラーを解決するには、Kafka バージョン 2.8.0 以降にアップグレードするか、構造化ストリーミング ライターの構成時に .option(“kafka.enable.idempotence”, “false”) を設定します。

DataStreamWriter に提供されるスキーマは、Kafka シンクと対話します。 次のフィールドを使用できます。

列名 必須または省略可能 Type
key 省略可能 STRING または BINARY
value 必須 STRING または BINARY
headers 省略可能 ARRAY
topic 省略可能 (topic がライター オプションとして設定されている場合は無視されます) STRING
partition 省略可能 INT

Kafka への書き込み中に設定される一般的なオプションを次に示します。

オプション Default value 説明
kafka.boostrap.servers <host:port> のコンマ区切りリスト。 なし [必須] Kafka bootstrap.servers の構成。
topic STRING 設定しない [省略可能] すべての行のトピックが書き込まれるように設定します。 このオプションは、データに存在するすべてのトピック列をオーバーライドします。
includeHeaders BOOLEAN false [省略可能] 行に Kafka ヘッダーを含めるかどうか。

他の省略可能な構成については、「構造化ストリーミング + Kafka 統合ガイド」を参照してください。

Kafka メトリックを取得する

avgOffsetsBehindLatestmaxOffsetsBehindLatestminOffsetsBehindLatest メトリックを使用して、サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの平均、最小、最大数を取得できます。 「対話形式によるメトリックの読み取り」を参照してください。

注意

Databricks Runtime 9.1 以降で使用できます。

estimatedTotalBytesBehindLatest の値を調べることで、クエリ プロセスによって消費されていない推定合計バイト数をサブスクライブされたトピックから取得します。 この推定値は、過去 300 秒間に処理されたバッチ数に基づきます。 推定値の基になる期間は、オプション bytesEstimateWindowLength を別の値に設定することによって変更できます。 たとえば、10 分に設定するには、次のようにします。

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

ノートブックでストリームを実行している場合、これらのメトリックは、ストリーミング クエリの進行状況ダッシュボードの [生データ] タブに表示されます。

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

SSL を使用して Azure Databricks を Kafka に接続する

Kafka への SSL 接続を有効にするには、Confluent のドキュメント「SSL による暗号化と認証」の手順に従います。 オプションとして、そこで説明されている構成を、プレフィックス kafka. を付けて指定できます。 たとえば、プロパティ kafka.ssl.truststore.location で、信頼ストアの場所を指定します。

Databricks では、次のことが推奨されています。

次の例では、オブジェクト ストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

HDInsight 上の Kafka を Azure Databricks に接続する

  1. HDInsight Kafka クラスターを作成します。

    手順については、「Azure Virtual Network 経由で HDInsight 上の Apache Kafka に接続する」を参照してください。

  2. 正しいアドレスをアドバタイズするように Kafka ブローカーを構成します。

    IP をアドバタイズするように Kafka を構成する」の手順に従います。 Azure Virtual Machines で Kafka を自分で管理する場合、ブローカーの advertised.listeners 構成がホストの内部 IP に設定されていることを確認してください。

  3. Azure Databricks クラスターを作成します。

  4. Kafka クラスターを Azure Databricks クラスターにピアリングします。

    仮想ネットワークをピアリングする」の手順に従います。

Microsoft Entra ID と Azure Event Hubs を使用したサービス プリンシパル認証

Azure Databricks は、Event Hubs サービスを使用した Spark ジョブの認証をサポートしています。 認証は Microsoft Entra ID によって OAuth で経由で行われます。

AAD 認証のダイアグラム

Azure Databricks では、以下のコンピューティング環境で、クライアント ID とシークレットを使用した Microsoft Entra ID 認証をサポートしています。

  • シングル ユーザー アクセス モードで構成されたコンピューティング上の Databricks Runtime 12.2 LTS 以降。
  • 共有アクセス モードで構成されたコンピューティング上の Databricks Runtime 14.3 LTS 以降。
  • Unity Catalog を使用せずに構成された Delta Live Tables パイプライン。

Azure Databricks では、コンピューティング環境内または Unity Catalog を使用して構成された Delta Live Tables パイプラインで、証明書を使用した Microsoft Entra ID 認証はサポートされていません。

この認証は、共有クラスターや Unity Catalog の Delta Live Tables では機能しません。

構造化ストリーミング Kafka コネクタの構成

Microsoft Entra ID で認証を実行するには、次の値が必要です。

  • テナント ID。 これは、Microsoft Entra ID の [サービス] タブにあります。

  • clientID (アプリケーション ID とも呼ばれます)。

  • クライアント シークレット。 これを取得したら、シークレットとして Databricks ワークスペースに追加する必要があります。 このシークレットを追加するには、「シークレットの管理」を参照してください。

  • EventHubs トピック。 トピックの一覧は、特定の Event Hubs 名前空間ページの [エンティティ] セクションの [Event Hubs] セクションにあります。 複数のトピックを操作するために、Event Hubs レベルで IAM ロールを設定できます。

  • EventHubs サーバー。 これは、特定の Event Hubs 名前空間の概要ページにあります。

    Event Hubs 名前空間

さらに、Entra ID を使用するには、OAuth SASL メカニズム (SASL は汎用プロトコルであり、OAuth は SASL "メカニズム" の一種です) を使用するように Kafka に指示する必要があります。

  • kafka.security.protocol は、SASL_SSL である必要があります。
  • kafka.sasl.mechanism は、OAUTHBEARER である必要があります。
  • kafka.sasl.login.callback.handler.class は、シェーディングされた Kafka クラスのログイン コールバック ハンドラーについて値 kafkashaded を持つ Java クラスの完全修飾名にする必要があります。 正確なクラスについては、次の例を参照してください。

次に、実行例を見てみましょう。

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

潜在的なエラーの処理

  • ストリーミング オプションがサポートされません。

    Unity Catalog を使用して構成された Delta Live Tables パイプラインでこの認証メカニズムを使用しようとすると、次のエラーが表示されることがあります。

    サポートされていないストリーミング エラー

    このエラーを解決するには、サポートされているコンピューティング構成を使用します。 「Microsoft Entra ID と Azure Event Hubs を使用したサービス プリンシパル認証」を参照してください。

  • 新しい KafkaAdminClient の作成に失敗しました。

    これは、次のいずれかの認証オプションが正しくない場合に Kafka がスローする内部エラーです。

    • クライアント ID (アプリケーション ID とも呼ばれます)
    • テナント ID
    • Event Hubs サーバー

    エラーを解決するには、これらのオプションの値が正しいことを確認します。

    また、この例で既定で提供されている (変更しないように求められた) kafka.security.protocol などの構成オプションを変更すると、このエラーが表示される場合があります。

  • 返されるレコードがありません

    データ フレームを表示または処理しようとしているが結果が得られない場合は、UI に次の情報が表示されます。

    結果メッセージなし

    このメッセージは、認証は成功したが、Event Hubs がデータを返さなかったことを意味します。 次のような理由が考えられます (ただし、決して網羅的ではありません)。

    • 正しくない EventHubs トピックを指定しました。
    • startingOffsets の既定の Kafka 構成オプションが latest であり、現在、トピック経由でデータをまだ受信していません。 Kafka の最も古いオフセットからデータの読み取りを開始するように、startingOffsetstoearliest を設定できます。