Querying Azure storage using DuckDB

DuckDB just added support for fsspec, which makes querying object stores like Google Cloud Storage and Azure Storage possible. Note that the AWS S3 API was already supported natively.

Previously, to query Azure storage you had to use a PyArrow dataset as a workaround; with this update, that is no longer needed.

Here is a simple example querying a folder of Parquet files partitioned Hive-style; notice that DuckDB is smart enough to recognize Date as a partition field.

import os
import duckdb
import adlfs
from dotenv import load_dotenv

# load the storage account credentials from the local .env file
load_dotenv()
AZURE_STORAGE_ACCOUNT_NAME = os.getenv('AZURE_STORAGE_ACCOUNT_NAME')
AZURE_STORAGE_ACCOUNT_KEY = os.getenv('AZURE_STORAGE_ACCOUNT_KEY')
table_path = os.getenv('table_path')

# fsspec-compatible filesystem for Azure Blob Storage
fs = adlfs.AzureBlobFileSystem(account_name=AZURE_STORAGE_ACCOUNT_NAME, account_key=AZURE_STORAGE_ACCOUNT_KEY)

con = duckdb.connect()
con.register_filesystem(fs)  # DuckDB will route 'abfs://' paths to this filesystem
df = con.execute(f'''
    select *
    from read_parquet('{table_path}/scada/data/*/*.parquet', hive_partitioning=true)
    limit 10
    '''
).df()
con.unregister_filesystem('abfs')
df

And here is the result.

Make sure to have the ".env" file when running the notebook from your computer; here is an example of what it looks like.
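Every value below is a placeholder; table_path is simply the abfs path of the folder that contains your table:

# placeholder values only, replace with your own
AZURE_STORAGE_ACCOUNT_NAME=mystorageaccount
AZURE_STORAGE_ACCOUNT_KEY=xxxxxxxxxxxxxxxxxxxx
table_path=abfs://mycontainer/myfolder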

As a PowerBI user, I see potential for a lightweight ETL process using just Python that does complex transformations and outputs the results as Parquet files which PowerBI can consume.
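For illustration, here is a minimal sketch of what such a pipeline could look like (the file paths and the transformation itself are made up; the column names are the SCADA ones used later in this post):

import duckdb

con = duckdb.connect()
# hypothetical transformation: aggregate raw 5-minute readings into a daily summary
con.execute("""
    COPY (
        select DUID, cast(SETTLEMENTDATE as date) as Date, avg(SCADAVALUE) as avg_mw
        from read_parquet('raw/*.parquet')
        group by 1, 2
    ) TO 'output/daily_summary.parquet' (FORMAT PARQUET)
""")
# PowerBI can then consume 'output/daily_summary.parquet' directly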

As I deal with small data (less than 30 GB), Apache Spark does not make much sense to me. Hopefully Synapse will provide a cheap single-node notebook experience; I suspect it would be useful for a lot of customers.

Using Apache Arrow Dataset to compact old partitions

It is a trick I learned today and thought it may be useful to share. I have a folder of Parquet files, partitioned by day using Hive style; the data is ingested every 5 minutes, which ends up generating 288 small Parquet files per day. That is rather nice for a write scenario, but reading the data will be slow, as opening that many individual files generates a big overhead. It is a well-documented problem, and more sophisticated table formats like Delta Lake and Iceberg fix it using compaction, but that does not work from Python yet. (Edit: by Python, I mean engines like DataFusion, DuckDB, and Pandas, not Spark, which does not make sense for a small dataset.)

In my example I use only Python and the PyArrow dataset API, which has no built-in compaction, but maybe there is a workaround.

Just for illustration, here is a view of my bucket in Cloudflare R2 (PyArrow supports S3, GCS, and Azure).

Warning: the code will delete existing files, use at your own risk.

  • Read the existing partitions except today's data, as you may otherwise end up with concurrent writes, which would corrupt your table.
  • Filter only the partitions that contain more than one file, something like this using DuckDB:
create view base as
select * from parquet_scan('s3://delta/aemo/scada/data/*/*.parquet', HIVE_PARTITIONING = 1, filename = 1)
where Date < '{cut_off}';

create view filter as
select Date, count(distinct filename) as cnt from base group by 1 having cnt > 1;
  • Read the data using the previous filter; again, we are not touching today's partition, to avoid any conflicts:
tb = con.execute('''select SETTLEMENTDATE, DUID, SCADAVALUE, filename, cast(base.Date as date) as Date
                    from base inner join filter on base.Date = filter.Date''').arrow()
# rewrite the matching partitions, replacing the many small files per day
ds.write_dataset(tb, "delta/aemo/scada/data/", filesystem=s3, format="parquet",
                 partitioning=['Date'], partitioning_flavor="hive",
                 min_rows_per_group=120000, existing_data_behavior="delete_matching")

Again, there is no support for transactions; if your code for whatever reason does not complete, you will end up with an inconsistent table.

  • And here is the result: all old partitions have only one file.

You need to run the job only once a day. Hopefully sometime next year either Apache Iceberg or Delta Lake will provide compaction in their Python clients; in the meantime, maybe this approach is good enough :) You can see the full code here.
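For reference, here is a condensed sketch of the whole job. The R2 credentials and endpoint are placeholders, and cut_off is simply today's date, so today's partition is never rewritten:

import duckdb
import s3fs
import pyarrow.dataset as ds
from datetime import date

# placeholder credentials for Cloudflare R2 (any S3-compatible endpoint works)
s3 = s3fs.S3FileSystem(key='xxx', secret='xxx',
                       client_kwargs={'endpoint_url': 'https://<account_id>.r2.cloudflarestorage.com'})
con = duckdb.connect()
con.register_filesystem(s3)

cut_off = date.today()  # never touch today's partition
con.execute(f"""
    create view base as
    select * from parquet_scan('s3://delta/aemo/scada/data/*/*.parquet',
                               HIVE_PARTITIONING = 1, filename = 1)
    where Date < '{cut_off}'
""")
con.execute("""
    create view filter as
    select Date, count(distinct filename) as cnt
    from base group by 1 having cnt > 1
""")
tb = con.execute("""
    select SETTLEMENTDATE, DUID, SCADAVALUE, filename, cast(base.Date as date) as Date
    from base inner join filter on base.Date = filter.Date
""").arrow()

# rewrite each fragmented day as one file, deleting the files it replaces
ds.write_dataset(tb, 'delta/aemo/scada/data/', filesystem=s3, format='parquet',
                 partitioning=['Date'], partitioning_flavor='hive',
                 min_rows_per_group=120000, existing_data_behavior='delete_matching')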

Another approach is copy-on-write: every time you ingest new data, you copy the existing data, append the new data to it, and overwrite the existing files. But that may be an expensive operation, especially if your job runs frequently.
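A minimal sketch of the copy-on-write idea, assuming one Hive-style partition folder per day (the paths are hypothetical):

import duckdb

con = duckdb.connect()
# 1. read the current partition together with the newly ingested batch
con.execute("""
    create table combined as
    select * from read_parquet('data/Date=2022-10-23/*.parquet')
    union all
    select * from read_parquet('incoming/new_batch.parquet')
""")
# 2. write the partition back as a single file; any older part files
#    must then be deleted, hence the earlier warning about non-transactional writes
con.execute("COPY combined TO 'data/Date=2022-10-23/part-0.parquet' (FORMAT PARQUET)")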

PowerBI Query plan when using Top N filter

The October release of PowerBI Desktop introduced a very interesting feature: the Top N filter is pushed down to DirectQuery sources. I thought I would give it a try and blog about it. For some reason it did not work, which is actually great for the purpose of this blog: if you come from a SQL background, you will be surprised by how the PowerBI DAX engine works.

Let's try one table in BigQuery as an example and ask this question: what are the top 5 substations by electricity produced? In PowerBI it is a trivial exercise, just use the Top N filter.

First observation: 3.7 seconds seems rather slow. BigQuery, or any columnar database, should return the results much faster, especially since we are grouping by a low-cardinality column (around 250 distinct values).

Let's try the SQL query in the BigQuery console.
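The query looked something like this (the table and column names are illustrative, not the real schema):

select SUBSTATIONNAME, sum(MWH) as total_mwh
from `project.dataset.energy`
group by SUBSTATIONNAME
order by total_mwh desc
limit 5;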

And let's check the duration: 351 ms. The table has 91 million records, so that's not bad at all. We need to account for the data-transfer latency to my laptop, but that still does not explain the difference in duration!

DAX Engine Query Plan

Let's have a look at the query plan generated by the DAX engine, using the excellent free tool DAX Studio.

That's very strange: two SQL queries and one second spent in the Formula Engine, and the two SQL queries do not even run in parallel.

Looking at the SQL queries, I think this is the logic of the query plan (a sketch of the two queries follows the list):

  • Send a SQL query to get the list of all substations and the sum of MWH.
  • Order the result using the Formula Engine and select the top 5 substations.
  • Send another SQL query filtered to those 5 substations.
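Here is a hedged sketch of what those two round trips might look like, again with illustrative table and column names:

-- Query 1: aggregate everything, with no ORDER BY or LIMIT
select SUBSTATIONNAME, sum(MWH) as total_mwh
from `project.dataset.energy`
group by SUBSTATIONNAME;

-- the Formula Engine sorts the ~250 rows locally and keeps the top 5, then:

-- Query 2: re-aggregate, filtered to the 5 winners
select SUBSTATIONNAME, sum(MWH) as total_mwh
from `project.dataset.energy`
where SUBSTATIONNAME in ('S1', 'S2', 'S3', 'S4', 'S5')
group by SUBSTATIONNAME;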

You are probably wondering why this convoluted query plan. Surely the DAX engine could send one SQL query to get the results; why the two trips to the source system, which make the whole experience slow?

Vertipaq Doesn't Support the Sort Operator

Vertipaq, the internal storage engine of PowerBI, does not support the sort operator, hence the previous query plan does make sense if your storage engine cannot sort.

But My Source Does Support Sorting?

That's the new feature: the DAX engine will generate a different plan when the source system does support sorting.

Great. Again, why doesn't Vertipaq support the sort operator?

No idea; probably only a couple of engineers at Microsoft know the answer.

Edit: 23 October 2022

Jeffrey Wang (one of the original authors of the DAX engine) was very kind and provided this explanation of why the optimization did not kick in for BigQuery.

Multi-fact support in DAX and Malloy

This is a quick blog post showing how the two languages behave when dealing with multiple fact tables.

Let's start with a simple model: two tables, Budget and Actual, storing items sold by country and color.

Budget

Actual

For example, we want to ask: how many items were sold by continent? We don't have this information, so we need a dimension table that maps state to continent.

DAX

And the Data Model will look like this.

To get the results, we write this DAX query in DAX Studio (by the way, the new version 3 is very slick!).

DAX will generate two SQL queries to get the results from the two tables, then merge them using the internal Formula Engine.
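Conceptually, the two generated queries look something like the following schematic SQL (the column names are illustrative); the final merge on continent happens inside the Formula Engine:

-- query 1: budget by continent
select d.continent, sum(b.budget_qty) as QTY_Budget
from Budget b join dim_state d on b.state = d.state
group by d.continent;

-- query 2: actuals by continent
select d.continent, sum(a.Quantity) as QTY_Sold
from Actual a join dim_state d on a.state = d.state
group by d.continent;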

Malloy

In Malloy we do the same by writing code; you can download the data model here.

In DAX we use SUMMARIZECOLUMNS to aggregate measures from different tables. As far as I can tell, Malloy does not support this model yet; the tables Budget and Actual are independent, so you basically need to manually join the two queries generated from the two tables:

query: Budget_by_state is Budget -> {
  aggregate: _Budget
  group_by: dim_state.state
}

query: Actual_by_state is Actual -> {
  aggregate: Quantity
  group_by: dim_state.state
}

query: merge_results is from_sql(state_source_) {
  join_one: q2 is from(-> Budget_by_state) with state
  join_one: q3 is from(-> Actual_by_state) with state
} -> {
  group_by: continent
  aggregate: QTY_Budget is sum(q2._Budget), QTY_Sold is sum(q3.Quantity)
}

And we get the same results. Malloy always generates one SQL query, as there is no way to merge the results internally; as a matter of fact, the only “calculation” engine is the SQL database, which in this particular case is DuckDB.

Obviously you can always create a new source using state as a base table, but I don't think it is a sustainable solution, as the whole point is to have one model that answers a lot of different questions.

Takeaway

Native support for multiple fact tables is obviously not unique to DAX; ThoughtSpot TML supports it out of the box. I hope the Malloy developers consider this common scenario for future development.
