Integrieren von Azure Functions in Azure Data Explorer mithilfe von Eingabe- und Ausgabebindungen (Vorschau)

Wichtig

Dieser Connector kann in Real-Time Intelligence in Microsoft Fabric verwendet werden. Verwenden Sie die Anweisungen in diesem Artikel mit den folgenden Ausnahmen:

Mit Azure Functions können Sie serverlosen Code in der Cloud nach einem Zeitplan oder als Reaktion auf ein Ereignis ausführen. Mit Azure Data Explorer-Eingabebindungen und -Ausgabebindungen für Azure Functions können Sie Azure Data Explorer in Ihre Workflows integrieren, um Daten zu erfassen und Abfragen für Ihren Cluster auszuführen.

Voraussetzungen

Testen Sie die Integration mit dem neuen Beispielprojekt.

Verwenden von Azure Data Explorer-Bindungen für Azure Functions

Informationen zur Verwendung von Azure Data Explorer-Bindungen für Azure Functions finden Sie in den folgenden Artikeln:

Szenarios für die Verwendung von Azure Data Explorer-Bindungen für Azure Functions

In den folgenden Abschnitten werden einige gängige Szenarios für die Verwendung von Azure Data Explorer-Bindungen für Azure Functions beschrieben.

Eingabebindungen

Eingabebindungen führen eine KQL-Abfrage (Kusto Query Language ) oder KQL-Funktion aus (optional mit Parametern) und gibt die Ausgabe an die Funktion zurück.

In den folgenden Abschnitten wird beschrieben, wie Eingabebindungen in einigen gängigen Szenarios verwendet werden.

Szenario 1: HTTP-Endpunkt zum Abfragen von Daten aus einem Cluster

Die Verwendung von Eingabebindungen ist in Situationen nützlich, in denen Sie Azure Data Explorer-Daten über eine REST-API verfügbar machen müssen. In diesem Szenario verwenden Sie einen Azure Functions-HTTP-Trigger, um Daten in Ihrem Cluster abzufragen. Das Szenario ist besonders nützlich in Situationen, in denen Sie programmgesteuerten Zugriff auf Azure Data Explorer-Daten für externe Anwendungen oder Dienste bereitstellen müssen. Wenn Sie Ihre Daten über eine REST-API verfügbar machen, können Anwendungen die Daten problemlos nutzen, ohne dass sie eine direkte Verbindung mit Ihrem Cluster herstellen müssen.

Der Code definiert eine Funktion mit einem HTTP-Trigger und einer Azure Data Explorer-Eingabebindung. Die Eingabebindung gibt die Abfrage an, die für die Tabelle Products in der Datenbank productsdb ausgeführt werden soll. Die Funktion verwendet die productId-Spalte als Prädikat, das als Parameter übergeben wird.

{
    [FunctionName("GetProduct")]
    public static async Task<IActionResult> RunAsync(
        [HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
        HttpRequest req,
        [Kusto(Database:"productsdb" ,
        KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
        KqlParameters = "@productId={productId}",
        Connection = "KustoConnectionString")]
        IAsyncEnumerable<Product> products)
    {
        IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
        var productList = new List<Product>();
        while (await enumerator.MoveNextAsync())
        {
            productList.Add(enumerator.Current);
        }
        await enumerator.DisposeAsync();
        return new OkObjectResult(productList);
    }
}

Die Funktion kann dann wie folgt aufgerufen werden:

curl https://myfunctionapp.azurewebsites.net/api/getproducts/1

Szenario 2: Geplanter Trigger zum Exportieren von Daten aus einem Cluster

Das folgende Szenario gilt in Situationen, in denen Daten in einem zeitbasierten Zeitplan exportiert werden müssen.

Der Code definiert eine Funktion mit einem Timertrigger, der eine Aggregation von Verkaufsdaten aus der Datenbank productsdb in eine CSV-Datei in Azure Blob Storage exportiert.

public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
    [Kusto(ConnectionStringSetting = "KustoConnectionString",
            DatabaseName = "productsdb",
            Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
    // Write the query results to a CSV file
    using (var stream = new MemoryStream())
    using (var writer = new StreamWriter(stream))
    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
    {
        csv.WriteRecords(queryResults);
        writer.Flush();
        stream.Position = 0;
        await outputBlob.UploadFromStreamAsync(stream);
    }
}

Ausgabebindungen

Ausgabebindungen verwenden eine oder mehrere Zeilen und fügen sie in eine Azure Data Explorer-Tabelle ein.

In den folgenden Abschnitten wird beschrieben, wie Ausgabebindungen in einigen gängigen Szenarios verwendet werden.

Szenario 1: HTTP-Endpunkt zum Erfassen von Daten in einem Cluster

Das folgende Szenario gilt in Situationen, in denen eingehende HTTP-Anforderungen verarbeitet und in Ihrem Cluster erfasst werden müssen. Mithilfe einer Ausgabebindung können eingehende Daten von der Anforderung in Azure Data Explorer-Tabellen geschrieben werden.

Der Code definiert eine Funktion mit einem HTTP-Trigger und einer Azure Data Explorer-Ausgabebindung. Diese Funktion verwendet JSON-Nutzdaten im HTTP-Anforderungstext und schreibt sie in die products-Tabelle in der productsdb-Datenbank.

public static IActionResult Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
    HttpRequest req, ILogger log,
    [Kusto(Database:"productsdb" ,
    TableName ="products" ,
    Connection = "KustoConnectionString")] out Product product)
{
    log.LogInformation($"AddProduct function started");
    string body = new StreamReader(req.Body).ReadToEnd();
    product = JsonConvert.DeserializeObject<Product>(body);
    string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
                product.Name, product.ProductID, product.Cost);
    log.LogInformation("Ingested product {}", productString);
    return new CreatedResult($"/api/addproductuni", product);
}

Die Funktion kann dann wie folgt aufgerufen werden:

curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'

Szenario 2: Erfassen von Daten aus RabbitMQ oder anderen Messagingsystemen, die in Azure unterstützt werden

Das folgende Szenario gilt in Situationen, in denen Daten aus einem Messagingsystem in Ihrem Cluster erfasst werden müssen. Mithilfe einer Ausgabebindung können eingehende Daten aus dem Messagingsystem in Azure Data Explorer-Tabellen erfasst werden.

Der Code definiert eine Funktion mit Nachrichten und Daten im JSON-Format, die über einen RabbitMQ-Trigger eingehen und in der Tabelle products in der Datenbank productsdb erfasst werden.

public class QueueTrigger
{
    [FunctionName("QueueTriggerBinding")]
    [return: Kusto(Database: "productsdb",
                TableName = "products",
                Connection = "KustoConnectionString")]
    public static Product Run(
        [RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
        ILogger log)
    {
        log.LogInformation($"Dequeued product {product.ProductID}");
        return product;
    }
}

Weitere Informationen zu Funktionen finden Sie in der Azure Functions-Dokumentation. Die Azure Data Explorer-Erweiterung ist verfügbar unter: