Pozyskiwanie danych przy użyciu zestawu Azure Data Explorer Go SDK

Azure Data Explorer to szybka i wysoce skalowalna usługa eksploracji danych na potrzeby danych dziennika i telemetrycznych. Udostępnia bibliotekę klienta zestawu GO SDK do interakcji z usługą Azure Data Explorer. Zestaw SDK języka Go umożliwia pozyskiwanie, kontrolowanie i wykonywanie zapytań dotyczących danych w klastrach usługi Azure Data Explorer.

W tym artykule najpierw utworzysz tabelę i mapowanie danych w klastrze testowym. Następnie należy w kolejce pozyskiwanie do klastra przy użyciu zestawu SDK języka Go i zweryfikować wyniki.

Wymagania wstępne

Instalowanie zestawu SDK języka Go

Zestaw AZURE Data Explorer Go SDK zostanie automatycznie zainstalowany po uruchomieniu [przykładowej aplikacji korzystającej z modułów Języka Go. Jeśli zestaw SDK języka Go został zainstalowany dla innej aplikacji, utwórz moduł Go i pobierz pakiet usługi Azure Data Explorer (przy użyciu narzędzia go get), na przykład:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

Zależność pakietu zostanie dodana do go.mod pliku. Użyj go w aplikacji Języka Go.

Przeglądanie kodu

Ta sekcja Przejrzyj kod jest opcjonalna . Jeśli chcesz dowiedzieć się, jak działa kod, możesz przejrzeć następujące fragmenty kodu. W przeciwnym razie możesz przejść do sekcji Uruchamianie aplikacji.

Uwierzytelnianie

Program musi uwierzytelniać się w usłudze Azure Data Explorer przed wykonaniem jakichkolwiek operacji.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

Wystąpienie usługi kusto. Autoryzacja jest tworzona przy użyciu poświadczeń jednostki usługi. Następnie jest on używany do tworzenia usługi Kusto. Klient z funkcją New , która akceptuje również punkt końcowy klastra.

Tworzenie tabeli

Polecenie create table jest reprezentowane przez instrukcję Kusto. Funkcja Mgmt służy do wykonywania poleceń zarządzania. Służy do wykonywania polecenia w celu utworzenia tabeli.

func createTable(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
	if err != nil {
		log.Fatal("failed to create table", err)
	}
	log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Porada

Instrukcja Kusto jest domyślnie stała, aby uzyskać lepsze zabezpieczenia. NewStmt akceptuje stałe ciągów. Interfejs UnsafeStmt API umożliwia korzystanie z segmentów instrukcji niestałych, ale nie jest zalecany.

Polecenie Kusto create table jest następujące:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

Tworzenie mapowania

Mapowania danych są używane podczas pozyskiwania do mapowania danych przychodzących na kolumny w tabelach usługi Azure Data Explorer. Aby uzyskać więcej informacji, zobacz mapowanie danych. Mapowanie jest tworzone w taki sam sposób jak tabela przy użyciu Mgmt funkcji z nazwą bazy danych i odpowiednim poleceniem. Pełne polecenie jest dostępne w repozytorium GitHub dla przykładu.

func createMapping(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
	if err != nil {
		log.Fatal("failed to create mapping - ", err)
	}
	log.Printf("Mapping %s created\n", kustoMappingRefName)
}

Pozyskiwanie danych

Pozyskiwanie jest kolejkowane przy użyciu pliku z istniejącego kontenera Azure Blob Storage.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
	kIngest, err := ingest.New(kc, kustoDB, kustoTable)
	if err != nil {
		log.Fatal("failed to create ingestion client", err)
	}
	blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
	err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

	if err != nil {
		log.Fatal("failed to ingest file", err)
	}
	log.Println("Ingested file from -", blobStorePath)
}

Klient pozyskiwania jest tworzony przy użyciu pozyskiwania. Nowy. Funkcja FromFile służy do odwoływania się do identyfikatora URI Azure Blob Storage. Nazwa odwołania do mapowania i typ danych są przekazywane w postaci FileOption.

Uruchamianie aplikacji

  1. Sklonuj przykładowy kod z usługi GitHub:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. Uruchom przykładowy kod, jak pokazano w tym fragmencie kodu z pliku main.go:

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    Porada

    Aby wypróbować różne kombinacje operacji, możesz usunąć komentarz/dodać komentarz do odpowiednich funkcji w pliku main.go.

    Po uruchomieniu przykładowego kodu są wykonywane następujące akcje:

    1. Usuwanie tabeli: StormEvents tabela jest porzucana (jeśli istnieje).
    2. Tworzenie tabeli: StormEvents tworzona jest tabela.
    3. Tworzenie mapowania: StormEvents_CSV_Mapping jest tworzone mapowanie.
    4. Pozyskiwanie plików: plik CSV (w Azure Blob Storage) jest w kolejce na potrzeby pozyskiwania.
  3. Aby utworzyć jednostkę usługi na potrzeby uwierzytelniania, użyj interfejsu wiersza polecenia platformy Azure za pomocą polecenia az ad sp create-for-rbac . Ustaw informacje o jednostce usługi z punktem końcowym klastra i nazwą bazy danych w postaci zmiennych środowiskowych, które będą używane przez program:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. Uruchom program:

    go run main.go
    

    Uzyskasz podobne dane wyjściowe:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

Weryfikowanie i rozwiązywanie problemów

Poczekaj od 5 do 10 minut na zaplanowanie procesu pozyskiwania w kolejce i załadowanie danych do usługi Azure Data Explorer.

  1. Zaloguj się do portalu https://dataexplorer.azure.com i nawiąż połączenie z klastrem. Następnie uruchom następujące polecenie, aby uzyskać liczbę rekordów w StormEvents tabeli.

    StormEvents | count
    
  2. Uruchom następujące polecenie w bazie danych, aby sprawdzić, czy wystąpiły jakieś niepowodzenia pozyskiwania w ciągu ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. Uruchom następujące polecenie, aby wyświetlić stan wszystkich operacji pozyskiwania z ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Czyszczenie zasobów

Jeśli planujesz postępować zgodnie z innymi artykułami, zachowaj utworzone zasoby. Jeśli nie, uruchom następujące polecenie w bazie danych, aby usunąć tabelę StormEvents .

.drop table StormEvents

Następny krok