Microsoft Spark 公用程式 (MSSparkUtils) for Fabric

Microsoft Spark 公用程式 (MSSparkUtils) 是一個內建套件,可協助您輕鬆地執行一般工作。 您可以使用 MSSparkUtils 來處理檔案系統、取得環境變數、將筆記本鏈結在一起,以及使用祕密。 MSSparkUtils 套件可在 PySpark (Python) Scala、SparkR 筆記本和 Fabric 管線中使用。

注意

  • MsSparkUtils 已正式重新命名為 NotebookUtils。 現有的程式碼會保持 回溯相容,而且不會造成任何重大變更。 強烈建議升級至 Notebookutils,以確保持續支援並存取新功能。 mssparkutils 命名空間未來將會淘汰。
  • NotebookUtils 的設計目的是使用 Spark 3.4 (執行階段 v1.2) 和更新版本。 所有新功能和更新都將受到 Notebookutils 命名空間的獨佔支援。

檔案系統公用程式

mssparkutils.fs 提供使用各種檔案系統的公用程式,包括 Azure Data Lake Storage (ADLS) Gen2 和 Azure Blob 儲存體。 請確定您已適當地設定 Azure Data Lake Storage Gen2Azure Blob 儲存體的存取權。

執行下列命令以取得可用方法的概觀:

from notebookutils import mssparkutils
mssparkutils.fs.help()

輸出

mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use mssparkutils.fs.help("methodName") for more info about a method.

MSSparkUtils 會以與 Spark API 相同的方式與檔案系統搭配使用。 以 mssparkuitls.fs.mkdirs() 和 Fabric Lakehouse 使用方式為例:

使用方式 HDFS 根目錄的相對路徑 ABFS 檔案系統的絕對路徑 驅動程式節點中本機檔案系統的絕對路徑
非預設 Lakehouse 不支援 mssparkutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")
預設 Lakehouse 「檔案」或「資料表」下的目錄:mssparkutils.fs.mkdirs("Files/<new_dir>") mssparkutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")

列出檔案

若要列出目錄的內容,請使用 mssparkutils.fs.ls (「您的目錄路徑」)。 例如:

mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/tmp")  # based on local file system of driver node 

檢視檔案屬性

此方法會傳回檔案屬性,包括檔案名稱、檔案路徑、檔案大小,以及其是否為目錄和檔案。

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

建立新的目錄

如果指定的目錄不存在,這個方法會建立指定的目錄,並建立任何必要的父目錄。

mssparkutils.fs.mkdirs('new directory name')  
mssparkutils.fs. mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

複製檔案

此方法會複製檔案或目錄,並支援跨檔案系統複製活動。

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

高效能複製檔案

此方法提供更快速的方式複製或移動檔案,特別是大量資料。

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

預覽檔案內容

這個方法會傳回指定檔案的第一個 'maxBytes' 位元組,做為以 UTF-8 編碼的字串。

mssparkutils.fs.head('file path', maxBytes to read)

移動檔案

此方法會移動檔案或目錄,並支援跨檔案系統移動。

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

寫入檔案

這個方法會將指定的字串寫出至以 UTF-8 編碼的檔案。

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

將內容附加至檔案

這個方法會將指定的字串附加至以 UTF-8 編碼的檔案。

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

刪除檔案或目錄

此方法會移除檔案或目錄。

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

裝載/卸載目錄

檔案裝載和卸載中尋找詳細使用方式的詳細資訊。

筆記本公用程式

使用 MSSparkUtils Notebook 公用程式來執行筆記本,或使用值結束筆記本。 執行下列命令,以取得可用方法的概觀:

mssparkutils.notebook.help()

輸出:


exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

注意

筆記本公用程式不適用於 Apache Spark 工作定義 (SJD)。

參考筆記本

此方法會參考筆記本,並傳回其結束值。 您可以在筆記本中以互動方式或在管線中執行巢狀函數呼叫。 所參考的筆記本會在呼叫此函式的筆記本 Spark 集區上執行。

mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

