Processamento de fluxo com o Azure Databricks

Azure Cosmos DB
Azure Databricks
Hubs de eventos do Azure
Azure Log Analytics
Azure Monitor

Essa arquitetura de referência mostra um pipeline de processamento de fluxo de ponta a ponta. Esse tipo de pipeline tem quatro estágios: ingerir, processar, armazenar e analisar e gerar relatórios. Para essa arquitetura de referência, o pipeline ingere dados de duas origens, realiza uma junção em registros relacionados de cada fluxo, enriquece o resultado e calcula uma média em tempo real. Os resultados são armazenados para análise posterior.

Logotipo do GitHubHá uma implantação de referência para essa arquitetura disponível no GitHub.

Arquitetura

Diagrama mostrando uma arquitetura de referência para processamento de fluxo com o Azure Databricks.

Baixe um Arquivo Visio dessa arquitetura.

Workflow

Essa arquitetura consiste nos seguintes componentes:

Fontes de dados. Nessa arquitetura há duas fontes de dados que geram fluxos de dados em tempo real. O primeiro fluxo contém informações da corrida, e o segundo contém informações da tarifa. A arquitetura de referência inclui um gerador de dados simulados que faz a leitura de um conjunto de arquivos estáticos e envia os dados para os Hubs de Eventos. Em um aplicativo real, as fontes de dados seriam dispositivos instalados nos táxis.

Hubs de Eventos do Azure. Hubs de Eventos são um serviço de ingestão de eventos. Essa arquitetura usa duas instâncias de hub de eventos, uma para cada fonte de dados. Cada fonte de dados envia um fluxo de dados para o hub de eventos associado.

Azure Databricks. O Databricks é uma plataforma de análise baseada no Apache Spark otimizada para a plataforma de Serviços de Nuvem do Microsoft Azure. O Databricks é usado para correlacionar os dados de corrida e de tarifa do táxi e também para enriquecer os dados correlacionados com dados armazenados sobre áreas no sistema de arquivos do Databricks.

Azure Cosmos DB. A saída do trabalho de um trabalho do Azure Databricks é uma série de registros que são gravados no Azure Cosmos DB for Apache Cassandra. O Azure Cosmos DB para Apache Cassandra é usado porque oferece suporte à modelagem de dados de série temporal.

Azure Log Analytics. Os dados de log do aplicativo coletados pelo Monitor do Azure são armazenados em um espaço de trabalho do Log Analytics. As consultas do Log Analytics podem ser usadas para analisar e visualizar métricas e inspecionar mensagens de log para identificar problemas no aplicativo.

Alternativas

  • O Link do Synapse é a solução preferencial da Microsoft para análises sobre dados do Azure Cosmos DB.

Detalhes do cenário

Cenário: uma empresa de táxi coleta dados sobre cada viagem. Para esse cenário, assumimos que há dois dispositivos separados enviando dados. O táxi tem um medidor que envia informações sobre cada corrida — duração, distância e locais de embarque e desembarque de passageiros. Um dispositivo separado aceita pagamentos de clientes e envia dados sobre tarifas. Para identificar as tendências dos passageiros, a empresa de táxi deseja calcular a média de gorjeta por quilômetro percorrido em tempo real, para cada área.

Possíveis casos de uso

Esta solução é otimizada para o setor de varejo.

Ingestão de dados

Para simular uma fonte de dados, essa arquitetura de referência usa o conjunto de dados dos Dados de táxi de Nova York[1]. Esse conjunto de dados contém dados sobre viagens de táxi em Nova York durante um período de 4 anos (2010 – 2013). Ele contém dois tipos de registro: dados de corrida e dados de tarifa. Os dados de corrida incluem a duração da viagem, a distância da viagem e os locais de embarque e desembarque de passageiros. Os dados de tarifa incluem a tarifa, impostos e quantias das gorjetas. Campos comuns em ambos os tipos de registro incluem o número da licença, carteira de habilitação e ID do fornecedor. Juntos, esses três campos fazem a identificação exclusiva de um táxi e um motorista. Os dados são armazenados no formato CSV.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Universidade de Illinois em Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

O gerador de dados é um aplicativo .NET Core que lê os registros e os envia para os Hubs de Eventos do Azure. O gerador envia os dados de corrida em formato JSON e os dados de tarifa em formato CSV.

