First Look at Open Table Format

TL;DR: I wrote a simple notebook showing how to create a Delta Table using PySpark, delete some rows and run some queries using DuckDB.

When you write a record in a database like SQL Server or SQLite, it needs to store it somewhere, and usually that location was your disk. Your local storage system is very powerful, you can write, read and modify existing files, and it is very fast, so this problem was already solved 50 years ago. Why should we care about data lake table formats (Delta, Iceberg etc.)?

The Internet happened

Internet tech companies started getting massive amounts of data generated by the exponential use of the internet. Saving that data on the local disks of a big database was just too expensive, so there was a need for a different approach.

Separation of Storage and Compute

The solution was very simple: separate the computers that run queries from the system that stores the data, so both can be scaled independently. If you don’t need to run analytical queries, fine, just turn the compute off and keep adding data to your storage system. That was a genius idea!!! (I think Google MapReduce came first, then Yahoo basically copied the idea by creating the open source Hadoop, but it was slow, and then Spark took over.)

AWS S3

Another important event: in 2006, AWS released S3, a massive object store. Basically you can save your files for a very low cost, but there is a catch, the files are immutable, you can’t simply open a file, make some modifications and save it.

Basically, you can only add and delete files.

Table formats solved this limitation by using some tricks. For example, when you want to delete some records, the DB will just add a new file that keeps track of the deleted records, and later when you read the table, you simply ignore those records (it is called Merge on Read).

Another approach is for the DB to simply write a new file that does not contain the deleted records and tag the existing file as removed, so when you read the table later, you read only the new file (this is Copy on Write). As an example, if you have a table with 1 million records and want to delete 1 record, the DB will write a new file with 999,999 records, which does not seem very efficient to me!!!
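As a rough sketch of this kind of workflow (the local path and the Spark session configuration below are illustrative, not the exact notebook code), here is how creating a Delta Table with PySpark and deleting a row looks; by default Delta rewrites the affected data files rather than modifying them in place:

# Minimal sketch, assuming pyspark and delta-spark are installed.
import pyspark
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable

builder = (
    pyspark.sql.SparkSession.builder.appName("delta-demo")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Write a tiny Delta Table to a throwaway local path.
spark.range(0, 1000).write.format("delta").mode("overwrite").save("/tmp/demo_delta")

# Delete one row: Delta rewrites the touched data file(s) without that row
# and records the change in the _delta_log, instead of editing the file in place.
dt = DeltaTable.forPath(spark, "/tmp/demo_delta")
dt.delete("id = 42")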

Proprietary Table Format

Commercial DB vendors like BigQuery (Capacitor) and Snowflake (PAX) have used their own table formats internally since day one, but they are proprietary, you can’t read them using your own query engine, although BigQuery provides an API that talks directly to the storage.

My understanding is that, at that time, open source systems (Hadoop, Spark) used a very simple table format called Hive, something that looks like this

But it did have a lot of problems, basically no ACID support. As an example, if someone is adding files while you are reading the table, you will get wrong results, and if you overwrite a table and your script times out for any reason, your table will be corrupted (I learned it the hard way).

Open Source Table Format

Open source developers understood that they needed to do something about it, and they knew commercial DB vendors had already solved this problem. The result was the three major formats: Delta Lake, Apache Iceberg and Apache Hudi.

But at the very basic level, a table format is just a folder that contains

  • Data (usually) in a parquet format
  • Metadata that contains the statistics and location of every file

Something like this; obviously the exact implementation details differ for every format.
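As a concrete, simplified illustration (using the deltalake Python package rather than PySpark, and a throwaway local path), the sketch below writes a tiny table and prints the resulting folder: a few Parquet data files plus a _delta_log folder holding the metadata.

# Rough sketch, assuming the deltalake and pandas packages are installed.
import os
import pandas as pd
from deltalake import write_deltalake

write_deltalake("/tmp/demo_table", pd.DataFrame({"id": range(5)}), mode="overwrite")

# Print the folder layout: Parquet data files plus _delta_log metadata files.
for root, _, files in os.walk("/tmp/demo_table"):
    for f in files:
        print(os.path.join(root, f))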

Because it is open source, you will always find some developers who will build something slightly different, see Databend as an example, and even new proprietary vendors, see Firebolt F3.

I don’t delete, why should I care?

That’s a respectable statement. Even if all you do is read-only and never delete, one may argue that a folder of Parquet files with a decent Hive-style partitioning is all you need. I still think that even in that use case a table format may be worth it: listing a directory is an extremely slow operation and costs money, whereas you can just read a metadata file and know the exact location of every file. Not only that, some query engines are smart enough to read the statistics from the metadata file and skip files that don’t contain the data needed for the query.

Another example: if you want the total record count of a table, or a min/max, you can potentially just read it from the metadata and it will take milliseconds.
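With a Delta Table, for instance, the per-file statistics live in the transaction log, so a sketch like the following (again using the deltalake Python package as an illustration, on the hypothetical table written above) can answer those questions without scanning a single data file:

# Rough sketch: read per-file statistics straight from the Delta transaction log.
from deltalake import DeltaTable

dt = DeltaTable("/tmp/demo_table")
add_actions = dt.get_add_actions(flatten=True).to_pandas()

# Each row describes one data file: its path, size, row count and per-column min/max.
print(add_actions[["path", "num_records"]])
print("total rows:", add_actions["num_records"].sum())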

Which one is better?

If you google it, you will find all kinds of articles on why one table format is better than the others. It is a very technical subject and I don’t know enough to comment on it, but my impression is that Delta is rather controlled by Databricks (and some features are still proprietary), which makes the other vendors a bit suspicious, and hence they are more into the Iceberg camp (Snowflake, Trino, Dremio and to a lesser extent BigQuery).

Should you care about Table Format ?

Actually we should not. I never cared where SQL Server saves its data, it is just there, and this is exactly what is happening with data lake vendors: they are slowly turning into a classical DW approach. Surprisingly, nowadays the best practice is to read the table using a catalog, as there is no way to have proper row-level security or column masking just by reading files. (It is a bit ironic, it is like 2010 again.)

Another interesting twist: cloud DW vendors like Snowflake are adding support for open storage formats as native tables, see this excellent presentation; Trino and Dremio already have solid read and write support for Iceberg.

Microsoft’s and Google’s approach is to use Spark for writing the table format and their DW offering for reading. I find this approach rather lazy; maybe it is just a workaround until they have native support.

I suspect we will see a commoditization of open table formats in the coming year(s), and users will just go back to caring about what really matters:

Cost, Performance and Concurrency.


Querying Azure storage using DuckDB

DuckDB just added support for fsspec, which makes querying object stores like Google Cloud Storage and Azure Storage possible; note that the AWS S3 API was already natively supported.

Previously, to query Azure Storage you had to use a pyarrow dataset as a workaround; with the recent update, that is no longer needed.

Here is a simple example querying a folder of Parquet files partitioned using Hive style; notice DuckDB is smart enough to recognize Date as a partition field.

import os
import duckdb
import adlfs
from dotenv import load_dotenv

# Load the storage account credentials from the .env file.
load_dotenv()
AZURE_STORAGE_ACCOUNT_NAME = os.getenv('AZURE_STORAGE_ACCOUNT_NAME')
AZURE_STORAGE_ACCOUNT_KEY = os.getenv('AZURE_STORAGE_ACCOUNT_KEY')
table_path = os.getenv('table_path')

# fsspec-compatible filesystem for Azure Blob Storage.
fs = adlfs.AzureBlobFileSystem(account_name=AZURE_STORAGE_ACCOUNT_NAME, account_key=AZURE_STORAGE_ACCOUNT_KEY)

con = duckdb.connect()
con.register_filesystem(fs)  # DuckDB can now read abfs paths through fsspec

df = con.execute(f'''
    select *
    from read_parquet('{table_path}/scada/data/*/*.parquet', hive_partitioning=true)
    limit 10
    '''
).df()

con.unregister_filesystem('abfs')
df

and here is the result

Make sure to have the “.env” file when running the notebook from your computer; here is an example of how it looks.
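Something along these lines (the values are placeholders; table_path is whatever path your Parquet files live under, which depends on your container layout):

AZURE_STORAGE_ACCOUNT_NAME=yourstorageaccount
AZURE_STORAGE_ACCOUNT_KEY=your-account-key
table_path=abfs://yourcontainer/yourfolder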

As a Power BI user, I see potential for a lightweight ETL process using just Python that does complex transformations and outputs the results as Parquet files which Power BI can consume.
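Something along these lines, as a rough sketch (the file names, columns and query are hypothetical):

import duckdb

con = duckdb.connect()
# Hypothetical transformation: aggregate a folder of raw Parquet files and write
# the result as a single Parquet file that Power BI can then import.
con.execute("""
    copy (
        select Date, sum(SCADAVALUE) as total
        from read_parquet('raw/*.parquet')
        group by 1
    ) to 'daily_totals.parquet' (format parquet)
""")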

As I deal with small data (less than 30 GB), Apache Spark does not make much sense to me. Hopefully Synapse will provide us with a cheap single-node notebook experience; I suspect it would be useful for a lot of customers.

Using Apache Arrow Dataset to compact old partitions

It is a trick I learned today and thought it may be useful to share. I have a folder of Parquet files, partitioned by day using Hive style; the data is ingested every 5 minutes, which ends up generating 288 small Parquet files per day. That is rather nice for a write scenario, but reading that data will be slow, as opening individual files generates a big overhead. It is a well-documented problem, and more sophisticated table formats like Delta Table and Iceberg fix it by using compaction, but that does not work with Python. (Edit: by Python, I mean engines like DataFusion, DuckDB and Pandas, not Spark, which does not make sense for a small dataset.)

In my example I use only Python and pyarrow dataset, which does not support compaction, but maybe there is a solution.

Just for illustration, here is a view of my bucket in Cloudflare R2 (pyarrow supports S3, GCS and Azure).

Warning: the code will delete existing files, use at your own risk.

  • Read the existing partitions except today’s data, as you may otherwise end up with a concurrent write, which would corrupt your table.
  • Filter only the partitions that contain more than 1 file, something like this using DuckDB:
create view base as select * from parquet_scan('s3://delta/aemo/scada/data/*/*.parquet', HIVE_PARTITIONING = 1, filename = 1) where Date < '{cut_off}';
create view filter as select Date, count(distinct filename) as cnt from base group by 1 having cnt > 1;
  • Read the data using the previous filter; again, we are not touching today’s partition to avoid any conflicts (the setup these snippets assume is sketched after this list):
# tb holds the rows of every partition that needs compacting; write them back
# partitioned by Date, replacing the matching partition directories.
tb = con.execute('''select SETTLEMENTDATE, DUID, SCADAVALUE, file, cast(base.Date as date) as Date
                    from base inner join filter on base.Date = filter.Date''').arrow()
ds.write_dataset(tb, "delta/aemo/scada/data/", filesystem=s3, format="parquet", partitioning=['Date'],
     partitioning_flavor="hive", min_rows_per_group=120000, existing_data_behavior="delete_matching")

Again, there is no support for transactions; if your code for whatever reason does not complete, you will end up with an inconsistent table.

  • And here are the results, all old partitions have only 1 file.
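For reference, the glue the snippets above rely on looks roughly like this; the credentials, endpoint and cut-off logic are placeholders, the full code is linked below.

# Rough sketch of the assumed setup (placeholder credentials and endpoint).
import datetime
import duckdb
import s3fs
import pyarrow.dataset as ds

# fsspec filesystem pointing at the Cloudflare R2 bucket.
s3 = s3fs.S3FileSystem(
    key="xxx", secret="xxx",
    client_kwargs={"endpoint_url": "https://<account_id>.r2.cloudflarestorage.com"})

con = duckdb.connect()
con.register_filesystem(s3)  # lets parquet_scan read the s3:// paths above

# Never touch today's partition, so concurrent writes cannot corrupt the table.
cut_off = datetime.date.today().isoformat()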

You need to run the job only once a day. Hopefully sometime next year either Apache Iceberg or Delta Table will provide compaction for the Python client; in the meantime maybe this approach is good enough :), you can see the full code here.

Another approach is copy on write: basically every time you ingest new data, you copy the existing data, append it to the new data and overwrite the existing files, but that may be an expensive operation, especially if your job runs more frequently.
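A rough, non-transactional sketch of that idea (the path is a placeholder and new_batch stands for the freshly ingested Arrow table with the same schema, including the Date column):

import pyarrow as pa
import pyarrow.dataset as ds

# Copy on write: materialize the existing data, append the new batch, rewrite the partitions.
existing = ds.dataset("aemo/scada/data", format="parquet", partitioning="hive").to_table()
combined = pa.concat_tables([existing, new_batch])
ds.write_dataset(combined, "aemo/scada/data", format="parquet",
                 partitioning=["Date"], partitioning_flavor="hive",
                 existing_data_behavior="delete_matching")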

Running a Serverless DuckDB on Google Cloud

TL;DR: it is easy to set up and works relatively well, but there is a catch: watch out for Cloud Storage throughput. I shared a notebook here.

In a previous blog, I showed a POC of how to run queries from a Colab notebook against a Delta Lake table, but what if you want to run the same query from other tools, or run the query in a different region and avoid egress fees? It turns out it is extremely easy to set up.

And here is an overall view of the architecture. The most important decision is to make sure Cloud Storage and the Cloud Function are in the same region, in my case “us-central1”; you can call the function from anywhere.

Google Cloud Functions

As I said before, the code is very simple; I spent some time googling how to convert the results from bytes to JSON to a dataframe. I think that, besides BigQuery, Google Cloud Functions is the easiest service to set up: just write your code and Google Cloud handles the rest. Just for fun, I used a machine with 8 CPUs and 32 GB of RAM.

import pyarrow.dataset as ds
import duckdb
import json

# The dataset and connection are created once per instance; DuckDB can query
# the lineitem variable directly thanks to its replacement scans.
lineitem = ds.dataset("gs://xxxxx/lineitem", format="parquet", partitioning="hive")
con = duckdb.connect()

def Query(request):
    # The SQL text is passed in the JSON body under the "name" key.
    SQL = request.get_json().get('name')
    df = con.execute(SQL).df()
    return json.dumps(df.to_json(orient="records")), 200, {'Content-Type': 'application/json'}

How to call the web API

A lot of tools can send a web API call as long as they have the correct authentication. I started with a Python script for a simple reason: it was the easiest code to find on the internet.

Here is the interesting part: you write any arbitrary SQL query, then you send an API call and in return you get a JSON with the results. A user doesn’t need to know anything about the Cloud Function; all they need is the web address and a correct SQL query.
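A minimal sketch of such a call (the URL and token are placeholders; the body uses the “name” key the function above reads, and the token step assumes the function requires authenticated invocations):

import io
import requests
import pandas as pd

URL = "https://us-central1-<project>.cloudfunctions.net/Query"  # placeholder
TOKEN = "<identity-token>"  # e.g. from `gcloud auth print-identity-token`

resp = requests.post(
    URL,
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"name": "select count(*) as total_rows from lineitem"},
)

# The function returns the dataframe as a JSON string, so parse it back into pandas.
df = pd.read_json(io.StringIO(resp.json()), orient="records")
print(df)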

Performance considerations

This is where it gets very interesting. I have no clue where the bottleneck is, but we can ask some questions.

Cold Start

  • The first example was 334 ms, which is impressive, but I was cheating and showing the best-case scenario: Google Cloud Functions, or more precisely Cloud Run, was already running so there was no cold start, and DuckDB was running a local query which did not require a call to Cloud Storage.
  • Currently the cold start for Cloud Run gen2 is around 10 seconds; notice it is still in preview.

Transfer from Google Cloud Storage

Let’s try this simple query; we get the result in 29 seconds.

The same query using my laptop takes 600 ms. By the way, the lineitem table contains 60 million rows!!!

I don’t know why the massive difference. I presume network speed is the limit, but when I look at the bucket stats, they are actually very good, nearly 500 MB/s.

I am no expert in networking, but that number doesn’t seem right!! When I check how much data the Cloud Function is actually receiving, the discrepancy starts making more sense: on average I am getting around 30 MB/s, with a maximum of 50 MB/s. I have to say this is really slow!!!

File Pruning

The Arrow dataset and DuckDB are smart enough to prune the data that is not needed: in this query I made a filter on L_shipdate, and notice the Parquet files are sorted on that field. As expected the performance is very good, 1.7 seconds, since DuckDB scans only the row groups that contain the date ‘1998-09-02’.
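The query was along these lines (an illustrative sketch reusing the con and lineitem objects defined in the Cloud Function above; the exact query and aggregation of the original run are not shown):

# Filtered query that benefits from row-group pruning on the sorted L_shipdate column.
sql = """
    select count(*) as rows_shipped
    from lineitem
    where L_shipdate = '1998-09-02'
"""
df = con.execute(sql).df()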

Dataset Catalog

I am defining a very rudimentary catalog; the user can just call “show tables”.

You can even check the table schema
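One way to get this behaviour (a sketch under my own assumptions, not necessarily how the notebook wires it up) is to register each dataset as a DuckDB view, so that “show tables” and “describe” work out of the box:

import pyarrow.dataset as ds
import duckdb

con = duckdb.connect()

# Register every dataset under a friendly name; each one shows up as a view.
tables = {"lineitem": "gs://xxxxx/lineitem"}  # placeholder table list
for name, path in tables.items():
    con.register(name, ds.dataset(path, format="parquet", partitioning="hive"))

print(con.execute("show tables").df())        # lists the registered names
print(con.execute("describe lineitem").df())  # shows the table schema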

Cloud Storage throughput is the bottleneck.

Having a speed of 30 MB/s makes the whole setup good only for a POC or for querying a small dataset. I don’t know the reason for such poor performance from Cloud Run; I suspect the Apache Arrow implementation is not optimized for GCS, although it works very well on a local file system.

Another missing piece is the lack of a cache. It would have been good if DuckDB somehow cached the data already queried, but caching is very hard to implement, especially if you want cache invalidation, and you risk reinventing a full data warehouse. I genuinely hope it is a bug and Cloud Run can provide better network performance.
