Azure Data Factory'de Spark etkinliğini kullanarak verileri bulutta dönüştürme
UYGULANANLAR: Azure Data Factory Azure Synapse Analytics
İpucu
Kuruluşlar için hepsi bir arada analiz çözümü olan Microsoft Fabric'te Data Factory'yi deneyin. Microsoft Fabric , veri taşımadan veri bilimine, gerçek zamanlı analize, iş zekasına ve raporlamaya kadar her şeyi kapsar. Yeni bir deneme sürümünü ücretsiz olarak başlatmayı öğrenin!
Bu öğreticide, Azure PowerShell kullanarak verileri Spark Etkinliği ve talep üzerine HDInsight bağlı hizmeti ile dönüştüren bir Data Factory işlem hattı oluşturacaksınız. Bu öğreticide aşağıdaki adımları gerçekleştireceksiniz:
- Veri fabrikası oluşturma.
- Bağlantılı hizmetler yazma ve dağıtma.
- İşlem hattı oluşturma ve dağıtma.
- Bir işlem hattı çalıştırması başlatma.
- İşlem hattı çalıştırmasını izleme.
Azure aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.
Önkoşullar
Not
Azure ile etkileşim kurmak için Azure Az PowerShell modülünü kullanmanızı öneririz. Başlamak için bkz . Azure PowerShell'i yükleme. Az PowerShell modülüne nasıl geçeceğinizi öğrenmek için bkz. Azure PowerShell’i AzureRM’den Az’ye geçirme.
- Azure Depolama hesabı. Python betiği ve giriş dosyası oluşturup bunları Azure depolama alanına yüklersiniz. Spark programının çıktısı bu depolama hesabında depolanır. İsteğe bağlı Spark kümesi, birincil depolama alanıyla aynı depolama hesabını kullanır.
- Azure PowerShell. Azure PowerShell’i yükleme ve yapılandırma bölümündeki yönergeleri izleyin.
Python betiğini Blob Depolama hesabınıza yükleme
Aşağıdaki içeriğe sahip WordCount_Spark.py adlı bir Python dosyası oluşturun:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
<storageAccountName>’i Azure Depolama hesabınızın adıyla değiştirin. Ardından dosyayı kaydedin.
Azure Blob depolama alanınızda henüz yoksa adftutorial adlı bir kapsayıcı oluşturun.
Spark adlı bir klasör oluşturun.
Spark klasörünün altında script adlı bir alt klasör oluşturun.
WordCount_Spark.py dosyasını script alt klasörüne yükleyin.
Girdi dosyasını yükleme
- Bazı metinlerle minecraftstory.txt adlı bir dosya oluşturun. Spark programı bu metindeki sözcükleri sayar.
spark
klasöründeinputfiles
adlı bir alt klasör oluşturun.minecraftstory.txt
dosyasınıinputfiles
alt klasörüne yükleyin.
Bağlı hizmetler oluşturma
Bu bölümde iki Bağlı Hizmet oluşturacaksınız:
- Bir Azure Depolama hesabını veri fabrikasına bağlayan Azure Depolama Bağlı Hizmeti. Bu depolama alanı, isteğe bağlı HDInsight kümesi tarafından kullanılır. Ayrıca, yürütülecek Spark betiğini içerir.
- İsteğe bağlı HDInsight Bağlı Hizmeti. Azure Data Factory otomatik olarak bir HDInsight kümesi oluşturur, Spark programını çalıştırır ve önceden yapılandırılmış bir süre boyunca boşta kaldıktan sonra HDInsight kümesini siler.
Azure Storage bağlı hizmeti
Tercih ettiğiniz düzenleyiciyi kullanarak bir JSON dosyası oluşturun, Azure Depolama bağlı hizmetinin aşağıdaki JSON tanımını kopyalayın ve ardından dosyayı MyStorageLinkedService.json olarak kaydedin.
{
"name": "MyStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
}
}
}
<storageAccountName> ve <storageAccountKey> değerlerini Azure Depolama hesabınızın adı ve anahtarıyla güncelleştirin.
İsteğe bağlı HDInsight bağlı hizmeti
Tercih ettiğiniz düzenleyiciyi kullanarak bir JSON dosyası oluşturun, Azure HDInsight bağlı hizmetinin aşağıdaki JSON tanımını kopyalayın ve dosyayı MyOnDemandSparkLinkedService.json olarak kaydedin.
{
"name": "MyOnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "<subscriptionID> ",
"servicePrincipalId": "<servicePrincipalID>",
"servicePrincipalKey": {
"value": "<servicePrincipalKey>",
"type": "SecureString"
},
"tenant": "<tenant ID>",
"clusterResourceGroup": "<resourceGroupofHDICluster>",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
Bağlı hizmet tanımında aşağıdaki özelliklerin değerlerini güncelleştirin:
- hostSubscriptionId. <SubscriptionID>’yi Azure aboneliğinizin kimliği ile değiştirin. İsteğe bağlı HDInsight kümesi bu abonelikte oluşturulur.
- tenant. <tenantID> değerini Azure kiracınızın kimliği ile değiştirin.
- servicePrincipalId, servicePrincipalKey. servicePrincipalID> ve servicePrincipalKey> değerlerini Microsoft Entra Id içindeki hizmet sorumlunuzun kimliği ve anahtarıyla değiştirin<<. Bu hizmet sorumlusu, abonelikte ya da kümenin oluşturulduğu kaynak grubunda Katkıda Bulunan rolünün bir üyesi olmalıdır. Ayrıntılar için bkz . Microsoft Entra uygulaması ve hizmet sorumlusu oluşturma. Hizmet sorumlusu kimliği Uygulama Kimliği ile eşdeğerdir ve Hizmet sorumlusu anahtarı bir İstemci gizli dizisinin değerine eşdeğerdir.
- clusterResourceGroup. <resourceGroupOfHDICluster> değerini HDInsight kümesinin oluşturulması gereken kaynak grubunun adıyla değiştirin.
Not
Azure HDInsight, desteklediği her bir Azure bölgesinde kullanabileceğiniz toplam çekirdek sayısıyla ilgili sınırlamaya sahiptir. İsteğe Bağlı HDInsight Bağlı Hizmeti için HDInsight kümesi, birincil depolama olarak kullanılan Azure Depolama konumunda oluşturulur. Kümenin başarıyla oluşturulabilmesi için yeterince çekirdek kotanızın olduğundan emin olun. Daha fazla bilgi için bkz. HDInsight’ta Hadoop, Spark, Kafka ve daha fazlası ile küme ayarlama.
İşlem hattı oluşturma
Bu adımda, Spark etkinliği ile bir işlem hattı oluşturacaksınız. Etkinlik, sözcük sayısı örneğini kullanır. Henüz yapmadıysanız bu konumdan içeriği indirin.
Tercih ettiğiniz düzenleyicide bir JSON dosyası oluşturun, aşağıdaki işlem hattı JSON tanımını kopyalayın ve MySparkOnDemandPipeline.json olarak kaydedin.
{
"name": "MySparkOnDemandPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyOnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "MyStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
Aaşağıdaki noktaları unutmayın:
- rootPath, adftutorial kapsayıcısının spark klasörünü işaret eder.
- entryFilePath, spark klasörünün script alt klasöründeki WordCount_Spark.py dosyasını işaret eder.
Veri fabrikası oluşturma
JSON dosyalarında bağlı hizmet ve işlem hattı tanımları oluşturdunuz. Şimdi bir veri fabrikası oluşturalım ve PowerShell cmdlet'lerini kullanarak bağlı Hizmet ve işlem hattı JSON dosyalarını dağıtalım. Aşağıdaki PowerShell komutlarını tek tek çalıştırın:
Değişkenleri tek tek ayarlayın.
Kaynak Grup Adı
$resourceGroupName = "ADFTutorialResourceGroup"
Data Factory Adı. Genel olarak benzersiz olması gerekir
$dataFactoryName = "MyDataFactory09102017"
İşlem hattı adı
$pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
PowerShell’i başlatın. Bu hızlı başlangıcın sonuna kadar Azure PowerShell’i açık tutun. Kapatıp yeniden açarsanız komutları yeniden çalıştırmanız gerekir. Data Factory'nin kullanılabileceği Azure bölgelerinin bir listesi için bir sonraki sayfada ilgilendiğiniz bölgeleri seçin ve Analytics'i genişleterek Data Factory: Products available by region (Bölgeye göre kullanılabilir durumdaki ürünler) bölümünü bulun. Veri fabrikası tarafından kullanılan verileri depoları (Azure Depolama, Azure SQL Veritabanı vb.) ve işlemler (HDInsight vb.) başka bölgelerde olabilir.
Aşağıdaki komutu çalıştırın ve Azure portalda oturum açmak için kullandığınız kullanıcı adı ve parolayı girin:
Connect-AzAccount
Bu hesapla ilgili tüm abonelikleri görmek için aşağıdaki komutu çalıştırın:
Get-AzSubscription
Çalışmak isteğiniz aboneliği seçmek için aşağıdaki komutu çalıştırın. SubscriptionId’yi Azure aboneliğinizin kimliği ile değiştirin:
Select-AzSubscription -SubscriptionId "<SubscriptionId>"
ADFTutorialResourceGroup kaynak grubunu oluşturun.
New-AzResourceGroup -Name $resourceGroupName -Location "East Us"
Veri fabrikasını oluşturun.
$df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
Çıktıyı görmek için aşağıdaki komutu yürütün:
$df
JSON dosyalarını oluşturduğunuz klasöre geçin ve Azure Depolama bağlı hizmetlerini dağıtmak için aşağıdaki komutu çalıştırın:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
İsteğe bağlı Spark bağlı hizmetinde aşağıdaki komutu çalıştırın:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
Bir işlem hattını dağıtmak için aşağıdaki komutu çalıştırın:
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
İşlem hattı çalıştırmasını başlatma ve izleme
Bir işlem hattı çalıştırması başlatma. Ayrıca, gelecekte izlemek üzere işlem hattı çalıştırma kimliğini yakalar.
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName
İşlem hattı çalıştırma durumunu tamamlanıncaya kadar sürekli olarak denetlemek için aşağıdaki betiği çalıştırın.
while ($True) { $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30) if(!$result) { Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow" } elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) { Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow" } else { Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow" $result break } ($result | Format-List | Out-String) Start-Sleep -Seconds 15 } Write-Host "Activity `Output` section:" -foregroundcolor "Yellow" $result.Output -join "`r`n" Write-Host "Activity `Error` section:" -foregroundcolor "Yellow" $result.Error -join "`r`n"
Örnek çalıştırmanın çıktısı aşağıdaki gibidir:
Pipeline run status: In Progress ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : DurationInMs : Status : InProgress Error : … Pipeline ' MySparkOnDemandPipeline' run finished. Result: ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : MyDataFactory09102017 ActivityName : MySparkActivity PipelineRunId : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794 PipelineName : MySparkOnDemandPipeline Input : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService} Output : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime} LinkedServiceName : ActivityRunStart : 9/20/2017 6:33:47 AM ActivityRunEnd : 9/20/2017 6:46:30 AM DurationInMs : 763466 Status : Succeeded Error : {errorCode, message, failureType, target} Activity Output section: "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/" "jobId": "0" "ExecutionProgress": "Succeeded" "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)" Activity Error section: "errorCode": "" "message": "" "failureType": "" "target": "MySparkActivity"
Spark programının çıktısı ile adftutorial kapsayıcısının
spark
klasöründeoutputfiles
adlı bir klasörün oluşturulduğunu onaylayın.
İlgili içerik
Bu örnekteki işlem hattı, verileri bir konumdan Azure blob depolama alanındaki başka bir konuma kopyalar. Şunları öğrendiniz:
- Veri fabrikası oluşturma.
- Bağlantılı hizmetler yazma ve dağıtma.
- İşlem hattı oluşturma ve dağıtma.
- Bir işlem hattı çalıştırması başlatma.
- İşlem hattı çalıştırmasını izleme.
Sanal ağdaki bir Azure HDInsight kümesinde Hive betiği çalıştırarak verileri dönüştürme hakkında bilgi almak için sonraki öğreticiye ilerleyin.