Upsert data into SQL from a Delta table

Rocky420 21 Reputation points
2020-06-26T17:49:42.503+00:00

Hello Team,

We have a scenario where we have to get the data from the lake, process it, and then store it in a SQL database. This is what we are doing:

  1. Read the entity from the lake.
  2. Store that in a Delta table (_staging).
  3. Do a merge between the Delta table and its _staging copy, so that only the new data needs to be updated (a sketch of this step follows the list).
  4. Upsert the same data into the SQL warehouse.
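
For reference, step 3 is roughly the following (simplified; the paths and the join key `id` are placeholders, not our real names):

    import io.delta.tables.DeltaTable

    // Step 3 (simplified): merge the _staging table into the target Delta table.
    // The paths and the join key `id` are placeholders.
    val target  = DeltaTable.forPath(spark, "/mnt/delta/entity")
    val staging = spark.read.format("delta").load("/mnt/delta/entity_staging")

    target.as("t")
      .merge(staging.as("s"), "t.id = s.id")
      .whenMatched().updateAll()
      .whenNotMatched().insertAll()
      .execute()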

I am not able to find how to do the 4th step, i.e., how to send the new changes (inserts, deletes, updates) to the SQL database.

Any pointers are highly appreciated

Azure Databricks

2 answers

  1. ChiragMishra-MSFT 956 Reputation points
    2020-06-30T08:52:24.937+00:00

    Hi @Rocky420,

    Sorry for the delayed response. You can set up "incremental copy" to copy only the delta data (new/updated/deleted rows) in a couple of ways. Here are the ones that best suit your case:

    1. Delta data loading from a database by using a watermark:
      In this case, you define a watermark in your source database. A watermark is a column that holds the last-updated timestamp or an incrementing key (a minimal sketch of this pattern follows the list).
    2. Since you have a SQL-based source (the staging table), you can leverage Change Tracking technology. It is a lightweight solution in SQL Server and Azure SQL Database that provides an efficient change-tracking mechanism for applications, enabling an application to easily identify data that was inserted, updated, or deleted.
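
    For illustration, here is a minimal sketch of the watermark pattern from the Spark side (the column `last_modified` and the paths are assumptions, not names from your pipeline):

        import org.apache.spark.sql.functions.{col, max}

        // Watermark sketch: `last_modified` and the paths are assumed names.
        // 1. Recover the high watermark from the data already merged.
        val lastWatermark = spark.read.format("delta")
          .load("/mnt/delta/entity")
          .agg(max("last_modified"))
          .first()
          .getTimestamp(0)

        // 2. Keep only the rows that changed since that watermark; these are
        //    the inserts/updates that still need to go to SQL.
        val changedRows = spark.read.format("delta")
          .load("/mnt/delta/entity_staging")
          .filter(col("last_modified") > lastWatermark)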

    To learn more, please refer to this doc on incrementally copying data.

    Hope this helps.


  2. Rajkumar V 1 Reputation point
    2020-06-30T19:30:28.833+00:00

    Hi Rocky,

    In your target Delta table, add a "last action" field and a "last action date" field to capture the updates from the merge operation.

    Using the watermark, you can either upload all the data at once to a staging table in SQL and do a SQL MERGE operation, or you can trigger insert/update/delete queries from Databricks (a sketch of the first option follows).
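
    For the first option, the server-side merge could look roughly like this (a sketch only; the table and column names, including the LastAction flag, are assumptions), submitted through "queryCustom" exactly like the example below:

        // Sketch: upsert driven by the last-action flag suggested above.
        // dbo.Target, dbo.Target_Staging, Id, Value, and LastAction are assumed names.
        val mergeStmt = """
          MERGE INTO dbo.Target AS t
          USING dbo.Target_Staging AS s
            ON t.Id = s.Id
          WHEN MATCHED AND s.LastAction = 'D' THEN DELETE
          WHEN MATCHED THEN UPDATE SET Value = s.Value
          WHEN NOT MATCHED THEN INSERT (Id, Value) VALUES (s.Id, s.Value);
        """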

    To trigger queries directly, here is an example:

    import com.microsoft.azure.sqldb.spark.config.Config
    import com.microsoft.azure.sqldb.spark.connect._
    import com.microsoft.azure.sqldb.spark.query._

    // Custom statement to run against the database (replace with a call to
    // your merge stored procedure).
    val TruncStmt = "TRUNCATE TABLE TABLENAME"

    val Password  = "xxxx"
    val dwServer  = "xxxx"
    val dbname    = "xxxx"
    val Username  = "xxxx"
    val tableName = "xxxx"

    val config = Config(Map(
      "url"          -> dwServer,
      "databaseName" -> dbname,
      "queryCustom"  -> TruncStmt,
      "user"         -> Username,
      "password"     -> Password
    ))

    // Execute the custom statement on the SQL side.
    sqlContext.sqlDBQuery(config)
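
    To cover the "upload all at once" path with the same connector, the changed rows can be appended to a SQL staging table first, and then the merge statement is executed server-side (a sketch; `stagingDF` and the table name are assumptions):

        import org.apache.spark.sql.SaveMode
        import com.microsoft.azure.sqldb.spark.config.Config
        import com.microsoft.azure.sqldb.spark.connect._

        // Sketch: append the changed rows to a SQL staging table, then run
        // the merge via "queryCustom" as above. `stagingDF` is assumed to
        // hold the changed rows; the table name is a placeholder.
        val writeConfig = Config(Map(
          "url"          -> dwServer,
          "databaseName" -> dbname,
          "dbTable"      -> "dbo.Target_Staging",
          "user"         -> Username,
          "password"     -> Password
        ))

        stagingDF.write.mode(SaveMode.Append).sqlDB(writeConfig)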
    