例如:

mssparkutils.notebook.run("Sample1", 90, {"input": 20 })

Fabric 筆記本也支持藉由指定工作區 ID,跨多個工作區參考筆記本。

mssparkutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

您可以在儲存格輸出中開啟參考執行的快照連結。 快照集會擷取程式碼執行結果,並可讓您輕鬆地偵錯參考執行。

顯示參考執行結果的螢幕擷取畫面。

具有程式碼執行結果的快照集螢幕擷取畫面。

注意

  • 執行階段 1.2 版和更高版本支援跨工作區參考筆記本。
  • 如果您使用 Notebook 資源底下的檔案,請在參考的筆記本中使用 mssparkutils.nbResPath,確定它指向與互動式執行相同的資料夾。

參考以平行方式執行多個筆記本

重要

此功能為預覽版

方法 mssparkutils.notebook.runMultiple() 可讓您平行執行多個筆記本,或使用預先定義的拓撲結構。 API 會在 Spark 工作階段中使用多執行緒實作機制,這表示參考筆記本會共用計算資源。

使用 mssparkutils.notebook.runMultiple(),您可以:

  • 同時執行多個筆記本,而不需要等待每個筆記本完成。

  • 使用簡單的 JSON 格式,指定筆記本的相依性和執行順序。

  • 最佳化 Spark 計算資源的使用,並降低 Fabric 專案的成本。

  • 檢視輸出中每個筆記本執行記錄的快照集,並方便偵錯/監視筆記本工作。

  • 取得每個執行活動的結束值,並在下游工作中使用這些值。

您也可以嘗試執行 mssparkutils.notebook.help(“runMultiple”) 來尋找範例和詳細的使用方式。

以下是使用此方法平行執行筆記本清單的簡單範例:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

根筆記本的執行結果如下所示:

參考筆記本清單的螢幕擷取畫面。

以下是使用 mssparkutils.notebook.runMultiple() 執行具有拓撲結構的筆記本範例。 使用此方法,輕鬆地透過程式碼體驗協調筆記本。

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

根筆記本的執行結果如下所示:

使用參數參考筆記本清單的螢幕擷取畫面。

注意

  • 多個筆記本執行的平行處理原則程度僅限於 Spark 工作階段的總可用計算資源。
  • 筆記本活動或並行筆記本的上限為 50。 超過此限制可能會導致因高計算資源使用量而導致穩定性和效能問題。 如果發生問題,請考慮將筆記本分成多個 runMultiple 呼叫,或藉由調整 DAG 參數中的並行欄位來減少並行。
  • 整個 DAG 的預設逾時為 12 小時,而子筆記本中每個儲存格的預設逾時為 90 秒。 您可以在 DAG 參數中設定 timeoutInSecondstimeoutPerCellInSeconds 欄位來變更逾時。

編輯筆記本

這個方法會結束具有值的筆記本。 您可以在筆記本中以互動方式或在管線中執行巢狀函數呼叫。

  • 當您以互動方式從筆記本呼叫 exit() 函式時,Fabric 筆記本會擲回例外狀況、略過執行後續儲存格,並讓 Spark 工作階段保持運作。

  • 當您在管線中協調呼叫 exit() 函式的筆記本時,Notebook 活動會傳回具有結束值、完成管線執行,並停止 Spark 工作階段。

  • 當您在所參考的筆記本中呼叫 exit() 函式時,Fabric Spark 將會停止進一步執行參考的筆記本,並繼續在呼叫 run() 函式的主要筆記本中執行下一個儲存格。 例如:Notebook1 有三個儲存格,並呼叫第二個儲存格中的 exit() 函式。 Notebook2 的第三個儲存格中有五個儲存格並呼叫 run (notebook1)。 當您執行 Notebook2 時,Notebook1 會在達到 exit() 函式時停止在第二個儲存格。 Notebook2 會繼續執行第四個儲存格和第五個儲存格。

mssparkutils.notebook.exit("value string")

例如:

