Microsoft Fabric Rest API ile Spark İş Tanımı oluşturma ve güncelleştirme

Microsoft Fabric Rest API, Doku öğelerinin CRUD işlemleri için bir hizmet uç noktası sağlar. Bu öğreticide, Spark İş Tanımı yapıtı oluşturma ve güncelleştirmeye yönelik uçtan uca bir senaryoyu inceleyeceğiz. Üç üst düzey adım söz konusu olur:

  1. başlangıç durumuna sahip bir Spark İş Tanımı öğesi oluşturma
  2. ana tanım dosyasını ve diğer lib dosyalarını karşıya yükleme
  3. Spark İş Tanımı öğesini ana tanım dosyasının ve diğer lib dosyalarının OneLake URL'si ile güncelleştirin

Önkoşullar

  1. Doku Rest API'sine erişmek için bir Microsoft Entra belirteci gereklidir. Belirteci almak için MSAL kitaplığı önerilir. Daha fazla bilgi için bkz . MSAL'de kimlik doğrulama akışı desteği.
  2. OneLake API'sine erişmek için bir depolama belirteci gereklidir. Daha fazla bilgi için bkz . Python için MSAL.

Başlangıç durumuyla Spark İş Tanımı öğesi oluşturma

Microsoft Fabric Rest API, Doku öğelerinin CRUD işlemleri için birleşik bir uç nokta tanımlar. Uç nokta şeklindedir https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items.

Öğe ayrıntısı istek gövdesinde belirtilir. Spark İş Tanımı öğesi oluşturmaya yönelik istek gövdesinin bir örneği aşağıda verilmişti:

{
    "displayName": "SJDHelloWorld",
    "type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
            {
                "path": "SparkJobDefinitionV1.json",
                "payload":"eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9",
                "payloadType": "InlineBase64"
            }
        ]
    }
}

Bu örnekte Spark İş Tanımı öğesi olarak SJDHelloWorldadlandırılmıştır. alanı payload , ayrıntı kurulumunun base64 kodlanmış içeriğidir ve kod çözmeden sonra içerik şu şekildedir:

{
    "executableFile":null,
    "defaultLakehouseArtifactId":"",
    "mainClass":"",
    "additionalLakehouseIds":[],
    "retryPolicy":null,
    "commandLineArguments":"",
    "additionalLibraryUris":[],
    "language":"",
    "environmentArtifactId":null
}

Ayrıntılı kurulumu kodlamak ve kodunu çözmek için iki yardımcı işlev aşağıdadır:

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    
    return json_data

Spark İş Tanımı öğesi oluşturmak için kod parçacığı aşağıdadır:

import requests

bearerToken = "breadcrumb"; # replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request
}

payload = "eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9"

# Define the payload data for the POST request
payload_data = {
    "displayName": "SJDHelloWorld",
    "Type": "SparkJobDefinition",
    "definition": {
        "format": "SparkJobDefinitionV1",
        "parts": [
            {
                "path": "SparkJobDefinitionV1.json",
                "payload": payload,
                "payloadType": "InlineBase64"
            }
        ]
    }
}

# Make the POST request with Bearer authentication
sjdCreateUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items"
response = requests.post(sjdCreateUrl, json=payload_data, headers=headers)

Ana tanım dosyasını ve diğer lib dosyalarını karşıya yükleme

Dosyayı OneLake'e yüklemek için bir depolama belirteci gereklidir. Depolama belirtecini almak için bir yardımcı işlev aşağıdadır:


import msal

def getOnelakeStorageToken():
    app = msal.PublicClientApplication(
        "{client id}", # this filed should be the client id 
        authority="https://login.microsoftonline.com/microsoft.com")

    result = app.acquire_token_interactive(scopes=["https://storage.azure.com/.default"])

    print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")

    return result['access_token']

Artık bir Spark İş Tanımı öğesi oluşturuldu ve bu öğeyi çalıştırılabilir hale getirmek için ana tanım dosyasını ve gerekli özellikleri ayarlamamız gerekiyor. Bu SJD öğesinin dosyasını karşıya yüklemek için uç nokta şudur: https://onelake.dfs.fabric.microsoft.com/{workspaceId}/{sjdartifactid}. Önceki adımla aynı "workspaceId" kullanılmalıdır; önceki adımın yanıt gövdesinde "sjdartifactid" değeri bulunabilir. Ana tanım dosyasını ayarlamak için kod parçacığı aşağıdadır:

import requests

# three steps are required: create file, append file, flush file

