Running a Serverless DuckDB on Google Cloud

TL;DR: it is easy to set up and works relatively well, but there is a catch: watch out for Cloud Storage throughput. I shared a notebook here.

In a previous blog, I showed a POC of how to run queries from a Colab notebook against a Delta Lake table. But what if you want to run the same query from other tools, or run a query in a different region and avoid egress fees? It turns out it is extremely easy to set up.

Here is an overall view of the architecture. The most important decision is to make sure Cloud Storage and the Cloud Function are in the same region, in my case “us-central1”; you can then call the function from anywhere.

Google Cloud Functions

As I said before, the code is very simple; I spent some time googling how to convert the results from bytes to JSON to a dataframe. I think that, besides BigQuery, Google Cloud Functions is the easiest service to set up: just write your code and Google Cloud handles the rest. Just for fun, I used a machine with 8 CPUs and 32 GB of RAM.

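import pyarrow.dataset as ds
import duckdb
import json

# Hive-partitioned Parquet dataset in Cloud Storage (bucket name elided).
# DuckDB finds this Python variable by name when a query references "lineitem".
lineitem = ds.dataset("gs://xxxxx/lineitem", format="parquet", partitioning="hive")
con = duckdb.connect()

def Query(request):
    # The SQL text arrives in the 'name' field of the JSON request body.
    SQL = request.get_json().get('name')
    df = con.execute(SQL).df()
    # to_json() already returns a JSON string and json.dumps() encodes it again,
    # so the client has to decode the response twice.
    return json.dumps(df.to_json(orient="records")), 200, {'Content-Type': 'application/json'}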

How to Call web API

A lot of tools can send a web API call as long as they have the correct authentication. I started with a Python script for a simple reason: it was the easiest code to find on the internet.

Here is the interesting part: you write any arbitrary SQL, send an API call, and get back JSON with the results. A user doesn’t need to know anything about the Cloud Function; all they need is the web address and a correct SQL query.
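A minimal client sketch for such a call, using only the Python standard library. The URL and the bearer-token authentication are assumptions made for illustration (they are not taken from my deployment); the 'name' field and the double JSON encoding match the function code shown earlier.

```python
import json
import urllib.request


def build_request(url, sql, token=None):
    """Build a POST request whose JSON body carries the SQL in the 'name' field."""
    headers = {"Content-Type": "application/json"}
    if token:
        # Assumption: the function expects an identity token as a bearer token.
        headers["Authorization"] = f"Bearer {token}"
    body = json.dumps({"name": sql}).encode("utf-8")
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def decode_rows(payload):
    """Turn the response body (bytes or str) into a list of row dicts."""
    rows = json.loads(payload)
    # The function encodes the JSON twice, so decode once more if we got a string.
    if isinstance(rows, str):
        rows = json.loads(rows)
    return rows


def run_query(url, sql, token=None):
    """Send the SQL to the function and return the decoded rows."""
    with urllib.request.urlopen(build_request(url, sql, token)) as resp:
        return decode_rows(resp.read())


# Usage (hypothetical URL):
# rows = run_query("https://REGION-PROJECT.cloudfunctions.net/Query", "select 42 as answer")
```

The rows come back as a plain list of dicts, which pandas can turn into a dataframe if needed.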

Performance considerations

This is where it gets very interesting. I have no clue where the bottleneck is, but we can ask some questions.

Cold Start

  • The first example took 334 ms. That’s impressive, but I was cheating: I showed the best-case scenario. The Cloud Function (or, more precisely, Cloud Run) was already running, so there was no cold start, and DuckDB was running a local query that did not require a call to Cloud Storage.
  • Currently, a cold start for Cloud Run Gen2 takes around 10 seconds; note that it is still in preview.

Transfer from Google Cloud Storage

Let’s try this simple query; we get the result in 29 seconds.

The same query on my laptop takes 600 ms. By the way, the lineitem table contains 60 million rows!

I don’t know why there is such a massive difference. I presumed network speed was limited, but when I look at the bucket stats, throughput is actually very good, nearly 500 MB/s.

I am no expert in networking, but that number doesn’t seem right! When I check how much data the Cloud Function is receiving, the whole discrepancy starts making more sense: on average I am getting around 30 MB/s, and the maximum was 50 MB/s. I have to say this is really slow!
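A back-of-the-envelope check of those numbers. It assumes, purely for illustration, that the whole 29 seconds was spent transferring data, which gives an upper bound on the volume read; the real split between transfer and compute was not measured.

```python
def implied_volume_mb(seconds, throughput_mb_s):
    """Data volume that fits in `seconds` at a steady throughput."""
    return seconds * throughput_mb_s


def transfer_seconds(size_mb, throughput_mb_s):
    """Time needed to move `size_mb` at a steady throughput."""
    return size_mb / throughput_mb_s


# Upper bound on the data read in 29 s at the observed ~30 MB/s.
volume = implied_volume_mb(29, 30)
print(volume)                         # 870 (MB)

# The same volume at the ~500 MB/s reported by the bucket stats.
print(transfer_seconds(volume, 500))  # 1.74 (seconds)
```

Under that assumption, the gap between 30 MB/s and 500 MB/s alone would account for most of the difference in query time.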

File Pruning

Arrow dataset is smart enough to prune data using columns that are not partition columns. In this query I filtered on L_shipdate; note that the Parquet file was sorted on that field. As expected, performance is very good at 1.7 seconds: DuckDB scans only the row groups that contain the date ‘1998-09-02’.
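To see why sorting matters, here is a simplified pure-Python sketch of row-group pruning based on min/max statistics. It is not how Arrow or DuckDB implement it internally; the statistics below are made-up values chosen to show the effect.

```python
from dataclasses import dataclass
from datetime import date


@dataclass
class RowGroupStats:
    """Min/max statistics a Parquet footer keeps per row group (simplified)."""
    index: int
    min_shipdate: date
    max_shipdate: date


def groups_to_scan(stats, wanted):
    """Return the indexes of row groups whose [min, max] range can contain `wanted`."""
    return [g.index for g in stats if g.min_shipdate <= wanted <= g.max_shipdate]


# Hypothetical statistics for a file sorted on l_shipdate: ranges do not overlap.
sorted_file = [
    RowGroupStats(0, date(1998, 1, 1), date(1998, 4, 30)),
    RowGroupStats(1, date(1998, 5, 1), date(1998, 8, 31)),
    RowGroupStats(2, date(1998, 9, 1), date(1998, 12, 1)),
]
# Hypothetical statistics for an unsorted file: every group spans the whole range.
unsorted_file = [RowGroupStats(i, date(1998, 1, 1), date(1998, 12, 1)) for i in range(3)]

print(groups_to_scan(sorted_file, date(1998, 9, 2)))    # [2]
print(groups_to_scan(unsorted_file, date(1998, 9, 2)))  # [0, 1, 2]
```

With a sorted file only one row group has to be fetched from storage, which matters a lot when throughput is the limiting factor.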

Dataset Catalog

I am defining a very rudimentary catalog; the user can just call “Show Tables”.

You can even check the table schema

Cloud Storage throughput is the bottleneck.

A speed of 30 MB/s makes the whole setup good only for a POC or for querying a small dataset. I don’t know the reason for such poor performance from Cloud Run; I suspect the Apache Arrow implementation is not optimized for GCP, although it works very well with a local file system.

Another missing piece is a cache. It would have been good if DuckDB could somehow cache the data already queried, but a cache is very hard to implement, especially if you want cache invalidation, and you risk reinventing a full data warehouse. I genuinely hope it is a bug and that Cloud Run can provide better network performance.

Aggregate 100 GB using PowerQuery and DuckDB

It may not be widely known, but you can run a Python script inside PowerQuery in the desktop and deploy it to the PowerBI service using a personal gateway.

This is just a POC that showcases how much you can do using just a decent laptop and Python.

Let’s say you have a folder with 100 GB of Parquet files and you want to aggregate some results for further analysis in PowerBI. Obviously, importing 100 GB may not be a great idea. In this scenario, from what I can see, people recommend tools like PySpark or Dask, which I guess are great options, but if you want a lightweight option with a simple pip install, then DuckDB is amazing!

For testing, first write your SQL query against only one Parquet file; when you are happy with the results, change the path from “folder/x.parquet” to “folder/*.parquet”.
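One way to make that switch painless is to keep the path as a parameter. This is only a sketch: build_query is a hypothetical helper name, and the query text is the aggregation used later in this post.

```python
def build_query(source):
    """Return the aggregation query for a single Parquet file or a glob of files."""
    return (
        "select L_RETURNFLAG, L_LINESTATUS, count(*) "
        f"from '{source}' "
        "group by 1, 2"
    )


test_sql = build_query("folder/x.parquet")   # one file while developing
full_sql = build_query("folder/*.parquet")   # every file once the results look right
print(full_sql)
```

Either string can then be passed to con.execute(...) as in the script below.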

In this example I have a folder of 100 GB of Parquet files.

In PowerQuery you just need to insert a Python script. You don’t need to know anything about Python, since it is just a standard SQL query; PowerQuery is smart enough to know that df is a dataframe, and it will show the result in the next step.

import duckdb

con = duckdb.connect()
# Aggregate across every Parquet file in the folder; PowerQuery picks up df as a table.
df = con.execute('''
    select L_RETURNFLAG, L_LINESTATUS, count(*)
    from 'C:/xxxxx/parquet/lineitem/*.parquet'
    group by 1, 2
''').df()

And here are the final results.

The duration will depend mainly on your CPU and SSD speed. DuckDB consumes little memory, less than 2 GB in that example; the secret sauce is the parallel scan of Parquet files.

Traditionally, Big Data meant data that doesn’t fit into RAM; nowadays, Big Data is how much data can fit on your SSD, and that’s a welcome change.

Advanced Geospatial analysis using location Parameter with Streamlit

This blog is a POC of something I always wanted to have in a BI tool. I tried Tableau, PowerBI and Data Studio without success (I am not interested in adding an invisible grid as a hack). The idea is extremely simple yet very powerful: retrieve data when you click on a map. You may think it should be simple, but it seems BI tools are good at retrieving data based on a filter, while it is very hard to push a parameter from a map back to the source data.

Traditionally, if you want this kind of interactivity, you need to write code. To be honest, the idea of writing JavaScript and learning how to deploy a web server was not very interesting to me, but luckily we have a new option in Streamlit.

Streamlit is a code-first web app platform that uses only Python. Web pages are generated behind the scenes, and there are a lot of components that require a minimum of code. Deployment is absolutely trivial using Streamlit Cloud, and because it is open source, you can deploy using alternative approaches like Cloud Run or Azure.

I came across the Streamlit-Folium component recently, and it is magnificent work: when you click on a map, it returns variables such as the last location clicked, zoom, and bounds, all for free, no code required!

All I have done is copy the code from the source and build a SQL query that takes the last clicked point and filters all the “cafe” locations within a radius of 500 m; the SQL code is copied from this previous blog.
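As a rough sketch of how the click can be turned into a query: the function below formats a BigQuery SQL string around the clicked point. The table name and the name, amenity, lat and lng columns are hypothetical placeholders, not my actual schema; ST_GEOGPOINT and ST_DWITHIN are the BigQuery functions used in the filter shown later in this post.

```python
def build_cafe_query(lat, lng, radius_m=500, table="my_dataset.osm_points"):
    """Build a BigQuery SQL string that keeps cafes within radius_m of the clicked point."""
    # float()/int() reject anything that is not a number before it reaches the SQL text.
    lat, lng, radius_m = float(lat), float(lng), int(radius_m)
    return f"""
WITH params AS (
  SELECT ST_GEOGPOINT({lng}, {lat}) AS center, {radius_m} AS maxdist_m
)
SELECT name, lat, lng
FROM `{table}`, params
WHERE amenity = 'cafe'
  AND ST_DWITHIN(ST_GEOGPOINT(lng, lat), params.center, params.maxdist_m)
""".strip()


print(build_cafe_query(-27.467276, 153.024198))
```

Note that ST_GEOGPOINT takes longitude first, then latitude, which is easy to get backwards.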

The source data is nearly half a million rows; as you can imagine, plotting a massive dataset just to see a small portion is a waste of computer resources.

Here are the final results.

Here is an example of a generated SQL query.

State management

I added the code here. Again, it was easy to write, as I copied nearly everything from the component sample code. The tricky part was how to update the value of a variable that was already declared; Streamlit has a brilliant and very simple solution using State Management.

Assign a default value when the Streamlit app runs for the first time:

if 'key' not in st.session_state:
    st.session_state.key = '( 153.024198,-27.467276)'
    st.session_state.key1 = [-27.467276, 153.024198]
    st.session_state.key2 = 16
point_clicked = st.session_state.key
location_ini  = st.session_state.key1
zoom_Start    = st.session_state.key2 

Update the values when a user clicks on the map; the next run in the same session will use those new values:

st.session_state.key = point_clicked
st.session_state.key1 = location_ini
st.session_state.key2 = map_data['zoom']

Currently, I don’t know how to stop Streamlit from redrawing the map, as I am only interested in updating the markers.
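The default-then-update pattern can be sketched without running Streamlit by using a plain dict in place of st.session_state, which also supports dict-style access. The shape of map_data (a 'last_clicked' dict with 'lat' and 'lng', plus 'zoom') is an assumption about what the Streamlit-Folium component returns, and recentering the map on the click is one possible choice rather than necessarily what my app does.

```python
DEFAULTS = {
    "key": "( 153.024198,-27.467276)",  # last clicked point as "(lng,lat)" text
    "key1": [-27.467276, 153.024198],   # map center as [lat, lng]
    "key2": 16,                         # zoom level
}


def init_state(state):
    """Fill in defaults on the first run only; later runs keep the stored values."""
    for name, value in DEFAULTS.items():
        if name not in state:
            state[name] = value


def apply_click(state, map_data):
    """Store the last click and zoom so the next rerun starts from them."""
    clicked = map_data.get("last_clicked")
    if not clicked:
        return  # nothing clicked yet, keep the previous values
    state["key"] = f"( {clicked['lng']},{clicked['lat']})"
    state["key1"] = [clicked["lat"], clicked["lng"]]
    state["key2"] = map_data["zoom"]


state = {}  # stands in for st.session_state
init_state(state)
apply_click(state, {"last_clicked": {"lat": -27.47, "lng": 153.03}, "zoom": 17})
print(state)
```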

Database

It works with any database as long as it has minimal support for GIS functions. I currently use BigQuery BI Engine as I am familiar with it and, to speak freely :), it is very cheap for this kind of workload: small data and potentially a lot of concurrency 🙂

I tried PowerBI Datamart, but it seems Python access is blocked. DuckDB doesn’t support GIS functions yet; I am sure you can reproduce the results using only SQL, but I did not bother.

ST_DWithin(ST_GeogPoint(lng,lat),params.center,params.maxdist_m)
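For an engine without GIS functions, the same kind of filter can be approximated with the haversine formula. The sketch below is a spherical approximation written in plain Python; it is not BigQuery's exact computation, and results can differ slightly from ST_DWithin.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius, spherical approximation


def haversine_m(lat1, lng1, lat2, lng2):
    """Great-circle distance in meters between two (lat, lng) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lng2 - lng1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def within(lat, lng, center_lat, center_lng, maxdist_m):
    """Rough equivalent of the ST_DWithin filter above."""
    return haversine_m(lat, lng, center_lat, center_lng) <= maxdist_m


center = (-27.467276, 153.024198)
print(within(-27.4680, 153.0250, *center, 500))  # a point roughly 100 m away
print(within(-27.4773, 153.0242, *center, 500))  # a point roughly 1.1 km away
```

The same expression can be written in SQL with sin, cos, asin and sqrt, which is why I believe the results can be reproduced without GIS support.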

Take Away

I think there is a third way between no code and only code. Streamlit managed to create a new category, maybe simple code 🙂 Having said that, BI vendors should up their game; a location parameter should not be that hard to implement.

Pushdown Filters to Parquet From PowerBI Using Synapse Serverless

TL;DR: we use the filepath() function to get the partition data, which can be passed from PowerBI when used with Synapse Serverless, potentially making substantial cost savings when used with Incremental Refresh, as fewer files will be scanned.

Edit: there is another technique to incrementally load Parquet files without a database.

This blog is just me experimenting with the possibility of passing the filters from PowerBI to a Parquet file using Synapse Serverless.

When you deal with an on-demand query engine like Athena, BigQuery, or Synapse Serverless, the fewer files read, the lower the cost incurred. So if you somehow manage to read only the files that contain the data without scanning all your folders, you can make substantial savings.

To be clear, the first time I used Parquet was yesterday, so I have very basic knowledge at this stage.

1- Get the Source Data

I have a CSV file with 3 million rows; here is the schema.

2- Partition The Data

As far as I understand the Synapse Serverless engine, when you filter by date, for example, the engine will scan the whole file. Maybe if the column is sorted it will scan less data; I don’t know and have not tried it yet. Instead, we are going to partition the table by date, which simply means generating a lot of files split by the date values. I imagine generating a lot of small files is not optimal either: it will reduce cost but potentially make the query slower (at least this is how other engines work).

I am using this Python script to generate the partitions:

import pandas as pd

df = pd.read_csv('box.csv', parse_dates=True)
df['SETTLEMENTDATE'] = pd.to_datetime(df['SETTLEMENTDATE'])
# Derive a date-only column to partition on.
df['Date'] = df['SETTLEMENTDATE'].dt.date
# Writes a folder named nem.parquet with one Date=<value> subfolder per day.
df.to_parquet('nem.parquet', partition_cols=['Date'], allow_truncated_timestamps=True)

And here is the result, a folder of Parquet files grouped by Date

which I can read using PowerBI Desktop.

I just casually read a Parquet file without any programming language! (Kudos to the product team.) This is only to test that the Parquet files are properly generated.

3- Load The files to Azure Storage

This is only a showcase; in a production workflow you may be using something like ADF or Python Cloud Functions. Anyway, to upload a lot of files, I am using Azure Storage Explorer.

Now we can start using Synapse Analytics.

4- Create a View In Synapse Serverless

We are going to leverage the filepath() function to get the partition date (please see the documentation). Here is the query to create a view:

USE [test];
GO
DROP VIEW IF EXISTS parquet;
GO

CREATE VIEW parquet AS
SELECT
 *,convert(Datetime,result.filepath(1),120) as date
FROM
    OPENROWSET(
        BULK 'https://xxxxxxxxxx.dfs.core.windows.net/parquet/nem.parquet/Date=*/*.parquet',
        FORMAT='PARQUET'
    )  result
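To illustrate what filepath(1) returns, namely the text matched by the first wildcard in the BULK path, here is a small Python sketch of the same idea. It only mimics the behavior with a regular expression and is not how Synapse implements it; the file name part-0.parquet is hypothetical.

```python
import re
from datetime import datetime


def wildcard_matches(pattern, path):
    """Return the text matched by each * in pattern, in order, or None if no match."""
    # Assumption for this sketch: a * never spans a folder separator.
    regex = "^" + "([^/]*)".join(re.escape(part) for part in pattern.split("*")) + "$"
    m = re.match(regex, path)
    return list(m.groups()) if m else None


pattern = "parquet/nem.parquet/Date=*/*.parquet"
path = "parquet/nem.parquet/Date=2018-04-11/part-0.parquet"  # hypothetical file name

first_wildcard = wildcard_matches(pattern, path)[0]   # the value filepath(1) would give
partition_date = datetime.strptime(first_wildcard, "%Y-%m-%d")
print(first_wildcard, partition_date)
```

Because the date comes from the folder name rather than from inside the files, a filter on it lets the engine skip whole folders.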

5- Test if the filter Partition Works

Let’s do some testing to see how much data is scanned.

No filter: scan all the files, data processed 48 MB.

Now with a filter on only Date 11/04/2018: data processed 1 MB.

6- Test in PowerBI

I built this small model: the Date table is Import and Parquet is DirectQuery. (Be cautious when using DirectQuery; PBI can be very chatty and generate a lot of SQL queries. Currently, it is better to use only Import mode until cache support is added.)

Case 1: View all Dates

Data Processed : 43 MB

Case 2: Filter Some Dates

Let’s filter on only 2 days.

Note: please use the Query reduction option in PowerBI Desktop; otherwise, every time you move the slicer, a query will be generated.

Here is the result

I was really excited when I saw the results: 1 MB. Synapse Serverless was smart enough to scan only 2 files.

7- Incremental Refresh

Instead of DirectQuery, let’s try a more practical use case. I configured Incremental Refresh to refresh only the last 3 days.
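A small sketch of why this is cheap: a refresh window maps to a handful of Date= folders, and only the ones that exist get scanned. The half-open [range_start, range_end) window and the helper names are assumptions for illustration.

```python
from datetime import date, timedelta


def partition_folders(range_start, range_end):
    """Folder names for each day in [range_start, range_end), one per partition."""
    days = (range_end - range_start).days
    return [f"Date={range_start + timedelta(days=i):%Y-%m-%d}" for i in range(days)]


def folders_to_scan(existing, range_start, range_end):
    """Keep only the existing folders that fall inside the refresh window."""
    wanted = set(partition_folders(range_start, range_end))
    return [f for f in existing if f in wanted]


existing = ["Date=2018-04-10", "Date=2018-04-11", "Date=2018-04-12"]

print(folders_to_scan(existing, date(2018, 4, 11), date(2018, 4, 13)))
print(folders_to_scan(existing, date(2021, 1, 1), date(2021, 1, 4)))  # no files for these days
```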

And here is the list of queries generated by PowerBI:

I only have data for 2018 and 2019. The second refresh (which started at 3:45 PM) refreshed only the data for the last three days, and because there are no files for those dates, the query returned 0 MB, which is great.

Another nice thing: the select top 100 query that PowerQuery generates to check field types scanned only 1 MB!

Just to be sure, I did another refresh at 4:24 PM and checked the partitions using SSMS.

Only the last three partitions were refreshed, and PQ sent only 6 queries (for each day, one to select the data and another to check the field types).

I think Azure Storage + Synapse Analytics Serverless + PowerBI Incremental Refresh may end up being a very powerful pattern.
