使用 mssparkutils 透過連結服務保護您的認證

從外部來源存取資料是常見的模式。 除非外部資料源允許匿名存取,否則您可能需要使用認證、祕密或連接字串來保護您的連線。

Azure Synapse Analytics 預設會使用 Microsoft Entra 傳遞,在資源之間進行驗證。 如果您需要使用其他認證連線至資源,請直接使用 mssparkutils。 mssparkutils 可簡化擷取 SAS 權杖、Microsoft Entra 權杖、連接字串和祕密的流程,而這些項目儲存在連結服務中或來自 Azure Key Vault。

Microsoft Entra 傳遞會使用以 Microsoft Entra ID 中的使用者身分指派給您的權限,而不是指派給 Synapse 或個別服務主體的權限。 例如,如果您想要使用 Microsoft Entra 傳遞存取儲存體帳戶中的 Blob,則應該前往該儲存體帳戶,並將 Blob 參與者角色指派給自己。

從 Azure Key Vault 擷取秘密時,建議您建立 Azure Key Vault 的連結服務。 請確定 Synapse 工作區的受控服務識別 (MSI) 具有 Azure Key Vault 的「秘密取得」權限。 Synapse 會使用 Synapse 工作區的受控服務識別向 Azure Key Vault 進行驗證。 如果您直接連線至沒有連結服務的 Azure Key Vault,請使用您的使用者 Microsoft Entra 認證進行驗證。

如需詳細資訊,請參閱已連結的服務

使用方式

權杖和祕密的 mssparkutils 說明

此函式會顯示 Synapse 中祕密和權杖管理的說明文件。

mssparkutils.credentials.help()
mssparkutils.credentials.help()
Console.WriteLine(TokenLibrary.help());

取得結果:

 getToken(audience: String, name: String): returns AAD token for a given audience, name (optional)
 isValidToken(token: String): returns true if token hasn't expired
 getConnectionStringOrCreds(linkedService: String): returns connection string or credentials for the linked service
 getFullConnectionString(linkedService: String): returns full connection string with credentials for the linked service
 getPropertiesAll(linkedService: String): returns all the properties of the linked service
 getSecret(akvName: String, secret: String, linkedService: String): returns AKV secret for a given AKV linked service, akvName, secret key using workspace MSI
 getSecret(akvName: String, secret: String): returns AKV secret for a given akvName, secret key using user credentials
 getSecretWithLS(linkedService: String, secret: String): returns AKV secret for a given linked service, secret key
 putSecret(akvName: String, secretName: String, secretValue: String): puts AKV secret for a given akvName, secretName
 putSecret(akvName: String, secretName: String, secretValue: String, linkedService: String): puts AKV secret for a given akvName, secretName
 putSecretWithLS(linkedService: String, secretName: String, secretValue: String): puts AKV secret for a given linked service, secretName

存取 Azure Data Lake Storage Gen2

ADLS Gen2 主要儲存體

從主要 Azure Data Lake Storage 存取檔案時,預設會使用 Microsoft Entra 傳遞進行驗證,而不需要明確使用 mssparkutils。 傳遞驗證中使用的身分識別會根據幾個因素而有所不同。 根據預設,互動式筆記本會利用使用者的身分識別來執行,但其可以變更為工作區受控服務識別 (MSI)。 筆記本的批次作業和非互動式執行會使用工作區 MSI。

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")
display(df.limit(10))
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')
display(df.limit(10))

具有連結服務的 ADLS Gen2 儲存體

Azure Synapse Analytics 會在連線至 Azure Data Lake Storage Gen2 時提供整合式連結服務體驗。 連結服務可以設定為使用帳戶金鑰服務主體受控識別認證進行驗證。

當連結服務驗證方法設定為 [帳戶金鑰] 時,連結服務將會使用提供的儲存體帳戶金鑰進行驗證,並要求 SAS 金鑰,然後使用 LinkedServiceBasedSASProvider 自動將其套用至儲存體要求。

Synapse 允許使用者設定適用於特定儲存體帳戶的連結服務。 這樣就可以從單一 Spark 應用程式/查詢中的多個儲存體帳戶中讀取/寫入資料。 一旦我們針對將使用的每個儲存體帳戶設定 spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName,Synapse 就會找出要用於特定讀取/寫入作業的連結服務。 不過,如果我們的 Spark 作業僅處理單一儲存體帳戶,我們可以省略儲存體帳戶名稱,並使用 spark.storage.synapse.linkedServiceName

注意

無法變更預設 ABFS 儲存體容器的驗證方法。

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.auth.type.$source_full_storage_account_name", "SAS")
sc.hadoopConfiguration.set(s"fs.azure.sas.token.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Set the required configs
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<lINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.auth.type.{source_full_storage_account_name}", "SAS")
sc._jsc.hadoopConfiguration().set(f"fs.azure.sas.token.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

# Python code
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()

當連結服務驗證方法設定為 [受控識別] 或 [服務主體] 時,連結服務會使用受控識別或服務主體權杖搭配 LinkedServiceBasedTokenProvider 提供者。

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.oauth.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider") 
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Python code
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<LINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.oauth.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()

透過 Spark 組態設定驗證設定

您也可以透過 Spark 組態來指定驗證設定,而不是執行Spark 陳述式。 所有 Spark 組態的前面都應該加上 spark.,而所有 hadoop 組態的前面都應該加上 spark.hadoop.

Spark 組態名稱 組態值
spark.storage.synapse.teststorage.dfs.core.windows.net.linkedServiceName LINKED SERVICE NAME
spark.hadoop.fs.azure.account.oauth.provider.type.teststorage.dfs.core.windows.net microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider

沒有連結服務的 ADLS Gen2 儲存體

使用 SAS 金鑰直接連線至 ADLS Gen2 儲存體。 使用 ConfBasedSASProvider 並將 SAS 金鑰提供給 spark.storage.synapse.sas 組態設定。 SAS 權杖可以在容器層級、帳戶層級或全域中設定。 不建議在全域層級設定 SAS 金鑰,因為作業將無法從多個儲存體帳戶讀取/寫入。

每個儲存體容器的 SAS 設定

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

每個儲存體帳戶的 SAS 設定

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

所有儲存體帳戶的 SAS 設定

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

使用 MSAL 來取得權杖 (使用自訂應用程式認證)

當 ABFS 儲存體驅動程式設定為直接使用 MSAL 進行驗證時,提供者不會快取權杖。 這可能會產生可靠性問題。 我們建議使用 ClientCredsTokenProvider,這是 Synapse Spark 的一部分。

%%spark
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.$source_full_storage_account_name", "<Entra AppId>")
spark.conf.set("fs.azure.account.oauth2.client.secret.$source_full_storage_account_name", "<Entra app secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.$source_full_storage_account_name", "https://login.microsoftonline.com/<tenantid>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{source_full_storage_account_name}.linkedServiceName", "<Entra AppId>")
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{source_full_storage_account_name}.linkedServiceName", "<Entra app secret>")
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{source_full_storage_account_name}.linkedServiceName", "https://login.microsoftonline.com/<tenantid>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')
display(df.limit(10))

具有 SAS 權杖 (來自 Azure Key Vault) 的 ADLS Gen2 儲存體

使用儲存在 Azure Key Vault 秘密中的 SAS 權杖,連線至 ADLS Gen2 儲存體。

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

其他連結服務的 TokenLibrary

若要連線到其他已連結的服務,您可以直接呼叫 TokenLibrary。

getConnectionString()

若要擷取連接字串,請使用 getConnectionString 函式,並傳入連結服務名稱

%%spark
// retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%pyspark
# retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%csharp
// retrieve connectionstring from TokenLibrary

using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetConnectionString(<LINKED SERVICE NAME>);
Console.WriteLine(connectionString);

getPropertiesAll()

getPropertiesAll 是 Scala 和 Python 中可用的協助程式函式,用來取得連結服務的所有屬性

%%pyspark
import json
# retrieve connectionstring from mssparkutils

json.loads(mssparkutils.credentials.getPropertiesAll("<LINKED SERVICE NAME>"))

輸出將看起來如下

{
    'AuthType': 'Key',
    'AuthKey': '[REDACTED]',
    'Id': None,
    'Type': 'AzureBlobStorage',
    'Endpoint': 'https://storageaccount.blob.core.windows.net/',
    'Database': None
}

GetSecret()

若要從 Azure Key Vault 擷取儲存的秘密,建議您在 Synapse 工作區中建立 Azure Key Vault 的連結服務。 必須為 Synapse 工作區的受控服務識別授與 Azure Key Vault 的取得秘密權限。 連結服務會使用受控服務識別連線至 Azure Key Vault 服務,以取得秘密。 否則,直接連線至 Azure Key Vault 時,將會採用使用者的 Microsoft Entra 認證。 在此情況下,使用者將必須獲得 Azure Key Vault 中的「取得秘密」權限。

在政府雲端中,提供 keyvault 的完整網域名稱。

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>" [, <LINKED SERVICE NAME>])

若要從 Azure Key Vault 擷取祕密,請使用 mssparkutils.credentials.getSecret() 函式。


mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>");
Console.WriteLine(connectionString);

Spark 執行階段支援的連結服務連線

雖然 Azure Synapse Analytics 支援各種連結服務連線 (來自管線和其他 Azure 產品),但 Spark 執行階段並未支援所有連線。 以下是支援的連結服務清單:

  • Azure Blob 儲存體
  • Azure AI 服務
  • Azure Cosmos DB
  • Azure Data Explorer
  • 適用於 MySQL 的 Azure 資料庫
  • 適用於 PostgreSQL 的 Azure 資料庫
  • Azure Data Lake Store (Gen1)
  • Azure Key Vault
  • Azure Machine Learning
  • Azure Purview
  • Azure SQL Database
  • Azure SQL 資料倉儲 (專用與無伺服器)
  • Azure 儲存體

mssparkutils.credentials.getToken()

需要 OAuth 持有人權杖來直接存取服務時,您可以使用 getToken 方法。 支援下列資源:

服務名稱 要在 API 呼叫中使用的字串常值
Azure Storage Storage
Azure Key Vault Vault
Azure Management AzureManagement
Azure SQL Data Warehouse (Dedicated and Serverless) DW
Azure Synapse Synapse
Azure Data Lake Store DataLakeStore
Azure Data Factory ADF
Azure Data Explorer AzureDataExplorer
Azure Database for MySQL AzureOSSDB
Azure Database for MariaDB AzureOSSDB
Azure Database for PostgreSQL AzureOSSDB

Spark 執行階段不支援的連結服務存取

Spark 執行階段不支援下列存取連結服務的方法:

  • 將引數傳遞至參數化的連結服務
  • 與使用者指派的受控識別 (UAMI) 的連線
  • 當您的 Notebook/SparkJobDefinition 以受控識別身分執行時,會將持有人權杖送至 Keyvault 資源
    • 或者,您可以建立 Keyvault 的連結服務,並從筆記本/批次作業取得祕密,而不是取得存取權杖
  • 針對 Azure Cosmos DB 連線,僅支援金鑰型存取。 不支援權杖型存取。

執行筆記本或 Spark 作業時,使用連結服務取得權杖/祕密的要求可能會失敗,並出現指出 'BadRequest' 的錯誤訊息。 這通常是連結服務的設定問題所造成。 如果您看到此錯誤訊息,請檢查連結服務的設定。 如果您有任何問題,請在 Azure 入口網站連絡 Microsoft Azure 支援。