Прием данных с помощью библиотеки Node в Azure Data Explorer
Обозреватель данных Azure — это быстрая и высокомасштабируемая служба для изучения данных журналов и телеметрии. Azure Data Explorer предоставляет две клиентские библиотеки для Node: библиотеку приема и библиотеку данных. Они позволяют принимать (загружать) данные в кластер и запрашивать данные из кода. В этой статье вы сначала создадите таблицу и сопоставление данных в тестовом кластере. Затем вы поставите в очередь прием данных в кластер и проверите результаты.
Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начинать работу.
Предварительные требования
- Учетная запись Майкрософт или удостоверение пользователя Microsoft Entra. Подписка Azure не обязательна.
- Кластер и база данных Azure Data Explorer. Создайте кластер и базу данных.
- Node.js на компьютере, где ведется разработка.
Установка библиотек данных и приема
Установите azure-kusto-ingest и azure-kusto-data.
npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2
Добавление операторов и констант импорта
Импорт классов из библиотек
const { Client: KustoClient, KustoConnectionStringBuilder } = require('azure-kusto-data');
const {
IngestClient: KustoIngestClient,
IngestionProperties,
IngestionDescriptors,
DataFormat,
IngestionMappingKind,
} = require("azure-kusto-ingest");
Для проверки подлинности приложения azure Data Explorer использует идентификатор клиента Microsoft Entra. Чтобы узнать свой идентификатор клиента, см. раздел Найдите свой идентификатор клиента Microsoft 365.
Задайте значения для свойств authorityId
, kustoUri
, kustoIngestUri
и kustoDatabase
перед выполнением этого кода.
const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase = "Weather";
Создайте строку подключения. В этом примере используется проверка подлинности устройства для доступа к кластеру. Проверьте выходные данные консоли, чтобы завершить проверку подлинности. Вы также можете использовать Microsoft Entra сертификат приложения, ключ приложения, а также пользователя и пароль.
Целевую таблицу и сопоставление вы создадите позднее.
const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";
Определение данных исходного файла
Импортируйте дополнительные классы и задайте константы для исходного файла данных. В этом примере используется пример файла, размещенный в хранилище BLOB-объектов Azure. Пример набора данных StormEvents содержит данные, связанные с погодой, от Национальных центров экологической информации.
const container = "samplefiles";
const account = "kustosamples";
const sas = ""; // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;
Создание таблицы в тестовом кластере
Создайте таблицу, которая соответствует схеме данных в файле StormEvents.csv
. При выполнении код запускает следующее сообщение: Чтобы войти, воспользуйтесь браузером и откройте страницу https://microsoft.com/devicelogin. Введите код XXXXXXXXX для прохождения проверки подлинности. Следуйте инструкциям по входу, а затем выполните следующий блок кода. Для выполнения последующих блоков кода, устанавливающих соединение, необходимо повторно выполнить вход.
const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;
const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);
Определение сопоставления приема
Сопоставьте входящие данные CSV с именами столбцов и типами данных, которые использовались при создании таблицы.
const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;
const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);
Отправка сообщения в очередь на прием
Поставьте в очередь сообщение, чтобы получить данные из хранилища BLOB-объектов и получить эти данные в обозреватель данных Azure.
const defaultProps = new IngestionProperties({
database: kustoDatabase,
table: destTable,
format: DataFormat.CSV,
ingestionMappingReference: destTableMapping,
ingestionMappingKind: IngestionMappingKind.CSV,
additionalProperties: {ignoreFirstRecord: true},
});
const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://video2.skills-academy.com/azure/kusto/management/data-ingest#ingestion-properties
const blobDesc = new BlobDescriptor(blobPath, 10);
try {
const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
// Handle errors
}
Проверка наличия данных в таблице
Проверьте, что данные приняты в таблицу. Подождите 5–10 минут, пока поставленные в очередь прием запланирует прием и загрузку данных в обозреватель данных Azure. Затем выполните следующий код, чтобы получить количество записей в таблице StormEvents
.
const query = `${destTable} | count`;
var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);
Выполнение запросов по устранению неполадок
Войдите в https://dataexplorer.azure.com и подключитесь к кластеру. Выполните в своей базе данных следующую команду, чтобы проверить, не было ли в ней сбоев приема за последние четыре часа. Замените имя базы данных перед запуском.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Выполните следующую команду, чтобы узнать состояние всех операций приема за последние четыре часа. Замените имя базы данных перед запуском.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Очистка ресурсов
Если вы планируете следить за другими нашими статьями, сохраните созданные вами ресурсы. В противном случае выполните в своей базе данных следующую команду, чтобы очистить таблицу StormEvents
.
.drop table StormEvents