CREATE EXTERNAL STREAM (Transact-SQL)

Importante

SQL Edge di Azure non supporta più la piattaforma ARM64.

L'oggetto EXTERNAL STREAM ha uno scopo doppio sia di un flusso di input che di output. Può essere usato come input per eseguire query sui dati di streaming da servizi di inserimento eventi, ad esempio Hub eventi di Azure, hub IoT di Azure (o Hub Edge) o Kafka oppure può essere usato come output per specificare dove e come archiviare i risultati da una query di streaming.

È anche possibile specificare e creare un FLUSSO ESTERNO come output e input per servizi come Hub eventi o archiviazione BLOB. Ciò semplifica gli scenari di concatenamento in cui una query di streaming rende persistenti i risultati nel flusso esterno come output e un'altra query di streaming che legge dallo stesso flusso esterno dell'input.

SQL Edge di Azure attualmente supporta solo le origini dati seguenti come output e input di flusso.

Tipo di origine dati Input Output Descrizione
Hub di Azure IoT Edge S S Origine dati per leggere e scrivere dati di streaming in un hub di Azure IoT Edge. Per altre informazioni, vedere Hub IoT Edge.
Database SQL N Y Connessione all'origine dati per scrivere i dati di streaming nel database SQL. Il database può essere un database locale in SQL Edge di Azure o un database remoto in SQL Server o database SQL di Azure.
Kafka Y N Origine dati per la lettura dei dati in streaming da un argomento Kafka.

Sintassi

CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH  ( <with_options> )

<column_definition> ::=
  column_name <column_data_type>

<data_type> ::=
[ type_schema_name . ] type_name
    [ ( precision [ , scale ] | max ) ]

<with_options> ::=
  DATA_SOURCE = data_source_name ,
  LOCATION = location_name ,
  [ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
  [ <optional_input_options> ] ,
  [ <optional_output_options> ] ,
  TAGS = <tag_column_value>

<optional_input_options> ::=
  INPUT_OPTIONS = ' [ <input_options_data> ] '

<Input_option_data> ::=
      <input_option_values> [ , <input_option_values> ]

<input_option_values> ::=
  PARTITIONS: [ number_of_partitions ]
  | CONSUMER_GROUP: [ consumer_group_name ]
  | TIME_POLICY: [ time_policy ]
  | LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
  | OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]

<optional_output_options> ::=
  OUTPUT_OPTIONS = ' [ <output_option_data> ] '

<output_option_data> ::=
      <output_option_values> [ , <output_option_values> ]

<output_option_values> ::=
   REJECT_POLICY: [ reject_policy ]
   | MINIMUM_ROWS: [ row_value ]
   | MAXIMUM_TIME: [ time_value_minutes ]
   | PARTITION_KEY_COLUMN: [ partition_key_column_name ]
   | PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | PARTITION_KEY: [ partition_key_name ]
   | ROW_KEY: [ row_key_name ]
   | BATCH_SIZE: [ batch_size_value ]
   | MAXIMUM_BATCH_COUNT: [ batch_value ]
   | STAGING_AREA: [ blob_data_source ]

<tag_column_value> ::= -- Reserved for Future Usage
);

Argomenti

DATA_SOURCE

Per altre informazioni, vedere DATA_SOURCE.

FILE_FORMAT

Per altre informazioni, vedere FILE_FORMAT.

UBICAZIONE

specifica il nome dei dati effettivi o della posizione nell'origine dati.

  • Per gli oggetti flusso Hub Edge o Kafka, location specifica il nome dell'argomento Hub Edge o Kafka in cui leggere o scrivere.
  • Per gli oggetti flusso SQL (SQL Server, database SQL di Azure o SQL Edge di Azure), il percorso specifica il nome della tabella. Se il flusso viene creato nello stesso database e nello stesso schema della tabella di destinazione, è sufficiente solo il nome tabella. In caso contrario, è necessario qualificare completamente il nome della tabella (<database_name>.<schema_name>.<table_name>).
  • Per Archiviazione BLOB di Azure percorso dell'oggetto flusso si riferisce al modello di percorso da usare all'interno del contenitore BLOB. Per altre informazioni, vedere Output di Analisi di flusso di Azure.

INPUT_OPTIONS

Specificare le opzioni come coppie chiave-valore per i servizi, ad esempio Kafka e hub IoT Edge, che sono input per le query di streaming.

  • PARTIZIONI:

    Numero di partizioni definite per un argomento. Il numero massimo di partizioni che è possibile usare è limitato a 32 (si applica alla Flussi di input Kafka).

    • CONSUMER_GROUP:

      gli hub eventi e gli hub IoT limitano il numero di lettori in un gruppo di consumer (a 5). Se si lascia vuoto questo campo, verrà usato il gruppo di consumer '$Default'.

      • Riservato per l'utilizzo futuro. Non si applica a SQL Edge di Azure.
    • TIME_POLICY:

      descrive se eliminare gli eventi o modificare l'ora dell'evento in presenza di eventi in ritardo o eventi non in ordine che superano il relativo valore di tolleranza.

      • Riservato per l'utilizzo futuro. Non si applica a SQL Edge di Azure.
    • LATE_EVENT_TOLERANCE:

      ritardo massimo accettabile. Il ritardo rappresenta la differenza tra il timestamp dell'evento e l'orologio di sistema.

      • Riservato per l'utilizzo futuro. Non si applica a SQL Edge di Azure.
    • OUT_OF_ORDER_EVENT_TOLERANCE:

      gli eventi possono arrivare non in ordine dopo aver seguito il percorso dall'input alla query di streaming. Questi eventi possono essere accettati così come sono oppure è possibile scegliere di sospenderli per un periodo di tempo determinato per riordinarli.

      • Riservato per l'utilizzo futuro. Non si applica a SQL Edge di Azure.