具有下列兩個儲存格的 Sample1 筆記本:

  • 儲存格 1 會定義預設值設定為 10 的輸入參數。

  • 儲存格 2 會結束筆記本,輸入為結束值。

顯示結束函式範例筆記本的螢幕擷取畫面。

您可以使用預設值在另一個筆記本中執行 Sample1

exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)

輸出:

Notebook executed successfully with exit value 10

您可以在另一個筆記本中執行 Sample1,並將輸入值設定為 20:

exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

輸出:

Notebook executed successfully with exit value 20

認證公用程式

您可以使用 MSSparkUtils 認證公用程式來取得存取權杖,以及管理 Azure Key Vault 中的祕密。

執行下列命令,以取得可用方法的概觀:

mssparkutils.credentials.help()

輸出:

getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name

取得權杖

getToken 會針對指定的對象和名稱傳回 Microsoft Entra 權杖 (選擇性)。 下列清單顯示目前可用的對循金鑰:

  • 儲存體物件資源:“儲存體”
  • Power BI 資源:“pbi”
  • Azure Key Vault 資源:"keyvault"
  • Synapse RTA KQL DB 資源:“kusto”

執行下列命令以取得權杖:

mssparkutils.credentials.getToken('audience Key')

使用使用者認證取得秘密

getSecret 會使用使用者認證,針對指定的 Azure Key Vault 端點和祕密名稱,傳回 Azure Key Vault 祕密。

mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

檔案裝載和卸載

Fabric 支援Microsoft Spark 公用程式套件中的下列裝載案例。 您可以使用裝載卸載getMountPath()mounts() API,將遠端儲存體 (ADLS Gen2) 連結至所有工作節點 (驅動程序節點和背景工作節點)。 儲存載入點就緒之後,請使用本機檔案 API 來存取資料,就好像儲存在本機文件系統中一樣。

如何裝載 ADLS Gen2 帳戶

下列範例說明如何裝載 Azure Data Lake Storage Gen2。 裝載 Blob 儲存體的運作方式類似。

此範例假設您有一個名為 storegen2 的 Data Lake Storage Gen2 帳戶,而該帳戶有一個名為 mycontainer 的容器,而您想要裝載至 /test 到筆記本 Spark 工作階段。

顯示在何處選取容器以裝載的螢幕擷取畫面。

若要裝載名為 mycontainer 的容器,mssparkutils 必須先檢查您是否有權存取容器。 目前,Fabric 支援兩種觸發程式裝載作業的驗證方法:accountKeysastoken

透過共用存取簽章權杖或帳戶密鑰裝載

MSSparkUtils 支援明確傳遞帳戶密鑰或共用存取簽章 (SAS) 權杖作為裝載目標的參數。

基於安全性考慮,我們建議您將帳戶金鑰或 SAS 權杖儲存在 Azure Key Vault 中 (如下列螢幕擷取畫面所示)。 接著,您可以使用 mssparkutils.credentials.getSecret API 來擷取它們。 如需關於 Azure Key Vault 的詳細資訊,請參閱關於 Azure Key Vault 受控儲存體帳戶金鑰

顯示祕密儲存在 Azure Key Vault 中的螢幕擷取畫面。

accountKey 方法的範例程式碼:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

sastoken 的範例程式碼:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

注意

如果無法使用,您可能需要匯入 mssparkutils

from notebookutils import mssparkutils

裝載參數:

  • fileCacheTimeout: Blobs 預設會在本機暫存資料夾中快取 120 秒。 在此期間,blobfuse 不會檢查檔案是否會最新狀態。 參數可以設定為變更預設逾時時間。 當多個用戶端同時修改檔案時,為了避免本機和遠端檔案之間的不一致,建議您縮短快取時間,甚至將其變更為 0,並且一律從伺服器取得最新的檔案。
  • timeout:裝載作業逾時預設為120秒。 參數可以設定為變更預設逾時時間。 當執行程式太多或裝載逾時時,建議增加值。

您可以使用這些參數,如下所示:

mssparkutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

注意

