@Shambhu Rai
In Databricks, you can use a combination of PySpark and DBFS commands to move files based on the last modified timestamp. Here’s an example:
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder.getOrCreate()

# Define source and destination directories
src_dir = "dbfs:/mnt/test/test1"
dest_dir = "dbfs:/mnt/test/test2"

# List the files in the source directory via the Hadoop FileSystem API,
# which exposes each file's last modified timestamp
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(src_dir))

for file_status in list_status:
    if not file_status.isFile():
        continue  # skip subdirectories
    modification_time = file_status.getModificationTime() / 1000  # milliseconds -> seconds
    # Check if the file was modified in the last 24 hours
    if time.time() - modification_time <= 86400:
        # Move the file into the destination directory, keeping its name
        dbutils.fs.mv(file_status.getPath().toString(),
                      dest_dir + "/" + file_status.getPath().getName())
This script moves every file in dbfs:/mnt/test/test1 that was modified in the last 24 hours to dbfs:/mnt/test/test2. You can adjust the time condition (86400 seconds = 24 hours) as per your needs.
Please note that this script must be run with sufficient permissions to read from the source directory and write to the destination directory. Also, remember to replace dbfs:/mnt/test/test1 and dbfs:/mnt/test/test2 with your actual source and destination paths.
Alternatively, you can use Databricks Auto Loader for incremental ingestion of files. It processes new data files as they arrive in cloud object storage and keeps its state at a checkpoint location in a key-value store called RocksDB. Because the state lives in the checkpoint, Auto Loader can resume from where it left off even after a failure and can guarantee exactly-once semantics.
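As a rough illustration, here is a minimal Auto Loader sketch. The file format (json) and the schema/checkpoint/output paths are assumptions you would adapt to your setup, and trigger(availableNow=True) requires Spark 3.3+ / a recent Databricks Runtime:

# Incrementally ingest new files from the source directory with Auto Loader
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")                      # assumed format of incoming files
      .option("cloudFiles.schemaLocation", "dbfs:/mnt/test/_schema")
      .load("dbfs:/mnt/test/test1"))

(df.writeStream
   .format("delta")
   .option("checkpointLocation", "dbfs:/mnt/test/_checkpoint")  # RocksDB state lives here
   .trigger(availableNow=True)                                  # process pending files, then stop
   .start("dbfs:/mnt/test/test2"))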
Another approach would be to maintain a control table that keeps track of the last load timestamp, and compare it with the modified timestamps of your files to identify and load only the new ones. This part would need to be done in Python (for example with dbutils.fs.ls or the Hadoop FileSystem API), as Spark has no direct function for it.
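A rough sketch of that idea follows; the table and column names here are hypothetical, and FileInfo.modificationTime from dbutils.fs.ls is only available on recent Databricks runtimes:

from pyspark.sql import functions as F

control_table = "default.file_load_control"  # hypothetical control table with a load_ts column

# Read the high-water mark from the control table; fall back to 0 on the first run
try:
    last_load_ts = spark.table(control_table).agg(F.max("load_ts")).first()[0] or 0
except Exception:
    last_load_ts = 0  # control table does not exist yet

# Keep only files modified after the last recorded load (modificationTime is epoch millis)
files = [f for f in dbutils.fs.ls(src_dir) if f.modificationTime > last_load_ts]

if files:
    # Load the new files (plain text here for illustration) and append to a target table
    spark.read.text([f.path for f in files]).write.mode("append").saveAsTable("default.target_table")
    # Record the new high-water mark for the next run
    spark.createDataFrame([(max(f.modificationTime for f in files),)], "load_ts long") \
         .write.mode("append").saveAsTable(control_table)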
For more details, please go through this related thread: https://community.databricks.com/t5/data-engineering/load-files-filtered-by-last-modified-in-pyspark/td-p/4159