# Petastorm Hello World Examples

In this notebook we will introduce Uber's Petastorm library (https://github.com/uber/petastorm) for creating training datasets for deep learning. We will go over some hello world examples and see how a Petastorm store differs from other types of storage formats.

### Motivation

Petastorm is an open source **data access library**. Its main motivation is to make it easier for data scientists to work with big data stored in Hadoop-like data lakes. The benefits of Petastorm are the following:

- It lets you use a single data format for both Tensorflow and PyTorch datasets (before Petastorm, users on Hopsworks would typically keep one dataset in TFRecords for training with Tensorflow and another in HDF5 for training with PyTorch). This reduces the number of ETL steps needed to prepare data for deep learning.

- Petastorm datasets integrate well with Apache Spark, the main processing engine used in Hopsworks. Petastorm datasets are built on top of Parquet, which is better supported in Spark than, for example, TFRecords or HDF5.

- A Petastorm dataset is self-contained: the data is stored together with its schema, so a data scientist can read a dataset into Tensorflow or PyTorch without having to specify a schema to parse the data. Compare this with TFRecords, where you need the schema at read time, and any discrepancy between your schema and the data on disk can cause serialization errors that you have to debug by manually inspecting protobuf files. With Petastorm, the API feels a little like a database: you do not need to know anything about the schema a priori, you just call `make_reader()` and the library uses the metadata to infer everything for you. **Note**: you pay for this read-time simplicity when you first create a Petastorm dataset and write it to disk; when writing, you have to be very explicit about the schema and other configuration.

- The format is optimized for filesystems like HDFS, streaming records from large files rather than reading billions of small files (although HopsFS is somewhat special in this regard, as it handles both large and small files well: https://www.logicalclocks.com/fixing-the-small-files-problem-in-hdfs/).

- Petastorm is supported in the Hopsworks Feature Store.

- When training deep learning models it is important to stream data in a way that does not starve your GPUs; Petastorm was designed from the beginning to be performant and usable for deep learning. Moreover, Petastorm supports partitioning data to optimize for distributed deep learning.


![Petastorm 1](./../images/petastorm1.png "Petastorm 1")

### Background

TL;DR: A Petastorm dataset is a Parquet dataset with extended metadata.

Petastorm is built on top of Parquet files. Parquet is a columnar data format with great support in Spark and other big data tools such as Hive, and it is commonly stored on HDFS. However, Parquet is not supported natively by deep learning frameworks such as Tensorflow or PyTorch. Moreover, Parquet has built-in primitive data types like integer, string and binary, but it does not include the higher-order tensors that are typical in deep learning datasets. Petastorm solves this by adding a data-access library on top of Parquet and storing extended metadata as a custom field in the Parquet schema in the footer, which makes it possible to store tensor data in Parquet files.

Petastorm stores tensors as binary blobs in Parquet, but it also stores the necessary schema: the name of each tensor, its shape (dimensions), and the data type of its elements. Petastorm also provides a set of codecs that specify how the data is encoded and compressed. All of this is specified in a Petastorm schema called a **Unischema**.
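To make the idea of a tensor stored as a binary blob concrete, here is a toy codec in plain Python. It is only a sketch under our own assumptions: the names `encode_tensor`/`decode_tensor`, the JSON header, and the byte layout are hypothetical, and this is not how Petastorm's `NdarrayCodec` or `CompressedImageCodec` are implemented. In Petastorm the shape and dtype are declared in the Unischema; the toy writes them into the blob only to stay self-contained.

```python
import json
import struct
from array import array


def encode_tensor(values, shape, typecode="B"):
    """Toy codec: pack a flat list of numbers plus its shape/element type into one blob.

    Layout (hypothetical): 4-byte little-endian header length | JSON header | raw element bytes.
    typecode "B" is array's unsigned char, i.e. the same range as numpy's uint8.
    """
    expected = 1
    for dim in shape:
        expected *= dim
    if len(values) != expected:
        raise ValueError("got %d values for shape %r" % (len(values), shape))
    header = json.dumps({"shape": list(shape), "typecode": typecode}).encode("utf-8")
    payload = array(typecode, values).tobytes()
    return struct.pack("<I", len(header)) + header + payload


def decode_tensor(blob):
    """Inverse of encode_tensor: recover (flat values, shape) from the blob alone."""
    (header_len,) = struct.unpack_from("<I", blob, 0)
    header = json.loads(blob[4:4 + header_len].decode("utf-8"))
    values = array(header["typecode"])
    values.frombytes(blob[4 + header_len:])
    return values.tolist(), tuple(header["shape"])


blob = encode_tensor([1, 2, 3, 4, 5, 6], (2, 3))
print(decode_tensor(blob))  # ([1, 2, 3, 4, 5, 6], (2, 3))
```

The round trip only works because the shape and element type travel with the bytes; Petastorm gets the same effect by keeping that information in the dataset's metadata.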

![Petastorm 2](./../images/petastorm2.png "Petastorm 2")

To make use of the metadata that Petastorm adds to Parquet, we use the Petastorm client library, which uses Apache Arrow to manipulate the dataset in memory:

```python
import petastorm
```

### Generating a Sample Petastorm Dataset

In [3]:
import numpy as np
from hops import hdfs, featurestore
import pyarrow as pa
import random
import pandas as pd
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import StructType, StructField, IntegerType

# IMPORTANT: must import tensorflow before petastorm.tf_utils due to a bug in petastorm
import tensorflow as tf
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField
from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.spark_utils import dataset_as_rdd
from petastorm import make_reader, make_batch_reader
from petastorm.tf_utils import tf_tensors, make_petastorm_dataset
from petastorm.pytorch import DataLoader

# `spark` (SparkSession) and `sc` (SparkContext), used in the cells below, are assumed
# to be predefined by the notebook's PySpark kernel

#### Specify the Petastorm Schema

Below we specify a sample petastorm schema with three fields, including multi-dimensional tensors which are not supported natively by Parquet. 

In [4]:
# Each UnischemaField is (name, numpy dtype, shape, codec, nullable)
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
    UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])
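In the schema above, a `None` in a field's shape marks a dimension whose size may vary from row to row, so `array_4d` accepts any 4-D array whose second and third dimensions are 128 and 30. The helper below is a hypothetical sketch of that matching rule in plain Python, not Petastorm's own validation code.

```python
def shape_matches(declared, actual):
    """Return True if `actual` conforms to `declared`, where None means 'any size'."""
    # The number of dimensions must agree exactly; None only relaxes a dimension's size.
    if len(declared) != len(actual):
        return False
    return all(d is None or d == a for d, a in zip(declared, actual))


# array_4d is declared as (None, 128, 30, None); row_generator below produces (4, 128, 30, 3).
print(shape_matches((None, 128, 30, None), (4, 128, 30, 3)))  # True
print(shape_matches((None, 128, 30, None), (4, 128, 31, 3)))  # False: a fixed dimension differs
print(shape_matches((None, 128, 30, None), (128, 30, 3)))     # False: wrong number of dimensions
print(shape_matches((), ()))                                  # True: scalar field such as 'id'
```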

The Petastorm Unischema is designed so that it can be converted to Spark, NumPy, Tensorflow and PyTorch schemas. For example, `as_spark_schema()` returns the corresponding Spark `StructType`:

In [5]:
HelloWorldSchema.as_spark_schema()

StructType(List(StructField(array_4d,BinaryType,false),StructField(id,IntegerType,false),StructField(image1,BinaryType,false)))

#### Create a Spark Dataframe that Conforms to the Petastorm Schema and Write the Petastorm Dataset

Because Petastorm datasets are built on top of Parquet files, we can write them with Spark in combination with the context manager `materialize_dataset`.

However, first we must create a Spark DataFrame that conforms to the Petastorm schema. To do this, we first create an RDD of dicts and use `dict_to_spark_row(unischema, row_dict)` to turn each dict into a Spark `Row`. The DataFrame is then created with the Spark schema derived from the Unischema, `HelloWorldSchema.as_spark_schema()`.
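As we understand it, `dict_to_spark_row` checks that a dict agrees with the Unischema and encodes each value with its field's codec before building a Spark `Row`. The plain-Python sketch below imitates only the key check and the column ordering; `dict_to_ordered_row` and its `encoders` argument are hypothetical, and the alphabetical ordering simply mirrors the `as_spark_schema()` output shown above.

```python
def dict_to_ordered_row(field_names, row_dict, encoders=None):
    """Toy stand-in for dict_to_spark_row: validate keys, encode values, order by name."""
    expected, actual = set(field_names), set(row_dict)
    if expected != actual:
        raise ValueError("missing fields %s, unexpected fields %s"
                         % (sorted(expected - actual), sorted(actual - expected)))
    encoders = encoders or {}
    identity = lambda value: value  # fields without an encoder are passed through unchanged
    return tuple((name, encoders.get(name, identity)(row_dict[name]))
                 for name in sorted(field_names))


row = dict_to_ordered_row(["id", "image1", "array_4d"],
                          {"id": 3, "image1": b"<png bytes>", "array_4d": b"<raw bytes>"},
                          encoders={"id": int})
print([name for name, _ in row])  # ['array_4d', 'id', 'image1']
```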

In [6]:
OUTPUT_URL = hdfs.project_path() + "Resources/hello_world"

In [9]:
def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
            'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}


def generate_petastorm_dataset():
    """ Generates a petastorm dataset and saves it to HopsFS using Spark and the materialize_dataset context manager """
    rowgroup_size_mb = 256
    rows_count = 10
    filesystem_factory = lambda: pa.hdfs.connect(driver='libhdfs')
    # Wrap the dataset materialization. materialize_dataset takes care of setting up Spark
    # environment variables as well as saving the Petastorm-specific metadata
    with materialize_dataset(spark, OUTPUT_URL, HelloWorldSchema, rowgroup_size_mb, filesystem_factory=filesystem_factory):

        rows_rdd = sc.parallelize(range(rows_count))\
            .map(row_generator)\
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(OUTPUT_URL)

By writing the Parquet files (`DataFrame.write.parquet()`) inside the Petastorm context manager `materialize_dataset()`, we let the Petastorm library take care of writing out the necessary Petastorm-specific metadata once the Parquet files have been written.
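The "write the data, then write the metadata" pattern can be sketched with a plain-Python context manager. This is a hypothetical analogue only: `toy_materialize`, the `_toy_metadata.json` file name, and the text-file "data" are our own inventions, and the real `materialize_dataset` also sets up Spark (per the code comment above) and stores the Unischema in Petastorm's own metadata format.

```python
import contextlib
import json
import os
import tempfile


@contextlib.contextmanager
def toy_materialize(dataset_dir, schema_fields):
    """Hypothetical analogue of materialize_dataset: the caller writes data files inside
    the with-block, then a schema metadata file is added when the block exits cleanly."""
    os.makedirs(dataset_dir, exist_ok=True)
    yield
    # Not reached if the with-block raised, so a failed write leaves no metadata behind.
    with open(os.path.join(dataset_dir, "_toy_metadata.json"), "w") as f:
        json.dump({"fields": schema_fields}, f)


with tempfile.TemporaryDirectory() as tmp:
    with toy_materialize(tmp, {"id": "int32"}):
        with open(os.path.join(tmp, "part-00000.txt"), "w") as f:
            f.write("0\n1\n2\n")
    print(sorted(os.listdir(tmp)))  # ['_toy_metadata.json', 'part-00000.txt']
```

Tying the metadata write to the end of the block means a reader never sees schema metadata for a dataset whose data write did not finish.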

In [10]:
generate_petastorm_dataset()

#### Alternative Ways of Creating Spark DataFrames that Conform to a Petastorm Schema

You don't have to use `dict_to_spark_row` and `spark.createDataFrame(rdd, schema.as_spark_schema())`; they are just helper methods that make sure your Unischema is actually consistent with the Spark schema. An alternative is illustrated below, with fewer guarantees about consistency (nothing checks that the two schemas agree).

In [11]:
sql_context = SQLContext(sc)
pandas_df = pd.DataFrame(np.random.randint(0,100,size=(100, 4)), columns=list('ABCD'))
spark_df = sql_context.createDataFrame(pandas_df)

In [12]:
spark_df.printSchema()

root
 |-- A: long (nullable = true)
 |-- B: long (nullable = true)
 |-- C: long (nullable = true)
 |-- D: long (nullable = true)

In [13]:
TestSchema = Unischema('TestSchema', [
    UnischemaField('A', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('B', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('C', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('D', np.int32, (), ScalarCodec(IntegerType()), False)
])
TEST_URL = hdfs.project_path() + "Logs/test.petastorm"

In [15]:
filesystem_factory= lambda: pa.hdfs.connect(driver='libhdfs')
with materialize_dataset(spark, TEST_URL, TestSchema, filesystem_factory=filesystem_factory):
    spark_df.write.mode('overwrite').parquet(TEST_URL)
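Nothing in the cells above checks that `TestSchema` agrees with `spark_df`: `printSchema()` reports the columns as `long`, while `TestSchema` declares them as `np.int32` with `IntegerType()`. A hypothetical pre-write check could compare the two, as sketched below in plain Python. The type names are copied by hand from the output above, and the `SPARK_TO_NUMPY` table is our own assumption covering only a few common types.

```python
# Hypothetical mapping from Spark type names (as printed by printSchema) to numpy dtype names.
SPARK_TO_NUMPY = {"integer": "int32", "long": "int64", "double": "float64", "binary": "bytes"}


def schema_mismatches(spark_types, declared_dtypes):
    """Return (column, spark_type, declared_dtype) for every column whose types disagree."""
    problems = []
    for column in sorted(set(spark_types) | set(declared_dtypes)):
        spark_type = spark_types.get(column)    # None if the column is missing in Spark
        declared = declared_dtypes.get(column)  # None if the Unischema lacks the column
        if spark_type is None or declared is None or SPARK_TO_NUMPY.get(spark_type) != declared:
            problems.append((column, spark_type, declared))
    return problems


spark_types = {"A": "long", "B": "long", "C": "long", "D": "long"}   # from printSchema() above
declared = {"A": "int32", "B": "int32", "C": "int32", "D": "int32"}  # from TestSchema
for column, spark_type, dtype in schema_mismatches(spark_types, declared):
    print("%s: Spark has %s, Unischema declares %s" % (column, spark_type, dtype))
```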

### Read a Petastorm Dataset

A Petastorm dataset can be read directly into Spark, PyTorch or Tensorflow using the Petastorm library. This is where the Petastorm data format really shines: because the schema was specified explicitly when the dataset was written, reading the data is very simple.

#### Reading a Petastorm Dataset using plain python

In [16]:
def python_hello_world():
    """ Creates a python reader to read a petastorm dataset"""
    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs') as reader:
        # Pure python
        for sample in reader:
            print(sample.id)

In [17]:
python_hello_world()

5
6
7
8
9
0
1
2
3
4

#### Reading a Petastorm Dataset using Pyspark

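To read a Petastorm dataset in PySpark, we can read it directly as a DataFrame with `spark.read.parquet(OUTPUT_URL)`; however, this does not use the extra metadata that Petastorm creates, so tensor fields come back as raw binary columns (see the `printSchema()` output below). Alternatively, we can read a Petastorm dataset into PySpark with Petastorm's utility methods, such as `dataset_as_rdd`, which uses the Petastorm metadata and returns an RDD of named tuples.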

In [18]:
def pyspark_hello_world():
    """ Reads a petastorm dataset into spark rdd and dataframe"""
    # dataset_as_rdd creates an rdd of named tuples.
    rdd = dataset_as_rdd(OUTPUT_URL, spark, [HelloWorldSchema.id, HelloWorldSchema.image1], hdfs_driver="libhdfs")
    print('An id in the dataset: ', rdd.first().id)

    # Create a dataframe object from a parquet file
    dataframe = spark.read.parquet(OUTPUT_URL)

    # Show a schema
    dataframe.printSchema()

    # Count all
    dataframe.count()

    # Show just some columns
    dataframe.select('id').show()

    # This is how you can use a standard SQL to query a dataset. Note that the data is not decoded in this case.
    number_of_rows = spark.sql(
        'SELECT count(id) '
        'from parquet.`{}` '.format(OUTPUT_URL)).collect()
    print('Number of rows in the dataset: {}'.format(number_of_rows[0][0]))

In [19]:
pyspark_hello_world()

An id in the dataset:  0
root
 |-- array_4d: binary (nullable = true)
 |-- id: integer (nullable = true)
 |-- image1: binary (nullable = true)

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+

Number of rows in the dataset: 10

#### Reading a Petastorm Dataset using Tensorflow

Petastorm makes it possible to store multi-dimensional tensors (e.g. images) in Parquet and then read them directly in Tensorflow through a simple API that supports both the TF1 session-based API (`tf_tensors`) and the `tf.data.Dataset` API (`make_petastorm_dataset`).

In [20]:
def tensorflow_hello_world():
    """ Creates a tensorflow reader for reading a petastorm dataset """
    # Example: tf_tensors will return tensors with dataset data
    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs') as reader:
        tensor = tf_tensors(reader)
        with tf.Session() as sess:
            sample = sess.run(tensor)
            print(sample.id)

    # Example: use tf.data.Dataset API
    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs') as reader:
        dataset = make_petastorm_dataset(reader)
        iterator = dataset.make_one_shot_iterator()
        tensor = iterator.get_next()
        with tf.Session() as sess:
            sample = sess.run(tensor)
            print(sample.id)

In [21]:
tensorflow_hello_world()

5
5

#### Reading a Petastorm Dataset using PyTorch

In [22]:
def pytorch_hello_world():
    """ Creates a PyTorch reader for reading a petastorm dataset """
    with DataLoader(make_reader(OUTPUT_URL, hdfs_driver="libhdfs")) as train_loader:
        sample = next(iter(train_loader))
        print(sample['id'])

In [23]:
pytorch_hello_world()

tensor([0], dtype=torch.int32)

### External Datasets in Petastorm

An *external dataset* in the Petastorm library is an existing Parquet store that does not include the extended Petastorm metadata. Petastorm can read such datasets as well, using `make_batch_reader`.

`make_batch_reader` works with any Parquet store and returns batches of records, as opposed to `make_reader`, which only works with Petastorm datasets and returns one record at a time.
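The difference in output shape can be pictured in plain Python: `make_reader` yields one named tuple per row, while `make_batch_reader` yields named tuples whose fields each hold a batch of values (NumPy arrays in Petastorm, plain lists in this toy). `to_batches` is a hypothetical helper with a fixed `batch_size`; as far as we know, Petastorm's batch boundaries follow how the data is laid out in the Parquet store rather than a parameter like this.

```python
from collections import namedtuple

Sample = namedtuple("Sample", ["id", "value1", "value2"])


def to_batches(rows, batch_size):
    """Regroup per-row namedtuples into namedtuples of per-column lists."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            # zip(*batch) transposes rows into columns; _make rebuilds the namedtuple type.
            yield batch[0]._make(list(column) for column in zip(*batch))
            batch = []
    if batch:  # emit the final, possibly shorter, batch
        yield batch[0]._make(list(column) for column in zip(*batch))


rows = [Sample(id=i, value1=10 * i, value2=-i) for i in range(5)]
for batch in to_batches(rows, batch_size=2):
    print(batch)
# Sample(id=[0, 1], value1=[0, 10], value2=[0, -1])
# Sample(id=[2, 3], value1=[20, 30], value2=[-2, -3])
# Sample(id=[4], value1=[40], value2=[-4])
```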

#### Generate an External Dataset (Regular Parquet Store)

In [24]:
OUTPUT_URL2 = hdfs.project_path() + "Resources/hello_world_external"

In [25]:
def row_generator_external(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return Row(id=x, value1=random.randint(-255, 255), value2=random.randint(-255, 255))

In [26]:
def generate_external_dataset():
    """Creates an example dataset at output_url in Parquet format"""

    rows_count = 10
    rows_rdd = sc.parallelize(range(rows_count))\
        .map(row_generator_external)

    spark.createDataFrame(rows_rdd).\
        write.\
        mode('overwrite').\
        parquet(OUTPUT_URL2)

In [27]:
generate_external_dataset()

#### Read External Dataset using Plain Python and the Petastorm API

In [28]:
def python_hello_world_external():
    """ Creates a python reader for reading a regular parquet store using Petastorm library"""
    # Reading data from the non-Petastorm Parquet via pure Python
    with make_batch_reader(OUTPUT_URL2, schema_fields=["id", "value1", "value2"], hdfs_driver='libhdfs') as reader:
        for schema_view in reader:
            # make_batch_reader() returns batches of rows instead of individual rows
            print("Batched read:\nid: {0} value1: {1} value2: {2}".format(
                schema_view.id, schema_view.value1, schema_view.value2))

In [29]:
python_hello_world_external()

Batched read:
id: [5 6 7 8 9] value1: [-189   51 -169  244   81] value2: [  72 -175    8  -17 -134]
Batched read:
id: [0 1 2 3 4] value1: [-189   51 -169  244   81] value2: [  72 -175    8  -17 -134]

#### Read External Dataset using Pytorch and the Petastorm API

In [30]:
def pytorch_hello_world_external():
    """ Creates a pyTorch reader for reading a regular parquet store using Petastorm library"""
    with DataLoader(make_batch_reader(OUTPUT_URL2, hdfs_driver='libhdfs')) as train_loader:
        sample = next(iter(train_loader))
        # Because we are using make_batch_reader(), each read returns a batch of rows instead of a single row
        print("id batch: {0}".format(sample['id']))


In [31]:
pytorch_hello_world_external()

id batch: tensor([[0, 1, 2, 3, 4]])

#### Read External Dataset using Tensorflow and the Petastorm API

In [32]:
def tensorflow_hello_world_external():
    """ Creates a Tensorflow reader for reading a regular parquet store using Petastorm library"""
    # Example: tf_tensors will return tensors with dataset data
    with make_batch_reader(OUTPUT_URL2, hdfs_driver='libhdfs') as reader:
        tensor = tf_tensors(reader)
        with tf.Session() as sess:
            # Because we are using make_batch_reader(), each read returns a batch of rows instead of a single row
            batched_sample = sess.run(tensor)
            print("id batch: {0}".format(batched_sample.id))
    # Example: use tf.data.Dataset API
    with make_batch_reader(OUTPUT_URL2, hdfs_driver='libhdfs') as reader:
        dataset = make_petastorm_dataset(reader)
        iterator = dataset.make_one_shot_iterator()
        tensor = iterator.get_next()
        with tf.Session() as sess:
            batched_sample = sess.run(tensor)
            print("id batch: {0}".format(batched_sample.id))

In [33]:
tensorflow_hello_world_external()

id batch: [0 1 2 3 4]
id batch: [0 1 2 3 4]

### Advanced Features

Petastorm also supports more advanced features for data access:

- Selective column selection: since Petastorm is built on a columnar format (Parquet), reading only a subset of the columns is efficient (as opposed to row-based formats like TFRecords, where you must parse the entire row into memory to select columns).

- Parallelized reads

- Dataset sharding for distributed training

- N-gram (windowing) support: if your data is sorted by timestamp, Petastorm can do the I/O efficiently and automatically window the data into n-grams for you.

- Row filtering (row predicates)

- Shuffling: in machine learning, shuffling is an important step to avoid introducing artificial correlations into the dataset based on the order of the records.
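N-grams are not demonstrated in the cells below. As a rough illustration of the idea only, the sketch below sorts records by timestamp and yields every window of `length` consecutive records whose neighbouring timestamps are at most `delta_threshold` apart. The function `ngrams`, the dict-shaped records, and the rule of skipping windows that span a large gap are our assumptions; Petastorm's own n-gram support has its own parameters and semantics.

```python
def ngrams(records, length, delta_threshold, timestamp_key="timestamp"):
    """Yield windows of `length` consecutive records, sorted by timestamp, whose
    neighbouring timestamps differ by at most `delta_threshold`."""
    records = sorted(records, key=lambda record: record[timestamp_key])
    for start in range(len(records) - length + 1):
        window = records[start:start + length]
        gaps = [later[timestamp_key] - earlier[timestamp_key]
                for earlier, later in zip(window, window[1:])]
        if all(gap <= delta_threshold for gap in gaps):
            yield window


events = [{"timestamp": t, "value": t * t} for t in (0, 1, 2, 10, 11)]
for window in ngrams(events, length=2, delta_threshold=1):
    print([record["timestamp"] for record in window])
# [0, 1]
# [1, 2]
# [10, 11]
```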

#### Selective Column Selection

In [34]:
def tensorflow_selective_column_selection(schema_fields):
    """ Tensorflow Selective Column Selection """

    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs', schema_fields=schema_fields) as reader:
        dataset = make_petastorm_dataset(reader)
        iterator = dataset.make_one_shot_iterator()
        tensor = iterator.get_next()
        with tf.Session() as sess:
            sample = sess.run(tensor)
            print("Number of columns: {}".format(len(sample)))
            print("fields:")
            print(sample._fields)

In [35]:
OUTPUT_URL = hdfs.project_path() + "Resources/hello_world"

In [36]:
tensorflow_selective_column_selection(None) #read all fields

Number of columns: 3
fields:
('array_4d', 'id', 'image1')

In [37]:
tensorflow_selective_column_selection(["array_4d", "id"]) #read only fields 'array_4d' and 'id

Number of columns: 2
fields:
('array_4d', 'id')
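A toy cost model illustrates why column selection is cheap with a columnar layout. It is a deliberate simplification under our own assumptions: `bytes_read` is hypothetical, the field sizes are made-up placeholders, and real Parquet reads also involve compression, page and row-group granularity, and metadata.

```python
def bytes_read(rows, wanted, layout):
    """Toy cost model: bytes a reader touches to extract the `wanted` fields from `rows`."""
    if layout == "row":
        # Row-oriented: each record is read and parsed whole before fields are picked out.
        return sum(len(value) for row in rows for value in row.values())
    if layout == "column":
        # Column-oriented: each field is stored contiguously, so only wanted columns are read.
        return sum(len(row[name]) for row in rows for name in wanted)
    raise ValueError("layout must be 'row' or 'column'")


rows = [{"id": bytes(4), "image1": bytes(100), "array_4d": bytes(1000)} for _ in range(10)]
print(bytes_read(rows, ["id"], "row"))     # 11040
print(bytes_read(rows, ["id"], "column"))  # 40
```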

#### Parallelized reads

In [38]:
def tensorflow_thread_pool_read(reader_pool_type, workers_count):
    """ Tensorflow parallel read with a pool of workers ('thread' or 'process') """

    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs', reader_pool_type=reader_pool_type,
                     workers_count=workers_count) as reader:
        dataset = make_petastorm_dataset(reader)
        iterator = dataset.make_one_shot_iterator()
        tensor = iterator.get_next()
        with tf.Session() as sess:
            sample = sess.run(tensor)
            print(sample._fields)

In [39]:
tensorflow_thread_pool_read("thread", 15) # thread pool of size 15 for reading

('array_4d', 'id', 'image1')

In [40]:
tensorflow_thread_pool_read("thread", 5) # process pool of size 5 for reading

('array_4d', 'id', 'image1')
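With `reader_pool_type` and `workers_count`, Petastorm reads using a pool of workers. The stdlib sketch below mimics a thread pool reading row groups concurrently; `read_row_group` and `parallel_read` are hypothetical, and the sleep only simulates variable I/O latency. Because results are collected in completion order, rows need not come back in the order they were written, much like the ids 5 to 9 printed before 0 to 4 in the plain-Python read earlier.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def read_row_group(index, rows_per_group=5):
    """Hypothetical stand-in for fetching and decoding one row group."""
    time.sleep(random.uniform(0, 0.01))  # simulated, variable I/O latency
    return list(range(index * rows_per_group, (index + 1) * rows_per_group))


def parallel_read(num_row_groups, workers_count):
    """Read all row groups with a thread pool, collecting rows in completion order."""
    rows = []
    with ThreadPoolExecutor(max_workers=workers_count) as pool:
        futures = [pool.submit(read_row_group, i) for i in range(num_row_groups)]
        for future in as_completed(futures):
            rows.extend(future.result())
    return rows


rows = parallel_read(num_row_groups=2, workers_count=2)
# The order may vary between runs, but every row is read exactly once.
print(sorted(rows) == list(range(10)))  # True
```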

#### Dataset Sharding

In [41]:
def tensorflow_dataset_sharding(shard_count, cur_shard):
    """ 
    Tensorflow dataset sharding 
    
    Args:
        :shard_count: An int denoting the number of shards to break this dataset into. Defaults to None
        :cur_shard: An int denoting the current shard number. 
                    Each node reading a shard should pass in a unique shard number in the range [0, shard_count). 
                    shard_count must be supplied as well.
    """

    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs', shard_count=shard_count, 
                     cur_shard=cur_shard) as reader:
        dataset = make_petastorm_dataset(reader)
        iterator = dataset.make_one_shot_iterator()
        tensor = iterator.get_next()
        with tf.Session() as sess:
            sample = sess.run(tensor)
            print(sample._fields)

In [42]:
tensorflow_dataset_sharding(2, 1) # shard the dataset into 2 shards and read shard 1 (the second shard, as shard numbers start at 0)

('array_4d', 'id', 'image1')
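The docstring above requires each node to pass a distinct `cur_shard` in `[0, shard_count)`. One simple way to split the work so that every row group goes to exactly one node is a round-robin assignment over row groups. We assume this scheme purely for illustration (`shard_row_groups` is hypothetical) and do not claim it is exactly how Petastorm partitions a dataset.

```python
def shard_row_groups(row_groups, shard_count, cur_shard):
    """Round-robin split: shard k receives row groups k, k + shard_count, k + 2 * shard_count, ..."""
    if not 0 <= cur_shard < shard_count:
        raise ValueError("cur_shard must be in [0, shard_count)")
    return row_groups[cur_shard::shard_count]


row_groups = ["rg0", "rg1", "rg2", "rg3", "rg4"]
for shard in range(2):
    print(shard, shard_row_groups(row_groups, shard_count=2, cur_shard=shard))
# 0 ['rg0', 'rg2', 'rg4']
# 1 ['rg1', 'rg3']
```

Since the split is by row group, a dataset with fewer row groups than shards would leave some shards empty in this scheme.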

#### Row Predicates

In [43]:
def tensorflow_row_predicate(predicate):
    """ 
    Tensorflow reader with a row predicate: only rows for which the predicate returns True are read
    """

    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs', predicate=predicate) as reader:
        dataset = make_petastorm_dataset(reader)
        iterator = dataset.make_one_shot_iterator()
        tensor = iterator.get_next()
        with tf.Session() as sess:
            sample = sess.run(tensor)
            print(sample.id)

In [44]:
from petastorm.predicates import in_lambda
# Keep only the rows whose 'id' field equals 5
predicate = in_lambda(["id"], lambda id_value: id_value == 5)
tensorflow_row_predicate(predicate)

5
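`in_lambda` pairs a list of field names with a function of those fields. A plain-Python analogue of that shape is sketched below; `in_lambda_like` is hypothetical and only mirrors how the cell above calls `in_lambda` (the listed fields passed positionally to the function). Declaring the needed fields up front means, in principle, that a reader can decide whether to keep a row by looking at those columns alone.

```python
def in_lambda_like(fields, func):
    """Hypothetical stand-in for in_lambda: pass only the named fields of a row to func."""
    def predicate(row):
        return func(*(row[name] for name in fields))
    return predicate


rows = [{"id": i, "image1": b"..."} for i in range(10)]
keep_id_5 = in_lambda_like(["id"], lambda id_value: id_value == 5)
print([row["id"] for row in rows if keep_id_5(row)])  # [5]
```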

#### Shuffling 

In [45]:
def tensorflow_dataset_shuffling(shuffle_row_groups, shuffle_row_drop_partitions):
    """ 
    Tensorflow dataset shuffling
    
    Args:
         :shuffle_row_groups: Whether to shuffle row groups (the order in which full row groups are read)
         :shuffle_row_drop_partitions: This is a positive integer which determines how many partitions 
                                       to break up a row group into for increased shuffling in exchange for 
                                       worse performance (extra reads). For example if you specify 
                                       2 each row group read will drop half of the rows within every 
                                       row group and read the remaining rows in separate reads. 
                                       It is recommended to keep this number below the regular row group 
                                       size in order to not waste reads which drop all rows.
    """

    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs', shuffle_row_groups=shuffle_row_groups,
                     shuffle_row_drop_partitions=shuffle_row_drop_partitions) as reader:
        dataset = make_petastorm_dataset(reader)
        iterator = dataset.make_one_shot_iterator()
        tensor = iterator.get_next()
        with tf.Session() as sess:
            sample = sess.run(tensor)
            print(sample._fields)

In [46]:
tensorflow_dataset_shuffling(True, 5)

('array_4d', 'id', 'image1')
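To picture what the two shuffling arguments trade off, the toy below builds a read plan: each row group is split into `drop_partitions` slices that are read separately, and the resulting read units are optionally shuffled. `shuffled_read_plan`, the interleaved slicing, and the seeded shuffle are our assumptions for illustration, not Petastorm's implementation. The point is that `k` partitions cost `k` reads per row group, and more partitions than rows produce reads that drop every row, as the docstring above warns.

```python
import random


def shuffled_read_plan(num_row_groups, rows_per_group, shuffle_row_groups, drop_partitions, seed=0):
    """Return a list of (row_group, rows) read units for a toy dataset of consecutive row ids."""
    units = []
    for group in range(num_row_groups):
        rows = list(range(group * rows_per_group, (group + 1) * rows_per_group))
        # Each read keeps one interleaved slice of the group and drops the rest.
        for part in range(drop_partitions):
            units.append((group, rows[part::drop_partitions]))
    if shuffle_row_groups:
        random.Random(seed).shuffle(units)
    return units


print(shuffled_read_plan(2, 6, shuffle_row_groups=False, drop_partitions=2))
# [(0, [0, 2, 4]), (0, [1, 3, 5]), (1, [6, 8, 10]), (1, [7, 9, 11])]
print(shuffled_read_plan(1, 2, shuffle_row_groups=False, drop_partitions=3))
# [(0, [0]), (0, [1]), (0, [])]  <- the third read drops every row
```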

### Integration with the Feature Store

Petastorm is a supported format for saving training datasets in the feature store. To save a Spark DataFrame in the Petastorm format in the feature store, use the method `featurestore.create_training_dataset()` and supply the DataFrame together with the Petastorm arguments in a dict `petastorm_args` (here, the Unischema under the key `"schema"`).

In [47]:
sql_context = SQLContext(sc)
pandas_df = pd.DataFrame(np.random.randint(0,100,size=(100, 4)), columns=list('ABCD'))
spark_df = sql_context.createDataFrame(pandas_df)
TestSchema = Unischema('TestSchema', [
    UnischemaField('A', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('B', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('C', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('D', np.int32, (), ScalarCodec(IntegerType()), False)
])

In [48]:
petastorm_args = {"schema": TestSchema}
featurestore.create_training_dataset(spark_df, "petastorm_hello_world", data_format="petastorm",
                                     petastorm_args=petastorm_args)

computing descriptive statistics for : petastorm_hello_world
computing feature correlation for: petastorm_hello_world
computing feature histograms for: petastorm_hello_world
computing cluster analysis for: petastorm_hello_world

In [49]:
df = featurestore.get_training_dataset("petastorm_hello_world")
df.printSchema()

root
 |-- A: long (nullable = true)
 |-- B: long (nullable = true)
 |-- C: long (nullable = true)
 |-- D: long (nullable = true)