Otimização do processamento de dados para o Apache Spark
Este artigo aborda como otimizar a configuração do cluster do Apache Spark para obter o melhor desempenho no Azure HDInsight.
Descrição Geral
Se tiver tarefas lentas numa Associação ou Shuffle, a causa é provavelmente distorção de dados. A distorção de dados é assimetria nos dados da tarefa. Por exemplo, uma tarefa de mapa pode demorar 20 segundos. Contudo, a execução de um trabalho em que os dados são associados ou aleatórios demora horas. Para corrigir a distorção de dados, deve salgar a chave inteira ou utilizar um sal isolado apenas para um subconjunto de chaves. Se estiver a utilizar um sal isolado, deverá filtrar ainda mais para isolar o subconjunto de teclas salgadas em associações de mapa. Outra opção é introduzir primeiro uma coluna de registo e pré-agregar em registos.
Outro fator que causa associações lentas pode ser o tipo de associação. Por predefinição, o Spark utiliza o tipo de associação SortMerge
. Este tipo de associação é mais adequado para grandes conjuntos de dados. No entanto, é computacionalmente dispendioso, porque primeiro tem de ordenar os lados esquerdo e direito dos dados antes de os unir.
Uma Broadcast
associação é mais adequada para conjuntos de dados mais pequenos ou onde um dos lados da associação é muito menor do que o outro lado. Este tipo de associação transmite um lado a todos os executores, pelo que requer mais memória para as transmissões em geral.
Pode alterar o tipo de associação na configuração ao definir spark.sql.autoBroadcastJoinThreshold
ou pode definir uma sugestão de associação com as APIs do DataFrame (dataframe.join(broadcast(df2))
).
// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)
// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
createOrReplaceTempView("V_JOIN")
sql("SELECT col1, col2 FROM V_JOIN")
Se estiver a utilizar tabelas com registos, tem um terceiro tipo de associação, a associação Merge
. Um conjunto de dados corretamente pré-particionado e pré-ordenado ignorará a fase de ordenação dispendiosa de uma SortMerge
associação.
A ordem das associações é importante, particularmente em consultas mais complexas. Comece com as associações mais seletivas. Além disso, mover associações que aumentam o número de linhas após agregações, sempre que possível.
Para gerir o paralelismo das associações cartesianas, pode adicionar estruturas aninhadas, janelas e, talvez, ignorar um ou mais passos na tarefa do Spark.
Otimizar a execução de tarefas
- Cache conforme necessário, por exemplo, se utilizar os dados duas vezes e, em seguida, colocar em cache.
- Difundir variáveis para todos os executores. As variáveis só são serializadas uma vez, o que resulta em pesquisas mais rápidas.
- Utilize o conjunto de threads no controlador, o que resulta numa operação mais rápida para muitas tarefas.
Monitorize regularmente as tarefas em execução para problemas de desempenho. Se precisar de mais informações sobre determinados problemas, considere uma das seguintes ferramentas de criação de perfis de desempenho:
- A Intel PAL Tool monitoriza a utilização da CPU, do armazenamento e da largura de banda de rede.
- Oracle Java 8 Mission Control perfis Spark e código do executor.
A chave para o desempenho de consultas do Spark 2.x é o motor Tungsten, que depende da geração de código em toda a fase. Em alguns casos, a geração de código em fase inteira pode ser desativada. Por exemplo, se utilizar um tipo não mutável (string
) na expressão de agregação, SortAggregate
aparece em vez de HashAggregate
. Por exemplo, para um melhor desempenho, experimente o seguinte e, em seguida, reativar a geração de código:
MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))