Esercitazione: Eseguire Funzioni di Azure dai processi di Analisi di flusso di Azure

In questa esercitazione viene creato un processo di Analisi di flusso di Azure che legge gli eventi da Hub eventi di Azure, esegue una query sui dati dell'evento e quindi richiama una funzione di Azure, che scrive in un'istanza di cache di Azure per Redis.

Screenshot che mostra la relazione tra i servizi di Azure nella soluzione.

Nota

  • È possibile eseguire Funzioni di Azure da Analisi di flusso di Azure configurando Funzioni come uno dei sink (output) nel processo di Analisi di flusso. Funzioni offre un'esperienza di calcolo on demand guidata dagli eventi che consente di implementare il codice attivato da eventi generati nei servizi di Azure o in servizi di terze parti. La possibilità offerta da Funzioni di rispondere ai trigger la rende l'output naturale per i processi di Analisi di flusso.
  • Analisi di flusso richiama Funzioni tramite trigger HTTP. L'adattatore di output di Funzioni consente agli utenti di connettere Funzioni ad Analisi di flusso, in modo che gli eventi possano essere attivati in base alle query di Analisi di flusso.
  • La connessione a Funzioni di Azure all'interno di una rete virtuale da un processo di Analisi di flusso in esecuzione in un cluster multi-tenant non è supportata.

In questa esercitazione apprenderai a:

  • Creare un'istanza di Hub eventi di Azure
  • Creare un'istanza di Azure Cache per Redis
  • Crea una Funzione di Azure
  • Creare un processo di Analisi di flusso.
  • Configurare l'hub eventi come input e funzione come output
  • Eseguire il processo di Analisi di flusso
  • Controllare i risultati in Azure Cache per Redis

Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.

Prerequisiti

Prima di iniziare, assicurarsi di aver completato i passaggi seguenti:

  • Se non si ha una sottoscrizione di Azure, creare un account gratuito.
  • Scaricare l'app generatore di eventi di chiamata telefonica, TelcoGenerator.zip dall'Area download Microsoft o ottenere il codice sorgente da GitHub.

Accedere ad Azure

Accedere al portale di Azure.

Creare un hub eventi

È necessario inviare alcuni dati di esempio a un hub eventi prima che Analisi di flusso possa analizzare il flusso di dati delle chiamate fraudolente. In questa esercitazione si inviano dati ad Azure usando Hub eventi di Azure.

Usare la procedura seguente per creare un hub eventi e inviare i dati delle chiamate a tale hub eventi:

  1. Accedere al portale di Azure.

  2. Selezionare Tutti i servizi nel menu a sinistra, selezionare Internet delle cose, passare il mouse su Hub eventi e quindi fare clic sul pulsante + (Aggiungi).

    Screenshot che mostra la pagina di creazione di Hub eventi.

  3. Nella pagina Crea spazio dei nomi, seguire questa procedura:

    1. Selezionare una sottoscrizione di Azure in cui si vuole creare l'hub eventi.

    2. In Gruppo di risorse selezionare Crea nuovo e immettere un nome per il gruppo di risorse. Lo spazio dei nomi di Hub eventi viene creato in questo gruppo di risorse.

    3. Per Nome spazio dei nomi immettere un nome univoco per lo spazio dei nomi di Hub eventi.

    4. In Località selezionare l'area in cui si vuole creare lo spazio dei nomi.

    5. Per Piano tariffario selezionare Standard.

    6. Selezionare Rivedi e crea nella parte inferiore della pagina.

      Screenshot che mostra la pagina Crea spazio dei nomi.

    7. Nella pagina Rivedi e crea della creazione guidata dello spazio dei nomi selezionare Crea nella parte inferiore della pagina dopo aver esaminato tutte le impostazioni.

  4. Dopo aver distribuito correttamente lo spazio dei nomi, selezionare Vai alla risorsa per passare alla pagina Spazio dei nomi di Hub eventi.

  5. Nella pagina Spazio dei nomi di Hub eventi selezionare +Hub eventi sulla barra dei comandi.

    Screenshot che mostra il pulsante Aggiungi hub eventi nella pagina Spazio dei nomi di Hub eventi.

  6. Nella pagina Crea hub eventi immettere un nome per l'hub eventi. Impostare Conteggio partizioni su 2. Usare le opzioni predefinite nelle impostazioni rimanenti e selezionare Rivedi e crea.

    Screenshot che mostra la pagina Crea hub eventi.

  7. Nella pagina Rivedi e crea selezionare Crea nella parte inferiore della pagina. Attendere il completamento della distribuzione.

