Kurz: Spouštění azure Functions z úloh Azure Stream Analytics
V tomto kurzu vytvoříte úlohu Azure Stream Analytics, která čte události ze služby Azure Event Hubs, spustí dotaz na data události a potom vyvolá funkci Azure, která zapisuje do instance Azure Cache for Redis.
Poznámka:
- Azure Functions můžete spustit z Azure Stream Analytics tak, že nakonfigurujete službu Functions jako jednu z jímek (výstupů) do úlohy Stream Analytics. Služba Functions je událostmi řízené prostředí s výpočty na vyžádání, které umožňuje implementaci kódu aktivovaného událostmi, ke kterým dochází v Azure nebo ve službách třetích stran. Díky schopnosti reagovat na triggery je služba Functions přirozeným výstupem pro úlohy Azure Stream Analytics.
- Stream Analytics volá službu Functions prostřednictvím triggerů HTTP. Adaptér pro výstup služby Functions umožňuje uživatelům připojit Functions k Stream Analytics tak, že události lze aktivovat na základě dotazů Stream Analytics.
- Připojení ke službě Azure Functions ve virtuální síti (VNet) z úlohy Stream Analytics spuštěné v clusteru s více tenanty se nepodporuje.
V tomto kurzu se naučíte:
- Vytvoření instance služby Azure Event Hubs
- Vytvoření instance Azure Cache for Redis
- Vytvoření funkce Azure
- Vytvoření úlohy Stream Analytics
- Konfigurace centra událostí jako vstupu a funkce jako výstupu
- Spuštění úlohy Stream Analytics
- Kontrola výsledků ve službě Azure Cache for Redis
Pokud ještě nemáte předplatné Azure, vytvořte si bezplatný účet před tím, než začnete.
Požadavky
Než začnete, ujistěte se, že jste dokončili následující kroky:
- Pokud nemáte předplatné Azure, vytvořte si bezplatný účet.
- Stáhněte si aplikaci generátoru událostí telefonního hovoru, TelcoGenerator.zip z webu Microsoft Download Center nebo získejte zdrojový kód z GitHubu.
Přihlášení k Azure
Přihlaste se k portálu Azure.
Vytvoření centra událostí
Než Stream Analytics dokáže analyzovat datový proud podvodných volání, musíte do centra událostí odeslat nějaká ukázková data. V tomto kurzu odesíláte data do Azure pomocí služby Azure Event Hubs.
Pomocí následujícího postupu vytvořte centrum událostí a odešlete do tohoto centra událostí data volání:
Přihlaste se k portálu Azure.
V nabídce vlevo vyberte Všechny služby, vyberte Internet věcí, najeďte myší na Event Hubs a pak vyberte + (Přidat).
Na stránce Vytvořit obor názvů postupujte takto:
Vyberte předplatné Azure, ve kterém chcete centrum událostí vytvořit.
V části Skupina prostředků vyberte Vytvořit novou a zadejte název skupiny prostředků. Obor názvů služby Event Hubs se vytvoří v této skupině prostředků.
Jako název oboru názvů zadejte jedinečný název oboru názvů služby Event Hubs.
V části Umístění vyberte oblast, ve které chcete obor názvů vytvořit.
Pro cenovou úroveň vyberte Standard.
Vyberte Zkontrolovat a vytvořit v dolní části stránky.
Na stránce Zkontrolovat a vytvořit v průvodci vytvořením oboru názvů vyberte Vytvořit v dolní části stránky po kontrole všech nastavení.
Po úspěšném nasazení oboru názvů vyberte Přejít k prostředku a přejděte na stránku Oboru názvů služby Event Hubs.
Na stránce Obor názvů služby Event Hubs vyberte na panelu příkazů +Event Hub.
Na stránce Vytvořit centrum událostí zadejte název centra událostí. Nastavte počet oddílů na hodnotu 2. Ve zbývajících nastaveních použijte výchozí možnosti a vyberte Zkontrolovat a vytvořit.
Na stránce Zkontrolovat a vytvořit vyberte Vytvořit v dolní části stránky. Potom počkejte na úspěšné dokončení nasazení.
Udělení přístupu k centru událostí a získání připojovacího řetězce
Aby aplikace mohl odesílat data do služby Azure Event Hubs, musí mít centrum událostí zásadu, která umožňuje přístup. Zásady přístupu vytváří připojovací řetězec, který obsahuje informace o autorizaci.
Na stránce Obor názvů služby Event Hubs vyberte v nabídce vlevo zásady sdíleného přístupu.
V seznamu zásad vyberte RootManageSharedAccessKey .
Pak vyberte tlačítko kopírovat vedle připojovacího řetězce – primární klíč.
Vložte připojovací řetězec do textového editoru. Tento připojovací řetězec budete potřebovat v další části.
Připojovací řetězec vypadá takto:
Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>
Všimněte si, že připojovací řetězec obsahuje několik dvojic klíč-hodnota oddělených středníky: Koncový bod, SharedAccessKeyName a SharedAccessKey.
Spuštění aplikace generátoru událostí
Před spuštěním aplikace TelcoGenerator byste ji měli nakonfigurovat tak, aby odesílala data do služby Azure Event Hubs, kterou jste vytvořili dříve.
Extrahujte obsah souboru TelcoGenerator.zip.
TelcoGenerator\TelcoGenerator\telcodatagen.exe.config
Otevřete soubor v textovém editoru podle vašeho výběru Existuje více než jeden.config
soubor, proto se ujistěte, že jste otevřeli ten správný.Aktualizujte element
<appSettings>
v konfiguračním souboru následujícím způsobem:- Nastavte hodnotu klíče EventHubName na hodnotu EntityPath na konci připojovací řetězec.
- Nastavte hodnotu klíče Microsoft.ServiceBus.ConnectionString na připojovací řetězec oboru názvů. Pokud použijete připojovací řetězec do centra událostí, nikoli oboru názvů, odeberte
EntityPath
hodnotu (;EntityPath=myeventhub
) na konci. Nezapomeňte odebrat středník, který předchází hodnotě EntityPath.
Uložte soubor.
Dále otevřete příkazové okno a přejděte do složky, do které jste extrahovali aplikaci TelcoGenerator. Potom zadejte následující příkaz:
.\telcodatagen.exe 1000 0.2 2
Tento příkaz má následující parametry:
- Počet záznamů dat volání za hodinu.
- Procento pravděpodobnosti podvodů – to znamená, jak často by měla aplikace simulovat podvodné volání. Hodnota 0,2 znamená, že přibližně 20 % záznamů volání vypadá podvodně.
- Doba v hodinách – počet hodin, po které by aplikace měla být spuštěná. Aplikaci můžete také kdykoli zastavit ukončením procesu (Ctrl+C) na příkazovém řádku.
Po několika sekundách aplikace začne zobrazovat záznamy telefonních hovorů na obrazovce, když je odešle do centra událostí. Data telefonních hovorů obsahují následující pole:
Záznam Definice CallrecTime Časové razítko pro počáteční čas volání. SwitchNum Telefonní ústředna použitá pro spojení volání. V tomto příkladu jsou přepínače řetězce, které představují zemi/oblast původu (USA, Čína, Spojené království, Německo nebo Austrálie). CallingNum Telefonní číslo volajícího. CallingIMSI IMSI (International Mobile Subscriber Identity). Jedinečný identifikátor volajícího. CalledNum Telefonní číslo příjemce volání. CalledIMSI IMSI (International Mobile Subscriber Identity). Jedinečný identifikátor příjemce volání.
Vytvoření úlohy Stream Analytics
Teď, když máte stream událostí volání, můžete vytvořit úlohu Stream Analytics, která načte data z centra událostí.
- Pokud chcete vytvořit úlohu Stream Analytics, přejděte na web Azure Portal.
- Vyberte Vytvořit prostředek a vyhledejte úlohu Stream Analytics. Vyberte dlaždici úlohy Stream Analytics a vyberte Vytvořit.
- Na stránce Nová úloha Stream Analytics postupujte takto:
V části Předplatné vyberte předplatné, které obsahuje obor názvů služby Event Hubs.
V části Skupina prostředků vyberte skupinu prostředků, kterou jste vytvořili dříve.
V části Podrobnosti instance zadejte jako název jedinečný název pro úlohu Stream Analytics.
V části Oblast vyberte oblast, ve které chcete vytvořit úlohu Stream Analytics. Doporučujeme umístit úlohu a centrum událostí do stejné oblasti, abyste mohli dosáhnout nejlepšího výkonu, abyste nemuseli platit za přenos dat mezi oblastmi.
V případě hostování prostředí< vyberte Cloud , pokud ještě není vybraný. Úlohy Stream Analytics můžete nasadit do cloudu nebo do hraničního zařízení. Cloud umožňuje nasazení do Azure Cloudu a Edge umožňuje nasazení do zařízení IoT Edge.
U jednotek streamování vyberte 1. Jednotky streamování představují výpočetní prostředky nutné k provedení úlohy. Ve výchozím nastavení je tato hodnota nastavená na 1. Podrobnosti o škálování jednotek streamování najdete v článku věnovaném principům a úpravám jednotek streamování.
Vyberte Zkontrolovat a vytvořit v dolní části stránky.
- Na stránce Zkontrolovat a vytvořit zkontrolujte nastavení a pak vyberte Vytvořit a vytvořte úlohu Stream Analytics.
- Po nasazení úlohy vyberte Přejít k prostředku a přejděte na stránku úlohy Stream Analytics.
Konfigurace vstupu úlohy
Dalším krokem je definování vstupního zdroje, ze kterého bude úloha číst data pomocí centra událostí, které jste vytvořili v předchozí části.
Na stránce úlohy Stream Analytics v části Topologie úlohy v nabídce vlevo vyberte Vstupy.
Na stránce Vstupy vyberte + Přidat vstup a centrum událostí.
Na stránce Centra událostí postupujte takto:
Jako vstupní alias zadejte CallStream. Vstupní alias je popisný název pro identifikaci vašeho vstupu. Vstupní alias může obsahovat jenom alfanumerické znaky, spojovníky a podtržítka a musí být dlouhý 3 až 63 znaků.
V části Předplatné vyberte předplatné Azure, ve kterém jste vytvořili centrum událostí. Centrum událostí může být ve stejném předplatném jako úloha Stream Analytics, ale i v jiném.
V případě oboru názvů služby Event Hubs vyberte obor názvů služby Event Hubs, který jste vytvořili v předchozí části. Všechny obory názvů dostupné v aktuálním předplatném jsou uvedené v rozevíracím seznamu.
Jako název centra událostí vyberte centrum událostí, které jste vytvořili v předchozí části. Všechna centra událostí dostupná ve vybraném oboru názvů jsou uvedená v rozevíracím seznamu.
V případě skupiny příjemců centra událostí ponechte vybranou možnost Vytvořit novou , aby se v centru událostí vytvořila nová skupina příjemců. Pro každou úlohu Stream Analytics doporučujeme použít samostatnou skupinu příjemců. Pokud není zadána žádná skupina příjemců, použije
$Default
úloha Stream Analytics skupinu příjemců. Pokud úloha obsahuje vlastní spojení nebo má více vstupů, některé vstupy mohou později přečíst více než jeden čtenář. Tato situace ovlivňuje počet čtenářů v jedné skupině příjemců.V režimu ověřování vyberte Připojovací řetězec. Tento kurz je jednodušší otestovat pomocí této možnosti.
Jako název zásady centra událostí vyberte Použít existující a pak vyberte zásadu, kterou jste vytvořili dříve.
Vyberte Uložit v dolní části stránky.
Vytvoření instance Azure Cache for Redis
Vytvořte mezipaměť ve službě Azure Cache for Redis pomocí kroků popsaných v tématu Vytvoření mezipaměti.
Po vytvoření mezipaměti v části Nastavení vyberte Přístupové klíče. Poznamenejte si primární připojovací řetězec.
Vytvoření funkce ve službě Azure Functions, která může zapisovat data do Azure Cache for Redis
Podívejte se v dokumentaci k Functions na část věnovanou vytváření aplikací funkcí. Tato ukázka byla postavena na:
- Modul runtime Azure Functions verze 4
- .NET 6.0
- StackExchange.Redis 2.2.8
Podle tohoto kurzu vytvořte výchozí aplikaci funkcí HttpTrigger v editoru Visual Studio Code. Používají se následující informace: jazyk:
C#
, modul runtime:.NET 6
(pod funkcí v4), šablona:HTTP trigger
.Nainstalujte klientskou knihovnu Redis spuštěním následujícího příkazu v terminálu umístěném ve složce projektu:
dotnet add package StackExchange.Redis --version 2.2.88
Přidejte položky
RedisConnectionString
aRedisDatabaseIndex
položky vValues
části, vyplňtelocal.settings.json
připojovací řetězec cílového serveru:{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "", "FUNCTIONS_WORKER_RUNTIME": "dotnet", "RedisConnectionString": "Your Redis Connection String", "RedisDatabaseIndex":"0" } }
Index databáze Redis je číslo od 0 do 15 identifikující databázi v instanci.
Nahraďte celou funkci (.cs soubor v projektu) následujícím fragmentem kódu. Aktualizujte obor názvů, název třídy a název funkce vlastním názvem:
using System; using System.IO; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Http; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using StackExchange.Redis; namespace Company.Function { public static class HttpTrigger1{ [FunctionName("HttpTrigger1")] public static async Task<IActionResult> Run( [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req, ILogger log) { // Extract the body from the request string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check dynamic data = JsonConvert.DeserializeObject(requestBody); // Reject if too large, as per the doc if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString"); int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex")); using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString)) { // Connection refers to a property that returns a ConnectionMultiplexer IDatabase db = connection.GetDatabase(RedisDatabaseIndex); // Parse items and send to binding for (var i = 0; i < data.Count; i++) { string key = data[i].Time + " - " + data[i].CallingNum1; db.StringSet(key, data[i].ToString()); log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}"); // Simple get of data types from the cache string value = db.StringGet(key); log.LogInformation($"Database got: {key} => {value}"); } } return new OkResult(); // 200 } } }
Když Stream Analytics obdrží výjimku "Entita požadavku HTTP je příliš velká" z funkce, zmenšuje velikost dávek, které odesílá do funkcí. Následující kód zajišťuje, že Stream Analytics neodesílá příliš velké dávky. Ujistěte se, že hodnoty maximálního počtu a velikosti pro dávku, které se používají ve funkci, jsou konzistentní s hodnotami zadanými na portálu Stream Analytics.
Funkce je teď možné publikovat do Azure.
Otevřete funkci na webu Azure Portal a nastavte nastavení aplikace pro
RedisConnectionString
aRedisDatabaseIndex
.
Aktualizace úlohy Stream Analytics, ve které jako výstup bude daná funkce
Otevřete úlohu Stream Analytics na portálu Azure Portal.
Přejděte k dané funkci a vyberte Přehled>Výstupy>Přidat. Pokud chcete přidat nový výstup, vyberte funkci Azure pro možnost jímky. Výstupní adaptér služby Functions má následující vlastnosti:
Název vlastnosti Popis Alias pro výstup Popisný název, který v dotazu úlohy používáte k odkazu na výstup Možnost importu Můžete použít funkci z aktuálního předplatného nebo ručně zadat nastavení, pokud se funkce nachází v jiném předplatném. Function App Název vaší aplikace Functions Function Název funkce ve vaší aplikaci Functions (název vaší funkce run.csx) Maximální velikost dávky Nastaví maximální velikost pro každou výstupní dávku, která se odešle do vaší funkce v bajtech. Ve výchozím nastavení je tato hodnota nastavená na 262 144 bajtů (256 kB). Maximální počet v dávce Určuje maximální počet událostí v každé dávce, která se odesílá do dané funkce. Výchozí hodnota je 100. Tato vlastnost je nepovinná. Klíč Umožňuje vám použít funkci z jiného předplatného. Zadejte hodnotu klíče pro přístup k dané funkci. Tato vlastnost je nepovinná. Zadejte název aliasu pro výstup. V tomto kurzu se jmenuje saop1, ale můžete použít libovolný název. Zadejte další podrobnosti.
Otevřete úlohu Stream Analytics a aktualizujte dotaz následujícím způsobem.
Důležité
Následující ukázkový skript předpokládá, že jste pro název vstupu použili CallStream a název výstupu saop1 . Pokud jste použili jiné názvy, nezapomeňte dotaz aktualizovat.
SELECT System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1, CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2 INTO saop1 FROM CallStream CS1 TIMESTAMP BY CallRecTime JOIN CallStream CS2 TIMESTAMP BY CallRecTime ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5 WHERE CS1.SwitchNum != CS2.SwitchNum
Spusťte telcodatagen.exe aplikaci spuštěním následujícího příkazu v příkazovém řádku. Příkaz používá formát
telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours]
.telcodatagen.exe 1000 0.2 2
Spusťte úlohu Stream Analytics.
Na stránce Monitorování funkce Azure uvidíte, že je funkce vyvolána.
Na stránce Azure Cache for Redis vyberte metriky v nabídce vlevo, přidejte metriku zápisu do mezipaměti a nastavte dobu trvání na poslední hodinu. Zobrazí se graf podobný následujícímu obrázku.
Kontrola výsledků ve službě Azure Cache for Redis
Získání klíče z protokolů Azure Functions
Nejprve získejte klíč pro záznam vložený do Azure Cache for Redis. V kódu se klíč vypočítá ve funkci Azure, jak je znázorněno v následujícím fragmentu kódu:
string key = data[i].Time + " - " + data[i].CallingNum1;
db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
Přejděte na web Azure Portal a najděte svou aplikaci Azure Functions.
V nabídce vlevo vyberte Funkce .
Ze seznamu funkcí vyberte HTTPTrigger1 .
V nabídce vlevo vyberte Sledovat .
Přepněte na kartu Protokoly .
Poznamenejte si klíč z informační zprávy, jak je znázorněno na následujícím snímku obrazovky. Tento klíč použijete k vyhledání hodnoty ve službě Azure Cache for Redis.
Použití klíče k vyhledání záznamu ve službě Azure Cache for Redis
Přejděte na web Azure Portal a vyhledejte azure Cache for Redis. Vyberte Konzola.
Pomocí příkazů Azure Cache for Redis ověřte, že jsou vaše data ve službě Azure Cache for Redis. (Příkaz přebírá formát Get {key}.) Použijte klíč, který jste zkopírovali z protokolů monitorování pro funkci Azure (v předchozí části).
Získání "KEY-FROM-THE-PREVIOUS-SECTION"
Tento příkaz by měl vytisknout hodnotu pro zadaný klíč:
Zpracování chyb a opakování
Pokud dojde k selhání při odesílání událostí do Azure Functions, Stream Analytics opakuje většinu operací. Všechny výjimky HTTP se opakují, dokud nedojde k úspěchu s výjimkou chyby HTTP 413 (příliš velká entita). Příliš velká chyba entity je považována za chybu dat, která podléhá zásadě opakování nebo vyřazení.
Poznámka:
Časový limit požadavků HTTP ze Stream Analytics do Azure Functions je nastavený na 100 sekund. Pokud zpracování dávky trvá více než 100 sekund vaší aplikace Azure Functions, služba Stream Analytics dojde k chybám a opakování dávky.
Opakování časových limitů může mít za následek duplicitní události zapsané do výstupní jímky. Když Stream Analytics opakuje neúspěšnou dávku, opakuje se pro všechny události v dávce. Představte si například dávku 20 událostí, které se odesílají do Azure Functions ze Stream Analytics. Předpokládejme, že zpracování prvních 100 událostí v této dávce trvá 100 sekund. Po 100 sekundách Stream Analytics pozastaví požadavek, protože neobdržel pozitivní odpověď ze služby Azure Functions a odešle se další požadavek pro stejnou dávku. Prvních 10 událostí v dávce se znovu zpracuje službou Azure Functions, což způsobí duplikát.
Známé problémy
Když se na portálu Azure Portal provede pokus o resetování maximální velikosti dávky nebo maximálního počtu v dávce na prázdnou (výchozí) hodnotu, po uložení se tato hodnota změní zpět na dříve zadanou hodnotu. V tom případě zadejte výchozí hodnoty pro tato pole ručně.
Stream Analytics v současné době nepodporuje směrování HTTP ve službě Azure Functions.
Podpora připojení ke službě Azure Functions hostované ve virtuální síti není povolená.
Vyčištění prostředků
Odstraňte skupinu prostředků, úlohu streamování a všechny související prostředky, pokud je už nepotřebujete. Odstraněním úlohy se zabrání zaúčtování jednotek streamování, které daná úloha spotřebovává. Pokud plánujete používat tuto úlohu v budoucnu, můžete ji zastavit a znovu ji spustit později, až ji budete potřebovat. Pokud tuto úlohu nebudete dál používat, pomocí následujícího postupu odstraňte všechny prostředky vytvořené tímto rychlým startem:
- V nabídce vlevo na portálu Azure Portal vyberte Skupiny prostředků a potom zvolte název vytvořeného prostředku.
- Na stránce skupiny prostředků zvolte Odstranit, do textového pole zadejte prostředek, který chcete odstranit, a potom vyberte Odstranit.
Další kroky
V tomto kurzu jste vytvořili jednoduchou úlohu Stream Analytics, která spouští funkci Azure Functions. Další informace o úlohách Stream Analytics získáte v dalším kurzu: