Validação de entrada em consultas do Azure Stream Analytics

A validação de entrada é uma técnica usada para proteger a lógica de consulta principal contra eventos malformados ou inesperados. A consulta é atualizada para processar explicitamente e verificar os registros para que eles não possam interromper a lógica principal.

Para implementar a validação de entrada, adicionamos duas etapas iniciais a uma consulta. Primeiro, garantimos que o esquema enviado para a lógica de negócios principal corresponda às expectativas. Em seguida, fazemos a triagem das exceções e, opcionalmente, roteamos os registros inválidos para uma saída secundária.

Uma consulta com validação de entrada será estruturada da seguinte maneira:

WITH preProcessingStage AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		field1 AS in_field1,
		field2 AS in_field2,
		...

		-- Try casting fields in their expected type
		TRY_CAST(field1 AS bigint) as field1,
		TRY_CAST(field2 AS array) as field2,
		...

	FROM myInput TIMESTAMP BY myTimestamp
),

triagedOK AS (
	SELECT -- Only fields in their new expected type
		field1,
		field2,
		...
	FROM preProcessingStage
	WHERE ( ... ) -- Clauses make sure that the core business logic expectations are satisfied
),

triagedOut AS (
	SELECT -- All fields to ease diagnostic
		*
	FROM preProcessingStage
	WHERE NOT (...) -- Same clauses as triagedOK, opposed with NOT
)

-- Core business logic
SELECT
	...
INTO myOutput
FROM triagedOK
...

-- Audit output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- To a storage adapter that doesn't require strong typing, here blob/adls
FROM triagedOut

Para ver um exemplo abrangente de uma configuração de consulta com validação de entrada, confira a seção: Exemplo de consulta com validação de entrada.

Este artigo ilustra como implementar essa técnica.

Contexto

Os trabalhos do ASA (Azure Stream Analytics) processam dados provenientes de fluxos. Fluxos são sequências de dados brutos que são transmitidos serializados (CSV, JSON, AVRO...). Para ler de um fluxo, um aplicativo precisará conhecer o formato de serialização específico. No ASA, o formato de serialização do evento precisa ser definido ao configurar uma entrada de streaming.

Quando os dados são desserializados, um esquema precisa ser aplicado para conferir significado a eles. Com esquema, nos referimos à lista de campos no fluxo e aos respectivos tipos de dados. Com o ASA, o esquema dos dados de entrada não precisa ser definido no nível da entrada. Em vez disso, o ASA dá suporte a esquemas de entrada dinâmicos de modo nativo. Ele espera que a lista de campos (colunas), bem como seus tipos, mude entre eventos (linhas). O ASA também infere os tipos de dados quando nenhum é fornecido implicitamente, e tenta converter os tipos implicitamente quando necessário.

O tratamento de esquema dinâmico é um recurso avançado, fundamental para o processamento do fluxo. Frequentemente, os fluxos de dados contêm dados de várias origens, com vários tipos de evento, cada um deles com um esquema exclusivo. Para rotear, filtrar e processar eventos nesses fluxos, o ASA precisa ingerir todos eles, qualquer que seja o esquema.

Illustration of a pipeline with two fleet of devices sending data with conflicting schemas

No entanto, os recursos fornecidos pelo tratamento de esquema dinâmico têm um possível ponto negativo. Eventos inesperados podem passar pela lógica de consulta principal e interrompê-la. Por exemplo, podemos usar ROUND em um campo do tipo NVARCHAR(MAX). O ASA o converterá implicitamente para float para corresponder à assinatura de ROUND. Aqui, nós esperamos, ou desejamos, que esse campo sempre contenha valores numéricos. Mas quando recebemos um evento com o campo definido como "NaN", ou se o campo estiver completamente ausente, o trabalho poderá falhar.

Com a validação de entrada, adicionamos etapas preliminares à nossa consulta para tratar desses eventos malformados. Em princípio, usaremos WITH e TRY_CAST para implementá-la.

Cenário: validação de entrada para produtores de eventos não confiáveis

Criaremos um trabalho do ASA que vai ingerir dados de um hub de eventos. Assim como costuma ocorrer, não somos responsáveis pelos produtores dos dados. Aqui, os produtores são dispositivos IoT vendidos por diversos fornecedores de hardware.

Em conjunto com os stakeholders, concordamos com um formato de serialização e um esquema. Todos os dispositivos efetuarão push dessas mensagens a um hub de eventos comum, que será a entrada do trabalho do ASA.

