Building Idempotent Data Pipelines with DuckDB SQL

Alright, using the term “idempotent” was just clickbait πŸ™‚ here it simply means avoiding processing the same file multiple times (for a formal definition, Wikipedia has a nice article). In this case the script will process only new files added to the source system, assuming the filenames are unique.

Although this is easily accomplished using the Python API, achieving the same result using DuckDB SQL was a bit tricky until the introduction of SQL variables in the development branch a couple of weeks ago; in particular, variables can now be referenced inside file-reading functions.

In summary, this is what the SQL script does:

  1. Creation of a Hive-partitioned table in the Files section of OneLake. This is a very important step: if you query an empty folder you get an error, and since DuckDB does not support external tables, we just add a dummy Parquet file with a placeholder row; it is filtered out in a later step.
  COPY (
  SELECT
    '' AS filename,
    NULL AS year ) TO "/lakehouse/default/Files/scada" (FORMAT PARQUET,
    PARTITION_BY (year),
    OVERWRITE_OR_IGNORE ) ;
  2. Listing of CSV files for ingestion from the source system.
  3. Removal of already-ingested files.
SET
  VARIABLE list_of_files = (
  SELECT
    list(file)
  FROM
    glob("/lakehouse/default/Files/Daily_Reports/*.CSV")
  WHERE
    parse_filename(file) NOT IN (
    SELECT
      filename
    FROM
      read_parquet("/lakehouse/default/Files/scada/*/*.parquet")))
  4. Generate the list of new files and pass it to DuckDB for processing.

Just a preview of the query plan for this step:

  5. Saving as partitioned Hive Parquet folders.
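The removal of already-ingested files is the core of the idempotency; a plain-Python sketch of the same logic (file names made up for illustration) looks like this:

```python
# Stdlib sketch of the "remove already-ingested files" step; names are hypothetical.
import os

source_files = [
    "/lakehouse/default/Files/Daily_Reports/report_01.CSV",
    "/lakehouse/default/Files/Daily_Reports/report_02.CSV",
    "/lakehouse/default/Files/Daily_Reports/report_03.CSV",
]
# in the SQL script this set is read back from the scada parquet folder
already_ingested = {"report_01.CSV", "report_02.CSV"}

new_files = [f for f in source_files if os.path.basename(f) not in already_ingested]
print(new_files)
```

The SQL version does exactly this with glob(), parse_filename() and a NOT IN subquery.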

Note: I am using a Fabric notebook as an example, but obviously you can run it anywhere; I actually ran it on my laptop and, as far as I can tell, OneLake throughput is competitive with my SSD πŸ™‚

  COPY (
  SELECT
    '' AS filename,
    NULL AS year ) TO "/lakehouse/default/Files/scada" (FORMAT PARQUET,
    PARTITION_BY (year),
    OVERWRITE_OR_IGNORE ) ;
SET
  VARIABLE list_of_files = (
  SELECT
    list(file)
  FROM
    glob("/lakehouse/default/Files/Daily_Reports/*.CSV")
  WHERE
    parse_filename(file) NOT IN (
    SELECT
      filename
    FROM
      read_parquet("/lakehouse/default/Files/scada/*/*.parquet"))) ;
CREATE OR REPLACE VIEW
  raw AS (
  SELECT
    *
  FROM
    read_csv(getvariable('list_of_files'),
      Skip=1,
      header =0,
      all_varchar=1,
      COLUMNS={ 'I': 'VARCHAR',
      'UNIT': 'VARCHAR',
      'XX': 'VARCHAR',
      'VERSION': 'VARCHAR',
      'SETTLEMENTDATE': 'VARCHAR',
      'RUNNO': 'VARCHAR',
      'DUID': 'VARCHAR',
      'INTERVENTION': 'VARCHAR',
      'DISPATCHMODE': 'VARCHAR',
      'AGCSTATUS': 'VARCHAR',
      'INITIALMW': 'VARCHAR',
      'TOTALCLEARED': 'VARCHAR',
      'RAMPDOWNRATE': 'VARCHAR',
      'RAMPUPRATE': 'VARCHAR',
      'LOWER5MIN': 'VARCHAR',
      'LOWER60SEC': 'VARCHAR',
      'LOWER6SEC': 'VARCHAR',
      'RAISE5MIN': 'VARCHAR',
      'RAISE60SEC': 'VARCHAR',
      'RAISE6SEC': 'VARCHAR',
      'MARGINAL5MINVALUE': 'VARCHAR',
      'MARGINAL60SECVALUE': 'VARCHAR',
      'MARGINAL6SECVALUE': 'VARCHAR',
      'MARGINALVALUE': 'VARCHAR',
      'VIOLATION5MINDEGREE': 'VARCHAR',
      'VIOLATION60SECDEGREE': 'VARCHAR',
      'VIOLATION6SECDEGREE': 'VARCHAR',
      'VIOLATIONDEGREE': 'VARCHAR',
      'LOWERREG': 'VARCHAR',
      'RAISEREG': 'VARCHAR',
      'AVAILABILITY': 'VARCHAR',
      'RAISE6SECFLAGS': 'VARCHAR',
      'RAISE60SECFLAGS': 'VARCHAR',
      'RAISE5MINFLAGS': 'VARCHAR',
      'RAISEREGFLAGS': 'VARCHAR',
      'LOWER6SECFLAGS': 'VARCHAR',
      'LOWER60SECFLAGS': 'VARCHAR',
      'LOWER5MINFLAGS': 'VARCHAR',
      'LOWERREGFLAGS': 'VARCHAR',
      'RAISEREGAVAILABILITY': 'VARCHAR',
      'RAISEREGENABLEMENTMAX': 'VARCHAR',
      'RAISEREGENABLEMENTMIN': 'VARCHAR',
      'LOWERREGAVAILABILITY': 'VARCHAR',
      'LOWERREGENABLEMENTMAX': 'VARCHAR',
      'LOWERREGENABLEMENTMIN': 'VARCHAR',
      'RAISE6SECACTUALAVAILABILITY': 'VARCHAR',
      'RAISE60SECACTUALAVAILABILITY': 'VARCHAR',
      'RAISE5MINACTUALAVAILABILITY': 'VARCHAR',
      'RAISEREGACTUALAVAILABILITY': 'VARCHAR',
      'LOWER6SECACTUALAVAILABILITY': 'VARCHAR',
      'LOWER60SECACTUALAVAILABILITY': 'VARCHAR',
      'LOWER5MINACTUALAVAILABILITY': 'VARCHAR',
      'LOWERREGACTUALAVAILABILITY': 'VARCHAR' },
      filename =1,
      null_padding = TRUE,
      ignore_errors=1,
      auto_detect=FALSE)
  WHERE
    I='D'
    AND UNIT ='DUNIT'
    AND VERSION = '3' ) ;
COPY (
  SELECT
    UNIT,
    DUID,
    parse_filename(filename) AS filename,
    CAST(COLUMNS(*EXCLUDE(DUID,
          UNIT,
          SETTLEMENTDATE,
          I,
          XX,
          filename)) AS double),
    CAST (SETTLEMENTDATE AS TIMESTAMPTZ) AS SETTLEMENTDATE,
    isoyear (CAST (SETTLEMENTDATE AS timestamp)) AS year
  FROM
    raw) TO "/lakehouse/default/Files/scada" (FORMAT PARQUET,
    PARTITION_BY (year),
    APPEND ) ;

While the Python API in a notebook is IMHO the easiest approach for data transformation, it is nice to see that pure SQL can support it.

However, the real game-changer will be when DuckDB introduces support for Delta writes. For now, you need another step to load those Parquet files into Delta, either using the deltalake Python package or the Spark Delta writer.

How to attach Onelake Lakehouse to DuckDB

Update 26-Oct-2024: with DuckDB 1.1.2, you don't need to mount a lakehouse to the notebook anymore, and it adds support for reading a OneLake Lakehouse outside of Fabric. Currently it is read-only; for writing you need delta-rs.

Here is a very simple Python script showing how you can attach a Lakehouse to DuckDB in a Fabric notebook (you can use the same logic for Polars, Daft, etc.).

It is read-only and will create views based on your existing Delta tables. It assumes you are using schemas, but you can edit it for simpler use cases; or, if you have a lot of tables, maybe it is more practical to attach just one specific schema.

import duckdb
from glob import glob

def attach_lakehouse(base_path):
    # each Delta table lives under <base_path>/<schema>/<table>/
    list_tables = glob(f"{base_path}*/*/")
    sql_schema = set()
    sql_statements = set()
    for table_path in list_tables:
        parts = table_path.strip("/").split("/")
        schema = parts[-2]
        table = parts[-1]
        sql_schema.add(f"CREATE SCHEMA IF NOT EXISTS {schema};")
        sql_statements.add(f"CREATE OR REPLACE VIEW {schema}.{table} AS SELECT * FROM delta_scan('{table_path}');")
    # create the schemas first, then the views that reference them
    duckdb.sql(" ".join(sql_schema))
    duckdb.sql(" ".join(sql_statements))
    display(duckdb.sql("SHOW ALL TABLES").df())

attach_lakehouse('/lakehouse/default/Tables/')

And here is an example:

Now you can read and join any tables, even across different schemas.

Notice that Delta support in DuckDB is not very performant at this stage compared to pure Parquet, but I suspect we will see a massive improvement in the next version (1.1).

Some observations on single-node vs distributed systems

I was doing an experiment in a Fabric notebook, running TPC-H benchmarks and increasing the data size from 1 GB to 1 TB, using Spark and DuckDB (the latter reading both Parquet files and its native format). It is not a very rigorous test; I was more interested in the overall system behavior than in specific numbers, and again, benchmarks mean nothing: the only thing that matters is the actual workload.

The notebook cluster has 1 driver and up to 9 executors; Spark dynamically adds and removes executors. DuckDB is a single-node system and runs only inside the driver.

Some observations

  • When the data fits into one machine, it will run faster than on a distributed system (assuming the engines have the same performance); in this test, up to 100 GB, DuckDB on Parquet is faster than Spark.
  • It is clear that DuckDB is optimized for its native file format, and there are a lot of opportunities to improve Parquet performance.
  • At 700 GB, DuckDB's native file format is still competitive with Spark, even with multiple nodes. That's a very interesting technical achievement, although not very useful in the Fabric ecosystem, as only DuckDB can read it.
  • At 1 TB, DuckDB on Parquet timed out and the performance of the native file format degraded significantly; it is an indication that we need a bigger machine.

Although I am clearly a DuckDB fan, I appreciate the dynamic allocation of Spark resources; Spark is popular for a reason πŸ™‚

Yes, I could have used a bigger machine for DuckDB, but that's a manual process and doesn't change based on the specific workload; one can imagine a world where a single node gets resources dynamically based on the workload.

I think the main takeaway is: if a workload fits into a single machine, then that will give you the best performance you can get.

Edit: to be clear, so far the main use case for something like DuckDB inside Fabric is cheap ETL. I have used it for more than a year and it works great, especially given that single-node notebooks in Fabric start in less than 10 seconds.

Process 1 Billion rows of raw CSV in a Fabric Notebook for less than 20 Cents

The Use case

The data source is around 2,200 files with a total of 897 million rows of weird CSV files (each file has more columns than the header declares). This is real-world data, not some synthetic dataset, and it is relatively small: around 100 GB uncompressed.

The pipeline reads those files, extracts clean data from them using non-trivial transformations, and saves the result as a Delta table.

We used the smallest compute available in a Fabric notebook, which is 4 cores with 32 GB. To be clear, this is a real single node (not 1 driver and 1 executor). Although the runtime is using Spark, all the engines interact directly with the operating system; as far as I can tell, Spark has very minimal overhead when not used directly by the Python code.

You need to pick the Engine

Nowadays we have plenty of high-quality calculation engines, but two seem to be gaining traction (Polars and DuckDB), at least judging by the number of packages downloaded and the religious wars that seem to erupt occasionally on Twitter πŸ™‚

For a change I tried to use Polars, as I was accused of having a bias toward DuckDB. Long story short, I hit a bug with Polars. I tried DataFusion too but didn't manage to get working code; there is not enough documentation on the web. After that I tested ClickHouse's chDB, but found a bug. Anyway, the code is public, so feel free to test your own engine.

So I ended up using DuckDB. The code is published here; it uses only 60 files, as that subset is available publicly; the whole archive is saved in my tenant (happy to share it if you are interested).

The result is rather surprising (God bless OneLake throughput). I am using the excellent Python package deltalake to write to OneLake.

26 minutes. That's freaking fast. Using Fabric F2, the total cost will be:

$0.36/hour × (26/60) ≈ 15 cents
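Spelled out as a quick sanity check (using the pay-as-you-go rate from above):

```python
# Quick cost check: F2 pay-as-you-go rate times the runtime in hours.
rate_per_hour = 0.36      # dollars per hour for Fabric F2 (pay as you go)
runtime_hours = 26 / 60   # the 26-minute run
cost = rate_per_hour * runtime_hours
print(f"${cost:.3f}")     # about 15-16 cents
```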

You need to add a couple of cents for OneLake storage transactions.

As far as I can tell, this is maybe one of the cheapest options on the market.

$0.36/hour is the pay-as-you-go rate; if you have a reservation, it is substantially cheaper.

Because it is a Delta table, any Fabric engine (SQL, Power BI, Spark) can read it.

What's the catch?

Today DuckDB cannot write directly to a Delta table (it is coming eventually, though); instead it exports data to the Delta Lake writer as an Arrow table. It is supposed to be zero-copy, but as far as I can tell it is the biggest bottleneck and will generate out-of-memory errors. The solution is easy: process the files in chunks, not all at once.

import os
import glob
from deltalake import write_deltalake

# Source, chunk_len and get_scada() are defined earlier in the notebook
list_files = [os.path.basename(x) for x in glob.glob(Source + '*.CSV')]
files_to_upload_full_Path = [Source + i for i in list_files]
if len(files_to_upload_full_Path) > 0:
    for i in range(0, len(files_to_upload_full_Path), chunk_len):
        # process a bounded chunk of files to keep memory usage under control
        chunk = files_to_upload_full_Path[i:i + chunk_len]
        df = get_scada(chunk)  # DuckDB query returning an Arrow table
        write_deltalake("/lakehouse/default/Tables/scada_duckdb", df, mode="append",
                        engine='rust', partition_by=['YEAR'],
                        storage_options={"allow_unsafe_rename": "true"})
        del df

By experimentation, I noticed that 100 files work fine with 16 GB, 200 files with 32 GB, etc.
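The chunking itself is just plain list slicing; a tiny stdlib sketch (file names and chunk size hypothetical):

```python
# Split a list of files into fixed-size chunks; the last chunk may be smaller.
def chunks(items, chunk_len):
    for i in range(0, len(items), chunk_len):
        yield items[i:i + chunk_len]

files = [f"file_{n}.CSV" for n in range(5)]
batches = list(chunks(files, 2))
print(batches)  # 3 batches: two of size 2, one of size 1
```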

When exporting to Parquet, DuckDB manages the memory natively, and it is faster too.

Native Lakehouse Is the future of Data Engineering

The combination of open table formats like Delta and Iceberg with ultra-efficient open-source engines like DuckDB, Polars, Velox, and DataFusion, all written in C++/Rust, will give data engineers extremely powerful tools to build more flexible and way cheaper data solutions.

If I had to give one piece of advice to young data engineers/analysts: learn Python and SQL.

I would like to thank Pedro Holanda for fixing some very hard-to-reproduce bugs in the DuckDB CSV reader.

And Ion Koutsouris for answering my silly questions about the Delta Lake writer.