Sunday, September 27, 2020

Peter Hoffmann: Azure Synapse SQL-on-Demand Openrowset Common Table Expression with SQLAlchemy

In a previous post I showed how to use turbodbc to access Azure Synapse SQL-on-Demand endpoints. A common pattern is to use the OPENROWSET function to query Parquet data from an external data source like Azure Blob Storage:

SELECT
    result.filepath(1) as [c_date],
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/sales/table/c_date=*/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [sales_euro] float
    ) as [result]
WHERE c_date = '2020-09-01'

Common table expressions (CTEs) help to make the SQL code more readable, especially if more than one external data source is queried. Once you have defined the CTE statements at the top, you can use them like normal tables inside your queries:

WITH location AS
(SELECT
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) as [result]
),
sales AS
(SELECT
    result.filepath(1) as [c_date],
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/sales/table/c_date=*/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [sales_euro] float
    ) as [result]
)

SELECT location.l_id, sales.sales_euro
FROM sales JOIN location ON sales.l_id = location.l_id
WHERE c_date = '2020-01-01'

Still, writing such queries in data pipelines soon becomes cumbersome and error prone. So once we moved from writing the queries in the Azure Synapse Workbench to using them in our daily workflows with Python, we wanted a better way to programmatically generate the SQL statements.

SQLAlchemy is still our library of choice to work with SQL in Python. SQLAlchemy already has support for Microsoft SQL Server, so most of the Azure Synapse SQL-on-Demand features are covered. I have not yet found a native way to work with OPENROWSET queries, but it is quite easy to use the text() construct to inject the missing statement:

import sqlalchemy as sa

cte_location_raw = '''
*
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) as [result]
'''

cte = sa.select([sa.text(cte_location_raw)]).cte('location')
q = sa.select([sa.column('l_id'), sa.column('l_name')]).select_from(cte)

The cte() call returns a Common Table Expression instance, which behaves like any other selectable and can be used as such in other statements to generate the following code:

WITH location AS
(SELECT
        *
    FROM
        OPENROWSET(
            BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
            FORMAT='PARQUET'
        ) with(
            [l_id] bigint,
            [l_name] varchar(100),
            [latitude] float,
            [longitude] float
        ) as [result]
)
SELECT l_id, l_name FROM location
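The rendered SQL above can be reproduced by compiling the statement against the MSSQL dialect. A minimal sketch, reusing the q statement from above (any installed dialect would do for a quick inspection):

from sqlalchemy.dialects import mssql

# Render the statement as a T-SQL string without executing it
print(q.compile(dialect=mssql.dialect()))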

The CTE statement does not know about its columns because it only gets passed the raw SQL text. But you can annotate the sa.text() statement with a typemap dictionary, so that it exposes which columns are available from the statement. By annotating the CTE we can use the cte.c.column accessor later to reference the columns instead of using sa.column('l_name') as above.

import sqlalchemy as sa

cte_location_raw = '''
*
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) as [result]
'''

typemap = {"l_id": sa.Integer, "l_name": sa.String, "latitude": sa.Float, "longitude": sa.Float}
cte = sa.select([sa.text(cte_location_raw, typemap=typemap)]).cte('location')
q = sa.select([cte.c.l_id, cte.c.l_name]).select_from(cte)
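With the columns annotated, the query can be executed like any other SQLAlchemy statement. A minimal sketch, assuming a pyodbc-based engine (the connection URL and credentials are placeholders, not the actual endpoint configuration):

# Hypothetical connection URL; replace with your SQL-on-Demand endpoint
engine = sa.create_engine("mssql+pyodbc://user:password@synapse_dsn")

with engine.connect() as conn:
    for row in conn.execute(q):
        print(row.l_id, row.l_name)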

So, putting everything together, you can define and test your CTEs in Python:

import sqlalchemy as sa

cte_sales_raw = '''
SELECT
    result.filepath(1) as [c_date],
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/sales/table/c_date=*/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [sales_euro] float
    ) as [result]
'''
cte_location_raw = '''
SELECT
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) as [result]
'''

typemap_location = {"l_id": sa.Integer, "l_name": sa.String, "latitude": sa.Float, "longitude": sa.Float}
location = sa.select([sa.text(cte_location_raw, typemap=typemap_location).alias("tmp1")]).cte('location')
typemap_sales = {"l_id": sa.Integer, "c_date": sa.Date, "sales_euro": sa.Float}
sales = sa.select([sa.text(cte_sales_raw, typemap=typemap_sales).alias("tmp2")]).cte('sales')

and then compose more complex statements, just like with any other SQLAlchemy table definition:

cols = [sales.c.c_date, sales.c.l_id, location.c.l_name, location.c.latitude, location.c.longitude]
q = sa.select(cols).select_from(sales.join(location, sales.c.l_id == location.c.l_id))
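Because the composed statement compiles down to a plain SQL string, the building blocks can be checked in unit tests without a live endpoint. A pytest-style sketch (the test name and asserted fragments are illustrative, based on the output shown below):

from sqlalchemy.dialects import mssql

def test_sales_location_join():
    # Compile to T-SQL and check the expected structure
    sql = str(q.compile(dialect=mssql.dialect()))
    assert "WITH sales AS" in sql
    assert "JOIN location ON sales.l_id = location.l_id" in sql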

In our production data pipelines at Blue Yonder we typically provide the building blocks to create complex queries in libraries that are maintained by a central team. Testing smaller parts with SQLAlchemy works much better, and it is easier for data scientists to plug them together and concentrate on high-level model logic.

We like the power of Azure Synapse SQL-on-Demand, but managing and testing complex SQL statements is still a challenge, as you can already see from the result of the above code. But at least SQLAlchemy and Python make it easier:

WITH sales AS
(SELECT l_id AS l_id, c_date AS c_date, sales_euro AS sales_euro
FROM (
SELECT
    result.filepath(1) as [c_date],
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/sales/table/c_date=*/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [sales_euro] float
    ) as [result]
) as tmp1),
location AS
(SELECT l_id AS l_id, l_name AS l_name, latitude AS latitude, longitude AS longitude
FROM (
SELECT
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) as [result]
) as tmp2)
SELECT sales.c_date, sales.l_id, location.l_name, location.latitude, location.longitude
FROM sales JOIN location ON sales.l_id = location.l_id

