How can we use an insert/update (upsert) operation in a Synapse Analytics notebook while storing data to blob storage and a database?

Shrimathi M 40 Reputation points
2024-08-28T13:56:09.35+00:00

I am a beginner to Synapse Analytics. I am trying to save data to blob storage and a database using the code below, but it fails with the error "AnalysisException: /mnt/usprodannex1/usprodgold_layer/ is not a Delta table."

Note: I have imported the required library (from delta.tables import *).

If I add import DeltaTable instead, I get a "No module named 'DeltaTable'" error.

Can you please help me solve this?

delta_Timeupdate_table_path = "/mnt/usprodannex1/usprodgold_layer/"

target_Timeupdate_delta_table = DeltaTable.forPath(spark, delta_Timeupdate_table_path)
target_Timeupdate_delta_table.alias("target") \
    .merge(source=gold_layer_primarykey.alias("source"),
           condition="target.Gold_Primary_Key = source.Gold_Primary_Key") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()


Accepted answer
  1. Vinodh247 18,251 Reputation points
    2024-08-28T15:18:33.8966667+00:00

    Hi Shrimathi M,

    Thanks for reaching out to Microsoft Q&A.

    It looks like you're trying to use Delta Lake within an Azure Synapse Analytics notebook and are running into two related errors. I'll walk you through what each one means and how to resolve it.

    Understanding the Error:

    • Error 1: "AnalysisException: /mnt/usprodannex1/usprodgold_layer/ is not a Delta table."
      • This error occurs because the specified directory is not recognized as a Delta table. A Delta table is a directory of Parquet data files plus a _delta_log transaction log, which is what enables ACID transactions and versioning; a plain directory of files will not be accepted by DeltaTable.forPath.
    • Error 2: "No module named 'DeltaTable'"
      • This happens because DeltaTable is a class inside the delta.tables module, not a module itself, so import DeltaTable fails. It can also indicate that the Delta Lake libraries are not available in your Synapse environment.

    Possible Issues and Solutions:

    Issue 1: The Path is Not a Delta Table

    • Resolution:
      • Make sure that the data at "/mnt/usprodannex1/usprodgold_layer/" is actually stored as a Delta table. If the path currently holds Parquet files, you can convert them in place with DeltaTable.convertToDelta, as shown below. (Avoid reading a path and then overwriting that same path in one job: Spark reads lazily, so the overwrite can delete the files before they have been fully read.)
    from delta.tables import DeltaTable
    
    # Convert the existing Parquet files at this path into a Delta table in place.
    # This writes the _delta_log transaction log next to the data files.
    DeltaTable.convertToDelta(spark, "parquet.`/mnt/usprodannex1/usprodgold_layer/`")
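
    After the conversion you can verify that Spark now recognizes the path as a Delta table; DeltaTable.isDeltaTable is part of the Delta Lake API:

    from delta.tables import DeltaTable
    
    # Returns True once the path contains a valid _delta_log directory
    print(DeltaTable.isDeltaTable(spark, "/mnt/usprodannex1/usprodgold_layer/"))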
    
    
    

    Issue 2: Delta Table Module Not Found

    • Resolution:
      • First, use the correct import. DeltaTable is a class in the delta.tables module, so write from delta.tables import DeltaTable (or from delta.tables import *) rather than import DeltaTable. A short sketch contrasting the two follows this list.
      • Synapse Spark runtimes generally ship with Delta Lake preinstalled, so the import fix alone is often enough. If your pool genuinely lacks it, you can add the Delta Lake package to your Spark pool:
        • In the Synapse workspace > "Manage" section > Select "Apache Spark pools".
        • Edit your Spark pool and add the Delta Lake package. For example, add the following Maven coordinates:
          • Group ID: io.delta
          • Artifact ID: delta-core_2.12
          • Version: 1.2.1 (or the latest available version)
        If you do not have permissions to modify the environment, you might need to ask your Azure admin to add this library to your Spark pool.
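
    A minimal sketch contrasting the failing and the correct import:

    # Wrong: DeltaTable is a class, not a top-level module, so this line raises
    # ModuleNotFoundError: No module named 'DeltaTable'
    # import DeltaTable
    
    # Correct: import the DeltaTable class from the delta.tables module
    from delta.tables import DeltaTable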

    Using the Merge Operation:

    Once your environment is set up with Delta Lake, you should be able to use the DeltaTable class to perform the merge operation as you intended:

    from delta.tables import DeltaTable
    
    delta_Timeupdate_table_path = "/mnt/usprodannex1/usprodgold_layer/"
    
    # Initialize the Delta table
    target_Timeupdate_delta_table = DeltaTable.forPath(spark, delta_Timeupdate_table_path)
    
    # Perform the merge operation
    target_Timeupdate_delta_table.alias("target") \
        .merge(source=gold_layer_primarykey.alias("source"),
               condition="target.Gold_Primary_Key = source.Gold_Primary_Key") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
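
    One practical note on the merge: Delta raises an error if more than one source row matches the same target row, so if gold_layer_primarykey can contain duplicate keys, it is worth deduplicating it first. A minimal sketch:

    # Optional pre-step: ensure the merge key is unique in the source DataFrame,
    # since merge fails when multiple source rows match a single target row.
    gold_layer_primarykey = gold_layer_primarykey.dropDuplicates(["Gold_Primary_Key"])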
    
    
    

    Saving Data to Blob Storage and Database:

    • Blob Storage: You can write your DataFrame to blob storage directly with the DataFrame .write API (see the sketch below).
    • Database: To save data to a database such as Azure SQL Database, use a JDBC connection or another connector appropriate for your specific database (also sketched below).
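
    A minimal sketch of both writes, assuming an ADLS Gen2 output path and an Azure SQL Database; every <placeholder> value (container, storage account, server, database, table, credentials) is an example to replace with your own:

    # Write the gold-layer DataFrame to blob storage in Delta format
    gold_layer_primarykey.write.format("delta").mode("overwrite").save(
        "abfss://<container>@<storageaccount>.dfs.core.windows.net/gold_layer_out/"
    )
    
    # Write the same DataFrame to Azure SQL Database over JDBC
    jdbc_url = "jdbc:sqlserver://<server>.database.windows.net:1433;database=<database>"
    gold_layer_primarykey.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "dbo.GoldLayer") \
        .option("user", "<user>") \
        .option("password", "<password>") \
        .mode("append") \
        .save()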

    Note: Make sure to check & modify the code given above to suit your requirements.

    Please 'Upvote' (Thumbs-up) and 'Accept' as an answer if the reply was helpful. This will benefit other community members who face the same issue.

