# Feature Store Quick Start

This notebook gives you a quick overview of how you can integrate the feature store service on Hopsworks into your machine learning pipeline. We'll go over five steps:

1. Generate some sample data (instead of reading data from disk, so that this notebook is stand-alone)
2. Do some feature engineering on the data
3. **Save the engineered features to the feature store**
4. **Select a group of features from the feature store and create a managed training dataset of TFRecords in the feature store**
5. Train a model on the training dataset

## Imports

We'll use numpy and pandas for data generation, pyspark for feature engineering, tensorflow and keras for model training, and the hops `featurestore` library for interacting with the feature store.

In [3]:
import numpy as np
import random
import pandas as pd
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
from pyspark.sql import Row
from hops import featurestore
from hops import experiment

## Generate Sample Data

Let's generate two sample datasets:

1. `houses_for_sale_data`:

```bash
+-------+--------+------------------+------------------+------------------+
|area_id|house_id|       house_worth|         house_age|        house_size|
+-------+--------+------------------+------------------+------------------+
|      1|       0| 11678.15482418699|133.88670106643886|366.80067322738535|
|      1|       1| 2290.436167500643|15994.969706808222|195.84014889823976|
|      1|       2| 8380.774578431328|1994.8576926471007|1544.5164614303735|
|      1|       3|11641.224696102923|23104.501275562343|1673.7222604337876|
|      1|       4| 5382.089422436954| 13903.43637058141| 274.2912104765028|
+-------+--------+------------------+------------------+------------------+

 |-- area_id: long (nullable = true)
 |-- house_id: long (nullable = true)
 |-- house_worth: double (nullable = true)
 |-- house_age: double (nullable = true)
 |-- house_size: double (nullable = true)
```
2. `houses_sold_data`:

```bash
+-------+-----------------+-----------------+------------------+
|area_id|house_purchase_id|number_of_bidders|   sold_for_amount|
+-------+-----------------+-----------------+------------------+
|      1|                0|                0| 70073.06059070028|
|      1|                1|               15| 146.9198329740602|
|      1|                2|                6|  594.802165433149|
|      1|                3|               10| 77187.84123130841|
|      1|                4|                1|110627.48922722359|
+-------+-----------------+-----------------+------------------+

 |-- area_id: long (nullable = true)
 |-- house_purchase_id: long (nullable = true)
 |-- number_of_bidders: long (nullable = true)
 |-- sold_for_amount: double (nullable = true)
```

We'll use this data to predict the price a house is sold for, based on features of the **area** where the house is located.

### Generation of `houses_for_sale_data`

In [4]:
area_ids = list(range(1,51))
house_sizes = []
house_worths = []
house_ages = []
house_area_ids = []
for i in area_ids:
    for j in list(range(1,100)):
        house_sizes.append(abs(np.random.normal()*1000)/i)
        house_worths.append(abs(np.random.normal()*10000)/i)
        house_ages.append(abs(np.random.normal()*10000)/i)
        house_area_ids.append(i)
house_ids = list(range(len(house_area_ids)))
houses_for_sale_data  = pd.DataFrame({
        'area_id':house_area_ids,
        'house_id':house_ids,
        'house_worth': house_worths,
        'house_age': house_ages,
        'house_size': house_sizes
    })
houses_for_sale_data_spark_df = sqlContext.createDataFrame(houses_for_sale_data)

In [5]:
houses_for_sale_data_spark_df.show(5)

+-------+--------+------------------+------------------+------------------+
|area_id|house_id|       house_worth|         house_age|        house_size|
+-------+--------+------------------+------------------+------------------+
|      1|       0|7954.6041492424665|18744.948373115192| 262.5787314631974|
|      1|       1|1358.0084781070925|18504.222822538602|  809.432184084531|
|      1|       2|11154.843104129464|  6791.12075051502|1562.9281829769877|
|      1|       3| 3516.765032301006| 8032.966064701901| 235.4698790067539|
|      1|       4| 17528.84349464171| 7575.332365371018| 530.4529032760828|
+-------+--------+------------------+------------------+------------------+
only showing top 5 rows

In [6]:
houses_for_sale_data_spark_df.printSchema()

root
 |-- area_id: long (nullable = true)
 |-- house_id: long (nullable = true)
 |-- house_worth: double (nullable = true)
 |-- house_age: double (nullable = true)
 |-- house_size: double (nullable = true)

### Generation of `houses_sold_data`

In [7]:
house_purchased_amounts = []
house_purchases_bidders = []
house_purchases_area_ids = []
for i in area_ids:
    for j in list(range(1,1000)):
        house_purchased_amounts.append(abs(np.random.exponential()*100000)/i)
        house_purchases_bidders.append(int(abs(np.random.exponential()*10)/i))
        house_purchases_area_ids.append(i)
house_purchase_ids = list(range(len(house_purchases_bidders)))
houses_sold_data  = pd.DataFrame({
        'area_id':house_purchases_area_ids,
        'house_purchase_id':house_purchase_ids,
        'number_of_bidders': house_purchases_bidders,
        'sold_for_amount': house_purchased_amounts
    })
houses_sold_data_spark_df = sqlContext.createDataFrame(houses_sold_data)

In [8]:
houses_sold_data_spark_df.show(5)

+-------+-----------------+-----------------+------------------+
|area_id|house_purchase_id|number_of_bidders|   sold_for_amount|
+-------+-----------------+-----------------+------------------+
|      1|                0|                1|29395.848291318987|
|      1|                1|                3|16425.712634068295|
|      1|                2|               40|116255.52034231767|
|      1|                3|               14| 32392.42818851318|
|      1|                4|               12|  63486.6189252683|
+-------+-----------------+-----------------+------------------+
only showing top 5 rows

In [9]:
houses_sold_data_spark_df.printSchema()

root
 |-- area_id: long (nullable = true)
 |-- house_purchase_id: long (nullable = true)
 |-- number_of_bidders: long (nullable = true)
 |-- sold_for_amount: double (nullable = true)

## Feature Engineering

Let's generate some aggregate features, such as sums and averages, from our datasets.

1. `houses_for_sale_features`:

```bash
 |-- area_id: long (nullable = true)
 |-- avg_house_age: double (nullable = true)
 |-- avg_house_size: double (nullable = true)
 |-- avg_house_worth: double (nullable = true)
 |-- sum_house_age: double (nullable = true)
 |-- sum_house_size: double (nullable = true)
 |-- sum_house_worth: double (nullable = true)
```

2. `houses_sold_features`

```bash
 |-- area_id: long (nullable = true)
 |-- avg_num_bidders: double (nullable = true)
 |-- avg_sold_for: double (nullable = true)
 |-- sum_number_of_bidders: long (nullable = true)
 |-- sum_sold_for_amount: double (nullable = true)
```
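
To make the meaning of these aggregate features concrete, here is a minimal pure-Python sketch of the same per-area sum and average computation on a few made-up rows. The helper `aggregate_per_area` and the toy values are assumptions for illustration only; the notebook itself computes the features with Spark.

```python
from collections import defaultdict

# Toy rows shaped like houses_sold_data:
# (area_id, number_of_bidders, sold_for_amount). Values are made up.
rows = [
    (1, 2, 100.0),
    (1, 4, 300.0),
    (2, 1, 50.0),
]

def aggregate_per_area(rows):
    """Return {area_id: feature dict} with per-area sums and averages."""
    sums = defaultdict(lambda: {"bidders": 0, "sold_for": 0.0, "num_rows": 0})
    for area_id, bidders, sold_for in rows:
        acc = sums[area_id]
        acc["bidders"] += bidders
        acc["sold_for"] += sold_for
        acc["num_rows"] += 1
    return {
        area_id: {
            "sum_number_of_bidders": acc["bidders"],
            "sum_sold_for_amount": acc["sold_for"],
            "avg_num_bidders": acc["bidders"] / acc["num_rows"],
            "avg_sold_for": acc["sold_for"] / acc["num_rows"],
        }
        for area_id, acc in sums.items()
    }

features = aggregate_per_area(rows)
print(features[1]["avg_sold_for"])  # 200.0
```

Each area ends up as a single row of features, which is why `area_id` can later serve as the key for combining the two feature sets.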

### Generate Features From `houses_for_sale_data`

In [10]:
sum_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy("area_id").sum()
count_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy("area_id").count()
sum_count_houses_for_sale_df = sum_houses_for_sale_df.join(count_houses_for_sale_df, "area_id")
sum_count_houses_for_sale_df = sum_count_houses_for_sale_df \
    .withColumnRenamed("sum(house_age)", "sum_house_age") \
    .withColumnRenamed("sum(house_worth)", "sum_house_worth") \
    .withColumnRenamed("sum(house_size)", "sum_house_size") \
    .withColumnRenamed("count", "num_rows")
def compute_average_features_house_for_sale(row):
    avg_house_worth = row.sum_house_worth/float(row.num_rows)
    avg_house_size = row.sum_house_size/float(row.num_rows)
    avg_house_age = row.sum_house_age/float(row.num_rows)
    return Row(
        sum_house_worth=row.sum_house_worth, 
        sum_house_age=row.sum_house_age,
        sum_house_size=row.sum_house_size,
        area_id = row.area_id,
        avg_house_worth = avg_house_worth,
        avg_house_size = avg_house_size,
        avg_house_age = avg_house_age
       )
houses_for_sale_features_df = sum_count_houses_for_sale_df.rdd.map(
    lambda row: compute_average_features_house_for_sale(row)
).toDF()

In [11]:
houses_for_sale_features_df.printSchema()

root
 |-- area_id: long (nullable = true)
 |-- avg_house_age: double (nullable = true)
 |-- avg_house_size: double (nullable = true)
 |-- avg_house_worth: double (nullable = true)
 |-- sum_house_age: double (nullable = true)
 |-- sum_house_size: double (nullable = true)
 |-- sum_house_worth: double (nullable = true)

### Generate Features from `houses_sold_data`

In [12]:
sum_houses_sold_df = houses_sold_data_spark_df.groupBy("area_id").sum()
count_houses_sold_df = houses_sold_data_spark_df.groupBy("area_id").count()
sum_count_houses_sold_df = sum_houses_sold_df.join(count_houses_sold_df, "area_id")
sum_count_houses_sold_df = sum_count_houses_sold_df \
    .withColumnRenamed("sum(number_of_bidders)", "sum_number_of_bidders") \
    .withColumnRenamed("sum(sold_for_amount)", "sum_sold_for_amount") \
    .withColumnRenamed("count", "num_rows")
def compute_average_features_houses_sold(row):
    avg_num_bidders = row.sum_number_of_bidders/float(row.num_rows)
    avg_sold_for = row.sum_sold_for_amount/float(row.num_rows)
    return Row(
        sum_number_of_bidders=row.sum_number_of_bidders, 
        sum_sold_for_amount=row.sum_sold_for_amount,
        area_id = row.area_id,
        avg_num_bidders = avg_num_bidders,
        avg_sold_for = avg_sold_for
       )
houses_sold_features_df = sum_count_houses_sold_df.rdd.map(
    lambda row: compute_average_features_houses_sold(row)
).toDF()

In [13]:
houses_sold_features_df.printSchema()

root
 |-- area_id: long (nullable = true)
 |-- avg_num_bidders: double (nullable = true)
 |-- avg_sold_for: double (nullable = true)
 |-- sum_number_of_bidders: long (nullable = true)
 |-- sum_sold_for_amount: double (nullable = true)

## Save Features to the Feature Store

The feature store has an abstraction of a **feature group**: a set of features that naturally belong together and are typically computed by the same feature engineering job from the same raw dataset.

Let's create two feature groups:

1. `houses_for_sale_featuregroup`

2. `houses_sold_featuregroup`
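
As a rough mental model, a feature group can be pictured as a named, versioned table of features that share a key. The sketch below is a toy in-memory stand-in: the class `ToyFeatureGroup`, its fields, and the sample row are hypothetical and are not part of the hops `featurestore` library.

```python
from dataclasses import dataclass, field

@dataclass
class ToyFeatureGroup:
    """Hypothetical stand-in for a feature group; not the Hopsworks implementation."""
    name: str
    description: str
    version: int = 1
    rows: list = field(default_factory=list)  # one dict per area_id

    def feature_names(self):
        # In this toy model, every column except the key counts as a feature.
        names = set()
        for row in self.rows:
            names.update(key for key in row if key != "area_id")
        return sorted(names)

houses_sold = ToyFeatureGroup(
    name="houses_sold_featuregroup",
    description="aggregate features of sold houses per area",
    rows=[{"area_id": 1, "avg_num_bidders": 3.0, "avg_sold_for": 200.0}],
)
print(houses_sold.feature_names())
```

The real feature store additionally registers metadata and writes the data to Hive, as the log output of the cells that follow shows.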

In [14]:
featurestore.create_featuregroup(
    houses_for_sale_features_df,
    "houses_for_sale_featuregroup",
    description="aggregate features of houses for sale per area",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)

Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use demo_featurestore_admin000_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully

In [15]:
featurestore.create_featuregroup(
    houses_sold_features_df,
    "houses_sold_featuregroup",
    description="aggregate features of sold houses per area",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)

Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use demo_featurestore_admin000_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully

## Create a Training Dataset

The feature store has an abstraction of a **training dataset**, which is a dataset with a set of features (potentially from many different feature groups) and labels (in case of supervised learning). 

Let's create a training dataset called *predict_house_sold_for_dataset* using the following features:

- avg_house_age
- avg_house_size
- avg_house_worth
- avg_num_bidders

and the target variable is:

- avg_sold_for

In [16]:
features_df = featurestore.get_features(["avg_house_age", "avg_house_size", 
                                         "avg_house_worth", "avg_num_bidders", 
                                         "avg_sold_for"])
featurestore.create_training_dataset(
    features_df, "predict_house_sold_for_dataset",
    data_format="tfrecords",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)

Running sql: use demo_featurestore_admin000_featurestore against offline feature store
Logical query plan for getting 5 features from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT avg_sold_for, avg_house_age, avg_house_size, avg_num_bidders, avg_house_worth FROM houses_sold_featuregroup_1 JOIN houses_for_sale_featuregroup_1 ON houses_sold_featuregroup_1.`area_id`=houses_for_sale_featuregroup_1.`area_id` against offline feature store
Training Dataset created successfully
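
The log above shows that the requested features live in two feature groups, which are joined on `area_id`. A minimal pure-Python sketch of that inner join follows; the helper `join_features` and the toy values are assumptions for illustration, not the query planner that the feature store actually uses.

```python
# Toy feature groups keyed by area_id; values are made up.
houses_sold = {
    1: {"avg_num_bidders": 3.0, "avg_sold_for": 200.0},
    2: {"avg_num_bidders": 1.0, "avg_sold_for": 50.0},
}
houses_for_sale = {
    1: {"avg_house_age": 10.0, "avg_house_size": 120.0, "avg_house_worth": 900.0},
    3: {"avg_house_age": 40.0, "avg_house_size": 80.0, "avg_house_worth": 400.0},
}

def join_features(left, right, wanted):
    """Inner-join two {area_id: features} dicts and keep only the wanted columns."""
    joined = []
    for area_id in sorted(left.keys() & right.keys()):
        merged = {**left[area_id], **right[area_id]}
        joined.append({name: merged[name] for name in wanted})
    return joined

wanted = ["avg_house_age", "avg_house_size", "avg_house_worth",
          "avg_num_bidders", "avg_sold_for"]
training_rows = join_features(houses_sold, houses_for_sale, wanted)
print(len(training_rows))  # 1: only area_id 1 appears in both groups
```

An inner join keeps only areas present in both groups. In this notebook both groups cover the same 50 areas, so no rows are lost.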

## Use the Training Dataset to Train a Model

A training dataset created through the feature store API is *managed* by Hopsworks, meaning that it gets automatic versioning, documentation, API support, and analysis.

Let's create a simple neural network and train it for the regression task of predicting the target variable `avg_sold_for`.

### Define the Train Function
Refactoring the training code into a function makes it easier to manage and lets us use the **Experiments** service in Hopsworks.

In [29]:
def train_fn():
    import tensorflow as tf
    from hops import featurestore
    from hops import tensorboard
    
    NUM_EPOCHS = 30
    BATCH_SIZE = 10
    LEARNING_RATE = 0.001
    SHUFFLE_BUFFER_SIZE = 10000
    INPUT_SHAPE=4
    
    def create_tf_dataset():
        dataset_dir = featurestore.get_training_dataset_path("predict_house_sold_for_dataset")
        input_files = tf.io.gfile.glob(dataset_dir + "/part-r-*")
        dataset = tf.data.TFRecordDataset(input_files)
        tf_record_schema = featurestore.get_training_dataset_tf_record_schema("predict_house_sold_for_dataset")
        feature_names = ["avg_house_age", "avg_house_size", "avg_house_worth", "avg_num_bidders"]
        label_name = "avg_sold_for"
        def decode(example_proto):
            example = tf.io.parse_single_example(example_proto, tf_record_schema)
            x = []
            for feature_name in feature_names:
                x.append(example[feature_name])
            y = [tf.cast(example[label_name], tf.float32)]
            return x,y
        dataset = dataset.map(decode).shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE).repeat(NUM_EPOCHS)
        return dataset
    dataset = create_tf_dataset()

    def create_model():
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(64, activation='relu', input_shape = (INPUT_SHAPE,), batch_size=BATCH_SIZE),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(1)])
        return model
    model = create_model()
    
    # This is a regression task, so track mean absolute error rather than accuracy.
    model.compile(optimizer=tf.keras.optimizers.Adam(LEARNING_RATE), loss='mse',  metrics=['mae'])
    tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tensorboard.logdir(), histogram_freq=0,
                             write_graph=True, write_images=True)
    callbacks = [tb_callback]
    # Checkpoint on the training loss, which is the quantity being minimized.
    callbacks.append(tf.keras.callbacks.ModelCheckpoint(tensorboard.logdir() + '/checkpoint-{epoch}.h5',
                    monitor='loss', verbose=0, save_best_only=True))
    history = model.fit(dataset, epochs=NUM_EPOCHS, steps_per_epoch = 5, callbacks=callbacks)
    return -float(history.history["loss"][-1])

### Start The Training Process Using a Reproducible Experiment

In [30]:
experiment.launch(train_fn, 
                  name="feature store quick start example", 
                  local_logdir=True)

Finished Experiment 

('hdfs://rpc.namenode.service.consul:8020/Projects/demo_featurestore_admin000/Experiments/application_1600264891477_0036_7', {'metric': -34666088.0, 'log': 'Experiments/application_1600264891477_0036_7/output.log'})
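
The reported `metric` is the value returned by `train_fn`, which is the negated final mean squared error. The sign flip is presumably there so that a larger metric means a better run. A small sketch of how to read that number, assuming only the metric value printed above:

```python
import math

# Metric reported by the experiment above: the negated final MSE loss.
metric = -34666088.0

mse = -metric          # undo the sign flip applied in train_fn
rmse = math.sqrt(mse)  # expressed in the same units as avg_sold_for

print(f"MSE  = {mse:.1f}")
print(f"RMSE = {rmse:.1f}")
```

The root mean squared error is easier to interpret than the raw loss because it is on the same scale as the target `avg_sold_for`. Since the sample data is random, the exact value will differ between runs.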