Querying Azure storage using DuckDB

DuckDB just added support for fsspec, which makes querying object stores like Google Cloud Storage and Azure Storage possible; note that the AWS S3 API was already supported natively.

Previously, to query Azure storage you had to use a PyArrow dataset as a workaround; with this recent update, that is no longer needed.

Here is a simple example querying a folder of Parquet files partitioned using the Hive style. Notice that DuckDB is smart enough to recognize Date as a partition field.

import os
import adlfs
import duckdb
from dotenv import load_dotenv

# load the storage account credentials from the local ".env" file
load_dotenv()
AZURE_STORAGE_ACCOUNT_NAME = os.getenv('AZURE_STORAGE_ACCOUNT_NAME')
AZURE_STORAGE_ACCOUNT_KEY = os.getenv('AZURE_STORAGE_ACCOUNT_KEY')
table_path = os.getenv('table_path')

# fsspec filesystem pointing at the Azure storage account
fs = adlfs.AzureBlobFileSystem(account_name=AZURE_STORAGE_ACCOUNT_NAME, account_key=AZURE_STORAGE_ACCOUNT_KEY)

# register the "abfs" filesystem so DuckDB can read from it directly
con = duckdb.connect()
con.register_filesystem(fs)
df = con.execute(f'''
    select *
    from read_parquet('{table_path}/scada/data/*/*.parquet', hive_partitioning=true)
    limit 10
    '''
).df()
con.unregister_filesystem('abfs')
df

And here is the result:

Make sure to have the file ".env" when running the notebook from your computer; here is an example of what it looks like.
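
Something like this, with placeholder values only; the variable names are the ones read by the code above, and table_path points at the container holding the data (shown here with an abfs:// prefix, adjust to your setup):

AZURE_STORAGE_ACCOUNT_NAME=mystorageaccount
AZURE_STORAGE_ACCOUNT_KEY=xxxxxxxxxxxxxxxx
table_path=abfs://mycontainer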

As a Power BI user, I see potential for a lightweight ETL process using just Python that does complex transformations and outputs the results as Parquet files, which Power BI can then consume.

As I deal with small data (less than 30 GB), Apache Spark does not make much sense for me. Hopefully Synapse will provide us with a cheap single-node notebook experience; I suspect it would be useful for a lot of customers.

Using Apache Arrow Dataset to compact old partitions

It is a trick I learned today and thought it may be useful to share. I have a folder of Parquet files, partitioned by day using the Hive style; the data is ingested every 5 minutes, which ends up generating 288 small Parquet files per day. That is rather nice for a write scenario, but reading that data will be slow, as opening the individual files creates a big overhead. It is a well-documented problem, and more sophisticated table formats like Delta Lake and Iceberg fix it by using compaction, but that does not work with Python. (Edit: by Python I mean engines like DataFusion, DuckDB, and pandas, not Spark, which does not make sense for a small dataset.)

In my example I use only Python and the PyArrow dataset, which does not support compaction, but maybe there is a solution.

Just for illustration, here is a view of my bucket in Cloudflare R2 (PyArrow supports S3, GCS, and Azure).

Warning: the code will delete existing files; use at your own risk.

  • Read the existing partitions except today's data, as you may otherwise end up with a concurrent write, which will corrupt your table.
  • Filter only the partitions that contain more than one file, with something like this using DuckDB:
-- base: every partition except today's (cut_off), exposing the source file name
create view base as select * from parquet_scan('s3://delta/aemo/scada/data/*/*.parquet', HIVE_PARTITIONING = 1, filename = 1) where Date < '{cut_off}';
-- filter: only the partitions that contain more than one file
create view filter as select Date, count(distinct filename) as cnt from base group by 1 having cnt > 1;
  • Read the data using the previous filter; again, we are not touching today's partition to avoid any conflicts:
# read everything matching the filter; today's partition is never touched
tb = con.execute('''select SETTLEMENTDATE, DUID, SCADAVALUE, filename, cast(base.Date as date) as Date
                    from base inner join filter on base.Date = filter.Date''').arrow()
# rewrite each selected partition as a single, larger set of files
ds.write_dataset(tb, "delta/aemo/scada/data/", filesystem=s3, format="parquet",
                 partitioning=['Date'], partitioning_flavor="hive",
                 min_rows_per_group=120000, existing_data_behavior="delete_matching")

Again, there is no support for transactions; if your code for whatever reason does not complete, you will end up with an inconsistent table.

  • And here are the results: all old partitions have only one file.

You need to run the job only once a day. Hopefully sometime next year either Apache Iceberg or Delta Lake will provide compaction for the Python client; in the meantime maybe this approach is good enough :), you can see the full code here.

Another approach is copy-on-write: basically, every time you ingest new data, you copy the existing data, append it to the new data, and overwrite the existing files. But that may be an expensive operation, especially if your job runs more frequently.
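
For illustration, a minimal sketch of that copy-on-write idea, reusing the objects from the compaction example above; `con` is the DuckDB connection with the bucket's filesystem registered, `s3` is the fsspec filesystem object, and `new_batch` is a hypothetical Arrow table holding the freshly ingested rows:

import pyarrow as pa
import pyarrow.dataset as ds

# read what is already stored for the partition being rewritten
existing = con.execute('''
    select SETTLEMENTDATE, DUID, SCADAVALUE, cast(Date as date) as Date
    from parquet_scan('s3://delta/aemo/scada/data/*/*.parquet', HIVE_PARTITIONING = 1)
    where Date = '2022-07-01'   -- hypothetical partition value
''').arrow()

# append the new rows (schemas are assumed to match) and overwrite the whole partition
combined = pa.concat_tables([existing, new_batch])
ds.write_dataset(combined, "delta/aemo/scada/data/", filesystem=s3, format="parquet",
                 partitioning=['Date'], partitioning_flavor="hive",
                 existing_data_behavior="delete_matching")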

Running a Serverless DuckDB on Google Cloud

TL;DR: it is easy to set up and works relatively well, but there is a catch: watch out for Cloud Storage throughput. I shared a notebook here.

In a previous blog, I showed a POC of how to run queries from a Colab notebook against a Delta Lake table. But what if you want to run the same query from other tools, or run a query in a different region and avoid egress fees? It turns out it is extremely easy to set up.

And here is an overall view of the architecture. The most important decision is to make sure Cloud Storage and the Cloud Function are in the same region, in my case “us-central1”; you can call the function from anywhere.

Google Cloud Functions

As I said before, the code is very simple; I spent some time googling how to convert the results from bytes to JSON to a dataframe. I think that, besides BigQuery, Google Cloud Functions is the easiest service to set up: just write your code and Google Cloud handles the rest. Just for fun, I used a machine with 8 CPUs and 32 GB of RAM.

import pyarrow.dataset as ds
import duckdb
import json

# Arrow dataset over the Parquet files in Cloud Storage; no data is read yet
lineitem = ds.dataset("gs://xxxxx/lineitem", format="parquet", partitioning="hive")
con = duckdb.connect()

def Query(request):
    # the SQL text is passed in the request body under the 'name' key
    SQL = request.get_json().get('name')
    df = con.execute(SQL).df()
    # return the result serialized as JSON records
    return json.dumps(df.to_json(orient="records")), 200, {'Content-Type': 'application/json'}

How to call the web API

A lot of tools can send a web API call as long as they have the correct authentication. I started with a Python script for a simple reason: it was the easiest to find code for on the internet.

Here is the interesting part: you write any arbitrary SQL code, then you send an API call and in return get JSON with the results. A user doesn't need to know anything about the Cloud Function; all they need is the web address and a correct SQL query.
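
For reference, a minimal sketch of such a call from Python, assuming the function allows unauthenticated invocations; the URL is a placeholder, and the 'name' key matches what the function reads in the code above:

import json
import requests
import pandas as pd

# placeholder endpoint, replace with your own Cloud Function URL
URL = "https://us-central1-myproject.cloudfunctions.net/Query"

# any arbitrary SQL against the registered dataset
sql = "select l_returnflag, count(*) as cnt from lineitem group by 1"

# the function reads the SQL text from the 'name' key
resp = requests.post(URL, json={"name": sql})
resp.raise_for_status()

# the body is the dataframe serialized with to_json(orient='records')
records = json.loads(resp.json())
df = pd.DataFrame(records)
print(df)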

Performance considerations

This is where it gets very interesting. I have no clue where the bottleneck is, but we can ask some questions.

Cold Start

  • The first example took 334 ms, which is impressive, but I was cheating: I showed the best-case scenario. Google Cloud Functions, or more precisely Cloud Run, was already running, so there was no cold start, and DuckDB was running a local query that did not require a call to Cloud Storage.
  • Currently the cold start for Cloud Run gen2 is around 10 seconds; note it is still in preview.

Transfer from Google Cloud Storage

Let's try this simple query: we get the result in 29 seconds.

The same query using my laptop takes 600 ms; by the way, the lineitem table contains 60 million rows!

I don't know why there is such a massive difference. I presumed network speed was the limit, but when I look at the bucket stats, it is actually very good, nearly 500 MB/s.

I am no expert in networking, but that number doesn't seem right! When I check how much data the Cloud Function is receiving, the whole discrepancy starts making more sense: on average I am getting around 30 MB/s, with a maximum of 50 MB/s. I have to say this is really slow!

File Pruning

The Arrow dataset is smart enough to prune the partitions and columns that are not needed. In this query I made a filter on L_shipdate; notice the Parquet files were sorted on that field, and as expected the performance is very good, 1.7 seconds, since DuckDB scans only the row groups that contain the date '1998-09-02'.
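
For reference, a query along these lines reproduces that behaviour; the endpoint is the same placeholder as in the earlier sketch, and the exact column casing depends on how the files were generated:

import requests

# same placeholder endpoint as in the sketch above
URL = "https://us-central1-myproject.cloudfunctions.net/Query"

# the files are sorted on L_shipdate, so DuckDB can skip every row group
# whose min/max statistics exclude this date
sql = "select count(*) from lineitem where L_shipdate = '1998-09-02'"
print(requests.post(URL, json={"name": sql}).json())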

Dataset Catalog

I am defining a very rudimentary catalog; the user can just call "show tables".

You can even check the table schema
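
A minimal sketch of what such a rudimentary catalog can amount to, assuming the datasets are simply registered as DuckDB views inside the function, which should make "show tables" and "describe" work out of the box:

import duckdb
import pyarrow.dataset as ds

con = duckdb.connect()

# registering each Arrow dataset under a name is the whole "catalog"
lineitem = ds.dataset("gs://xxxxx/lineitem", format="parquet", partitioning="hive")
con.register("lineitem", lineitem)

# the user can discover what exists and inspect its schema with plain SQL
print(con.execute("show tables").df())
print(con.execute("describe lineitem").df())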

Cloud Storage throughput is the bottleneck.

Having a speed of 30 MB/s makes the whole setup good only for a POC or for querying a small dataset. I don't know the reason for such poor performance from Cloud Run; I suspect the Apache Arrow implementation is not optimized for GCS, although it works very well on a local file system.

Another missing piece is the lack of a cache. It would have been good if DuckDB somehow cached the data already queried, but caching is very hard to implement, especially if you want cache invalidation, and you risk reinventing a full data warehouse. I genuinely hope it is a bug and Cloud Run can provide better network performance.

Poor Man’s lakehouse using Cloud Storage, Delta lake and DuckDB

TL;DR: a proof of concept of how to assemble a 'toy' lakehouse using Delta Lake and DuckDB. You can download the notebook here. Cloud Storage throughput is the bottleneck of the system; I would appreciate a vote on this feature request.

Delta Lake, one of the main storage formats, used by Databricks and Microsoft, has experimental support for a standalone reader; it means you don't need a big data engine to read or write it. It is experimental at this stage but under active development.

I have already blogged about it, but I was using my laptop. It turns out it works relatively well with any major cloud storage provider. Initially I tried Azure Storage and it did work, but I could not find any free Azure notebook offering, and I am not interested in paying any egress fees, so I ended up using Google Cloud Storage and Colab.

I would like to talk more about the Arrow dataset, which I think is an amazing technology.

The overall idea is simple: I have a Delta table in cloud storage created by something like Apache Spark. DuckDB can't read Delta directly, so instead I am using the Delta Lake Python package, which can produce an Arrow dataset that can be queried by DuckDB or any other engine that supports Arrow.
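
Roughly, the plumbing looks like this; a minimal sketch assuming the deltalake package can reach the bucket directly (the path and table name are placeholders):

import duckdb
from deltalake import DeltaTable

# open the Delta table sitting in cloud storage (placeholder path)
dt = DeltaTable("gs://my-bucket/my_delta_table")

# expose it as an Arrow dataset: no data is copied, it is just a view over
# the Parquet files listed in the Delta transaction log
dataset = dt.to_pyarrow_dataset()

# DuckDB, or any other Arrow-aware engine, can query the dataset directly
con = duckdb.connect()
con.register("my_table", dataset)
print(con.execute("select count(*) from my_table").df())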

Arrow Dataset

Let's look at this section of the code. As per the documentation, an Arrow dataset does not copy the data; it is like a virtual table that knows about all the files inside that particular path. What's exciting is that, in theory, the engine does not need to know anything about the storage at all, whether it is CSV, Parquet, or something else.

For example, in the future a query engine would not even care whether the table is Delta or Iceberg. Obviously that is not the case today, but there is no reason it will not happen.

Filter Pushdown

When you read data from cloud storage, for latency reasons it makes sense to read the minimum possible number of files. Currently only partition filters work, but they are working on adding filters on any column.
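
For illustration, one way to express a partition filter at the Arrow dataset level, so that only the files under the matching Hive directory are opened; "Date" is a hypothetical partition column, assumed here to be stored as a string:

import pyarrow.dataset as ds

# `dataset` is the Arrow dataset produced from the Delta table above;
# "Date" is a placeholder for whatever column the table is partitioned by
table = dataset.to_table(filter=ds.field("Date") == "2021-12-01")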

Final results

Currently it is not particularly fast, but I can query the data directly from cloud storage and show arbitrary charts.

Local Cache

If the data is small, fits on the notebook's SSD, and does not change very often, it makes sense to first download it into a local DuckDB database file and run the queries locally; it will be substantially faster.
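
A sketch of that idea: materialize the remote data once into a local DuckDB file and run every subsequent query against it (the file and table names are placeholders):

import duckdb

# persistent DuckDB database on the notebook's local SSD (placeholder name)
local = duckdb.connect("cache.duckdb")

# `dataset` is the Arrow dataset built from the Delta table above;
# copy it once, then all further queries hit local storage only
local.register("remote_table", dataset)
local.execute("create table scada as select * from remote_table")

print(local.execute("select count(*) from scada").df())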

Take Away

I was a bit suspicious about this whole lakehouse thing, but maybe I was wrong about it. Having an open storage format will open up all kinds of interesting possibilities, and that's a very good thing.

As you can see, anyone can build a lakehouse; now we just need to figure out a boring detail, the overall performance of the system 🙂
