Öğretici: Apache Spark DataFrames kullanarak verileri yükleme ve dönüştürme
Bu öğreticide Apache Spark Python (PySpark) DataFrame API'sini, Apache Spark Scala DataFrame API'sini ve Azure Databricks'teki SparkR SparkDataFrame API'sini kullanarak verileri yükleme ve dönüştürme adımları gösterilmektedir.
Bu öğreticinin sonunda DataFrame'in ne olduğunu anlayacak ve aşağıdaki görevlere aşina olacaksınız:
Python
- Değişkenleri tanımlama ve ortak verileri Unity Kataloğu birimine kopyalama
- Python ile DataFrame oluşturma
- CSV dosyasından DataFrame'e veri yükleme
- DataFrame'i görüntüleme ve bunlarla etkileşim kurma
- DataFrame'i kaydetme
- PySpark'ta SQL sorguları çalıştırma
Ayrıca bkz. Apache Spark PySpark API başvurusu.
Scala
- Değişkenleri tanımlama ve ortak verileri Unity Kataloğu birimine kopyalama
- Scala ile DataFrame oluşturma
- CSV dosyasından DataFrame'e veri yükleme
- DataFrame'i görüntüleme ve bunlarla etkileşim kurma
- DataFrame'i kaydetme
- Apache Spark'ta SQL sorguları çalıştırma
Ayrıca bkz. Apache Spark Scala API başvurusu.
R
- Değişkenleri tanımlama ve ortak verileri Unity Kataloğu birimine kopyalama
- SparkR SparkDataFrames oluşturma
- CSV dosyasından DataFrame'e veri yükleme
- DataFrame'i görüntüleme ve bunlarla etkileşim kurma
- DataFrame'i kaydetme
- SparkR'da SQL sorguları çalıştırma
Ayrıca bkz. Apache SparkR API başvurusu.
DataFrame nedir?
DataFrame, potansiyel olarak farklı türlerde sütunlar içeren iki boyutlu etiketli bir veri yapısıdır. DataFrame'i elektronik tablo, SQL tablosu veya seri nesneleri sözlüğü gibi düşünebilirsiniz. Apache Spark DataFrames, yaygın veri çözümleme sorunlarını verimli bir şekilde çözmenize olanak sağlayan zengin bir işlev kümesi (sütunları seçme, filtreleme, birleştirme, toplama) sağlar.
Apache Spark DataFrames, Dayanıklı Dağıtılmış Veri Kümelerinin (RDD) üzerine kurulmuş bir soyutlamadır. Spark DataFrames ve Spark SQL birleşik bir planlama ve iyileştirme altyapısı kullanarak Azure Databricks'te desteklenen tüm dillerde (Python, SQL, Scala ve R) neredeyse aynı performansı elde edebilirsiniz.
Gereksinimler
Aşağıdaki öğreticiyi tamamlamak için aşağıdaki gereksinimleri karşılamanız gerekir:
Bu öğreticideki örnekleri kullanmak için çalışma alanınızda Unity Kataloğu'nu etkinleştirmeniz gerekir.
Bu öğreticideki örneklerde örnek verileri depolamak için Unity Kataloğu birimi kullanılır. Bu örnekleri kullanmak için bir birim oluşturun ve bu birimin kataloğunu, şemasını ve birim adlarını kullanarak örnekler tarafından kullanılan birim yolunu ayarlayın.
Unity Kataloğu'nda aşağıdaki izinlere sahip olmanız gerekir:
READ VOLUME
veWRITE VOLUME
veyaALL PRIVILEGES
bu öğretici için kullanılan birim için.USE SCHEMA
veyaALL PRIVILEGES
bu öğretici için kullanılan şema için.USE CATALOG
veyaALL PRIVILEGES
bu öğretici için kullanılan katalog için.
Bu izinleri ayarlamak için Databricks yöneticinize veya Unity Kataloğu ayrıcalıklarına ve güvenli hale getirilebilir nesnelere bakın.
İpucu
Bu makalenin tamamlanmış not defteri için bkz . DataFrame öğretici not defterleri.
1. Adım: Değişkenleri tanımlama ve CSV dosyasını yükleme
Bu adım, bu öğreticide kullanılacak değişkenleri tanımlar ve ardından health.data.ny.gov Unity Kataloğu biriminize bebek adı verilerini içeren bir CSV dosyası yükler.
Simgeye tıklayarak yeni bir not defteri açın. Azure Databricks not defterlerinde gezinmeyi öğrenmek için bkz . Databricks not defteri arabirimi ve denetimleri.
Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. ,
<schema-name>
ve<volume-name>
yerine Unity Kataloğu biriminin katalog, şema ve birim adlarını yazın<catalog-name>
. değerini, seçtiğiniz bir tablo adıyla değiştirin<table_name>
. Bu öğreticinin ilerleyen bölümlerinde bu tabloya bebek adı verilerini yükleyebilirsiniz.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
Hücreyi çalıştırmak ve yeni bir boş hücre oluşturmak için basın
Shift+Enter
.Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, Databricks
rows.csv
dbutuils komutunu kullanarak dosyayı health.data.ny.gov Unity Kataloğu biriminize kopyalar.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
2. Adım: DataFrame oluşturma
Bu adım, test verileriyle adlı df1
bir DataFrame oluşturur ve içeriğini görüntüler.
Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, test verileriyle DataFrame'i oluşturur ve ardından DataFrame'in içeriğini ve şemasını görüntüler.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
3. Adım: CSV dosyasından DataFrame'e veri yükleme
Bu adım, Unity Kataloğu biriminize daha önce yüklediğiniz CSV dosyasından adlı df_csv
bir DataFrame oluşturur. Bkz. spark.read.csv.
Aşağıdaki kodu kopyalayıp yeni boş not defteri hücresine yapıştırın. Bu kod, CSV dosyasından DataFrame'e
df_csv
bebek adı verilerini yükler ve ardından DataFrame'in içeriğini görüntüler.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
Desteklenen birçok dosya biçiminden veri yükleyebilirsiniz.
4. Adım: DataFrame'inizi görüntüleme ve bunlarla etkileşim kurma
Aşağıdaki yöntemleri kullanarak bebek adlarınızı DataFrames olarak görüntüleyin ve bunlarla etkileşime geçin.
DataFrame şemasını yazdırma
Apache Spark DataFrame şemasını görüntülemeyi öğrenin. Apache Spark, DataFrame'deki sütunların adlarına ve veri türlerine başvurmak için şema terimini kullanır.
Not
Azure Databricks, kataloğa kayıtlı tablo koleksiyonunu açıklamak için şema terimini de kullanır.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, iki DataFrame'in şemalarını görüntüleme yöntemiyle DataFrame'lerinizin
.printSchema()
şemasını gösterir. İki DataFrame'in birleşimine hazırlanmak için.Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame'de sütunu yeniden adlandırma
DataFrame'de bir sütunu yeniden adlandırmayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, DataFrame'deki bir sütunu DataFrame'deki
df1_csv
df1
ilgili sütunla eşleşecek şekilde yeniden adlandırır. Bu kod Apache SparkwithColumnRenamed()
yöntemini kullanır.Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame'leri birleştirme
Bir DataFrame'in satırlarını başka bir DataFrame'e ekleyen yeni bir DataFrame oluşturmayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, ilk DataFrame'inizin
df
içeriğini CSV dosyasından yüklenen bebek adları verilerini içeren DataFramedf_csv
ile birleştirmek için Apache Sparkunion()
yöntemini kullanır.Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame'de satırları filtreleme
Apache Spark'ı .filter()
veya .where()
yöntemleri kullanarak satırları filtreleyerek veri kümenizdeki en popüler bebek adlarını keşfedin. DataFrame'de döndürülecek veya değiştirilebilen satırların bir alt kümesini seçmek için filtrelemeyi kullanın. Aşağıdaki örneklerde görüldüğü gibi performans veya söz diziminde fark yoktur.
.filter() yöntemini kullanma
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, 50'den fazla sayıyla Bu satırları DataFrame'de görüntülemek için Apache Spark
.filter()
yöntemini kullanır.Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
.where() yöntemini kullanma
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, 50'den fazla sayıyla Bu satırları DataFrame'de görüntülemek için Apache Spark
.where()
yöntemini kullanır.Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame'den sütunları seçme ve sıklık ölçütüne göre sıralama
Döndürülecek DataFrame sütunlarını belirtmek için yöntemiyle select()
hangi bebek adı sıklığı hakkında bilgi edinin. Sonuçları sıralamak için Apache Spark orderby
ve desc
işlevlerini kullanın.
Apache Spark için pyspark.sql modülü SQL işlevleri için destek sağlar. Bu öğreticide kullandığımız bu işlevler arasında Apache Spark orderBy()
, desc()
ve expr()
işlevleri yer alır. Gerektiğinde bunları oturumunuza aktararak bu işlevlerin kullanımını etkinleştirirsiniz.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod işlevi içeri aktarır
desc()
ve en yaygın adları ve bunların sayılarını azalan sırada görüntülemek için Apacheselect()
Spark yöntemini ve Apache SparkorderBy()
desc()
ve işlevlerini kullanır.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame alt kümesi oluşturma
Mevcut bir DataFrame'den bir dataframe alt kümesi oluşturmayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, verileri yıla, sayıya ve cinsiyete göre kısıtlayan yeni bir DataFrame oluşturmak için Apache Spark
filter
yöntemini kullanır. Sütunları sınırlamak için Apache Sparkselect()
yöntemini kullanır. Ayrıca Apache SparkorderBy()
vedesc()
işlevlerini kullanarak yeni DataFrame'i sayıya göre sıralar.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
5. Adım: DataFrame'i kaydetme
DataFrame'i kaydetmeyi öğrenin. DataFrame'inizi bir tabloya kaydedebilir veya DataFrame'i bir dosyaya veya birden çok dosyaya yazabilirsiniz.
DataFrame'i tabloya kaydetme
Azure Databricks varsayılan olarak tüm tablolar için Delta Lake biçimini kullanır. DataFrame'inizi kaydetmek için katalog ve şemada tablo ayrıcalıklarınız olmalıdır CREATE
.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, bu öğreticinin başında tanımladığınız değişkeni kullanarak DataFrame'in içeriğini bir tabloya kaydeder.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
Apache Spark uygulamalarının çoğu büyük veri kümelerinde ve dağıtılmış bir şekilde çalışır. Apache Spark, tek bir dosya yerine dosyaların dizinini yazar. Delta Lake, Parquet klasörlerini ve dosyalarını böler. Birçok veri sistemi bu dosya dizinlerini okuyabilir. Azure Databricks, çoğu uygulama için dosya yolları üzerinden tabloların kullanılmasını önerir.
DataFrame'i JSON dosyalarına kaydetme
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, DataFrame'i JSON dosyalarının dizinine kaydeder.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
JSON dosyasından DataFrame'i okuma
Apache Spark spark.read.format()
yöntemini kullanarak bir dizinden DataFrame'e JSON verilerini okumayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, önceki örnekte kaydettiğiniz JSON dosyalarını görüntüler.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
Ek görevler: PySpark, Scala ve R'de SQL sorguları çalıştırma
Apache Spark DataFrames, SQL'i PySpark, Scala ve R ile birleştirmek için aşağıdaki seçenekleri sağlar. Aşağıdaki kodu bu öğretici için oluşturduğunuz not defterinde çalıştırabilirsiniz.
Sütunu SQL sorgusu olarak belirtme
Apache Spark selectExpr()
yöntemini kullanmayı öğrenin. Bu, SQL ifadelerini kabul eden ve güncelleştirilmiş bir DataFrame döndüren yöntemin bir değişkenidir select()
. Bu yöntem, gibi upper
bir SQL ifadesi kullanmanıza olanak tanır.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, bir dize sütununu büyük harfe dönüştürmek (ve sütunu yeniden adlandırmak) için Apache Spark
selectExpr()
yöntemini ve SQLupper
ifadesini kullanır.Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
Sütun için SQL söz dizimi kullanmak için kullanın expr()
Bir sütunun belirtileceği her yerde SQL söz dizimini kullanmak için Apache Spark expr()
işlevini içeri aktarmayı ve kullanmayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod işlevi içeri aktarır
expr()
ve ardından Apache Sparkexpr()
işlevini ve SQLlower
ifadesini kullanarak bir dize sütununu küçük harfe dönüştürür (ve sütunu yeniden adlandırır).Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
spark.sql() işlevini kullanarak rastgele bir SQL sorgusu çalıştırma
Rastgele SQL sorguları çalıştırmak için Apache Spark spark.sql()
işlevini kullanmayı öğrenin.
Aşağıdaki kodu kopyalayıp boş bir not defteri hücresine yapıştırın. Bu kod, SQL söz dizimini kullanarak bir SQL tablosunu sorgulamak için Apache Spark
spark.sql()
işlevini kullanır.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Hücreyi çalıştırmak için basın
Shift+Enter
ve ardından sonraki hücreye geçin.
DataFrame öğreticisi not defterleri
Aşağıdaki not defterleri bu öğreticideki örnek sorguları içerir.