Aktualisieren oder Zusammenführen von Datensätzen in Azure SQL-Datenbank mit Azure Functions
Aktuell unterstützt Azure Stream Analytics (ASA) nur das Einfügen (Anfügen) von Zeilen in SQL-Ausgaben (Azure SQL-Datenbanken und Azure Synapse Analytics). In diesem Artikel werden Problemumgehungen zum Aktivieren von UPDATE, UPSERT oder MERGE für SQL-Datenbanken erläutert, wobei Azure Functions als zwischengeschaltete Ebene verwendet wird.
Alternative Optionen für Azure Functions werden am Ende vorgestellt.
Anforderung
Das Schreiben von Daten in eine Tabelle kann im Allgemeinen in der folgenden Weise erfolgen:
Mode | Entsprechende T-SQL-Anweisung | Anforderungen |
---|---|---|
Anfügen | INSERT | Keine |
Replace | MERGE (UPSERT) | Eindeutiger Schlüssel |
Accumulate | MERGE (UPSERT) mit Verbundzuweisungs-Operator (+= , -= ...) |
Eindeutiger Schlüssel und Akkumulator |
Sehen Sie sich zum Verdeutlichen der Unterschiede an, was bei der Erfassung der folgenden beiden Datensätze geschieht:
Arrival_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
Im Anfügemodus fügen wir zwei Datensätze ein. Die entsprechende T-SQL-Anweisung lautet:
INSERT INTO [target] VALUES (...);
Ergebnis:
Modified_Time | Device_Id | Measure_Value |
---|---|---|
10:00 | A | 1 |
10:05 | A | 20 |
Im Ersetzungsmodus erhalten wir nur den dem Schlüssel nach letzten Wert. Hier verwenden wir Device_Id als Schlüssel. Die entsprechende T-SQL-Anweisung lautet:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Ergebnis:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 20 |
Schließlich wird im Akkumulationsmodus eine Summe aus Value
mit einem Verbundzuweisungsoperator (+=
) gebildet. Hier verwenden wir ebenfalls Device_Id als Schlüssel:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Ergebnis:
Modified_Time | Device_Key | Measure_Value |
---|---|---|
10:05 | A | 21 |
Aus Leistungsgründen unterstützen die ASA-SQL Datenbankausgabeadapter derzeit nur den Anfügemodus nativ. Diese Adapter verwenden die Masseneinfügung, um den Durchsatz zu maximieren und den Gegendruck zu begrenzen.
In diesem Artikel wird die Verwendung von Azure Functions zum Implementieren des Ersetzungs- und des Akkumulationsmodus für ASA veranschaulicht. Wenn Sie eine Funktion als Zwischenschicht verwenden, wirkt sich die potenzielle Schreibleistung nicht auf den Streamingauftrag aus. In dieser Hinsicht funktioniert Azure Functions am besten mit Azure SQL. Bei Synapse SQL kann der Wechsel von der Massenausführung zu zeilenbasierten Anweisungen zu größeren Leistungsproblemen führen.
Azure Functions-Ausgabe
In unserem Auftrag ersetzen wir die ASA-SQL-Ausgabe durch die ASA Azure Functions-Ausgabe. Die Funktionen UPDATE, UPSERT oder MERGE sind in der Funktion implementiert.
Es gibt aktuell zwei Optionen für den Zugriff auf eine SQL-Datenbank in einer Funktion. Die erste ist die Azure SQL-Ausgabebindung. Sie ist derzeit auf C# beschränkt und bietet nur den Ersetzungsmodus. Die zweite besteht im Zusammenstellen einer SQL-Abfrage für die Übermittlung über den entsprechenden SQL-Treiber (Microsoft.Data.SqlClient für .NET).
Für beide nachfolgenden Beispiele gehen wir vom folgenden Tabellenschema aus. Für die Bindungsoption muss ein Primärschlüssel für die Zieltabelle festgelegt werden. Bei Verwendung eines SQL-Treibers ist dies nicht notwendig, aber empfehlenswert.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Eine Funktion muss die folgenden Erwartungen erfüllen, damit sie als Ausgabe von ASA verwendet werden kann:
- Azure Stream Analytics erwartet den HTTP-Status 200 von der Functions-App für Batches, die erfolgreich verarbeitet wurden.
- Die Größe der an Azure Functions gesendeten Batches wird verringert, wenn in Azure Stream Analytics die Ausnahme 413 (HTTP-Anforderungseinheit zu groß) durch eine Azure-Funktion auftritt.
- Bei bestehender Testverbindung sendet Stream Analytics eine POST-Anforderung mit einem leeren Batch an Azure Functions und erwartet 20 mal den HTTP-Status zurück, damit der Test als bestanden gilt.
Option 1: Aktualisierung nach Schlüssel mit der Azure SQL-Bindungsfunktion
Bei dieser Option wird die Azure-Funktion SQL-Ausgabebindung verwendet. Diese Erweiterung kann ein Objekt in einer Tabelle ersetzen, ohne eine SQL-Anweisung schreiben zu müssen. Zurzeit werden keine Verbundzuweisungsoperatoren (Akkumulationen) unterstützt.
Dieses Beispiel basiert auf:
- Azure Functions Runtime, Version 4
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview
Zum besseren Verständnis des Bindungsansatzes empfehlen wir, diesem Tutorial zu folgen.
Erstellen Sie zunächst eine standardmäßige HttpTrigger-Funktions-App gemäß diesem Tutorial. Die folgenden Informationen werden verwendet:
- Sprache:
C#
- Runtime:
.NET 6
(unter function/runtime v4) - Vorlage:
HTTP trigger
Installieren Sie die Bindungserweiterung, indem Sie den folgenden Befehl in einem Terminal ausführen, das sich im Projektordner befindet:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Fügen Sie das SqlConnectionString
-Element im Abschnitt Values
Ihrer local.settings.json
hinzu, und setzen Sie die Verbindungszeichenfolge des Zielservers ein:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Ersetzen Sie die gesamte Funktion (CS-Datei im Projekt) durch den folgenden Codeausschnitt. Aktualisieren Sie den Namespace, den Klassennamen und den Funktionsnamen durch Ihre eigenen Werte:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Aktualisieren Sie den Namen der Zieltabelle im Bindungsabschnitt:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Aktualisieren Sie die Device
-Klasse und den Zuordnungsabschnitt so, dass sie Ihrem eigenen Schema entsprechen:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Sie können jetzt die Verkabelung zwischen der lokalen Funktion und der Datenbank durch Debuggen testen (F5 in Visual Studio Code). Die SQL-Datenbank muss von Ihrem Computer aus erreichbar sein. SSMS kann verwendet werden, um die Konnektivität zu überprüfen. Senden Sie dann POST-Anforderungen an den lokalen Endpunkt. Eine Anforderung mit leerem Text sollte http 204 zurückgeben. Eine Anforderung mit einer tatsächlichen Nutzlast sollte in der Zieltabelle (im Ersetzungs-/Aktualisierungsmodus) persistiert werden. Hier sehen Sie eine Beispielnutzlast, die dem in diesem Beispiel verwendeten Schema entspricht:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Die Funktion kann jetzt in Azure veröffentlicht werden. Für SqlConnectionString
sollte eine Anwendungseinstellung festgelegt sein. Die Azure SQL Server-Firewall sollte eingehende Verbindungen mit Azure-Diensten zulassen, damit die Livefunktion sie erreichen kann.
Die Funktion kann dann als Ausgabe im ASA-Auftrag definiert und verwendet werden, um Datensätze zu ersetzen, anstatt sie einzufügen.
Option 2: Zusammenführen mit Verbundzuweisung (Akkumulation) über eine benutzerdefinierte SQL-Abfrage
Hinweis
Nach Neustart und Wiederherstellung sendet ASA möglicherweise Ausgabeereignisse erneut, die bereits ausgegeben wurden. Dies ist ein erwartetes Verhalten, das zu einem Fehler bei der Akkumulationslogik führen kann (Verdoppelung einzelner Werte). Damit dies verhindert wird, empfiehlt es sich, die gleichen Daten über die native ASA-SQL-Ausgabe in eine Tabelle auszugeben. Diese Steuertabelle kann dann verwendet werden, um Probleme zu erkennen und die Akkumulation bei Bedarf erneut zu synchronisieren.
Diese Option verwendet Microsoft.Data.SqlClient. Mit dieser Bibliothek können wir beliebige SQL-Abfragen an eine SQL-Datenbank ausgeben.
Dieses Beispiel basiert auf:
Erstellen Sie zunächst eine standardmäßige HttpTrigger-Funktions-App gemäß diesem Tutorial. Die folgenden Informationen werden verwendet:
- Sprache:
C#
- Runtime:
.NET 6
(unter function/runtime v4) - Vorlage:
HTTP trigger
Installieren Sie die SqlClient-Bibliothek, indem Sie den folgenden Befehl in einem Terminal ausführen, das sich im Projektordner befindet:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Fügen Sie das SqlConnectionString
-Element im Abschnitt Values
Ihrer local.settings.json
hinzu, und setzen Sie die Verbindungszeichenfolge des Zielservers ein:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Ersetzen Sie die gesamte Funktion (CS-Datei im Projekt) durch den folgenden Codeausschnitt. Aktualisieren Sie den Namespace, den Klassennamen und den Funktionsnamen durch Ihre eigenen Werte:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Aktualisieren Sie den Abschnitt zur sqltext
-Befehlserstellung so, dass er Ihrem eigenen Schema entspricht (beachten Sie, wie Akkumulation durch den +=
-Operator bei der Aktualisierung erreicht wird):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Sie können jetzt die Verkabelung zwischen der lokalen Funktion und der Datenbank durch Debuggen testen (F5 in VS Code). Die SQL-Datenbank muss von Ihrem Computer aus erreichbar sein. SSMS kann verwendet werden, um die Konnektivität zu überprüfen. Übermitteln Sie dann POST-Anforderungen an den lokalen Endpunkt. Eine Anforderung mit leerem Text sollte http 204 zurückgeben. Eine Anforderung mit einer tatsächlichen Nutzlast sollte in der Zieltabelle (im Akkumulations-/Zusammenführungsmodus) persistiert werden. Hier sehen Sie eine Beispielnutzlast, die dem in diesem Beispiel verwendeten Schema entspricht:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Die Funktion kann jetzt in Azure veröffentlicht werden. Für SqlConnectionString
sollte eine Anwendungseinstellung festgelegt sein. Die Azure SQL Server-Firewall sollte eingehende Verbindungen mit Azure-Diensten zulassen, damit die Livefunktion sie erreichen kann.
Die Funktion kann dann als Ausgabe im ASA-Auftrag definiert und verwendet werden, um Datensätze zu ersetzen, anstatt sie einzufügen.
Alternativen
Abgesehen von Azure Functions gibt es mehrere Möglichkeiten, das erwartete Ergebnis zu erzielen. Dieser Abschnitt enthält einige davon.
Nachverarbeitung in der SQL-Zieldatenbank
Eine Hintergrundaufgabe wird ausgeführt, sobald die Daten über die ASA-SQL-Standardausgaben in die Datenbank eingefügt wurden.
Bei Azure SQL können INSTEAD OF
DML-Trigger verwendet werden, um die von ASA ausgegebenen INSERT-Befehle abzufangen.
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Bei Synapse SQL kann ASA in eine Stagingtabelle einfügen. Eine wiederkehrende Aufgabe kann die Daten dann bei Bedarf in eine Zwischentabelle transformieren. Schließlich werden die Daten in die Produktionstabelle verschoben.
Vorverarbeitung in Azure Cosmos DB
Azure Cosmos DB unterstützt UPSERT nativ. Hier ist nur Anfügen/Ersetzen möglich. Akkumulationen müssen clientseitig in Azure Cosmos DB verwaltet werden.
Wenn die Anforderungen übereinstimmen, ist eine Option, die SQL-Zieldatenbank durch eine Azure Cosmos DB-Instanz zu ersetzen. Dies erfordert eine wichtige Änderung der Gesamtarchitektur der Lösung.
Für Synapse SQL kann Azure Cosmos DB über Azure Synapse Link für Azure Cosmos DB als zwischengeschaltete Ebene verwendet werden. Azure Synapse Link kann verwendet werden, um einen Analysespeicher zu erstellen. Dieser Datenspeicher kann dann direkt in Synapse SQL abgefragt werden.
Vergleich der Alternativen
Jeder Ansatz bietet verschiedene Nutzenversprechen und Funktionen:
type | Option | Modi | Azure SQL-Datenbank | Azure Synapse Analytics |
---|---|---|---|---|
Nachbearbeitung | ||||
Auslöser | Ersetzen, Akkumulieren | + | N/V; Trigger stehen in Synapse SQL nicht zur Verfügung | |
Staging | Ersetzen, Akkumulieren | + | + | |
Vorverarbeitung | ||||
Azure-Funktionen | Ersetzen, Akkumulieren | + | – (Zeilenbasierte Leistung) | |
Azure Cosmos DB-Ersetzung | Replace | – | – | |
Azure Cosmos DB Azure Synapse Link | Ersetzen von | – | + |
Support
Weitere Unterstützung finden Sie auf der Frageseite von Microsoft Q&A (Fragen und Antworten) zu Azure Stream Analytics.
Nächste Schritte
- Grundlegendes zu den Ausgaben von Azure Stream Analytics
- Azure Stream Analytics-Ausgabe an Azure SQL-Datenbank
- Erhöhen der Durchsatzleistung zu Azure SQL-Datenbank aus Azure Stream Analytics
- Zugreifen auf Azure SQL-Datenbank oder Azure Synapse Analytics mit verwalteten Identitäten aus einem Azure Stream Analytics-Auftrag
- Verwenden von Verweisdaten aus einer SQL-Datenbank für einen Azure Stream Analytics-Auftrag
- Ausführen von Azure Functions in Azure Stream Analytics-Aufträgen – Tutorial für Redis-Ausgabe
- Schnellstart: Erstellen eines Stream Analytics-Auftrags mithilfe des Azure-Portals