Databricks PySpark exception handling best practices

Satya D 141 Reputation points
2020-12-08T16:10:17.32+00:00

Hi, in my current development of PySpark notebooks on Databricks, I typically use Python-specific try/except blocks to handle the different situations that may arise. I am wondering whether there are any best practices, recommendations, or patterns for handling exceptions in a distributed computing context like Databricks. What are the best ways to consolidate the exceptions and report them back to the user when the notebooks are triggered from an orchestrator like Azure Data Factory?


Accepted answer
  1. PRADEEPCHEEKATLA-MSFT 88,381 Reputation points Microsoft Employee
    2020-12-10T06:20:07.717+00:00

    Hello @Satya D ,

    Here is one of the best practices that has been used in the past. It gives you some transparency into exceptions when running UDFs.

    # Imports used by this pattern:
    from pyspark.sql.functions import col, lit, udf
    from pyspark.sql.types import StringType, StructField, StructType

    # Define two outputs: value and return code:
    foo_schema = StructType([
        StructField("return_code", StringType(), False),
        StructField("output", StringType(), False)
    ])

    # Define function (returns a tuple):
    def foo(value):
        """
        Run logic on one column value, and return both the return_code and the output
        """
        try:
            # Your logic here
            output = ...
            return_code = 'PASS'
        except Exception as e:
            output = f"{e}"
            return_code = 'FAIL'
        return (return_code, output)

    # Define udf:
    foo_udf = udf(foo, foo_schema)

    # Call:
    df_all = df.withColumn("foo_temp", foo_udf(col("input"))) \
               .withColumn("foo_output", col("foo_temp")['output']) \
               .withColumn("foo_code", col("foo_temp")['return_code'])

    # Split good rows and error rows into separate data frames:
    df = df_all.filter(col("foo_code") == lit('PASS'))
    df_errors = df_all.filter(col("foo_code") == lit('FAIL'))

    We require the UDF to return two values: the output and a return code. We use the return code to filter the exceptions and the good values into two different data frames. The good values are used in the next steps, and the exceptions data frame can be used for monitoring, ADF responses, etc.
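
    If you also need to report a consolidated result back to the orchestrator, one option is to end the notebook with dbutils.notebook.exit and a JSON summary built from these two data frames; when the notebook is run from an Azure Data Factory Notebook activity, that string is surfaced in the activity output as runOutput. A minimal sketch, where the summary fields and the sample size are just illustrative choices:

    import json

    # Build a small, serializable summary from the two data frames (fields are illustrative):
    summary = {
        "passed_rows": df.count(),
        "failed_rows": df_errors.count(),
        "sample_errors": [r["foo_output"] for r in df_errors.select("foo_output").limit(5).collect()]
    }

    # dbutils.notebook.exit ends the run and returns this string to the caller;
    # an ADF Notebook activity exposes it as runOutput:
    dbutils.notebook.exit(json.dumps(summary))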

    You may also refer to the GitHub issue Catching exceptions raised in Python Notebooks in Datafactory?, which addresses a similar scenario.
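
    If you instead want the Data Factory Notebook activity itself to fail whenever rows errored (so that the pipeline's failure paths and alerts fire), a common approach is to raise from the notebook after inspecting the error data frame; the exception message then appears in the activity's error output. A rough sketch, where the zero-tolerance threshold and the message format are assumptions:

    # Fail the run if any rows errored (the threshold here is just an example):
    failed_count = df_errors.count()
    if failed_count > 0:
        sample = [r["foo_output"] for r in df_errors.select("foo_output").limit(5).collect()]
        raise Exception(f"{failed_count} rows failed in foo_udf; sample errors: {sample}")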

    Hope this helps. Do let us know if you have any further queries.

