Building an Ad Hoc Disk Cache with DuckDB and Fabric Notebook

This weekend, I came up with an idea to speed up query execution when running DuckDB inside a Fabric Notebook—and it actually works! 🎉

You can download the notebook here


Approach

  1. Parse the Query
    • Use SQLGlot to parse the SQL query and extract the list of Delta tables that need to be scanned from OneLake (see the sketch after this list).
  2. Track Table Metadata
    • Capture the Delta table version and ID to ensure consistency.
  3. Selective Copy
    • Copy only the necessary tables locally to satisfy the query.
  4. Reuse Cached Data
    • For subsequent queries, check if the Delta table has changed:
      • If unchanged, read data from the local SSD.
      • If new tables are required, repeat the caching process for those tables.
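
Here is a minimal sketch of the idea, not the notebook's actual code: the cache directory, the run_cached helper, and the lakehouse paths are assumptions, and each table is materialized as a single local Parquet file for simplicity.

import os
import duckdb
import sqlglot
from sqlglot import exp
import pyarrow.parquet as pq
from deltalake import DeltaTable

ONELAKE_ROOT = "/lakehouse/default/Tables"   # assumption: tables live in the default lakehouse mount
CACHE_ROOT = "/tmp/delta_cache"              # local SSD of the notebook session
_cache_meta = {}                             # table name -> (delta table id, version)

con = duckdb.connect()

def run_cached(sql: str):
    # 1. Parse the query and extract the referenced tables
    tables = {t.name for t in sqlglot.parse_one(sql).find_all(exp.Table)}
    for tbl in tables:
        dt = DeltaTable(f"{ONELAKE_ROOT}/{tbl}")
        key = (dt.metadata().id, dt.version())       # 2. Track table id and version
        local_path = f"{CACHE_ROOT}/{tbl}.parquet"
        # 3. Copy the table locally only when it is new or has changed
        if _cache_meta.get(tbl) != key or not os.path.exists(local_path):
            os.makedirs(CACHE_ROOT, exist_ok=True)
            pq.write_table(dt.to_pyarrow_table(), local_path)
            _cache_meta[tbl] = key
        # 4. Point the query at the cached copy through a view
        con.sql(f"CREATE OR REPLACE VIEW {tbl} AS SELECT * FROM read_parquet('{local_path}')")
    return con.sql(sql)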

Why It Works

This approach effectively creates a temporary, ad hoc disk cache in the notebook. The cache:

  • Persists only for the session’s duration.
  • Is cleared automatically when the session ends.
  • Ensures consistency by validating whether the Delta table in OneLake has changed before reusing cached data.
    • Thanks to the Delta format, this validation is a relatively cheap operation.
  • Leverages the user-level isolation in Fabric notebooks to eliminate risks of data inconsistency.

Despite its simplicity, this method has proven to be highly effective for query acceleration! 🚀


Limitations

Yes, I know—the cache is rather naïve since it loads the entire table. Other systems go further by:

  • Copying only the columns needed for the query.
  • Fetching just the row groups relevant to the query.

However, these optimizations would need to be implemented natively by the engine itself.


Industry Gap

Although virtually all Python engines (e.g., Polars, DataFusion) support reading formats like Delta and Iceberg, almost none offer built-in disk or RAM caching. This lack of caching support limits performance optimization opportunities.

Hopefully, this will change in the future, enabling more efficient workflows out of the box.

By the way, this is really fast! As a hint, it is faster than the results obtained by a state-of-the-art DWH in 2022!

Smart Data Pipeline Design: Check for Delta Table Changes with Minimal Overhead

Scenario

I have a notebook that processes hot data every 5 minutes. Meanwhile, another pipeline processes historical data, and I want to create a summary table that uses the hot data incrementally but refreshes entirely when the historical data changes.

Problem

Checking for changes in the historical data every 5 minutes is inefficient, slows down the hot data pipeline, and increases costs. There are many potential solutions for this use case, but the approach below has been working well for me.

Solution

Using Delta Table Version

Delta tables provide a variety of functions to access metadata without reading actual data files. For instance, you can retrieve the latest table version, which is highly efficient and typically takes less than a second.

# try_get_deltatable returns None when the table does not exist yet
dt = try_get_deltatable(f'/lakehouse/default/Tables/{schema}/scada', storage_options=storage_options)
if dt is None:
    current_version = -1               # sentinel value: no table yet
else:
    current_version = dt.version()     # latest committed version, read from the Delta log only

Storing Custom Metadata

You can store arbitrary metadata, such as a Python dictionary, when writing a Delta table. This metadata storage does not modify Parquet files and can contain information like who wrote the table or any custom data. In my case, I store the version of the historical table used in creating my summary table.

write_deltalake(Summary_table_path,
                df,
                mode="overwrite",
                storage_options=storage_options,
                custom_metadata={'scada': str(current_version)},   # version of the historical table used
                engine='rust')

Here is how this custom metadata is stored: it is recorded in the commitInfo entry of the Delta transaction log, not in the Parquet data files.
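
To read it back later, you can inspect the table history. A small sketch, assuming the custom keys surface in the latest commitInfo dictionary returned by delta-rs:

from deltalake import DeltaTable

summary_dt = DeltaTable(Summary_table_path, storage_options=storage_options)
last_commit = summary_dt.history(1)[0]               # commitInfo of the most recent write
stored_version = int(last_commit.get('scada', -1))   # custom key written above, -1 if missing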

Combining Both Methods

The hot data pipeline incrementally adds data and checks the version of the historical table, storing it in the summary table. If the stored version differs from the latest version, this indicates a change, triggering a full refresh of the summary table.
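
Put together, the check looks roughly like this; build_full_summary and build_hot_increment are hypothetical placeholders for the actual transformation logic, and stored_version comes from the history lookup shown earlier.

# Decide between a full refresh and an incremental append (sketch)
hist_dt = try_get_deltatable(f'/lakehouse/default/Tables/{schema}/scada', storage_options=storage_options)
current_version = -1 if hist_dt is None else hist_dt.version()

if stored_version != current_version:
    df = build_full_summary()       # hypothetical: rebuild the summary from scratch
    mode = "overwrite"
else:
    df = build_hot_increment()      # hypothetical: only the latest 5-minute slice of hot data
    mode = "append"

write_deltalake(Summary_table_path, df, mode=mode,
                storage_options=storage_options,
                custom_metadata={'scada': str(current_version)},
                engine='rust')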

Example Scenarios

  • When the Historical Table Has Not Changed
  • When a Change is Detected in the Historical Table

Key Takeaway

The Python Delta package is a versatile tool that can solve complex data engineering challenges efficiently.

You can download the two notebooks here

Create Iceberg Table in Azure Storage using PostgreSQL as a catalog

Just sharing a notebook on how to load an Iceberg table into ADLS Gen2; I built it just for testing Iceberg-to-Delta conversion.

Unlike Delta, Iceberg requires a catalog in order to write data. There are many options, from SQLite to fully managed services like Snowflake Polaris. Unfortunately, PyIceberg has a bug when checking whether a table exists in Polaris; the issue was fixed but has not been released yet.

SQLite is just a local database, and I wanted to read and write those tables from both my laptop and a Fabric notebook. Fortunately, PostgreSQL is supported.

I spun up a PostgreSQL DB in Azure using the cheapest option possible. Note that the Iceberg catalog doesn't store any data, just a pointer to the latest snapshot; the DB is used for consistency guarantees, not data storage.

Anyway, 24 AUD per month is not too bad.

My initial plan was to use Daft for data preparation, as it has good integration with the Iceberg catalog. Unfortunately, as of today, it does not support adding the filename as a column in the destination table, so I ended up using DuckDB for data preparation and Daft for reading from the catalog.

Update: Daft has since added support for including the filename when reading from CSV, so it is now used for both reading and writing.

The convenience of the catalog

What I really like about the catalog is the ease of use; you just need to initialize the catalog connection once.

from pyiceberg.catalog.sql import SqlCatalog

def connect_catalog():
    catalog = SqlCatalog(
        "default",
        **{
            "uri"                : postgresql_db,            # PostgreSQL connection string
            "adlfs.account-name" : account_name,
            "adlfs.account-key"  : AZURE_STORAGE_ACCOUNT_KEY,
            "adlfs.tenant-id"    : azure_storage_tenant_id,
            "py-io-impl"         : "pyiceberg.io.fsspec.FsspecFileIO",
            "legacy-current-snapshot-id": True,
        },
    )
    return catalog
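
One detail worth noting: with the SQL catalog, the namespace has to exist before you can create tables in it. A small sketch, assuming a db variable holding the namespace name:

from pyiceberg.exceptions import NamespaceAlreadyExistsError

catalog = connect_catalog()
try:
    catalog.create_namespace(db)      # just a catalog entry, no data is written
except NamespaceAlreadyExistsError:
    pass                              # already created on a previous run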

Writing takes just a couple of lines of code, and you don't need to configure storage again; you use something like this:

catalog.create_table_if_not_exists(f'{db}.{tbl}', schema=df.schema, location=table_location + f'/{db}/{tbl}')
catalog.load_table(f'{db}.{tbl}').append(df)     # storage settings come from the catalog, not the call
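
And for reading, Daft can consume the PyIceberg table handle directly; a short sketch, reusing connect_catalog() from above:

import daft

catalog = connect_catalog()
iceberg_tbl = catalog.load_table(f'{db}.{tbl}')
daft_df = daft.read_iceberg(iceberg_tbl)    # scans the latest snapshot through the catalog
daft_df.show()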