Os Hubs de Eventos usam partições para segmentar os dados. As partições permitem que um consumidor leia cada partição em paralelo. Ao enviar dados para os Hubs de Eventos, é possível especificar a chave de partição explicitamente. Caso contrário, os registros são atribuídos a partições no estilo round robin.

Nesse cenário, os dados de corrida e de tarifa devem ter a mesma ID de partição para um determinado táxi. Isso permite que o Databricks aplique um grau de paralelismo ao correlacionar os dois fluxos. Um registro na partição n dos dados de corrida corresponderá a um registro na partição n dos dados de tarifa.

Diagrama de processamento de fluxo com o Azure Databricks e os Hubs de Eventos.

Baixe um Arquivo Visio dessa arquitetura.

No gerador de dados, o modelo de dados comum para ambos os tipos de registro têm uma propriedade PartitionKey que é a concatenação de Medallion, HackLicense e VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Essa propriedade é usada para fornecer uma chave de partição explícita ao enviar para Hubs de Eventos:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Hubs de Eventos

A capacidade da taxa de transferência dos Hubs de Eventos é medida em unidades de produtividade. É possível fazer o dimensionamento automático de um hub de eventos ao permitir a inflação automática, o que dimensiona automaticamente as unidades de produtividade com base no tráfego até um número máximo configurado.

Processamento de fluxo

No Azure Databricks, o processamento de dados é executado por um trabalho. O trabalho é atribuído a um cluster e é executado nele. O trabalho pode ser o código personalizado escrito em Java ou um bloco de notas do Spark.

Nessa arquitetura de referência, o trabalho é um arquivo morto de Java com classes escritas em Java e Scala. Ao especificar o arquivo morto de Java para um trabalho do Databricks, a classe será especificada para ser executada pelo cluster do Databricks. Aqui, o main método da classe com.microsoft.pnp.TaxiCabReader contém a lógica de processamento de dados.

Ler o fluxo das duas instâncias do hub de eventos

A lógica de processamento de dados usa o fluxo estruturado do Spark para ler as duas instâncias do hub de eventos do Azure:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Como enriquecer os dados com informações sobre a área

Os dados da corrida incluem as coordenadas de latitude e longitude dos locais de coleta e entrega. Embora essas coordenadas sejam úteis, elas não são facilmente consumidas na análise. Portanto, esses dados são enriquecidos com dados de área que são lidos de um shapefile.

O formato shapefile é binário e não é fácil analisá-lo, mas a biblioteca GeoTools fornece ferramentas para dados geoespaciais que usam o formato shapefile. Essa biblioteca é usada na classe com.microsoft.pnp.GeoFinder para determinar o nome da área com base nas coordenadas de coleta e entrega.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Adicionar os dados de corrida e de tarifa

Primeiro, os dados da corrida e da tarifa são transformados:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

E, em seguida, os dados da corrida são associados aos dados da tarifa:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Como processar dados e inseri-los no Azure Cosmos DB

O valor da tarifa média para cada ambiente é calculado para um determinado intervalo de tempo:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Portanto, qual é inserido no Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Considerações

Estas considerações implementam os pilares do Azure Well-Architected Framework, que é um conjunto de princípios de orientação que podem ser usados para aprimorar a qualidade de uma carga de trabalho. Para obter mais informações, confira Microsoft Azure Well-Architected Framework.

Segurança

A segurança fornece garantias contra ataques deliberados e o abuso de seus dados e sistemas valiosos. Para saber mais, confira Visão geral do pilar de segurança.

O acesso ao workspace do Azure Databricks é controlado com o console do administrador. O console do administrador inclui recursos para adicionar usuários, gerenciar permissões de usuários e configurar o logon único. O controle de acesso para workspaces, clusters, trabalhos e tabelas também pode ser definido pelo console do administrador.

Gerenciamento de segredos

O Azure Databricks inclui um repositório secreto usado para armazenar credenciais e referenciá-las em blocos de anotações e tarefas. Os segredos dentro do armazenamento secreto do Azure Databricks são particionados por escopos:

databricks secrets create-scope --scope "azure-databricks-job"

Os segredos são adicionados ao nível do escopo:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Observação

