Cribl Stream から Azure Data Explorer にデータを取り込む

Cribl Stream は、あらゆるソースからマシン イベント データを安全に収集、処理、ストリームする処理エンジンです。 これにより安全な方法で、分析と管理を目的として、あらゆる宛先に対してそのデータを解析および処理できます。

この記事では、Cribl Stream を使用してデータを取り込む方法について説明します。

データ コネクタの完全な一覧については、「データ統合の概要」を参照してください。

前提条件

Microsoft Entra サービス プリンシパルを作成する

Microsoft Entra サービス プリンシパルは、次の例のように Azure portal を通してか、プログラムを使用して作成できます。

このサービス プリンシパルは、Kusto 内のテーブルにデータを書き込むコネクタによって使用される ID です。 Kusto リソースにアクセスするためのアクセス許可をこのサービス プリンシパルに付与します。

  1. Azure CLI 経由で Azure サブスクリプションにサインインします。 次に、ブラウザーで認証します。

    az login
    
  2. プリンシパルをホストするサブスクリプションを選択します。 この手順は、複数のサブスクリプションがある場合に必要です。

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. サービス プリンシパルを作成します。 この例では、サービス プリンシパルを my-service-principal と呼びます。

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. 返された JSON データから、appIdpassword、および tenant を後で使用のためにコピーします。

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Microsoft Entra アプリケーションとサービス プリンシパルが作成されました。

ターゲット テーブルを作成する

受信データのターゲット テーブルと、取り込まれたデータ列をターゲット テーブル内の列にマップするためのインジェスト マッピングを作成します。

  1. プレースホルダー TableName をターゲット テーブルの名前に置き換えて、クエリ エディターで次のテーブル作成コマンドを実行します。

    .create table <TableName> (_raw: string, _time: long, cribl_pipe: dynamic)
    
  2. プレースホルダーの TableName をターゲット テーブル名に、TableNameMapping をインジェスト マッピング名に置き換えて、次の create ingestion mapping コマンドを実行します。

    .create table <TableName> ingestion csv mapping '<TableNameMapping>' 'CriblLogMapping' '[{"Name":"_raw","DataType":"string","Ordinal":"0","ConstValue":null},{"Name":"_time","DataType":"long","Ordinal":"1","ConstValue":null},{"Name":"cribl_pipe","DataType":"dynamic","Ordinal":"2","ConstValue":null}]'
    
  3. Microsoft Entra サービス プリンシパルの作成」からサービス プリンシパルにデータベースを操作するためのデータベース インジェスター ロール アクセス許可を付与します。 詳細については、「」を参照してください。 プレースホルダー DatabaseName をターゲット データベースの名前に、ApplicationID を Microsoft Entra サービス プリンシパルの作成時に保存した AppId 値に置き換えます。

    .add database <DatabaseName> ingestors ('aadapp=<ApplicationID>') 'App Registration'
    

Cribl Stream の宛先を作成する

次のセクションでは、Kusto 内のテーブルにデータを書き込む Cribl Stream の宛先を作成する方法について説明します。 各テーブルには、個別の Cribl Stream 宛先コネクタが必要です。

宛先の選択

Cribl Stream をテーブルに接続するには:

  1. Cribl 内の上部のナビゲーションで、[Manage] を選択してから [Worker Group] を選択します。

  2. [Routing]>[QuickConnect (Stream)]>[Add Destination] を選択します。

  3. [Set up new QuickConnect Destination] ウィンドウ内で、[Azure Data Explorer]、次に [Add now] を選択します。

General Settings を設定する

[New Data Explorer] ウィンドウ内の、[General Settings] 内で次の設定を行います。

設定 説明
Output ID <OutputID>、たとえば KustoDestination 宛先を識別するために使用される名前。
Ingestion Mode Batching (規定値) または Streaming インジェスト モードの設定。 Batching を使用すると、短時間で大量のデータを取り込む場合に、Cribl ストレージ コンテナーからデータのバッチをテーブルにプルできます。 Streaming を使用すると、ターゲットの KQL テーブルに直接データを送信します。 Streaming は、少量のデータを取り込む場合や、重要なアラートをリアルタイムで送信する場合などに便利です。 Streaming では、Batching よりも低遅延を実現できます。 Ingestion Mode が Streaming に設定されている場合は、ストリーミング ポリシーを有効にする必要があります。 詳細については、「ストリーミング インジェスト ポリシー」を参照してください。
Cluster base URI ベース URI ベース URI
Ingestion service URI インジェスト URI Batching モードが選択されている場合に表示されます。 インジェスト URI
データベース名 <DatabaseName> ターゲット データベースの名前。
テーブル名 <TableName> ターゲット テーブルの名前。
Validate database settings Yes (既定値) または No 宛先を保存または起動する際に入力したサービス プリンシパル アプリの資格情報を検証します。 Add mapping object が有効な場合を除き、テーブル名を検証します。 アプリに Database ViewerTable Viewer ロールの両方がない場合は、この設定を無効にする必要があります。
Add mapping object Yes または No (既定値)。 Batching モードが選択されている場合にのみ (既定の Data mapping テキスト フィールドの代わりに) 表示されます。 Yes を選択すると、データ マッピングを JSON オブジェクトとして入力するウィンドウが開きます。
データ マッピング ターゲット テーブルを作成する」の手順の中で定義したマッピング スキーマ名。 マッピング スキーマ名。 Add mapping objectNo に設定されている場合の既定のビュー。
圧縮 gzip (既定値) Data format が Parquet に設定されている場合、Compress は使用できません。
データ形式 JSON (既定値)、Raw、または Parquet。 データ形式。 Parquet は Batching モードにおいてのみ使用でき、かつ Linux 上でのみサポートされます。
Backpressure behavior Block (既定値) または Drop 受信側でバックプレッシャーが発生している場合に、イベントをブロックするかドロップするかを選択します。
タグ オプションの値 Cribl Stream の [Manage Destinations] ページ内で、宛先をフィルター処理およびグループ化するためのオプションのタグ。 タグ名の間にはタブまたはハード リターンを使用します。 これらのタグは、処理されたイベントには追加されません。

完了したら [Next] を選択します。

認証設定

サイドバー内で [Authentication settings] を選択します。 「Microsoft Entra サービス プリンシパルを作成する」の中で保存した値とベース URI を次のように使用します。

設定 説明
テナント ID <TenantID> Microsoft Entra サービス プリンシパルを作成する」の中で保存した tenant の値を使用します。
クライアント ID <ClientID> Microsoft Entra サービス プリンシパルを作成する」の中で保存した appId の値を使用します。
スコープ <baseuri>/.default baseuri には、ベース URI の値を使用します。
認証方法 Client secretClient secret (text secret)、または Certificate オプションは次のとおりです。Client secret には、「Microsoft Entra サービス プリンシパルを作成する」の中で Client secret 用に作成した Microsoft Entra アプリケーションのクライアント シークレットを使用します。 Certificate の場合、「Microsoft Entra サービス プリンシパルを作成する」の中で作成した Microsoft Entra アプリケーションに登録した (登録する) 公開キーが証明書で使用されます。

[次へ] を選択します。

Persistent Queue

Ingestion ModeStreaming に設定され、かつ Backpressure behaviorPersistent Queue に設定されている場合に表示されます。

設定 説明
Max file size 1 MB (既定値) ファイルを閉じるまでに、キュー ファイルが到達する最大サイズ。 数値を入力する際は、KB または MB などの単位を含めます。
Max queue size 5 GB (既定値) 宛先がデータのエンキューを停止するまでに、各ワーカー プロセス上でキューが使用できるディスク領域の最大量。 KB、MB、GB などの単位を伴う正の数値 (必須)。 最大値は 1 TB です。
Queue file path $CRIBL_HOME/state/queues (既定) 永続キュー ファイルの場所。 Cribl Stream によって、この値に /<worker‑id>/<output‑id> が追加されます。
圧縮 None (既定値)、Gzip 永続化されたデータを閉じる際、圧縮するために使用する圧縮方法。
Queue-full behavior Block または Drop ディスク容量が少ない、またはディスク容量がいっぱいなためにキューでバックプレッシャーが発生している場合に、イベントをブロックするかドロップするかを選択します。
Strict ordering Yes (既定値) または No Yes に設定すると、イベントは先入れ先出しの順序に基づいて転送されます。 No に設定すると、先にエンキューされたイベントの前に新しいイベントを送信します。
Drain rate limit (EPS) 0 (既定値) このオプションは Strict orderingNo に設定されている場合に表示されます。キューから受信側への書き込み時の調整率 (1 秒あたりのイベント数) を設定できるようにするためです。 エンキューされたイベントのドレイン レートを調整すると、新しいまたはアクティブな接続のスループットが向上します。 ゼロにすると調整が無効になります。
Clear Persistent Queue NA 宛先へ配信するために現在エンキューされているファイルを削除する場合に選択します。 エンキューされたデータが配信されずに完全に削除されてしまうため、このアクションは必ず確認してください。