基於安全性考慮,我們建議您不要將認證儲存在程式碼中。 為了進一步保護您的認證,我們會在筆記本輸出中修訂您的祕密。 如需詳細資訊,請參閱祕密修訂

如何裝載 Lakehouse

將 Lakehouse 裝載至 /test 的範例程式碼:

from notebookutils import mssparkutils 
mssparkutils.fs.mount( 
 "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>", 
 "/test"
)

注意

不支援裝載區域端點。 Fabric 僅支援裝載全域端點,onelake.dfs.fabric.microsoft.com

使用 mssparktuils fs API 存取裝載點下的檔案

裝載作業的主要目的是讓客戶使用本機檔系統 API 存取儲存在遠端儲存體帳戶中的資料。 您也可以使用 mssparkutils fs API 搭配裝載路徑做為參數來存取資料。 此路徑格式稍有不同。

假設您使用掛接 API 將 Data Lake Storage Gen2 容器 mycontainer 裝載至 /test。 當您使用本機檔案系統 API 存取資料時,路徑格式如下所示:

/synfs/notebook/{sessionId}/test/{filename}

當您想要使用 mssparkutils fs API 存取資料時,建議您使用 getMountPath() 來取得正確的路徑:

path = mssparkutils.fs.getMountPath("/test")
  • 列出目錄:

    mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
    
  • 讀取檔案內容:

    mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
    
  • 建立目錄:

    mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
    

透過本機路徑存取裝載點下的檔案

您可以使用標準檔案系統,輕鬆地在裝載點中讀取和寫入檔案。 以下是 Python 範例:

#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

如何檢查現有的裝載點

您可以使用 mssparkutils.fs.mounts() API 來檢查所有現有的裝載點資訊:

mssparkutils.fs.mounts()

如何卸除裝載點

使用下列程式碼來卸載掛接點 (此範例中的 /test):

mssparkutils.fs.unmount("/test")

已知的限制

  • 目前的裝載是作業層級組態;建議您使用 裝載 API 來檢查載入點是否存在或無法使用。

  • 取消裝載機制不是自動的。 當應用程式執行完成時,若要卸載裝入點並釋放磁碟空間,您必須在程式碼中明確呼叫卸載 API。 否則,在應用程式執行完成之後,裝載點仍會存在於節點中。

  • 不支援裝載 ADLS Gen1 儲存體帳戶。

Lakehouse 公用程式

mssparkutils.lakehouse 提供專為管理 Lakehouse 工藝品量身打造的公用程式。 這些公用程式可讓使用者輕鬆建立、擷取、更新和刪除 Lakehouse 工藝品。

注意

只有執行階段 1.2+ 版才支援 Lakehouse API。

方法概觀

以下是 mssparkutils.lakehouse 所提供的可用方法概觀:

# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean

# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]

使用範例

若要有效地使用這些方法,請考慮下列使用範例:

建立 Lakehouse 工藝品

artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

擷取 Lakehouse 工藝品

artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")

更新 Lakehouse 工藝品

updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

刪除 Lakehouse 工藝品

is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")

列出 Lakehouse 工藝品

artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")

其他資訊

如需每個方法及其參數的詳細資訊,請使用 mssparkutils.lakehouse.help("methodName") 函式。

使用 MSSparkUtils 的 Lakehouse 公用程式,管理 Lakehouse 成品會變得更有效率,並整合到您的 Fabric 管線中,增強整體資料管理體驗。

您可以隨意探索這些公用程式,並將其併入您的 Fabric 工作流程,以便順暢地進行 Lakehouse 工藝品管理。

執行階段公用程式

顯示工作階段內容資訊

您可以透過 mssparkutils.runtime.context 取得目前即時工作階段的內容資訊,包括筆記本名稱、預設 Lakehouse、工作區資訊,如果是管線執行等。

mssparkutils.runtime.context

已知問題

使用高於 1.2 的執行階段版本並執行 mssparkutils.help() 時,目前不支援列出的 fabricClient倉儲工作區 API,未來將在進一步提供。