Pushdown Filters to Parquet From PowerBI Using Synapse Serverless

TL;DR: we use the filepath() function to expose the partition date, which lets PowerBI pass its filters down to the Parquet files through Synapse Serverless. This can potentially make a substantial cost saving with Incremental Refresh, as fewer files are scanned.

Edit : there is another Technique to incrementally load parquet files without a Database

This blog is just me experimenting with the possibility of passing the filters from PowerBI to a Parquet file using Synapse Serverless.

When you deal with an on-demand query engine like Athena, BigQuery or Synapse Serverless, the fewer files you read, the less cost you incur. So if you somehow manage to read only the files that contain the data you need, without scanning all your folders, you can make a substantial saving.

To Be clear, the first time I used Parquet was yesterday, so I have very basic knowledge at this stage.

1- Get the Source Data

I have a csv file with 3 million rows, here is the schema

2- Partition The Data

As far as I understand the Synapse Serverless engine, when you filter by date, for example, the engine scans the whole file. Maybe it scans less data if the column is sorted; I don’t know and have not tried it yet. Instead, we are going to partition the table by date, which simply means generating a lot of files split by the date values. I imagine generating a lot of small files is not optimal either: it reduces cost but potentially makes the query slower (at least this is how other engines work).

I am using this python script to generate the partitions

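import pandas as pd

df = pd.read_csv('box.csv',parse_dates=True)
# make sure the timestamp column is a real datetime
df['SETTLEMENTDATE'] = pd.to_datetime(df['SETTLEMENTDATE'])
# the calendar date becomes the partition key
df['Date'] = df['SETTLEMENTDATE'].dt.date
# writes one folder per date value under nem.parquet
df.to_parquet('nem.parquet',partition_cols=['Date'],allow_truncated_timestamps=True)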

and here are the results, a folder of parquet files grouped by Date

which I can read using PowerBI Desktop

I just casually read a parquet file without any programming language!!! (Kudos to the product team.) This is only to test that the parquet is properly generated.
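To make the folder layout concrete, here is a minimal sketch in plain Python (no pandas needed) of how rows end up grouped into Date=<value> folders, the same naming that the BULK path in the view later relies on. The helper names and the sample timestamps are made up for illustration; the real files are written by to_parquet.

```python
from collections import defaultdict
from datetime import datetime

def partition_folder(timestamp: datetime, column: str = "Date") -> str:
    """Return the Hive-style folder name for one row, e.g. 'Date=2018-04-11'."""
    return f"{column}={timestamp.date().isoformat()}"

def group_by_partition(timestamps):
    """Group row timestamps by the folder they would be written to."""
    groups = defaultdict(list)
    for ts in timestamps:
        groups[partition_folder(ts)].append(ts)
    return dict(groups)

# Hypothetical 5-minute readings spanning two days
rows = [
    datetime(2018, 4, 11, 0, 5),
    datetime(2018, 4, 11, 0, 10),
    datetime(2018, 4, 12, 0, 5),
]
layout = group_by_partition(rows)
for folder, items in layout.items():
    print(f"nem.parquet/{folder}/ -> {len(items)} rows")
```

Each distinct date gets its own folder, so a filter on one date only needs the files under one folder.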

3- Load The files to Azure Storage

This is only a showcase; in a production workflow you may be using something like ADF or Python Cloud Functions. Anyway, to upload a lot of files, I am using Azure Storage Explorer

now we can start using Synapse Analytics

4- Create a View In Synapse Serverless

We are going to leverage the filepath() function to get the partition date (please see the documentation). Here is the query to create a view

USE [test];
GO
DROP VIEW IF EXISTS parquet;
GO

CREATE VIEW parquet AS
SELECT
 *,convert(Datetime,result.filepath(1),120) as date
FROM
    OPENROWSET(
        BULK 'https://xxxxxxxxxx.dfs.core.windows.net/parquet/nem.parquet/Date=*/*.parquet',
        FORMAT='PARQUET'
    )  result
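To illustrate what filepath(1) gives us, here is a rough Python analogy: it returns the part of the path matched by the first wildcard, which here is the value after Date=. This is only a sketch of the idea, not how Synapse is implemented, and the file names (part-0.parquet) are hypothetical.

```python
import re
from datetime import date

# Pattern mirroring the BULK path: the first '*' follows 'Date='.
PARTITION_RE = re.compile(r"nem\.parquet/Date=([^/]+)/[^/]+\.parquet$")

def partition_date(path: str) -> date:
    """Return the date captured by the first wildcard, like filepath(1)."""
    match = PARTITION_RE.search(path)
    if match is None:
        raise ValueError(f"not a partition file: {path}")
    return date.fromisoformat(match.group(1))

def prune(paths, wanted: date):
    """Keep only the files whose partition folder matches the filter."""
    return [p for p in paths if partition_date(p) == wanted]

# Hypothetical file listing of the storage container
files = [
    "parquet/nem.parquet/Date=2018-04-10/part-0.parquet",
    "parquet/nem.parquet/Date=2018-04-11/part-0.parquet",
    "parquet/nem.parquet/Date=2018-04-12/part-0.parquet",
]
print(prune(files, date(2018, 4, 11)))
```

Because the date comes from the path and not from the file content, a filter on it can be answered without opening the other files.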

5- Test if the filter Partition Works

Let’s try some testing to see how much is scanned

No filter: all the files are scanned, data processed 48 MB

Now with a filter on Date 11/04/2018 only: data processed 1 MB
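As a back-of-the-envelope check, the two numbers above can be turned into a share of data scanned and a rough cost. The price per TB below is a placeholder I made up for the sketch, and engines may also apply a minimum charge per query, so check the current pricing before relying on it.

```python
def scanned_share(filtered_mb: float, full_mb: float) -> float:
    """Fraction of the full scan that the filtered query reads."""
    return filtered_mb / full_mb

def estimated_cost(mb_processed: float, price_per_tb: float) -> float:
    """Cost of one query, assuming billing is proportional to data processed."""
    return mb_processed / 1_000_000 * price_per_tb  # 1 TB taken as 10^6 MB

PRICE_PER_TB = 5.0  # hypothetical price, not taken from the post

share = scanned_share(1, 48)
print(f"filtered query reads {share:.1%} of the full scan")
print(f"full scan cost:     {estimated_cost(48, PRICE_PER_TB):.6f}")
print(f"filtered scan cost: {estimated_cost(1, PRICE_PER_TB):.6f}")
```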

6- Test in PowerBI

I built this small model: the Date table is Import and Parquet is DirectQuery (be cautious when using DirectQuery, PBI can be very chatty and generate a lot of SQL queries; currently it is better to use only Import mode until cache support is added)

Case 1: View all Dates

Data Processed : 43 MB

Case 2: Filter Some Dates

let’s filter only 2 days

Note: please use the Query reduction option in PowerBI Desktop, otherwise a query is generated every time you move the slicer

Here is the result

I was really excited when I saw the results: 1 MB. Synapse Serverless was smart enough to scan only 2 files

7- Incremental Refresh

Instead of DirectQuery, let’s try a more practical use case: I configured Incremental Refresh to refresh only the last 3 days

and here is the List of Queries generated by PowerBI

I have data only for 2018 and 2019. The second refresh (which started at 3:45 PM) refreshed just the last three days, and because there are no files for those dates the query returned 0 MB, which is great.
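Here is a small sketch of why that refresh reads nothing. Incremental Refresh filters the query with a date window (the RangeStart and RangeEnd parameters); I am assuming the usual "start inclusive, end exclusive" filter, and the refresh day and partition dates below are made-up examples.

```python
from datetime import date, timedelta

def refresh_window(today: date, days: int = 3):
    """Return (start, end) covering the last `days` days, end exclusive."""
    end = today + timedelta(days=1)
    return end - timedelta(days=days), end

def partitions_to_scan(partition_dates, start: date, end: date):
    """Partitions whose date falls inside start <= d < end."""
    return [d for d in partition_dates if start <= d < end]

# Hypothetical partitions: the data only covers 2018 and 2019
available = [date(2018, 4, 10), date(2018, 4, 11), date(2019, 12, 31)]

# Hypothetical refresh day, later than any partition
start, end = refresh_window(date(2020, 9, 1))
print(partitions_to_scan(available, start, end))
```

No partition falls in the window, so no file is opened and nothing is billed for that query.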

Another nice thing: the select top 100 that PowerQuery generates to check the field types scanned only 1 MB!!!!

Just to be sure, I did another refresh at 4:24 PM and checked the partitions using SSMS

Only the last three partitions were refreshed and PQ sent only 6 queries (for each day, 1 to select the data and 1 to check the field types)

I think Azure storage + Synapse analytics Serverless + PowerBI Incremental Refresh may end up as a very powerful Pattern.

Push Dataset in PowerBI Composite Model, Near real Time made easy !!!

I was always intrigued by the Push dataset in PowerBI, but it had a big showstopper: you can’t mix it with other tables, which made it of little use for me. That changed in Dec 2020, when it became possible to use it in a composite model; see this excellent introduction

To see how it works, I tried it with this dataset, the Power Generation in Australia every 5 minutes.

1- Create Dataset in PowerBI service

2- Get the URL

3- Use some tool to start streaming data

You need some tool to start pushing data into that table; for example, you can use EasyMorph. Personally, I have an existing Python script that downloads the data and pushes it to a DB, so I only had to add a couple of lines of code to make it push to the PowerBI dataset directly. I copied the code from this function

Please make sure the datetime format is correct, otherwise it will not work.

###### post to PowerBI
# Python 3: urllib.request replaces urllib2
import urllib.request
from datetime import datetime

# timestamps must be sent as ISO 8601 strings
df['SETTLEMENTDATE'] = [datetime.strftime(item, "%Y-%m-%dT%H:%M:%SZ") for item in df['SETTLEMENTDATE']]

REST_API_URL = "https://api.powerbi.com/beta/XXXXXXXXXXXXXXXXXXXXXX"
body = bytes(df.to_json(orient='records'), encoding='utf-8')
req = urllib.request.Request(REST_API_URL, body)  # a request with a body is sent as POST
response = urllib.request.urlopen(req)
print("PowerBI: HTTP {0} {1}\n".format(response.getcode(), response.read()))
################################################
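If you want to check the payload before posting anything, here is a minimal sketch using only the standard library. It builds the same kind of JSON body (a list of records with the timestamp formatted as above) without calling the API. The DUID and MW columns and the sample values are made up for illustration.

```python
import json
from datetime import datetime

def to_push_rows(records):
    """Copy the records, formatting SETTLEMENTDATE as an ISO 8601 string."""
    rows = []
    for record in records:
        row = dict(record)  # copy, so the input is left untouched
        row["SETTLEMENTDATE"] = record["SETTLEMENTDATE"].strftime("%Y-%m-%dT%H:%M:%SZ")
        rows.append(row)
    return rows

# Hypothetical reading; only SETTLEMENTDATE comes from the post
sample = [{"SETTLEMENTDATE": datetime(2021, 1, 5, 14, 35), "DUID": "UNIT1", "MW": 12.5}]

body = json.dumps(to_push_rows(sample)).encode("utf-8")
print(body)
```

Printing the body is a quick way to confirm the datetime format before sending it.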

4- Build a Composite Model in PowerBI Desktop

DirectQuery to the Push dataset and a dimension table as Import. We need the dimension table to know the fuel source of every generator (wind, coal, etc.). The dimension table changes very rarely, less than 20 new rows per year!!!!

And here is the report

Maybe not a big deal for a lot of users, but Publish to web does not currently work with composite models, as of 5-Jan-2021

5- Refresh the Page

Please notice that even if the data source refreshes, you still need to click refresh to get the latest data on the visuals. When I try to add automatic page refresh I get this error (I am using a Pro license, not PPU!!!!)

Edit : Fred Kaffenberger suggested a very clever workaround by using a hidden Play Axis custom viz to force the refresh, and indeed it works very well !!!

Take away

To be very clear, pushing a 5-minute dataset is not something new, but it was a relatively complex task. With the new composite model it becomes nearly trivial, and that’s the magic of PowerBI.

Connect Streamlit to PowerBI service using XMLA end point

Streamlit is a new framework to build data web apps using only Python; you don’t need any knowledge of JavaScript/HTML.

Connecting to PowerBI using Python is well documented, see these excellent tutorials here by David Eldersveld

Using this code, I managed to build a small app that uses an existing XMLA endpoint: first it extracts the existing models, and then you can run arbitrary DAX queries.

Please note that as of August 2020, the XMLA endpoint is a PowerBI Premium only feature

The main connection string and how to export to a df were copied from this answer on Stack Overflow

import adodbapi as ado
import numpy as np
import pandas as pd
import streamlit as st

def get_df(data):
    ar = np.array(data.ado_results) # turn ado results into a numpy array
    df = pd.DataFrame(ar).transpose() # create a dataframe from the array
    df.columns = data.columnNames.keys() # set column names
    return df

source = st.sidebar.text_input('Write your XMLA endpoint')
if source:
    # list the models (catalogs) available on the endpoint
    with ado.connect("Provider=MSOLAP.8; Data Source="+source) as con:
        with con.cursor() as cur:
            cur.execute('select * from $SYSTEM.DBSCHEMA_CATALOGS')
            data = cur.fetchall()
            catalogue = get_df(data)
            catalogue_Select = st.sidebar.selectbox('Select Models', catalogue['catalog_name'])
dax = st.text_area('Write your DAX Query:')
if source and dax:  # a model can only be selected once an endpoint is given
    with ado.connect("Provider=MSOLAP.8; Data Source="+source+" ;Initial catalog="+catalogue_Select) as con:
        with con.cursor() as cur:
            cur.execute(dax)
            data = cur.fetchall()
            df = get_df(data)
            st.write(df)

and here is the result
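A note on the transpose in get_df: it suggests that the results arrive one tuple per column, not one per row. Assuming that is the case, the same reshaping can be sketched in plain Python with zip, which may help if you want to test the logic on a machine without adodbapi. The model names and the description column are made up for the example.

```python
def columns_to_rows(columns, names):
    """Turn column-major data into a list of row dicts."""
    return [dict(zip(names, row)) for row in zip(*columns)]

# Hypothetical result: two models, stored column by column
columns = [
    ("ModelA", "ModelB"),   # first column
    ("desc A", "desc B"),   # second column
]
names = ["catalog_name", "description"]

rows = columns_to_rows(columns, names)
for row in rows:
    print(row)
```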

Unfortunately adodbapi requires Windows, which makes deploying the app a bit harder. You can try Azure Web App, which has a Windows runtime; I wish it was as easy as Heroku!!!

The good news is that Microsoft recently added support for .NET Core, so hopefully I will update the blog with a cross-platform solution

to run the app on your laptop, just type

streamlit run app.py

It is a proof of concept, but I see a lot of use cases; an obvious one is to build web apps for visualizations not supported by PowerBI, like maps of massive datasets or 3D viz.

PowerBI Incremental refresh using Python or R

In this blog, I will show how to leverage Python (or R) to implement an incremental refresh in PowerBI using PowerQuery. Nothing is really new (I am sure Imke and Maxim have blogged about it before).

In a previous blog, I showed how to use the R & Python integration to load data to a database

This approach makes sense only when you do a lot of heavy transformation and your data source changes over time.

As an example, in my previous job, we received a new Excel file every Monday (300K rows); this file got approved and corrected every Thursday.

the workflow was:

Save the files in a folder and do the transformation, which was fine. But after the first year there were around 52 files, and although technically you only need to transform the last file, PowerBI does not support incremental refresh, so twice a week we redid everything. After two years, the refresh took nearly 30 minutes and sometimes we got out-of-memory errors.

In the big picture, half an hour was not that bad (we had a desktop just for refresh). The worst part was when you refreshed the model and, once you finished, got a new revision and had to refresh again.

Now, using a Python/R script, the idea is that every file gets transformed only once, regardless of how many times you refresh, just by exporting the result of each file’s transformation as a csv in a staging folder.

  • The first run is slow, as it processes all the existing files in Source Data, but subsequent runs transform only new files.
  • Let’s say File 2 was revised: all you need to do is delete File2.csv and it will be transformed again, but only that file.
  • OK, if you look at step 4, the files are reloaded each time. I am not too worried about that, as batch loading csv files from a folder using PowerQuery is relatively fast (yes, a bit slow compared to R); the bottleneck is rather the transformation.

The code for the Python script is here. As you can see, the PowerQuery integration is amazing: just add a new step and you get a dataframe, that’s all.

# 'dataset' holds the input data for this script

# split the table into one dataframe per source file
df_by_filename = dataset.groupby("filename")

for (filename, filename_df) in df_by_filename:
    # name the staged csv after the source zip file
    filename = filename.replace("zip", "csv")
    filename = filename.replace("PUBLIC_DAILY", "UNIT_PUBLIC_DAILY")
    filename_df.to_csv("C:/results/"+filename,index=False)

The script splits the dataframe by the column filename and then exports each file separately. Currently it saves into a local folder, but you can easily save those files into cloud storage

To test it, I built a quick workflow using public data (PBIX here). The source data is zip files on a public website, with a new zip file daily. It is a relatively complex transformation, as you need to unzip the file, split it, delete some columns, etc. The first run is slow, as it processes all the files (62 files), but the next run processes just 1 file. You can simulate that by deleting some csv files in the staging folder: when you refresh again, only the deleted files are processed again.
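The "only transform what is missing" logic can be sketched in a few lines of plain Python. This is my own illustration of the idea, not the PowerQuery steps from the PBIX: it compares the source file names with what already sits in the staging folder, using the same renaming as the export script. The file names and the temporary staging folder are made up for the example.

```python
from pathlib import Path
import tempfile

def staged_name(source_name: str) -> str:
    """Apply the same renaming as the export script."""
    return source_name.replace("zip", "csv").replace("PUBLIC_DAILY", "UNIT_PUBLIC_DAILY")

def files_to_transform(source_names, staging_dir: Path):
    """Return the source files that have no csv in the staging folder yet."""
    done = {p.name for p in staging_dir.glob("*.csv")}
    return [name for name in source_names if staged_name(name) not in done]

with tempfile.TemporaryDirectory() as tmp:
    staging = Path(tmp)
    # Hypothetical daily source files
    sources = ["PUBLIC_DAILY_20200101.zip", "PUBLIC_DAILY_20200102.zip"]
    # Pretend the first file was already transformed in a previous refresh
    (staging / staged_name(sources[0])).write_text("already transformed\n")
    pending = files_to_transform(sources, staging)
    print(pending)
```

Deleting a csv from the staging folder puts its source file back in the pending list, which is exactly the "revised file" scenario described above.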

I think the main takeaway is that the Python and R integrations are amazing tools to implement new possibilities that will not necessarily be available in PowerBI, and you don’t need to be a programmer to use those integrations; a serious search on Stack Overflow will get you started quickly.