Pozyskiwanie danych przy użyciu zestawu SDK platformy .NET usługi Kusto

Istnieją dwie biblioteki klienckie dla platformy .NET: biblioteka pozyskiwania i biblioteka danych. Aby uzyskać więcej informacji na temat zestawu .NET SDK, zobacz about .NET SDK (Zestaw .NET SDK). Te biblioteki umożliwiają pozyskiwanie (ładowanie) danych do klastra i wykonywanie zapytań o dane z kodu. W tym artykule najpierw utworzysz tabelę i mapowanie danych w klastrze testowym. Następnie umieścisz pozyskiwanie w kolejce do klastra i sprawdzisz poprawność wyników.

Wymagania wstępne

  • Konto Microsoft lub tożsamość użytkownika Microsoft Entra. Subskrypcja platformy Azure nie jest wymagana.
  • Klaster i baza danych. Utwórz klaster i bazę danych.

Instalowanie biblioteki pozyskiwania

Install-Package Microsoft.Azure.Kusto.Ingest

Dodawanie uwierzytelniania i konstruowanie parametry połączenia

Authentication

Aby uwierzytelnić aplikację, zestaw SDK używa identyfikatora dzierżawy Microsoft Entra. Aby odnaleźć identyfikator dzierżawy, użyj następującego adresu URL, zastępując ciąg YourDomain nazwą domeny.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Jeśli na przykład Twoja domena to contoso.com, adres URL to https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Kliknij ten adres URL, aby wyświetlić wyniki. Pierwszy wiersz wygląda w następujący sposób.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Identyfikator dzierżawy w tym przypadku to 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

W tym przykładzie użyto interaktywnego uwierzytelniania użytkownika Microsoft Entra w celu uzyskania dostępu do klastra. Można również użyć uwierzytelniania aplikacji Microsoft Entra z certyfikatem lub wpisem tajnym aplikacji. Przed uruchomieniem tego kodu upewnij się, że ustawiono poprawne wartości dla tenantId i clusterUri .

Zestaw SDK zapewnia wygodny sposób konfigurowania metody uwierzytelniania w ramach parametry połączenia. Aby uzyskać pełną dokumentację dotyczącą parametrów połączenia, zobacz parametry połączenia.

Uwaga

Bieżąca wersja zestawu SDK nie obsługuje uwierzytelniania interakcyjnego użytkownika na platformie .NET Core. W razie potrzeby zamiast tego użyj Microsoft Entra nazwy użytkownika/hasła lub uwierzytelniania aplikacji.

Tworzenie parametrów połączenia

Teraz możesz skonstruować parametry połączenia. W późniejszym kroku utworzysz tabelę docelową i mapowanie.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Ustawianie informacji o pliku źródłowym

Ustaw ścieżkę dla pliku źródłowego. W tym przykładzie używany jest przykładowy plik hostowany w usłudze Azure Blob Storage. Przykładowy zestaw danych StormEvents zawiera dane związane z pogodą z National Centers for Environmental Information.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Tworzenie tabeli w klastrze testowym

Utwórz tabelę o nazwie StormEvents, która będzie zgodna ze schematem danych w pliku StormEvents.csv.

Porada

Poniższe fragmenty kodu tworzą wystąpienie klienta dla niemal każdego wywołania. Jest to wykonywane, aby każdy fragment kodu był uruchamiany indywidualnie. W środowisku produkcyjnym wystąpienia klienta są ponowne i powinny być przechowywane tak długo, jak to konieczne. Pojedyncze wystąpienie klienta na identyfikator URI jest wystarczające, nawet podczas pracy z wieloma bazami danych (bazę danych można określić na poziomie polecenia).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definiowanie mapowania pozyskiwania

Zamapuj przychodzące dane CSV na nazwy kolumn używane podczas tworzenia tabeli. Aprowizuj obiekt mapowania kolumn CSV w tej tabeli.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definiowanie zasad dzielenia na partie dla tabeli

Przetwarzanie wsadowe danych przychodzących optymalizuje rozmiar fragmentu danych, który jest kontrolowany przez zasady dzielenia na partie pozyskiwania. Zmodyfikuj zasady za pomocą polecenia zarządzania zasadami dzielenia na partie pozyskiwania. Użyj tych zasad, aby zmniejszyć opóźnienie powoli przychodzących danych.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Zalecamy zdefiniowanie Raw Data Size wartości dla pozyskanych danych i przyrostowe zmniejszenie rozmiaru w kierunku 250 MB, a jednocześnie sprawdzenie, czy wydajność się poprawia.

Możesz użyć Flush Immediately właściwości , aby pominąć przetwarzanie wsadowe, chociaż nie jest to zalecane w przypadku pozyskiwania na dużą skalę, ponieważ może to spowodować niską wydajność.

Wysyłanie komunikatu do kolejki w celu pozyskiwania

Kolejkowanie komunikatu w celu ściągnięcia danych z magazynu obiektów blob i pozyskiwania danych. Nawiązane jest połączenie z klastrem pozyskiwania, a inny klient jest tworzony do pracy z tym punktem końcowym.

Porada

Poniższe fragmenty kodu tworzą wystąpienie klienta dla niemal każdego wywołania. Jest to wykonywane, aby każdy fragment kodu był uruchamiany indywidualnie. W środowisku produkcyjnym wystąpienia klienta są ponowne i powinny być przechowywane tak długo, jak to konieczne. Pojedyncze wystąpienie klienta na identyfikator URI jest wystarczające, nawet podczas pracy z wieloma bazami danych (bazę danych można określić na poziomie polecenia).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Weryfikowanie, czy dane zostały pozyskane do tabeli

Poczekaj od pięciu do dziesięciu minut na zaplanowanie pozyskiwania w kolejce i załadowanie danych do klastra. Następnie uruchom następujący kod, aby uzyskać liczbę rekordów w tabeli StormEvents.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Uruchamianie zapytań dotyczących rozwiązywania problemów

Zaloguj się do portalu https://dataexplorer.azure.com i nawiąż połączenie z klastrem. Uruchom następujące polecenie w bazie danych, aby sprawdzić, czy wystąpiły jakieś niepowodzenia pozyskiwania w ciągu ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Uruchom następujące polecenie, aby wyświetlić stan wszystkich operacji pozyskiwania z ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Czyszczenie zasobów

Jeśli planujesz postępować zgodnie z innymi artykułami, zachowaj utworzone zasoby. W przeciwnym razie uruchom następujące polecenie w bazie danych, aby wyczyścić tabelę StormEvents.

.drop table StormEvents