Bu başvuru mimarisi uçtan uca akış işleme işlem hattını gösterir. Bu işlem hattı türünün dört aşaması vardır: alma, işleme, depolama ve analiz ve raporlama. Bu başvuru mimarisi için işlem hattı iki kaynaktan veri alır, her akıştan ilgili kayıtlarda birleştirme gerçekleştirir, sonucu zenginleştirir ve gerçek zamanlı olarak bir ortalama hesaplar. Sonuçlar daha fazla analiz için depolanır.
Bu mimari için bir başvuru uygulaması GitHub'da kullanılabilir.
Mimari
Bu mimarinin Visio dosyasını indirin.
İş Akışı
Mimari aşağıdaki bileşenlerden oluşur:
Veri kaynakları. Bu mimaride, gerçek zamanlı olarak veri akışları oluşturan iki veri kaynağı vardır. İlk akış sürüş bilgilerini, ikinci akış ise ücret bilgilerini içerir. Başvuru mimarisi, bir dizi statik dosyadan okuyan ve verileri Event Hubs'a gönderen bir sanal veri oluşturucu içerir. Gerçek bir uygulamadaki veri kaynakları, taksilere yüklenen cihazlar olacaktır.
Azure Event Hubs. Event Hubs bir olay alma hizmetidir. Bu mimaride her veri kaynağı için bir tane olan iki olay hub'ı örneği kullanılır. Her veri kaynağı ilişkili olay hub'ına bir veri akışı gönderir.
Azure Databricks. Databricks , Microsoft Azure bulut hizmetleri platformu için iyileştirilmiş Apache Spark tabanlı bir analiz platformudur. Databricks, taksi yolculuğu ve ücret verilerini ilişkilendirmek ve ayrıca ilişkili verileri Databricks dosya sisteminde depolanan mahalle verileriyle zenginleştirmek için kullanılır.
Azure Cosmos DB. Azure Databricks işinin çıktısı, Apache Cassandra için Azure Cosmos DB'ye yazılan bir dizi kayıttır. Apache Cassandra için Azure Cosmos DB, zaman serisi veri modellemeyi desteklediği için kullanılır.
- Azure Cosmos DB için Azure Synapse Link, Azure Synapse çalışma alanınızda bulunan iki analiz altyapısını kullanarak işlem iş yükünüz üzerinde herhangi bir performans veya maliyet etkisi olmadan Azure Cosmos DB'deki operasyonel veriler üzerinde neredeyse gerçek zamanlı analiz çalıştırmanıza olanak tanır: SQL Sunucusuz ve Spark Havuzları.
Azure Log Analytics. Azure İzleyici tarafından toplanan uygulama günlüğü verileri Log Analytics çalışma alanında depolanır. Log Analytics sorguları, ölçümleri analiz etmek ve görselleştirmek ve uygulamadaki sorunları belirlemek için günlük iletilerini incelemek için kullanılabilir.
Alternatifler
- Synapse Link , Azure Cosmos DB verilerinin üzerinde analiz için Microsoft tarafından tercih edilen çözümdür.
Senaryo ayrıntıları
Senaryo: Bir taksi şirketi her taksi yolculuğu hakkında veri toplar. Bu senaryo için veri gönderen iki ayrı cihaz olduğunu varsayıyoruz. Taksinin her yolculukla ilgili bilgi gönderen bir ölçümü vardır: süre, mesafe ve teslim alma ve bırakma konumları. Ayrı bir cihaz müşterilerden gelen ödemeleri kabul eder ve ücretler hakkında veri gönderir. Binicilik eğilimlerini tespit etmek için taksi şirketi, her mahalle için gerçek zamanlı olarak kilometre başına ortalama ipucunu hesaplamak istiyor.
Olası kullanım örnekleri
Bu çözüm perakende sektörü için optimize edilmiştir.
Veri alımı
Bir veri kaynağının benzetimini yapmak için bu başvuru mimarisi New York City Taxi Data veri kümesini kullanır[1]. Bu veri kümesi, dört yıllık bir süre boyunca (2010 – 2013) New York'taki taksi yolculukları hakkındaki verileri içerir. İki tür kayıt içerir: Sürüş verileri ve ücret verileri. Yolculuk verileri seyahat süresini, seyahat mesafesini ve teslim alma ve bırakma konumunu içerir. Ücret verileri, ücret, vergi ve ipucu tutarlarını içerir. Her iki kayıt türündeki yaygın alanlar arasında madalyon numarası, hack lisansı ve satıcı kimliği yer alır. Bu üç alan birlikte benzersiz olarak bir taksi ve bir sürücü tanımlar. Veriler CSV biçiminde depolanır.
[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Urbana-Champaign'deki Illinois Üniversitesi. https://doi.org/10.13012/J8PN93H8
Veri oluşturucu, kayıtları okuyan ve Azure Event Hubs'a gönderen bir .NET Core uygulamasıdır. Oluşturucu, sürüş verilerini JSON biçiminde ve ücret verilerini CSV biçiminde gönderir.
Event Hubs, verileri segmentlere ayırmak için bölümleri kullanır. Bölümler, tüketicinin her bölümü paralel okumasına olanak sağlar. Event Hubs'a veri gönderdiğinizde bölüm anahtarını açıkça belirtebilirsiniz. Aksi takdirde, kayıtlar bölümlere hepsini bir kez deneme biçiminde atanır.
Bu senaryoda, sürüş verileri ve ücret verileri belirli bir taksi için aynı bölüm kimliğine sahip olmalıdır. Bu, Databricks'in iki akışla bağıntılı olduğunda bir paralellik derecesi uygulamasına olanak tanır. Yolculuk verilerinin n. bölümündeki bir kayıt, ücret verilerinin n. bölümündeki bir kayıtla eşleşecektir.
Bu mimarinin Visio dosyasını indirin.
Veri oluşturucuda, her iki kayıt türünün ortak veri modelinin , ve birleştirmesi Medallion
olan bir PartitionKey
özelliği VendorId
vardır. HackLicense
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Bu özellik Event Hubs'a gönderirken açık bir bölüm anahtarı sağlamak için kullanılır:
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Event Hubs
Event Hubs'ın aktarım hızı kapasitesi aktarım hızı birimleriyle ölçülür. Otomatik şişirme özelliğini etkinleştirerek olay hub'ını otomatik olarak ölçeklendirerek aktarım hızı birimlerini trafiğe göre yapılandırılan en yüksek değere kadar ölçeklendirin.
Akış işleme
Azure Databricks'te veri işleme bir iş tarafından gerçekleştirilir. İş bir kümeye atanır ve bu kümede çalışır. İş Java ile yazılmış özel kod veya Spark not defteri olabilir.
Bu başvuru mimarisinde iş, hem Java hem de Scala ile yazılmış sınıflara sahip bir Java arşividir. Databricks işi için Java arşivi belirtilirken, sınıf Databricks kümesi tarafından yürütülmeye yönelik olarak belirtilir. Burada sınıfının main
yöntemi com.microsoft.pnp.TaxiCabReader
veri işleme mantığını içerir.
İki olay hub'ı örneğinden akışı okuma
Veri işleme mantığı, iki Azure olay hub'ı örneğinden okumak için Spark yapılandırılmış akışı kullanır:
// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()
val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiRideConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
.format("eventhubs")
.options(rideEventHubOptions.toMap)
.load
val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiFareConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
.format("eventhubs")
.options(fareEventHubOptions.toMap)
.load
Verileri mahalle bilgileriyle zenginleştirme
Sürüş verileri, alma ve bırakma konumlarının enlem ve boylam koordinatlarını içerir. Bu koordinatlar yararlı olsa da, analiz için kolayca tüketilmemektedir. Bu nedenle, bu veriler bir şekil dosyasından okunan mahalle verileriyle zenginleştirilmiştir.
Şekil dosyası biçimi ikilidir ve kolayca ayrıştırılmaz, ancak GeoTools kitaplığı şekil dosyası biçimini kullanan jeo-uzamsal veriler için araçlar sağlar. Bu kitaplık, alma ve bırakma koordinatlarına göre mahalle adını belirlemek için sınıfında kullanılır com.microsoft.pnp.GeoFinder
.
val neighborhoodFinder = (lon: Double, lat: Double) => {
NeighborhoodFinder.getNeighborhood(lon, lat).get()
}
Yolculuk ve ücret verilerine katılma
İlk olarak yolculuk ve ücret verileri dönüştürülür:
val rides = transformedRides
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedRides.add(1)
false
}
})
.select(
$"ride.*",
to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
.as("pickupNeighborhood"),
to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
.as("dropoffNeighborhood")
)
.withWatermark("pickupTime", conf.taxiRideWatermarkInterval())
val fares = transformedFares
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedFares.add(1)
false
}
})
.select(
$"fare.*",
$"pickupTime"
)
.withWatermark("pickupTime", conf.taxiFareWatermarkInterval())
Ardından yolculuk verileri, ücret verileriyle birleştirilir:
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
Verileri işleme ve Azure Cosmos DB'ye ekleme
Her mahalle için ortalama ücret miktarı belirli bir zaman aralığı için hesaplanır:
val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
.groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
.agg(
count("*").as("rideCount"),
sum($"fareAmount").as("totalFareAmount"),
sum($"tipAmount").as("totalTipAmount"),
(sum($"fareAmount")/count("*")).as("averageFareAmount"),
(sum($"tipAmount")/count("*")).as("averageTipAmount")
)
.select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")
Ardından Azure Cosmos DB'ye eklenir:
maxAvgFarePerNeighborhood
.writeStream
.queryName("maxAvgFarePerNeighborhood_cassandra_insert")
.outputMode(OutputMode.Append())
.foreach(new CassandraSinkForeach(connector))
.start()
.awaitTermination()
Dikkat edilmesi gereken noktalar
Bu önemli noktalar, bir iş yükünün kalitesini artırmak için kullanılabilecek bir dizi yol gösteren ilke olan Azure İyi Tasarlanmış Çerçeve'nin yapı taşlarını uygular. Daha fazla bilgi için bkz . Microsoft Azure İyi Tasarlanmış Çerçeve.
Güvenlik
Güvenlik, kasıtlı saldırılara ve değerli verilerinizin ve sistemlerinizin kötüye kullanılmasına karşı güvence sağlar. Daha fazla bilgi için bkz . Güvenlik sütununa genel bakış.
Azure Databricks çalışma alanına erişim, yönetici konsolu kullanılarak denetlenmektedir. Yönetici konsolu, kullanıcı ekleme, kullanıcı izinlerini yönetme ve çoklu oturum açma ayarlama işlevleri içerir. Çalışma alanları, kümeler, işler ve tablolar için erişim denetimi de yönetici konsolu aracılığıyla ayarlanabilir.
Gizli dizileri yönetme
Azure Databricks, kimlik bilgilerini depolamak ve bunlara not defterlerinde ve işlerde başvurmak için kullanılan bir gizli dizi deposu içerir. Azure Databricks gizli dizi deposundaki gizli diziler kapsamlara göre bölümlenir:
databricks secrets create-scope --scope "azure-databricks-job"
Gizli diziler kapsam düzeyinde eklenir:
databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
Not
Yerel Azure Databricks kapsamı yerine Azure Key Vault destekli bir kapsam kullanılmalıdır. Daha fazla bilgi edinmek için bkz . Azure Key Vault destekli kapsamlar.
Kodda gizli dizilere Azure Databricks gizli dizi yardımcı programları aracılığıyla erişilir.
İzleme
Azure Databricks Apache Spark'ı temel alır ve her ikisi de günlük kaydı için standart kitaplık olarak log4j kullanır. Apache Spark tarafından sağlanan varsayılan günlüğe kaydetmeye ek olarak, Azure Databricks'i İzleme makalesini izleyerek Azure Log Analytics'te günlüğe kaydetme uygulayabilirsiniz.
com.microsoft.pnp.TaxiCabReader
Sınıf sürüş ve ücret iletilerini işlerken, birinin hatalı biçimlendirilmiş ve bu nedenle geçerli olmaması mümkündür. Üretim ortamında bu hatalı biçimlendirilmiş iletileri analiz ederek veri kaynaklarıyla ilgili bir sorunu belirlemek önemlidir, böylece veri kaybını önlemek için hızlı bir şekilde düzeltilebilir. sınıfı, com.microsoft.pnp.TaxiCabReader
hatalı biçimlendirilmiş ücret ve sürüş kayıtlarının sayısını izleyen bir Apache Spark Akümülatör kaydeder:
@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)
Apache Spark ölçümleri göndermek için Dropwizard kitaplığını kullanır ve yerel Dropwizard ölçüm alanlarından bazıları Azure Log Analytics ile uyumlu değildir. Bu nedenle, bu başvuru mimarisi özel bir Dropwizard havuzu ve muhabir içerir. Ölçümleri Azure Log Analytics tarafından beklenen biçimde biçimlendirilir. Apache Spark ölçümleri raporladığında, hatalı biçimlendirilmiş yolculuk ve ücret verileri için özel ölçümler de gönderilir.
Aşağıda, akış işinin yürütülmesini izlemek için Azure Log Analytics çalışma alanınızda kullanabileceğiniz örnek sorgular verilmiştir. Her sorgudaki bağımsız değişken ago(1d)
, son gün içinde oluşturulan tüm kayıtları döndürür ve farklı bir zaman aralığını görüntülemek için ayarlanabilir.
Akış sorgusu yürütme sırasında günlüğe kaydedilen özel durumlar
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
Hatalı biçimlendirilmiş ücret ve sürüş verilerinin birikmesi
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Zaman içinde iş yürütme
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Daha fazla bilgi için bkz . Azure Databricks'i izleme.
DevOps
Üretim, geliştirme ve test ortamları için ayrı kaynak grupları oluşturun. Ayrı kaynak grupları dağıtımları yönetmeyi, test dağıtımlarını silmeyi ve erişim haklarını atamayı kolaylaştırır.
Kod Olarak Altyapı (IaC) İşlemi'ni izleyen Azure kaynaklarını dağıtmak için Azure Resource Manager şablonunu kullanın. Şablonlarla, Azure DevOps Services veya diğer CI/CD çözümlerini kullanarak dağıtımları otomatikleştirmek daha kolaydır.
Her iş yükünü ayrı bir dağıtım şablonuna yerleştirin ve kaynakları kaynak denetim sistemlerinde depolayın. Şablonları bir CI/CD işleminin parçası olarak birlikte veya tek tek dağıtarak otomasyon sürecini kolaylaştırabilirsiniz.
Bu mimaride Azure Event Hubs, Log Analytics ve Azure Cosmos DB tek bir iş yükü olarak tanımlanır. Bu kaynaklar tek bir ARM şablonuna eklenir.
İş yüklerinizi hazırlamayı göz önünde bulundurun. Çeşitli aşamalara dağıtın ve sonraki aşamaya geçmeden önce her aşamada doğrulama denetimleri çalıştırın. Bu şekilde, güncelleştirmeleri üretim ortamlarınıza yüksek denetimli bir şekilde gönderebilir ve tahmin edilmeyen dağıtım sorunlarını en aza indirebilirsiniz.
Bu mimaride birden çok dağıtım aşaması vardır. Azure DevOps İşlem Hattı oluşturmayı ve bu aşamaları eklemeyi göz önünde bulundurun. Aşağıda, otomatikleştirebileceğiniz bazı aşama örnekleri verilmiştir:
- Databricks Kümesi Başlatma
- Databricks CLI'yi yapılandırma
- Scala Araçları'nı yükleme
- Databricks gizli dizilerini ekleme
Ayrıca Databricks kodunun ve yaşam döngüsünün kalitesini ve güvenilirliğini artırmak için otomatik tümleştirme testleri yazmayı göz önünde bulundurun.
Akış işleme işlem hattınızın performansını analiz etmek için Azure İzleyici'yi kullanmayı göz önünde bulundurun. Daha fazla bilgi için bkz . Azure Databricks'i izleme.
Daha fazla bilgi için Microsoft Azure İyi Tasarlanmış Çerçeve'deki DevOps bölümüne bakın.
Maliyet iyileştirme
Maliyet iyileştirmesi, gereksiz giderleri azaltmanın ve operasyonel verimlilikleri iyileştirmenin yollarını aramaktır. Daha fazla bilgi için bkz . Maliyet iyileştirme sütununa genel bakış.
Maliyetleri tahmin etmek için Azure fiyatlandırma hesaplayıcısını kullanın. Bu başvuru mimarisinde kullanılan hizmetlerle ilgili dikkat edilmesi gereken bazı noktalar aşağıdadır.
Event Hubs
Bu başvuru mimarisi, Event Hubs'ı Standart katmanda dağıtır. Fiyatlandırma modeli aktarım hızı birimlerini, giriş olaylarını ve yakalama olaylarını temel alır. Bir giriş olayı 64 KB veya daha küçük bir veri birimidir. Daha büyük iletiler 64 KB'ın katları şeklinde faturalandırılır. Aktarım hızı birimlerini Azure portalı veya Event Hubs yönetim API'leri aracılığıyla belirtirsiniz.
Daha fazla bekletme gününe ihtiyacınız varsa Ayrılmış katmanı göz önünde bulundurun. Bu katman, en zorlu gereksinimlere sahip tek kiracılı dağıtımlar sunar. Bu teklif, aktarım hızı birimlerine bağlı olmayan kapasite birimlerini (CU) temel alan bir küme oluşturur.
Standart katman, giriş olayları ve aktarım hızı birimlerine göre de faturalandırılır.
Event Hubs fiyatlandırması hakkında bilgi için bkz . Event Hubs fiyatlandırması.
Azure Databricks
Azure Databricks iki katman sunar Standart ve Premium her üç iş yükünü destekler. Bu başvuru mimarisi, Premium katmanında Azure Databricks çalışma alanını dağıtır.
Veri Madenciliği ve Veri Madenciliği Light iş yükleri, veri mühendislerinin iş oluşturup yürütmesine yöneliktir. Veri Analizi iş yükü, veri bilim adamlarının verileri ve içgörüleri etkileşimli olarak keşfetmesine, görselleştirmesine, işlemesine ve paylaşmasına yöneliktir.
Azure Databricks birçok fiyatlandırma modeli sunar.
Kullandıkça öde planı
Seçilen VM örneğine göre kümelerde ve Databricks Birimlerinde (DBU) sağlanan sanal makineler (VM) için faturalandırılırsınız. DBU, saniye başına kullanımda faturalandırılan bir işlem birimidir. DBU tüketimi, Azure Databricks çalıştıran örneğin boyutuna ve türüne bağlıdır. Fiyatlandırma, seçilen iş yüküne ve katmana bağlıdır.
Ön satın alma planı
Azure Databricks Birimleri'ne (DBU) bir veya üç yıl boyunca Databricks İşleme Birimleri (DBCU) olarak taahhütte bulunursunuz. Kullandıkça öde modeline kıyasla %37'ye kadar tasarruf edebilirsiniz.
Daha fazla bilgi için bkz . Azure Databricks Fiyatlandırması.
Azure Cosmos DB
Bu mimaride, Azure Databricks işi tarafından Azure Cosmos DB'ye bir dizi kayıt yazılır. Ekleme işlemlerini gerçekleştirmek için kullanılan, saniye başına İstek Birimleri (RU/sn) cinsinden belirtilen, ayırdığınız kapasite için ücretlendirilirsiniz. Faturalama birimi saatte 100 RU/sn'dir. Örneğin, 100 KB öğe yazmanın maliyeti 50 RU/sn'dir.
Yazma işlemleri için, saniye başına gereken yazma sayısını desteklemek için yeterli kapasite sağlayın. Yazma işlemlerini gerçekleştirmeden önce portalı veya Azure CLI'yı kullanarak sağlanan aktarım hızını artırabilir ve bu işlemler tamamlandıktan sonra aktarım hızını azaltabilirsiniz. Yazma süresi için aktarım hızınız, verilen veriler için gereken en düşük aktarım hızına ek olarak başka bir iş yükünün çalışmadığını varsayarak ekleme işlemi için gereken aktarım hızıdır.
Örnek maliyet analizi
Kapsayıcıda 1.000 RU/sn aktarım hızı değeri yapılandırdığınız varsayılır. 30 gün boyunca 24 saat, toplam 720 saat dağıtılır.
Kapsayıcı, her saat için saatte 100 RU/sn'lik 10 birim olarak faturalandırılır. 0,008 ABD doları (saatte 100 RU/sn başına) olan 10 birim, saatte 0,08 ABD doları ücretlendirilir.
720 saat veya 7.200 birim (100 RU)için aylık 57,60 ABD doları faturalandırılırsınız.
Depolama, depolanan verileriniz ve dizininiz için kullanılan her GB için de faturalandırılır. Daha fazla bilgi için bkz . Azure Cosmos DB fiyatlandırma modeli.
İş yükü maliyetini hızlı bir şekilde tahmin etmek için Azure Cosmos DB kapasite hesaplayıcısını kullanın.
Daha fazla bilgi için Microsoft Azure İyi Oluşturulmuş Mimari Çerçevesi makalesindeki maliyet bölümüne bakın.
Bu senaryoyu dağıtın
Başvuru uygulamasını dağıtmak ve çalıştırmak için GitHub benioku dosyasındaki adımları izleyin.