Tutorial: carregar e transformar dados usando DataFrames do Apache Spark

Este tutorial mostra como carregar e transformar dados usando a API do DataFrame do Apache Spark no Python (PySpark), a API do DataFrame do Apache Spark no Scala e a API do SparkDataFrame do SparkR no Azure Databricks.

Ao final deste tutorial, você entenderá o que é um DataFrame e estará familiarizado com as tarefas a seguir:

Python

Consulte também a referência da API do Apache Spark PySpark.

Scala

Consulte também a Referência de API de Scala do Apache Spark.

R

Consulte também a Referência de API do Apache SparkR.

O que é um DataFrame?

Um DataFrame é uma estrutura de dados rotulada bidimensional com colunas de tipos potencialmente diferentes. Considere um DataFrame como uma planilha, uma tabela SQL ou um dicionário de objetos de série. O Apache Spark DataFrame contém um amplo conjunto de funções (selecionar colunas, filtrar, unir, agregar) para resolução de problemas comuns de análise de dados com eficiência.

Os DataFrames do Apache Spark são uma abstração criada com base em RDDs (Conjuntos de dados distribuídos resilientes). O DataFrames do Spark e o SQL do Spark usam um mecanismo unificado de planejamento e otimização, permitindo que você obtenha um desempenho quase idêntico em todos os idiomas com suporte no Azure Databricks (Python, SQL, Scala e R).

Requisitos

Para concluir o tutorial a seguir, você deve atender aos seguintes requisitos:

  • Para usar os exemplos neste tutorial, seu espaço de trabalho deve ter o Catálogo do Unity habilitado.

  • Os exemplos neste tutorial usam um volume do Catálogo do Unity para armazenar dados de exemplo. Para usar esses exemplos, crie um volume e use esse catálogo de volumes, esquema e nomes de volume para definir o caminho de volume usado pelos exemplos.

  • Você deve ter as seguintes permissões no Catálogo do Unity:

    • READ VOLUME e WRITE VOLUME ou ALL PRIVILEGES para o volume usado neste tutorial.
    • USE SCHEMA ou ALL PRIVILEGES para o esquema usado neste tutorial.
    • USE CATALOG ou ALL PRIVILEGES para o catálogo usado neste tutorial.

    Para definir essas permissões, consulte o administrador do Databricks ou Privilégios e objetos protegidos do Catálogo do Unity.

Dica

Para obter um notebook concluído para este artigo, consulte Tutorial do DataFrame sobre notebooks.

Etapa 1: definir variáveis e carregar o arquivo CSV

Esta etapa define variáveis para uso neste tutorial e carrega um arquivo CSV que contém dados de nome de bebê de health.data.ny.gov no volume do Catálogo do Unity.

  1. Abra um novo notebook clicando no ícone Novo ícone. Para saber como navegar em notebooks do Azure Databricks, confira Interface e controles de notebook do Databricks.

  2. Copie e cole o código a seguir na nova célula vazia do notebook. Substitua <catalog-name>, <schema-name> e <volume-name> pelos nomes do catálogo, esquema e volume de um volume do Catálogo do Unity. Substitua <table_name> por um nome de sua escolha. Você carregará dados de nome de bebê nesta tabela mais adiante neste tutorial.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Pressione Shift+Enter para executar a célula e criar uma nova célula em branco.

  4. Copie e cole o código a seguir na nova célula vazia do notebook. Esse código copia o arquivo rows.csv de health.data.ny.gov para o volume do Catálogo do Unity usando o comando Databricks dbutuils.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Etapa 2: criar um DataFrame

Esta etapa cria um DataFrame chamado df1 com dados de teste e, depois, exibe seu conteúdo.

  1. Copie e cole o código a seguir na nova célula vazia do notebook. Esse código cria o DataFrame com dados de teste e exibe o conteúdo e o esquema do DataFrame.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Etapa 3: carregar dados em um DataFrame de um arquivo CSV

Esta etapa cria um DataFrame chamado df_csv a partir do arquivo CSV que você carregou anteriormente no volume do Catálogo do Unity. Consulte spark.read.csv.

  1. Copie e cole o código a seguir na nova célula vazia do notebook. Esse código carrega dados de nome de bebê no DataFrame df_csv do arquivo CSV e exibe o conteúdo do DataFrame.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Você pode carregar dados de muitos formatos de arquivo com suporte.

Etapa 4: exibir e interagir com seu DataFrame

Veja e interaja com os DataFrames de nomes de bebê usando os métodos a seguir.

Saiba como exibir o esquema de um DataFrame do Apache Spark. O Apache Spark usa o termo esquema para se referir aos nomes e tipos de dados das colunas no DataFrame.

Observação

O Azure Databricks também usa o termo esquema para descrever uma coleção de tabelas registradas em um catálogo.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código mostra o esquema de DataFrames com o método .printSchema() para exibir os esquemas dos dois DataFrames – para preparar a união dos dois DataFrames.

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Renomear coluna no DataFrame

Saiba como renomear uma coluna em um DataFrame.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código renomeia uma coluna no DataFrame df1_csv para corresponder à respectiva coluna no DataFrame df1. Esse código usa o método withColumnRenamed() do Apache Spark.

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Combinar DataFrames

