Ingerir dados usando o SDK do .NET Kusto
Há duas bibliotecas de clientes para o .NET: uma biblioteca de ingestão e uma biblioteca de dados. Para obter mais informações sobre o SDK do .NET, confira Sobre o SDK do .NET. Essas bibliotecas permitem a inclusão de dados (carga) em um cluster e dados de consulta do seu código. Neste artigo, primeiro você aprenderá a criar uma tabela e um mapeamento de dados em um cluster de teste. Depois, você enfileira uma ingestão no cluster e valida os resultados.
Pré-requisitos
- Uma conta Microsoft ou uma identidade de usuário do Microsoft Entra. Uma assinatura do Azure não é necessária.
- Um cluster e um banco de dados. Criar um cluster e um banco de dados.
Instalar a biblioteca de ingestão
Install-Package Microsoft.Azure.Kusto.Ingest
Adicionar a autenticação e construir a cadeia de conexão
Autenticação
Para autenticar um aplicativo, o SDK usa sua ID de locatário do Microsoft Entra. Para encontrar seu ID de locatário, use a seguinte URL, substituindo seu domínio por YourDomain.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
Por exemplo, se o seu domínio for contoso.com, a URL será https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Clique nesta URL para ver os resultados; a primeira linha é a seguinte.
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
A ID do locatário neste caso é 6babcaad-604b-40ac-a9d7-9fd97c0b779f
.
Este exemplo usa uma autenticação de usuário interativa do Microsoft Entra para acessar o cluster. Use também a autenticação de aplicativo do Microsoft Entra com o segredo do aplicativo ou do certificado. Lembre-se de definir os valores corretos para tenantId
e clusterUri
antes de executar esse código.
O SDK fornece uma forma conveniente de configurar o método de autenticação como parte da cadeia de conexão. Para obter a documentação completa sobre as cadeias de conexão, confira Cadeias de conexão.
Observação
A versão atual do SDK não dá suporte à autenticação de usuário interativa no .NET Core. Se necessário, use o nome de usuário/a senha ou a autenticação de aplicativo do Microsoft Entra.
Construir a cadeia de conexão
Agora, você pode construir a cadeia de caracteres de conexão. Você criará a tabela de destino e o mapeamento em uma etapa posterior.
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);
Definir informações de arquivo de origem
Defina o caminho até o arquivo de origem. Este exemplo usa um arquivo de exemplo hospedado no armazenamento de BLOBs do Azure. O conjunto de dados de amostra StormEvents contém dados relacionados ao clima dos Centros Nacionais de Informações Ambientais.
var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";
Criar uma tabela em seu cluster de teste
Crie uma tabela chamada StormEvents
que corresponde ao esquema dos dados no arquivo StormEvents.csv
.
Dica
Os snippets de código a seguir criam uma instância de um cliente para quase todas as chamadas. Isso é feito para tornar cada snippet de código individualmente executável. Em produção, as instâncias de cliente são reentrantes e devem ser mantidas desde que sejam necessárias. Uma só instância de cliente por URI é suficiente, mesmo quando você trabalha com vários bancos de dados (o banco de dados pode ser especificado em um nível de comando).
var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Definir mapeamento de ingestão
Mapear os dados CSV de entrada para os nomes de colunas usados ao criar a tabela. Provisione um objeto de mapeamento de coluna CSV nessa tabela.
var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Csv,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Definir a política de envio em lote para sua tabela
O envio em lote dos dados de entrada otimiza o tamanho de fragmento dos dados, que é controlado pela política de envio em lote de ingestão. Modifique a política com o comando de gerenciamento da política de envio em lote de ingestão. Use essa política para reduzir a latência de dados com chegada lenta.
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
databaseName,
tableName,
new IngestionBatchingPolicy(
maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
maximumNumberOfItems: 100,
maximumRawDataSizeMB: 1024
)
);
kustoClient.ExecuteControlCommand(command);
}
Recomendamos definir um valor Raw Data Size
para os dados ingeridos e reduzir incrementalmente o tamanho para 250 MB, verificando se o desempenho é aprimorado.
Use a propriedade Flush Immediately
para ignorar o envio em lote, embora isso não seja recomendado para a ingestão em larga escala, pois pode causar baixo desempenho.
Enfileirar uma mensagem para ingestão
Enfileirar uma mensagem para extrair dados do armazenamento de blobs e ingerir os dados. Uma conexão é estabelecida com o cluster de ingestão e outro cliente é criado para trabalhar com esse ponto de extremidade.
Dica
Os snippets de código a seguir criam uma instância de um cliente para quase todas as chamadas. Isso é feito para tornar cada snippet de código individualmente executável. Em produção, as instâncias de cliente são reentrantes e devem ser mantidas desde que sejam necessárias. Uma só instância de cliente por URI é suficiente, mesmo quando você trabalha com vários bancos de dados (o banco de dados pode ser especificado em um nível de comando).
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.csv,
IngestionMapping = new IngestionMapping
{
IngestionMappingReference = tableMappingName,
IngestionMappingKind = IngestionMappingKind.Csv
},
IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Validar os dados ingeridos na tabela
Aguarde de cinco a dez minutos para a ingestão na fila agendar a ingestão e carregar os dados no seu cluster. Em seguida, execute o código a seguir para obter a contagem de registros na tabela StormEvents
.
using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());
Executar consultas de solução de problemas
Conectar https://dataexplorer.azure.com e conectar ao seu cluster. Execute o seguinte comando no banco de dados para ver se houve alguma falha de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes da execução.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Execute o seguinte comando para exibir o status de todas as operações de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes da execução.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Limpar os recursos
Caso pretenda seguir nossos outros artigos, mantenha os recursos criados. Caso contrário, execute o comando a seguir no seu banco de dados para limpar a tabela StormEvents
.
.drop table StormEvents