Ingerir dados usando o SDK do Java do Kusto
O Azure Data Explorer é um serviço de exploração de dados rápido e altamente escalonável para dados de log e telemetria. A biblioteca de clientes Java pode ser usada para ingerir dados, emitir comandos de gerenciamento e consultar dados em clusters do Azure Data Explorer.
Neste artigo, saiba como ingerir dados usando a biblioteca Java do Azure Data Explorer. Primeiro, você criará uma tabela e um mapeamento de dados em um cluster de teste. Em seguida, você colocará na fila uma ingestão do armazenamento de blobs para o cluster usando o SDK do Java e validará os resultados.
Pré-requisitos
- Uma conta da Microsoft ou uma identidade de usuário Microsoft Entra. Uma assinatura do Azure não é necessária.
- Um cluster e um banco de dados do Azure Data Explorer. Crie um cluster e um banco de dados.
- Git.
- JDK versão 1.8 ou posterior.
- Maven.
- Criar um registro de aplicativo e conceder a ele permissões para o banco de dados. Salve a ID do cliente e o segredo do cliente para uso posterior.
Examine o código
Esta seção é opcional. Examine os snippets de código a seguir para saber como o código funciona. Para ignorar esta seção, vá para Executar o aplicativo.
Autenticação
O programa usa Microsoft Entra credenciais de autenticação com ConnectionStringBuilder'.
Crie um
com.microsoft.azure.kusto.data.Client
para consulta e gerenciamento.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Crie e use um
com.microsoft.azure.kusto.ingest.IngestClient
para colocar na fila a ingestão de dados no Azure Data Explorer:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Comandos de gerenciamento
Os comandos de gerenciamento, como .drop
e .create
, são executados chamando execute
em um com.microsoft.azure.kusto.data.Client
objeto .
Por exemplo, a tabela StormEvents
é criada da seguinte maneira:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Ingestão de dados
Coloque a ingestão na fila usando um arquivo de um contêiner existente do Armazenamento de Blobs do Azure.
- Use
BlobSourceInfo
para especificar o caminho do Armazenamento de Blobs. - Use
IngestionProperties
para definir a tabela, o banco de dados, o nome de mapeamento e o tipo de dados. No exemplo a seguir, o tipo de dados éCSV
.
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
O processo de ingestão é iniciado em um thread separado, e o thread main
aguarda a conclusão do thread de ingestão. Esse processo usa CountdownLatch. A API de ingestão (IngestClient#ingestFromBlob
) não é assíncrona. Um loop while
é usado para sondar o status atual a cada 5 segundos e aguarda o status de ingestão ser alterado de Pending
para outro status. O status final pode ser Succeeded
, Failed
ou PartiallySucceeded
.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Dica
Há outros métodos para lidar com a ingestão de maneira assíncrona para diferentes aplicativos. Por exemplo, você pode usar CompletableFuture
para criar um pipeline que define a ação após a ingestão, como consultar a tabela ou tratar exceções que foram relatadas para o IngestionStatus
.
Executar o aplicativo
Geral
Quando você executa o código de exemplo, as seguintes ações são realizadas:
-
Remover tabela: a tabela
StormEvents
será removida (se existir). -
Criação de tabela: a tabela
StormEvents
é criada. -
Criação de mapeamento: o mapeamento
StormEvents_CSV_Mapping
é criado. - Ingestão de arquivo: um arquivo CSV (no Armazenamento de Blobs do Azure) é colocado na fila para ingestão.
O seguinte código de exemplo foi obtido de App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Dica
Para experimentar diferentes combinações de operações, você pode remover/inserir as marcas de comentário dos respectivos métodos no App.java
.
Executar o aplicativo
Clone o código de exemplo do GitHub:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Defina as informações da entidade de serviço com as seguintes informações como variáveis de ambiente usadas pelo programa:
- Ponto de extremidade do cluster
- Nome do banco de dados
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Compile-o e execute-o:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
A saída será semelhante a:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Aguarde alguns minutos para a conclusão do processo de ingestão. Após a conclusão bem-sucedida, você verá a seguinte mensagem de log: Ingestion completed successfully
. Neste ponto, você pode sair do programa e passar para a próxima etapa sem afetar o processo de ingestão, que já foi colocado na fila.
Validar
Aguarde de cinco a dez minutos para a ingestão na fila agendar o processo de ingestão e carregar os dados no Azure Data Explorer.
Conectar https://dataexplorer.azure.com e conectar ao seu cluster.
Execute o seguinte comando para obter a contagem de registros na tabela
StormEvents
:StormEvents | count
Solucionar problemas
Para ver as falhas de ingestão das últimas quatro horas, execute o seguinte comando no banco de dados:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Para ver o status de todas as operações de ingestão das últimas quatro horas, execute o seguinte comando:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Limpar os recursos
Caso não pretenda usar os recursos criados, execute o comando a seguir no banco de dados para remover a tabela StormEvents
.
.drop table StormEvents