Um escopo suportado pelo Azure Key Vault deve ser usado no lugar do escopo nativo do Azure Databricks. Para obter mais informações, confira Escopos com suporte do Azure Key Vault.

No código, os segredos são acessados pelos utilitários segredos do Azure Databricks.

Monitoramento

O Azure Databricks é baseado no Apache Spark e ambos usam log4j como a biblioteca padrão para registros em log. Além do log padrão fornecido pelo Apache Spark, você pode implementar o log no Azure Log Analytics seguindo o artigo Monitoramento do Azure Databricks.

Como a classe com.microsoft.pnp.TaxiCabReader processa as mensagens de corrida e tarifa, é provável que uma delas esteja malformada e, portanto, inválida. Em um ambiente de produção, é importante analisar essas mensagens malformadas para identificar problemas com as fontes de dados para que eles possam ser corrigidos rapidamente para evitar a perda de dados. A classe com.microsoft.pnp.TaxiCabReader registra um Apache Spark Accumulator que controla o número de registros de tarifas e corridas malformadas:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

O Apache Spark usa a biblioteca Dropwizard para enviar métricas. Alguns campos nativos de métricas do Dropwizard são incompatíveis com o Azure Log Analytics. Portanto, essa arquitetura de referência inclui um coletor e um repórter personalizado do Dropwizard. Ele formata as métricas no formato esperado pelo Azure Log Analytics. Quando o Apache Spark relata as métricas, as métricas personalizadas dos dados malformados de tarifa e corrida também são enviadas.

Confira a seguir exemplos de consultas que você pode usar em seu workspace do Azure Log Analytics para monitorar a execução do trabalho de streaming. O argumento ago(1d) em cada consulta retornará todos os registros que foram gerados no último dia e poderá ser ajustado para exibir um período de tempo diferente.

Exceções registradas durante a execução de consulta de streaming

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Acumulação de tarifas malformadas e dados da corrida

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Execução do trabalho ao longo do tempo

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Para obter mais informações, consulte Monitorar o Azure Databricks.

DevOps

  • Crie grupos de recursos separados para ambientes de produção, desenvolvimento e teste. Ter grupos de recursos separados facilita o gerenciamento de implantações, a exclusão de implantações de teste a atribuição de direitos de acesso.

  • Use o modelo do Azure Resource Manager para implantar os recursos do Azure. Siga o processo de IaC (Infraestrutura como Código). Com os modelos, a automatização de implantações usando o Azure DevOps Services ou outras soluções de CI/CD é mais fácil.

  • Coloque cada carga de trabalho em um modelo de implantação separado e armazene os recursos em sistemas de controle do código-fonte. É possível implantar os modelos juntos ou individualmente como parte de um processo de CI/CD, facilitando o processo de automação.

    Nessa arquitetura, os Hubs de Eventos do Azure, o Log Analytics e o Azure Cosmos DB são identificados como uma única carga de trabalho. Esses recursos são incluídos em um único modelo do ARM.

  • Considere preparar suas cargas de trabalho. Implante em várias fases e execute verificações de validação em cada uma antes de passar para a próxima. Desse modo, é possível efetuar push de atualizações para ambientes de produção de maneira altamente controlada, além de minimizar problemas de implantação inesperados.

    Nesta arquitetura, há várias fases de implantação. Considere criar um Pipeline do Azure DevOps e adicionar essas fases. Aqui estão alguns exemplos de fases que você pode automatizar:

    • Iniciar um cluster do Databricks
    • Configurar a CLI do Databricks
    • Instalar ferramentas do Scala
    • Adicionar os segredos do Databricks

    Além disso, considere desenvolver testes de integração automatizados para melhorar a qualidade e a confiabilidade do código do Databricks e seu ciclo de vida.

  • Considere usar o Azure Monitor para analisar o desempenho do pipeline de processamento de fluxo. Para obter mais informações, consulte Monitorar o Azure Databricks.

Para obter mais informações, confira a seção DevOps no Microsoft Azure Well-Architected Framework.

Otimização de custo

A otimização de custos é a análise de maneiras de reduzir as despesas desnecessárias e melhorar a eficiência operacional. Para obter mais informações, confira Visão geral do pilar de otimização de custo.

Use a Calculadora de Preços do Azure para estimar os custos. Confira algumas considerações sobre os serviços usados nesta arquitetura de referência.

