Pozyskiwanie danych przy użyciu zestawu Azure Data Explorer Go SDK
Azure Data Explorer to szybka i wysoce skalowalna usługa eksploracji danych na potrzeby danych dziennika i telemetrycznych. Udostępnia bibliotekę klienta zestawu GO SDK do interakcji z usługą Azure Data Explorer. Zestaw SDK języka Go umożliwia pozyskiwanie, kontrolowanie i wykonywanie zapytań dotyczących danych w klastrach usługi Azure Data Explorer.
W tym artykule najpierw utworzysz tabelę i mapowanie danych w klastrze testowym. Następnie należy w kolejce pozyskiwanie do klastra przy użyciu zestawu SDK języka Go i zweryfikować wyniki.
Wymagania wstępne
- Konto Microsoft lub tożsamość użytkownika Microsoft Entra. Subskrypcja platformy Azure nie jest wymagana.
- Baza danych i klaster usługi Azure Data Explorer. Utwórz klaster i bazę danych.
- Zainstaluj usługę Git.
- Zainstaluj język Go z następującymi minimalnymi wymaganiami dotyczącymi zestawu SDK języka Go.
- Utwórz rejestrację aplikacji i przyznaj jej uprawnienia do bazy danych. Zapisz identyfikator klienta i klucz tajny klienta do późniejszego użycia.
Instalowanie zestawu SDK języka Go
Zestaw AZURE Data Explorer Go SDK zostanie automatycznie zainstalowany po uruchomieniu [przykładowej aplikacji korzystającej z modułów Języka Go. Jeśli zestaw SDK języka Go został zainstalowany dla innej aplikacji, utwórz moduł Go i pobierz pakiet usługi Azure Data Explorer (przy użyciu narzędzia go get
), na przykład:
go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto
Zależność pakietu zostanie dodana do go.mod
pliku. Użyj go w aplikacji Języka Go.
Przeglądanie kodu
Ta sekcja Przejrzyj kod jest opcjonalna . Jeśli chcesz dowiedzieć się, jak działa kod, możesz przejrzeć następujące fragmenty kodu. W przeciwnym razie możesz przejść do sekcji Uruchamianie aplikacji.
Uwierzytelnianie
Program musi uwierzytelniać się w usłudze Azure Data Explorer przed wykonaniem jakichkolwiek operacji.
auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)
Wystąpienie usługi kusto. Autoryzacja jest tworzona przy użyciu poświadczeń jednostki usługi. Następnie jest on używany do tworzenia usługi Kusto. Klient z funkcją New , która akceptuje również punkt końcowy klastra.
Tworzenie tabeli
Polecenie create table jest reprezentowane przez instrukcję Kusto. Funkcja Mgmt służy do wykonywania poleceń zarządzania. Służy do wykonywania polecenia w celu utworzenia tabeli.
func createTable(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
if err != nil {
log.Fatal("failed to create table", err)
}
log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}
Porada
Instrukcja Kusto jest domyślnie stała, aby uzyskać lepsze zabezpieczenia.
NewStmt
akceptuje stałe ciągów. Interfejs UnsafeStmt
API umożliwia korzystanie z segmentów instrukcji niestałych, ale nie jest zalecany.
Polecenie Kusto create table jest następujące:
.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)
Tworzenie mapowania
Mapowania danych są używane podczas pozyskiwania do mapowania danych przychodzących na kolumny w tabelach usługi Azure Data Explorer. Aby uzyskać więcej informacji, zobacz mapowanie danych. Mapowanie jest tworzone w taki sam sposób jak tabela przy użyciu Mgmt
funkcji z nazwą bazy danych i odpowiednim poleceniem. Pełne polecenie jest dostępne w repozytorium GitHub dla przykładu.
func createMapping(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
if err != nil {
log.Fatal("failed to create mapping - ", err)
}
log.Printf("Mapping %s created\n", kustoMappingRefName)
}
Pozyskiwanie danych
Pozyskiwanie jest kolejkowane przy użyciu pliku z istniejącego kontenera Azure Blob Storage.
func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
kIngest, err := ingest.New(kc, kustoDB, kustoTable)
if err != nil {
log.Fatal("failed to create ingestion client", err)
}
blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))
if err != nil {
log.Fatal("failed to ingest file", err)
}
log.Println("Ingested file from -", blobStorePath)
}
Klient pozyskiwania jest tworzony przy użyciu pozyskiwania. Nowy. Funkcja FromFile służy do odwoływania się do identyfikatora URI Azure Blob Storage. Nazwa odwołania do mapowania i typ danych są przekazywane w postaci FileOption.
Uruchamianie aplikacji
Sklonuj przykładowy kod z usługi GitHub:
git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
Uruchom przykładowy kod, jak pokazano w tym fragmencie kodu z pliku
main.go
:func main { ... dropTable(kc, kustoDB) createTable(kc, kustoDB) createMapping(kc, kustoDB) ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable) ... }
Porada
Aby wypróbować różne kombinacje operacji, możesz usunąć komentarz/dodać komentarz do odpowiednich funkcji w pliku
main.go
.Po uruchomieniu przykładowego kodu są wykonywane następujące akcje:
-
Usuwanie tabeli:
StormEvents
tabela jest porzucana (jeśli istnieje). -
Tworzenie tabeli:
StormEvents
tworzona jest tabela. -
Tworzenie mapowania:
StormEvents_CSV_Mapping
jest tworzone mapowanie. - Pozyskiwanie plików: plik CSV (w Azure Blob Storage) jest w kolejce na potrzeby pozyskiwania.
-
Usuwanie tabeli:
Aby utworzyć jednostkę usługi na potrzeby uwierzytelniania, użyj interfejsu wiersza polecenia platformy Azure za pomocą polecenia az ad sp create-for-rbac . Ustaw informacje o jednostce usługi z punktem końcowym klastra i nazwą bazy danych w postaci zmiennych środowiskowych, które będą używane przez program:
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export AZURE_SP_TENANT_ID="<replace with tenant>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Uruchom program:
go run main.go
Uzyskasz podobne dane wyjściowe:
Connected to Azure Data Explorer Using database - testkustodb Failed to drop StormEvents table. Maybe it does not exist? Table StormEvents created in DB testkustodb Mapping StormEvents_CSV_Mapping created Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
Weryfikowanie i rozwiązywanie problemów
Poczekaj od 5 do 10 minut na zaplanowanie procesu pozyskiwania w kolejce i załadowanie danych do usługi Azure Data Explorer.
Zaloguj się do portalu https://dataexplorer.azure.com i nawiąż połączenie z klastrem. Następnie uruchom następujące polecenie, aby uzyskać liczbę rekordów w
StormEvents
tabeli.StormEvents | count
Uruchom następujące polecenie w bazie danych, aby sprawdzić, czy wystąpiły jakieś niepowodzenia pozyskiwania w ciągu ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Uruchom następujące polecenie, aby wyświetlić stan wszystkich operacji pozyskiwania z ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Czyszczenie zasobów
Jeśli planujesz postępować zgodnie z innymi artykułami, zachowaj utworzone zasoby. Jeśli nie, uruchom następujące polecenie w bazie danych, aby usunąć tabelę StormEvents
.
.drop table StormEvents