Validação de entrada em consultas do Azure Stream Analytics

A validação de entrada é uma técnica a ser usada para proteger a lógica de consulta principal contra eventos malformados ou inesperados. A consulta é atualizada para processar e verificar registros explicitamente para que eles não possam quebrar a lógica principal.

Para implementar a validação de entrada, adicionamos duas etapas iniciais a uma consulta. Primeiro, certificamo-nos de que o esquema submetido à lógica de negócios principal corresponde às suas expectativas. Em seguida, triamos exceções e, opcionalmente, encaminhamos registros inválidos para uma saída secundária.

Uma consulta com validação de entrada será estruturada da seguinte forma:

WITH preProcessingStage AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		field1 AS in_field1,
		field2 AS in_field2,
		...

		-- Try casting fields in their expected type
		TRY_CAST(field1 AS bigint) as field1,
		TRY_CAST(field2 AS array) as field2,
		...

	FROM myInput TIMESTAMP BY myTimestamp
),

triagedOK AS (
	SELECT -- Only fields in their new expected type
		field1,
		field2,
		...
	FROM preProcessingStage
	WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),

triagedOut AS (
	SELECT -- All fields to ease diagnostic
		*
	FROM preProcessingStage
	WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)

-- Core business logic
SELECT
	...
INTO myOutput
FROM triagedOK
...

-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut

Para ver um exemplo abrangente de uma consulta configurada com validação de entrada, consulte a seção: Exemplo de consulta com validação de entrada.

Este artigo ilustra como implementar essa técnica.

Contexto

Os trabalhos do Azure Stream Analytics (ASA) processam dados provenientes de fluxos. Fluxos são sequências de dados brutos que são transmitidos serializados (CSV, JSON, AVRO...). Para ler a partir de um fluxo, um aplicativo precisará saber o formato de serialização específico usado. No ASA, o formato de serialização de eventos deve ser definido ao configurar uma entrada de streaming.

Uma vez que os dados são desserializados, um esquema precisa ser aplicado para dar-lhes significado. Por esquema, queremos dizer a lista de campos no fluxo e seus respetivos tipos de dados. Com o ASA, o esquema dos dados de entrada não precisa ser definido no nível de entrada. Em vez disso, o ASA suporta esquemas de entrada dinâmicos nativamente. Ele espera que a lista de campos (colunas) e seus tipos mudem entre eventos (linhas). O ASA também inferirá tipos de dados quando nenhum for fornecido explicitamente e tentará converter tipos implicitamente quando necessário.

A manipulação dinâmica de esquemas é um recurso poderoso, fundamental para o processamento de fluxo. Os fluxos de dados geralmente contêm dados de várias fontes, com vários tipos de eventos, cada um com um esquema exclusivo. Para encaminhar, filtrar e processar eventos em tais fluxos, o ASA tem que ingeri-los todos, seja qual for o seu esquema.

Illustration of a pipeline with two fleet of devices sending data with conflicting schemas

Mas os recursos oferecidos pela manipulação dinâmica de esquemas vêm com uma desvantagem potencial. Eventos inesperados podem fluir através da lógica de consulta principal e quebrá-la. Como exemplo, podemos usar ROUND em um campo do tipo NVARCHAR(MAX). ASA irá implicitamente lançá-lo para flutuar para corresponder à assinatura de ROUND. Aqui esperamos, ou esperamos, que este campo contenha sempre valores numéricos. Mas quando recebemos um evento com o campo definido como "NaN", ou se o campo estiver totalmente ausente, o trabalho pode falhar.

Com a validação de entrada, adicionamos etapas preliminares à nossa consulta para lidar com esses eventos malformados. Usaremos principalmente WITH e TRY_CAST para implementá-lo.

Cenário: validação de entrada para produtores de eventos não confiáveis

Criaremos um novo trabalho ASA que ingerirá dados de um único hub de eventos. Como acontece na maioria das vezes, não somos responsáveis pelos produtores de dados. Aqui, os produtores são dispositivos IoT vendidos por vários fornecedores de hardware.

Reunindo-nos com as partes interessadas, acordamos um formato de serialização e um esquema. Todos os dispositivos enviarão essas mensagens para um hub de eventos comum, a entrada do trabalho ASA.

O contrato de esquema é definido da seguinte forma:

Nome do campo Tipo de campo Descrição do campo
deviceId Número inteiro Identificador exclusivo do dispositivo
readingTimestamp Datetime Tempo de mensagem, gerado por um gateway central
readingStr String
readingNum Numérico
readingArray Matriz de String

O que, por sua vez, nos dá a seguinte mensagem de exemplo em serialização JSON:

{
    "deviceId" : 1,
    "readingTimestamp" : "2021-12-10T10:00:00",
    "readingStr" : "A String",
    "readingNum" : 1.7,
    "readingArray" : ["A","B"]
}

Já podemos ver uma discrepância entre o contrato do esquema e sua implementação. No formato JSON, não há nenhum tipo de dados para datetime. Ele será transmitido como uma string (veja readingTimestamp acima). O ASA pode facilmente resolver o problema, mas mostra a necessidade de validar e transmitir explicitamente os tipos. Ainda mais para dados serializados em CSV, uma vez que todos os valores são transmitidos como string.

Há outra discrepância. O ASA usa seu próprio sistema de tipo que não corresponde ao de entrada. Se o ASA tiver tipos internos para inteiros (bigint), datetime, string (nvarchar(max)) e matrizes, ele só suporta numéricos via float. Essa incompatibilidade não é um problema para a maioria dos aplicativos. Mas, em certos casos extremos, pode causar ligeiras derrapagens na precisão. Nesse caso, converteríamos o valor numérico como string em um novo campo. Então, a jusante, usaríamos um sistema que suporta decimal fixo para detetar e corrigir desvios potenciais.

Voltando à nossa consulta, aqui pretendemos:

  • Passar readingStr para um UDF JavaScript
  • Contar o número de registros na matriz
  • Arredondar readingNum para a segunda casa decimal
  • Inserir os dados em uma tabela SQL

A tabela SQL de destino tem o seguinte esquema:

CREATE TABLE [dbo].[readings](
    [Device_Id] int NULL,
    [Reading_Timestamp] datetime2(7) NULL,
    [Reading_String] nvarchar(200) NULL,
    [Reading_Num] decimal(18,2) NULL,
    [Array_Count] int NULL
) ON [PRIMARY]

É uma boa prática mapear o que acontece com cada campo ao longo do trabalho:

Campo Entrada (JSON) Tipo hereditário (ASA) Saída (Azure SQL) Comentário
deviceId Número bigint integer
readingTimestamp string nvarchar(MAX) datetime2
readingStr string nvarchar(MAX) Nvarchar(200) utilizado pela UDF
readingNum Número flutuante decimal(18,2) a arredondar
readingArray matriz(string) matriz de nvarchar (MAX) integer a contar

Pré-requisitos

Desenvolveremos a consulta no Visual Studio Code usando a extensão ASA Tools . Os primeiros passos deste tutorial irão guiá-lo através da instalação dos componentes necessários.

No VS Code, usaremos execuções locais com entrada/saída local para não incorrer em nenhum custo e aceleraremos o loop de depuração. Não precisaremos configurar um hub de eventos ou um Banco de Dados SQL do Azure.

Consulta base

Vamos começar com uma implementação básica, sem validação de entrada. Vamos adicioná-lo na próxima seção.

No VS Code, criaremos um novo projeto ASA

input Na pasta, criaremos um novo arquivo JSON chamado data_readings.json e adicionaremos os seguintes registros a ele:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingStr" : "Another String",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : -4.85436,
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : ["G","G"]
    }
]

Em seguida, definiremos uma entrada local, chamada readings, fazendo referência ao arquivo JSON que criamos acima.

Uma vez configurado, deve ter a seguinte aparência:

{
    "InputAlias": "readings",
    "Type": "Data Stream",
    "Format": "Json",
    "FilePath": "data_readings.json",
    "ScriptType": "InputMock"
}

Com dados de visualização, podemos observar que nossos registros são carregados corretamente.

Vamos criar um novo JavaScript UDF chamado udfLen clicando com o botão direito do Functions mouse na pasta e selecionando ASA: Add Function. O código que usaremos é:

// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
    return arg1.length;
}

Em execuções locais, não precisamos definir saídas. Nem precisamos usar INTO , a menos que haja mais de uma saída. .asaql No arquivo, podemos substituir a consulta existente por:

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