onelakeEndPoint = "https://onelake.dfs.fabric.microsoft.com/workspaceId/sjdartifactid"; # replace the id of workspace and artifact with the right one
mainExecutableFile = "main.py"; # the name of the main executable file
mainSubFolder = "Main"; # the sub folder name of the main executable file. Don't change this value


onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file" # the url for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}", # the storage token can be achieved from the helper function above
}

onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
    # Request was successful
    print(f"Main File '{mainExecutableFile}' was successfully created in onelake.")

# with previous step, the main executable file is created in OneLake, now we need to append the content of the main executable file

appendPosition = 0;
appendAction = "append";

### Main File Append.
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}";
mainFileContents = "filename = 'Files/' + Constant.filename; tablename = 'Tables/' + Constant.tablename"; # the content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes, this value should match the size of the mainFileContents

onelakePatchRequestHeaders = {
    "Authorization": f"Bearer {onelakeStorageToken}",
    "Content-Type" : "text/plain"
}

onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
    # Request was successful
    print(f"Successfully Accepted Main File '{mainExecutableFile}' append data.")

# with previous step, the content of the main executable file is appended to the file in OneLake, now we need to flush the file

flushAction = "flush";

### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
print(onelakeRequestMainFileFlushUrl)
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
    print(f"Successfully Flushed Main File '{mainExecutableFile}' contents.")
else:
    print(onelakeFlushMainFileResponse.json())

Gerekirse diğer lib dosyalarını karşıya yüklemek için aynı işlemi izleyin.

Spark İş Tanımı öğesini ana tanım dosyasının ve diğer lib dosyalarının OneLake URL'si ile güncelleştirin

Şimdiye kadar, bazı başlangıç durumlarına sahip bir Spark İş Tanımı öğesi oluşturduk, ana tanım dosyasını ve diğer lib dosyalarını karşıya yükledik, Son adım Spark İş Tanımı öğesini ana tanım dosyasının ve diğer lib dosyalarının URL özelliklerini ayarlamak üzere güncelleştirmektir. Spark İş Tanımı öğesini güncelleştirmek için uç noktadır https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}. Önceki adımlardan aynı "workspaceId" ve "sjdartifactid" kullanılmalıdır. Spark İş Tanımı öğesini güncelleştirmek için kod parçacığı aşağıdadır:


mainAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Main/{mainExecutableFile}" # the workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Libs/{libsFile}"  # the workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = 'defaultLakehouseid'; # replace this with the real default lakehouse id

updateRequestBodyJson = {
    "executableFile":mainAbfssPath,
    "defaultLakehouseArtifactId":defaultLakehouseId,
    "mainClass":"",
    "additionalLakehouseIds":[],
    "retryPolicy":None,
    "commandLineArguments":"",
    "additionalLibraryUris":[libsAbfssPath],
    "language":"Python",
    "environmentArtifactId":None}

# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)

# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")
print(base64EncodedUpdateSJDPayload)

# Define the API URL
updateSjdUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items/{sjdartifactid}/updateDefinition"

updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"

# Define the headers with Bearer authentication
bearerToken = "breadcrumb"; # replace this token with the real AAD token

headers = {
    "Authorization": f"Bearer {bearerToken}", 
    "Content-Type": "application/json"  # Set the content type based on your request
}

# Define the payload data for the POST request
payload_data = {
    "displayName": "sjdCreateTest11",
    "Type": Type,
    "definition": {
        "format": format,
        "parts": [
            {
                "path": path,
                "payload": updatePayload,
                "payloadType": payloadType
            }
        ]
    }
}


# Make the POST request with Bearer authentication
response = requests.post(updateSjdUrl, json=payload_data, headers=headers)
if response.status_code == 200:
    print("Successfully updated SJD.")
else:
    print(response.json())
    print(response.status_code)

İşlemin tamamını özetlemek için bir Spark İş Tanımı öğesi oluşturmak ve güncelleştirmek için hem Doku REST API'sine hem de OneLake API'sine ihtiyaç vardır. Doku REST API'si Spark İş Tanımı öğesini oluşturmak ve güncelleştirmek için, OneLake API'si ise ana tanım dosyasını ve diğer lib dosyalarını karşıya yüklemek için kullanılır. Ana tanım dosyası ve diğer lib dosyaları önce OneLake'e yüklenir. Ardından ana tanım dosyasının ve diğer lib dosyalarının URL özellikleri Spark İş Tanımı öğesinde ayarlanır.