Hubs de Eventos

Essa arquitetura de referência implanta Hubs de Eventos na camada Standard . O modelo de preços é baseado em unidades de taxa de transferência, eventos de entrada e eventos de captura. Um evento de entrada consiste em uma unidade de dados de 64 KB ou menos. As mensagens maiores são cobradas em múltiplos de 64 KB. Especifique as unidades de produtividade por meio do portal do Azure ou das APIs de gerenciamento dos Hubs de Eventos.

Se você precisar de mais dias de retenção, considere a camada Dedicados. Essa camada oferece implantações de locatário único com requisitos mais exigentes. Essa oferta cria um cluster baseado em capacidade que não é limitado por unidades de produtividade.

A camada Standard também é cobrada com base em eventos de entrada e unidades de taxa de transferência.

Para obter informações sobre os preços dos Hubs de Eventos, confira os preços dos Hubs de Eventos.

Azure Databricks

O Azure Databricks oferece duas camadas Standard e Premium, cada uma dá suporte a três cargas de trabalho. Essa arquitetura de referência implanta o workspace do Azure Databricks na camada Premium.

As cargas de trabalho Engenharia de Dados e Engenheiros de Dados Leve são para engenheiros de dados criarem e executarem trabalhos. A Análise de Dados inclui o conjunto de recursos anterior e é voltada para cientistas de dados que exploram, visualizam, manipulam e compartilham dados e insights interativamente.

O Azure Databricks oferece muitos modelos de preços.

  • Plano de pagamento conforme o uso

    Você é cobrado por VMs (máquinas virtuais) provisionadas em clusters e DBUs (Unidades do Databricks) com base na instância de VM selecionada. Uma DBU é uma unidade de capacidade de processamento, cobrada em um uso por segundo. O consumo de DBU depende do tamanho e do tipo de instância que executa o Azure Databricks. Os preços dependerão da carga de trabalho e da camada selecionadas.

  • Plano de pré-compra

    Você se compromete com as Unidades do Azure Databricks (DBU) como DBCU (Unidades de Confirmação do Databricks) por um ou três anos. Quando comparado ao modelo de pagamento conforme o uso, você pode economizar até 37%.

Para obter mais informações, confira Preços do Azure Databricks.

Azure Cosmos DB

Nesta arquitetura, uma série de registros é gravada no Azure Cosmos DB pelo trabalho do Azure Databricks. Você é cobrado pela capacidade que reserva, expressa em RU/s (Unidades de Solicitação por segundo), usada para realizar operações de inserção. A unidade para cobrança é de 100 RU/s por hora. Por exemplo, o custo do desenvolvimento de itens de 100 KB é de 50 RU/s.

Para operações de gravação, forneça capacidade suficiente para dar suporte ao número de gravações necessárias por segundo. Você pode aumentar a taxa de transferência provisionada usando o portal ou a CLI do Azure antes de realizar operações de gravação e reduzir a taxa de transferência após a conclusão dessas operações. Sua taxa de transferência para o período de gravação é a taxa de transferência mínima necessária para os dados fornecidos, mais a taxa de transferência necessária para a operação de inserção, supondo que nenhuma outra carga de trabalho esteja em execução.

Exemplo de análise de custo

Suponha que você configure um valor de taxa de transferência de 1.000 RU/s em um contêiner. Ele é implantado por 24 horas por 30 dias, um total de 720 horas.

O contêiner é cobrado em 10 unidades de 100 RU/s por hora para cada hora. 10 unidades a US$ 0,008 (por 100 RU/s por hora) são cobradas US$ 0,08 por hora.

Para 720 horas ou 7.200 unidades (de 100 RUs), você será cobrado US$ 57,60 por mês.

O armazenamento também é cobrado por cada GB usado para seus dados e índices armazenados. Para mais informações, consulte Modelo de preços do Azure Cosmos DB.

Use a calculadora de capacidade do Azure Cosmos DB para obter uma estimativa de custo rápida da carga de trabalho.

Para obter mais informações, confira a seção de custo em Estrutura Bem Projetada do Microsoft Azure.

Implantar este cenário

Para a implantação e execução da implementação de referência, siga as etapas em Leia-me do GitHub.

Próxima etapa