Azure SQL Edge'de veri akışı işi oluşturma
Önemli
Azure SQL Edge 30 Eylül 2025'te kullanımdan kaldırılacaktır. Daha fazla bilgi ve geçiş seçenekleri için Bkz . Kullanımdan kaldırma bildirimi.
Not
Azure SQL Edge artık ARM64 platformunu desteklememektedir.
Bu makalede, Azure SQL Edge'de T-SQL akış işinin nasıl oluşturulacağı açıklanır. Dış akış giriş ve çıkış nesnelerini oluşturur ve akış işi sorgusunu akış işi oluşturma işleminin bir parçası olarak tanımlarsınız.
Dış akış giriş ve çıkış nesnelerini yapılandırma
T-SQL akışı, akış işinin dış akış girişleri ve çıkışlarıyla ilişkili veri kaynaklarını tanımlamak için SQL Server'ın dış veri kaynağı işlevselliğini kullanır. Dış akış girişi veya çıkış nesnesi oluşturmak için aşağıdaki T-SQL komutlarını kullanın:
- CREATE EXTERNAL FILE FORMAT (Transact-SQL)
- CREATE EXTERNAL DATA SOURCE (Transact-SQL)
- CREATE EXTERNAL STREAM (Transact-SQL)
Ayrıca, Azure SQL Edge, SQL Server veya Azure SQL Veritabanı çıkış akışı olarak kullanılıyorsa CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL) gerekir. Bu T-SQL komutu veritabanına erişmek için kimlik bilgilerini tanımlar.
Desteklenen giriş ve çıkış akışı veri kaynakları
Azure SQL Edge şu anda yalnızca aşağıdaki veri kaynaklarını akış girişleri ve çıkışları olarak desteklemektedir.
Veri kaynağı türü | Girdi | Çıktı | Açıklama |
---|---|---|---|
Azure IoT Edge hub'ı | Y | Y | Azure IoT Edge hub'ına akış verilerini okumak ve yazmak için veri kaynağı. Daha fazla bilgi için bkz . IoT Edge Hub. |
SQL Veritabanı | N | Y | akış verilerini SQL Veritabanı yazmak için veri kaynağı bağlantısı. Veritabanı, Azure SQL Edge'deki yerel bir veritabanı veya SQL Server veya Azure SQL Veritabanı uzak veritabanı olabilir. |
Kafka | Y | N | Kafka konusundan akış verilerini okumak için veri kaynağı. |
Örnek: Azure IoT Edge hub'ı için dış akış giriş/çıkış nesnesi oluşturma
Aşağıdaki örnek, Azure IoT Edge hub'ı için bir dış akış nesnesi oluşturur. Azure IoT Edge hub'ına yönelik bir dış akış giriş/çıkış veri kaynağı oluşturmak için, önce okunan veya yazılan verilerin düzeni için bir dış dosya biçimi oluşturmanız gerekir.
JSON türünde bir dış dosya biçimi oluşturun.
CREATE EXTERNAL FILE format InputFileFormat WITH (FORMAT_TYPE = JSON); GO
Azure IoT Edge hub'ı için bir dış veri kaynağı oluşturun. Aşağıdaki T-SQL betiği, Azure SQL Edge ile aynı Docker ana bilgisayarında çalışan bir IoT Edge hub'ına veri kaynağı bağlantısı oluşturur.
CREATE EXTERNAL DATA SOURCE EdgeHubInput WITH (LOCATION = 'edgehub://'); GO
Azure IoT Edge hub'ı için dış akış nesnesi oluşturun. Aşağıdaki T-SQL betiği IoT Edge hub'ı için bir akış nesnesi oluşturur. IoT Edge hub'ı akış nesnesi söz konusu olduğunda LOCATION parametresi, okunan veya yazılan IoT Edge hub'ı konusunun veya kanalının adıdır.
CREATE EXTERNAL STREAM MyTempSensors WITH ( DATA_SOURCE = EdgeHubInput, FILE_FORMAT = InputFileFormat, LOCATION = N'TemperatureSensors', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' ); GO
Örnek: Azure SQL Veritabanı için dış akış nesnesi oluşturma
Aşağıdaki örnek, Azure SQL Edge'deki yerel veritabanına bir dış akış nesnesi oluşturur.
Veritabanında bir ana anahtar oluşturun. Kimlik bilgisi gizli dizisini şifrelemek için bu gereklidir.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
SQL Server kaynağına erişmek için veritabanı kapsamlı bir kimlik bilgisi oluşturun. Aşağıdaki örnek, dış veri kaynağı için IDENTITY = kullanıcı adı ve GİzLİ = parola ile bir kimlik bilgisi oluşturur.
CREATE DATABASE SCOPED CREDENTIAL SQLCredential WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>'; GO
CREATE EXTERNAL DATA SOURCE ile bir dış veri kaynağı oluşturun. Aşağıdaki örnek:
- LocalSQLOutput adlı bir dış veri kaynağı oluşturur.
- Dış veri kaynağını (
LOCATION = '<vendor>://<server>[:<port>]'
) tanımlar. Örnekte, Azure SQL Edge'in yerel bir örneğine işaret eder. - Daha önce oluşturulan kimlik bilgilerini kullanır.
CREATE EXTERNAL DATA SOURCE LocalSQLOutput WITH ( LOCATION = 'sqlserver://tcp:.,1433', CREDENTIAL = SQLCredential ); GO
Dış akış nesnesini oluşturun. Aşağıdaki örnek, dbo tablosunu işaret eden bir dış akış nesnesi oluşturur. MySQLDatabase veritabanındaki TemperatureMeasurements.
CREATE EXTERNAL STREAM TemperatureMeasurements WITH ( DATA_SOURCE = LocalSQLOutput, LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' );
Örnek: Kafka için dış akış nesnesi oluşturma
Aşağıdaki örnek, Azure SQL Edge'deki yerel veritabanına bir dış akış nesnesi oluşturur. Bu örnekte kafka sunucusunun anonim erişim için yapılandırıldığı varsayılır.
CREATE EXTERNAL DATA SOURCE ile bir dış veri kaynağı oluşturun. Aşağıdaki örnek:
CREATE EXTERNAL DATA SOURCE [KafkaInput] WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>'); GO
Kafka girişi için bir dış dosya biçimi oluşturun. Aşağıdaki örnek GZipped Compression ile bir JSON dosya biçimi oluşturmuştur.
CREATE EXTERNAL FILE FORMAT JsonGzipped WITH ( FORMAT_TYPE = JSON, DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec' ); GO
Dış akış nesnesini oluşturun. Aşağıdaki örnek, Kafka konusuna
TemperatureMeasurement
işaret eden bir dış akış nesnesi oluşturur.CREATE EXTERNAL STREAM TemperatureMeasurement WITH ( DATA_SOURCE = KafkaInput, FILE_FORMAT = JsonGzipped, LOCATION = 'TemperatureMeasurement', INPUT_OPTIONS = 'PARTITIONS: 10' ); GO
Akış işini ve akış sorgularını oluşturma
sys.sp_create_streaming_job
Akış sorgularını tanımlamak ve akış işini oluşturmak için sistem saklı yordamını kullanın. sp_create_streaming_job
Saklı yordam aşağıdaki parametreleri alır:
@job_name
: Akış işinin adı. Akış işi adları örnek genelinde benzersizdir.@statement
: Stream Analytics Sorgu Dili tabanlı akış sorgu deyimleri.
Aşağıdaki örnek, tek bir akış sorgusuyla basit bir akış işi oluşturur. Bu sorgu IoT Edge hub'ından girişleri okur ve veritabanına yazar dbo.TemperatureMeasurements
.
EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
@statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'
Aşağıdaki örnek, birden çok farklı sorguyla daha karmaşık bir akış işi oluşturur. Bu sorgular, sıcaklık verilerindeki AnomalyDetection_ChangePoint
anomalileri tanımlamak için yerleşik işlevi kullanan sorgulardan birini içerir.
EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
@statement = N'
SELECT *
INTO TemperatureMeasurements1
FROM MyEdgeHubInput1
SELECT *
INTO TemperatureMeasurements2
FROM MyEdgeHubInput2
SELECT *
INTO TemperatureMeasurements3
FROM MyEdgeHubInput3
SELECT timestamp AS [Time],
[Temperature] AS [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
INTO TemperatureAnomalies
FROM MyEdgeHubInput2;
';
GO
Akış işlerini başlatma, durdurma, bırakma ve izleme
Azure SQL Edge'de bir akış işi başlatmak için saklı yordamı çalıştırın sys.sp_start_streaming_job
. Saklı yordam, giriş olarak başlatılması için akış işinin adını gerektirir.
EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO
Akış işini durdurmak için saklı yordamı çalıştırın sys.sp_stop_streaming_job
. Saklı yordam, giriş olarak durdurulması için akış işinin adını gerektirir.
EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO
Akış işini bırakmak (veya silmek) için saklı yordamı çalıştırın sys.sp_drop_streaming_job
. Saklı yordam, akış işinin adının giriş olarak bırakılmasını gerektirir.
EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO
Akış işinin geçerli durumunu almak için saklı yordamı çalıştırın sys.sp_get_streaming_job
. Saklı yordam, akış işinin adının giriş olarak bırakılmasını gerektirir. Akış işinin adını ve geçerli durumunu döndürür.
EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
(
name NVARCHAR(256),
status NVARCHAR(256),
error NVARCHAR(256)
)
);
GO
Akış işi aşağıdaki durumlardan herhangi birine sahip olabilir:
Çalıştırma Durumu | Açıklama |
---|---|
Oluşturulma | Akış işi oluşturuldu, ancak henüz başlatılmadı. |
Başlatılıyor | Akış işi başlangıç aşamasındadır. |
Boş | Akış işi çalışıyor, ancak işlenmek için giriş yok. |
İşleme | Akış işi çalışıyor ve girişleri işliyor. Bu durum, akış işi için iyi durumda olduğunu gösterir. |
Düzeyi düşürüldü | Akış işi çalışıyor, ancak giriş işlemi sırasında önemli olmayan bazı hatalar oluştu. Giriş işi çalışmaya devam eder, ancak hatalarla karşılaşan girişleri bırakır. |
Durduruldu | Akış işi durduruldu. |
Başarısız | Akış işi başarısız oldu. Bu genellikle işleme sırasında önemli bir hatanın göstergesidir. |
Not
Akış işi zaman uyumsuz olarak yürütüleceğinden, iş çalışma zamanında hatalarla karşılaşabilir. Akış işi hatasını gidermek için saklı yordamı kullanın sys.sp_get_streaming_job
veya Azure SQL Edge kapsayıcısından Docker günlüğünü gözden geçirin. Bu işlem akış işinden hata ayrıntılarını sağlayabilir.