Creare un processo di streaming di dati in SQL Edge di Azure
Importante
SQL Edge di Azure non supporta più la piattaforma ARM64.
Questo articolo illustra come creare un processo di streaming T-SQL in SQL Edge di Azure. Si creano gli oggetti di input e output del flusso esterno e quindi si definisce la query del processo di streaming come parte della creazione del processo di streaming.
Configurare gli oggetti di input e output del flusso esterno
Il flusso T-SQL usa la funzionalità dell'origine dati esterna di SQL Server per definire le origini dati associate agli input e agli output del flusso esterno del processo di streaming. Usare i comandi T-SQL seguenti per creare un oggetto di input o output del flusso esterno:
- CREATE EXTERNAL FILE FORMAT (Transact-SQL)
- CREATE EXTERNAL DATA SOURCE (Transact-SQL)
- CREATE EXTERNAL STREAM (Transact-SQL)
Inoltre, se SQL Edge di Azure, SQL Server o database SQL di Azure viene usato come flusso di output, è necessario CREATE DATABA edizione Standard SCOPED CREDENTIAL (Transact-SQL). Questo comando T-SQL definisce le credenziali per accedere al database.
Origini dati del flusso di input e output supportate
SQL Edge di Azure attualmente supporta solo le origini dati seguenti come output e input di flusso.
Tipo di origine dati | Input | Output | Descrizione |
---|---|---|---|
Hub di Azure IoT Edge | S | S | Origine dati per leggere e scrivere dati di streaming in un hub di Azure IoT Edge. Per altre informazioni, vedere Hub IoT Edge. |
Database SQL | N | Y | Connessione all'origine dati per scrivere i dati di streaming nel database SQL. Il database può essere un database locale in SQL Edge di Azure o un database remoto in SQL Server o database SQL di Azure. |
Kafka | Y | N | Origine dati per la lettura dei dati in streaming da un argomento Kafka. |
Esempio: Creare un oggetto di input/output del flusso esterno per l'hub IoT Edge di Azure
L'esempio seguente crea un oggetto flusso esterno per l'hub IoT Edge di Azure. Per creare un'origine dati di input/output del flusso esterno per l'hub di Azure IoT Edge, è prima necessario creare un formato di file esterno per il layout dei dati letti o scritti.
Creare un formato di file esterno del tipo JSON.
CREATE EXTERNAL FILE format InputFileFormat WITH (FORMAT_TYPE = JSON); GO
Creare un'origine dati esterna per l'hub IoT Edge di Azure. Lo script T-SQL seguente crea una connessione all'origine dati a un hub IoT Edge eseguito nello stesso host Docker di SQL Edge di Azure.
CREATE EXTERNAL DATA SOURCE EdgeHubInput WITH (LOCATION = 'edgehub://'); GO
Creare l'oggetto flusso esterno per l'hub IoT Edge di Azure. Lo script T-SQL seguente crea un oggetto flusso per l'hub IoT Edge. Nel caso di un oggetto flusso dell'hub IoT Edge, il parametro LOCATION è il nome dell'argomento o del canale dell'hub IoT Edge in cui viene letto o scritto.
CREATE EXTERNAL STREAM MyTempSensors WITH ( DATA_SOURCE = EdgeHubInput, FILE_FORMAT = InputFileFormat, LOCATION = N'TemperatureSensors', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' ); GO
Esempio: Creare un oggetto flusso esterno per database SQL di Azure
L'esempio seguente crea un oggetto flusso esterno al database locale in SQL Edge di Azure.
Creare una chiave master nel database. Questo passaggio è necessario per crittografare il segreto delle credenziali.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
Creare credenziali con ambito database per l'accesso all'origine di SQL Server. Nell'esempio seguente viene creata una credenziale per l'origine dati esterna, con IDENTITY = username e edizione Standard CRET = password.
CREATE DATABASE SCOPED CREDENTIAL SQLCredential WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>'; GO
Creare un'origine dati esterna con CREATE EXTERNAL DATA SOURCE. L'esempio seguente:
- Crea un'origine dati esterna denominata LocalSQLOutput.
- Identifica l'origine dati esterna (
LOCATION = '<vendor>://<server>[:<port>]'
). Nell'esempio punta a un'istanza locale di SQL Edge di Azure. - Usa le credenziali create in precedenza.
CREATE EXTERNAL DATA SOURCE LocalSQLOutput WITH ( LOCATION = 'sqlserver://tcp:.,1433', CREDENTIAL = SQLCredential ); GO
Creare l'oggetto flusso esterno. Nell'esempio seguente viene creato un oggetto flusso esterno che punta a un dbo di tabella . TemperatureMeasurements, nel database MySQLDatabase.
CREATE EXTERNAL STREAM TemperatureMeasurements WITH ( DATA_SOURCE = LocalSQLOutput, LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' );
Esempio: Creare un oggetto flusso esterno per Kafka
L'esempio seguente crea un oggetto flusso esterno al database locale in SQL Edge di Azure. In questo esempio si presuppone che il server kafka sia configurato per l'accesso anonimo.
Creare un'origine dati esterna con CREATE EXTERNAL DATA SOURCE. L'esempio seguente:
CREATE EXTERNAL DATA SOURCE [KafkaInput] WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>'); GO
Creare un formato di file esterno per l'input Kafka. L'esempio seguente ha creato un formato di file JSON con compressione GZipped.
CREATE EXTERNAL FILE FORMAT JsonGzipped WITH ( FORMAT_TYPE = JSON, DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec' ); GO
Creare l'oggetto flusso esterno. Nell'esempio seguente viene creato un oggetto flusso esterno che punta all'argomento
TemperatureMeasurement
Kafka .CREATE EXTERNAL STREAM TemperatureMeasurement WITH ( DATA_SOURCE = KafkaInput, FILE_FORMAT = JsonGzipped, LOCATION = 'TemperatureMeasurement', INPUT_OPTIONS = 'PARTITIONS: 10' ); GO
Creare il processo di streaming e le query di streaming
Usare la sys.sp_create_streaming_job
stored procedure di sistema per definire le query di streaming e creare il processo di streaming. La sp_create_streaming_job
stored procedure accetta i parametri seguenti:
@job_name
: nome del processo di streaming. I nomi dei processi di streaming sono univoci nell'istanza.@statement
: istruzioni di query di streaming basate sul linguaggio di query basate sul linguaggio di analisi di flusso.
L'esempio seguente crea un processo di streaming semplice con una query di streaming. Questa query legge gli input dall'hub IoT Edge e scrive dbo.TemperatureMeasurements
nel database.
EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
@statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'
L'esempio seguente crea un processo di streaming più complesso con più query diverse. Queste query includono una che usa la funzione predefinita AnomalyDetection_ChangePoint
per identificare le anomalie nei dati relativi alla temperatura.
EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
@statement = N'
SELECT *
INTO TemperatureMeasurements1
FROM MyEdgeHubInput1
SELECT *
INTO TemperatureMeasurements2
FROM MyEdgeHubInput2
SELECT *
INTO TemperatureMeasurements3
FROM MyEdgeHubInput3
SELECT timestamp AS [Time],
[Temperature] AS [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
INTO TemperatureAnomalies
FROM MyEdgeHubInput2;
';
GO
Avviare, arrestare, eliminare e monitorare i processi di streaming
Per avviare un processo di streaming in SQL Edge di Azure, eseguire la sys.sp_start_streaming_job
stored procedure. La stored procedure richiede l'avvio del nome del processo di streaming, come input.
EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO
Per arrestare un processo di streaming, eseguire la sys.sp_stop_streaming_job
stored procedure. La stored procedure richiede che il nome del processo di streaming si arresti, come input.
EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO
Per eliminare o eliminare un processo di streaming, eseguire la sys.sp_drop_streaming_job
stored procedure. La stored procedure richiede il nome del processo di streaming da eliminare, come input.
EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO
Per ottenere lo stato corrente di un processo di streaming, eseguire la sys.sp_get_streaming_job
stored procedure. La stored procedure richiede il nome del processo di streaming da eliminare, come input. Restituisce il nome e lo stato corrente del processo di streaming.
EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
(
name NVARCHAR(256),
status NVARCHAR(256),
error NVARCHAR(256)
)
);
GO
Il processo di streaming può avere uno dei seguenti stati:
Status | Descrizione |
---|---|
Data di creazione | Il processo di streaming è stato creato, ma non è ancora stato avviato. |
Starting | Il processo di streaming si trova nella fase di avvio. |
Idle | Il processo di streaming è in esecuzione, ma non è disponibile alcun input da elaborare. |
Elaborazione | Il processo di streaming è in esecuzione ed è in corso l'elaborazione degli input. Questo stato indica uno stato integro per il processo di streaming. |
Degraded | Il processo di streaming è in esecuzione, ma si sono verificati alcuni errori non irreversibili durante l'elaborazione dell'input. Il processo di input continua a essere eseguito, ma elimina gli input che riscontrano errori. |
Arrestato | Il processo di streaming è stato arrestato. |
Convalida non superata | Processo di streaming non riuscito. Si tratta in genere di un'indicazione di un errore irreversibile durante l'elaborazione. |
Nota
Poiché il processo di streaming viene eseguito in modo asincrono, il processo potrebbe riscontrare errori in fase di esecuzione. Per risolvere un errore del processo di streaming, usare la sys.sp_get_streaming_job
stored procedure o esaminare il log Docker dal contenitore SQL Edge di Azure, che può fornire i dettagli dell'errore del processo di streaming.