Ingerir dados usando o SDK do Java do Kusto

O Azure Data Explorer é um serviço de exploração de dados rápido e altamente escalonável para dados de log e telemetria. A biblioteca de clientes Java pode ser usada para ingerir dados, emitir comandos de gerenciamento e consultar dados em clusters do Azure Data Explorer.

Neste artigo, saiba como ingerir dados usando a biblioteca Java do Azure Data Explorer. Primeiro, você criará uma tabela e um mapeamento de dados em um cluster de teste. Em seguida, você colocará na fila uma ingestão do armazenamento de blobs para o cluster usando o SDK do Java e validará os resultados.

Pré-requisitos

Examine o código

Esta seção é opcional. Examine os snippets de código a seguir para saber como o código funciona. Para ignorar esta seção, vá para Executar o aplicativo.

Autenticação

O programa usa Microsoft Entra credenciais de autenticação com ConnectionStringBuilder'.

  1. Crie um com.microsoft.azure.kusto.data.Client para consulta e gerenciamento.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Crie e use um com.microsoft.azure.kusto.ingest.IngestClient para colocar na fila a ingestão de dados no Azure Data Explorer:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Comandos de gerenciamento

Os comandos de gerenciamento, como .drop e .create, são executados chamando execute em um com.microsoft.azure.kusto.data.Client objeto .

Por exemplo, a tabela StormEvents é criada da seguinte maneira:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Ingestão de dados

Coloque a ingestão na fila usando um arquivo de um contêiner existente do Armazenamento de Blobs do Azure.

  • Use BlobSourceInfo para especificar o caminho do Armazenamento de Blobs.
  • Use IngestionProperties para definir a tabela, o banco de dados, o nome de mapeamento e o tipo de dados. No exemplo a seguir, o tipo de dados é CSV.
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

O processo de ingestão é iniciado em um thread separado, e o thread main aguarda a conclusão do thread de ingestão. Esse processo usa CountdownLatch. A API de ingestão (IngestClient#ingestFromBlob) não é assíncrona. Um loop while é usado para sondar o status atual a cada 5 segundos e aguarda o status de ingestão ser alterado de Pending para outro status. O status final pode ser Succeeded, Failed ou PartiallySucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Dica

Há outros métodos para lidar com a ingestão de maneira assíncrona para diferentes aplicativos. Por exemplo, você pode usar CompletableFuture para criar um pipeline que define a ação após a ingestão, como consultar a tabela ou tratar exceções que foram relatadas para o IngestionStatus.

Executar o aplicativo

Geral

Quando você executa o código de exemplo, as seguintes ações são realizadas:

  1. Remover tabela: a tabela StormEvents será removida (se existir).
  2. Criação de tabela: a tabela StormEvents é criada.
  3. Criação de mapeamento: o mapeamento StormEvents_CSV_Mapping é criado.
  4. Ingestão de arquivo: um arquivo CSV (no Armazenamento de Blobs do Azure) é colocado na fila para ingestão.

O seguinte código de exemplo foi obtido de App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Dica

Para experimentar diferentes combinações de operações, você pode remover/inserir as marcas de comentário dos respectivos métodos no App.java.

Executar o aplicativo

  1. Clone o código de exemplo do GitHub:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Defina as informações da entidade de serviço com as seguintes informações como variáveis de ambiente usadas pelo programa:

    • Ponto de extremidade do cluster
    • Nome do banco de dados
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Compile-o e execute-o:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    A saída será semelhante a:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Aguarde alguns minutos para a conclusão do processo de ingestão. Após a conclusão bem-sucedida, você verá a seguinte mensagem de log: Ingestion completed successfully. Neste ponto, você pode sair do programa e passar para a próxima etapa sem afetar o processo de ingestão, que já foi colocado na fila.

Validar

Aguarde de cinco a dez minutos para a ingestão na fila agendar o processo de ingestão e carregar os dados no Azure Data Explorer.

  1. Conectar https://dataexplorer.azure.com e conectar ao seu cluster.

  2. Execute o seguinte comando para obter a contagem de registros na tabela StormEvents:

    StormEvents | count
    

Solucionar problemas

  1. Para ver as falhas de ingestão das últimas quatro horas, execute o seguinte comando no banco de dados:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Para ver o status de todas as operações de ingestão das últimas quatro horas, execute o seguinte comando:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Limpar os recursos

Caso não pretenda usar os recursos criados, execute o comando a seguir no banco de dados para remover a tabela StormEvents.

.drop table StormEvents