Aggiornare o unire record nel database SQL di Azure con Funzioni di Azure

Attualmente Analisi di flusso di Azure (ASA) supporta solo l'inserimento (accodamento) di righe agli output SQL (database SQL di Azure e Azure Synapse Analytics). Questo articolo illustra le soluzioni alternative per abilitare UPDATE, UPSERT o MERGE nei database SQL, con Funzioni di Azure come livello intermedio.

Le opzioni alternative a Funzioni di Azure vengono presentate alla fine.

Requisito

La scrittura di dati in una tabella può essere in genere eseguita nel modo seguente:

Modalità Istruzione T-SQL equivalente Requisiti
Aggiunta INSERT None
Sostituzione UNIRE (UPSERT) Chiave univoca
Accumulate UNIRE (UPSERT) con operatore di assegnazione composta (+=, -=...) Chiave univoca e identificatore

Per illustrare le differenze, esaminare cosa accade quando si inseriscono i due record seguenti:

Ora di arrivo Device_Id Measure_Value
10.00 A 1
10:05 A 20

Nella modalità di accodamento vengono inseriti due record. L'istruzione T-SQL equivalente è:

INSERT INTO [target] VALUES (...);

Risultato:

Modified_Time Device_Id Measure_Value
10.00 A 1
10:05 A 20

Nella modalità di sostituzione si ottiene solo l'ultimo valore per chiave. Qui si usa Device_Id come chiave. L'istruzione T-SQL equivalente è:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Risultato:

Modified_Time Device_Key Measure_Value
10:05 A 20

Infine, nella modalità di accumulo si somma Value con un operatore di assegnazione composta (+=). In questo caso si usano anche Device_Id come chiave:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Risultato:

Modified_Time Device_Key Measure_Value
10:05 Un 21

Per considerazioni sulle prestazioni, gli adattatori di output del database SQL ASA supportano attualmente solo la modalità di accodamento in modo nativo. Questi adattatori usano l'inserimento bulk per ottimizzare la velocità effettiva e limitare la pressione.

Questo articolo illustra come usare Funzioni di Azure per implementare le modalità Replace e Accumulate per ASA. Quando si usa una funzione come livello intermedio, le potenziali prestazioni di scrittura non influiscono sul processo di streaming. A questo proposito, l'uso di Funzioni di Azure funziona meglio con Azure SQL. Con Synapse SQL, il passaggio da istruzioni bulk a righe per riga potrebbe creare problemi di prestazioni maggiori.

Output di Funzioni di Azure

Nel processo viene sostituito l'output SQL ASA dall'output di Funzioni di Azure ASA. Le funzionalità UPDATE, UPSERT o MERGE vengono implementate nella funzione.

Attualmente sono disponibili due opzioni per accedere a un database SQL in una funzione. Il primo è l'associazione di output di Azure SQL. Attualmente è limitato a C# e offre solo la modalità di sostituzione. In secondo luogo, creare una query SQL da inviare tramite il driver SQL appropriato (Microsoft.Data.SqlClient per .NET).

Per entrambi gli esempi seguenti si presuppone lo schema della tabella seguente. L'opzione di associazione richiede l'impostazione di una chiave primaria nella tabella di destinazione. Non è necessario, ma consigliato, quando si usa un driver SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Una funzione deve soddisfare le aspettative seguenti da usare come output da ASA:

  • Analisi di flusso di Azure prevede lo stato HTTP 200 dall'app Funzioni per i batch elaborati correttamente
  • Quando Analisi di flusso di Azure riceve l'eccezione 413 ("Entità richiesta HTTP troppo grande") da una funzione di Azure, riduce la dimensione dei batch che invia a Funzioni di Azure
  • Durante la connessione di test, Analisi di flusso invia una richiesta POST con un batch vuoto a Funzioni di Azure e prevede lo stato HTTP 20x per convalidare il test

Opzione 1: Aggiornare per chiave con l'associazione SQL della funzione di Azure

Questa opzione usa l'associazione di output SQL della funzione di Azure. Questa estensione può sostituire un oggetto in una tabella, senza dover scrivere un'istruzione SQL. Al momento, non supporta operatori di assegnazione composti (accumuli).

Questo esempio è stato compilato in:

Per comprendere meglio l'approccio di associazione, è consigliabile seguire questa esercitazione.

Prima di tutto, creare un'app per le funzioni HttpTrigger predefinita seguendo questa esercitazione. Sono richieste le informazioni seguenti:

  • Lingua: C#
  • Runtime: .NET 6 (in funzione/runtime v4)
  • Modello: HTTP trigger

Installare l'estensione di associazione eseguendo il comando seguente in un terminale che si trova nella cartella del progetto:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Aggiungere l'elemento SqlConnectionString nella sezione Values del local.settings.json, inserendo la stringa di connessione del server di destinazione:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Sostituire l'intera funzione (.cs file nel progetto) con il frammento di codice seguente. Aggiornare lo spazio dei nomi, il nome della classe e il nome della funzione in base al proprio:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Aggiornare il nome della tabella di destinazione nella sezione di associazione:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Aggiornare la sezione di classe e mapping Device in modo che corrisponda al proprio schema:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

È ora possibile testare il collegamento tra la funzione locale e il database eseguendo il debug (F5 in Visual Studio Code). Il database SQL deve essere raggiungibile dal computer. È possibile usare SSMS per controllare la connettività. Inviare quindi richieste POST all'endpoint locale. Una richiesta con un corpo vuoto dovrebbe restituire http 204. Una richiesta con un payload effettivo dovrebbe essere mantenuta nella tabella di destinazione (in modalità replace/update). Ecco un payload di esempio corrispondente allo schema usato in questo esempio:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