Concedere l'accesso all'hub eventi e ottenere una stringa di connessione

Affinché un'applicazione possa inviare dati ad Hub eventi di Azure, è necessario che l'hub eventi abbia criteri che consentono l'accesso. I criteri di accesso generano una stringa di connessione che include informazioni di autorizzazione.

  1. Nella pagina Spazio dei nomi di Hub eventi selezionare Criteri di accesso condiviso nel menu a sinistra.

  2. Selezionare RootManageSharedAccessKey nell'elenco dei criteri.

    Screenshot che mostra la pagina Criteri di accesso condiviso.

  3. Selezionare quindi il pulsante Copia accanto a Stringa di connessione - chiave primaria.

  4. Incollare la stringa di connessione in un editor di testo. La stringa sarà necessaria nella sezione seguente.

    La stringa di connessione è simile a quanto segue:

    Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>

    Si noti che il stringa di connessione contiene più coppie chiave-valore separate da punti e virgola: Endpoint, SharedAccessKeyName e SharedAccessKey.

Avviare l'applicazione generatore eventi

Prima di avviare l'app TelcoGenerator, configurarla per inviare i dati all'istanza di Hub eventi di Azure creata in precedenza.

  1. Estrarre il contenuto del file TelcoGenerator.zip.

  2. Aprire il TelcoGenerator\TelcoGenerator\telcodatagen.exe.config file in un editor di testo di propria scelta C'è più di un .config file, quindi assicurarsi di aprire quello corretto.

  3. Aggiornare l'elemento <appSettings> nel file di configurazione con i dettagli seguenti:

    • Impostare il valore della chiave EventHubName sul valore di EntityPath alla fine del stringa di connessione.
    • Impostare il valore della chiave Microsoft.ServiceBus.ConnectionString sul stringa di connessione sullo spazio dei nomi . Se si usa un stringa di connessione per un hub eventi, non uno spazio dei nomi, rimuovere EntityPath il valore (;EntityPath=myeventhub) alla fine. Non dimenticare di rimuovere il punto e virgola che precede il valore entityPath.
  4. Salvare il file.

  5. Aprire quindi una finestra di comando e passare alla cartella in cui è stata decompressa l'app TelcoGenerator. Immettere quindi il comando seguente:

    .\telcodatagen.exe 1000 0.2 2
    

    Questo comando accetta i parametri seguenti:

    • Numero di record di dati delle chiamate all'ora.
    • Percentuale di probabilità di frode, ovvero frequenza con cui l'app deve simulare una chiamata fraudolenta. Il valore 0,2 indica che circa il 20% dei record di chiamata sembra fraudolento.
    • Durata in ore, ovvero numero di ore per cui l'app deve essere eseguita. È anche possibile arrestare l'app in qualsiasi momento terminando il processo (CTRL+C) dalla riga di comando.

    Dopo alcuni secondi, l'app inizierà a visualizzare i record delle chiamate mentre ne esegue l'invio all'hub eventi. I dati delle telefonate contengono i campi seguenti:

    Record Definizione
    CallrecTime Timestamp dell'ora di inizio della chiamata.
    SwitchNum Commutatore telefonico usato per la connessione della chiamata. Per questo esempio i commutatori sono stringhe che rappresentano il paese/area di origine (Stati Uniti, Cina, Regno Unito, Germania o Australia).
    CallingNum Numero di telefono del chiamante.
    CallingIMSI Codice IMSI (International Mobile Subscriber Identity). Identificatore univoco del chiamante.
    CalledNum Numero di telefono del destinatario della chiamata.
    CalledIMSI Codice IMSI (International Mobile Subscriber Identity). Identificatore univoco del destinatario della chiamata.

Creare un processo di Analisi di flusso.

Dopo aver creato un flusso di eventi di chiamata, è possibile creare un processo di Analisi di flusso che legge i dati dall'hub eventi.

  1. Per creare un processo di Analisi di flusso, passare al portale di Azure.
  2. Selezionare Crea una risorsa e cercare Processo di Analisi di flusso. Selezionare il riquadro Processo di Analisi di flusso e quindi Crea.
  3. Nella pagina Nuovo processo di Analisi di flusso seguire questa procedura:
    1. Per Sottoscrizione selezionare la sottoscrizione che contiene lo spazio dei nomi di Hub eventi.

    2. In Gruppo di risorse selezionare il gruppo di risorse creato in precedenza.

    3. Nella sezione Dettagli istanza immettere un nome univoco per il processo di Analisi di flusso.

    4. In Area selezionare l'area in cui si vuole creare il processo di Analisi di flusso. È consigliabile inserire il processo e l'hub eventi nella stessa area per ottenere prestazioni ottimali e in modo da non pagare per trasferire i dati tra aree.

    5. Per Ambiente di hosting< selezionare Cloud se non è già selezionato. Per la distribuzione dei processi di Analisi di flusso è possibile scegliere tra Cloud o Edge. Il cloud consente di eseguire la distribuzione nel cloud di Azure e Edge consente di eseguire la distribuzione in un dispositivo IoT Edge.

    6. Per Unità di streaming selezionare 1. Le unità di streaming rappresentano le risorse di calcolo necessarie per eseguire un processo. Il valore predefinito di questa impostazione è 1. Per informazioni sul ridimensionamento delle unità di streaming, vedere Informazioni sulle unità di flusso e su come modificarle.

    7. Selezionare Rivedi e crea nella parte inferiore della pagina.

      Screenshot che mostra la pagina Crea processo di Analisi di flusso di Azure.

  4. Nella pagina Rivedi e crea rivedere le impostazioni e quindi selezionare Crea per creare il processo di Analisi di flusso.
  5. Dopo aver distribuito il processo, selezionare Vai alla risorsa per passare alla pagina del processo di Analisi di flusso.

Configurare l'input del processo

Il passaggio successivo consiste nel definire un'origine di input da cui il processo può leggere i dati usando l'hub eventi creato nella sezione precedente.

  1. Nella pagina processo di Analisi di flusso, nella sezione Topologia processo del menu a sinistra selezionare Input.

  2. Nella pagina Input selezionare + Aggiungi input e Hub eventi.

    Screenshot che mostra la pagina Input per un processo di Analisi di flusso.

  3. Nella pagina Hub eventi seguire questa procedura:

    1. Per Alias di input immettere CallStream. L'alias di input è un nome descrittivo per identificare l'input. L'alias di input può contenere solo caratteri alfanumerici, trattini e caratteri di sottolineatura e deve avere una lunghezza compresa tra 3 e 63 caratteri.

    2. Per Sottoscrizione selezionare la sottoscrizione di Azure in cui è stato creato l'hub eventi. L'hub eventi può trovarsi nella stessa sottoscrizione del processo di Analisi di flusso o in una sottoscrizione diversa.

    3. Per Spazio dei nomi di Hub eventi selezionare lo spazio dei nomi di Hub eventi creato nella sezione precedente. Tutti gli spazi dei nomi disponibili nella sottoscrizione corrente sono elencati nell'elenco a discesa.

    4. Per Nome hub eventi selezionare l'hub eventi creato nella sezione precedente. Tutti gli hub eventi disponibili nello spazio dei nomi selezionato sono elencati nell'elenco a discesa.

    5. Per gruppo di consumer dell'hub eventi mantenere selezionata l'opzione Crea nuova in modo che venga creato un nuovo gruppo di consumer nell'hub eventi. È consigliabile usare un gruppo di consumer distinto per ogni processo di Analisi di flusso. Se non viene specificato alcun gruppo di consumer, il processo di Analisi di flusso usa il $Default gruppo di consumer. Quando un processo contiene un self-join o ha più input, alcuni input potrebbero essere letti da più di un lettore. Questa situazione influisce sul numero di lettori in un singolo gruppo di consumer.

    6. Per Modalità di autenticazione selezionare Stringa di connessione. È più semplice testare l'esercitazione con questa opzione.

    7. Per Nome criterio hub eventi selezionare Usa esistente e quindi selezionare il criterio creato in precedenza.

    8. Selezionare Salva nella parte inferiore della pagina.

      Screenshot che mostra la pagina di configurazione di Hub eventi per un input.

Creare un'istanza di Azure Cache per Redis

  1. Creare una cache in Azure Cache per Redis seguendo la procedura descritta in Creare una cache.

  2. Dopo aver creato la cache, in Impostazioniselezionare Chiavi di accesso. Annotare la Stringa di connessione primaria.

    Screenshot che mostra la selezione della voce di menu Chiave di scelta.

Creare una funzione in Funzioni di Azure che possa scrivere dati in Azure Cache per Redis

  1. Vedere la sezione Creare un'app per le funzioni della documentazione relativa a Funzioni. Questo esempio è stato compilato in:

  2. Creare un'app per le funzioni HttpTrigger predefinita in Visual Studio Code seguendo questa esercitazione. Vengono usate le informazioni seguenti: linguaggio: C#, runtime: .NET 6 (in funzione v4), modello: HTTP trigger.

  3. Installare la libreria client Redis eseguendo il comando seguente in un terminale che si trova nella cartella del progetto:

    dotnet add package StackExchange.Redis --version 2.2.88
    
  4. Aggiungere gli RedisConnectionString elementi e RedisDatabaseIndex nella Values sezione di local.settings.json, compilando il stringa di connessione del server di destinazione:

    {
        "IsEncrypted": false,
        "Values": {
            "AzureWebJobsStorage": "",
            "FUNCTIONS_WORKER_RUNTIME": "dotnet",
            "RedisConnectionString": "Your Redis Connection String",
            "RedisDatabaseIndex":"0"
        }
    }
    

    L'indice del database Redis è il numero compreso tra 0 e 15 che identifica il database nell'istanza di .

  5. Sostituire l'intera funzione (.cs file nel progetto) con il frammento di codice seguente. Aggiornare lo spazio dei nomi, il nome della classe e il nome della funzione in base al proprio:

    using System;
    using System.IO;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.Http;
    using Microsoft.AspNetCore.Http;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    
    using StackExchange.Redis;
    
    namespace Company.Function
    {
        public static class HttpTrigger1{
            [FunctionName("HttpTrigger1")]
            public static async Task<IActionResult> Run(
                [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
                ILogger log)
            {
                // Extract the body from the request
                string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
                if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
    
                dynamic data = JsonConvert.DeserializeObject(requestBody);
    
                // Reject if too large, as per the doc
                if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
    
                string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString");
                int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex"));
    
                using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString))
                {
                    // Connection refers to a property that returns a ConnectionMultiplexer
                    IDatabase db = connection.GetDatabase(RedisDatabaseIndex);
    
                    // Parse items and send to binding
                    for (var i = 0; i < data.Count; i++)
                    {
                        string key = data[i].Time + " - " + data[i].CallingNum1;
    
                        db.StringSet(key, data[i].ToString());
                        log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
    
                        // Simple get of data types from the cache
                        string value = db.StringGet(key);
                        log.LogInformation($"Database got: {key} => {value}");
    
                    }
                }
                return new OkResult(); // 200
            }
        }
    }
    

    Quando Analisi di flusso riceve l'eccezione "Entità richiesta HTTP troppo grande" dalla funzione, riduce le dimensioni dei batch inviati alle funzioni. Il codice seguente verifica che Analisi di flusso non invii batch troppo grandi. Assicurarsi che i valori relativi alle dimensioni massime e al numero massimo di batch usati nella funzione siano conformi ai valori inseriti nel portale di Analisi di flusso.

  6. È ora possibile pubblicare la funzione in Azure.

  7. Aprire la funzione nel portale di Azure e impostare le impostazioni dell'applicazione per RedisConnectionString e RedisDatabaseIndex.

