Creare una connessione dati dell'hub eventi per Esplora dati di Azure Synapse usando C\# (Anteprima)
Esplora dati di Azure Synapse è un servizio di esplorazione dati rapido e a scalabilità elevata per dati di log e di telemetria. Esplora dati di Azure Synapse offre funzionalità di inserimento (caricamento dei dati) da hub eventi, hub IoT e BLOB scritti nei contenitori BLOB.
In questo articolo viene creata una connessione dati dell'hub eventi per Esplora dati di Azure Synapse usando C\#.
Prerequisiti
Una sottoscrizione di Azure. Creare un account Azure gratuito.
Creare un pool di Esplora dati usando Synapse Studio o il portale di Azure
Creare un database di Esplora dati.
Nel riquadro sinistro di Synapse Studio selezionare Dati.
Selezionare + (Aggiungi nuova risorsa) >Pool Esplora dati e usare le informazioni seguenti:
Impostazione Valore suggerito Descrizione Nome pool contosodataexplorer Nome del pool Esplora dati da usare Nome TestDatabase Il nome del database deve essere univoco all'interno del cluster. Periodo di conservazione predefinito 365 Intervallo di tempo (in giorni) per cui è garantito che i dati rimangano disponibili per le query. L'intervallo di tempo viene misurato dal momento in cui i dati vengono inseriti. Periodo cache predefinito 31 L'intervallo di tempo (in giorni) per cui mantenere i dati sottoposti frequentemente a query disponibili nell'archiviazione su unità SSD o nella RAM, invece che nell'archiviazione a lungo termine. Selezionare Crea per creare il database. Per la creazione è in genere necessario meno di un minuto.
Nota
L'inserimento di dati da un hub eventi nei pool Esplora dati non sarà possibile se l'area di lavoro di Synapse usa una rete virtuale gestita con protezione dall'esfiltrazione di dati abilitata.
- Visual Studio 2019, scaricare e usare l'edizione gratuitaVisual Studio 2019 Community Edition. Abilitare lo sviluppo di Azure durante la configurazione di Visual Studio.
Creare una tabella nel cluster di prova
Creare una tabella denominata StormEvents
che corrisponda allo schema dei dati nel file StormEvents.csv
.
Suggerimento
I frammenti di codice seguenti creano un'istanza di un client per quasi ogni chiamata. Questa operazione viene eseguita per rendere eseguibile singolarmente ogni frammento. Nell'ambiente di produzione, le istanze client sono di tipo rientrante e devono essere mantenute a condizione che sia necessario. Una singola istanza client per URI è sufficiente, anche quando si usano più database (il database può essere specificato a livello di comando).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Definire il mapping di inserimento
Eseguire il mapping dei dati CSV in ingresso sui nomi di colonna usati durante la creazione della tabella. Effettuare il provisioning di un oggetto di mapping di colonne CSV su tale tabella.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Installare NuGet per C#
- Installare il pacchetto Microsoft.Azure.Management.Kusto NuGet.
Autenticazione
Per eseguire l'esempio seguente, sono necessarie un'applicazione Microsoft Entra e un'entità servizio che possano accedere alle risorse. Per creare un'applicazione Microsoft Entra gratuita e aggiungere un'assegnazione di ruolo a livello di sottoscrizione, vedere Creare un'applicazione Microsoft Entra. Sono necessari anche l'ID directory (tenant), l'ID applicazione e il segreto client.
Aggiungere una connessione dati dell'hub eventi
L'esempio seguente illustra come aggiungere una connessione dati dell'hub eventi a livello programmatico. Per informazioni sull'aggiunta di una connessione dati all'hub eventi tramite il portale di Azure, vedere Connettersi all'hub eventi.
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Impostazione | Valore consigliato | Descrizione campo |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | ID tenant. Noto anche come ID directory. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | L'ID sottoscrizione usato per la creazione di risorse. |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | L'ID client dell'applicazione che può accedere alle risorse nel tenant. |
clientSecret | xxxxxxxxxxxxxx | Il segreto client dell'applicazione che può accedere alle risorse nel tenant. |
resourceGroupName | testrg | Nome del gruppo di risorse contenente il cluster. |
clusterName | mykustocluster | Nome del cluster. |
databaseName | mykustodatabase | Nome del database di destinazione nel cluster. |
dataConnectionName | myeventhubconnect | Nome desiderato della connessione dati. |
tableName | StormEvents | Nome della tabella di destinazione nel database di destinazione. |
mappingRuleName | StormEvents_CSV_Mapping | Nome del mapping delle colonne correlato alla tabella di destinazione. |
dataFormat | csv | Formato dati del messaggio. |
eventHubResourceId | ID risorsa | ID risorsa dell'hub eventi che contiene i dati per l'inserimento. |
consumerGroup | $Default | Gruppo di consumer dell'hub eventi. |
posizione | Stati Uniti centrali | Posizione della risorsa di connessione dati. |
compressione | Gzip o Nessuno | Tipo di compressione dei dati. |
Genera dati
Vedere l'app di esempio che genera i dati e li invia a un hub eventi.
Un evento può contenere uno o più record, fino al limite di dimensioni. Nell'esempio seguente vengono inviati due eventi, ognuno con cinque record accodati:
var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;
for (var i = 0; i < 10; i++)
{
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
counter++;
// Create the event
if (counter == recordsPerEvent)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
events.Add(eventData);
counter = 0;
data = string.Empty;
}
}
// Send events
eventHubClient.SendAsync(events).Wait();
Pulire le risorse
Per eliminare la connessione dati, usare il comando seguente:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);