Spouštění úloh MapReduce pomocí sady HDInsight .NET SDK
Zjistěte, jak odesílat úlohy MapReduce pomocí sady HDInsight .NET SDK. Clustery HDInsight mají soubor JAR s některými ukázkami MapReduce. Soubor jar je /example/jars/hadoop-mapreduce-examples.jar
. Jednou z ukázek je počet slov. Vyvíjíte konzolovou aplikaci jazyka C#, která odešle úlohu wordcount. Úloha přečte /example/data/gutenberg/davinci.txt
soubor a vypíše výsledky do /example/data/davinciwordcount
. Pokud chcete aplikaci znovu spustit, musíte výstupní složku vyčistit.
Poznámka:
Kroky v tomto článku je nutné provést z klienta systému Windows. Informace o použití klienta Systému Linux, OS X nebo Unix pro práci s Hivem použijte selektor karet zobrazený v horní části článku.
Požadavky
Cluster Apache Hadoop ve službě HDInsight. Viz Vytváření clusterů Apache Hadoop pomocí webu Azure Portal.
Odesílání úloh MapReduce pomocí sady HDInsight .NET SDK
Sada HDInsight .NET SDK poskytuje klientské knihovny .NET, které usnadňují práci s clustery HDInsight z .NET.
Spusťte Visual Studio a vytvořte konzolovou aplikaci jazyka C#.
Přejděte do konzoly NuGet> Tools>Správce balíčků Správce balíčků a zadejte následující příkaz:
Install-Package Microsoft.Azure.Management.HDInsight.Job
Zkopírujte následující kód do Program.cs. Pak upravte kód nastavením hodnot pro:
existingClusterName
,defaultStorageAccountName
existingClusterPassword
, ,defaultStorageAccountKey
adefaultStorageContainerName
.using System.Collections.Generic; using System.IO; using System.Text; using System.Threading; using Microsoft.Azure.Management.HDInsight.Job; using Microsoft.Azure.Management.HDInsight.Job.Models; using Hyak.Common; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Blob; namespace SubmitHDInsightJobDotNet { class Program { private static HDInsightJobManagementClient _hdiJobManagementClient; private const string existingClusterName = "<Your HDInsight Cluster Name>"; private const string existingClusterPassword = "<Cluster User Password>"; private const string defaultStorageAccountName = "<Default Storage Account Name>"; private const string defaultStorageAccountKey = "<Default Storage Account Key>"; private const string defaultStorageContainerName = "<Default Blob Container Name>"; private const string existingClusterUsername = "admin"; private const string existingClusterUri = existingClusterName + ".azurehdinsight.net"; private const string sourceFile = "/example/data/gutenberg/davinci.txt"; private const string outputFolder = "/example/data/davinciwordcount"; static void Main(string[] args) { System.Console.WriteLine("The application is running ..."); var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = existingClusterUsername, Password = existingClusterPassword }; _hdiJobManagementClient = new HDInsightJobManagementClient(existingClusterUri, clusterCredentials); SubmitMRJob(); System.Console.WriteLine("Press ENTER to continue ..."); System.Console.ReadLine(); } private static void SubmitMRJob() { List<string> args = new List<string> { { "/example/data/gutenberg/davinci.txt" }, { "/example/data/davinciwordcount" } }; var paras = new MapReduceJobSubmissionParameters { JarFile = @"/example/jars/hadoop-mapreduce-examples.jar", JarClass = "wordcount", Arguments = args }; System.Console.WriteLine("Submitting the MR job to the cluster..."); var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceJob(paras); var jobId = jobResponse.JobSubmissionJsonResponse.Id; System.Console.WriteLine("Response status code is " + jobResponse.StatusCode); System.Console.WriteLine("JobId is " + jobId); System.Console.WriteLine("Waiting for the job completion ..."); // Wait for job completion var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail; while (!jobDetail.Status.JobComplete) { Thread.Sleep(1000); jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail; } // Get job output System.Console.WriteLine("Job output is: "); var storageAccess = new AzureStorageAccess(defaultStorageAccountName, defaultStorageAccountKey, defaultStorageContainerName); if (jobDetail.ExitValue == 0) { // Create the storage account object CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=" + defaultStorageAccountName + ";AccountKey=" + defaultStorageAccountKey); // Create the blob client. CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient(); // Retrieve reference to a previously created container. CloudBlobContainer container = blobClient.GetContainerReference(defaultStorageContainerName); CloudBlockBlob blockBlob = container.GetBlockBlobReference(outputFolder.Substring(1) + "/part-r-00000"); using (var stream = blockBlob.OpenRead()) { using (StreamReader reader = new StreamReader(stream)) { while (!reader.EndOfStream) { System.Console.WriteLine(reader.ReadLine()); } } } } else { // fetch stderr output in case of failure var output = _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); using (var reader = new StreamReader(output, Encoding.UTF8)) { string value = reader.ReadToEnd(); System.Console.WriteLine(value); } } } } }
Stisknutím klávesy F5 spusťte aplikaci.
Pokud chcete úlohu spustit znovu, musíte změnit název výstupní složky úlohy v ukázce /example/data/davinciwordcount
.
Po úspěšném dokončení úlohy aplikace vytiskne obsah výstupního souboru part-r-00000
.
Další kroky
V tomto článku jste se naučili několik způsobů vytvoření clusteru HDInsight. Další informace najdete v těchto článcích:
- Pokud chcete odeslat úlohu Hive, přečtěte si téma Spouštění dotazů Apache Hive pomocí sady HDInsight .NET SDK.
- Informace o vytváření clusterů HDInsight najdete v tématu Vytváření clusterů Apache Hadoop založených na Linuxu ve službě HDInsight.
- Informace o správě clusterů HDInsight najdete v tématu Správa clusterů Apache Hadoop ve službě HDInsight.
- Informace o sadě HDInsight .NET SDK najdete v referenčních informacích k sadě HDInsight .NET SDK.
- Neinteraktivní ověřování v Azure najdete v tématu Vytváření neinteraktivních aplikací .NET HDInsight.