Ingerir dados usando o SDK do Go do Azure Data Explorer

O Azure Data Explorer é um serviço de exploração de dados rápido e altamente escalonável para dados de log e telemetria. Ele fornece uma biblioteca de clientes do SDK do Go para interagir com o serviço do Azure Data Explorer. Você pode usar o SDK do Go para ingerir, controlar e consultar dados em clusters do Azure Data Explorer.

Neste artigo, primeiro você aprenderá a criar uma tabela e um mapeamento de dados em um cluster de teste. Depois, você enfileira uma ingestão para o cluster usando o SDK do Go e valida os resultados.

Pré-requisitos

Instalar o SDK do Go

O SDK do Go do Azure Data Explorer será instalado automaticamente quando você executar o aplicativo de exemplo que usa os módulos do Go. Se você instalou o SDK do Go para outro aplicativo, crie um módulo do Go e busque o pacote do Azure Data Explorer (usando go get), por exemplo:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

A dependência de pacote será adicionada ao arquivo go.mod. Use isso em seu aplicativo do Go.

Examine o código

Essa seção Examinar o código é opcional. Se você quiser saber como o código funciona, poderá examinar os snippets de código a seguir. Caso contrário, você poderá passar direto para Executar o aplicativo.

Authenticate

O programa precisa ser autenticado no serviço do Azure Data Explorer antes de executar qualquer operação.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

Uma instância do kusto.Authorization é criada usando as credenciais da entidade de serviço. Em seguida, ela é usada para criar um kusto.Client com a função New que também aceita o ponto de extremidade do cluster.

Criar Tabela

O comando create table é representado por uma instrução Kusto. A função Mgmt é usada para executar comandos de gerenciamento. Ela é usada para executar o comando a fim de criar uma tabela.

func createTable(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
	if err != nil {
		log.Fatal("failed to create table", err)
	}
	log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Dica

Uma instrução Kusto é constante, por padrão, para maior segurança. NewStmt aceita constantes de cadeia de caracteres. A API UnsafeStmt permite o uso de segmentos de instrução não constantes, mas não é recomendada.

O comando create table da Kusto é o seguinte:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

Criar mapeamento

Os mapeamentos de dados são usados durante a ingestão para mapear os dados de entrada nas colunas das tabelas do Azure Data Explorer. Para obter mais informações, confira o mapeamento de dados. O mapeamento é criado da mesma forma que uma tabela, usando a função Mgmt com o nome do banco de dados e o comando apropriado. O comando completo está disponível no Repositório do GitHub para o exemplo.

func createMapping(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
	if err != nil {
		log.Fatal("failed to create mapping - ", err)
	}
	log.Printf("Mapping %s created\n", kustoMappingRefName)
}

Ingestão de dados

Uma ingestão é colocada na fila usando um arquivo de um contêiner existente do Armazenamento de Blobs do Azure.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
	kIngest, err := ingest.New(kc, kustoDB, kustoTable)
	if err != nil {
		log.Fatal("failed to create ingestion client", err)
	}
	blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
	err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

	if err != nil {
		log.Fatal("failed to ingest file", err)
	}
	log.Println("Ingested file from -", blobStorePath)
}

O cliente de Ingestão é criado usando o ingest.New. A função FromFile é usada para se referir ao URI do Armazenamento de Blobs do Azure. O nome de referência do mapeamento e o tipo de dados são passados na forma FileOption.

Executar o aplicativo

  1. Clone o código de exemplo do GitHub:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. Execute o código de exemplo, conforme visto neste snippet de main.go:

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    Dica

    Para experimentar diferentes combinações de operações, você pode remover/inserir as marcas de comentário das respectivas funções em main.go.

    Quando você executa o código de exemplo, as seguintes ações são realizadas:

    1. Remover tabela: a tabela StormEvents será removida (se existir).
    2. Criação de tabela: a tabela StormEvents é criada.
    3. Criação de mapeamento: o mapeamento StormEvents_CSV_Mapping é criado.
    4. Ingestão de arquivo: um arquivo CSV (no Armazenamento de Blobs do Azure) é colocado na fila para ingestão.
  3. Para criar uma entidade de serviço para autenticação, use a CLI do Azure com o comando az ad sp create-for-rbac. Defina as informações da entidade de serviço com o ponto de extremidade do cluster e o nome do banco de dados na forma de variáveis de ambiente que serão usadas pelo programa:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. Execute o programa:

    go run main.go
    

    Você obterá uma saída semelhante:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

Validar e solucionar problemas

Aguarde de cinco a dez minutos para que a ingestão que está na fila agende o processo de ingestão e carregue os dados no Azure Data Explorer.

  1. Conectar https://dataexplorer.azure.com e conectar ao seu cluster. Então, execute o comando a seguir para obter a contagem de registros na tabela StormEvents.

    StormEvents | count
    
  2. Execute o seguinte comando no banco de dados para ver se houve alguma falha de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes da execução.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. Execute o seguinte comando para exibir o status de todas as operações de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes da execução.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Limpar os recursos

Caso pretenda seguir nossos outros artigos, mantenha os recursos criados. Caso contrário, execute o comando a seguir no banco de dados para remover a tabela StormEvents.

.drop table StormEvents

Próxima etapa