First Look at Open Table Format

TL; DR : I wrote a simple notebook how to create a Delta Table using PySpark, delete some rows and run some Queries using DuckDB.

When you write a record in a database, Like SQL Server, SQLite, it needs to store it somewhere, usually that location was your Disk, your local storage system is a very powerful system, you can write, read, modify existing file etc and it is very fast , so this problem was already solved 50 years ago. Why we should care about data lake Table Format (Delta, Iceberg etc).

The Internet happened

Internet tech company start getting massive amount of data generated by the exponential use of the internet, saving data in the local disk of Big Database is just too expensive, there was a need for a separate approach.

Separation of Storage and Compute

The solution was very simple separate the computer that run Queries from the system that store data, and both can be scaled independently, if you don’t need to run Analytical Queries fine just turn it off and keep adding data to your storage system. That was a genius idea!!! ( I think Google MapReduce was first then Yahoo basically copied the idea by creating the Open Source Hadoop, but it was slow then Spark tool over)

AWS S3

Another important event, in 2006, AWS, released S3, a massive Object store, basically you can save your files for a very low cost, but there is a catch, the files are immutable, you can’t simply open a file and do some modification and save it.

Basically, you can only add and delete files.

Table format solved this limitation by using some tricks, for example you want to delete some records, the DB will just add a new file with all the records and later when you read it, you just ignore those records. (it is called Merge on Read)

Another Approach is the DB will simply write a new file that don’t contains the delete records, and tag the existing file, so when you read it later, you will read the new file, as an example, if you have a table with 1 million records and want to delete 1 record, the DB will write a new file with 999999 records, that’s seems not very efficient to me !!!

Proprietary Table Format

Commercial DB Vendor like BigQuery (Capacitor), Snowflake (PAX) use it internally since day one, but it is Proprietary, you can’t read it using your Own Query Engine, although BigQuery provide an API that talk directly to the storage.

My understanding At that time Open Source system (Hadoop, Spark ) used a very simple table format called Hive, something that looks like this

But it did have a lot of problems, basically no ACID support , as an example, if someone is adding files when you are reading the table, you will get wrong results,  if you overwrite a table and your script timeout for any reason, your table will be corrupted ( I learn it the hard way).

Open Source Table Format

Open Source developers understood that they need to do something about it and they knew commercial DB vendors have already solved this problem, the result was those three Major format.

But at the very basic level, Table Storage Format is just a folder that contains

  • Data (usually) in a parquet format
  • Meta Data that contains the statistics of every file and its location

Something like this, Obviously the exact implementation details different for every format

Because it is open source, you will always find some developers who will build something slightly different, see databend as example, and even new Proprietary vendors , see FireBolt F3

I don’t delete why Should I care

That’s a respectable statement, even if all you do is read only and never delete, one may argue a folder of parquet file with a decent hive type partition is all you need, I still think even in that use case, Table Format maybe worth it, listing a directory is extremely slow operation and cost money, instead you can just read a metadata file and knows exactly the location of every file, not only that, some Query Engine are smart enough to read the statistics from the metadata file and skips files that don’t contains the Data needed for the Query.

Another example if you want to count the total records of a table or the min/max potentially you can just read it from the metadata and it will take millisecond.

Which one is better?

If you google it, you will find all kind of articles why one table format is better than the others, it is a very technical subject and I don’t know enough to comment on it, but my impression is Delta is rather controlled by Databricks ( and some features are still proprietary) which make the other vendors a bit suspicious and hence they are more into the Iceberg camp. (Snowflake, Trino, Dremio and to a less extend BigQuery)

Should you care about Table Format ?

Actually we should not, I never cared where SQL Server save its data, it is just there, and this is exactly what happening with Data Lake Vendors, they are slowly turning into a classical DW approach, surprisingly nowadays the best practise is to read the table using a catalog as there is no way to have a proper row level security, column masking just by reading from files. ( it is a bit ironic, it is like 2010 again)

Another interesting twist, Cloud DW vendors like Snowflake are adding support for Open Storage format as a native table, see this excellent presentation , Trino and Dremio has already a solid write and read support for Iceberg.

Microsoft and Google approach is to use Spark for Writing the Table Format and use their DW offering for read, I find this approach rather lazy. maybe it is just a workaround till they have a native support.

I suspect we will see a commoditization of open Table format this coming year(s), and users will just go back caring about what really matter;

Cost, Performance and Concurrency.

Querying Azure storage using DuckDB

DuckDB just added support for fsspec, which make Querying Object store Like GCP and Azure storage possible, please notice AWS S3 API was natively supported already.

Previously to Query Azure storage, you had to use pyarrow dataset as a workaround, with the recent update, it is no more needed.

Here is a simple example, Querying a folder of Parquet files partitioned using Hive style, notice DuckDB is smart enough to recognize Date as a partition field

import duckdb
import adlfs ,os
from dotenv import load_dotenv
load_dotenv()
AZURE_STORAGE_ACCOUNT_NAME = os.getenv('AZURE_STORAGE_ACCOUNT_NAME') 
AZURE_STORAGE_ACCOUNT_KEY = os.getenv('AZURE_STORAGE_ACCOUNT_KEY') 
table_path = os.getenv('table_path') 
fs = adlfs.AzureBlobFileSystem(account_name=AZURE_STORAGE_ACCOUNT_NAME, account_key=AZURE_STORAGE_ACCOUNT_KEY )
con = duckdb.connect()
con.register_filesystem(fs)
df = con.execute(f'''
    select *
    from read_parquet('{table_path}/scada/data/*/*.parquet', hive_partitioning=true)
    limit 10
    '''
).df()
con.unregister_filesystem('abfs')
df

