Process 1 Billion rows of raw csv in Fabric Notebook for less than 20 Cents 

The Use case

The data source is around 2,200 weird CSV files with a total of 897 million rows (the files have more columns than the header declares). This is real-world data, not some synthetic dataset, and it is relatively small: around 100 GB uncompressed.

The pipeline reads those files, extracts clean data from them using non-trivial transformations, and saves the result as a Delta Table.

We used the smallest compute available in a Fabric Notebook, which is 4 cores with 32 GB of RAM. To be clear, this is a real single node (not 1 driver and 1 executor). Although the runtime is using Spark, all the engines interact directly with the operating system; as far as I can tell, Spark adds very little overhead when it is not used directly by the Python code.

You need to pick the Engine

Nowadays we have plenty of high-quality calculation engines, but two seem to be gaining traction (Polars and DuckDB), at least judging by the number of package downloads and the religious wars that erupt occasionally on Twitter 🙂

For a change I tried to use Polars, as I have been accused of having a bias toward DuckDB. Long story short, I hit a bug with Polars. I tried DataFusion too but did not manage to get working code, as there is not enough documentation on the web. After that I tested ClickHouse's chDB, but found a bug there as well. Anyway, the code is public, so feel free to test your own engine.

So I ended up using DuckDB. The code is published here; it uses only 60 files, as that subset is available publicly, while the whole archive is saved in my tenant (happy to share it if you are interested).

The result is rather surprising (God bless OneLake throughput). I am using the excellent deltalake Python package to write to OneLake.

26 minutes. That's freaking fast. Using Fabric F2, the total cost comes to:

$0.36/hour × (26/60 hours) ≈ $0.156, roughly 16 cents

You need to add a couple of cents for OneLake storage transactions.

As far as I can tell, this is maybe one of the cheapest options on the market.

$0.36/hour is the pay-as-you-go rate; if you have a reservation, it is substantially cheaper.

And because it is a Delta Table, any Fabric engine (SQL, Power BI, Spark) can read it.

What’s the catch ?

Today DuckDB cannot write directly to a Delta Table (it is coming eventually, though). Instead, it hands the data to the Delta Lake writer as an Arrow Table. That hand-off is supposed to be zero-copy, but as far as I can tell it is the biggest bottleneck and will generate out-of-memory errors. The solution is easy: process the files in chunks, not all at once.

#############################################
import os
import glob
from deltalake import write_deltalake

# Source, chunk_len and get_scada() (the DuckDB transformation) are defined earlier in the notebook
list_files = [os.path.basename(x) for x in glob.glob(Source + '*.CSV')]
files_to_upload_full_Path = [Source + i for i in list_files]

if len(files_to_upload_full_Path) > 0:
    # write in chunks of chunk_len files so the Arrow hand-off stays within memory
    for i in range(0, len(files_to_upload_full_Path), chunk_len):
        chunk = files_to_upload_full_Path[i:i + chunk_len]
        df = get_scada(chunk)
        write_deltalake("/lakehouse/default/Tables/scada_duckdb", df, mode="append",
                        engine='rust', partition_by=['YEAR'],
                        storage_options={"allow_unsafe_rename": "true"})
        del df

Through experimentation, I noticed that 100 files work fine with 16 GB of RAM, 200 files with 32 GB, and so on.
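Based on that observation, a purely hypothetical heuristic (not part of the published notebook) would be to derive chunk_len from the machine's RAM, roughly 6 files per GB:

import psutil

# Hypothetical heuristic from the numbers above:
# ~100 files per 16 GB of RAM, i.e. roughly 6 files per GB.
FILES_PER_GB = 6
available_gb = psutil.virtual_memory().total / 1024**3
chunk_len = max(1, int(available_gb * FILES_PER_GB))
print(f"~{available_gb:.0f} GB of RAM, processing {chunk_len} files per chunk")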

When exporting to Parquet, DuckDB manages the memory natively, and it is faster too.
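For comparison, a minimal sketch of letting DuckDB write Parquet directly with COPY; the paths are illustrative and not taken from the published notebook:

import duckdb

con = duckdb.connect()
# Illustrative paths: DuckDB streams the query result straight to Parquet
# and manages its own memory, instead of materialising an Arrow table first.
con.sql("""
    COPY (SELECT * FROM read_csv_auto('/lakehouse/default/Files/scada/*.CSV'))
    TO '/lakehouse/default/Files/scada.parquet' (FORMAT PARQUET)
""")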

Native Lakehouse Is the future of Data Engineering

The combination of open table formats like Delta and Iceberg with ultra-efficient open-source engines like DuckDB, Polars, Velox and DataFusion, all written in C++/Rust, gives data engineers extremely powerful tools to build more flexible and much cheaper data solutions.

If I had to give one piece of advice to young data engineers/analysts: learn Python and SQL.

I would like to thank Pedro Holanda for fixing some very hard-to-reproduce bugs in the DuckDB CSV reader.

And Ion Koutsouris for answering my silly questions about the Delta Lake writer.

Sharing Public Data using Onelake, Delta and Cloudflare R2 

The use case is very simple: sharing public data continuously and as cheaply as possible, especially when the consumers are in a different geographic region.

Note: this is not an officially supported solution, and the data can be inconsistent while it is being copied to R2, but it is good enough for public data.

How to 

1- The Data is prepared and cleaned using Fabric and saved in Onelake 

2- Copy the data to Cloudflare R2 using code (a minimal sketch is shown after this list). As of today, shortcuts to S3 do not support write operations. Although I did not test it, Dataflow Gen2 (data pipelines) supports S3 as a destination; I used code simply because I already had it from a previous project. You pay egress fees for this operation, plus R2 storage and transaction costs.

3- Provide an access token to users or make the bucket public. You don't pay egress fees from Cloudflare to end users, but the throughput is not guaranteed.
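For step 2, a rough sketch of what the copy code could look like using boto3 against R2's S3-compatible API; the bucket name, local path and environment variables are hypothetical, and the actual project code may differ:

import os
import glob
import boto3

# Cloudflare R2 is S3-compatible; boto3 only needs the account-specific endpoint.
# Bucket name, local path and environment variables below are placeholders.
s3 = boto3.client(
    "s3",
    endpoint_url=os.environ["R2_ENDPOINT"],
    aws_access_key_id=os.environ["R2_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["R2_SECRET_ACCESS_KEY"],
)

local_root = "/lakehouse/default/Tables/scada_duckdb"   # Delta table folder in OneLake
for path in glob.glob(local_root + "/**/*", recursive=True):
    if os.path.isfile(path):
        key = "scada_duckdb/" + os.path.relpath(path, local_root)
        s3.upload_file(path, "my-public-bucket", key)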

Today, Fabric shortcuts require the list-buckets permission; please vote for this idea to remove that requirement.

For example, I am writing public data in Fabric US-West and consuming it in Fabric Melbourne  

Make sure you turn on the cache for OneLake; it helps performance a lot.

You can try it  

You can try it yourself using these credentials; they are temporary, and I may delete them at any time.

Access Key ID 

3a3d5b5ce8c296e41a6de910d30e7fb6 

Secret 

9a080220941f3ff0f22ac93c7d2f5ec1d73a77cd3a141416b30c1239efc50777 

Endpoint 

https://c261c23c6a526f1de4652183768d7019.r2.cloudflarestorage.com
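As an illustration, here is a sketch of reading a shared Delta table from that endpoint with the deltalake package; the bucket and table names are placeholders, since only the endpoint and keys are listed above:

from deltalake import DeltaTable

# Only the keys and endpoint come from this post; the bucket/table path is hypothetical.
storage_options = {
    "AWS_ACCESS_KEY_ID": "3a3d5b5ce8c296e41a6de910d30e7fb6",
    "AWS_SECRET_ACCESS_KEY": "9a080220941f3ff0f22ac93c7d2f5ec1d73a77cd3a141416b30c1239efc50777",
    "AWS_ENDPOINT_URL": "https://c261c23c6a526f1de4652183768d7019.r2.cloudflarestorage.com",
    "AWS_REGION": "auto",
}

dt = DeltaTable("s3://some-public-bucket/some_table", storage_options=storage_options)
df = dt.to_pandas()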

Building a cost effective solution using Fabric

I have been playing with electricity market data recently, and now I have a #Fabric solution that I think is easy enough to use but, more importantly, cost effective: it works just fine using an F2 SKU, which costs $156/month.

https://github.com/djouallah/aemo_fabric

a Quick Video

How to

0- Create a Fabric Workspace

1- Create a lakehouse

2- Download the notebooks from GitHub and import them into the Fabric workspace

3- Open a notebook and attach it to the Lakehouse

4- Run the notebooks in sequence to do the initial load

5- Build your semantic model, either using Direct Lake for better "latency", or use the attached template for import mode if you are happy with 8 refreshes per day (for people with a Pro license); all you need is to input the Lakehouse SQL endpoint. My initial plan was to read the Delta Tables directly from OneLake, but currently filter pushdown works only at the partition level (apparently further improvements are coming)

6- Use a scheduler to run the jobs, every 5 minutes and every 24 hours

Show me the Money

For development use the Starter Pool, but for scheduling use a small single node; it is good enough. Medium is faster but will consume more capacity units, though still within F2 limits.

This is the actual capacity unit usage for this scenario; yesterday I was messing around trying to optimize further, which ended up consuming more 🙂

Spark Dataframe API is amazing

When reading the CSV files, I need to convert a lot of columns to double. Using SQL you have to type all the field names manually; in Spark you can just do it in a loop. That was the exact moment I *understood* why people like the Spark API!!!

from pyspark.sql import functions as f

# cast every column to double except the known non-numeric ones
df_cols = list(set(df.columns) - {'SETTLEMENTDATE', 'DUID', 'file', 'UNIT', 'transactionId', 'PRIORITY'})
for col_name in df_cols:
    df = df.withColumn(col_name, f.col(col_name).cast('double'))

The source data is around 1 billion rows of dirty CSV; data is added every 5 minutes and backfilled every 24 hours, and everything is saved in OneLake as Spark Delta Tables.

Not everything is CSV though; there is some data in JSON and, of course, Excel 🙂

Although the main code uses PySpark, I used Pandas and DuckDB too. No philosophical reasons: basically I use whatever Stack Overflow gives me first 🙂, then I just copy the result to a Spark DataFrame and save it as a Delta Table, as you don't want to miss out on V-Order.

For example, this is the code to generate a calendar table using DuckDB SQL; the unnest syntax is very elegant!!!

import duckdb

df = duckdb.sql("""
    SELECT cast(unnest(generate_series(cast('2018-04-01' as date), cast('2024-12-31' as date), interval 1 day)) as date) as date,
           EXTRACT(year FROM date)  as year,
           EXTRACT(month FROM date) as month
""").df()

x = spark.createDataFrame(df)
x.write.mode("overwrite").format("delta").saveAsTable("Calendar")

The Semantic Model

The model is not too complex but not trivial: 2 fact tables, 4 dimensions, with the biggest table at 240 million rows.

Stuff that needs improvement

1- Have an option to download the semantic model from the service as a .pbit; it has to be one file for ease of distribution.

2- More options for the scheduler, something like: run this job every 5 minutes, but only between 8 AM and 6 PM.

Parting Thoughts

Notebook + OneLake + Direct Lake is emerging as a very powerful pattern, especially when the data is big and freshness is important. But somehow users need to learn how to write Python code, and I am afraid that's not obvious at all for the average data analyst. Maybe this AI thing can do something about it? That's the big question.

Does that mean notebooks are the best solution for all use cases? I don't know, and I don't think there is such a thing as a universal best practice in ETL. As far as I am concerned, there is only one hard rule:

Save raw data when the source system is volatile.

anything else is a matter of taste 🙂

Use the Fabric Notebook code-based orchestration tool to avoid concurrent write conflicts

I had a simple data ingestion use case: Notebook A inserts data into a Delta Table every 5 minutes, and Notebook B backfills the same table with new fields, but only at 4 AM.

Initially I just scheduled Notebook A to run every 5 minutes and Notebook B to run at 4 AM. That did not work, as I got a write conflict: Notebook B takes longer to process the data, and by the time it is ready to update the table it is a bit too late, since the table has already been modified by Notebook A, and you get a concurrent write conflict error.

Workarounds

Solution 1: Schedule Notebook A to run every 5 minutes except from 4 AM to 4:15 AM. Today this is not supported in the Fabric scheduler (although it works fine in Azure Data Factory).

Solution 2: Partition by date to avoid Spark writing to the same file at the same time (a sketch follows). This is fine for my big table, around 230 million rows spread over 6 years, where generating 2,000 files is not the end of the world, but the same approach does not work for another table that is substantially smaller, around 3 million rows.
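For illustration, a partitioned Delta write in Spark might look like the sketch below; the table and column usage is illustrative, not the project's actual code:

from pyspark.sql import functions as f

# df is the incoming 5-minute batch; partitioning by DATE means the frequent
# append and the 4 AM backfill mostly touch different files, which reduces
# (but does not eliminate) the chance of a conflict.
(
    df.withColumn("DATE", f.to_date("SETTLEMENTDATE"))
      .write.format("delta")
      .mode("append")
      .partitionBy("DATE")
      .saveAsTable("scada_partitioned")   # table name is illustrative
)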

Solution 3: It turns out there is a code-based orchestration tool in Fabric Notebooks.

I knew about MSSparkUtils, mainly because Sandeep Pawar can't stop talking about it 🙂, but I did not know that it does orchestration too. In my case the solution was trivial:

Add a check in Notebook A: if there is a new file to backfill, call Notebook B.

if len(files_to_upload_full_Path) > 0:
    mssparkutils.notebook.run("Transform_Backfill_Previous_Day")

And it did work beautifully (I know the feeling; it is easy when you know it).

Notice that the second Notebook runs using the same Runtime, so it is faster and maybe even cheaper.

Ok there is More 

Conditionally running a notebook when a new file arrives is a simple use case, but you can do more: for example, you can run multiple notebooks in parallel, or even define complex relationships between notebooks as a DAG, with just Python code!!!!
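As a rough sketch of what a DAG can look like with mssparkutils.notebook.runMultiple; the notebook names are made up, and the full schema (timeouts, retries, arguments, concurrency) is documented in the MSSparkUtils docs:

# Notebook names below are hypothetical; check the MSSparkUtils documentation
# for the complete DAG schema.
DAG = {
    "activities": [
        {"name": "Ingest_Scada",  "path": "Ingest_Scada"},
        {"name": "Ingest_Prices", "path": "Ingest_Prices"},
        {
            "name": "Build_Summary",
            "path": "Build_Summary",
            # runs only after both ingestion notebooks have finished
            "dependencies": ["Ingest_Scada", "Ingest_Prices"],
        },
    ]
}

mssparkutils.notebook.runMultiple(DAG)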

Take Away

This is just a personal observation: because Fabric was released with all the engines at the same time, a lot of very powerful features and patterns did not get a chance to be fully exposed and appreciated, and based on some anecdotal evidence on Twitter, it seems I am not the only one who had never heard of Fabric Notebook code orchestration.

For Power BI people starting with Fabric, Python is just too powerful. Yes, we did fine without it all these years, but if you have any complex data transformation scenarios, Python is too important to ignore.

Thanks Jene Zhang for answering my silly questions.