完了したら、[次へ] を選択します。

Processing settings

設定 説明
パイプライン <\defined_pipeline> この出力を使用して送信する前に、データを処理するオプションのパイプライン。
System fields cribl_pipe (既定値)、cribl_hostcribl_inputcribl_outputcribl_route、または cribl_wp 宛先に送信する前に、イベントに自動的に追加されるフィールドの一覧。 ワイルドカードを利用できます。

完了したら、[次へ] を選択します。

Parquet settings

Data FormatParquet が選択されている場合に表示されます。

Parquet を選択すると [Parquet Settings] タブが開き、Parquet スキーマを選択できます。

設定 説明
Automatic schema [オン] または [オフ] On を選択すると、Cribl Stream が書き込む各 Parquet ファイルのイベントに基づいて Parquet スキーマが生成されます。
Parquet schema ドロップダウン Automatic schemaOff に設定されている場合に表示され、Parquet スキーマを選択できます。
Parquet version 1.0, 2.4, 2.6 (既定値) バージョンによって、サポートされるデータ型とその表現方法が決まります。
Data page version V1、V2 (規定値) データ ページのシリアル化形式。 Parquet リーダーが Parquet V2 をサポートしていない場合は V1 を使用してください。
Group row limit 1000 (既定値) グループごとに含めることができる行の最大数。
ページ サイズ 1 MB (既定値) ページ セグメントのターゲット メモリ サイズ。 値を小さくすると読み取り速度が向上し、値を大きくすると圧縮が向上します。
Log invalid rows Yes または No Yes が選択され、Log leveldebug に設定されている場合、データ形式の不一致が原因でスキップされた一意の行が最大 20 行出力されます。
Write statistics On (既定値) または Off Parquet 統計表示ツールが構成されている場合は、On を選択します。
Write page indexes On (既定値) または Off Parquet リーダーで Parquet ページ インデックス統計を使用してページのスキップを有効にする場合は On を選択します。
Write page checksum [オン] または [オフ] Parquet ツールを使用し、Parquet ページ チェックサムを使用してデータ整合性を確認する場合は On を選択します。
Metadata (オプション)* キーと値のペアとして含めることができる、宛先ファイルのメタデータ プロパティ。

再試行

Ingestion Mode が Streaming に設定されている場合に表示されます。

設定 説明
Honor Retry-After header Yes または No Retry-After ヘッダーを優先するかどうか。 有効にすると、ヘッダーで 180 秒以下の遅延が指定されている限り、受信した Retry-After ヘッダーは、「Retries」セクション内の他の構成されたオプションよりも優先されます。 それ以外の場合、Retry-After ヘッダーは無視されます。
Settings for failed HTTP requests HTTP 状態コード 接続に失敗した場合に自動的に再試行する HTTP 状態コードの一覧。 Cribl Stream は、失敗した要求を 429 回自動的に再試行します。
Retry timed-out HTTP requests [オン] または [オフ] 設定すると、より多くの再試行動作の設定が使用可能になります。
Pre-backoff interval (ms) 1000 ミリ秒 (既定値) 再試行までの待機時間。
Backoff multiplier 2 秒 (規定値) 再試行間隔を決定するための、指数バックオフ アルゴリズムのベースとして使用されます。
Backoff limit (ms) 10,000 ミリ秒 (既定値) 最終的なストリーミング再試行の最大バックオフ間隔。 指定できる値の範囲は、10,000 ミリ秒 (10 秒) から 180,000 ミリ秒 (3 分) です。

完了したら、[次へ] を選択します。

詳細設定

サイドバーから [Advanced Settings] を選択します。 次に、Batching が選択されている場合の詳細設定について説明します。