O contrato do esquema é definido da seguinte forma:

Nome do campo Tipo de campo Descrição do campo
deviceId Integer Identificador de dispositivo exclusivo
readingTimestamp Datetime Hora da mensagem, gerada por um gateway central
readingStr String
readingNum Numérico
readingArray Matriz de cadeia de caracteres

Que por sua vez nos dá a seguinte mensagem de exemplo com a serialização JSON:

{
    "deviceId" : 1,
    "readingTimestamp" : "2021-12-10T10:00:00",
    "readingStr" : "A String",
    "readingNum" : 1.7,
    "readingArray" : ["A","B"]
}

Já podemos ver uma discrepância entre o contrato do esquema e sua implementação. No formato JSON, não há tipo de dados para datetime. Ele será transmitido como uma cadeia de caracteres (veja readingTimestamp acima). O ASA pode resolver esse problema facilmente, mas isso demonstra a necessidade de validar e converter tipos explicitamente. Especialmente para dados serializados em CSV, uma vez que todos os valores serão transmitidos como uma cadeia de caracteres.

Há outra discrepância. O ASA usa o próprio sistema de tipos que não corresponde ao sistema de entrada. Se o ASA tiver tipos internos para inteiro (bigint), datetime, cadeia de caracteres (nvarchar(max)) e matrizes, ele dará suporte ao tipo numérico somente via float. Essa diferença não é um problema para a maioria dos aplicativos. No entanto, em determinados casos de borda, pode causar ligeiros descompassos na precisão. Nesse caso, converteríamos o valor numérico em uma cadeia de caracteres em um novo campo. Downstream, usaríamos um sistema com suporte para decimal fixo para detectar e corrigir possíveis descompassos.

De volta à nossa consulta, aqui pretendemos:

  • Passar readingStr para um UDF do JavaScript
  • Contar o número de registros na matriz
  • Arredondar readingNum para a segunda casa decimal
  • Inserir os dados em uma tabela SQL

A tabela SQL de destino tem o seguinte esquema:

CREATE TABLE [dbo].[readings](
    [Device_Id] int NULL,
    [Reading_Timestamp] datetime2(7) NULL,
    [Reading_String] nvarchar(200) NULL,
    [Reading_Num] decimal(18,2) NULL,
    [Array_Count] int NULL
) ON [PRIMARY]

É uma boa prática mapear o que acontece com cada campo conforme ele passa pelo trabalho:

Campo Entrada (JSON) Tipo herdado (ASA) Saída (SQL do Azure) Comentário
deviceId número BIGINT Número inteiro
readingTimestamp string nvarchar(MAX) datetime2
readingStr string nvarchar(MAX) nvarchar(200) usado pelo UDF
readingNum número FLOAT decimal(18,2) a ser arredondado
readingArray array(string) matriz de nvarchar(MAX) inteiro a ser contado

Pré-requisitos

Desenvolveremos a consulta no Visual Studio Code usando a extensão Ferramentas do ASA. As primeiras etapas deste tutorial vão orientar você ao longo da instalação dos componentes necessários.

No VS Code, usaremos execuções locais com entrada/saída local para não gerar cursos e para acelerar o loop de depuração. Não precisaremos configurar um hub de eventos nem um Banco de Dados SQL do Azure.

Consulta base

Vamos começar com uma implementação básica, sem validação de entrada. Vamos adicioná-la na próxima seção.

No VS Code, vamos criar um projeto do ASA

Dentro da pasta input, criaremos um arquivo JSON chamado data_readings.json e adicionaremos os seguintes registros a ele:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingStr" : "Another String",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : -4.85436,
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : ["G","G"]
    }
]

Em seguida, definiremos uma entrada local, chamada readings, fazendo referência ao arquivo JSON criado acima.

Após configurado, ele deve ter esta aparência:

{
    "InputAlias": "readings",
    "Type": "Data Stream",
    "Format": "Json",
    "FilePath": "data_readings.json",
    "ScriptType": "InputMock"
}

Usando visualizar dados, podemos observar que nossos registros foram carregados corretamente.

Vamos criar um UDF do JavaScript chamado udfLen clicando com o botão direito do mouse na pasta Functions e selecionando ASA: Add Function. O código que usaremos é:

// Sample UDF that returns the length of a string for demonstration only: LEN will return the same thing in ASAQL
function main(arg1) {
    return arg1.length;
}

