Прием данных с помощью библиотеки Node в Azure Data Explorer

Обозреватель данных Azure — это быстрая и высокомасштабируемая служба для изучения данных журналов и телеметрии. Azure Data Explorer предоставляет две клиентские библиотеки для Node: библиотеку приема и библиотеку данных. Они позволяют принимать (загружать) данные в кластер и запрашивать данные из кода. В этой статье вы сначала создадите таблицу и сопоставление данных в тестовом кластере. Затем вы поставите в очередь прием данных в кластер и проверите результаты.

Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начинать работу.

Предварительные требования

  • Учетная запись Майкрософт или удостоверение пользователя Microsoft Entra. Подписка Azure не обязательна.
  • Кластер и база данных Azure Data Explorer. Создайте кластер и базу данных.
  • Node.js на компьютере, где ведется разработка.

Установка библиотек данных и приема

Установите azure-kusto-ingest и azure-kusto-data.

npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2

Добавление операторов и констант импорта

Импорт классов из библиотек


const { Client: KustoClient, KustoConnectionStringBuilder } =  require('azure-kusto-data');
const {
    IngestClient: KustoIngestClient,
    IngestionProperties,
    IngestionDescriptors,
    DataFormat,
    IngestionMappingKind,
} =  require("azure-kusto-ingest");

Для проверки подлинности приложения azure Data Explorer использует идентификатор клиента Microsoft Entra. Чтобы узнать свой идентификатор клиента, см. раздел Найдите свой идентификатор клиента Microsoft 365.

Задайте значения для свойств authorityId, kustoUri, kustoIngestUri и kustoDatabase перед выполнением этого кода.

const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase  = "Weather";

Создайте строку подключения. В этом примере используется проверка подлинности устройства для доступа к кластеру. Проверьте выходные данные консоли, чтобы завершить проверку подлинности. Вы также можете использовать Microsoft Entra сертификат приложения, ключ приложения, а также пользователя и пароль.

Целевую таблицу и сопоставление вы создадите позднее.

const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";

Определение данных исходного файла

Импортируйте дополнительные классы и задайте константы для исходного файла данных. В этом примере используется пример файла, размещенный в хранилище BLOB-объектов Azure. Пример набора данных StormEvents содержит данные, связанные с погодой, от Национальных центров экологической информации.

const container = "samplefiles";
const account = "kustosamples";
const sas = "";  // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;

Создание таблицы в тестовом кластере

Создайте таблицу, которая соответствует схеме данных в файле StormEvents.csv. При выполнении код запускает следующее сообщение: Чтобы войти, воспользуйтесь браузером и откройте страницу https://microsoft.com/devicelogin. Введите код XXXXXXXXX для прохождения проверки подлинности. Следуйте инструкциям по входу, а затем выполните следующий блок кода. Для выполнения последующих блоков кода, устанавливающих соединение, необходимо повторно выполнить вход.

const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;

const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);

Определение сопоставления приема

Сопоставьте входящие данные CSV с именами столбцов и типами данных, которые использовались при создании таблицы.

const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;

const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);

Отправка сообщения в очередь на прием

Поставьте в очередь сообщение, чтобы получить данные из хранилища BLOB-объектов и получить эти данные в обозреватель данных Azure.

const defaultProps  = new IngestionProperties({
    database: kustoDatabase,
    table: destTable,
    format: DataFormat.CSV,
    ingestionMappingReference: destTableMapping,
    ingestionMappingKind: IngestionMappingKind.CSV,
    additionalProperties: {ignoreFirstRecord: true},
});

const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://video2.skills-academy.com/azure/kusto/management/data-ingest#ingestion-properties

const blobDesc = new BlobDescriptor(blobPath, 10);
try {
	const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
	// Handle errors
}

Проверка наличия данных в таблице

Проверьте, что данные приняты в таблицу. Подождите 5–10 минут, пока поставленные в очередь прием запланирует прием и загрузку данных в обозреватель данных Azure. Затем выполните следующий код, чтобы получить количество записей в таблице StormEvents.

const query = `${destTable} | count`;

var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);

Выполнение запросов по устранению неполадок

Войдите в https://dataexplorer.azure.com и подключитесь к кластеру. Выполните в своей базе данных следующую команду, чтобы проверить, не было ли в ней сбоев приема за последние четыре часа. Замените имя базы данных перед запуском.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Выполните следующую команду, чтобы узнать состояние всех операций приема за последние четыре часа. Замените имя базы данных перед запуском.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Очистка ресурсов

Если вы планируете следить за другими нашими статьями, сохраните созданные вами ресурсы. В противном случае выполните в своей базе данных следующую команду, чтобы очистить таблицу StormEvents.

.drop table StormEvents