Demo Dataset
We are using the NY Taxi Dataset throughout this blog post because it is a real-world dataset of reasonable size with some nice properties: it has columns of different data types and includes some messy data (like all real-world data engineering problems).
```shell
mkdir input
cd input
for i in {01..12}; do
    wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2018-$i.csv
done
cd ..
```
Looking at the first rows of the data gives us some insight into the columns and the data format:
```
$ head -n4 input/yellow_tripdata_2019-01.csv
VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.50,1,N,151,239,1,7,0.5,0.5,1.65,0,0.3,9.95,
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.60,1,N,239,246,1,14,0.5,0.5,1,0,0.3,16.3,
2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,.00,1,N,236,236,1,4.5,0.5,0.5,0,0,0.3,5.8,
```
Each file contains roughly 700MiB of uncompressed data:
```
$ du -sh input/yellow_tripdata_2018-0*
737M input/yellow_tripdata_2018-01.csv
715M input/yellow_tripdata_2018-02.csv
794M input/yellow_tripdata_2018-03.csv
784M input/yellow_tripdata_2018-04.csv
777M input/yellow_tripdata_2018-05.csv
734M input/yellow_tripdata_2018-06.csv
661M input/yellow_tripdata_2018-07.csv
661M input/yellow_tripdata_2018-08.csv
678M input/yellow_tripdata_2018-09.csv
```
To convert the data to parquet, we use pandas to read the CSV files and store them in one large parquet file:
```python
import glob

import pandas as pd

# Sort the file names so that the months are concatenated in order
files = sorted(glob.glob("input/yellow_tripdata_2018-*.csv"))


def read_csv(filename):
    return pd.read_csv(
        filename,
        dtype={"store_and_fwd_flag": "bool"},
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
        index_col=False,
        infer_datetime_format=True,  # deprecated since pandas 2.0, can be dropped there
        # Map the Y/N flag values to True/False
        true_values=["Y"],
        false_values=["N"],
    )


dfs = list(map(read_csv, files))
df = pd.concat(dfs)
df.to_parquet("yellow_tripdata_2018.parquet")
```
The resulting parquet file has a size of 2.2GiB, while the sum of the original CSV files was 11GiB. Pandas supports two parquet implementations, fastparquet and pyarrow. Both have strengths and weaknesses; a comparison is a topic for another blog post. Here we use pyarrow to analyze the data.
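Several snippets below call two small helpers, `sizeof_fmt` and `print_table`, whose definitions are not part of this post. The following is a minimal sketch of what such helpers could look like. Only the names come from the snippets; the implementations (binary units with one decimal, a pipe-separated table whose first row is the header) are assumptions chosen to match the printed tables.

```python
def sizeof_fmt(num, suffix="B"):
    """Hypothetical helper: format a byte count with binary prefixes, e.g. 1536 -> '1.5KiB'."""
    for unit in ["", "Ki", "Mi", "Gi", "Ti"]:
        if abs(num) < 1024.0:
            return f"{num:.1f}{unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f}Pi{suffix}"


def format_table(rows):
    """Render rows as a pipe-separated table; the first row is treated as the header."""
    header, *body = rows
    lines = [" | ".join(str(cell) for cell in header) + " |", "---|" * len(header)]
    lines += [" | ".join(str(cell) for cell in row) + " |" for row in body]
    return "\n".join(lines)


def print_table(rows):
    """Hypothetical helper matching the print_table calls in this post."""
    print(format_table(rows))
```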
pyarrow can open a parquet file without reading all of its data. It exposes the file's metadata and only reads the byte ranges of the file that are necessary to get this information. This is extremely helpful when you are working with parquet files that are not available locally but stored in a remote location (like Amazon S3 or Azure Blob Storage), because you only read a few kilobytes instead of gigabytes of data to understand your dataset.
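```python
import pyarrow.parquet as pq

filename = "yellow_tripdata_2018.parquet"
# Opening the file reads the footer metadata, not the column data
pq_file = pq.ParquetFile(filename)
data = [
    ["columns:", pq_file.metadata.num_columns],
    ["rows:", pq_file.metadata.num_rows],
    ["row_groups:", pq_file.metadata.num_row_groups],
]
for label, value in data:
    print(label, value)
```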
So we are working with roughly 100 million records and 18 columns, and the file has 2 row groups:
```
columns: 18
rows: 102804250
row_groups: 2
```
The next step is to have a look at the schema of the parquet file:
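```python
s = pq_file.metadata.schema
# One row per column: name, physical (on-disk) type and logical type
data = [["Column", "physical", "logical"]]
data += [[s.column(i).name, s.column(i).physical_type, s.column(i).logical_type] for i in range(len(s))]
print_table(data)
```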
Column | physical | logical |
---|---|---|
VendorID | INT64 | NONE |
tpep_pickup_datetime | INT64 | TIMESTAMP_MILLIS |
tpep_dropoff_datetime | INT64 | TIMESTAMP_MILLIS |
passenger_count | INT64 | NONE |
trip_distance | DOUBLE | NONE |
RatecodeID | INT64 | NONE |
store_and_fwd_flag | BOOLEAN | NONE |
PULocationID | INT64 | NONE |
DOLocationID | INT64 | NONE |
payment_type | INT64 | NONE |
fare_amount | DOUBLE | NONE |
extra | DOUBLE | NONE |
mta_tax | DOUBLE | NONE |
tip_amount | DOUBLE | NONE |
tolls_amount | DOUBLE | NONE |
improvement_surcharge | DOUBLE | NONE |
total_amount | DOUBLE | NONE |
Each column has a physical type that defines how the column is stored on disk and an optional logical type that defines how the stored values should be interpreted. In the case of tpep_pickup_datetime and tpep_dropoff_datetime, the values are stored as INT64 on disk but are represented as timestamps in pandas.
Logical types extend the set of types that parquet can store by specifying how the primitive types should be interpreted. This keeps the set of primitive types to a minimum and reuses parquet's efficient encodings. For example, strings are stored as byte arrays (binary) with a UTF8 annotation. The parquet logical type definitions provide comprehensive documentation.
Now let's dive a little deeper into the file. A parquet file consists of one or more row groups, which are a logical horizontal partitioning of the data into rows.
```python
data = [["rowgroup", "rows", "size"]]
for rg in range(pq_file.metadata.num_row_groups):
    rg_meta = pq_file.metadata.row_group(rg)
    data.append([rg, rg_meta.num_rows, sizeof_fmt(rg_meta.total_byte_size)])
print_table(data)
```
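Because we wrote the parquet file with the default settings of pandas, we get two large row groups of roughly 750MiB and 1.4GiB. The first one contains exactly 67,108,864 (64 × 1024 × 1024) rows, which suggests that the default limit is a maximum number of rows per row group rather than a byte size.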
rowgroup | rows | size |
---|---|---|
0 | 67108864 | 1.4GiB |
1 | 35695386 | 753.0MiB |
To understand the defaults of the row group sizing, a little bit of historical context is necessary. The parquet file format was developed as a columnar data storage format of the Apache Hadoop ecosystem and its underlying Hadoop distributed file system (HDFS):
> Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB - 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.
When working with parquet in Python, one typically does not use HDFS as a storage backend, but either the local file system or a cloud blob storage like Amazon S3 or Azure Blob Storage. Depending on the read pattern, different row group sizes make sense.
Instead of concatenating the CSV files in pandas and writing them in one batch, one can use pyarrow.parquet.ParquetWriter directly to control how many row groups are written:
```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

months = range(1, 13)


def read_csv(month):
    filename = "input/yellow_tripdata_2018-{:02d}.csv".format(month)
    df = pd.read_csv(
        filename,
        dtype={"store_and_fwd_flag": "bool"},
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
        index_col=False,
        infer_datetime_format=True,
        true_values=["Y"],
        false_values=["N"],
    )
    # Data cleansing: keep only trips picked up in this file's month
    pickup = df["tpep_pickup_datetime"].dt
    return df[(pickup.year == 2018) & (pickup.month == month)]


dfs = list(map(read_csv, months))

# Take the schema from the first month; all months must share it
table = pa.Table.from_pandas(dfs[0], preserve_index=False)
writer = pq.ParquetWriter("yellow_tripdata_2018-rowgroups.parquet", table.schema)
for df in dfs:
    table = pa.Table.from_pandas(df, preserve_index=False)
    # Each write_table call starts a new row group (one per month here)
    writer.write_table(table)
writer.close()
```
I have also added some data cleansing because, as mentioned earlier, the taxi dataset includes some messy data, and we only want rows in each monthly file whose tpep_pickup_datetime actually falls within that month.
If we analyze the new parquet file again, we can see that we now have a row group for each month of data:
rowgroup | rows | total_byte_size |
---|---|---|
0 | 8492076 | 152.0MiB |
1 | 8173231 | 150.5MiB |
2 | 8040133 | 148.1MiB |
3 | 9430376 | 169.8MiB |
4 | 8821105 | 162.4MiB |
5 | 7849748 | 142.9MiB |
6 | 9305515 | 168.0MiB |
7 | 8145164 | 149.8MiB |
8 | 9224063 | 167.1MiB |
9 | 7849134 | 142.9MiB |
10 | 8713831 | 157.8MiB |
11 | 8759874 | 157.9MiB |
In addition to the data in the row groups, the parquet format specifies metadata that is written per row group:
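```python
# Open the new file that has one row group per month
pq_file = pq.ParquetFile("yellow_tripdata_2018-rowgroups.parquet")
rg_meta = pq_file.metadata.row_group(0)
rg_meta.column(1)  # column 1 is tpep_pickup_datetime
```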
Per column we can retrieve metadata like compression, sizing and datatype, but also statistical information about the values stored in the rowgroup for the particular column.
```
<pyarrow._parquet.ColumnChunkMetaData object at 0x7fa958ab72d0>
  file_offset: 43125536
  file_path:
  physical_type: INT64
  num_values: 8759557
  path_in_schema: tpep_pickup_datetime
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x7fa958ab7510>
      has_min_max: True
      min: 2001-01-05 11:45:23
      max: 2018-01-31 23:59:57
      null_count: 0
      distinct_count: 0
      num_values: 8759557
      physical_type: INT64
      logical_type: Timestamp(isAdjustedToUTC=false, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false)
      converted_type (legacy): NONE
  compression: SNAPPY
  encodings: ('PLAIN_DICTIONARY', 'PLAIN', 'RLE', 'PLAIN')
  has_dictionary_page: True
  dictionary_page_offset: 1312236
  data_page_offset: 2117164
  total_compressed_size: 41813300
  total_uncompressed_size: 68701768
```
Looking at the min and max statistics of tpep_pickup_datetime:
```python
column = 1  # tpep_pickup_datetime
data = [["rowgroup", "min", "max"]]
for rg in range(pq_file.metadata.num_row_groups):
    stats = pq_file.metadata.row_group(rg).column(column).statistics
    data.append([rg, str(stats.min), str(stats.max)])
print_table(data)
```
The statistics show an interesting property: the value ranges of the row groups are disjoint. This means that, without reading the data itself, you know which values to expect in which row group.
rowgroup | min | max |
---|---|---|
0 | 2018-01-01 00:00:00 | 2018-01-31 23:59:57 |
1 | 2018-02-01 00:00:00 | 2018-02-28 23:59:58 |
2 | 2018-03-01 00:00:00 | 2018-03-31 23:59:57 |
3 | 2018-04-01 00:00:00 | 2018-04-30 23:59:58 |
4 | 2018-05-01 00:00:00 | 2018-05-31 23:59:59 |
5 | 2018-06-01 00:00:00 | 2018-06-30 23:59:59 |
6 | 2018-07-01 00:00:00 | 2018-07-31 23:59:59 |
7 | 2018-08-01 00:00:00 | 2018-08-31 23:59:59 |
8 | 2018-09-01 00:00:00 | 2018-09-30 23:59:59 |
9 | 2018-10-01 00:00:00 | 2018-10-31 23:59:58 |
10 | 2018-11-01 00:00:00 | 2018-11-30 23:59:59 |
11 | 2018-12-01 00:00:00 | 2018-12-31 23:59:58 |
If columns are sorted and/or row groups have disjoint values in a dataset, readers can take advantage of this through a feature called predicate pushdown. To get all taxi trips on a certain day, for example 2018-02-20, the parquet reader can look at the row group statistics, check the predicate tpep_pickup_datetime.min <= 2018-02-20 23:59:59 and tpep_pickup_datetime.max >= 2018-02-20 00:00:00 for every row group, and only read the parts of the file that potentially include rows for that day. In our case one would only have to read row group 1, and thereby 150MiB instead of 2.1 GiB.
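The selection step can be sketched in plain Python: a row group can only contain matching rows if its [min, max] range overlaps the queried interval. The statistics below are the first three rows of the table above; `row_groups_to_read` is a hypothetical helper, not part of pyarrow.

```python
from datetime import datetime, timedelta


def row_groups_to_read(stats, lo, hi):
    """Indices of row groups whose [min, max] range may contain values in [lo, hi]."""
    return [i for i, (rg_min, rg_max) in enumerate(stats) if rg_min <= hi and rg_max >= lo]


# (min, max) of tpep_pickup_datetime for row groups 0 to 2, from the table above
stats = [
    (datetime(2018, 1, 1, 0, 0, 0), datetime(2018, 1, 31, 23, 59, 57)),
    (datetime(2018, 2, 1, 0, 0, 0), datetime(2018, 2, 28, 23, 59, 58)),
    (datetime(2018, 3, 1, 0, 0, 0), datetime(2018, 3, 31, 23, 59, 57)),
]
day_start = datetime(2018, 2, 20)
day_end = day_start + timedelta(days=1) - timedelta(microseconds=1)
print(row_groups_to_read(stats, day_start, day_end))  # [1]
```

Checking both bounds matters: comparing only against the start of the day would miss a row group whose values begin later on that same day.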
In contrast, look at the statistics for the trip_distance column:
rowgroup | min | max |
---|---|---|
0 | 0.0 | 189483.84 |
1 | 0.0 | 1061.2 |
2 | 0.0 | 302.8 |
3 | 0.0 | 943.5 |
4 | 0.0 | 910.8 |
5 | 0.0 | 833.1 |
6 | 0.0 | 7655.76 |
7 | 0.0 | 5381.5 |
8 | 0.0 | 329.63 |
9 | 0.0 | 302.0 |
10 | 0.0 | 932.9 |
11 | 0.0 | 602.3 |
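Even if a reader is only interested in rows with a certain trip_distance, it would have to read the whole dataset most of the time, because the ranges of all row groups start at 0.0 and overlap heavily. Only for large distances could some row groups be skipped: for a trip_distance greater than 1000, only row groups 0, 1, 6 and 7 would have to be read.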
Summary
Query engines on parquet files like Hive, Presto or Dremio provide predicate pushdown out of the box to speed up query times and reduce I/O.
In the Python ecosystem, fastparquet supports predicate pushdown at the row group level. pyarrow has an open ticket for an efficient implementation in the parquet C++ reader.
Implementing predicate pushdown in Python on top of the exposed statistics is not that hard. In my team we have done this within kartothek to speed up reads of large datasets stored in Azure Blob Storage.
from Planet Python