Em execuções locais, não precisamos definir saídas. Não precisamos nem mesmo usar INTO, a menos que haja mais de uma saída. No arquivo .asaql, podemos substituir a consulta existente:

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
FROM readings AS r TIMESTAMP BY r.readingTimestamp
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

Vamos percorrer rapidamente a consulta que enviamos:

  • Para contar o número de registros em cada matriz, primeiro precisamos desempacotá-los. Usaremos CROSS APPLY e GetArrayElements() (mais amostras aqui)
    • Fazendo isso, chegamos a dois conjuntos de dados na consulta: a entrada original e os valores da matriz. Para garantir que não misturemos os campos, definimos alias (AS r) e os utilizamos por toda parte
    • Para de fato COUNT os valores da matriz, precisamos agregar usando GROUP BY
    • Para isso, precisamos definir uma janela de tempo. Aqui, como não temos uma para nossa lógica, a janela de instantâneo é a escolha certa
  • Também precisamos GROUP BY todos os campos e projetá-los no SELECT. Projetar os campos explicitamente é uma boa prática, pois SELECT * permite que os erros fluam da entrada até a saída
    • Se definirmos uma janela de tempo, poderá ser conveniente definir um carimbo de data/hora com TIMESTAMP BY. Aqui, isso não é necessário para que nossa lógica funcione. Para execuções locais, sem TIMESTAMP BY, todos os registros são carregados em um só carimbo de data/hora, a hora de início da execução.
  • Usamos o UDF para filtrar leituras em que readingStr tem menos de dois caracteres. Deveríamos ter usado LEN aqui. Estamos usando um UDF apenas para fins de demonstração

Podemos iniciar uma execução e observar os dados sendo processados:

deviceId readingTimestamp readingStr readingNum arrayCount
1 2021-12-10T10:00:00 Uma cadeia de caracteres 1,71 2
2 2021-12-10T10:01:00 Outra cadeia de caracteres 2.38 1
3 2021-12-10T10:01:20 Uma terceira cadeia de caracteres -4.85 3
1 2021-12-10T10:02:10 Uma quarta cadeia de caracteres 1.21 2

Agora que sabemos que nossa consulta está funcionando, vamos testá-la com mais dados. Vamos substituir o conteúdo de data_readings.json pelos seguintes registros:

[
    {
        "deviceId" : 1,
        "readingTimestamp" : "2021-12-10T10:00:00",
        "readingStr" : "A String",
        "readingNum" : 1.7145,
        "readingArray" : ["A","B"]
    },
    {
        "deviceId" : 2,
        "readingTimestamp" : "2021-12-10T10:01:00",
        "readingNum" : 2.378,
        "readingArray" : ["C"]
    },
    {
        "deviceId" : 3,
        "readingTimestamp" : "2021-12-10T10:01:20",
        "readingStr" : "A Third String",
        "readingNum" : "NaN",
        "readingArray" : ["D","E","F"]
    },
    {
        "deviceId" : 4,
        "readingTimestamp" : "2021-12-10T10:02:10",
        "readingStr" : "A Forth String",
        "readingNum" : 1.2126,
        "readingArray" : {}
    }
]

Aqui, podemos ver os seguintes problemas:

  • O dispositivo n° 1 fez tudo corretamente
  • O dispositivo n° 2 esqueceu de incluir um readingStr
  • O dispositivo n° 3 enviou NaN como um número
  • O dispositivo n° 4 enviou um registro vazio em vez de uma matriz

Executar o trabalho não deve dar certo. Receberemos uma das seguintes mensagens de erro:

O dispositivo n° 2 nos dará:

[Error] 12/22/2021 10:05:59 PM : **System Exception** Function 'udflen' resulted in an error: 'TypeError: Unable to get property 'length' of undefined or null reference' Stack: TypeError: Unable to get property 'length' of undefined or null reference at main (Unknown script code:3:5)
[Error] 12/22/2021 10:05:59 PM :    at Microsoft.EventProcessing.HostedRuntimes.JavaScript.JavaScriptHostedFunctionsRuntime.

O dispositivo n° 3 nos dará:

[Error] 12/22/2021 9:52:32 PM : **System Exception** The 1st argument of function round has invalid type 'nvarchar(max)'. Only 'bigint', 'float' is allowed.
[Error] 12/22/2021 9:52:32 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Arithmetics.Round(CompilerPosition pos, Object value, Object length)