Aggiornare il processo di Analisi di flusso con la funzione come output

  1. Nel portale di Azure aprire il processo di Analisi di flusso.

  2. Passare alla funzione e selezionare Panoramica>Output>Aggiungi. Per aggiungere un nuovo output, selezionare Funzione di Azure per l'opzione di sink. L'adattatore di output di Funzioni ha le proprietà seguenti:

    Nome proprietà Descrizione
    Alias di output Nome descrittivo usato nella query del processo per fare riferimento all'output.
    Opzione di importazione È possibile usare la funzione dalla sottoscrizione corrente oppure specificare manualmente le impostazioni se la funzione si trova in un'altra sottoscrizione.
    App per le funzioni Nome dell'app Funzioni.
    Funzione Nome della funzione nell'app Funzioni (nome della funzione di run.csx).
    Dimensioni massime batch Imposta le dimensioni massime, in byte, per ogni batch di output inviato alla funzione. Per impostazione predefinita, questo valore è impostato su 262.144 byte (256 KB).
    Numero massimo di batch Specifica il numero massimo di eventi in ogni batch inviato alla funzione. Il valore predefinito è 100. Questa proprietà è facoltativa.
    Chiave Consente di usare una funzione di un'altra sottoscrizione. Specificare il valore della chiave per accedere alla funzione. Questa proprietà è facoltativa.
  3. Specificare un nome per l'alias di output. In questa esercitazione è denominato saop1, ma è possibile usare qualsiasi nome di propria scelta. Specificare gli altri dettagli.

  4. Aprire il processo di Analisi di flusso e aggiornare la query nel modo seguente.

    Importante

    Lo script di esempio seguente presuppone che sia stato usato CallStream per il nome di input e saop1 per il nome di output. Se sono stati usati nomi diversi, non dimenticare di aggiornare la query.

     SELECT
             System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1,
             CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2
         INTO saop1
         FROM CallStream CS1 TIMESTAMP BY CallRecTime
            JOIN CallStream CS2 TIMESTAMP BY CallRecTime
             ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
         WHERE CS1.SwitchNum != CS2.SwitchNum
    
  5. Avviare l'applicazione telcodatagen.exe eseguendo il comando seguente nella riga di comando. Il comando usa il formato telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours].

    telcodatagen.exe 1000 0.2 2
    
  6. Avviare il processo di Analisi di flusso.

  7. Nella pagina Monitoraggio per la funzione di Azure si noterà che la funzione viene richiamata.

    Screenshot che mostra la pagina Monitoraggio per Funzioni di Azure con chiamate di funzione.

  8. Nella pagina cache di Azure per Redis cache selezionare Metriche nel menu a sinistra, aggiungere la metrica di scrittura della cache e impostare la durata sull'ultima ora. Viene visualizzato il grafico simile all'immagine seguente.

    Screenshot che mostra la pagina Metriche per il cache di Azure per Redis.

Controllare i risultati in Azure Cache per Redis

Ottenere la chiave dai log di Funzioni di Azure

Ottenere prima di tutto la chiave per un record inserito in cache di Azure per Redis. Nel codice la chiave viene calcolata nella funzione di Azure, come illustrato nel frammento di codice seguente:

string key = data[i].Time + " - " + data[i].CallingNum1;

