I have a notebook that processes hot data every 5 minutes. Meanwhile, another pipeline processes historical data, and I want to create a summary table that uses the hot data incrementally but refreshes entirely when the historical data changes.
Problem
Checking for changes in historical data every 5 minutes is inefficient, slows down the hot data pipeline, and increases costs. There are many potential solutions for this use case, but one approach I used has been working well.
Solution
Using Delta Table Version
Delta tables provide a variety of functions to access metadata without reading actual data files. For instance, you can retrieve the latest table version, which is highly efficient and typically takes less than a second.
dt = try_get_deltatable(f'/lakehouse/default/Tables/{schema}/scada', storage_options=storage_options)
if dt is None:
    current_version = -1
else:
    current_version = dt.version()
Storing Custom Metadata
You can store arbitrary metadata, such as a Python dictionary, when writing a Delta table. This metadata storage does not modify Parquet files and can contain information like who wrote the table or any custom data. In my case, I store the version of the historical table used in creating my summary table.
The hot data pipeline incrementally adds data and checks the version of the historical table, storing it in the summary table. If the stored version differs from the latest version, this indicates a change, triggering a full refresh of the summary table.
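The comparison itself is small. Here is a minimal sketch, assuming the stored version has already been read back from the summary table's custom metadata. The function names and the `historical_version` key are hypothetical, chosen for illustration only; they are not part of the Delta package.

```python
from typing import Optional


def choose_refresh_mode(stored_version: Optional[str], current_version: int) -> str:
    """Decide how the summary table should be updated.

    stored_version: the value previously saved in the summary table's custom
        metadata (assumed to be saved as a string), or None if nothing was
        saved yet.
    current_version: the latest version of the historical table,
        for example the result of dt.version().
    """
    if stored_version is None or int(stored_version) != current_version:
        return "full"        # historical data changed: rebuild the summary
    return "incremental"     # unchanged: only add the new hot data


def metadata_for_commit(current_version: int) -> dict:
    # Hypothetical key name; this dictionary would be passed as custom
    # metadata when the summary table is written.
    return {"historical_version": str(current_version)}


print(choose_refresh_mode("12", 12))
print(choose_refresh_mode("12", 13))
```

Treating a missing stored version as a change means the very first run always does a full refresh, which is usually what you want.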
Example Scenarios
When the Historical Table Has Not Changed
When a Change is Detected in the Historical Table
Key Takeaway
The Python Delta package is a versatile tool that can solve complex data engineering challenges efficiently.
I was doing some visualization in a Fabric Python notebook and ended up with this report. Don’t worry, it is not the source table; my data is normalized. It is just a pivot table, not a Delta table.
It makes sense visually: I can easily see the numbers side by side. That’s a decent pivot table, and there is something deeply human about seeing the raw numbers.
Note: don’t read too much into the data itself; it is not very rigorous (the OS cache is messing everything up).
SQL Excel Style
Then I wanted to add a column showing the biggest number in every row. Yes, I know how to do it the proper way 🙂 but why can’t I do it here without unpivoting and so on? After all, it is trivial in Excel, right? It turns out it is now possible using DuckDB SQL (development build).
xx = duckdb.sql(f" select *, greatest(*columns(*exclude(query))) as max from result ")
xx
Yes, that’s right, it is looking for the highest number horizontally!!! I think it is awesome. It seems only greatest and least are supported so far, but I will not be surprised if they add sum and addition; after all, there is no natural law that dictates a sum in SQL should accept only one column as a parameter.
I want to normalize the numbers so that the worst performance is 1. I can do that by dividing the column values by the row max. Again, I am doing calculations at the row level; an Excel user will probably wonder what the big deal is 🙂
I want to count the worst query by engine, so I will just count all the rows with the number 1.
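To make the row-wise logic concrete, here is a small plain-Python sketch of the same three steps: row max, normalization, and counting the 1s. The engine names and durations are made up for illustration and are not the numbers from the report.

```python
# Made-up durations in seconds: one row per query, one column per engine.
result = [
    {"query": "q1", "engine_a": 2.0, "engine_b": 4.0, "engine_c": 1.0},
    {"query": "q2", "engine_a": 3.0, "engine_b": 1.5, "engine_c": 6.0},
    {"query": "q3", "engine_a": 5.0, "engine_b": 2.5, "engine_c": 2.0},
]


def normalize_rows(rows, key="query"):
    """Divide every value in a row by that row's maximum (worst = 1)."""
    out = []
    for row in rows:
        values = {k: v for k, v in row.items() if k != key}
        row_max = max(values.values())  # the "greatest across columns" step
        out.append({key: row[key], **{k: v / row_max for k, v in values.items()}})
    return out


def count_worst(normalized, key="query"):
    """Count, per engine, how many rows have the normalized value 1."""
    counts = {}
    for row in normalized:
        for engine, value in row.items():
            if engine != key and value == 1.0:
                counts[engine] = counts.get(engine, 0) + 1
    return counts


normalized = normalize_rows(result)
worst = count_worst(normalized)
print(worst)
```

With these made-up numbers each engine is the slowest on exactly one query.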
Now let’s use the original report, “result”, to see the total duration for all engines
And geometric mean
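As a reminder of what the geometric mean computes, here is a short sketch using only the Python standard library. The durations are made up; the helper name is mine.

```python
import math
import statistics


def geometric_mean(values):
    """exp of the average of the logs; all values must be positive."""
    return math.exp(sum(math.log(v) for v in values) / len(values))


durations = [1.0, 10.0, 100.0]  # made-up durations in seconds
gm = geometric_mean(durations)

# The standard library (Python 3.8+) ships the same calculation.
gm_stdlib = statistics.geometric_mean(durations)
print(round(gm, 6), round(gm_stdlib, 6))
```

Unlike the total duration, the geometric mean is not dominated by a single very slow query, which is why it is a common choice for summarizing benchmark results.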
Cast multiple columns in one go
This one was really helpful, as I had a table with nearly 40 columns to cast to double. That is a trivial operation with a DataFrame API but a nightmare in SQL, as you need to type every column name. With columns, it is trivial:
cast(columns(*exclude(DUID,UNIT,SETTLEMENTDATE,I,XX,filename)) as double)
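For comparison, this is roughly what you would have to write (or generate) without it. The sketch below builds the explicit cast list in Python; the helper name is hypothetical and the column list is shortened for illustration.

```python
def explicit_casts(all_columns, excluded, target_type="DOUBLE"):
    """Build one CAST expression per column that is not excluded."""
    skip = {name.lower() for name in excluded}
    return ",\n".join(
        f'CAST("{col}" AS {target_type}) AS "{col}"'
        for col in all_columns
        if col.lower() not in skip
    )


# Shortened column list, for illustration only.
columns = ["I", "UNIT", "XX", "DUID", "SETTLEMENTDATE",
           "INITIALMW", "TOTALCLEARED", "filename"]
excluded = ["DUID", "UNIT", "SETTLEMENTDATE", "I", "XX", "filename"]

select_list = explicit_casts(columns, excluded)
print(select_list)
```

With 40 columns the generated text gets long quickly, which is exactly the typing the single columns expression saves.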
Take away
As an Excel user who tries to play a bit with SQL, I sometimes ask myself: surely there must be a better way, why can’t SQL do this or that easily? I remember asking the same question to an industry veteran a couple of years ago, and his answer was basically that the current status quo is the natural order of things and that I should try a DataFrame API for that. Now I know it is not true: SQL can do anything in theory, it is just that no one had bothered to do the hard work. I had the same reaction when I “discovered” T-SQL, where it literally acts as a programming language with loops and if/then!!!
Having said that, I feel the industry has more appetite for change. Notice, for example, how quickly DWH vendors adopted GROUP BY ALL. Maybe it is more practical to enhance SQL than to hope for the adoption of a standard DataFrame API?
And by the way, if you want to do analysis using SQL, do yourself a favor and use a notebook 🙂
For this example the table is not small (1.3 billion rows), so filter pushdown is a must for a good user experience. For smaller data it does not matter.
Let’s show 5 rows. The limit is pushed to the source, so we don’t need to scan 20 GB just to see some rows.
display(duckdb.sql(f''' SELECT * from TAXI limit 5 ''').df())
Now let’s filter the data to this year only. Again, filter pruning works, which I really like: although the table is not partitioned, the scan somehow leverages the stats in the Delta table log.
data = duckdb.sql(f''' SELECT date , ROUND (SUM (fare_amount),0) as TotalFares , ROUND (AVG (fare_amount),0) as AVGFares
FROM TAXI where year = 2024 GROUP BY ALL ''').df()
display(data)
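The Delta log records per-file statistics such as the minimum and maximum value of a column, and that is what makes this kind of pruning possible even without partitions. Here is a conceptual sketch of the idea. It is not DuckDB's or Delta's actual implementation, and the file names and year ranges are made up.

```python
from dataclasses import dataclass


@dataclass
class FileStats:
    path: str
    min_year: int  # smallest value of "year" in the file
    max_year: int  # largest value of "year" in the file


def files_to_scan(files, year):
    """Keep only files whose [min, max] range can contain the filter value."""
    return [f.path for f in files if f.min_year <= year <= f.max_year]


# Made-up statistics for three data files.
files = [
    FileStats("part-0.parquet", 2019, 2021),
    FileStats("part-1.parquet", 2022, 2023),
    FileStats("part-2.parquet", 2023, 2024),
]

selected = files_to_scan(files, 2024)
print(selected)
```

Only files whose range overlaps the filter need to be opened; the rest are skipped without reading any data.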
Not everything is perfect yet 😦
Unfortunately, max(column) and count(*) do not use the Delta log and trigger a whole table scan.
Let’s show an interactive chart
I have the aggregated data already, so using the excellent Altair library I can easily plot an interactive chart.
Alright, using the term “idempotent” was just clickbait 🙂 It simply means avoiding processing the same file multiple times (OK, for a formal definition Wikipedia has a nice article). In this case the script will process only new files added to the source system, assuming the filenames are unique.
Although this is easily accomplished using the Python API, achieving the same result using DuckDB SQL was a bit tricky until the introduction of SQL variables in the development branch a couple of weeks ago; in particular, variables can now be referenced within file-reading operations.
In summary, this is what the SQL script is doing.
Creation of a Hive table in the Files section of OneLake. This is a very important step: if you query an empty folder you will get an error. As DuckDB does not support external tables, we will just add a dummy Parquet file with an empty row; it is filtered out in the next step.
COPY (
SELECT
'' AS filename,
NULL AS year ) TO "/lakehouse/default/Files/scada" (FORMAT PARQUET,
PARTITION_BY (year),
OVERWRITE_OR_IGNORE ) ;
Listing of CSV files for ingestion from the source system.
Removal of already ingested files
SET
VARIABLE list_of_files = (
SELECT
list(file)
FROM
glob("/lakehouse/default/Files/Daily_Reports/*.CSV")
WHERE
parse_filename(file) NOT IN (
SELECT
filename
FROM
read_parquet("/lakehouse/default/Files/scada/*/*.parquet")))
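The same "only new files" logic, written as a plain-Python sketch for readers who think in Python first. The function name and the file names are hypothetical; `posixpath.basename` plays the role of `parse_filename` here.

```python
import posixpath


def new_files(source_paths, ingested_filenames):
    """Return the source paths whose file name has not been ingested yet."""
    done = set(ingested_filenames)
    return [p for p in source_paths if posixpath.basename(p) not in done]


# Hypothetical file names, for illustration only.
source = [
    "/lakehouse/default/Files/Daily_Reports/report_1.CSV",
    "/lakehouse/default/Files/Daily_Reports/report_2.CSV",
]
# The empty string stands for the dummy row written in the first step.
ingested = ["", "report_1.CSV"]

first_run = new_files(source, ingested)
print(first_run)

# After ingesting, a second run finds nothing new to process.
ingested += [posixpath.basename(p) for p in first_run]
second_run = new_files(source, ingested)
print(second_run)
```

Running the step twice processes nothing the second time, which is the whole point.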
Generate list of new files and pass it to DuckDB for processing.
just a preview of the query plan for this step
Saving as partitioned Hive parquet folders
Note: I am using a Fabric notebook as an example, but obviously you can run it anywhere. I actually ran it on my laptop and, as far as I can tell, OneLake throughput is competitive with my SSD 🙂
COPY (
SELECT
'' AS filename,
NULL AS year ) TO "/lakehouse/default/Files/scada" (FORMAT PARQUET,
PARTITION_BY (year),
OVERWRITE_OR_IGNORE ) ;
SET
VARIABLE list_of_files = (
SELECT
list(file)
FROM
glob("/lakehouse/default/Files/Daily_Reports/*.CSV")
WHERE
parse_filename(file) NOT IN (
SELECT
filename
FROM
read_parquet("/lakehouse/default/Files/scada/*/*.parquet"))) ;
CREATE OR REPLACE VIEW
raw AS (
SELECT
*
FROM
read_csv(getvariable('list_of_files'),
Skip=1,
header =0,
all_varchar=1,
COLUMNS={ 'I': 'VARCHAR',
'UNIT': 'VARCHAR',
'XX': 'VARCHAR',
'VERSION': 'VARCHAR',
'SETTLEMENTDATE': 'VARCHAR',
'RUNNO': 'VARCHAR',
'DUID': 'VARCHAR',
'INTERVENTION': 'VARCHAR',
'DISPATCHMODE': 'VARCHAR',
'AGCSTATUS': 'VARCHAR',
'INITIALMW': 'VARCHAR',
'TOTALCLEARED': 'VARCHAR',
'RAMPDOWNRATE': 'VARCHAR',
'RAMPUPRATE': 'VARCHAR',
'LOWER5MIN': 'VARCHAR',
'LOWER60SEC': 'VARCHAR',
'LOWER6SEC': 'VARCHAR',
'RAISE5MIN': 'VARCHAR',
'RAISE60SEC': 'VARCHAR',
'RAISE6SEC': 'VARCHAR',
'MARGINAL5MINVALUE': 'VARCHAR',
'MARGINAL60SECVALUE': 'VARCHAR',
'MARGINAL6SECVALUE': 'VARCHAR',
'MARGINALVALUE': 'VARCHAR',
'VIOLATION5MINDEGREE': 'VARCHAR',
'VIOLATION60SECDEGREE': 'VARCHAR',
'VIOLATION6SECDEGREE': 'VARCHAR',
'VIOLATIONDEGREE': 'VARCHAR',
'LOWERREG': 'VARCHAR',
'RAISEREG': 'VARCHAR',
'AVAILABILITY': 'VARCHAR',
'RAISE6SECFLAGS': 'VARCHAR',
'RAISE60SECFLAGS': 'VARCHAR',
'RAISE5MINFLAGS': 'VARCHAR',
'RAISEREGFLAGS': 'VARCHAR',
'LOWER6SECFLAGS': 'VARCHAR',
'LOWER60SECFLAGS': 'VARCHAR',
'LOWER5MINFLAGS': 'VARCHAR',
'LOWERREGFLAGS': 'VARCHAR',
'RAISEREGAVAILABILITY': 'VARCHAR',
'RAISEREGENABLEMENTMAX': 'VARCHAR',
'RAISEREGENABLEMENTMIN': 'VARCHAR',
'LOWERREGAVAILABILITY': 'VARCHAR',
'LOWERREGENABLEMENTMAX': 'VARCHAR',
'LOWERREGENABLEMENTMIN': 'VARCHAR',
'RAISE6SECACTUALAVAILABILITY': 'VARCHAR',
'RAISE60SECACTUALAVAILABILITY': 'VARCHAR',
'RAISE5MINACTUALAVAILABILITY': 'VARCHAR',
'RAISEREGACTUALAVAILABILITY': 'VARCHAR',
'LOWER6SECACTUALAVAILABILITY': 'VARCHAR',
'LOWER60SECACTUALAVAILABILITY': 'VARCHAR',
'LOWER5MINACTUALAVAILABILITY': 'VARCHAR',
'LOWERREGACTUALAVAILABILITY': 'VARCHAR' },
filename =1,
null_padding = TRUE,
ignore_errors=1,
auto_detect=FALSE)
WHERE
I='D'
AND UNIT ='DUNIT'
AND VERSION = '3' ) ;
COPY (
SELECT
UNIT,
DUID,
parse_filename(filename) AS filename,
CAST(COLUMNS(*EXCLUDE(DUID,
UNIT,
SETTLEMENTDATE,
I,
XX,
filename)) AS double),
CAST (SETTLEMENTDATE AS TIMESTAMPTZ) AS SETTLEMENTDATE,
isoyear (CAST (SETTLEMENTDATE AS timestamp)) AS year
FROM
raw) TO "/lakehouse/default/Files/scada" (FORMAT PARQUET,
PARTITION_BY (year),
APPEND ) ;
While the Python API in a notebook is IMHO the easiest approach for data transformation, it is nice to see that pure SQL can support it too.
However, the real game-changer will be when DuckDB introduces support for Delta writes. For now, you need another step to load those Parquet files into Delta, using either the deltalake Python package or the Spark Delta writer.