O dispositivo n° 4 nos dará:

[Error] 12/22/2021 9:50:41 PM : **System Exception** Cannot cast value of type 'record' to type 'array' in expression 'r . readingArray'. At line '9' and column '30'. TRY_CAST function can be used to handle values with unexpected type.
[Error] 12/22/2021 9:50:41 PM :    at Microsoft.EventProcessing.SteamR.Sql.Runtime.Cast.ToArray(CompilerPosition pos, Object value, Boolean isUserCast)

A cada vez, registros malformados puderam passar da entrada para a lógica de consulta principal sem serem validados. Agora, percebemos o valor da validação de entrada.

Implementando a validação de entrada

Vamos estender nossa consulta para validar a entrada.

A primeira etapa da validação de entrada é definir as expectativas do esquema da lógica de negócios principal. Considerando o requisito original, nossa lógica principal é:

  • Passar readingStr para um UDF JavaScript para medir seu comprimento
  • Contar o número de registros na matriz
  • Arredondar readingNum para a segunda casa decimal
  • Inserir os dados em uma tabela SQL

Para cada ponto, podemos listar as expectativas:

  • O UDF requer um argumento do tipo cadeia de caracteres (nvarchar(max) aqui) que não pode ser nulo
  • GetArrayElements() requer um argumento do tipo matriz ou um valor nulo
  • Round requer um argumento do tipo bigint ou float ou um valor nulo
  • Em vez de depender da conversão implícita do ASA, devemos fazer isso por conta própria e resolver os conflitos de tipo na consulta

Uma maneira de fazer isso é adaptar a lógica principal para lidar com essas exceções. Mas, neste caso, acreditamos que nossa lógica está perfeita. Então, vamos validar os dados de entrada.

Primeiro, vamos usar WITH para adicionar uma camada de validação de entrada como a primeira etapa da consulta. Vamos usar TRY_CAST para converter campos no tipo esperado e defini-los como NULL se a conversão falhar:

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
)

-- For debugging only
SELECT * FROM readingsValidated

Com o último arquivo de entrada que usamos (o que continha erros), essa consulta retornará o seguinte conjunto:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00 Uma cadeia de caracteres 1.7145 ["A","B"] 1 2021-12-10T10:00:00Z Uma cadeia de caracteres 1.7145 ["A","B"]
2 2021-12-10T10:01:00 NULO 2.378 ["C"] 2 2021-12-10T10:01:00Z NULO 2.378 ["C"]
3 2021-12-10T10:01:20 Uma terceira cadeia de caracteres NaN ["D","E","F"] 3 2021-12-10T10:01:20Z Uma terceira cadeia de caracteres NULL ["D","E","F"]
4 2021-12-10T10:02:10 Uma quarta cadeia de caracteres 1.2126 {} 4 2021-12-10T10:02:10Z Uma quarta cadeia de caracteres 1.2126 NULL

Já podemos ver dois de nossos erros sendo resolvidos. Transformamos NaN e {} em NULL. Agora, estamos certos de que esses registros serão inseridos corretamente na tabela SQL de destino.

Agora, precisamos decidir como lidar com os registros com valores inválidos ou ausentes. Depois de alguma discussão, decidimos rejeitar os registros com um readingArray vazio/inválido ou um readingStr ausente.

Então, adicionamos uma segunda camada que fará a triagem dos registros entre a camada de validação e a lógica principal:

WITH readingsValidated AS (
	...
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- For debugging only
SELECT * INTO Debug1 FROM readingsToBeProcessed
SELECT * INTO Debug2 FROM readingsToBeRejected

É uma boa prática escrever somente uma cláusula WHERE para as duas saídas e usar NOT (...) na segunda. Assim, nenhum registro poderá ser excluído das duas saídas e ser perdido.

Agora, obtemos duas saídas. Debug1 tem os registros que serão enviados à lógica principal:

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00Z Uma cadeia de caracteres 1.7145 ["A","B"]
3 2021-12-10T10:01:20Z Uma terceira cadeia de caracteres NULO ["D","E","F"]

Debug2 tem os registros que serão rejeitados:

in_deviceId in_readingTimestamp in_readingStr in_readingNum in_readingArray deviceId readingTimestamp readingStr readingNum readingArray
2 2021-12-10T10:01:00 NULO 2.378 ["C"] 2 2021-12-10T10:01:00Z NULL 2.378 ["C"]
4 2021-12-10T10:02:10 Uma quarta cadeia de caracteres 1.2126 {} 4 2021-12-10T10:02:10Z Uma quarta cadeia de caracteres 1.2126 NULL

A última etapa é adicionar nossa lógica principal de volta. Também adicionaremos a saída que reúne as rejeições. Aqui, é melhor usar um adaptador de saída que não imponha tipagem forte, como uma conta de armazenamento.

A consulta completa pode ser encontrada na última seção.

WITH
readingsValidated AS (...),
readingsToBeProcessed AS (...),
readingsToBeRejected AS (...)

SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

SELECT
	*
INTO BlobOutput
FROM readingsToBeRejected

Isso nos dará o seguinte conjunto para SQLOutput, sem nenhum erro possível:

deviceId readingTimestamp readingStr readingNum readingArray
1 2021-12-10T10:00:00Z Uma cadeia de caracteres 1.7145 2
3 2021-12-10T10:01:20Z Uma terceira cadeia de caracteres NULO 3

Os outros dois registros são enviados para um BlobOutput para revisão humana e pós-processamento. Nossa consulta está em segurança.

Exemplo de consulta com validação de entrada

WITH readingsValidated AS (
	SELECT
		-- Rename incoming fields, used for audit and debugging
		deviceId AS in_deviceId,
		readingTimestamp AS in_readingTimestamp,
		readingStr AS in_readingStr,
		readingNum AS in_readingNum,
		readingArray AS in_readingArray,

		-- Try casting fields in their expected type
		TRY_CAST(deviceId AS bigint) as deviceId,
		TRY_CAST(readingTimestamp AS datetime) as readingTimestamp,
		TRY_CAST(readingStr AS nvarchar(max)) as readingStr,
		TRY_CAST(readingNum AS float) as readingNum,
		TRY_CAST(readingArray AS array) as readingArray

	FROM readings TIMESTAMP BY readingTimestamp
),

readingsToBeProcessed AS (
	SELECT
		deviceId,
		readingTimestamp,
		readingStr,
		readingNum,
		readingArray
	FROM readingsValidated
	WHERE
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
),

readingsToBeRejected AS (
	SELECT
		*
	FROM readingsValidated
	WHERE -- Same clauses as readingsToBeProcessed, opposed with NOT
	NOT (
		readingStr IS NOT NULL
	AND readingArray IS NOT NULL
	)
)

-- Core business logic
SELECT
	r.deviceId,
	r.readingTimestamp,
	SUBSTRING(r.readingStr,1,200) AS readingStr,
	ROUND(r.readingNum,2) AS readingNum,
	COUNT(a.ArrayValue) AS arrayCount
INTO SQLOutput
FROM readingsToBeProcessed AS r
CROSS APPLY GetArrayElements(r.readingArray) AS a
WHERE UDF.udfLen(r.readingStr) >= 2
GROUP BY
	System.Timestamp(), --snapshot window
	r.deviceId,
	r.readingTimestamp,
	r.readingStr,
	r.readingNum

-- Rejected output. For human review, correction, and manual re-insertion downstream
SELECT
	*
INTO BlobOutput -- to a storage adapter that doesn't require strong typing, here blob/adls
FROM readingsToBeRejected

Estendendo a validação da entrada

GetType pode ser usado para verificar um tipo explicitamente. Ele funciona bem com CASE no nível da projeção ou com WHERE no nível da definição. GetType também pode ser usado para verificar de maneira dinâmica o esquema de entrada com relação a um repositório de metadados. O repositório pode ser carregado por meio de um conjunto de dados de referência.

Os testes de unidade são uma boa prática para garantir que nossa consulta seja resiliente. Criaremos uma série de testes compostos por arquivos de entrada e suas saídas esperadas. Nossa consulta precisará corresponder à saída gerada para ser aprovada. No ASA, os testes de unidade são realizados por meio do módulo do npm asa-streamanalytics-cicd. Casos de teste com diversos eventos malformados devem ser criados e testados no pipeline de implantação.

Por fim, podemos fazer alguns testes de integração no VS Code. Podemos inserir registros na tabela SQL por meio de uma execução local para uma saída ao vivo.

Obtenha suporte

Para obter mais assistência, confira nossa página de Perguntas e respostas do Microsoft do Azure Stream Analytics.

Próximas etapas