db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
  1. Passare alla portale di Azure e trovare l'app Funzioni di Azure.

  2. Selezionare Funzioni nel menu sinistro.

  3. Selezionare HTTPTrigger1 nell'elenco delle funzioni.

  4. Selezionare Monitoraggio nel menu a sinistra.

  5. Passare alla scheda Log .

  6. Prendere nota di una chiave dal messaggio informativo, come illustrato nello screenshot seguente. Usare questa chiave per trovare il valore in cache di Azure per Redis.

    Screenshot che mostra la pagina Monitoraggio log per la funzione di Azure.

Usare la chiave per trovare il record in cache di Azure per Redis

  1. Accedere al portale di Azure e individuare Azure Cache per Redis. Selezionare Console.

  2. Usare i comandi di Azure Cache per Redis per verificare che i dati siano in Azure Cache per Redis. Il comando accetta il formato Get {key}.) Usare la chiave copiata dai log di monitoraggio per la funzione di Azure (nella sezione precedente).

    Ottenere "KEY-FROM-THE-PREVIOUS-SECTION"

    Questo comando dovrebbe visualizzare il valore relativo alla chiave specificata:

    Screenshot che mostra la console di Cache Redis che mostra l'output del comando Get.

Gestione degli errori e tentativi

Se si verifica un errore durante l'invio di eventi a Funzioni di Azure, Analisi di flusso ritenta la maggior parte delle operazioni. Tutte le eccezioni HTTP vengono ritentate fino all'esito positivo, ad eccezione dell'errore HTTP 413 (entità troppo grande). Un errore di tipo entità troppo grande viene considerato un errore di dati soggetto al criterio Riprova o Rimuovi.

Nota

Il timeout per le richieste HTTP da Analisi di flusso a Funzioni di Azure è impostato su 100 secondi. Se l'app Funzioni di Azure richiede più di 100 secondi per elaborare un batch, analisi di flusso si verifica un errore e riprova per il batch.

La ripetizione dei tentativi per i timeout potrebbe comportare eventi duplicati scritti nel sink di output. Quando Analisi di flusso riprova a inviare un batch non riuscito, ripete il tentativo per tutti gli eventi del batch. Si consideri ad esempio un batch di 20 eventi inviati a Funzioni di Azure da Analisi di flusso. Si supponga che Funzioni di Azure impieghi 100 secondi per elaborare i primi 10 eventi del batch. Dopo 100 secondi, Analisi di flusso sospende la richiesta perché non ha ricevuto una risposta positiva da Funzioni di Azure e viene inviata un'altra richiesta per lo stesso batch. I primi 10 eventi del batch vengono elaborati di nuovi da Funzioni di Azure, generando un duplicato.

Problemi noti

Nel portale di Azure, quando si tenta di reimpostare il valore di Dimensioni massime batch/Numero massimo di batch su un valore vuoto (impostazione predefinita), al momento del salvataggio il valore viene reimpostato sul valore immesso in precedenza. In questo caso, immettere manualmente i valori predefiniti per questi campi.

L'uso del routing HTTP in Funzioni di Azure non è attualmente supportato da Analisi di flusso.

Il supporto per la connessione a Funzioni di Azure ospitato in una rete virtuale non è abilitato.

Pulire le risorse

Quando non sono più necessari, eliminare il gruppo di risorse, il processo di streaming e tutte le risorse correlate. Eliminando il processo si evita di pagare per le unità di streaming usate dal processo. Se si prevede di usare il processo in futuro, è possibile arrestarlo e riavviarlo in un secondo momento, quando è necessario. Se non si intende continuare a usare questo processo, eliminare tutte le risorse create da questa guida introduttiva attenendosi alla procedura seguente:

  1. Scegliere Gruppi di risorse dal menu a sinistra del portale di Azure e quindi selezionare il nome della risorsa creata.
  2. Nella pagina del gruppo di risorse selezionare Elimina, digitare il nome della risorsa da eliminare nella casella di testo e quindi selezionare Elimina.

Passaggi successivi

In questa esercitazione è stato creato un semplice processo di Analisi di flusso che esegue una funzione di Azure. Per altre informazioni sui processi di Analisi di flusso, continuare con l'esercitazione successiva: