Smart Data Pipeline Design: Check for Delta Table Changes with Minimal Overhead

Scenario

I have a notebook that processes hot data every 5 minutes. Meanwhile, another pipeline processes historical data, and I want to create a summary table that uses the hot data incrementally but refreshes entirely when the historical data changes.

Problem

Checking for changes in historical data every 5 minutes is inefficient, slows down the hot data pipeline, and increases costs. There are many potential solutions for this use case, but one approach I used has been working well.

Solution

Using Delta Table Version

Delta tables provide a variety of functions to access metadata without reading actual data files. For instance, you can retrieve the latest table version, which is highly efficient and typically takes less than a second.

dt = try_get_deltatable(f'/lakehouse/default/Tables/{schema}/scada', storage_options=storage_options)
if dt is None:
    current_version = -1
else:
    current_version = dt.version()

Storing Custom Metadata

You can store arbitrary metadata, such as a Python dictionary, when writing a Delta table. This metadata storage does not modify Parquet files and can contain information like who wrote the table or any custom data. In my case, I store the version of the historical table used in creating my summary table.

write_deltalake(Summary_table_path,
                df,
                mode="overwrite",
                storage_options= storage_options,
                custom_metadata = {'scada':str(current_version)},
                engine='rust')

and here is how this custom metadata is stored

Combining Both Methods

The hot data pipeline incrementally adds data and checks the version of the historical table, storing it in the summary table. If the stored version differs from the latest version, this indicates a change, triggering a full refresh of the summary table.

Example Scenarios

  • When the Historical Table Has Not Changed
  • When a Change is Detected in the Historical Table

Key Takeaway

The Python Delta package is a versatile tool that can solve complex data engineering challenges efficiently.

You can download the two notebooks here

Process 1 Billion rows of raw csv in Fabric Notebook for less than 20 Cents 

The Use case

Data source is around 2200 files with a total of 897 Million rows of weird csv files (the file has more columns than the header) , This is a real world data not some synthetic dataset, it is relatively small around 100 GB uncompressed.

The Pipeline will read those files and extract clean data from it using non trivial transformation and save it as a Delta Table.

we used the smallest Compute available in Fabric Notebook which is 4 cores with 32 GB. to be clear this is a real single node (not 1 driver and 1 executor), Although the Runtime is using Spark, All the Engines interact Directly with the Operating system, as far as I can tell, Spark has a very minimum overhead when not used Directly by the Python code.

You need to pick the Engine

Nowadays we have plenty of high quality Calculation Engines,  but two seems to gain traction (Polars and DuckDB) , at least by the number of package downloaded and the religious wars that seems to erupt occasionally in twitter 🙂

For a change I tried to use Polars, as I was accused of having a bias toward DuckDB, long story short, hit a bug with Polars , I tried Datafusion too but did managed to get a working code, there is not enough documentation on the web, after that I did test Clickhouse chdb, but find a bug, anyway the code is public, feel free to test your own Engine.

So I ended up using DuckDB, the code is published here , it is using only 60 files as it is available publicly, the whole archive is saved in my tenant (happy to share it if interested) 

The results is rather surprising (God bless Onelake throughput), I am using the excellent Python Package Delta Lake to write to Onelake

26 minutes, that’s freaking fast, using Fabric F2, the total cost will be

0.36 $/Hour X(26/60) =  15 Cents

you need to add a couple of cents for Onelake Storage Transactions.

As far as I can tell, this is maybe one of the cheapest option in the Market.

0.36 $/Hour is the rate for pay as you go, if you have a reservation then it is substantially cheaper.

because it is Delta Table Then Any Fabric Engine ( SQL, PowerBI, Spark) can read it.

What’s the catch ?

Today DuckDB can not write directly to Delta Table ( it is coming though eventually) instead it will export data to Delta Lake writer using Arrow Table, it is supposed to be zero copy but as far as I can tell, it is the biggest bottleneck and will generate out of memory errors , the solution is easy ; process the files in chunks , not all at once

#############################################
list_files=[os.path.basename(x) for x in glob.glob(Source+'*.CSV')]
files_to_upload_full_Path = [Source + i for i in list_files]
if len(files_to_upload_full_Path) >0 :
  for i in range(0, len(files_to_upload_full_Path), chunk_len):
    chunk = files_to_upload_full_Path[i:i + chunk_len]
    df=get_scada(chunk)
    write_deltalake("/lakehouse/default/Tables/scada_duckdb",df,mode="append",engine='rust',partition_by=['YEAR'],storage_options={"allow_unsafe_rename":"true"})
    del df

By experimentation, I notice 100 files works fine with 16 GB, 200 files with 32 GB etc

When exporting to Parquet, DuckDB managed the memory natively and it is faster too.

Native Lakehouse Is the future of Data Engineering

The combination of Open table format like Delta and Iceberg with ultra efficient Open Source Engine like DuckDB, Polars, Velox, datafusion all written in C++/Rust will give data engineers an extremely powerful tools to build more flexible and way cheaper data solutions.

if I have to give an advice for young Data engineers/Analysts, Learn Python/SQL.

Would like to thanks Pedro Holanda for fixing some very hard to reproduce bugs in the DuckDB csv reader.

And Ion Koutsouris for answering my silly questions about Delta lake writer.