Apache Spark is a system designed for Big Data workloads using distributed computing; it scales out very well just by adding more compute nodes. In real life, though, it is often used with much smaller data, because people use whatever they have access to.
I don’t have a lot of experience with PySpark, but I know enough to run some SQL queries, and what better way to test that than the TPC-H benchmark. In this case, I used a scale factor of 10, where the main table, lineitem, has 60 million records.
The test is run in Google Colab using the free compute (2 vCPUs). I did try it with a paid VM (8 CPUs) but, strangely, PySpark hit a memory error (although it worked fine with less memory, go figure).
I also tried to install it on my laptop, but it complained about some Java compatibility issue. I have no time for this kind of shenanigans; I used to love it when I was much younger, but now I just want stuff to work.
The data is first downloaded to the local disk, so all queries are pretty much CPU bound. I used DuckDB as a reference, as it has excellent single-node performance.
The Beauty of SQL
PySpark has excellent support for SQL; in fact, the exact same SQL is run on both Spark and DuckDB. We are talking 22 queries with nearly 900 lines of code, and it is impressive that it just works without modification on both engines.
The Results
I used a Python function to run every query and show its duration and results.
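Something along these lines (a minimal sketch; the function name and the pandas conversion are my own assumptions, not the original helper):

import time

def run_query(engine, label, sql):
    # engine is either a SparkSession or a DuckDB connection;
    # both expose a .sql() method that accepts the same query text
    start = time.time()
    result = engine.sql(sql)
    # force execution: Spark is lazy, DuckDB returns a relation
    rows = result.toPandas() if hasattr(result, "toPandas") else result.df()
    print(f"{label}: {time.time() - start:.1f}s, {len(rows)} rows")
    return rows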
And here are the results, using PySpark, DuckDB running queries directly from Parquet and, just for reference, DuckDB running from a DuckDB file:
DuckDB (reading Parquet): around 5 minutes
Spark: around 21 minutes
DuckDB (native file format): 2 minutes (plus 3 minutes to import the data)
Takeaway
For smaller datasets that don’t fit in memory, which is a problem for Pandas, it is worth looking around for alternatives before jumping to a distributed system; you don’t want to be paying for expensive compute overhead. Tools like DuckDB and Polars can do wonders these days. There is a lot to like about Spark, but single-node performance is not one of them.
Having said that, those new engines still have a lot of work to do to reach feature parity with Spark for data engineering jobs; for example, support for table formats like Iceberg and Delta is still experimental.
When you write a record in a database like SQL Server or SQLite, it needs to store it somewhere, and usually that location is your disk. Your local storage system is very powerful: you can write, read and modify existing files, and it is very fast, so this problem was solved 50 years ago. So why should we care about data lake table formats (Delta, Iceberg, etc.)?
The Internet happened
Internet tech companies started getting massive amounts of data generated by the exponential growth of the internet. Saving that data on the local disks of big database servers was just too expensive, so a different approach was needed.
Separation of Storage and Compute
The solution was very simple: separate the computers that run queries from the system that stores the data, so both can be scaled independently. If you don’t need to run analytical queries, fine, just turn the compute off and keep adding data to your storage system. That was a genius idea! (I think Google MapReduce came first, then Yahoo basically copied the idea by creating the open source Hadoop, but it was slow, and then Spark took over.)
AWS S3
Another important event: in 2006, AWS released S3, a massive object store. Basically, you can save your files for a very low cost, but there is a catch: the files are immutable, so you can’t simply open a file, make a modification and save it.
Basically, you can only add and delete files.
Table formats work around this limitation with some tricks. For example, if you want to delete some records, the engine just adds a new file marking those records as deleted, and when you read the table later, they are simply ignored (this is called merge-on-read).
Another approach (copy-on-write) is for the engine to simply write a new file that doesn’t contain the deleted records and tag the existing file as removed, so when you read the table later, you read the new file instead. As an example, if you have a table with 1 million records and want to delete 1 record, the engine writes a new file with 999,999 records, which doesn’t seem very efficient to me!
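A copy-on-write delete in miniature, using DuckDB (the file names and the filter are made up for illustration; a real table format would also record the file swap in its metadata):

import duckdb

con = duckdb.connect()
# rewrite the whole file without the deleted row
con.execute("""
    COPY (SELECT * FROM 'orders.parquet' WHERE order_id <> 42)
    TO 'orders_v2.parquet' (FORMAT PARQUET)
""")
# the table metadata would then point readers at orders_v2.parquet
# and mark orders.parquet as removed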
Proprietary Table Format
Commercial DB vendors like BigQuery (Capacitor) and Snowflake (PAX) have used table formats internally since day one, but they are proprietary: you can’t read them with your own query engine, although BigQuery provides an API that talks directly to the storage.
My understanding is that, at the time, open source systems (Hadoop, Spark) used a very simple table format called Hive, something that looks like this:
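(An illustrative layout; the table and partition names are made up. The "format" is really just a folder-per-partition-value convention.)

sales/
  date=2023-01-01/
    file_001.parquet
    file_002.parquet
  date=2023-01-02/
    file_003.parquet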
But it had a lot of problems, basically no ACID support. As an example, if someone is adding files while you are reading the table, you will get wrong results, and if you overwrite a table and your script times out for any reason, your table will be corrupted (I learned that the hard way).
Open Source Table Format
Open source developers understood that they needed to do something about it, and they knew commercial DB vendors had already solved this problem; the result was the three major open formats (Delta Lake, Apache Iceberg and Apache Hudi).
But at the very basic level, a table format is just a folder that contains:
Data, usually in Parquet format
Metadata that records the statistics and location of every file
Something like this; obviously the exact implementation details differ for every format:
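(Illustrative only; each format uses its own layout and file names, e.g. Delta keeps its log under _delta_log.)

my_table/
  data/
    part-0001.parquet
    part-0002.parquet
  metadata/
    snapshot-001.json   <- list of data files, plus per-file statistics (row count, min/max per column)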
Because it is open source, you will always find developers who build something slightly different, see Databend as an example, and even new proprietary vendors, see Firebolt F3.
I don’t delete, why should I care?
That’s a reasonable position. Even if all you do is read, and you never delete, one may argue that a folder of Parquet files with decent Hive-style partitioning is all you need. I still think that even in that use case a table format may be worth it: listing a directory is an extremely slow operation and costs money, whereas you can just read a metadata file and know exactly where every file is. Not only that, some query engines are smart enough to read the statistics from the metadata file and skip files that don’t contain the data needed for the query.
Another example: if you want the total record count of a table, or a column’s min/max, you can potentially read it straight from the metadata, and it will take milliseconds.
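Plain Parquet already gives a taste of this through the file footer; a table format keeps the same statistics for all files in a single metadata file, so the engine doesn’t even have to open each footer. A small pyarrow sketch (the file path is made up):

import pyarrow.parquet as pq

# row counts and per-column min/max live in the Parquet footer,
# so no data pages need to be read at all
meta = pq.read_metadata("lineitem/part-0001.parquet")
print(meta.num_rows)
stats = meta.row_group(0).column(0).statistics
print(stats.min, stats.max)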
Which one is better?
If you google it, you will find all kinds of articles on why one table format is better than the others. It is a very technical subject and I don’t know enough to comment on it, but my impression is that Delta is largely controlled by Databricks (and some features are still proprietary), which makes the other vendors a bit suspicious, and hence they lean more towards the Iceberg camp (Snowflake, Trino, Dremio and, to a lesser extent, BigQuery).
Should you care about Table Format ?
Actually, we should not. I never cared where SQL Server saves its data, it is just there, and this is exactly what is happening with data lake vendors: they are slowly turning into a classical DW approach. Surprisingly, nowadays the best practice is to read the table through a catalog, as there is no way to get proper row-level security or column masking just by reading files. (It is a bit ironic, it is like 2010 again.)
Another interesting twist: cloud DW vendors like Snowflake are adding support for open storage formats as native tables (see this excellent presentation), and Trino and Dremio already have solid read and write support for Iceberg.
Microsoft’s and Google’s approach is to use Spark for writing the table format and their DW offering for reading; I find this approach rather lazy, but maybe it is just a workaround until they have native support.
I suspect we will see a commoditization of open table formats in the coming year(s), and users will go back to caring about what really matters.
DuckDB just added support for fsspec, which makes querying object stores like GCP and Azure Storage possible; note that the AWS S3 API was already supported natively.
Previously, to query Azure Storage you had to use a pyarrow dataset as a workaround; with the recent update, that is no longer needed.
Here is a simple example, querying a folder of Parquet files partitioned Hive-style; notice that DuckDB is smart enough to recognize Date as a partition field.
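A sketch of what that looks like (the storage account, container and column names are placeholders; adlfs is the fsspec implementation for Azure):

import duckdb
from adlfs import AzureBlobFileSystem

# credentials below are placeholders
fs = AzureBlobFileSystem(account_name="myaccount", account_key="***")
con = duckdb.connect()
con.register_filesystem(fs)  # DuckDB can now resolve abfs:// paths through fsspec

df = con.execute("""
    select Date, count(*) as cnt
    from parquet_scan('abfs://mycontainer/scada/data/*/*.parquet', HIVE_PARTITIONING=1)
    group by 1
""").df()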
Make sure to have the ".env" file when running the notebook from your own computer; here is an example of how it looks:
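Something along these lines, assuming the notebook loads Azure credentials from environment variables (the variable names below are placeholders, not the original file):

AZURE_STORAGE_ACCOUNT_NAME=mystorageaccount
AZURE_STORAGE_ACCOUNT_KEY=xxxxxxxxxxxxxxxx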
As a Power BI user, I see potential for a lightweight ETL process using just Python that does complex transformations and outputs the results as Parquet files, which Power BI can then consume.
As I deal with small data (less than 30 GB), Apache Spark does not make much sense to me. Hopefully Synapse will provide us with a cheap single-node notebook experience; I suspect it would be useful for a lot of customers.
It is a trick I learned today and thought might be useful to share. I have a folder of Parquet files, partitioned by day using Hive style; the data is ingested every 5 minutes, which ends up generating 288 small Parquet files per day. That is rather nice for writing, but reading that data is slow, as opening individual files creates a big overhead. It is a well-documented problem, and more sophisticated table formats like Delta Lake and Iceberg fix it by using compaction, but that does not work with Python yet. (By Python, I mean engines like DataFusion, DuckDB and Pandas, not Spark, which does not make sense for a small dataset.)
In my example I use only Python and pyarrow datasets, which do not support compaction out of the box, but maybe there is a solution.
Just for illustration, here is a view of my bucket in Cloudflare R2 (pyarrow supports S3, GCP and Azure).
Warning: the code will delete existing files; use at your own risk.
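For context, a minimal setup sketch; the R2 endpoint, the credentials and the cut_off value are my own assumptions, the original notebook defines its own:

import duckdb
from datetime import date

con = duckdb.connect()
con.execute("install httpfs")
con.execute("load httpfs")
# Cloudflare R2 speaks the S3 API; the endpoint and keys below are placeholders
con.execute("set s3_endpoint='<account_id>.r2.cloudflarestorage.com'")
con.execute("set s3_access_key_id='xxx'")
con.execute("set s3_secret_access_key='xxx'")
cut_off = date.today().isoformat()  # today's partition is left untouched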
Read the existing partitions except today’s data, as you may otherwise end up with concurrent writes, which will corrupt your table.
Filter only the partitions that contain more than 1 file, something like this using DuckDB:
con.execute(f"""create view base as
select * from parquet_scan('s3://delta/aemo/scada/data/*/*.parquet', HIVE_PARTITIONING=1, filename=1)
where Date < '{cut_off}'""")  # skip today's partition
con.execute("""create view filter as
select Date, count(distinct filename) as cnt from base group by 1 having cnt > 1""")  # partitions with more than one file
Read the data using the previous filter; again, we are not touching today’s partition, to avoid any conflicts.
tb=con.execute('''select SETTLEMENTDATE,DUID,SCADAVALUE,file,cast(base.Date as date) as Date from base inner join filter on base.Date= filter.Date''').arrow()
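The compaction write itself is not shown in the snippet above; a sketch of how it could look with pyarrow (the R2 endpoint, the credentials and the one-file-per-partition behaviour are my own assumptions, not the original code):

import pyarrow.dataset as ds
import pyarrow.fs as pafs

# Cloudflare R2 is reached through the S3 API; endpoint and keys are placeholders
r2 = pafs.S3FileSystem(
    endpoint_override="<account_id>.r2.cloudflarestorage.com",
    access_key="xxx",
    secret_key="xxx",
)

# rewrite the selected partitions, deleting the old small files in each
# partition that gets rewritten (this is the actual compaction step)
ds.write_dataset(
    tb,
    "delta/aemo/scada/data",
    filesystem=r2,
    format="parquet",
    partitioning=["Date"],
    partitioning_flavor="hive",
    existing_data_behavior="delete_matching",
)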
Again, there is no support for transactions: if your code does not complete for whatever reason, you will end up with an inconsistent table.
And here are the results: all old partitions have only 1 file.
You only need to run the job once a day. Hopefully sometime next year either Apache Iceberg or Delta Lake will provide compaction for the Python client; in the meantime, maybe this approach is good enough :) You can see the full code here.
Another approach is copy-on-write: basically, every time you ingest new data, you copy the existing data, append it to the new data and overwrite the existing files. But that may be an expensive operation, especially if your job runs more frequently.