設定 説明
Flush immediately Yes または No (既定値)。 Kusto 内のデータ集計をオーバーライドするには Yes に設定します。 詳細については、「Kusto Ingest ライブラリのベスト プラクティス」を参照してください。
Retain blob on success Yes または No (既定値)。 インジェストの完了時にデータ BLOB を保持するには Yes に設定します。
Extent tags <\ExtentTag, ET2,...> 必要に応じて、ターゲット テーブルのパーティション分割されたエクステントにタグを設定します。
Enforce uniqueness via tag values 受信エクステントをフィルター処理して、一覧の値に一致するエクステントを破棄するために使用する ingest-by 値一覧を指定するには、[Add value] を選択します。 詳細については、「エクステント (データ シャード)」を参照してください
Report level DoNotReportFailuresOnly (既定値)、および FailuresAndSuccesses インジェスト状態レポートのレベル。
Report method Queue (既定値)、TableQueueAndTable (推奨)。 インジェスト状態レポートのターゲット。
追加フィールド 必要に応じて、さらに構成プロパティを追加して、インジェスト サービスに送信します。
Staging location /tmp (既定) 圧縮して最終的な宛先に移動する前にファイルをバッファーする、ローカル ファイルシステムの場所。 Cribl では、安定して高性能な場所をお勧めします。
File name suffix expression .${C.env["CRIBL_WORKER_ID"]}.${__format}${__compression === "gzip" ? ".gz" : ""} (既定値) 出力ファイル名のサフィックスとして使用される、引用符またはバッククォートで囲まれた JavaScript 式。 formatJSON または raw で、__compressionnone または gzip です。 ファイル名の末尾に 6 文字のランダム シーケンスが追加され、上書きされるのを防ぎます。
Max file size (MB) 32 MB (規定値) ファイルを閉じてストレージ コンテナーに移動するまでに、ファイルが到達できる非圧縮出力ファイルの最大サイズ。
Max file open time (sec) 300 秒 (既定値) ファイルを閉じてストレージ コンテナーに移動するまでの、最大書き込み時間 (秒単位)。
Max file idle time (sec) 30 秒 (既定) 非アクティブなファイルを閉じてストレージ コンテナーに移動するまでに、開いたままにする最大時間 (秒単位)。
Max open files 100 (既定値) 最も古い開いているファイルを閉じてストレージ コンテナーに移動するまでに、同時に開いたままにするファイルの最大数。
Max concurrent file parts 1 (既定値) 同時にアップロードするファイル パーツの最大数。 既定値は 1 で、最高値は 10 です。 この値を 1 に設定すると、一度に 1 つのパーツを順番に送信できます。
Remove empty staging dirs Yes (既定値) または No 有効にすると、Cribl Stream ではファイルを移動した後に空のステージング ディレクトリが削除されます。 これにより、孤立した空のディレクトリが急増するのを防ぎます。 有効にすると、[Staging cleanup period] が表示されます。
Staging cleanup period 300 (既定値) Remove staging dirs が有効な場合に、空のディレクトリが削除されるまでの時間 (秒単位)。 Remove empty staging dirsYes に設定されている場合に表示されます。 最小値は 10 秒、最大値は 86,400 秒 (24 時間ごと) です。
Environment 空 (規定値) の場合、構成はどこでも有効になります。 GitOps を使用している場合は、構成を有効にする Git ブランチを指定できます。

完了したら [Save] を選択します。

接続の構成

開いた [Connection Configuration] ウィンドウで、[Passthru connection][Save] を選択します。 コネクタはデータのエンキューを開始します。

データ インジェストを確認する

  1. テーブル内にデータが到着したら、行数をチェックすることでデータの転送を確認します。

    <Tablename> 
    | count
    
  2. 直近の 5 分間にエンキューされたインジェストを確認します。

    .show commands-and-queries 
    | where Database == "" and CommandType == "DataIngestPull" 
    | where LastUpdatedOn >= ago(5m)
    
  3. インジェスト プロセスでエラーが発生していないことを確認します。

    • Batching の場合:
    .show ingestion failures
    
    • Streaming の場合:
    .show streamingingestion failures 
    | order by LastFailureOn desc
    
  4. テーブル内のデータを確認します。

    <TableName>
    | take 10
    

クエリの例とガイダンスについては、KQL でクエリを記述するKusto 照会言語のドキュメントを参照してください。