На схеме эталонной архитектуры представлен сквозной конвейер обработки потоков данных. Конвейер такого типа состоит из четырех этапов: прием, обработка, сохранение, анализ и создание отчета. В этой эталонной архитектуре конвейер принимает данные из двух источников, объединяет связанные записи из каждого потока, дополняет результат и вычисляет среднее значение в реальном времени. Результаты сохраняются для дальнейшего анализа.
Эталонная реализация этой архитектуры доступна на сайте GitHub.
Архитектура
Скачайте файл Visio этой архитектуры.
Рабочий процесс
Она состоит из следующих компонентов:
Источники данных. В этой архитектуре существует два источника данных, которые создают потоки данных в реальном времени. Первый поток содержит сведения о поездке, а второй — о тарифах. В эталонной архитектуре есть имитированный генератор данных, который считывает данные из набора статических файлов и отправляет данные в Центры событий. В реальном приложении источниками данных будут устройства, установленные в такси.
Центры событий Azure. Центры событий — это служба приема событий. В этой архитектуре используется два экземпляра службы — по одной на каждый источник данных. Каждый источник данных отправляет поток данных в соответствующую службу.
Azure Databricks. Databricks — это платформа аналитики на основе Apache Spark, оптимизированная для платформы облачных служб Microsoft Azure. Databricks используется для корреляции данных о поездке на такси и тарифах, а также для дополнения коррелированных данных сведениями об округе, хранящимися в файловой системе Databricks.
Azure Cosmos DB. Выходные данные задания Azure Databricks — это серия записей, которые записываются в Azure Cosmos DB для Apache Cassandra. Azure Cosmos DB для Apache Cassandra используется, так как он поддерживает моделирование данных временных рядов.
- Azure Synapse Link для Azure Cosmos DB позволяет практически в реальном времени выполнять аналитику по операционным данным в Azure Cosmos DB без какого-либо снижения производительности или повышения затрат на транзакционную рабочую нагрузку. Для этого применяются два аналитических модуля, доступные в рабочей области Azure Synapse: бессерверная служба SQL и пулы Spark.
Azure Log Analytics. Данные журнала приложений, собранные Azure Monitor, хранятся в рабочей области Log Analytics. Вы можете использовать запросы Log Analytics для анализа и визуализации метрик, а также просмотра сообщений журнала, чтобы выявить проблемы в приложении.
Альтернативные варианты
- Synapse Link — это предпочтительное решение Майкрософт для аналитики на основе данных Azure Cosmos DB.
Подробности сценария
Сценарий. Компания, предоставляющая услуги такси, собирает данные о каждой поездке в такси. В этом сценарии предполагается, что данные отправляются с двух отдельных устройств. Такси имеет метр, который отправляет информацию о каждой поездке — длительность, расстояние и посадку и раскрывающиеся места. Отдельное устройство принимает платежи от клиентов и отправляет данные о тарифах. Чтобы определить тенденции роста пассажиропотока, компании нужно для каждого округа вычислить среднюю сумму чаевых на милю в реальном времени.
Потенциальные варианты использования
Это решение оптимизировано для розничной отрасли.
Прием данных
Для имитации источника данных в этой эталонной архитектуре используется набор данных[1]New York City Taxi Data (Данные о поездках в такси в Нью-Йорке). Этот набор данных содержит данные о поездках по такси в Нью-Йорке в течение четырехлетнего периода (2010 –2013). В нем представлены два типа записей: данные о поездках и тарифах. Данные о поездках включают длительность поездки, расстояние поездки, а также место сбора и удаления. Данные о тарифах включают сведения о тарифе, налоге и сумме чаевых. В обоих типах записей есть стандартные поля: номер медальона, лицензия на право вождения и код организации. Вместе эти три поля позволяют уникально идентифицировать такси и водителя. Данные хранятся в формате CSV.
[1] Донован, Брайан; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Иллинойсский университет в Урбане-Шампейне. https://doi.org/10.13012/J8PN93H8
Генератор данных — это приложение .NET Core, которое считывает записи и отправляет их в Центры событий Azure. Генератор отправляет данные о поездке в формате JSON, а данные о тарифах — в формате CSV.
Для сегментации данных Центры событий используют секции. Они позволяют объекту-получателю считывать данные каждой секции параллельно. При отправке данных в Центры событий можно явно указать ключ секции. В противном случае записи назначаются секциям методом циклического перебора.
В этом примере данные о поездках и тарифах должны в итоге иметь одинаковый идентификатор секции для определенного такси. Это позволит Databricks применить определенную степень параллелизма при корреляции двух потоков. Запись в секции n с данными о поездке будет соответствовать записи в секции n с данными о тарифах.
Скачайте файл Visio этой архитектуры.
В генераторе данных общая модель данных для обоих типов записей имеет свойство PartitionKey
, в котором объединены Medallion
, HackLicense
и VendorId
.
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Это свойство используется для явного предоставления ключа секции при отправке в Центры событий:
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Event Hubs
Пропускная способность Центров событий вычисляется в единицах пропускной способности. Вы можете автоматически масштабировать концентратор событий, включив автоматическое расширение. Это позволит автоматически масштабировать единицы пропускной способности в зависимости от трафика вплоть до заданного максимума.
Потоковая обработка
В Azure Databricks обработка данных осуществляется с помощью задания. Задание назначается определенному кластеру и выполняется в нем. Задание может представлять собой пользовательский код на Java или записную книжку Spark.
В этой эталонной архитектуре задание представляет собой архив Java с классами, написанными на Java и Scala. Указывая архив Java для задания Databricks, нужно выбрать класс для выполнения в кластере Databricks. main
Здесь метод com.microsoft.pnp.TaxiCabReader
класса содержит логику обработки данных.
Считывание потоков данных из двух экземпляров концентратора событий
Для считывания данных из двух экземпляров концентратора событий Azure в логике обработки данных используется Spark Structured Streaming.
// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()
val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiRideConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
.format("eventhubs")
.options(rideEventHubOptions.toMap)
.load
val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiFareConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
.format("eventhubs")
.options(fareEventHubOptions.toMap)
.load
Дополнение данных сведениями об округе
Данные о поездке включают координаты широты и долготы и сведения о местах посадки и высадки. Хотя эти координаты и полезны, их неудобно использовать для анализа. Таким образом, эти данные дополняются сведениями об округе, считанными из файла фигуры.
Файл фигуры имеет двоичный формат, что усложняет анализ. Но в библиотеке GeoTools представлены средства для работы с геопространственными данными в формате файла фигуры. Эта библиотека используется в com.microsoft.pnp.GeoFinder
классе для определения имени района на основе координат сбора и удаления.
val neighborhoodFinder = (lon: Double, lat: Double) => {
NeighborhoodFinder.getNeighborhood(lon, lat).get()
}
Объединение данных о поездках и тарифах
Сначала выполняется преобразование данных о поездках и тарифах:
val rides = transformedRides
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedRides.add(1)
false
}
})
.select(
$"ride.*",
to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
.as("pickupNeighborhood"),
to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
.as("dropoffNeighborhood")
)
.withWatermark("pickupTime", conf.taxiRideWatermarkInterval())
val fares = transformedFares
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedFares.add(1)
false
}
})
.select(
$"fare.*",
$"pickupTime"
)
.withWatermark("pickupTime", conf.taxiFareWatermarkInterval())
После этого происходит объединение данных о поездке с данными о тарифах:
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
Обработка данных и вставка в Azure Cosmos DB
Вычисляется средний тариф для каждого округа за определенный интервал времени:
val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
.groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
.agg(
count("*").as("rideCount"),
sum($"fareAmount").as("totalFareAmount"),
sum($"tipAmount").as("totalTipAmount"),
(sum($"fareAmount")/count("*")).as("averageFareAmount"),
(sum($"tipAmount")/count("*")).as("averageTipAmount")
)
.select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")
Затем он вставляется в Azure Cosmos DB:
maxAvgFarePerNeighborhood
.writeStream
.queryName("maxAvgFarePerNeighborhood_cassandra_insert")
.outputMode(OutputMode.Append())
.foreach(new CassandraSinkForeach(connector))
.start()
.awaitTermination()
Рекомендации
Эти рекомендации реализуют основные принципы платформы Azure Well-Architected Framework, которая является набором руководящих принципов, которые можно использовать для улучшения качества рабочей нагрузки. Дополнительные сведения см. в статье Microsoft Azure Well-Architected Framework.
Безопасность
Безопасность обеспечивает гарантии от преднамеренного нападения и злоупотребления ценными данными и системами. Дополнительные сведения см. в разделе "Общие сведения о компоненте безопасности".
Доступ к рабочей области Azure Databricks управляется с помощью консоли администрирования. В консоли администрирования предусмотрены функции для добавления пользователей, управления их разрешениями и настройки единого входа. Также в ней можно настроить управление доступом для рабочих областей, кластеров, заданий и таблиц.
управление секретами;
Azure Databricks включает в себя хранилище секретов, которое используется для хранения учетных данных и ссылки на них в записных книжках и заданиях. Секреты в хранилище секретов Azure Databricks секционируются по областям:
databricks secrets create-scope --scope "azure-databricks-job"
Секреты добавляются на уровне области:
databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
Примечание.
Область, поддерживаемая Azure Key Vault, должна использоваться вместо собственной области Azure Databricks. Дополнительные сведения см. в документации по областям, поддерживаемым Azure Key Vault.
В коде для получения доступа к секретам используются соответствующие служебные программы Azure Databricks.
Наблюдение
Платформа Azure Databricks создана на основе Apache Spark и тоже использует log4j в качестве стандартной библиотеки для ведения журналов. Помимо ведения журнала по умолчанию, предоставленного Apache Spark, вы можете реализовать ведение журнала в Azure Log Analytics после статьи "Мониторинг Azure Databricks".
com.microsoft.pnp.TaxiCabReader
Так как класс обрабатывает сообщения о поездках и тарифах, возможно, что любой из них может быть неправильно сформирован и поэтому недопустим. В рабочей среде нужно проанализировать такие сообщения, чтобы выявить проблему с источниками данных и быстро решить ее для предотвращения потери данных. Класс com.microsoft.pnp.TaxiCabReader
регистрирует накопитель Apache Spark, который отслеживает количество неправильно сформированных тарифов и записей о поездках:
@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)
Для отправки метрик Apache Spark использует библиотеку Dropwizard. Но некоторые стандартные поля метрик Dropwizard несовместимы с Azure Log Analytics. Поэтому в эталонной архитектуре предусмотрены пользовательские приемник и средство формирования отчетов Dropwizard. Это позволяет преобразовать метрики в формат, требуемый для Azure Log Analytics. Apache Spark передает метрики вместе с пользовательскими метриками для данных о поездках и тарифах неправильного формата.
Ниже приведены примеры запросов, которые можно использовать в рабочей области Azure Log Analytics для мониторинга выполнения задания потоковой передачи. Аргумент ago(1d)
в каждом запросе возвращает все записи, созданные в последний день, и их можно настроить для просмотра другого периода времени.
Исключения, зарегистрированные при выполнении запроса потоковой передачи
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
Сбор данных неправильного формата о поездках и тарифах
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Выполнение задания с течением времени
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Дополнительные сведения см. в статье Мониторинг в Azure Databricks.
DevOps
Создайте отдельные группы ресурсов для рабочей среды, сред разработки и тестирования. Так будет проще управлять развертываниями, удалять тестовые развертывания и назначать права доступа.
Используйте шаблон Azure Resource Manager для развертывания ресурсов Azure после инфраструктуры в виде кода (IaC). Благодаря шаблонам автоматизация развертываний с помощью Azure DevOps Services или других решений CI/CD упрощается.
Поместите каждую рабочую нагрузку в отдельный шаблон развертывания и сохраните ресурсы в системах управления версиями. Вы можете развернуть шаблоны вместе или отдельно в рамках процесса CI/CD, что упрощает процесс автоматизации.
В этой архитектуре Центры событий Azure, Log Analytics и Azure Cosmos DB определяются как одна рабочая нагрузка. Эти ресурсы включены в один шаблон ARM.
Рассмотрите возможность промежуточного хранения рабочих нагрузок. Развертывание на различных этапах и выполнение проверок на каждом этапе перед переходом к следующему этапу. Таким образом вы можете отправлять обновления в рабочие среды с высокой степенью контроля и свести к минимуму непредвиденные проблемы с развертыванием.
В этой архитектуре существует несколько этапов развертывания. Рассмотрите возможность создания конвейера Azure DevOps и добавления этих этапов. Ниже приведены некоторые примеры этапов, которые можно автоматизировать:
- Запуск кластера Databricks
- Настройка интерфейса командной строки Databricks
- Установка средств Scala
- Добавление секретов Databricks
Кроме того, рассмотрите возможность написания автоматизированных тестов интеграции для повышения качества и надежности кода Databricks и его жизненного цикла.
Рассмотрите возможность использования Azure Monitor для анализа производительности конвейера потоковой обработки. Дополнительные сведения см. в статье Мониторинг в Azure Databricks.
Дополнительные сведения см. в разделе DevOps в Microsoft Azure Well-Architected Framework.
Оптимизация затрат
Оптимизация затрат заключается в поиске способов уменьшения ненужных расходов и повышения эффективности работы. Дополнительные сведения см. в разделе Обзор критерия "Оптимизация затрат".
Для оценки затрат используйте калькулятор цен Azure. Ниже приведены некоторые рекомендации по службам, используемым в этой эталонной архитектуре.
Event Hubs
Эта эталонная архитектура развертывает центры событий на уровне "Стандартный". Модель ценообразования основана на единицах пропускной способности, событиях входящего трафика и записи событий. Входящее событие — это блок данных размером 64 КБ или меньше. Оплата зависит от размера сообщений (одно событие до 64 КБ). Единицы пропускной способности указываются через API управления портал Azure или Центрами событий.
Если требуется больше дней хранения, рассмотрите уровень "Выделенный". Этот уровень предлагает развертывания с одним клиентом с наиболее требовательными требованиями. Это предложение создает кластер на основе единиц емкости (CU), которые не привязаны к единицам пропускной способности.
Плата за уровень "Стандартный" также взимается на основе событий входящего трафика и единиц пропускной способности.
Сведения о ценах на Центры событий см. в ценах на Центры событий.
Azure Databricks
Azure Databricks предлагает два уровня "Стандартный" и "Премиум" поддерживают три рабочих нагрузки. Эта эталонная архитектура развертывает рабочую область Azure Databricks на уровне "Премиум ".
Инжиниринг данных и Инжиниринг данных рабочие нагрузки Light предназначены для инженеров данных для создания и выполнения заданий. Рабочая нагрузка аналитики данных предназначена для специалистов по обработке и анализу данных для интерактивного просмотра, визуализации, управления и обмена данными и аналитических сведений.
Azure Databricks предлагает множество моделей ценообразования.
План с оплатой по мере использования
Плата взимается за виртуальные машины, подготовленные в кластерах и единицах субД Databricks на основе выбранного экземпляра виртуальной машины. DBU — это единица продуктивности обработки, за которую взимается из расчета за использование в секунду. Потребление DBU зависит от размера и типа используемого экземпляра Azure Databricks. Цены будут зависеть от выбранной рабочей нагрузки и уровня.
План предварительной покупки
Вы фиксируете единицы azure Databricks (DBU) в качестве единиц фиксации Databricks (DBCU) в течение одного или трех лет. По сравнению с моделью оплаты по мере использования вы можете сэкономить до 37 %.
Дополнительные сведения см. на странице цен на Azure Databricks.
Azure Cosmos DB
В этой архитектуре серия записей записывается в Azure Cosmos DB заданием Azure Databricks. Плата взимается за резервную емкость, выраженную в единицах запросов в секунду (ЕЗ/с), используемых для выполнения операций вставки. Единица выставления счетов составляет 100 ЕЗ/с в час. Например, стоимость записи элементов размером 100 КБ составляет 50 ЕЗ/с.
Для операций записи подготовьте достаточно емкости для поддержки количества операций записи, необходимых в секунду. Вы можете увеличить подготовленную пропускную способность с помощью портала или Azure CLI перед выполнением операций записи, а затем уменьшить пропускную способность после завершения этих операций. Пропускная способность для периода записи — это минимальная пропускная способность, необходимая для заданных данных, а также пропускная способность, необходимая для операции вставки, если не выполняется другая рабочая нагрузка.
Пример анализа затрат
Предположим, что вы настраиваете значение пропускной способности в 1000 ЕЗ/с в контейнере. Он развертывается в течение 24 часов в течение 30 дней, в общей сложности 720 часов.
Контейнер выставляется по 10 единицам 100 ЕЗ/с в час за каждый час. 10 единиц в $0,008 (за 100 ЕЗ/с в час) взимается плата за 0,08 долл. США в час.
В течение 720 часов или 7200 единиц (из 100 единиц ЕЗ), вы оплачиваете $ 57,60 за месяц.
Плата за хранение также взимается за каждый ГБ, используемый для хранимых данных и индекса. Дополнительные сведения см. в модели ценообразования Azure Cosmos DB.
Используйте калькулятор емкости Azure Cosmos DB, чтобы получить быструю оценку затрат на рабочую нагрузку.
См. сведения о затратах на платформу Microsoft Azure с продуманной архитектурой.
Развертывание этого сценария
Чтобы выполнить развертывание и запуск эталонной реализации, выполните действия, описанные в файле сведений на GitHub.