Azure Veri Gezgini kümenizde akış veri alımını yapılandırma
Makale
Akış alımı, alma ve sorgu arasında düşük gecikme süresine ihtiyaç duyduğunuzda verileri yüklemek için kullanışlıdır. Aşağıdaki senaryolarda akış alımını kullanmayı göz önünde bulundurun:
Bir saniyeden kısa gecikme süresi gereklidir.
Her tabloya veri akışının görece küçük olduğu (saniye başına birkaç kayıt) ancak genel veri alımı hacminin yüksek olduğu (saniyede binlerce kayıt) birçok tablonun işlemsel işlemesini iyileştirmek için.
Her tabloya veri akışı yüksekse (saatte 4 GB'ın üzerinde), kuyruğa alınmış alımı kullanmayı göz önünde bulundurun.
Önceki SDK sürümlerini temel alan kod örnekleri için arşivlenmiş makaleye bakın.
Uygun akış alımı türünü seçin
İki akış alımı türü desteklenir:
Alma türü
Açıklama
Veri bağlantısı
Event Hubs, IoT Hub ve Event Grid veri bağlantıları, küme düzeyinde etkinleştirilmesi koşuluyla akış alımını kullanabilir. Akış alımını kullanma kararı, hedef tabloda yapılandırılan akış alma ilkesine göre yapılır. Veri bağlantılarını yönetme hakkında bilgi için bkz. Event Hub, IoT Hub ve Event Grid.
Özel alma
Özel veri alımı, Azure Veri Gezgini istemci kitaplıklarından birini kullanan bir uygulama yazmanızı gerektirir. Özel alımı yapılandırmak için bu konudaki bilgileri kullanın. C# akış alımı örnek uygulamasını da yararlı bulabilirsiniz.
Ortamınıza uygun alım türünü seçmenize yardımcı olması için aşağıdaki tabloyu kullanın:
Ölçüt
Veri bağlantısı
Özel Alma
Veri alımı başlatma ile sorgu için kullanılabilir veriler arasındaki veri gecikmesi
Daha uzun gecikme
Daha kısa gecikme
Geliştirme yükü
Hızlı ve kolay kurulum, geliştirme yükü yok
Bir uygulama oluşturmak, verileri almak, hataları işlemek ve veri tutarlılığını sağlamak için yüksek geliştirme yükü
Not
Azure portalını kullanarak veya C# dilinde program aracılığıyla kümenizde akış alımını etkinleştirme ve devre dışı bırakma işlemini yönetebilirsiniz. Özel uygulamanız için C# kullanıyorsanız, programlı yaklaşımı kullanarak daha kullanışlı bulabilirsiniz.
Akış alımını etkileyebilecek başlıca katkıda bulunanlar şunlardır:
VM ve küme boyutu: Akış alımı performansı ve kapasite ölçekleri, vm ve küme boyutlarının artmasıyla sağlanır. Eşzamanlı alım isteklerinin sayısı çekirdek başına altı ile sınırlıdır. Örneğin, D14 ve L16 gibi 16 çekirdekli SKU'lar için desteklenen en yüksek yük 96 eşzamanlı alım isteğidir. D11 gibi iki çekirdek SKU için desteklenen en yüksek yük 12 eşzamanlı alma isteğidir.
Veri boyutu sınırı: Akış alma isteğinin veri boyutu sınırı 4 MB'tır. Bu, veri alımı sırasında güncelleştirme ilkeleri için oluşturulan tüm verileri içerir.
Şema güncelleştirmeleri: Tablo oluşturma ve değiştirme ve alma eşlemeleri gibi şema güncelleştirmeleri, akış alımı hizmeti için beş dakika kadar sürebilir. Daha fazla bilgi için bkz . Akış alımı ve şema değişiklikleri.
SSD kapasitesi: Akış yoluyla veri alınmasa bile bir kümede akış alımını etkinleştirmek, veri akışı için küme makinelerinin yerel SSD diskinin bir bölümünü kullanır ve sık erişimli önbellek için kullanılabilir depolama alanını azaltır.
Yeni bir Azure Veri Gezgini kümesi oluştururken akış alımını etkinleştirmek için aşağıdaki kodu çalıştırın:
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var clusters = resourceGroup.GetKustoClusters();
var location = new AzureLocation("<location>");
var skuName = new KustoSkuName("<skuName>");
var skuTier = new KustoSkuTier("<skuTier>");
var clusterData = new KustoClusterData(location, new KustoSku(skuName, skuTier)) { IsStreamingIngestEnabled = true };
await clusters.CreateOrUpdateAsync(WaitUntil.Completed, clusterName, clusterData);
}
}
Mevcut bir kümede akış alımını etkinleştirme
Mevcut bir kümeniz varsa Azure portalını kullanarak veya C# dilinde program aracılığıyla akış alımını etkinleştirebilirsiniz.
Mevcut bir Azure Veri Gezgini kümesini güncelleştirirken akış alımını etkinleştirebilirsiniz.
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = true };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Hedef tablo oluşturma ve ilkeyi tanımlama
Akış alma verilerini almak için bir tablo oluşturun ve Azure portalını kullanarak veya C# dilinde program aracılığıyla ilgili ilkesini tanımlayın.
Aşağıdaki komutlardan birini Sorgu bölmesine kopyalayın ve Çalıştır'ı seçin. Bu, oluşturduğunuz tabloda veya tabloyu içeren veritabanında akış alma ilkesini tanımlar.
İpucu
Veritabanı düzeyinde tanımlanan bir ilke, veritabanındaki mevcut ve gelecekteki tüm tablolar için geçerlidir. İlkeyi veritabanı düzeyinde etkinleştirdiğinizde tablo başına etkinleştirmeniz gerekmez.
Oluşturduğunuz tabloda ilkeyi tanımlamak için şunu kullanın:
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.<region>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Kümenizde akış alımını devre dışı bırakma
Uyarı
Akış alımının devre dışı bırakılması birkaç saat sürebilir.
Azure Veri Gezgini kümenizde akış alımını devre dışı bırakmadan önce, tüm ilgili tablo ve veritabanlarından akış alımı ilkesini bırakın. Akış alımı ilkesinin kaldırılması, Azure Veri Gezgini kümenizin içinde veri yeniden düzenlemeyi tetikler. Akış alma verileri ilk depolama alanından sütun deposundaki kalıcı depolamaya taşınır (kapsamlar veya parçalar). Bu işlem, ilk depolamadaki veri miktarına bağlı olarak birkaç saniye ile birkaç saat arasında sürebilir.
Akış alma ilkesini bırakma
Akış alma ilkesini Azure portalını kullanarak veya C# dilinde program aracılığıyla bırakabilirsiniz.
Azure portalında Azure Veri Gezgini kümenize gidin ve Sorgu'yu seçin.
Akış alma ilkesini tablodan bırakmak için aşağıdaki komutu Sorgu bölmesine kopyalayın ve Çalıştır'ı seçin.
.delete table TestTable policy streamingingestion
Ayarlar'da Yapılandırmalar'ı seçin.
Yapılandırmalar bölmesinde Kapalı'yı seçerek Akış alımını devre dışı bırakın.
Kaydet'i seçin.
Akış alma ilkesini tablodan bırakmak için aşağıdaki kodu çalıştırın:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Kümenizde akış alımını devre dışı bırakmak için aşağıdaki kodu çalıştırın:
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = false };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Sınırlamalar
Veri eşlemeleri, akış alımında kullanılmak üzere önceden oluşturulmalıdır. Tek tek akış alımı istekleri satır içi veri eşlemelerini barındırmaz.
İlkeyi güncelleştirin. Güncelleştirme ilkesi, veritabanındaki diğer verilere veya tablolara değil, yalnızca kaynak tablodaki yeni alınan verilere başvurabilir.
İşlem ilkesi içeren bir güncelleştirme ilkesi başarısız olduğunda, yeniden denemeler toplu alıma geri döner.