Vamos analisar rapidamente a consulta que enviamos:

  • Para contar o número de registros em cada matriz, primeiro precisamos descompactá-los. Usaremos CROSS APPLY e GetArrayElements() (mais exemplos aqui)
    • Fazendo isso, apresentamos dois conjuntos de dados na consulta: a entrada original e os valores da matriz. Para garantir que não misturamos campos, definimos aliases (AS r) e os usamos em todos os lugares
    • Então, para realmente COUNT os valores da matriz, precisamos agregar com GROUP BY
    • Para isso, temos de definir uma janela temporal. Aqui, como não precisamos de um para nossa lógica, a janela de instantâneo é a escolha certa
  • Temos também para GROUP BY todos os campos, e projetá-los todos no SELECT. A projeção explícita de campos é uma boa prática, pois SELECT * permitirá que os erros fluam da entrada para a saída
    • Se definirmos uma janela de tempo, podemos querer definir um carimbo de data/hora com TIMESTAMP BY. Aqui não é necessário que a nossa lógica funcione. Para execuções locais, sem TIMESTAMP BY que todos os registros sejam carregados em um único carimbo de data/hora, a hora de início da execução.
  • Usamos o UDF para filtrar leituras onde readingStr tem menos de dois caracteres. Deveríamos ter usado LEN aqui. Estamos usando um UDF apenas para fins de demonstração

Podemos iniciar uma corrida e observar os dados que estão sendo processados:

deviceId leituraCarimbo de data/hora leituraStr leituraNum arrayCount
1 2021-12-10T10:00:00 Uma cadeia de caracteres 1,71 2
2 2021-12-10T10:01:00 Outra String 2.38 1
3 2021-12-10T10:01:20 Uma terceira cadeia de caracteres -4.85 3
1 2021-12-10T10:02:10 Uma quarta corda 1.21 2

Agora que sabemos que nossa consulta está funcionando, vamos testá-la em relação a mais dados. Vamos substituir o conteúdo do data_readings.json pelos seguintes registros:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : "NaN",
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : {}
    }
]

Aqui podemos ver as seguintes questões:

  • O dispositivo #1 fez tudo certo
  • O dispositivo #2 esqueceu-se de incluir um readingStr
  • Dispositivo #3 enviado NaN como um número
  • O dispositivo #4 enviou um registro vazio em vez de uma matriz

Executar o trabalho agora não deve terminar bem. Receberemos uma das seguintes mensagens de erro:

O Dispositivo 2 nos dará:

[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM :    at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.

O Dispositivo 3 nos dará:

[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)

O dispositivo 4 nos dará:

[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)

Cada vez que os registros malformados podiam fluir da entrada para a lógica de consulta principal sem serem validados. Agora percebemos o valor da validação de entrada.

Implementando a validação de entrada

Vamos estender nossa consulta para validar a entrada.

A primeira etapa da validação de entrada é definir as expectativas de esquema da lógica de negócios principal. Olhando para o requisito original, a nossa lógica principal é:

  • Passe readingStr para um UDF JavaScript para medir seu comprimento
  • Contar o número de registros na matriz
  • Arredondar readingNum para a segunda casa decimal
  • Inserir os dados em uma tabela SQL

Para cada ponto podemos elencar as expectativas:

  • O UDF requer um argumento do tipo string (nvarchar(max) aqui) que não pode ser nulo
  • GetArrayElements() requer um argumento do tipo array ou um valor nulo
  • Round requer um argumento do tipo bigint ou float, ou um valor nulo
  • Em vez de confiar na transmissão implícita do ASA, devemos fazê-lo nós mesmos e lidar com conflitos de tipo na consulta

Um caminho a seguir é adaptar a lógica principal para lidar com essas exceções. Mas, neste caso, acreditamos que a nossa lógica principal é perfeita. Então, vamos validar os dados recebidos.

Primeiro, vamos usar WITH para adicionar uma camada de validação de entrada como a primeira etapa da consulta. Usaremos TRY_CAST para converter campos para o tipo esperado e defini-los se NULL a conversão falhar:

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
)

-- For debugging only
SELECT * FROM readingsValidated