OUTPUT_OPTIONS

specificare le opzioni come coppie chiave-valore per i servizi supportati usati come output per le query di streaming

  • REJECT_POLICY: DROP | RIPROVA

    Specie i criteri di gestione degli errori di dati quando si verificano errori di conversione dei dati.

    • Si applica a tutti gli output supportati.
  • MINIMUM_ROWS:

    numero minimo di righe necessarie per ogni batch scritto in un output. Per Parquet, ogni batch crea un nuovo file.

    • Si applica a tutti gli output supportati.
  • MAXIMUM_TIME:

    Tempo di attesa massimo in minuti per batch. Dopo questa volta, il batch verrà scritto nell'output anche se il requisito minimo delle righe non viene soddisfatto.

    • Si applica a tutti gli output supportati.
  • PARTITION_KEY_COLUMN:

    colonna usata per la chiave di partizione.

    • Riservato per l'utilizzo futuro. Non si applica a SQL Edge di Azure.
  • PROPERTY_COLUMNS:

    Elenco delimitato da virgole dei nomi delle colonne di output associate ai messaggi come proprietà personalizzate, se specificato.

    • Riservato per l'utilizzo futuro. Non si applica a SQL Edge di Azure.
  • SYSTEM_PROPERTY_COLUMNS:

    raccolta in formato JSON di coppie nome-valore dei nomi delle proprietà di sistema e delle colonne di output da popolare nei messaggi del bus di servizio. Ad esempio, { "MessageId": "column1", "PartitionKey": "column2" }.

    • Riservato per l'utilizzo futuro. Non si applica a SQL Edge di Azure.
  • PARTITION_KEY:

    Nome della colonna di output contenente la chiave di partizione. La chiave di partizione è un identificatore univoco per la partizione in una determinata tabella che costituisce la prima parte della chiave primaria di un'entità. Si tratta di un valore stringa con dimensioni fino a 1 KB.

    • Riservato per l'utilizzo futuro. Non si applica a SQL Edge di Azure.
  • ROW_KEY:

    Nome della colonna di output contenente la chiave di riga. La chiave di riga è un identificatore univoco per un’entità all'interno di una determinata partizione. Costituisce la seconda parte della chiave primaria di un'entità. La chiave di riga è un valore stringa le cui dimensioni massime sono di 1 KB.

    • Riservato per l'utilizzo futuro. Non si applica a SQL Edge di Azure.
  • BATCH_SIZE:

    rappresenta il numero di transazioni per l'archiviazione tabelle in cui il massimo può raggiungere 100 record. Per Funzioni di Azure rappresenta le dimensioni del batch in byte inviate alla funzione per ogni chiamata. Il valore predefinito è 256 kB.

    • Riservato per l'utilizzo futuro. Non si applica a SQL Edge di Azure.
  • MAXIMUM_BATCH_COUNT:

    numero massimo di eventi inviati alla funzione per ogni chiamata. Il valore predefinito è 100. Per il database SQL, rappresenta il numero massimo di record inviati con ogni transazione di inserimento bulk. Il valore predefinito è 10.000.

    • Si applica a tutti gli output basati su SQL
  • STAGING_AREA: oggetto EXTERNAL DATA SOURCE in blob Archiviazione

    Area di gestione temporanea per l'inserimento di dati ad alta velocità effettiva in Azure Synapse Analytics

    • Riservato per l'utilizzo futuro. Non si applica a SQL Edge di Azure.

Per altre informazioni sulle opzioni di input e output supportate corrispondenti al tipo di origine dati, vedere Rispettivamente Analisi di flusso di Azure - Panoramica dell'input e Analisi di flusso di Azure - Panoramica degli output.

Esempi

Esempio A: EdgeHub

Tipo: input o output.

CREATE EXTERNAL DATA SOURCE MyEdgeHub
    WITH (LOCATION = 'edgehub://');

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (FORMAT_TYPE = JSON);

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyEdgeHub,
            FILE_FORMAT = myFileFormat,
            LOCATION = '<mytopicname>',
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

Esempio B: database SQL di Azure, SQL Edge di Azure, SQL Server

Tipo: Output

CREATE DATABASE SCOPED CREDENTIAL SQLCredName
    WITH IDENTITY = '<user>',
        SECRET = '<password>';

-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = '<my_server_name>.database.windows.net',
            CREDENTIAL = SQLCredName
            );

--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = ' <sqlserver://<ipaddress>,<port>',
            CREDENTIAL = SQLCredName
            );

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyTargetSQLTable,
            LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
            --Note: If table is contained in the database, <TableName> should be sufficient
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

Esempio C: Kafka

Tipo: Input

CREATE EXTERNAL DATA SOURCE MyKafka_tweets
    WITH (
            --The location maps to KafkaBootstrapServer
            LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
            CREDENTIAL = kafkaCredName
            );

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (
            FORMAT_TYPE = JSON,
            DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
            );

CREATE EXTERNAL STREAM Stream_A (
    user_id VARCHAR,
    tweet VARCHAR
    )
    WITH (
            DATA_SOURCE = MyKafka_tweets,
            LOCATION = '<KafkaTopicName>',
            FILE_FORMAT = myFileFormat,
            INPUT_OPTIONS = 'PARTITIONS: 5'
            );

Vedi anche