Saiba como criar um novo DataFrame que adiciona as linhas de um DataFrame a outro.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa o método union() do Apache Spark para combinar o conteúdo do seu primeiro DataFrame df com o DataFrame df_csv que contém os dados de nomes de bebê carregados do arquivo CSV.

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Filtrar linhas em um DataFrame

Descubra os nomes de bebê mais populares em seu conjunto de dados filtrando linhas e usando os métodos .filter() ou .where() do Apache Spark. Use a filtragem para selecionar um subconjunto de linhas para retorno ou modificação em um DataFrame. Não há diferença no desempenho ou na sintaxe, conforme visto nos exemplos a seguir.

Usar o método .filter()

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa o método .filter() do Apache Spark para exibir essas linhas no DataFrame com uma contagem de mais de 50.

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Usar o método .where()

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa o método .where() do Apache Spark para exibir essas linhas no DataFrame com uma contagem de mais de 50.

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Selecionar colunas de um DataFrame e ordenar por frequência

Saiba mais sobre qual frequência de nome de bebê com o método select() para especificar as colunas do DataFrame a serem retornadas. Use as funções orderby e desc do Apache Spark para ordenar os resultados.

O módulo pyspark.sql para Apache Spark fornece suporte para funções SQL. Entre essas funções que usamos neste tutorial estão as funções orderBy(), desc() e expr() do Apache Spark. Habilite o uso dessas funções importando-as para sua sessão, conforme necessário.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código importa a função desc() e, em seguida, usa o método select()do Apache Spark e funções orderBy() e desc() do Apache Spark para exibir os nomes mais comuns e suas contagens em ordem decrescente.

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Criar um DataFrame de subconjunto

Saiba como criar um DataFrame de subconjunto com base em um DataFrame existente.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa o método filter do Apache Spark para criar um DataFrame restringindo os dados por ano, contagem e sexo. Ele usa o método select() do Apache Spark para limitar as colunas. Ele também usa as funções orderBy() e desc() do Apache Spark para classificar o novo DataFrame por contagem.

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Etapa 5: salvar o DataFrame

Saiba como salvar um DataFrame. Você pode salvar o DataFrame em uma tabela ou gravar o DataFrame em um arquivo ou em vários arquivos.

Salvar um DataFrame em uma tabela

O Azure Databricks usa o formato Delta Lake para todas as tabelas por padrão. Para salvar seu DataFrame, você deve ter privilégios de tabela CREATE no catálogo e no esquema.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código salva o conteúdo do DataFrame em uma tabela usando a variável que você definiu no início deste tutorial.

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

A maioria dos aplicativos Apache Spark funciona em grandes conjuntos de dados e de forma distribuída. O Apache Spark grava um diretório de arquivos em vez de um único arquivo. O Delta Lake divide as pastas e arquivos parquet. Muitos sistemas de dados podem ler esses diretórios de arquivos. O Azure Databricks recomenda o uso de tabelas em caminhos de arquivo na maioria dos aplicativos.

Salvar um DataFrame em arquivos JSON

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código salva o DataFrame em um diretório de arquivos JSON.

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Ler o DataFrame de um arquivo JSON

Saiba como usar o método spark.read.format() do Apache Spark para ler dados JSON de um diretório em um DataFrame.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código exibe os arquivos JSON salvos no exemplo anterior.

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Tarefas adicionais: executar consultas SQL no PySpark, Scala e R

Os DataFrames do Apache Spark fornece as seguintes opções para combinar SQL com PySpark, Scala e R. Você pode executar o código a seguir no mesmo notebook criado para este tutorial.

Especificar uma coluna como consulta SQL

Saiba como usar o método selectExpr() do Apache Spark. Essa é uma variante do método select() que aceita expressões SQL e retorna um DataFrame atualizado. Esse método permite que você use uma expressão SQL, como upper.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa o método selectExpr() do Apache Spark e a expressão SQL upper para converter uma coluna de cadeia de caracteres em maiúsculas (e renomear a coluna).

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Usar expr() para usar a sintaxe SQL de uma coluna

Saiba como importar e usar a função expr() do Apache Spark para usar a sintaxe SQL em qualquer lugar em que uma coluna seria especificada.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código importa a função expr() e depois usa a função expr() do Apache Spark e a expressão SQL lower para converter uma coluna de cadeia de caracteres em minúsculas (e renomear a coluna).

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Executar uma consulta SQL arbitrária usando a função spark.sql()

Saiba como usar a função spark.sql() do Apache Spark para executar consultas SQL arbitrárias.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa a função spark.sql() do Apache Spark para consultar uma tabela SQL usando a sintaxe SQL.

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Pressione Shift+Enter para executar a célula e depois vá para a próxima célula.

Tutorial do DataFrame sobre notebooks

Os notebooks a seguir incluem os exemplos de consultas deste tutorial.

Python

Tutorial do DataFrames usando Python

Obter notebook

Scala

Tutorial do DataFrames usando Scala

Obter notebook

R

Tutorial do DataFrames usando R

Obter notebook

Recursos adicionais