Apache Spark Benchmark for TPCH-SF10

TL;DR: a notebook to compare TPCH-SF10 results for Spark and DuckDB on a single node.

Apache Spark is a system designed for Big Data workloads using distributed computing. It scales out very well just by adding more compute nodes. In real life, though, it is used a lot with smaller data, because people use whatever they have access to.

I don’t have a lot of experience with PySpark, but I know enough to run some SQL queries, and what better way to test that than the TPCH benchmark? In this case, I used a scale factor of 10, where the main table, lineitem, has 60 million records.

The test is run in Google Colab using the free compute (2 vCPU). I did try it with a paid VM (8 CPU), but strangely PySpark hit a memory error there (although it worked fine with low memory, go figure).

I also tried to install it on my laptop, but it complained about some Java compatibility issue. I have no time for this kind of shenanigans; I used to love it when I was much younger, but now I just want stuff to work.

The data is first downloaded to the local disk, so all queries are pretty much CPU bound. I used DuckDB as a reference as it has excellent single-node performance.

The Beauty of SQL

PySpark has excellent support for SQL; in fact, the same SQL is run on both Spark and DuckDB. We are talking about 22 queries with nearly 900 lines of code, and it is impressive that it just works without modification on both engines.

The Results

I used a Python function to run every query and show the duration and results.
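A minimal sketch of what such a helper could look like is shown here. The names `run_query` and `run_benchmark`, and the idea of passing the engine in as an `execute` callable, are assumptions for illustration and not the notebook's actual code.

```python
import time
from typing import Any, Callable, Dict, List, Tuple


def run_query(execute: Callable[[str], Any], sql: str) -> Tuple[Any, float]:
    """Run one SQL statement through `execute` and return (result, seconds)."""
    start = time.perf_counter()
    result = execute(sql)
    duration = time.perf_counter() - start
    return result, duration


def run_benchmark(
    execute: Callable[[str], Any], queries: Dict[str, str]
) -> List[Tuple[str, float]]:
    """Run every named query in order, print its duration, and collect the timings."""
    timings = []
    for name, sql in queries.items():
        _, duration = run_query(execute, sql)
        print(f"{name}: {duration:.2f} s")
        timings.append((name, duration))
    return timings
```

Because the same SQL text is used for both engines, only the callable changes: for example `lambda q: spark.sql(q).collect()` for PySpark or `lambda q: con.execute(q).fetchall()` for DuckDB, where `spark` and `con` are an existing session and connection.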

And here are the results, using PySpark, DuckDB running queries directly from Parquet and, just for reference, DuckDB running from a DuckDB file:

DuckDB around 5 minutes

Spark around 21 minutes

DuckDB using the DuckDB file format: 2 minutes (it takes 3 minutes to import)

Take away

For smaller datasets that don’t fit in memory, which is a problem for Pandas, it is worth looking around for alternatives before jumping to a distributed system; you don’t want to be paying for expensive compute overhead. Tools like DuckDB and Polars can do wonders these days. There is a lot to like about Spark, but performance on a single node is not one of them.

Having said that, those new engines have a lot of work to do to reach feature parity with Spark for Data Engineering jobs; for example, support for table formats like Iceberg and Delta is still experimental.


Running a Serverless DuckDB on Google Cloud

TL;DR: it is easy to set up and works relatively well, but there is a catch: watch out for Cloud Storage throughput. I shared a notebook here

In a previous blog, I showed a POC of how to run queries from a Colab notebook against a Delta Lake table. But what if you want to run the same query from other tools, or run a query in a different region and avoid egress fees? It turns out it is extremely easy to set up.

And here is an overall view of the architecture. The most important decision is to make sure Cloud Storage and the Cloud Function are in the same region, in my case “us-central1”; you can call the function from anywhere.

Google Cloud Functions

As I said before, the code is very simple; I spent some time googling how to convert the results from bytes to JSON to a dataframe. I think that, besides BigQuery, Google Cloud Functions is the easiest service to set up: just write your code and Google Cloud handles the rest. Just for fun, I used a machine with 8 CPU and 32 GB of RAM.

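import json

import duckdb
import pyarrow.dataset as ds

# Hive-partitioned Parquet dataset on Cloud Storage; DuckDB resolves the
# Python variable name `lineitem` when it appears in a SQL query
lineitem = ds.dataset("gs://xxxxx/lineitem", format="parquet", partitioning="hive")
# Created once per instance, so warm invocations reuse the same connection
con = duckdb.connect()

def Query(request):
    # The SQL text is sent in the 'name' field of the JSON request body
    SQL = request.get_json().get('name')
    df = con.execute(SQL).df()
    # df.to_json already returns a JSON string and json.dumps encodes it again,
    # so the client has to decode the response body twice
    return json.dumps(df.to_json(orient="records")), 200, {'Content-Type': 'application/json'}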

How to Call the Web API

A lot of tools can send a web API call as long as they have the correct authentication. I started with a Python script for a simple reason: it was the easiest code to find on the internet.

Here is the interesting part: you write any arbitrary SQL code, then you send an API call, and in return you get a JSON with the results. A user doesn’t need to know anything about the cloud function; all they need is the web address and a correct SQL query.
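As a rough sketch of such a client, using only the Python standard library: the `name` field matches the cloud function code above, while the helper names, the URL and the use of a bearer identity token in the `Authorization` header are assumptions for illustration. The decoder accepts both a plain JSON array and a JSON array wrapped in a JSON string.

```python
import json
import urllib.request


def build_query_request(url: str, sql: str, token: str) -> urllib.request.Request:
    """Build a POST request carrying the SQL text in the 'name' field."""
    body = json.dumps({"name": sql}).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        # Assumption: the function expects an identity token as a bearer token
        "Authorization": f"Bearer {token}",
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def decode_rows(raw: bytes) -> list:
    """Decode the response body into a list of row dictionaries."""
    payload = json.loads(raw.decode("utf-8"))
    # If the body was JSON-encoded twice, the first decode yields a string
    if isinstance(payload, str):
        payload = json.loads(payload)
    return payload


def run_remote_query(url: str, sql: str, token: str) -> list:
    """Send the query to the function and return the decoded rows."""
    request = build_query_request(url, sql, token)
    with urllib.request.urlopen(request) as response:
        return decode_rows(response.read())
```

The returned list of dictionaries can then be handed to any dataframe library, for example `pandas.DataFrame(rows)`.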

Performance considerations

This is where it gets very interesting. I have no clue where the bottleneck is, but we can ask some questions.

Cold Start

  • The first example took 334 ms. That’s impressive, but I was cheating: I showed the best-case scenario. The Google Cloud Function, or more precisely Cloud Run, was already running, so there was no cold start, and DuckDB was running a local query which did not require a call to Cloud Storage.
  • Currently, cold start for Cloud Run Gen2 is around 10 seconds; notice it is still in preview.

Transfer from Google Cloud Storage

Let’s try this simple query; we get the result in 29 seconds.

The same query on my laptop takes 600 ms. By the way, the table lineitem contains 60 million rows!!!

I don’t know why there is such a massive difference. I presumed network speed was the limit, but when I look at the bucket stats, it is actually very good, nearly 500 MB/s.

I am no expert in networking, but that number doesn’t seem right!! When I check how much data the cloud function is receiving, the whole discrepancy starts making more sense: on average I am getting around 30 MB/s, and the maximum was 50 MB/s. I have to say this is really slow!!!
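A back-of-the-envelope check of those numbers, under the strong assumption that the whole 29 seconds was spent transferring data at the observed average rate (the function names are only illustrative):

```python
def transferred_mb(seconds: float, throughput_mb_s: float) -> float:
    """Volume moved in `seconds` at a constant throughput, in MB."""
    return seconds * throughput_mb_s


def transfer_seconds(size_mb: float, throughput_mb_s: float) -> float:
    """Time needed to move `size_mb` at a constant throughput, in seconds."""
    return size_mb / throughput_mb_s


# 29 seconds at the observed average of 30 MB/s
volume_mb = transferred_mb(29, 30)

# The same volume at the observed peak (50 MB/s) and at the bucket rate (500 MB/s)
at_peak_s = transfer_seconds(volume_mb, 50)
at_bucket_rate_s = transfer_seconds(volume_mb, 500)

print(f"volume: {volume_mb:.0f} MB")
print(f"at 50 MB/s: {at_peak_s:.1f} s, at 500 MB/s: {at_bucket_rate_s:.2f} s")
```

Under that assumption the query pulled roughly 870 MB, which would take under 2 seconds at the rate reported in the bucket stats, so the slow link into the function is enough to explain most of the 29 seconds.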

File Pruning

Arrow dataset is smart enough to prune on columns that are not used for partitioning. In this query, I filtered on L_shipdate (notice the Parquet file was sorted on that field), and as expected the performance is very good, 1.7 seconds: DuckDB scans only the row groups that contain the date ‘1998-09-02’.

Dataset Catalog

I am defining a very rudimentary catalog; the user can just call “Show Tables”.

You can even check the table schema

Cloud Storage throughput is the bottleneck.

A speed of 30 MB/s makes the whole setup good only for a POC or for querying a small dataset. I don’t know the reason for such poor performance from Cloud Run; I suspect the Apache Arrow implementation is not optimized for GCP, although it works very well on a local file system.

Another missing piece is a cache. It would have been good if DuckDB somehow cached the data already queried, but a cache is very hard to implement, especially if you want cache invalidation, and you risk reinventing a full data warehouse. I genuinely hope it is a bug and Cloud Run can provide better network performance.

Aggregate 100 GB using PowerQuery and DuckDB

It is maybe not well known, but you can run a Python script inside PowerQuery on the desktop, and deploy it to the PowerBI service using a personal gateway.

This is just a POC that showcases how much you can do using just a decent laptop and Python.

Let’s say you have a folder with 100 GB of Parquet files, and you want to aggregate some results for further analysis in PowerBI. Obviously, importing 100 GB may not be a great idea. In this scenario, from what I can see, people recommend tools like PySpark or Dask, which I guess are great options, but if you want a lightweight option with a simple pip install, then DuckDB is amazing!!

For testing, first write your SQL query using only 1 Parquet file; when you are happy with the results, change the path from “folder/x.parquet” to “folder/*.parquet”.

In this example I have a folder of 100 GB of Parquet files.

In PowerQuery you just need to insert a Python script. You don’t need to know anything about Python, as it is just a standard SQL query; PowerQuery is smart enough to know that df is a dataframe, and it will show the result in the next step.

import duckdb

con = duckdb.connect()
# The glob pattern makes DuckDB read every Parquet file in the folder
df = con.execute('''
    select L_RETURNFLAG, L_LINESTATUS, count(*)
    from 'C:/xxxxx/parquet/lineitem/*.parquet'
    group by 1, 2
''').df()

And here are the final results.

The duration will depend mainly on your CPU and SSD speed. DuckDB consumes little memory, in that example less than 2 GB; the secret sauce is the parallel scan of Parquet files.

Traditionally, Big Data meant data that doesn’t fit into RAM; nowadays, Big Data is how much data can fit on your SSD, and that’s a welcome change.

Advanced Geospatial Analysis Using a Location Parameter with Streamlit

This blog is a POC of something that I always wanted to have in a BI tool. I tried Tableau, PowerBI and Data Studio without success (I am not interested in adding an invisible grid as a hack). The idea is extremely simple yet very powerful: retrieve data when you click on a map. You may think it should be simple, but it seems BI tools are good at retrieving data based on a filter, while it is very hard to push a parameter from a map back to the source data.

Traditionally, if you want this kind of interactivity, you need to write code. To be honest, the idea of writing JavaScript and learning how to deploy a web server was not very interesting to me, but luckily we have a new option in Streamlit.

Streamlit is a code-first web app platform using only Python. Web pages are generated behind the scenes, and there are a lot of components where you need to write a minimum of code. Deployment is absolutely trivial using Streamlit Cloud, and because it is open source, you can deploy using alternative approaches like Cloud Run or Azure.

I came across the Streamlit-Folium component recently, and it is magnificent work: when you click on a map, it returns variables such as the last location clicked, zoom, bounds, etc., all for free, no code required!!!!

All I have done is copy the code from the source and build a SQL query that takes the last clicked point and filters all the “cafe” locations within a radius of 500 m; the SQL code is copied from this previous blog.

The source data is nearly half a million records; as you can imagine, plotting a massive dataset just to see a small portion is a waste of compute resources.

Here are the final results.

Here is an example of a generated SQL query.

State management

I added the code here. Again, it was very easy to write, as I copied nearly everything from the component sample code. The tricky part was how to update the value of a variable that was already declared; Streamlit has a brilliant and very simple solution using State Management.

Assign a default value when Streamlit runs for the first time

if 'key' not in st.session_state:
    st.session_state.key = '( 153.024198,-27.467276)'
    st.session_state.key1 = [-27.467276, 153.024198]
    st.session_state.key2 = 16
point_clicked = st.session_state.key
location_ini  = st.session_state.key1
zoom_Start    = st.session_state.key2 

Update the values when a user clicks on the map; the next run in the same session will use those new values

st.session_state.key = point_clicked
st.session_state.key1 = location_ini
st.session_state.key2 = map_data['zoom']

Currently I don’t know how to stop Streamlit from redrawing the map, as I am only interested in updating the markers.

Database

It works with any database as long as it has minimum support for GIS functions. Currently I use BigQuery BI Engine as I am familiar with it and, to speak freely :), it is very cheap for this kind of workload: small data and potentially a lot of concurrency 🙂

I tried PowerBI Datamart, but it seems Python access is blocked. DuckDB doesn’t support GIS functions yet; I am sure you can reproduce the results using only SQL, but I did not bother.

ST_DWithin(ST_GeogPoint(lng,lat),params.center,params.maxdist_m)
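To show how that predicate can fit into a full query, here is a hedged sketch that builds the SQL text from a clicked point. The `params` CTE mirrors the names used in the predicate above; the function name `build_nearby_sql` and the table name in the usage line are hypothetical, and any extra filter (such as keeping only cafes) is left out because the column names are not shown here.

```python
def build_nearby_sql(table: str, lng: float, lat: float, maxdist_m: int = 500) -> str:
    """Return a BigQuery SQL string selecting rows within `maxdist_m` of a point."""
    # Casting to float/int keeps arbitrary text out of the generated SQL
    lng, lat, maxdist_m = float(lng), float(lat), int(maxdist_m)
    return f"""
WITH params AS (
  SELECT ST_GeogPoint({lng}, {lat}) AS center, {maxdist_m} AS maxdist_m
)
SELECT t.*
FROM `{table}` AS t, params
WHERE ST_DWithin(ST_GeogPoint(t.lng, t.lat), params.center, params.maxdist_m)
""".strip()


# Hypothetical table name, using the default map location as the clicked point
sql = build_nearby_sql("my_project.my_dataset.places", 153.024198, -27.467276)
print(sql)
```

Note that ST_GeogPoint takes longitude first and latitude second, which is why the clicked point is stored in that order.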

Take Away

I think there is a third way between no code and only code. Streamlit managed to create a new category, maybe simple code 🙂 Having said that, BI vendors should up their game; a location parameter should not be that hard to implement.
