Ingerir dados usando o SDK do .NET Kusto

Há duas bibliotecas de clientes para o .NET: uma biblioteca de ingestão e uma biblioteca de dados. Para obter mais informações sobre o SDK do .NET, confira Sobre o SDK do .NET. Essas bibliotecas permitem a inclusão de dados (carga) em um cluster e dados de consulta do seu código. Neste artigo, primeiro você aprenderá a criar uma tabela e um mapeamento de dados em um cluster de teste. Depois, você enfileira uma ingestão no cluster e valida os resultados.

Pré-requisitos

  • Uma conta Microsoft ou uma identidade de usuário do Microsoft Entra. Uma assinatura do Azure não é necessária.
  • Um cluster e um banco de dados. Criar um cluster e um banco de dados.

Instalar a biblioteca de ingestão

Install-Package Microsoft.Azure.Kusto.Ingest

Adicionar a autenticação e construir a cadeia de conexão

Autenticação

Para autenticar um aplicativo, o SDK usa sua ID de locatário do Microsoft Entra. Para encontrar seu ID de locatário, use a seguinte URL, substituindo seu domínio por YourDomain.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Por exemplo, se o seu domínio for contoso.com, a URL será https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Clique nesta URL para ver os resultados; a primeira linha é a seguinte.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

A ID do locatário neste caso é 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

Este exemplo usa uma autenticação de usuário interativa do Microsoft Entra para acessar o cluster. Use também a autenticação de aplicativo do Microsoft Entra com o segredo do aplicativo ou do certificado. Lembre-se de definir os valores corretos para tenantId e clusterUri antes de executar esse código.

O SDK fornece uma forma conveniente de configurar o método de autenticação como parte da cadeia de conexão. Para obter a documentação completa sobre as cadeias de conexão, confira Cadeias de conexão.

Observação

A versão atual do SDK não dá suporte à autenticação de usuário interativa no .NET Core. Se necessário, use o nome de usuário/a senha ou a autenticação de aplicativo do Microsoft Entra.

Construir a cadeia de conexão

Agora, você pode construir a cadeia de caracteres de conexão. Você criará a tabela de destino e o mapeamento em uma etapa posterior.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Definir informações de arquivo de origem

Defina o caminho até o arquivo de origem. Este exemplo usa um arquivo de exemplo hospedado no armazenamento de BLOBs do Azure. O conjunto de dados de amostra StormEvents contém dados relacionados ao clima dos Centros Nacionais de Informações Ambientais.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Criar uma tabela em seu cluster de teste

Crie uma tabela chamada StormEvents que corresponde ao esquema dos dados no arquivo StormEvents.csv.

Dica

Os snippets de código a seguir criam uma instância de um cliente para quase todas as chamadas. Isso é feito para tornar cada snippet de código individualmente executável. Em produção, as instâncias de cliente são reentrantes e devem ser mantidas desde que sejam necessárias. Uma só instância de cliente por URI é suficiente, mesmo quando você trabalha com vários bancos de dados (o banco de dados pode ser especificado em um nível de comando).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definir mapeamento de ingestão

Mapear os dados CSV de entrada para os nomes de colunas usados ao criar a tabela. Provisione um objeto de mapeamento de coluna CSV nessa tabela.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definir a política de envio em lote para sua tabela

O envio em lote dos dados de entrada otimiza o tamanho de fragmento dos dados, que é controlado pela política de envio em lote de ingestão. Modifique a política com o comando de gerenciamento da política de envio em lote de ingestão. Use essa política para reduzir a latência de dados com chegada lenta.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Recomendamos definir um valor Raw Data Size para os dados ingeridos e reduzir incrementalmente o tamanho para 250 MB, verificando se o desempenho é aprimorado.

Use a propriedade Flush Immediately para ignorar o envio em lote, embora isso não seja recomendado para a ingestão em larga escala, pois pode causar baixo desempenho.

Enfileirar uma mensagem para ingestão

Enfileirar uma mensagem para extrair dados do armazenamento de blobs e ingerir os dados. Uma conexão é estabelecida com o cluster de ingestão e outro cliente é criado para trabalhar com esse ponto de extremidade.

Dica

Os snippets de código a seguir criam uma instância de um cliente para quase todas as chamadas. Isso é feito para tornar cada snippet de código individualmente executável. Em produção, as instâncias de cliente são reentrantes e devem ser mantidas desde que sejam necessárias. Uma só instância de cliente por URI é suficiente, mesmo quando você trabalha com vários bancos de dados (o banco de dados pode ser especificado em um nível de comando).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Validar os dados ingeridos na tabela

Aguarde de cinco a dez minutos para a ingestão na fila agendar a ingestão e carregar os dados no seu cluster. Em seguida, execute o código a seguir para obter a contagem de registros na tabela StormEvents.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Executar consultas de solução de problemas

Conectar https://dataexplorer.azure.com e conectar ao seu cluster. Execute o seguinte comando no banco de dados para ver se houve alguma falha de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes da execução.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Execute o seguinte comando para exibir o status de todas as operações de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes da execução.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Limpar os recursos

Caso pretenda seguir nossos outros artigos, mantenha os recursos criados. Caso contrário, execute o comando a seguir no seu banco de dados para limpar a tabela StormEvents.

.drop table StormEvents