Настройка приема потоковой передачи в кластере Azure Data Explorer
Статья
Прием потоковой передачи используется для загрузки данных, если требуется низкая задержка между приемом и запросом. Вы можете использовать прием потоковой передачи в следующих сценариях:
Требуется задержка меньше секунды.
Для оптимизации операционной обработки многих таблиц, когда поток данных в каждой таблице относительно мал (несколько записей в секунду), а общий объем приема данных большой (тысячи записей в секунду).
Если поток данных в каждой таблице большой (более 4 ГБ в час), рассмотрите возможность использования приема пакетов.
Выбор соответствующего типа приема потоковой передачи
Поддерживаются два типа приема потоковой передачи:
Тип приема
Описание
Подключение к данным
Подключения к данным концентратора событий, Центр Интернета вещей и Сетки событий могут использовать прием потоковой передачи при условии, что она включена на уровне кластера. Решение об использовании приема потоковой передачи выполняется в соответствии с политикой приема потоковой передачи, настроенной для целевой таблицы. Сведения об управлении подключениями к данным см. в разделе Концентратор событий, Центр Интернета вещей и Сетка событий.
Используйте следующую таблицу, чтобы выбрать тип приема, подходящий для вашей среды.
Критерий
Подключение к данным
Настраиваемая загрузка
Задержка данных между инициированием приема и данными, доступными для запроса
Более длительная задержка
Более короткая задержка
Затраты на разработку
Быстрая и простая установка, без дополнительных затрат на разработку
Высокая нагрузка при разработке приложения, принимающего данные, обработки ошибок и обеспечения согласованности данных.
Примечание
Вы можете управлять процессом включения и отключения приема потоковой передачи в кластере с помощью портала Azure или программных средств C#. Если вы используете C# для своего пользовательского приложения, возможно, вам будет удобнее использовать программные средства.
Рекомендации по обеспечению производительности и эксплуатации
Основные факторы, которые могут повлиять на прием потоковой передачи:
Размер виртуальной машины и кластера. Производительность и емкость приема потоковой передачи повышаются с увеличением размера виртуальной машины и кластера. Число параллельных запросов приема ограничено шестью на ядро. Например, для номеров SKU с 16 ядрами, таких как D14 и L16, максимальная поддерживаемая загрузка составляет 96 параллельных запросов приема. Для номеров SKU с двумя ядрами, например D11, максимальная поддерживаемая нагрузка составляет 12 параллельных запросов приема.
Ограничение на объем данных. Предельный объем данных для запроса на прием потоковой передачи составляет 4 МБ. Это касается всех данных, созданных для политик обновления во время приема.
Обновления схемы. Обновления схемы, такие как создание и изменение таблиц и сопоставлений приема, для службы приема потоковой передачи могут выполняться до 5 минут. Дополнительные сведения см. в статье Прием потоковой передачи данных и изменения схемы.
Емкость SSD. При включении приема потоковой передачи в кластере, даже если данные не принимаются через потоковую передачу, для данных приема потоковой передачи будет использоваться часть локального диска SSD компьютеров кластера. Это уменьшает объем хранилища, доступный для горячего кэша.
Включите прием потоковой передачи в своем кластере
На портале Azure перейдите к кластеру Azure Data Explorer.
Под разделом Параметры выберите пункт Конфигурации.
В области Конфигурации выберите Вкл. , чтобы включить Прием потоковой передачи.
Щелкните Сохранить.
Вы можете включить прием потоковой передачи при создании нового кластера обозревателя данных Azure.
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Создание целевой таблицы и определение политики
Создайте таблицу для получения данных приема потоковой передачи и определите связанную с ней политику с помощью портала Azure или программных средств C#.
Чтобы создать таблицу, которая будет получать данные посредством потоковой передачи, скопируйте следующую команду в Панель запросов и выберите Выполнить.
Скопируйте одну из следующих команд в Панель запросов и выберите Выполнить. Будет определена политика приема потоковой передачи для созданной вами таблицы или базы данных, содержащей эту таблицу.
Совет
Политика, которая определяется на уровне базы данных, применяется ко всем существующим и будущим таблицам в базе данных. При включении политики на уровне базы данных нет необходимости включать ее для каждой таблицы.
Чтобы определить политику для созданной таблицы, выполните следующий код:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Создание приложения приема потоковой передачи для приема данных в кластере
Создайте приложение для приема данных в кластере с помощью выбранного вами языка.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Отключение приема потоковой передачи в кластере
Предупреждение
Отключение приема потоковой передачи может занять несколько часов.
Перед отключением приема потоковой передачи в кластере Azure Data Explorer удалите политику приема потоковой передачи из всех соответствующих таблиц и баз данных. Удаление политики приема потоковой передачи активирует реорганизацию данных в кластере Azure Data Explorer. Данные приема потоковой передачи перемещаются из первоначального хранилища в постоянное хранилище в хранилище столбцов (экстентов или сегментов). Этот процесс может занять от нескольких секунд до нескольких часов в зависимости от объема данных в первоначальном хранилище.
Удаление политики приема потоковой передачи
Политику приема потоковой передачи можно удалить с помощью портала Azure или программных средств C#.
На портале Azure перейдите в кластер Azure Data Explorer и выберите Запрос.
Чтобы удалить политику приема потоковой передачи из таблицы, скопируйте следующую команду в Панель запросов и выберите Выполнить.
.delete table TestTable policy streamingingestion
Под разделом Параметры выберите пункт Конфигурации.
В области Конфигурации выберите Выкл. , чтобы включить Прием потоковой передачи.
Щелкните Сохранить.
Чтобы удалить политику приема потоковой передачи из таблицы, выполните следующий код:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Чтобы отключить прием потоковой передачи в кластере, выполните следующий код:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Ограничения
Сопоставления данных должны быть предварительно созданы, чтобы их можно было использовать для приема потоковой передачи. Отдельные запросы на прием потоковой передачи не поддерживают встроенные сопоставления данных.
Невозможно установить теги экстентов для данных приема потоковой передачи.
Политика обновления. Политика обновления может ссылаться только на недавно принятые данные в исходной таблице, а не на другие данные или таблицы в базе данных.
Если для ведущего кластера применяется потоковый прием данных, кластер подписчиков также должен использовать потоковый прием данных, чтобы отслеживать данных потокового приема. Это же условие применимо при общем доступе к данным кластера через Data Share.