Öğretici: Apache Spark DataFrames kullanarak verileri yükleme ve dönüştürme
Bu öğreticide Apache Spark Python (PySpark) DataFrame API'sini, Apache Spark Scala DataFrame API'sini ve Azure Databricks'teki SparkR SparkDataFrame API'sini kullanarak verileri yükleme ve dönüştürme adımları gösterilmektedir.
Bu öğreticinin sonunda DataFrame'in ne olduğunu anlayacak ve aşağıdaki görevlere aşina olacaksınız:
Python
- Değişkenleri tanımlama ve ortak verileri Unity Catalog birimine kopyalama
- Python ile DataFrame oluşturma
- CSV dosyasından DataFrame'e veri yükleme
- DataFrame'i görüntüleme ve bunlarla etkileşim kurma
- DataFrame'i kaydetme
- PySpark'ta SQL sorguları çalıştırma
Ayrıca bkz. Apache Spark PySpark API başvurusu.
Scala
- Değişkenleri tanımlama ve ortak verileri Unity Catalog birimine kopyalama
- Scala ile DataFrame oluşturma
- CSV dosyasından DataFrame'e veri yükleme
- DataFrame'i görüntüleme ve bunlarla etkileşim kurma
- DataFrame'i kaydetme
- Apache Spark'ta SQL sorguları çalıştırma
Ayrıca bkz. Apache Spark Scala API başvurusu.
R
- Değişkenleri tanımlama ve ortak verileri Unity Catalog birimine kopyalama
- SparkR SparkDataFrames oluşturma
- CSV dosyasından DataFrame'e veri yükleme
- DataFrame'i görüntüleme ve bunlarla etkileşim kurma
- DataFrame'i kaydetme
- SparkR'da SQL sorguları çalıştırma
Ayrıca bkz. Apache SparkR API başvurusu.
DataFrame nedir?
DataFrame, potansiyel olarak farklı türlerde columns'a sahip iki boyutlu etiketli bir veri yapısıdır. DataFrame'i elektronik tablo, SQL tableveya seri nesneleri sözlüğü gibi düşünebilirsiniz. Apache Spark DataFrames, yaygın veri çözümleme sorunlarını verimli bir şekilde çözmenize olanak sağlayan zengin bir işlev set (selectcolumns, filtre, join, toplama) sağlar.
Apache Spark DataFrames, Dayanıklı Dağıtılmış Veri Kümelerinin (RDD) üzerine kurulmuş bir soyutlamadır. Spark DataFrames ve Spark SQL, birleşik bir planlama ve optimizasyon motoru kullanarak, Azure Databricks'te desteklenen tüm dillerde (Python, SQL, Scala ve R) neredeyse aynı performansı elde etmenizi sağlar get.
Gereksinimler
Aşağıdaki öğreticiyi tamamlamak için aşağıdaki gereksinimleri karşılamanız gerekir:
Bu öğreticideki örnekleri kullanmak için çalışma alanınızda Unity Catalog etkinleştirilmiş olmalıdır.
Bu öğreticideki örneklerde örnek verileri depolamak için Unity Catalogbirim kullanılır. Bu örnekleri kullanmak için bir birim oluşturun ve bu birimin catalog, schemave adlarını kullanarak, örnekler tarafından kullanılan birim yolunu set edin.
Unity Catalog'da aşağıdaki izinlere sahip olmanız gerekir:
-
READ VOLUME
veWRITE VOLUME
veyaALL PRIVILEGES
bu öğretici için kullanılan birim için. - Bu öğreticide kullanılan schema
USE SCHEMA
veyaALL PRIVILEGES
. - Bu öğretici için kullanılan catalog için
USE CATALOG
veyaALL PRIVILEGES
.
Bu izinleri set için Bkz. Databricks yöneticiniz veya Unity Catalog ayrıcalıkları ve güvenliği sağlanabilir nesneler.
-
İpucu
Bu makalenin tamamlanmış not defteri için bkz . DataFrame öğretici not defterleri.
1. Adım: Değişkenleri tanımlama ve CSV dosyasını yükleme
Bu adım, bu öğreticide kullanılacak değişkenleri tanımlar ve ardından health.data.ny.gov'den Unity Catalog biriminize bebek adı verilerini içeren bir CSV dosyası yükler.
Simgeye tıklayarak
yeni bir not defteri açın. Azure Databricks not defterlerinde gezinmeyi öğrenmek için bkz. Not defteri görünümünü özelleştirme.
Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın.
<catalog-name>
,<schema-name>
ve<volume-name>
yerine Unity Catalog biriminin catalog, schemave birim adlarını yazın.<table_name>
'ı table seçtiğiniz bir adla değiştirin. Bu öğreticinin ilerleyen bölümlerinde bu table bebek adı verilerini yükleyeceksiniz.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
Hücreyi çalıştırmak ve yeni bir boş hücre oluşturmak için basın
Shift+Enter
.Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, Databricks dbutuils komutunu kullanarak
rows.csv
dosyasını health.data.ny.gov Unity Catalog biriminize kopyalar.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
2. Adım: DataFrame oluşturma
Bu adım, test verileriyle adlı df1
bir DataFrame oluşturur ve içeriğini görüntüler.
Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, test verileriyle DataFrame'i oluşturur ve ardından DataFrame'in içeriğini ve schema görüntüler.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
3. Adım: CSV dosyasından DataFrame'e veri yükleme
Bu adım, unity Catalog biriminize önceden yüklediğiniz CSV dosyasından df_csv
adlı bir DataFrame oluşturur. Bkz. spark.read.csv.
Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, CSV dosyasından DataFrame'e
df_csv
bebek adı verilerini yükler ve ardından DataFrame'in içeriğini görüntüler.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
Desteklenen birçok dosya biçiminden veri yükleyebilirsiniz.
4. Adım: DataFrame'inizi görüntüleme ve bunlarla etkileşim kurma
Aşağıdaki yöntemleri kullanarak bebek adlarınızı DataFrames olarak görüntüleyin ve bunlarla etkileşime geçin.
DataFrame schema yazdırma
Apache Spark DataFrame'in schema görüntülemeyi öğrenin. Apache Spark, DataFrame'deki columns adlarına ve veri türlerine başvurmak için schema terimini kullanır.
Not
Azure Databricks, bir catalog'ye kayıtlı tables koleksiyonunu tanımlamak için schema terimini de kullanır.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, iki DataFrame'in şemalarını görüntülemek için
.printSchema()
yöntemini kullanarak DataFrame'lerinizin schema'ını gösterir. Bu, iki DataFrame'i birleştirmeye hazırlık içindir.Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame'de column yeniden adlandırma
DataFrame'de bir column nasıl yeniden adlandıracağınızı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, column öğesini,
df1_csv
DataFrame'inde,df1
DataFrame'indeki ilgili column ile eşleşecek şekilde yeniden adlandırır. Bu kod Apache SparkwithColumnRenamed()
yöntemini kullanır.Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame'leri birleştirme
Bir DataFrame'in satırlarını başka bir DataFrame'e ekleyen yeni bir DataFrame oluşturmayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, ilk DataFrame'inizin
union()
içeriğini CSV dosyasından yüklenen bebek adları verilerini içeren DataFramedf
ile birleştirmek için Apache Sparkdf_csv
yöntemini kullanır.Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame'de satırları filtreleme
Apache Spark .filter()
veya .where()
yöntemlerini kullanarak satırları filtreleyerek veri set en popüler bebek adlarını keşfedin. DataFrame'de döndürmek veya değiştirmek istediğiniz satırların bir alt kümesini filtrelemek için select kullanın. Aşağıdaki örneklerde görüldüğü gibi performans veya söz diziminde fark yoktur.
.filter() yöntemini kullanma
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, 50'den fazla sayıyla Bu satırları DataFrame'de görüntülemek için Apache Spark
.filter()
yöntemini kullanır.Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
Kullanarak.where() yöntemi
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, 50'den fazla sayıyla Bu satırları DataFrame'de görüntülemek için Apache Spark
.where()
yöntemini kullanır.Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame'den Selectcolumns'yi al ve sıklığa göre sırala
DataFrame’de geri döndürülecek olan columns’i belirtmek için select()
yöntemiyle hangi bebek adı sıklığının öğrenileceğini keşfedin. Sonuçları sıralamak için Apache Spark orderby
ve desc
işlevlerini kullanın.
Apache Spark için pyspark.sql modülü SQL işlevleri için destek sağlar. Bu öğreticide kullandığımız bu işlevler arasında Apache Spark orderBy()
, desc()
ve expr()
işlevleri yer alır. Gerektiğinde bunları oturumunuza aktararak bu işlevlerin kullanımını etkinleştirirsiniz.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod işlevi içeri aktarır
desc()
ve en yaygın adları ve bunların sayılarını azalan sırada görüntülemek için Apacheselect()
Spark yöntemini ve Apache SparkorderBy()
desc()
ve işlevlerini kullanır.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame alt kümesi oluşturma
Mevcut bir DataFrame'den bir dataframe alt kümesi oluşturmayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, verileri yıla, sayıya ve cinsiyete göre kısıtlayan yeni bir DataFrame oluşturmak için Apache Spark
filter
yöntemini kullanır. Apache Sparkselect()
yöntemini columns'yi limit için kullanır. Ayrıca Apache SparkorderBy()
vedesc()
işlevlerini kullanarak yeni DataFrame'i sayıya göre sıralar.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
5. Adım: DataFrame'i kaydetme
DataFrame'i kaydetmeyi öğrenin. DataFrame'inizi bir table'ye kaydedebilir veya DataFrame'i bir dosyaya ya da birden çok dosyaya yazabilirsiniz.
DataFrame'i bir table'a kaydet
Azure Databricks varsayılan olarak tüm tables için Delta Lake biçimini kullanır. DataFrame'inizi kaydetmek için catalog ve schemaüzerinde CREATE
table ayrıcalıklarınız olmalıdır.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, öğreticinin başında tanımladığınız değişkeni kullanarak DataFrame içeriğini bir table'a kaydeder.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
Apache Spark uygulamalarının çoğu büyük veri kümelerinde ve dağıtılmış bir şekilde çalışır. Apache Spark, tek bir dosya yerine dosyaların dizinini yazar. Delta Lake, Parquet klasörlerini ve dosyalarını böler. Birçok veri sistemi bu dosya dizinlerini okuyabilir. Azure Databricks, çoğu uygulama için dosya yolları üzerinden tables kullanılmasını önerir.
DataFrame'i JSON dosyalarına kaydetme
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, DataFrame'i JSON dosyalarının dizinine kaydeder.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
JSON dosyasından DataFrame'i okuma
Apache Spark spark.read.format()
yöntemini kullanarak bir dizinden DataFrame'e JSON verilerini okumayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, önceki örnekte kaydettiğiniz JSON dosyalarını görüntüler.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
Ek görevler: PySpark, Scala ve R'de SQL sorguları çalıştırma
Apache Spark DataFrames, SQL'i PySpark, Scala ve R ile birleştirmek için aşağıdaki seçenekleri sağlar. Aşağıdaki kodu bu öğretici için oluşturduğunuz not defterinde çalıştırabilirsiniz.
SQL sorgusu olarak bir column belirtme
Apache Spark selectExpr()
yöntemini kullanmayı öğrenin. Bu, SQL ifadelerini kabul eden ve güncelleştirilmiş bir DataFrame döndüren yöntemin bir değişkenidir select()
. Bu yöntem, gibi upper
bir SQL ifadesi kullanmanıza olanak tanır.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, apache Spark
selectExpr()
yöntemini ve SQLupper
ifadesini kullanarak bir dize column büyük harfe dönüştürür (ve columnyeniden adlandırır).Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
SQL söz dizimini column için kullanmak amacıyla expr()
kullanın.
SQL söz dizimini bir column'in belirtildiği her yerde kullanmak için Apache Spark expr()
işlevini nasıl içeri aktaracağınızı ve kullanacağınızı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod,
expr()
işlevini içeri aktarır ve Apache Sparkexpr()
işlevi ile SQLlower
ifadesini kullanarak bir dizeyi column küçük harfe dönüştürür (ve ardından column'ü yeniden adlandırır).Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
spark.sql() işlevini kullanarak rastgele bir SQL sorgusu çalıştırma
Rastgele SQL sorguları çalıştırmak için Apache Spark spark.sql()
işlevini kullanmayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, SQL söz dizimini kullanarak bir SQL table sorgulamak için Apache Spark
spark.sql()
işlevini kullanır.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame öğreticisi not defterleri
Aşağıdaki not defterleri bu öğreticideki örnek sorguları içerir.