È ora possibile pubblicare la funzione in Azure. È necessario specificare un'impostazione dell'applicazione per SqlConnectionString. Il firewall di SQL Server di Azure dovrebbe consentire l'accesso dei servizi di Azure perché la funzione live possa raggiungerlo.

È quindi possibile definire la funzione come output nel processo ASA e usarla per sostituire i record anziché inserirli.

Opzione 2: eseguire il merge con l'assegnazione composta (accumulare) tramite una query SQL personalizzata

Nota

Al riavvio e al ripristino, ASA può inviare nuovamente gli eventi di output già generati. Si tratta di un comportamento previsto che può causare l'esito negativo della logica di accumulo (raddoppiando i singoli valori). Per evitare questo problema, è consigliabile restituire gli stessi dati in una tabella tramite l'output SQL ASA nativo. È quindi possibile usare questa tabella di controllo per rilevare i problemi e sincronizzare nuovamente l'accumulo quando necessario.

Questa opzione usa Microsoft.Data.SqlClient. Questa libreria consente di eseguire query SQL a un database SQL.

Questo esempio è stato compilato in:

Prima di tutto, creare un'app per le funzioni HttpTrigger predefinita seguendo questa esercitazione. Sono richieste le informazioni seguenti:

  • Lingua: C#
  • Runtime: .NET 6 (in funzione/runtime v4)
  • Modello: HTTP trigger

Installare la libreria SqlClient eseguendo il comando seguente in un terminale che si trova nella cartella del progetto:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Aggiungere l'elemento SqlConnectionString nella sezione Values del local.settings.json, inserendo la stringa di connessione del server di destinazione:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Sostituire l'intera funzione (.cs file nel progetto) con il frammento di codice seguente. Aggiornare lo spazio dei nomi, il nome della classe e il nome della funzione in base al proprio:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Aggiornare la sezione di compilazione dei comandi sqltext in modo che corrisponda al proprio schema (si noti come l'accumulo viene ottenuto tramite l'operatore += all'aggiornamento):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

È ora possibile testare il collegamento tra la funzione locale e il database eseguendo il debug (F5 in VS Code). Il database SQL deve essere raggiungibile dal computer. È possibile usare SSMS per controllare la connettività. Inviare quindi richieste POST all'endpoint locale. Una richiesta con un corpo vuoto dovrebbe restituire http 204. Una richiesta con un payload effettivo dovrebbe essere salvata in modo permanente nella tabella di destinazione (in modalità di accumulo/unione). Ecco un payload di esempio corrispondente allo schema usato in questo esempio:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

È ora possibile pubblicare la funzione in Azure. È necessario specificare un'impostazione dell'applicazione per SqlConnectionString. Il firewall di SQL Server di Azure dovrebbe consentire l'accesso dei servizi di Azure perché la funzione live possa raggiungerlo.

È quindi possibile definire la funzione come output nel processo ASA e usarla per sostituire i record anziché inserirli.

Alternative

Al di fuori di Funzioni di Azure, esistono diversi modi per ottenere il risultato previsto. In questa sezione vengono fornite alcune di esse.

Post-elaborazione nel database SQL di destinazione

Un'attività in background viene eseguita dopo l'inserimento dei dati nel database tramite gli output STANDARD di ASA SQL.

Per Azure SQL, è possibile usare i INSTEAD OFtrigger DML per intercettare i comandi INSERT emessi da ASA:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Per Synapse SQL, ASA è in grado di inserire in una tabella di staging. Un'attività ricorrente può quindi trasformare i dati in base alle esigenze in una tabella intermedia. Infine, i dati vengono spostati nella tabella di produzione.

Pre-elaborazione in Azure Cosmos DB

Azure Cosmos DB supporta UPSERT in modo nativo. Qui è possibile solo accodamento/sostituzione. Gli accumuli devono essere gestiti sul lato client in Azure Cosmos DB.

Se i requisiti corrispondono, un'opzione consiste nel sostituire il database SQL di destinazione da un'istanza di Azure Cosmos DB. In questo modo è necessaria una modifica importante nell'architettura complessiva della soluzione.

Per Synapse SQL, è possibile usare Azure Cosmos DB come livello intermedio tramite Collegamento ad Azure Synapse per Azure Cosmos DB. È possibile usare Collegamento ad Azure Synapse per creare un archivio analitico. È quindi possibile sottoporre questo archivio dati a query direttamente in Synapse SQL.

Confronto delle alternative

Ogni approccio offre proposte e funzionalità di valore diverse:

Type Opzione Modalità database SQL di Azure Azure Synapse Analytics
Post-elaborazione
Trigger Sostituisci, Accumula + N/D, i trigger non sono disponibili in Synapse SQL
Staging Sostituisci, Accumula + +
Pre-elaborazione
Funzioni di Azure Sostituisci, Accumula + - (prestazioni riga per riga)
Sostituzione di Azure Cosmos DB Sostituzione N/D N/D
Azure Cosmos DB Azure Synapse Link Sostituzione N/D +

Ottenere supporto

Per maggiore supporto, provare la Pagina delle domande di Domande e risposte Microsoft per Analisi di flusso di Azure.

Passaggi successivi