Analizzare dati JSON e Avro in Analisi di flusso di Azure

Analisi di flusso di Azure supporta l'elaborazione di eventi nei formati di dati CSV, JSON e Avro. Entrambi i dati JSON e Avro possono essere strutturati e possono contenere alcuni tipi complessi come oggetti annidati (record) e matrici.

Nota

I file AVRO creati da Acquisizione di Hub eventi si basano su un formato specifico che richiede l'uso della funzionalità del deserializzatore personalizzato. Per altre informazioni, vedere Leggere input in qualsiasi formato tramite deserializzatori personalizzati .NET.

Tipi di dati record

I tipi di dati record vengono usati per rappresentare le matrici JSON e Avro quando vengono usati formati corrispondenti nei flussi di dati di input. Questi esempi illustrano un sensore di esempio che legge gli eventi di input in formato JSON. Di seguito è riportato un esempio di un singolo evento:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Accedere ai campi annidati nello schema noto

Usare la notazione con il punto (.) per accedere facilmente ai campi annidati direttamente dalla query. Ad esempio, questa query seleziona le coordinate di latitudine e longitudine nella proprietà Location dei dati JSON precedenti. La notazione con il punto può essere usata per spostarsi tra più livelli, come illustrato di seguito.

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Il risultato è:

DeviceID Lat long Temperatura Versione
12345 47 122 80 1.2.45

Selezionare tutte le proprietà

È possibile selezionare tutte le proprietà di un record annidato con il carattere jolly asterisco (*). Prendere in considerazione gli esempi seguenti:

SELECT
    DeviceID,
    Location.*
FROM input

Il risultato è:

DeviceID Lat long
12345 47 122

Accedere ai campi annidati quando il nome della proprietà è una variabile

Usare la funzione GetRecordPropertyValue se il nome della proprietà è una variabile. Ciò consente di creare query dinamiche senza nomi di proprietà hardcoded.

Si supponga che un flusso di dati di esempio debba essere unito tramite join a dati di riferimento contenenti soglie per ogni sensore. Un frammento di tali dati di riferimento è illustrato di seguito.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

L'obiettivo è creare un join tra il set di dati di esempio illustrato all'inizio dell'articolo e i dati di riferimento, quindi restituire un evento per ogni misura del sensore al di sopra della soglia. Questo significa che il singolo evento precedente può generare più eventi di output se più sensori superano le rispettive soglie, grazie al join. Per ottenere risultati simili senza un join, vedere la sezione seguente.

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue seleziona la proprietà in SensorReadings, il cui nome corrisponde al nome della proprietà proveniente dai dati di riferimento. Quindi viene estratto il valore associato da SensorReadings.

Il risultato è:

DeviceID SensorName AlertMessage
12345 Umidità Alert : Sensor above threshold

Convertire i campi dei record in eventi distinti

Per convertire i campi di record in eventi separati, usare l'operatore APPLY insieme alla funzione GetRecordProperties.

Con i dati di esempio originali, è possibile usare la query seguente per estrarre le proprietà in eventi diversi.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Il risultato è:

DeviceID SensorName AlertMessage
12345 Temperatura 80
12345 Umidità 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [object Object]

Usando WITH, è quindi possibile instradare tali eventi a destinazioni diverse:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

Analizzare il record JSON nei dati di riferimento SQL

Quando si usa Database SQL di Azure come dati di riferimento nel processo, è possibile che una colonna contenga dati in formato JSON. Di seguito è illustrato un esempio.

DeviceID Data
12345 {"key": "value1"}
54321 {"key": "value2"}

È possibile analizzare il record JSON nella colonna Data scrivendo una semplice funzione JavaScript definita dall'utente.

function parseJson(string) {
return JSON.parse(string);
}

È quindi possibile creare un passaggio nella query di Analisi di flusso, come illustrato di seguito, per accedere ai campi dei record JSON.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Tipi di dati matrice

I tipi di dati matrice sono una raccolta ordinata di valori. Di seguito sono descritte alcune operazioni tipiche sui valori di matrice. Questi esempi usano le funzioni GetArrayElement, GetArrayElements, GetArrayLength e l'operatore APPLY.

Ecco un esempio di evento. Sia CustomSensor03 che SensorMetadata sono di tipo matrice:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Uso di un elemento di matrice specifico

Selezionare un elemento della matrice in corrispondenza dell'indice specificato (selezione del primo elemento della matrice):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Il risultato è:

firstElement
12

Selezionare la lunghezza della matrice

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Il risultato è:

arrayLength
3

Convertire gli elementi di matrice in eventi distinti

Selezionare tutti gli elementi della matrice come singoli eventi. L'operatore APPLY, usato insieme alla funzione integrata GetArrayElements, estrae tutti gli elementi della matrice come singoli eventi:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Il risultato è:

deviceId ArrayIndex ArrayValue
12345 0 12
12345 1 -5
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Il risultato è:

deviceId smKey smValue
12345 Produttore ABC
12345 Versione 1.2.45

Se i campi estratti devono essere visualizzati in colonne, è possibile trasformare il set di dati usando la sintassi WITH in aggiunta all'operazione JOIN. Tale join richiede una condizione di limite temporale che impedisce la duplicazione:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Il risultato è:

deviceId Lat long smVersion smManufacturer
12345 47 122 1.2.45 ABC

Vedere anche

Data Types in Azure Stream Analytics (Tipi di dati in Analisi di flusso di Azure)