In a previous post I showed how to use turbodbc to access Azure Synapse SQL-on-Demand endpoints. A common pattern is to use the OPENROWSET function to query Parquet data from an external data source such as Azure Blob Storage:
SELECT
    result.filepath(1) as [c_date],
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/sales/table/c_date=*/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [sales_euro] float
    ) as [result]
WHERE c_date='2020-09-01'
Common table expressions (CTEs) make the SQL code more readable, especially when more than one external data source is queried. Once you have defined the CTE statements at the top, you can use them like normal tables inside your queries:
WITH location AS
(SELECT
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) as [result]
),
sales AS
(SELECT
    result.filepath(1) as [c_date],
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/sales/table/c_date=*/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [sales_euro] float
    ) as [result]
)
SELECT location.l_id, sales.sales_euro
FROM sales JOIN location ON sales.l_id = location.l_id
WHERE c_date = '2020-01-01'
Still, writing such queries in data pipelines soon becomes cumbersome and error-prone. So once we moved from writing the queries in the Azure Synapse workbench to using them in our daily workflows with Python, we wanted a better way to programmatically generate the SQL statements.
SQLAlchemy is still our library of choice for working with SQL in Python. SQLAlchemy already supports Microsoft SQL Server, so most of the Azure Synapse SQL-on-Demand features are covered. I have not yet found a native way to work with OPENROWSET queries, but it is quite easy to use the text() feature to inject the missing statement:
import sqlalchemy as sa

cte_location_raw = '''
*
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) as [result]
'''

cte = sa.select([sa.text(cte_location_raw)]).cte('location')
q = sa.select([sa.column('l_id'), sa.column('l_code'), sa.column('l_name')]).select_from(cte)
The cte() call returns a Common Table Expression instance that can be used like a regular table in other statements; the query above generates the following code:
WITH location AS
(SELECT
*
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) as [result]
)
SELECT l_id, l_code, l_name FROM location
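If you want to see the SQL a statement compiles to without running it, str() (or q.compile() with an explicit dialect) renders it. A minimal sketch of that, written in the positional select() calling style of newer SQLAlchemy releases (1.4+) rather than the list style used above:

```python
import sqlalchemy as sa

# Rebuild the CTE from the raw OPENROWSET fragment; select() renders
# the text clause verbatim after SELECT.
cte_location_raw = '''
*
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [l_name] varchar(100)
    ) as [result]
'''

cte = sa.select(sa.text(cte_location_raw)).cte('location')
q = sa.select(sa.column('l_id'), sa.column('l_name')).select_from(cte)

# str() compiles with a generic dialect; pass a dialect to q.compile()
# for T-SQL specific rendering.
sql = str(q)
print(sql)
```

This is handy in a REPL or a test: the compiled string contains the full WITH clause, so mismatches in the injected raw SQL show up immediately.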
The CTE does not know about its columns because it is only passed raw SQL text. But you can annotate the sa.text() call with a typemap dictionary so that it exposes which columns are available from the statement. With the annotated CTE we can later reference columns as table.c.column instead of using sa.column('l_code') as above.
import sqlalchemy as sa

cte_location_raw = '''
*
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) as [result]
'''

typemap = {"l_id": sa.Integer, "l_code": sa.String, "l_name": sa.String, "latitude": sa.Float, "longitude": sa.Float}
cte = sa.select([sa.text(cte_location_raw, typemap=typemap)]).cte('location')
q = sa.select([cte.c.l_id, cte.c.l_name]).select_from(cte)
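As a side note: on newer SQLAlchemy releases the typemap argument of text() is deprecated in favour of text().columns(), which attaches typed column objects to the raw statement and achieves the same annotation. A sketch of the location CTE in that style (again assuming the 1.4+ positional select() calling convention):

```python
import sqlalchemy as sa

cte_location_raw = '''
SELECT
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) as [result]
'''

# columns() attaches typed column metadata to the raw statement;
# subquery('tmp1') wraps it so it can feed a CTE, like the alias below.
tmp = sa.text(cte_location_raw).columns(
    sa.column('l_id', sa.Integer),
    sa.column('l_name', sa.String),
    sa.column('latitude', sa.Float),
    sa.column('longitude', sa.Float),
).subquery('tmp1')

location = sa.select(tmp).cte('location')
q = sa.select(location.c.l_id, location.c.l_name)
```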
So, putting everything together, you can define and test your CTEs in Python
import sqlalchemy as sa

cte_sales_raw = '''
SELECT
    result.filepath(1) as [c_date],
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/sales/table/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [sales_euro] float
    ) as [result]
'''

cte_location_raw = '''
SELECT
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) with(
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) as [result]
'''

typemap_location = {"l_id": sa.Integer, "l_name": sa.String, "latitude": sa.Float, "longitude": sa.Float}
location = sa.select([sa.text(cte_location_raw, typemap=typemap_location).alias("tmp1")]).cte('location')

typemap_sales = {"l_id": sa.Integer, "c_date": sa.Date, "sales_euro": sa.Float}
sales = sa.select([sa.text(cte_sales_raw, typemap=typemap_sales).alias("tmp2")]).cte('sales')
and then compose more complex statements, as with any other SQLAlchemy table definition:
cols = [sales.c.c_date, sales.c.l_id, location.c.l_name, location.c.latitude, location.c.longitude]
q = sa.select(cols).select_from(sales.join(location, sales.c.l_id == location.c.l_id))
In our production data pipelines at Blue Yonder we typically provide the building blocks for complex queries in libraries maintained by a central team. Testing the smaller parts with SQLAlchemy works much better, and it is easier for data scientists to plug them together and concentrate on high-level model logic.
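A hedged sketch of what such a test for a building block might look like: compile the statement to a string and assert on the fragments you care about. Here build_location_cte stands in for a hypothetical library helper, and the 1.4+ positional select() style is assumed:

```python
import sqlalchemy as sa

def build_location_cte():
    # Hypothetical library helper wrapping the raw OPENROWSET text;
    # in a real library this would live in the centrally maintained package.
    raw = '''
    *
    FROM
        OPENROWSET(
            BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
            FORMAT='PARQUET'
        ) with(
            [l_id] bigint,
            [l_name] varchar(100)
        ) as [result]
    '''
    return sa.select(sa.text(raw)).cte('location')

def test_location_cte_compiles_to_expected_sql():
    cte = build_location_cte()
    q = sa.select(sa.column('l_id'), sa.column('l_name')).select_from(cte)
    sql = str(q)
    # The raw text is rendered verbatim inside the WITH clause.
    assert 'WITH location AS' in sql
    assert "FORMAT='PARQUET'" in sql
    assert 'FROM location' in sql

test_location_cte_compiles_to_expected_sql()
```

Because nothing is executed against the endpoint, such tests run fast and need no Azure credentials.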
We like the power of Azure Synapse SQL-on-Demand, but managing and testing complex SQL statements remains a challenge, as the result of the above code shows. At least SQLAlchemy and Python make it easier:
WITH sales AS
(SELECT l_id AS l_id, c_date AS c_date, sales_euro AS sales_euro
FROM (
    SELECT
        result.filepath(1) as [c_date],
        *
    FROM
        OPENROWSET(
            BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/sales/table/*.parquet',
            FORMAT='PARQUET'
        ) with(
            [l_id] bigint,
            [sales_euro] float
        ) as [result]
) as tmp1),
location AS
(SELECT l_id AS l_id, l_name AS l_name, latitude AS latitude, longitude AS longitude
FROM (
    SELECT
        *
    FROM
        OPENROWSET(
            BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
            FORMAT='PARQUET'
        ) with(
            [l_id] bigint,
            [l_name] varchar(100),
            [latitude] float,
            [longitude] float
        ) as [result]
) as tmp2)
SELECT sales.c_date, sales.l_id, location.l_name, location.latitude, location.longitude
FROM sales JOIN location ON sales.l_id = location.l_id
from Planet Python