and here is the result

Make sure to have the file”.env” when running the notebook from your computer, here is an example how it looks like

As a PowerBI user, I see a potential for a lightweight ETL process using just python that do complex transformation and output the results as a parquet files which PowerBI can consume.

As I deal with small data( less than 30 GB), Apache Spark does not make much sense to me, Hopefully Synapse will provide us with a cheap single node Notebook experience. I suspect it may be useful for a lot of customers.

Using Apache Arrow Dataset to compact old partitions

It is trick I learn today and thought it maybe useful to share, I have a folder of parquet files, partitioned by day using Hive style, the data is ingested every 5 minutes which end up generating 288 small parquet files per day, it is rather nice for a write scenario but reading that data will be slow as it generate a big overhead opening individual files, it is a well documented problem, and more sophisticated table format like Delta Table and Iceberg fix the problem by using compaction, but it does not work with Python. ( Edit by Python, I mean Engine Like Data Fusion, DuckDB , Pandas not Spark which does not make sense for a small Dataset)

In my example I use Only Python and pyarrow dataset which does not support compaction, but maybe there is a solution.

Just for illustration, here is a view of my Bucket in Cloudflare R2 (Pyarrow support S3, GCP and Azure)

Warning : the code will delete existing files, use at your own risk

  • Read the existing partitions except today data, as you may end up having concurrent Write , which will corrupt your table.
  • Filter only the partitions that contains more than 1 file , something like this Using DuckDB
create view base  as select * from  parquet_scan('s3://delta/aemo/scada/data/*/*.parquet' , HIVE_PARTITIONING = 1,filename=1) where Date < '{cut_off}';
create  view  filter as select Date, count(distinct filename) as cnt from  base  group by 1 having cnt>1 
  • Read the data using the previous filter, again, we are not touching today Partition to avoid any conflicts
tb=con.execute('''select SETTLEMENTDATE,DUID,SCADAVALUE,file,cast(base.Date as date) as Date from base inner join filter on base.Date= filter.Date''').arrow()
ds.write_dataset(xx,"delta/aemo/scada/data/", filesystem=s3,format="parquet" , partitioning=['Date'],partitioning_flavor="hive",
     min_rows_per_group=120000,existing_data_behavior="delete_matching")

Again there is no support for transaction, if your code for whatever reason, did not complete, you will end up with unstable table

  • And here is the results, all old partitions have only 1 file

You need to run the Job only once a day, hopefully next year sometimes, either Apache Iceberg or Delta Table will provide compaction for the Python client, in the meantime maybe this approach is good enough :), you can see the full code here

Another approach is copy on write, basically every time you ingest a new data, you need to copy the existing data append it to the new data and overwrite existing files, but it maybe an expensive operation, specially if your job runs more frequently.

PowerBI Query plan when using Top N filter

The October release of PowerBI Desktop introduced a very interesting feature, Top N filtered is pushed down for Direct Query Sources, I thought I may give it a try and blog about it, for some reason it did not work, which is great for the purpose of this blog as if you came from a SQL Background you will be surprised how PowerBI DAX Engine Works.

let’s try with one table in BigQuery as an example, and ask this Question, what’s the top 5 Substation by Electricity produced, in PowerBI, it is a trivial exercise, just use the Top N filter

First Observation, 3.7 Second seems rather Slow, BigQuery or any Columnar Database should return the results way faster, specially that we are grouping by low cardinality columns ( around 250 distinct values)

Let’s try SQL Query in BigQuery Console

And let’s check the duration, 351 ms, the table has 91 Million records, that’s not bad at all, but we need to account for the data transfer latency to my laptop, still that does not explain the difference in duration !!!

DAX Engine Query Plan

let’s have a look at the Query Plan generated by DAX Engine using the excellent free tool, DAX Studio

That’s very strange, 2 SQL Query and 1 Second spent by the Formula Engine, and the two SQL Queries are not even in parallel

Looking at the SQL Queries, I think this is the logic of the Query Plan

  • Send a SQL Query to get the list of all the substation and the sum of of MWH.
  • Order the result using the Formula Engine and select 5 substation.
  • Send another SQL Query with a filter of those 5 substation

Probably you are wondering why this convoluted Query Plan, Surely DAX Engine can just send 1 SQL Query to get the results, why the two trips to the source system, which make the whole experience slow.

Vertipaq Don’t support Sort Operator

Vertipaq which is the internal storage engine of PowerBI does not support the sort operator, hence the previous Query do make sense if your Storage engine don’t support sort.

But My Source do support Sorting ?

That’s the new feature, DAX Engine will generate a different plan when the source system do support sorting.

Great, again , Why Vertipaq don’t support sort Operator ?

No idea, probably only a couple of engineers from Microsoft Know the answer.

Edit : 23 October 2022

Jeffrey Wang ( One of the Original Authors of DAX Engine) was very kind and provided this explanation why the optimization did not kick in for BigQuery