適用於 Apache Spark 的 Azure 數據總管連接器

Apache Spark 是用於進行大規模資料處理的整合分析引擎。 Azure 資料總管是快速、完全受控的資料分析服務,可即時分析大量資料流。

適用於 Spark 的 Kusto 連接器是可在任何 Spark 叢集上執行的 開放原始碼 專案。 它會實作數據源和數據接收器,以在 Azure 數據總管和 Spark 叢集之間行動數據。 使用 Azure 資料總管和 Apache Spark,您可以建置以數據驅動案例為目標的快速且可調整的應用程式。 例如,機器學習服務 (ML)、擷取-Transform-Load (ETL) 和 Log Analytics。 透過連接器,Azure 資料總管會成為標準 Spark 來源和接收作業的有效資料存放區,例如寫入、讀取和 writeStream。

您可以透過佇列擷取或串流擷取寫入 Azure 資料總管。 從 Azure 數據總管讀取支援數據行剪除和述詞下推,以篩選 Azure 數據總管中的數據,減少已傳輸的數據量。

注意

如需使用適用於 Azure 數據總管的 Synapse Spark 連接器的詳細資訊,請參閱 使用適用於 Azure Synapse Analytics 的 Apache Spark 連線到 Azure 數據總管

本主題描述如何安裝和設定 Azure 數據總管 Spark 連接器,以及在 Azure 數據總管和 Apache Spark 叢集之間行動數據。

注意

雖然下列一些範例參考 Azure Databricks Spark 叢集,但 Azure 數據總管 Spark 連接器不會直接相依於 Databricks 或任何其他 Spark 散發。

必要條件

提示

Spark 2.3.x 版本也受到支援,但可能需要變更pom.xml相依性。

如何建置Spark連接器

從 2.3.0 版開始,我們引進了取代 spark-kusto-connector 的新成品標識符: kusto-spark_3.0_2.12,以 Spark 3.x 和 Scala 2.12 為目標。

注意

2.5.1 之前的版本無法再用於內嵌至現有數據表,請更新為較新的版本。 此為選用步驟。 如果您使用預先建置的連結庫,例如 Maven,請參閱 Spark 叢集設定

建置必要條件

  1. 請參閱此來源以建置Spark連接器。

  2. 針對使用 Maven 專案定義的 Scala/Java 應用程式,請連結您的應用程式與最新的成品。 在 Maven Central尋找最新的成品。

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. 如果您未使用預先建置的連結庫,您必須安裝相依性中列出的連結庫,包括下列 Kusto Java SDK 連結庫。 若要尋找正確的安裝版本, 請查看相關版本的 pom

    1. 若要建置 jar 並執行所有測試:

      mvn clean package -DskipTests
      
    2. 若要建置 jar,請執行所有測試,並將 jar 安裝到本機 Maven 存放庫:

      mvn clean install -DskipTests
      

如需詳細資訊,請參閱 連接器使用方式

Spark 叢集設定

注意

建議您在執行下列步驟時使用最新的 Kusto Spark 連接器版本。

  1. 根據 Azure Databricks 叢集 Spark 3.0.1 和 Scala 2.12 設定下列 Spark 叢集設定:

    Databricks 叢集設定。

  2. 從 Maven 安裝最新的 spark-kusto-connector 連結庫:

    匯入程式庫。選取 [Spark-Kusto-Connector]。

  3. 確認已安裝所有必要的連結庫:

    確認已安裝連結庫。

  4. 若要使用 JAR 檔案進行安裝,請確認已安裝其他相依性:

    新增相依性。

驗證

Kusto Spark 連接器可讓您使用下列其中一種方法,向 Microsoft Entra 標識符進行驗證:

Microsoft Entra 應用程式驗證

Microsoft Entra 應用程式驗證是最簡單且最常見的驗證方法,建議用於 Kusto Spark 連接器。

  1. 透過 Azure CLI 登入您的 Azure 訂用帳戶。 然後在瀏覽器中進行驗證。

    az login
    
  2. 選擇要裝載主體的訂用帳戶。 當您有多個訂用帳戶時,需要此步驟。

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. 建立服務主體。 在這裡範例中,服務主體稱為 my-service-principal

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. 從傳回的 JSON 數據中,複製 appIdpasswordtenant 以供日後使用。

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

您已建立您的 Microsoft Entra 應用程式和服務主體。

Spark 連接器會使用下列 Entra 應用程式屬性進行驗證:

屬性 選項字串 描述
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra 應用程式 (用戶端) 識別碼。
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra 驗證授權單位。 Microsoft Entra Directory (tenant) 標識符。 選擇性 - 預設為 microsoft.com。 如需詳細資訊,請參閱 Microsoft Entra 授權單位
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft用戶端的 Entra 應用程式金鑰。
KUSTO_ACCESS_TOKEN kustoAccessToken 如果您已經有存取 Kusto 所建立的 accessToken,則可以用來傳遞至連接器以及驗證。

注意

舊版 API 版本 (小於 2.0.0) 具有下列命名:“kustoAADClientID”、“kustoClientAADClientPassword”、“kustoAADAuthorityID”

Kusto 許可權

根據您想要執行的Spark作業,授與 kusto 端的下列許可權。

Spark 作業 權限
讀取 - 單一模式 讀取者
讀取 – 強制分散式模式 讀取者
寫入 – 具有 CreateTableIfNotExist 數據表建立選項的佇列模式 管理
寫入 – 具有 FailIfNotExist 數據表建立選項的佇列模式 擷取器
Write – TransactionalMode 管理

如需主體角色的詳細資訊,請參閱 角色型訪問控制。 如需管理安全性角色,請參閱 安全性角色管理

Spark 接收:寫入 Kusto

  1. 設定接收參數:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. 將 Spark DataFrame 寫入 Kusto 叢集作為批次:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    或使用簡化的語法:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. 寫入串流資料:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark 來源:從 Kusto 讀取

  1. 讀取 少量資料時,請定義資料查詢:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. 選擇性:如果您提供暫時性 Blob 記憶體(而非 Kusto),則會在呼叫者的責任下建立 Blob。 這包括布建記憶體、輪替存取密鑰,以及刪除暫時性成品。 KustoBlobStorageUtils 模組包含協助程式函式,可根據帳戶和容器座標和帳戶認證,或具有寫入、讀取和列表許可權的完整 SAS URL 來刪除 Blob。 當不再需要對應的 RDD 時,每個交易都會將暫時性 Blob 成品儲存在個別的目錄中。 此目錄會擷取為 Spark 驅動程式節點上所報告讀取事務歷史記錄的一部分。

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    在上述範例中,不會使用連接器介面存取 金鑰保存庫;使用 Databricks 秘密的更簡單方法。

  3. 從 Kusto 讀取。

    • 如果您提供暫時性 Blob 記憶體,請從 Kusto 讀取,如下所示:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • 如果 Kusto 提供暫時性 Blob 記憶體,請從 Kusto 讀取,如下所示:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)