Com o último arquivo de entrada que usamos (aquele com erros), esta consulta retornará o seguinte conjunto:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId leituraCarimbo de data/hora leituraStr leituraNum readingArray
1 2021-12-10T10:00:00 Uma cadeia de caracteres 1.7145 ["A","B"] 1 2021-12-10T10:00:00.0000000Z Uma cadeia de caracteres 1.7145 ["A","B"]
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00.0000000Z NULL 2.378 ["C"]
3 2021-12-10T10:01:20 Uma terceira cadeia de caracteres NaN ["D","E","F"] 3 2021-12-10T10:01:20.0000000Z Uma terceira cadeia de caracteres NULO ["D","E","F"]
4 2021-12-10T10:02:10 Uma quarta corda 1.2126 {} 4 2021-12-10T10:02:10.0000000Z Uma quarta corda 1.2126 NULO

Já podemos ver dois dos nossos erros a serem corrigidos. Nós nos transformamos NaN e {} em NULL. Agora estamos confiantes de que esses registros serão inseridos corretamente na tabela SQL de destino.

Agora temos que decidir como lidar com os registros com valores ausentes ou inválidos. Depois de alguma discussão, decidimos rejeitar registros com um vazio/inválido readingArray ou um ausente readingStr.

Assim, adicionamos uma segunda camada que fará a triagem dos registros entre a validação e a lógica principal:

WITH readingsValidated AS (
	...
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected

É uma boa prática escrever uma única WHERE cláusula para ambas as saídas e usar NOT (...) na segunda. Dessa forma, nenhum registro pode ser excluído das saídas e perdido.

Agora temos duas saídas. Debug1 tem os registros que serão enviados para a lógica principal:

deviceId leituraCarimbo de data/hora leituraStr leituraNum readingArray
1 2021-12-10T10:00:00.0000000Z Uma cadeia de caracteres 1.7145 ["A","B"]
3 2021-12-10T10:01:20.0000000Z Uma terceira cadeia de caracteres NULL ["D","E","F"]

Debug2 tem os registros que serão rejeitados:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId leituraCarimbo de data/hora leituraStr leituraNum readingArray
2 2021-12-10T10:01:00 NULL 2.378 ["C"] 2 2021-12-10T10:01:00.0000000Z NULO 2.378 ["C"]
4 2021-12-10T10:02:10 Uma quarta corda 1.2126 {} 4 2021-12-10T10:02:10.0000000Z Uma quarta corda 1.2126 NULO

O passo final é adicionar nossa lógica principal de volta. Também adicionaremos a saída que reúne rejeições. Aqui é melhor usar um adaptador de saída que não imponha digitação forte, como uma conta de armazenamento.

A consulta completa pode ser encontrada na última seção.

WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

SELECT
	*
INTO BlobOutput
FROM readingsToBeRejected

O que nos dará o seguinte conjunto para SQLOutput, sem erro possível:

deviceId leituraCarimbo de data/hora leituraStr leituraNum readingArray
1 2021-12-10T10:00:00.0000000Z Uma cadeia de caracteres 1.7145 2
3 2021-12-10T10:01:20.0000000Z Uma terceira cadeia de caracteres NULL 3

Os outros dois registros são enviados para um BlobOutput para revisão humana e pós-processamento. A nossa consulta está agora segura.

Exemplo de consulta com validação de entrada

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- Core business logic
SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected

Estendendo a validação de entrada

GetType pode ser usado para verificar explicitamente se há um tipo. Funciona bem com CASE na projeção, ou WHERE no nível definido. GetType também pode ser usado para verificar dinamicamente o esquema de entrada em relação a um repositório de metadados. O repositório pode ser carregado através de um conjunto de dados de referência.

O teste de unidade é uma boa prática para garantir que nossa consulta seja resiliente. Vamos construir uma série de testes que consistem em arquivos de entrada e sua saída esperada. Nossa consulta terá que corresponder à saída que gera para passar. No ASA, o teste de unidade é feito através do módulo npm asa-streamanalytics-cicd . Casos de teste com vários eventos malformados devem ser criados e testados no pipeline de implantação.

Finalmente, podemos fazer alguns testes de integração leve no VS Code. Podemos inserir registros na tabela SQL através de uma execução local para uma saída ao vivo.

Obter suporte

Para obter mais assistência, experimente a nossa página de perguntas e respostas da Microsoft para o Azure Stream Analytics.

Próximos passos