Creare una connessione dati dell'hub eventi per Esplora dati di Azure Synapse usando C\# (Anteprima)

Esplora dati di Azure Synapse è un servizio di esplorazione dati rapido e a scalabilità elevata per dati di log e di telemetria. Esplora dati di Azure Synapse offre funzionalità di inserimento (caricamento dei dati) da hub eventi, hub IoT e BLOB scritti nei contenitori BLOB.

In questo articolo viene creata una connessione dati dell'hub eventi per Esplora dati di Azure Synapse usando C\#.

Prerequisiti

  • Una sottoscrizione di Azure. Creare un account Azure gratuito.

  • Creare un pool di Esplora dati usando Synapse Studio o il portale di Azure

  • Creare un database di Esplora dati.

    1. Nel riquadro sinistro di Synapse Studio selezionare Dati.

    2. Selezionare + (Aggiungi nuova risorsa) >Pool Esplora dati e usare le informazioni seguenti:

      Impostazione Valore suggerito Descrizione
      Nome pool contosodataexplorer Nome del pool Esplora dati da usare
      Nome TestDatabase Il nome del database deve essere univoco all'interno del cluster.
      Periodo di conservazione predefinito 365 Intervallo di tempo (in giorni) per cui è garantito che i dati rimangano disponibili per le query. L'intervallo di tempo viene misurato dal momento in cui i dati vengono inseriti.
      Periodo cache predefinito 31 L'intervallo di tempo (in giorni) per cui mantenere i dati sottoposti frequentemente a query disponibili nell'archiviazione su unità SSD o nella RAM, invece che nell'archiviazione a lungo termine.
    3. Selezionare Crea per creare il database. Per la creazione è in genere necessario meno di un minuto.

Nota

L'inserimento di dati da un hub eventi nei pool Esplora dati non sarà possibile se l'area di lavoro di Synapse usa una rete virtuale gestita con protezione dall'esfiltrazione di dati abilitata.

Creare una tabella nel cluster di prova

Creare una tabella denominata StormEvents che corrisponda allo schema dei dati nel file StormEvents.csv.

Suggerimento

I frammenti di codice seguenti creano un'istanza di un client per quasi ogni chiamata. Questa operazione viene eseguita per rendere eseguibile singolarmente ogni frammento. Nell'ambiente di produzione, le istanze client sono di tipo rientrante e devono essere mantenute a condizione che sia necessario. Una singola istanza client per URI è sufficiente, anche quando si usano più database (il database può essere specificato a livello di comando).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definire il mapping di inserimento

Eseguire il mapping dei dati CSV in ingresso sui nomi di colonna usati durante la creazione della tabella. Effettuare il provisioning di un oggetto di mapping di colonne CSV su tale tabella.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Installare NuGet per C#

Autenticazione

Per eseguire l'esempio seguente, sono necessarie un'applicazione Microsoft Entra e un'entità servizio che possano accedere alle risorse. Per creare un'applicazione Microsoft Entra gratuita e aggiungere un'assegnazione di ruolo a livello di sottoscrizione, vedere Creare un'applicazione Microsoft Entra. Sono necessari anche l'ID directory (tenant), l'ID applicazione e il segreto client.

Aggiungere una connessione dati dell'hub eventi

L'esempio seguente illustra come aggiungere una connessione dati dell'hub eventi a livello programmatico. Per informazioni sull'aggiunta di una connessione dati all'hub eventi tramite il portale di Azure, vedere Connettersi all'hub eventi.

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Impostazione Valore consigliato Descrizione campo
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx ID tenant. Noto anche come ID directory.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx L'ID sottoscrizione usato per la creazione di risorse.
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx L'ID client dell'applicazione che può accedere alle risorse nel tenant.
clientSecret xxxxxxxxxxxxxx Il segreto client dell'applicazione che può accedere alle risorse nel tenant.
resourceGroupName testrg Nome del gruppo di risorse contenente il cluster.
clusterName mykustocluster Nome del cluster.
databaseName mykustodatabase Nome del database di destinazione nel cluster.
dataConnectionName myeventhubconnect Nome desiderato della connessione dati.
tableName StormEvents Nome della tabella di destinazione nel database di destinazione.
mappingRuleName StormEvents_CSV_Mapping Nome del mapping delle colonne correlato alla tabella di destinazione.
dataFormat csv Formato dati del messaggio.
eventHubResourceId ID risorsa ID risorsa dell'hub eventi che contiene i dati per l'inserimento.
consumerGroup $Default Gruppo di consumer dell'hub eventi.
posizione Stati Uniti centrali Posizione della risorsa di connessione dati.
compressione Gzip o Nessuno Tipo di compressione dei dati.

Genera dati

Vedere l'app di esempio che genera i dati e li invia a un hub eventi.

Un evento può contenere uno o più record, fino al limite di dimensioni. Nell'esempio seguente vengono inviati due eventi, ognuno con cinque record accodati:

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

Pulire le risorse

Per eliminare la connessione dati, usare il comando seguente:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

Passaggi successivi