 # Feature Store Tour - Scala API
 
This notebook contains a tour/reference for the feature store Scala API on Hopsworks. We will go over best practices for using the API as well as common pitfalls.
 
The notebook is designed to be used together with the Feature Store Tour on Hopsworks. It assumes that you have run the following feature engineering job: [job](https://github.com/logicalclocks/hops-examples/tree/master/featurestore) (**the job is added automatically when you start the feature store tour in Hopsworks. You can run the job by going to the 'Jobs' tab to the left in the Hopsworks project home page**).

The job produces the following model of feature groups in your project's feature store:

![Feature Store Model](./images/model.png "Feature Store Model")

In this notebook we will run queries over this feature store model. We will also create new feature groups and training datasets.

We will go from (1) features to (2) training datasets to (3) a trained model.

## Feature Store 101

The simplest way to think about the feature store is as a central place to store curated *features* within an organization. A feature is a measurable property of some phenomenon. It could be, for example, an image pixel, a word from a piece of text, the age of a person, a coordinate emitted from a sensor, or an aggregate value like the average number of purchases within the last hour.

A feature store is a data management layer for machine learning that can optimize the machine learning workflow and provide an interface between data engineering and data science.

![Feature Store Overview](./images/overview.png "Feature Store Overview")

A feature store is not a pure service concept; it goes hand-in-hand with feature computation. Feature engineering is the process of transforming raw data into a format that is compatible and understandable for predictive models.

There are two interfaces to the feature store:

- Writing to the feature store: at the end of the feature engineering pipeline, the features are written to the feature store, e.g.:

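```scala
import io.hops.util.Hops
import org.apache.spark.sql.functions.{col, pow}

// `filename` is assumed to point at a headerless CSV file; Spark names its first column `_c0`
val rawData = spark.read.format("csv").option("inferSchema", "true").load(filename)
val polynomialFeatures = rawData.select(pow(col("_c0"), 2.0).as("c0_squared"))
Hops.insertIntoFeaturegroup("polynomial_features").setDataframe(polynomialFeatures).write()
```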
- Reading from the feature store: to train a model on a set of features, the features are read from the feature store, e.g.:

```scala
import io.hops.util.Hops
val features = List("team_budget", "average_attendance", "average_player_age")
val featuresDf = Hops.getFeatures(features).read()
```

As a data engineer/data scientist, you can think of the feature store as a middle-layer. Once you have computed a set of features, instead of writing them locally to a csv file, insert them in the feature store so that the features can get documented/versioned, backfilled, **and so that your colleagues can re-use your features!** 

In this notebook we will take a look at interacting with the Feature Store through the Scala SDK. There is also a Python SDK available if you prefer that.

## Imports

The hops library is automatically installed in all Hopsworks projects. You can find API documentation [here](http://snurran.sics.se/hops/hops-util-javadoc/0.6.0-SNAPSHOT/).

In [1]:
import io.hops.util.Hops
import scala.collection.JavaConversions._
import collection.JavaConverters._

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log
81,application_1598227897403_0046,spark,idle,Link,Link


SparkSession available as 'spark'.
import io.hops.util.Hops
import scala.collection.JavaConversions._
import collection.JavaConverters._


## How The Scala API Works

All operations are lazy, meaning that when you call `operation1()` you get back a DTO object where you can set the different parameters of your operation (or let them be the default values). To execute the operation call `.read()` or `.write()` on the DTO.
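
The lazy pattern described above can be illustrated with a minimal, self-contained sketch. The `ReadOp` class and its defaults are hypothetical stand-ins written for this example; they are not classes from `io.hops.util`, and the real DTOs may well be implemented differently.

```scala
// Hypothetical stand-in for a lazy read operation; not part of io.hops.util.
object LazyOperationSketch {
  // Parameter holder: each setter returns a copy with one field changed,
  // and unset fields keep their default values.
  final case class ReadOp(
      name: String,
      featurestore: String = "project_featurestore",
      version: Int = 1) {
    def setFeaturestore(fs: String): ReadOp = copy(featurestore = fs)
    def setVersion(v: Int): ReadOp = copy(version = v)
    // Nothing is resolved until read() is called.
    def read(): String = s"$featurestore.${name}_$version"
  }

  // Entry point: returns the parameter holder without executing anything.
  def getFeaturegroup(name: String): ReadOp = ReadOp(name)
}
```

Calling `LazyOperationSketch.getFeaturegroup("teams_features").read()` uses only defaults, while chaining `.setVersion(2)` before `.read()` overrides a single parameter, which mirrors how the cells in this notebook chain `set...` calls before `.read()` or `.write()`.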

## Get The Name of The Project's Feature Store

Each project with the feature store service enabled automatically gets its own feature store created. This feature store is only accessible within the project unless you decide to share it with other projects. The name of the feature store is `<project_name>_featurestore`, and you can get the name with the API method `getProjectFeaturestore`. 

In [2]:
Hops.getProjectFeaturestore().read()

res1: String = demo_featurestore_admin000_featurestore


## Get a List of All Feature Stores Accessible in the Current Project 

Feature Stores can be shared across projects in a multi-tenant manner, just like any Hopsworks-dataset can. You can read more about sharing datasets at [hops.io](hops.io), but in essence to share a dataset you just have to right click on it in your project. The features and featuregroups in the feature store are stored in a dataset called `<project_name>_featurestore.db` in your project.

![Share Feature Store](./images/share_featurestore.png "Share Feature Store")

In [3]:
Hops.getProjectFeaturestores().read()

res2: java.util.List[String] = [demo_featurestore_admin000_featurestore]


## Querying The Feature Store

The feature store can be queried programmatically and with raw SQL. When you query the feature store programmatically, the library will infer how to fetch the different features using a **query planner**. 

![Feature Store Query Planner](./images/query_optimizer.png "Feature Store Query Planner")

When interacting with the feature store it is sufficient to be familiar with three concepts:

- The **feature**: this refers to an individual versioned and documented feature in the feature store, e.g. the age of a person.
- The **feature group**: this refers to a documented and versioned group of features stored as a Hive table that is linked to a specific Spark/Numpy/Pandas job that takes in raw data and outputs the computed features.
- The **training dataset**: this refers to a versioned and managed dataset of features, stored in HopsFS as tfrecords, .csv, .tsv, or parquet.

A feature group contains a group of features and a training dataset contains a set of features, potentially from many different feature groups.

![Feature Store Concepts](./images/concepts.png "Feature Store Contents")

When you query the feature store you will always get back the results in a spark dataframe. This is for scalability reasons. If the dataset is small and you want to work with it in memory you can convert it into a pandas dataframe or a numpy matrix using one line of code as we will demonstrate later on in this notebook.

### Fetch an Individual Feature

When retrieving a single feature from the feature store, the hops-util library will infer which feature group the feature belongs to by querying the metastore, but you can also explicitly specify which feature group and version to query.

If there are multiple features of the same name in the featurestore, it is required to specify enough information to uniquely identify the feature (e.g specify feature group and version). If no featurestore is provided it will default to the project's featurestore.
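
To make the ambiguity rule concrete, here is a small sketch of how a lookup from feature name to feature group could behave. It is only an illustration under stated assumptions: the `metadata` map is a hand-written stand-in for the metastore (column names are taken from tables shown in this notebook), and `resolve` is a hypothetical helper, not the library's implementation.

```scala
object FeatureLookupSketch {
  // Hypothetical metadata: feature group table -> column names.
  val metadata: Map[String, Set[String]] = Map(
    "teams_features_1"   -> Set("team_id", "team_budget", "team_position"),
    "players_features_1" -> Set("team_id", "average_player_age", "average_player_worth"),
    "games_features_1"   -> Set("away_team_id", "home_team_id", "score")
  )

  // Right(group) when exactly one feature group contains the feature,
  // Left(reason) when the feature is missing or ambiguous.
  def resolve(
      feature: String,
      groups: Map[String, Set[String]] = metadata): Either[String, String] = {
    val matches =
      groups.collect { case (group, cols) if cols.contains(feature) => group }.toList.sorted
    matches match {
      case single :: Nil => Right(single)
      case Nil           => Left(s"feature '$feature' not found")
      case many          => Left(s"feature '$feature' is ambiguous: ${many.mkString(", ")}")
    }
  }
}
```

In this sketch `resolve("team_budget")` succeeds because only one group has that column, whereas `resolve("team_id")` is rejected as ambiguous, which is the situation where you would specify the feature group and version explicitly.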

To read an individual feature, use the method `getFeature`:

In [4]:
Hops.getFeature("team_budget").read().show(5)

+-----------+
|team_budget|
+-----------+
|  12957.076|
|  2403.3704|
|  3390.3755|
|  13547.429|
|   9678.333|
+-----------+
only showing top 5 rows



You can also explicitly specify the arguments: spark-session, feature store, feature group and version (getFeature returns a builder object that you can populate with parameters using *setParameterName*):

In [5]:
Hops.getFeature("team_budget").setSpark(spark).setFeaturestore(Hops.getProjectFeaturestore.read).setFeaturegroup("teams_features").setVersion(1).read().show(5)

+-----------+
|team_budget|
+-----------+
|  12957.076|
|  2403.3704|
|  3390.3755|
|  13547.429|
|   9678.333|
+-----------+
only showing top 5 rows



### Fetch an Entire Feature Group

You can get an entire feature group from the API. If no feature store is provided, the API will default to the project's feature store; if no version is provided, it will default to version 1 of the feature group.

In [6]:
Hops.getFeaturegroup("teams_features").read().show(5)

+-------------+-----------+-------+
|team_position|team_budget|team_id|
+-------------+-----------+-------+
|            1|  12957.076|      1|
|            2|  2403.3704|      2|
|            3|  3390.3755|      3|
|            4|  13547.429|      4|
|            5|   9678.333|      5|
+-------------+-----------+-------+
only showing top 5 rows



Explicitly specifying the arguments:

In [7]:
Hops.getFeaturegroup("teams_features").setSpark(spark).setFeaturestore(Hops.getProjectFeaturestore.read).setVersion(1).read().show(5)

+-------------+-----------+-------+
|team_position|team_budget|team_id|
+-------------+-----------+-------+
|            1|  12957.076|      1|
|            2|  2403.3704|      2|
|            3|  3390.3755|      3|
|            4|  13547.429|      4|
|            5|   9678.333|      5|
+-------------+-----------+-------+
only showing top 5 rows



### Fetch A Set of Features

When retrieving a list of features from the feature store, the hops-util library will infer which feature group each feature belongs to by querying the metastore. If the features reside in different feature groups, the library will also try to infer how to join the features together based on common columns. If the JOIN query cannot be inferred, because multiple features share the same name or the join is non-obvious, you need to supply enough information in the API call to make the query unambiguous. If you already know the JOIN query, you can also run it directly with `Hops.queryFeaturestore(joinQuery)` (an example of this is shown further down in this notebook). If no feature store is provided, the API will default to the project's feature store.
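
As a rough illustration of joining on common columns, the sketch below intersects the column sets of the feature groups involved and builds a JOIN statement from the result. Both helpers are hypothetical and written for this example; the library's query planner may use different rules.

```scala
object JoinKeySketch {
  // Columns shared by every feature group; candidates for the join key.
  def commonColumns(groups: Seq[Set[String]]): Set[String] =
    if (groups.isEmpty) Set.empty[String] else groups.reduce(_ intersect _)

  // Builds a JOIN query that joins every table to the first one on `key`.
  def joinSql(features: Seq[String], tables: Seq[String], key: String): String = {
    require(tables.nonEmpty, "at least one table is needed")
    val first = tables.head
    val joins = tables.tail.map(t => s"JOIN $t ON $first.$key = $t.$key")
    (s"SELECT ${features.mkString(", ")} FROM $first" +: joins).mkString(" ")
  }
}
```

For example, `commonColumns(Seq(Set("team_id", "team_budget"), Set("team_id", "average_player_age")))` yields `Set("team_id")`, and that key can then be passed to `joinSql` to produce a query string of the kind accepted by `Hops.queryFeaturestore`.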

Example of querying the feature store for a list of features without specifying the feature groups and feature store:

In [8]:
Hops.getFeatures(List("team_budget", "average_attendance", "average_player_age")).read().show(5)

+-----------+------------------+------------------+
|team_budget|average_player_age|average_attendance|
+-----------+------------------+------------------+
|  12514.562|             24.63|         3587.5015|
|  1587.0897|             25.71|         2532.1638|
|  3839.0754|             25.63|         3397.8066|
|  16758.066|             25.65|          3271.934|
|  3966.3591|              25.5|         4074.8047|
+-----------+------------------+------------------+
only showing top 5 rows



We can also explicitly specify the feature groups where the features reside. The feature groups and versions can be specified either by prepending feature names with `<feature group name>_<feature group version>.`, or by providing a Map with entries of `<feature group name> -> <feature group version>`:

In [9]:
val features = List("teams_features_1.team_budget", 
                    "attendances_features_1.average_attendance", 
                    "players_features_1.average_player_age")
Hops.getFeatures(features).setSpark(spark).setFeaturestore(Hops.getProjectFeaturestore.read).read().show(5)

features: List[String] = List(teams_features_1.team_budget, attendances_features_1.average_attendance, players_features_1.average_player_age)
+-----------+------------------+------------------+
|team_budget|average_player_age|average_attendance|
+-----------+------------------+------------------+
|  12514.562|             24.63|         3587.5015|
|  1587.0897|             25.71|         2532.1638|
|  3839.0754|             25.63|         3397.8066|
|  16758.066|             25.65|          3271.934|
|  3966.3591|              25.5|         4074.8047|
+-----------+------------------+------------------+
only showing top 5 rows



In [10]:
val featuregroupsMap = Map[String, Integer](
    "teams_features"->1,
    "attendances_features"->1,
    "players_features"->1
)
val javaFeaturegroupsMap = new java.util.HashMap[String, Integer](featuregroupsMap)
Hops.getFeatures(features).setSpark(spark).setFeaturestore(Hops.getProjectFeaturestore.read).setFeaturegroupsAndVersions(javaFeaturegroupsMap).read().show(5)

featuregroupsMap: scala.collection.immutable.Map[String,Integer] = Map(teams_features -> 1, attendances_features -> 1, players_features -> 1)
javaFeaturegroupsMap: java.util.HashMap[String,Integer] = {attendances_features=1, players_features=1, teams_features=1}
+-----------+------------------+------------------+
|team_budget|average_player_age|average_attendance|
+-----------+------------------+------------------+
|  12514.562|             24.63|         3587.5015|
|  1587.0897|             25.71|         2532.1638|
|  3839.0754|             25.63|         3397.8066|
|  16758.066|             25.65|          3271.934|
|  3966.3591|              25.5|         4074.8047|
+-----------+------------------+------------------+
only showing top 5 rows
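
The prefixed naming used in the cells above (`<feature group name>_<feature group version>.<feature name>`) can be split into its parts with a regular expression. This is a sketch for illustration only; `QualifiedFeature` and `parse` are hypothetical names and not part of the Hops API.

```scala
object FeatureNameSketch {
  final case class QualifiedFeature(featuregroup: String, version: Int, feature: String)

  // <feature group name>_<version>.<feature name>
  private val Qualified = """(.+)_(\d+)\.(.+)""".r

  // Some(...) for a prefixed name, None for a bare feature name.
  def parse(name: String): Option[QualifiedFeature] = name match {
    case Qualified(group, version, feature) =>
      Some(QualifiedFeature(group, version.toInt, feature))
    case _ => None
  }
}
```

Here `parse("teams_features_1.team_budget")` separates the group `teams_features`, version `1`, and feature `team_budget`, while a bare name such as `team_budget` returns `None` and would be left for the library to resolve.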



If you have a lot of name collisions and it is not obvious how to infer the JOIN query to get the features from the feature store, you can explicitly specify the argument `joinKey` to the API (or you can provide the entire SQL query using the API method `queryFeaturestore`, as we will demonstrate later on in the notebook):

In [11]:
Hops.getFeatures(features).setJoinKey("team_id").setSpark(spark).setFeaturestore(Hops.getProjectFeaturestore.read).setFeaturegroupsAndVersions(javaFeaturegroupsMap).read().show(5)

+-----------+------------------+------------------+
|team_budget|average_player_age|average_attendance|
+-----------+------------------+------------------+
|  12514.562|             24.63|         3587.5015|
|  1587.0897|             25.71|         2532.1638|
|  3839.0754|             25.63|         3397.8066|
|  16758.066|             25.65|          3271.934|
|  3966.3591|              25.5|         4074.8047|
+-----------+------------------+------------------+
only showing top 5 rows



#### Advanced Examples of Fetching Sets of Features and Common Pitfalls

Getting 12 features from 4 different feature groups:

In [12]:
val features1 = List("team_budget", "average_attendance", "average_player_age", "team_position", 
                     "sum_attendance", "average_player_rating", "average_player_worth", "sum_player_age", 
                     "sum_player_rating", "sum_player_worth", "sum_position", "average_position")
Hops.getFeatures(features1).read().show(5)

features1: List[String] = List(team_budget, average_attendance, average_player_age, team_position, sum_attendance, average_player_rating, average_player_worth, sum_player_age, sum_player_rating, sum_player_worth, sum_position, average_position)
+--------------+-------------+------------+--------------+----------------+-----------+-----------------+--------------------+------------------+------------------+----------------+---------------------+
|sum_attendance|team_position|sum_position|sum_player_age|sum_player_worth|team_budget|sum_player_rating|average_player_worth|average_player_age|average_attendance|average_position|average_player_rating|
+--------------+-------------+------------+--------------+----------------+-----------+-----------------+--------------------+------------------+------------------+----------------+---------------------+
|      71750.03|           31|      1188.0|        2463.0|        23187.08|  12514.562|        24030.572|            231.8708|             24.6

We can also explicitly specify the optional arguments: 

In [13]:
Hops.queryFeaturestore(
    "SELECT team_budget, score " +
    "FROM teams_features_1 JOIN games_features_1 ON " +
    "games_features_1.home_team_id = teams_features_1.team_id").setSpark(spark).setFeaturestore(Hops.getProjectFeaturestore.read).read().show(5)

+-----------+-----+
|team_budget|score|
+-----------+-----+
|  12514.562|    1|
|  12514.562|    3|
|  12514.562|    1|
|  12514.562|    1|
|  12514.562|    1|
+-----------+-----+
only showing top 5 rows



### Free Text SQL Query from the Feature Store

For complex queries that cannot be inferred by the helper functions, pass the SQL directly to the method `Hops.queryFeaturestore()`. It defaults to the project's feature store, but you can also specify the feature store explicitly. If you are proficient in SQL, this is the most efficient and preferred way to query the feature store.

In [14]:
Hops.queryFeaturestore("SELECT * FROM teams_features_1 WHERE team_position < 5").read().show(5)

+-----------+-------+-------------+
|team_budget|team_id|team_position|
+-----------+-------+-------------+
|  12957.076|      1|            1|
|  2403.3704|      2|            2|
|  3390.3755|      3|            3|
|  13547.429|      4|            4|
+-----------+-------+-------------+



## Writing to the Feature Store

### Creating New Feature Groups


Let's create a new feature group called **teams_features_spanish** that contains the same contents as the feature group teams_features, except that the columns are renamed to Spanish:

In [15]:
val teamsFeaturesDf = Hops.getFeaturegroup("teams_features").read()

teamsFeaturesDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [team_position: int, team_budget: float ... 1 more field]


In [16]:
val teamsFeaturesDf2 = teamsFeaturesDf.withColumnRenamed(
    "team_id", "equipo_id").withColumnRenamed(
    "team_budget", "equipo_presupuesto").withColumnRenamed(
    "team_position", "equipo_posicion")

teamsFeaturesDf2: org.apache.spark.sql.DataFrame = [equipo_posicion: int, equipo_presupuesto: float ... 1 more field]


In [17]:
teamsFeaturesDf2.show(5)

+---------------+------------------+---------+
|equipo_posicion|equipo_presupuesto|equipo_id|
+---------------+------------------+---------+
|              1|         12957.076|        1|
|              2|         2403.3704|        2|
|              3|         3390.3755|        3|
|              4|         13547.429|        4|
|              5|          9678.333|        5|
+---------------+------------------+---------+
only showing top 5 rows



Let's now create a new feature group using the transformed dataframe (we'll explain the statistics part later on in this notebook):

In [18]:
Hops.createFeaturegroup("teams_features_spanish").setDataframe(teamsFeaturesDf2).write()

We can also explicitly specify optional arguments:

In [19]:
(Hops.createFeaturegroup("teams_features_spanish")
    .setDataframe(teamsFeaturesDf2)
    .setFeaturestore(Hops.getProjectFeaturestore.read)
    .setDescriptiveStats(false)
    .setFeatureCorr(false)
    .setFeatureHistograms(false)
    .setClusterAnalysis(false)
    .setStatColumns(List[String]().asJava)
    .setNumBins(5)
    .setCorrMethod("pearson")
    .setDescription("a spanish version of teams_features")
    .setVersion(2)
    .write())

By default the new featuregroup will be created in the project's featurestore and the statistics for the new featuregroup will be computed based on the provided spark dataframe. You can configure this behaviour by modifying the default arguments and filling in extra metadata.

###  Create an On-Demand Feature Group

Feature Groups in Hopsworks can be of two types: 

- **Cached Feature Groups** are pre-computed and stored inside Hopsworks as Hive tables for historical data and MySQL Cluster tables for online data.
- **On-Demand Feature Groups** are computed on-demand using a JDBC connector and a SQL query. On-Demand Feature Groups can be stored in any JDBC-compliant data store.

To create an on-demand feature group, you must first configure a storage connector to the JDBC backend that you want to query. You can then use the method `createFeaturegroup().setOnDemand(true)` to create the on-demand feature group.

A Feature Store in Hopsworks comes with two JDBC storage connectors configured by default:

- `project_name`: a storage connector for the Hive database of the project
- `project_name_featurestore`: a storage connector for the Feature Store database of the project

You can list the available storage connectors in the feature store using the method `getStorageConnectors()`:

In [20]:
Hops.getStorageConnectors().read()

res17: java.util.List[String] = [demo_featurestore_admin000_featurestore, demo_featurestore_admin000, demo_featurestore_admin000_meb1_onlinefeaturestore, demo_featurestore_admin000_Training_Datasets]


If we inspect the Feature Store Hive database, we can see that there is a table called `games_features_1`. Let's use the JDBC connector called `<project_name>_featurestore` and the SQL string `"SELECT * FROM games_features_1 WHERE score > 1"` to create an on-demand feature group called `games_features_on_demand`.

In [21]:
Hops.queryFeaturestore("show tables").read().show(5)

+--------------------+--------------------+-----------+
|            database|           tableName|isTemporary|
+--------------------+--------------------+-----------+
|demo_featurestore...|attendances_featu...|      false|
|demo_featurestore...|    games_features_1|      false|
|demo_featurestore...|games_features_hu...|      false|
|demo_featurestore...|  players_features_1|      false|
|demo_featurestore...|season_scores_fea...|      false|
+--------------------+--------------------+-----------+
only showing top 5 rows



In [22]:
Hops.queryFeaturestore("describe games_features_1").read().show(5)

+------------+---------+-------+
|    col_name|data_type|comment|
+------------+---------+-------+
|away_team_id|      int|      -|
|home_team_id|      int|      -|
|       score|      int|      -|
+------------+---------+-------+



In [23]:
val featuregroupName = "games_features_on_demand"
val storageConnector = "demo_featurestore_admin000_featurestore"
val query = "SELECT * FROM games_features_1 WHERE score > 1"
Hops.createFeaturegroup(featuregroupName).setOnDemand(true).setJdbcConnector(storageConnector).setSqlQuery(query).write()

featuregroupName: String = games_features_on_demand
storageConnector: String = demo_featurestore_admin000_featurestore
query: String = SELECT * FROM games_features_1 WHERE score > 1


On-Demand Feature Groups can be queried just like cached feature groups. When you query an on-demand feature group, it will open a JDBC connection and apply the associated SQL query and return the resulting dataframe.

In [24]:
Hops.getFeaturegroup("games_features_on_demand").read().show(5)

++
||
++
||
||
||
||
||
++
only showing top 5 rows



Let's create an on-demand feature group with the same JDBC connector and the table `players_features_1`, and see how we can make queries that join on-demand feature groups with cached feature groups dynamically.

In [25]:
val query = "SELECT average_player_age as average_player_age_on_dmd, average_player_worth as average_player_worth_on_dmd, team_id FROM players_features_1"
val storageConnector = Hops.getProjectName() + "_featurestore"
val featuregroupName = "players_features_on_demand"
Hops.createFeaturegroup(featuregroupName).setOnDemand(true).setJdbcConnector(storageConnector).setSqlQuery(query).write()

query: String = SELECT average_player_age as average_player_age_on_dmd, average_player_worth as average_player_worth_on_dmd, team_id FROM players_features_1
storageConnector: String = demo_featurestore_admin000_featurestore
featuregroupName: String = players_features_on_demand


When you make a query to the feature store that crosses multiple feature groups, including on-demand feature groups, the query planner that comes with the SDK will first fetch the on-demand feature groups and register them as SparkSQL temporary tables, before joining them with the features from the cached feature groups.

Note that when querying on-demand feature groups, you typically have to supply the `joinKey` and a Map with entries of `<feature group name> -> <feature group version>` manually. The query planner will often not have information about the columns in the on-demand feature groups, and hence cannot infer the join key or which on-demand feature group a certain feature belongs to.

In [None]:
val features = List("average_player_age_on_dmd", "average_player_worth_on_dmd", "average_attendance", "sum_attendance")
val featuregroupsMap = Map[String, Integer](
    "players_features_on_demand"->1,
    "attendances_features"->1
)
val javaFeaturegroupsMap = new java.util.HashMap[String, Integer](featuregroupsMap)
Hops.getFeatures(features).setJoinKey("team_id").setSpark(spark).setFeaturestore(Hops.getProjectFeaturestore.read).setFeaturegroupsAndVersions(javaFeaturegroupsMap).read().show(5)

In [None]:
Hops.getFeature("average_player_age_on_dmd").setFeaturegroup("players_features_on_demand").read().show(5)

### Create a Hudi Feature Group

TL;DR: Hudi is a storage abstraction/library built on top of Spark. A Hudi dataset stores data in Parquet files and maintains additional metadata to make upserts efficient. A Hudi ingest job is intended to be run as a streaming ingest job, on an interval such as every 15 minutes, reading deltas from a message bus like Kafka and ingesting the deltas incrementally into a data lake.

Hudi also makes it possible to **time-travel** by inspecting datasets at different commit times, which is especially useful for a feature store as it allows us to make ML experiments reproducible and easily be able to generate training data by backfilling labels.

In [28]:
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.NonpartitionedKeyGenerator;
import org.apache.hudi.SimpleKeyGenerator;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.NonPartitionedExtractor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import io.hops.util.Hops
import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.types._
import java.sql.Date
import java.sql.Timestamp
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.hudi.NonpartitionedKeyGenerator
import org.apache.hudi.SimpleKeyGenerator
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.NonPartitionedExtractor
import org.apache.log4j.LogManager
import org.apache.log4j.Logger
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.DataFrameWriter
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import io.hops.util.Hops
import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.types._
import java.sql.Date
import java.sql.Timestamp
import org.apache.hadoop.fs.{FileSystem, Path}


In [29]:
val sampleData = Seq(
    Row(1, Date.valueOf("2019-02-30"), 0.4151f, "Sweden"), // not a real calendar date; it rolls over to 2019-03-02 (see the output)
    Row(2, Date.valueOf("2019-05-01"), 1.2151f, "Ireland"),
    Row(3, Date.valueOf("2019-08-06"), 0.2151f, "Belgium"),
    Row(4, Date.valueOf("2019-08-06"), 0.8151f, "Russia")
)
val schema = 
 scala.collection.immutable.List(
  StructField("id", IntegerType, true),
  StructField("date", DateType, true),
  StructField("value", FloatType, true),
  StructField("country", StringType, true) 
)
val sampleDf = spark.createDataFrame(
  spark.sparkContext.parallelize(sampleData),
  StructType(schema)
)
val partitionCols = List("date")
(Hops.createFeaturegroup("hudi_featuregroup_test_tour")
                         .setHudi(true)
                         .setPartitionBy(partitionCols)
                         .setDataframe(sampleDf)
                         .setPrimaryKey(List("id")).write())

sampleData: Seq[org.apache.spark.sql.Row] = List([1,2019-03-02,0.4151,Sweden], [2,2019-05-01,1.2151,Ireland], [3,2019-08-06,0.2151,Belgium], [4,2019-08-06,0.8151,Russia])
schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(date,DateType,true), StructField(value,FloatType,true), StructField(country,StringType,true))
sampleDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]
partitionCols: List[String] = List(date)
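
Note that the output above shows the first row's date as `2019-03-02`: `java.sql.Date.valueOf` accepted the impossible date `2019-02-30` and rolled it over silently. If that is not what you want, one option is to validate dates with `java.time.LocalDate`, which rejects such input. The helper below is a hypothetical sketch, not part of the feature store API.

```scala
import java.sql.Date
import java.time.LocalDate
import scala.util.Try

object DateValidationSketch {
  // Strict parse: None for impossible calendar dates such as 2019-02-30,
  // Some(java.sql.Date) for valid ISO dates (yyyy-MM-dd).
  def strictDate(s: String): Option[Date] =
    Try(LocalDate.parse(s)).toOption.map(d => Date.valueOf(d))
}
```

With this helper, `strictDate("2019-02-30")` is empty while `strictDate("2019-05-01")` yields a `java.sql.Date` that can be placed in a `Row` as before.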


In [30]:
Hops.getFeaturegroup("hudi_featuregroup_test_tour").read.show(20)

+-------------------+-------------+-------+----------------------+------------------+---+--------------------+------+--------------------+
|_hoodie_commit_time|         date|country|_hoodie_partition_path|_hoodie_record_key| id|_hoodie_commit_seqno| value|   _hoodie_file_name|
+-------------------+-------------+-------+----------------------+------------------+---+--------------------+------+--------------------+
|     20200406190941|1565049600000|Belgium|         1565049600000|                 3|  3|  20200406190941_2_3|0.2151|31271941-9187-4d3...|
|     20200406190941|1556668800000|Ireland|         1556668800000|                 2|  2|  20200406190941_1_2|1.2151|dee6c042-d39b-4b5...|
|     20200406190941|1565049600000| Russia|         1565049600000|                 4|  4|  20200406190941_3_4|0.8151|e0c64345-6670-4b8...|
|     20200406190941|1551484800000| Sweden|         1551484800000|                 1|  1|  20200406190941_0_1|0.4151|82e0bd04-9c2b-476...|
+-------------------+------

You can also override Hudi-specific arguments using the `setHudiArgs` and `setHudiBasePath` methods:

In [31]:
val hudiArgs = Map[String, String](
    "hoodie.datasource.write.payload.class"-> "org.apache.hudi.OverwriteWithLatestAvroPayload"
)

hudiArgs: scala.collection.immutable.Map[String,String] = Map(hoodie.datasource.write.payload.class -> org.apache.hudi.OverwriteWithLatestAvroPayload)


In [32]:
(Hops.createFeaturegroup("hudi_featuregroup_test_tour_second")
                         .setHudi(true)
                         .setPartitionBy(partitionCols)
                         .setDataframe(sampleDf)
                         .setHudiBasePath(s"hdfs:///Projects/${Hops.getProjectName}/Resources/hudi_featuregroup_test_tour_second")
                         .setHudiArgs(hudiArgs)
                         .setPrimaryKey(List("id")).write())

In [33]:
Hops.getFeaturegroup("hudi_featuregroup_test_tour_second").read.show(20)

+-------------------+-------------+-------+----------------------+------------------+---+--------------------+------+--------------------+
|_hoodie_commit_time|         date|country|_hoodie_partition_path|_hoodie_record_key| id|_hoodie_commit_seqno| value|   _hoodie_file_name|
+-------------------+-------------+-------+----------------------+------------------+---+--------------------+------+--------------------+
|     20200406191011|1556668800000|Ireland|         1556668800000|                 2|  2|  20200406191011_1_6|1.2151|55e080ad-460c-4d2...|
|     20200406191011|1565049600000|Belgium|         1565049600000|                 3|  3|  20200406191011_2_7|0.2151|fba95e4f-a3e1-4d6...|
|     20200406191011|1551484800000| Sweden|         1551484800000|                 1|  1|  20200406191011_0_5|0.4151|da415afd-f4bf-450...|
|     20200406191011|1565049600000| Russia|         1565049600000|                 4|  4|  20200406191011_3_8|0.8151|336e31ef-cd2a-4b0...|
+-------------------+------

We can also use the `insertIntoFeaturegroup` wrapper to make upserts into Hudi datasets:

In [None]:
import scala.collection.JavaConversions._
import collection.JavaConverters._
val upsertData = Seq(
    Row(5, Date.valueOf("2019-02-30"), 0.7921f, "Northern Ireland"), //Insert
    Row(1, Date.valueOf("2019-05-01"), 1.151f, "Norway"), //Update
    Row(3, Date.valueOf("2019-08-06"), 0.999f, "Belgium"), //Update
    Row(6, Date.valueOf("2019-08-06"), 0.0151f, "France") //Insert
)
val schema = 
 scala.collection.immutable.List(
  StructField("id", IntegerType, true),
  StructField("date", DateType, true),
  StructField("value", FloatType, true),
  StructField("country", StringType, true) 
)
val upsertDf = spark.createDataFrame(
  spark.sparkContext.parallelize(upsertData),
  StructType(schema)
)
val partitionCols = List("date")
(Hops.insertIntoFeaturegroup("hudi_featuregroup_test_tour")
                         .setPartitionBy(partitionCols)
                         .setDataframe(upsertDf)
                         .setMode("append")
                         .setPrimaryKey(List("id")).write())

In [35]:
val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(FileSystem.get(sc.hadoopConfiguration), 
                                     s"hdfs:///Projects/${Hops.getProjectName}/Resources/hudi_featuregroup_test_tour_1")

timeline: org.apache.hudi.common.table.HoodieTimeline = org.apache.hudi.common.table.timeline.HoodieDefaultTimeline: [20200406190941__commit__COMPLETED],[20200406191035__commit__COMPLETED]


In [36]:
val firstTimestamp = timeline.firstInstant.get.getTimestamp

firstTimestamp: String = 20200406190941


In [37]:
val secondTimestamp = timeline.nthInstant(1).get.getTimestamp

secondTimestamp: String = 20200406191035
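
The commit timestamps returned by the timeline are plain strings. Judging from the values shown above, they follow the pattern `yyyyMMddHHmmss`. The following is a minimal sketch, assuming that pattern holds, of how such strings could be parsed so that commits can be compared or their distance in time computed. `parseCommitTime` is a hypothetical helper and not part of the Hops or Hudi API.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

// Assumption: commit timestamps use the yyyyMMddHHmmss pattern seen in the output above.
val commitTimeFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")

// Hypothetical helper: turn a commit timestamp string into a LocalDateTime.
def parseCommitTime(timestamp: String): LocalDateTime =
  LocalDateTime.parse(timestamp, commitTimeFormat)

val firstCommit = parseCommitTime("20200406190941")
val secondCommit = parseCommitTime("20200406191035")

// Seconds elapsed between the two commits
val secondsBetween = Duration.between(firstCommit, secondCommit).getSeconds
```

Parsing the strings instead of comparing them as text makes the intent explicit and fails early if a value does not match the assumed pattern.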


### Synchronize a Hive Table with the Feature Store

The Feature Store SDK provides a method for synchronizing existing Hive tables with the feature store: `syncHiveTableWithFeaturestore`.

Create an example Hive table (the convention in the feature store is to give the table a `_version` suffix):

In [38]:
import scala.collection.JavaConversions._
import collection.JavaConverters._
val sampleData = Seq(
    Row(1, Date.valueOf("2019-02-30"), 0.4151f, "Sweden"), // invalid date: rolls over to 2019-03-02 (see output below)
    Row(2, Date.valueOf("2019-05-01"), 1.2151f, "Ireland"),
    Row(3, Date.valueOf("2019-08-06"), 0.2151f, "Belgium"),
    Row(4, Date.valueOf("2019-08-06"), 0.8151f, "Russia")
)
val schema = 
 scala.collection.immutable.List(
  StructField("id", IntegerType, true),
  StructField("date", DateType, true),
  StructField("value", FloatType, true),
  StructField("country", StringType, true) 
)
val sampleDf = spark.createDataFrame(
  spark.sparkContext.parallelize(sampleData),
  StructType(schema)
)
spark.sql(s"use ${Hops.getProjectFeaturestore.read}")
sampleDf.write.mode("overwrite").saveAsTable("hive_fs_sync_example_1")

import scala.collection.JavaConversions._
import collection.JavaConverters._
sampleData: Seq[org.apache.spark.sql.Row] = List([1,2019-03-02,0.4151,Sweden], [2,2019-05-01,1.2151,Ireland], [3,2019-08-06,0.2151,Belgium], [4,2019-08-06,0.8151,Russia])
schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(date,DateType,true), StructField(value,FloatType,true), StructField(country,StringType,true))
sampleDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]
res30: org.apache.spark.sql.DataFrame = []
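
Note that the REPL output above lists the first row as `2019-03-02` although the code passes `"2019-02-30"`: `java.sql.Date.valueOf` does not reject the impossible date, and the value rolls over into March. The following is a minimal sketch of a stricter alternative, assuming it is acceptable to reject invalid input instead of rolling it over. `strictSqlDate` is a hypothetical helper name, not part of the Hops API.

```scala
import java.sql.Date
import java.time.LocalDate
import scala.util.Try

// Hypothetical helper: parse an ISO date strictly and only then convert it
// to java.sql.Date. LocalDate.parse rejects impossible dates such as
// February 30th instead of silently rolling them over.
def strictSqlDate(isoDate: String): Option[Date] =
  Try(LocalDate.parse(isoDate)).toOption.map(d => Date.valueOf(d))

val validDate = strictSqlDate("2019-05-01")   // defined
val invalidDate = strictSqlDate("2019-02-30") // None
```

Returning an `Option` forces the caller to decide what to do with a bad date before it ends up in a feature group.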


In [39]:
spark.sql("show tables").show(20, false)

+---------------------------------------+------------------------------------+-----------+
|database                               |tableName                           |isTemporary|
+---------------------------------------+------------------------------------+-----------+
|demo_featurestore_admin000_featurestore|attendances_features_1              |false      |
|demo_featurestore_admin000_featurestore|games_features_1                    |false      |
|demo_featurestore_admin000_featurestore|games_features_hudi_tour_1          |false      |
|demo_featurestore_admin000_featurestore|hive_fs_sync_example_1              |false      |
|demo_featurestore_admin000_featurestore|hudi_featuregroup_test_tour_1       |false      |
|demo_featurestore_admin000_featurestore|hudi_featuregroup_test_tour_second_1|false      |
|demo_featurestore_admin000_featurestore|players_features_1                  |false      |
|demo_featurestore_admin000_featurestore|season_scores_features_1            |false      |

Now we can synchronize the Hive table with the feature store (so that it shows up in the Feature Store UI and so that we can query the table using the Feature Store API). The SyncTool will look for a Hive table called `name_version`.

In [40]:
Hops.syncHiveTableWithFeaturestore("hive_fs_sync_example").setVersion(1).setDescription("Hive Sync Test").write

In [41]:
Hops.getFeaturegroups().read()

res34: java.util.List[String] = [attendances_features_1, games_features_1, games_features_hudi_tour_1, games_features_on_demand_1, games_features_on_demand_tour_1, hive_fs_sync_example_1, hudi_featuregroup_test_tour_1, hudi_featuregroup_test_tour_second_1, players_features_1, players_features_on_demand_1, season_scores_features_1, teams_features_1, teams_features_spanish_1, teams_features_spanish_2]


In [42]:
Hops.getFeaturegroup("hive_fs_sync_example").read.show(5)

+----------+-------+---+------+
|      date|country| id| value|
+----------+-------+---+------+
|2019-08-06|Belgium|  3|0.2151|
|2019-08-06| Russia|  4|0.8151|
|2019-03-02| Sweden|  1|0.4151|
|2019-05-01|Ireland|  2|1.2151|
+----------+-------+---+------+



###  Create a New Version of A Feature Group

To create a new version, simply use the `createFeaturegroup` method and change the version argument:

In [43]:
Hops.createFeaturegroup("teams_features_spanish").setDataframe(teamsFeaturesDf2).setVersion(3).write()

#### Get the Latest Version of a Feature Group (0 if no version exists)

In [44]:
val latestVersion = Hops.getLatestFeaturegroupVersion("teams_features_spanish").read()
latestVersion

latestVersion: Integer = 3
res37: Integer = 3


You can also specify arguments explicitly:

In [45]:
Hops.getLatestFeaturegroupVersion("teams_features_spanish").setFeaturestore(Hops.getProjectFeaturestore.read).read()

res38: Integer = 3
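
Feature groups are listed as `name_version`, as in the `getFeaturegroups` output shown earlier. The "0 if no version exists" rule can be illustrated with a small pure-Scala sketch over such names. This only illustrates the naming convention and is not the SDK's implementation; `splitVersion` and `latestVersion` are hypothetical helpers.

```scala
import scala.util.Try

// Split "teams_features_spanish_2" into ("teams_features_spanish", 2).
// Returns None when the name has no numeric version suffix.
def splitVersion(tableName: String): Option[(String, Int)] = {
  val idx = tableName.lastIndexOf('_')
  if (idx <= 0) None
  else Try(tableName.substring(idx + 1).toInt).toOption
    .map(version => (tableName.substring(0, idx), version))
}

// Highest version found for `name`, or 0 when no version exists.
def latestVersion(tableNames: Seq[String], name: String): Int = {
  tableNames.flatMap(n => splitVersion(n))
    .collect { case (`name`, version) => version }
    .foldLeft(0)((a, b) => math.max(a, b))
}

val tableNames = Seq("teams_features_1", "teams_features_spanish_1", "teams_features_spanish_2")
val latest = latestVersion(tableNames, "teams_features_spanish")
val nextVersion = latest + 1
val missing = latestVersion(tableNames, "does_not_exist")
```

With these sample names, `latest` is 2, `nextVersion` is 3 and `missing` is 0. Adding 1 to the latest version is the same pattern the notebook uses when it creates a new version of a training dataset.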


#### Updating the Metadata Cache

By default, the feature store metadata is cached on the client side after the first query against the feature store. Most queries use this metadata to figure out how to fetch features. To update the cached metadata, use the following API method:

In [46]:
Hops.updateFeaturestoreMetadataCache().setFeaturestore(Hops.getProjectFeaturestore.read).write()
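
To make the caching behaviour concrete, here is a minimal sketch of a client-side cache with an explicit refresh. It is an illustration of the general pattern only, not the SDK's actual implementation; `MetadataCache` and the fake fetch function are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Illustration only: the value is fetched once, then served from memory
// until refresh() is called explicitly.
final class MetadataCache[A](fetch: () => A) {
  private var cached: Option[A] = None
  def get: A = cached.getOrElse(refresh())
  def refresh(): A = {
    val value = fetch()
    cached = Some(value)
    value
  }
}

// Fake "remote" metadata: every fetch returns a new version string.
val fetchCount = new AtomicInteger(0)
val cache = new MetadataCache(() => s"metadata-v${fetchCount.incrementAndGet()}")

val firstRead = cache.get        // triggers the first fetch
val secondRead = cache.get       // served from the cache, no new fetch
val refreshed = cache.refresh()  // forces a new fetch
```

The second read returns the same value as the first one even if the remote state has changed, which is why an explicit update call is needed after the feature store has been modified.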

### Featuregroup Partitioning 

Featuregroups are stored as Hive tables, which means that they can be partitioned for improved **read-query** performance. We use dynamic partitioning, where the partition keys are specified when the featuregroup is created. To set the partitions, call `setPartitionBy` on the `createFeaturegroup()` builder with a list of the columns that you want to partition the table on, see the examples below.

Partitioning is not supported for training datasets, as those are meant to be immutable blobs used for training (e.g. petastorm or tfrecords) and do not need to be optimized for query performance.

Feature groups in Hive, however, might need to be optimized for read queries, since they might be used to create many different training datasets from different subsets of features.

As an example we can take the `games_features` featuregroup created by the featurestore tour and re-create it as a new featuregroup called `games_features_partitioned` where we partition on the `score` column.

In [47]:
val gamesFeaturesDf = Hops.getFeaturegroup("games_features").read()

gamesFeaturesDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [score: int, home_team_id: int ... 1 more field]


In [None]:
val partitionCols = List("score")
(Hops.createFeaturegroup("games_features_partitioned")
                         .setPartitionBy(partitionCols)
                         .setDataframe(gamesFeaturesDf)
                         .setDescriptiveStats(false)
                         .setFeatureCorr(false)
                         .setFeatureHistograms(false)
                         .setClusterAnalysis(false)
                         .setDescription("games_features partitioned by score")
                         .write())

We can also partition on multiple columns. Keep an eye on the number of partitions, though: every distinct combination of values in the partition columns becomes its own partition. Also note that partitioning on the primary key is not allowed; it would not make sense either, since the primary key should be unique. If you try to partition on the primary key, the partitioning will simply be skipped.


In [49]:
val partitionCols = List("score", "home_team_id")
(Hops.createFeaturegroup("games_features_double_partitioned")
                         .setPartitionBy(partitionCols)
                         .setDataframe(gamesFeaturesDf)
                         .setDescriptiveStats(false)
                         .setFeatureCorr(false)
                         .setFeatureHistograms(false)
                         .setClusterAnalysis(false)
                         .setDescription("games_features partitioned by score and home_team_id")
                         .write())

partitionCols: List[String] = List(score, home_team_id)
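
The warning about the number of partitions can be made concrete with a small in-memory sketch: the number of partitions equals the number of distinct value combinations in the partition columns. The sample rows below are made up for illustration and are not taken from the `games_features` data.

```scala
// Made-up sample rows: (score, home_team_id, away_team_id)
val games = Seq((1, 10, 11), (1, 10, 12), (2, 10, 11), (2, 13, 10), (3, 13, 12))

// Number of partitions when partitioning on score only
val partitionsByScore = games.map(_._1).distinct.size

// Number of partitions when partitioning on (score, home_team_id)
val partitionsByScoreAndHomeTeam = games.map(g => (g._1, g._2)).distinct.size
```

For this sample, partitioning on `score` gives 3 partitions and adding `home_team_id` gives 4. On a Spark dataframe the same estimate could be obtained before creating the feature group with `gamesFeaturesDf.select("score", "home_team_id").distinct().count()`.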


To verify the partitions we can use the utility function `getFeaturegroupPartitions`:

In [None]:
Hops.getFeaturegroupPartitions("games_features_partitioned").read().show(10)

In [51]:
Hops.getFeaturegroupPartitions("games_features_double_partitioned").read().show(10)

+---------------+
|      partition|
+---------------+
| home_team_id=1|
|home_team_id=10|
|home_team_id=11|
|home_team_id=12|
|home_team_id=13|
|home_team_id=14|
|home_team_id=15|
|home_team_id=16|
|home_team_id=17|
|home_team_id=18|
+---------------+
only showing top 10 rows



### Inserting Into Existing Feature Groups

A best practice when working with features in HopsML is to first figure out a model of feature groups and create them using the Feature Registry UI. This prepares the feature group schemas and creates the Hive tables. Once the empty feature groups have been created, you can insert into these tables directly.

Let's first create some sample data to insert:

In [52]:
import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.types._
val sampleData = Seq(
    Row(999, 41251.52f, 1),
    Row(998, 1319.4f, 8),
    Row(997, 21219.1f, 2)
)
val sampleSchema = List(
  StructField("equipo_id", IntegerType, true),
  StructField("equipo_presupuesto", FloatType, true),
  StructField("equipo_posicion", IntegerType, true)
)
val sampleDF = spark.createDataFrame(
  spark.sparkContext.parallelize(sampleData),
  StructType(sampleSchema)
)

import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.types._
sampleData: Seq[org.apache.spark.sql.Row] = List([999,41251.52,1], [998,1319.4,8], [997,21219.1,2])
sampleSchema: List[org.apache.spark.sql.types.StructField] = List(StructField(equipo_id,IntegerType,true), StructField(equipo_presupuesto,FloatType,true), StructField(equipo_posicion,IntegerType,true))
sampleDF: org.apache.spark.sql.DataFrame = [equipo_id: int, equipo_presupuesto: float ... 1 more field]


In [53]:
sampleDF.show(5)

+---------+------------------+---------------+
|equipo_id|equipo_presupuesto|equipo_posicion|
+---------+------------------+---------------+
|      999|          41251.52|              1|
|      998|            1319.4|              8|
|      997|           21219.1|              2|
+---------+------------------+---------------+



In [54]:
sampleDF.count

res45: Long = 3


Let's inspect the contents of the featuregroup `teams_features_spanish` that we are going to insert the sample data into:

In [55]:
val spanishTeamsFeaturesDf = Hops.getFeaturegroup("teams_features_spanish").read()

spanishTeamsFeaturesDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [equipo_id: int, equipo_posicion: int ... 1 more field]


In [56]:
spanishTeamsFeaturesDf.show(5)

+---------+---------------+------------------+
|equipo_id|equipo_posicion|equipo_presupuesto|
+---------+---------------+------------------+
|        1|              1|         12957.076|
|        2|              2|         2403.3704|
|        3|              3|         3390.3755|
|        4|              4|         13547.429|
|        5|              5|          9678.333|
+---------+---------------+------------------+
only showing top 5 rows



In [57]:
spanishTeamsFeaturesDf.count

res47: Long = 50


Now we can insert the sample data and verify the new contents of the featuregroup. By default the insert mode is "append", the featurestore is the project's featurestore, the version is 1 and statistics will be updated based on the previously made settings when the featuregroup was created (we cover statistics later on in this notebook).

In [58]:
Hops.insertIntoFeaturegroup("teams_features_spanish").setDataframe(sampleDF).setMode("append").write()

You can also explicitly specify the optional arguments:

In [59]:
Hops.insertIntoFeaturegroup("teams_features_spanish").setDataframe(sampleDF).setMode("append").setVersion(1).write()

Let's fetch the updated feature group from the feature store and verify that the update was successful:

In [60]:
val spanishTeamsFeaturesUpdatedDf = Hops.getFeaturegroup("teams_features_spanish").read()

spanishTeamsFeaturesUpdatedDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [equipo_id: int, equipo_posicion: int ... 1 more field]


In [61]:
spanishTeamsFeaturesUpdatedDf.show(5)

+---------+---------------+------------------+
|equipo_id|equipo_posicion|equipo_presupuesto|
+---------+---------------+------------------+
|        1|              1|         12957.076|
|        2|              2|         2403.3704|
|        3|              3|         3390.3755|
|        4|              4|         13547.429|
|        5|              5|          9678.333|
+---------+---------------+------------------+
only showing top 5 rows



In [62]:
spanishTeamsFeaturesUpdatedDf.count

res51: Long = 56


The two supported insert modes are "append" and "overwrite". The following cell overwrites the featuregroup with the sample data:

In [63]:
Hops.insertIntoFeaturegroup("teams_features_spanish").setDataframe(sampleDF).setMode("overwrite").write()

In [64]:
Hops.getFeaturegroup("teams_features_spanish").read().show(5)

+---------+---------------+------------------+
|equipo_id|equipo_posicion|equipo_presupuesto|
+---------+---------------+------------------+
|      999|          41251|               1.0|
|      998|           1319|               8.0|
|      997|          21219|               2.0|
+---------+---------------+------------------+



In [65]:
Hops.getFeaturegroup("teams_features_spanish").read().count

res54: Long = 3
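
A pitfall is visible in the output above: the budget values (e.g. 41251) ended up under `equipo_posicion` and the positions under `equipo_presupuesto`. `sampleDF` lists its columns as `equipo_id, equipo_presupuesto, equipo_posicion`, while the featuregroup stores them as `equipo_id, equipo_posicion, equipo_presupuesto`, which suggests that the values were matched by position rather than by name. The following pure-Scala sketch shows the idea of reordering values by column name before inserting; `reorderByName` is a hypothetical helper, not part of the Hops API.

```scala
// Hypothetical helper: rearrange the values of one row so that they follow
// the column order of the target table.
def reorderByName(sourceCols: Seq[String], targetCols: Seq[String], row: Seq[Any]): Seq[Any] = {
  require(sourceCols.toSet == targetCols.toSet, "source and target must contain the same columns")
  val valueByColumn = sourceCols.zip(row).toMap
  targetCols.map(valueByColumn)
}

val sourceCols = Seq("equipo_id", "equipo_presupuesto", "equipo_posicion")
val targetCols = Seq("equipo_id", "equipo_posicion", "equipo_presupuesto")

// The first sample row, rearranged into the featuregroup's column order
val reordered = reorderByName(sourceCols, targetCols, Seq[Any](999, 41251.52f, 1))
```

On a Spark dataframe the equivalent step would be to select the columns in the target order before the insert, for example `sampleDF.select("equipo_id", "equipo_posicion", "equipo_presupuesto")`.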


### Import External Feature Datasets to the Feature Store

Feature datasets stored outside Hopsworks can be imported using the `importFeaturegroup` operation in the Featurestore SDK. For example, say that you have a dataset stored on S3 that you want to import. First configure an S3 connector for your feature store, e.g. with the name `my_s3_connector`. The connector is used for authenticating with S3 and for specifying your bucket name. Once the connector is configured, you can use the `importFeaturegroup` operation as follows:
```scala
Hops.importFeaturegroup(featuregroup_name)
    .setExternalPath(s3_bucket_path)
    .setStorageConnector("my_s3_connector")
    .setDataFormat("tfrecords")
    .write
```

## Feature Group Statistics

Statistics about a featuregroup can be useful during feature engineering and when deciding which features to use for training. If statistics have been computed for a feature group, they can be viewed in the Hopsworks Feature Registry UI.

This is particularly useful within large organizations where data scientists from different teams can re-use and explore new features by browsing features in the feature store and analyzing the statistics.

![Feature Registry Statistics Visualization](./images/fg_stats_1.png "Feature Registry Statistics Visualization")

As you might have noticed earlier in this notebook, the `createFeaturegroup` method has arguments for configuring the statistics to be computed as data is added. These settings are saved along with the rest of the metadata about the featuregroup and are applied when new data is inserted into the featuregroup using the `insertIntoFeaturegroup` method.

You can use the `updateFeaturegroupStats()` method to update the settings for the statistics of a feature group and recompute the statistics without inserting any new data. By default it will compute the statistics based on the previous settings, use the project's featurestore and use version 1 of the featuregroup:

In [66]:
Hops.updateFeaturegroupStats("teams_features").write()

You can also specify the optional arguments:

In [67]:
Hops.updateFeaturegroupStats("teams_features").setSpark(spark).setFeaturestore(Hops.getProjectFeaturestore.read).setVersion(1).setDescriptiveStats(true).setFeatureCorr(true).setFeatureHistograms(true).setClusterAnalysis(true).setStatColumns(null).setNumBins(10).setCorrMethod("pearson").setNumClusters(5).write()

If you only want to compute statistics for a certain set of columns, for example to exclude surrogate key columns, you can use the argument `statColumns` to specify which columns to include:

In [68]:
val statColumns = List[String]("team_budget", "team_position").asJava
Hops.updateFeaturegroupStats("teams_features").setStatColumns(statColumns).write()

statColumns: java.util.List[String] = [team_budget, team_position]
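
Instead of listing the statistics columns by hand, the list can be derived from the full column list. The sketch below assumes that surrogate key columns can be recognized by an `_id` suffix (as in `team_id`), which is a naming assumption and not something the feature store enforces.

```scala
// Columns of the teams_features featuregroup
val allColumns = List("team_id", "team_budget", "team_position")

// Assumption: surrogate keys end with "_id"; drop them from the statistics
val statColumnNames = allColumns.filterNot(_.endsWith("_id"))

// Convert to a java.util.List, the type passed to setStatColumns in the cell above
val statColumnsJava: java.util.List[String] = java.util.Arrays.asList(statColumnNames: _*)
```

The resulting list contains `team_budget` and `team_position`, the same columns that were passed explicitly above.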


## Training Datasets

To group data in the feature store we use three concepts:

- Feature
- Feature group
- Training Dataset

Typically, during the feature engineering phase of a machine learning project, you compute a set of features for each type of data that you have. These features are naturally grouped into a documented and versioned **feature group**. 

In practice, organizations often have many different types of datasets to extract features from. For example, if you are building a recommendation system, you might have demographic data about each user as well as user-activity data. 

When you train a machine learning model, you want to use all features that have predictive power and that the model can learn from. At this point, we can create a training dataset of features from several different feature groups and use that for training. That is the purpose of the training dataset abstraction. 

Of course you can always just save a group of features anywhere inside your project, e.g as a csv, or .tfrecords file. However, by using the feature store you can create **managed** training datasets. Managed training datasets will show up in the feature registry UI and will automatically be versioned, documented and reproducible. 

Metadata for a training dataset can be created from the Hopsworks UI or directly from the API with the function `createTrainingDataset`. The training datasets in a project are stored in a top-level dataset called `projectName_Training_Datasets` (i.e. `hdfs:///Projects/<ProjectName>/<ProjectName>_Training_Datasets`).

Once a training dataset has been created, you can find it in the featurestore UI in Hopsworks under the tab `Training datasets`. From there you can also edit the metadata if necessary. 

After a training dataset has been created with the necessary metadata, you can save the actual data in the training dataset by using the API function `insertIntoTrainingDataset`.

### Create New Training Dataset

Let's create a dataset called `team_position_prediction` by using a set of relevant features from the featurestore. We will combine features from four different feature groups to form this training dataset: `teams_features`, `attendances_features`, `players_features`, `season_scores_features`.
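
Conceptually, combining features from several feature groups amounts to a join on a shared key. The following in-memory sketch illustrates that idea only; it assumes `team_id` is the shared key (it appears in several feature groups in the feature list), the values are made up, and it makes no claim about how the feature store performs the join internally.

```scala
// Made-up feature values keyed by team_id
val teamBudget = Map(1 -> 12957.0, 2 -> 2403.0, 3 -> 3390.0)
val averageAttendance = Map(1 -> 19595.0, 2 -> 6462.0)

// Inner join on the shared key: only teams present in both groups are kept
val joined = (for {
  (teamId, budget) <- teamBudget.toSeq
  attendance <- averageAttendance.get(teamId).toSeq
} yield (teamId, budget, attendance)).sortBy(_._1)
```

Team 3 has no attendance value and therefore does not appear in the joined result, which is the behaviour of an inner join.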

#### Read Features

In [3]:
val features = List("team_budget", "average_attendance", "average_player_age", "team_position", 
                     "sum_attendance", "average_player_rating", "average_player_worth", "sum_player_age", 
                     "sum_player_rating", "sum_player_worth", "sum_position", "average_position")
val featuresDf = Hops.getFeatures(features).read()

features: List[String] = List(team_budget, average_attendance, average_player_age, team_position, sum_attendance, average_player_rating, average_player_worth, sum_player_age, sum_player_rating, sum_player_worth, sum_position, average_position)
featuresDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [sum_attendance: float, team_position: int ... 10 more fields]


In [4]:
featuresDf.show(5)

+--------------+-------------+------------+--------------+----------------+-----------+-----------------+--------------------+------------------+------------------+----------------+---------------------+
|sum_attendance|team_position|sum_position|sum_player_age|sum_player_worth|team_budget|sum_player_rating|average_player_worth|average_player_age|average_attendance|average_position|average_player_rating|
+--------------+-------------+------------+--------------+----------------+-----------+-----------------+--------------------+------------------+------------------+----------------+---------------------+
|     391919.47|            6|       563.0|        2485.0|       143524.64|    7307.94|        131123.84|           1435.2465|             24.85|         19595.973|           28.15|            1311.2384|
|     129249.24|           16|       841.0|        2545.0|       57687.785|   7326.092|         50224.14|           576.87787|             25.45|          6462.462|           42.05|   

#### Get the Latest Version of a Training Dataset (0 if no version exists)

In [5]:
val latestVersion = Hops.getLatestTrainingDatasetVersion("team_position_prediction").read()
latestVersion

latestVersion: Integer = 3
res3: Integer = 3


You can also explicitly specify optional arguments:

In [6]:
Hops.getLatestTrainingDatasetVersion("team_position_prediction").setFeaturestore(Hops.getProjectFeaturestore().read()).read()

res4: Integer = 3


#### Save as Training Dataset in TFRecords Format

Now we can create a training dataset from the dataframe with some extended metadata such as the schema (automatically inferred). By default, a training dataset is created in "tfrecords" format and statistics are computed for all features. After the dataset has been created, you can view and/or update its metadata from the Hopsworks featurestore UI.

In [73]:
Hops.createTrainingDataset("team_position_prediction").setDataframe(featuresDf).write()

You can also override default parameter values: 

In [None]:
Hops.createTrainingDataset("team_position_prediction").setDataframe(featuresDf).setSpark(spark).setFeaturestore(Hops.getProjectFeaturestore.read()).setVersion(2).setDescription("a dataset with features for football teams, used for training a model to predict league-position").setDataFormat("tfrecords").setDescriptiveStats(false).setFeatureCorr(false).setFeatureHistograms(false).setClusterAnalysis(false).setStatColumns(statColumns).setNumBins(5).setCorrMethod("pearson").setNumClusters(5).write()

#### Save as Training Dataset in TFRecord Format

In [7]:
Hops.createTrainingDataset("team_position_prediction_tfrecord").setDataframe(featuresDf).setDataFormat("tfrecord").setDescription("a dataset with features for football teams, used for training a model to predict league-position").write


In [8]:
val latestVersion = Hops.getLatestTrainingDatasetVersion("team_position_prediction_tfrecord").setFeaturestore(Hops.getProjectFeaturestore.read).read
latestVersion

latestVersion: Integer = 1
res6: Integer = 1


In [75]:
val latestVersion = Hops.getLatestTrainingDatasetVersion("team_position_prediction_csv").setFeaturestore(Hops.getProjectFeaturestore.read).read
latestVersion

latestVersion: Integer = 0
res63: Integer = 0


In [76]:
Hops.createTrainingDataset("team_position_prediction_csv").setDataframe(featuresDf).setDataFormat("csv").setDescription("a dataset with features for football teams, used for training a model to predict league-position").write

###  Create a New Version of A Training Dataset

To create a new version, simply use the `createTrainingDataset` method and specify the version argument:

In [77]:
val trainingDatasetVersion = Hops.getLatestTrainingDatasetVersion("team_position_prediction").read + 1
Hops.createTrainingDataset("team_position_prediction").setDataframe(featuresDf).setDataFormat("tfrecords").setDescription("a dataset with features for football teams, used for training a model to predict league-position").setVersion(trainingDatasetVersion).write

trainingDatasetVersion: Int = 2


### Inserting Into an Existing Training Dataset

Once a dataset has been created, its metadata is browsable in the featurestore registry in the Hopsworks UI. If you don't want to create a new training dataset but just want to overwrite or insert new data into an existing one, you can use the API function `insertIntoTrainingDataset`.  

**Note**: "append" write mode is not supported for training datasets stored in tfrecords format, only "overwrite"

In [9]:
Hops.insertIntoTrainingDataset("team_position_prediction_tfrecord").setDataframe(featuresDf).setMode("append").write

In [78]:
Hops.insertIntoTrainingDataset("team_position_prediction_csv").setDataframe(featuresDf).setMode("overwrite").write
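
The note on write modes can be turned into a small guard that runs before an insert. This is a minimal sketch that encodes only the rule stated in the note (no "append" for the "tfrecords" format); `checkWriteMode` is a hypothetical helper, and the format and mode strings are assumed to be the same ones passed to `setDataFormat` and `setMode`.

```scala
// Hypothetical guard: Right(mode) when the combination is allowed,
// Left(reason) when it is not.
def checkWriteMode(dataFormat: String, mode: String): Either[String, String] =
  (dataFormat.toLowerCase, mode.toLowerCase) match {
    case (_, m) if m != "append" && m != "overwrite" =>
      Left(s"unsupported write mode: $mode")
    case ("tfrecords", "append") =>
      Left("append is not supported for tfrecords, use overwrite")
    case (_, m) =>
      Right(m)
  }

val csvAppend = checkWriteMode("csv", "append")
val tfrecordsAppend = checkWriteMode("tfrecords", "append")
val tfrecordsOverwrite = checkWriteMode("tfrecords", "overwrite")
```

Using `Either` keeps the reason for a rejection next to the result, so the caller can log it or fall back to "overwrite" explicitly.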

### Get Training Dataset Path

After a **managed dataset** has been created, it is easy to share it and re-use it for training various models. For example, if the dataset has been materialized in tfrecords format, you can call the method `getTrainingDatasetPath()` to get the HDFS path and read it directly in your model training code (e.g. TensorFlow).

In [79]:
Hops.getTrainingDatasetPath("team_position_prediction_csv").read

res67: String = hdfs://default/Projects/demo_featurestore_admin000/demo_featurestore_admin000_Training_Datasets/team_position_prediction_csv_1/team_position_prediction_csv


You can also override the default arguments:

In [80]:
Hops.getTrainingDatasetPath("team_position_prediction_csv").setFeaturestore(Hops.getProjectFeaturestore.read).setVersion(Hops.getLatestTrainingDatasetVersion("team_position_prediction_csv").read).read

res68: String = hdfs://default/Projects/demo_featurestore_admin000/demo_featurestore_admin000_Training_Datasets/team_position_prediction_csv_1/team_position_prediction_csv
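
The path returned above follows a recognizable layout: project directory, `<ProjectName>_Training_Datasets`, a `name_version` directory and finally the dataset name. The sketch below reconstructs that layout for illustration. It is inferred from this single CSV example only, so treat it as an assumption and always use `getTrainingDatasetPath` in real code; `trainingDatasetPath` is a hypothetical helper.

```scala
// Hypothetical helper that mirrors the layout seen in the output above.
def trainingDatasetPath(project: String, name: String, version: Int): String =
  s"hdfs://default/Projects/$project/${project}_Training_Datasets/${name}_$version/$name"

val csvPath = trainingDatasetPath("demo_featurestore_admin000", "team_position_prediction_csv", 1)
```

Seeing the layout spelled out makes it easier to locate a training dataset when browsing the project's datasets.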


### Read Training Dataset into a Spark Dataframe

Typically, training datasets are served into deep learning frameworks such as PyTorch or TensorFlow. However, training datasets can also be read into Spark dataframes using the API method `getTrainingDataset()`:

In [81]:
Hops.getTrainingDataset("team_position_prediction_csv").read.show(5)

+--------------+-------------+------------+--------------+----------------+-----------+-----------------+--------------------+------------------+------------------+----------------+---------------------+
|sum_attendance|team_position|sum_position|sum_player_age|sum_player_worth|team_budget|sum_player_rating|average_player_worth|average_player_age|average_attendance|average_position|average_player_rating|
+--------------+-------------+------------+--------------+----------------+-----------+-----------------+--------------------+------------------+------------------+----------------+---------------------+
|     99292.945|           23|      1100.0|        2663.0|       31547.941|  10290.323|        32760.031|            315.4794|             26.63|         4964.6475|            55.0|             327.6003|
|     48416.152|           39|      1232.0|        2618.0|       19433.916|  20347.281|        20515.598|           194.33916|             26.18|         2420.8076|            61.6|   

You can also override the default parameters:

In [82]:
Hops.getTrainingDataset("team_position_prediction_csv").setFeaturestore(Hops.getProjectFeaturestore.read).setVersion(Hops.getLatestTrainingDatasetVersion("team_position_prediction_csv").read).read.show(5)

+--------------+-------------+------------+--------------+----------------+-----------+-----------------+--------------------+------------------+------------------+----------------+---------------------+
|sum_attendance|team_position|sum_position|sum_player_age|sum_player_worth|team_budget|sum_player_rating|average_player_worth|average_player_age|average_attendance|average_position|average_player_rating|
+--------------+-------------+------------+--------------+----------------+-----------+-----------------+--------------------+------------------+------------------+----------------+---------------------+
|     99292.945|           23|      1100.0|        2663.0|       31547.941|  10290.323|        32760.031|            315.4794|             26.63|         4964.6475|            55.0|             327.6003|
|     48416.152|           39|      1232.0|        2618.0|       19433.916|  20347.281|        20515.598|           194.33916|             26.18|         2420.8076|            61.6|   

### External Training Datasets

By default, training datasets in the Feature Store are stored in HopsFS. However, it is also possible to store datasets on S3 and just manage the metadata from Hopsworks Feature Store. 

To do this, configure an S3 connector for your feature store, e.g. with the name `my_s3_connector`. You can then save a training dataset to S3 by passing the connector name to `setSink` on the `createTrainingDataset` builder:

```scala
// training_dataset_name and df are placeholders for your own dataset name and dataframe
val storageConnector = "my_s3_connector"
Hops.createTrainingDataset(training_dataset_name)
                          .setDataframe(df)
                          .setSink(storageConnector).write()
```

### Update Training Dataset Stats

The API is similar to the one for updating the stats of a feature group:

In [83]:
Hops.updateTrainingDatasetStats("team_position_prediction").write

You can also override the default parameters:

In [84]:
Hops.updateTrainingDatasetStats("team_position_prediction").setSpark(spark).setFeaturestore(Hops.getProjectFeaturestore.read).setVersion(1).setDescriptiveStats(true).setFeatureCorr(true).setFeatureHistograms(true).setClusterAnalysis(true).setStatColumns(null).setNumBins(20).setCorrMethod("pearson").setNumClusters(5).write

## Get Featurestore Metadata
To explore the contents of the featurestore, we recommend using the featurestore page in the Hopsworks UI, but you can also get the metadata programmatically from the REST API.

### Update Metadata Cache

In [85]:
Hops.updateFeaturestoreMetadataCache().write

You can also override default parameters:

In [86]:
Hops.updateFeaturestoreMetadataCache().setFeaturestore(Hops.getProjectFeaturestore.read).write

### List all Feature Stores Accessible In the Project

In [87]:
Hops.getProjectFeaturestores.read

res75: java.util.List[String] = [demo_featurestore_admin000_featurestore]


### List all Feature Groups in a Feature Store

In [88]:
Hops.getFeaturegroups().read()

res76: java.util.List[String] = [attendances_features_1, games_features_1, games_features_double_partitioned_1, games_features_hudi_tour_1, games_features_on_demand_1, games_features_on_demand_tour_1, hive_fs_sync_example_1, hudi_featuregroup_test_tour_1, hudi_featuregroup_test_tour_second_1, players_features_1, players_features_on_demand_1, season_scores_features_1, teams_features_1, teams_features_spanish_1, teams_features_spanish_2, teams_features_spanish_3]


Override default parameters:

In [89]:
Hops.getFeaturegroups().setFeaturestore(Hops.getProjectFeaturestore.read).read()

res77: java.util.List[String] = [attendances_features_1, games_features_1, games_features_double_partitioned_1, games_features_hudi_tour_1, games_features_on_demand_1, games_features_on_demand_tour_1, hive_fs_sync_example_1, hudi_featuregroup_test_tour_1, hudi_featuregroup_test_tour_second_1, players_features_1, players_features_on_demand_1, season_scores_features_1, teams_features_1, teams_features_spanish_1, teams_features_spanish_2, teams_features_spanish_3]


### List all Features in a Feature Store

In [90]:
Hops.getFeaturesList().read()

res78: java.util.List[String] = [average_attendance, sum_attendance, team_id, away_team_id, home_team_id, score, away_team_id, score, home_team_id, away_team_id, home_team_id, _hoodie_commit_seqno, _hoodie_commit_time, _hoodie_file_name, _hoodie_partition_path, _hoodie_record_key, score, country, date, id, value, country, id, value, _hoodie_commit_seqno, _hoodie_commit_time, _hoodie_file_name, _hoodie_partition_path, _hoodie_record_key, date, country, id, value, _hoodie_commit_seqno, _hoodie_commit_time, _hoodie_file_name, _hoodie_partition_path, _hoodie_record_key, date, average_player_age, average_player_rating, average_player_worth, sum_player_age, sum_player_rating, sum_player_worth, team_id, average_position, sum_position, team_id, team_budget, team_id, team_position, equipo_id, eq...

Override default parameters:

In [91]:
Hops.getFeaturesList().setFeaturestore(Hops.getProjectFeaturestore.read).read()

res79: java.util.List[String] = [average_attendance, sum_attendance, team_id, away_team_id, home_team_id, score, away_team_id, score, home_team_id, away_team_id, home_team_id, _hoodie_commit_seqno, _hoodie_commit_time, _hoodie_file_name, _hoodie_partition_path, _hoodie_record_key, score, country, date, id, value, country, id, value, _hoodie_commit_seqno, _hoodie_commit_time, _hoodie_file_name, _hoodie_partition_path, _hoodie_record_key, date, country, id, value, _hoodie_commit_seqno, _hoodie_commit_time, _hoodie_file_name, _hoodie_partition_path, _hoodie_record_key, date, average_player_age, average_player_rating, average_player_worth, sum_player_age, sum_player_rating, sum_player_worth, team_id, average_position, sum_position, team_id, team_budget, team_id, team_position, equipo_id, eq...
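
The output above contains the same feature name several times (for example `away_team_id` and `score`), presumably because more than one feature group exposes a column with that name. The following sketch, in plain Scala, counts how often each name occurs so that ambiguous names are easy to spot. The input is a hand-copied prefix of the output above, used as a stand-in for the API result.

```scala
import scala.collection.JavaConverters._

// Stand-in for the java.util.List[String] returned by Hops.getFeaturesList().read();
// these are the first nine entries of the output above.
val features: java.util.List[String] = java.util.Arrays.asList(
  "average_attendance", "sum_attendance", "team_id",
  "away_team_id", "home_team_id", "score",
  "away_team_id", "score", "home_team_id")

// Feature names that occur more than once, with their occurrence counts.
val duplicated: Map[String, Int] =
  features.asScala
    .groupBy(identity)
    .map { case (name, occurrences) => name -> occurrences.size }
    .filter { case (_, count) => count > 1 }

duplicated.toSeq.sortBy(_._1).foreach { case (n, c) => println(s"$n occurs $c times") }
```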

### List all Training Datasets in a Feature Store

In [92]:
Hops.getTrainingDatasets().read()

res80: java.util.List[String] = [team_position_prediction_1, team_position_prediction_2, team_position_prediction_csv_1, tour_training_dataset_test_1]


Override default parameters:

In [93]:
Hops.getTrainingDatasets().setFeaturestore(Hops.getProjectFeaturestore.read).read()

res81: java.util.List[String] = [team_position_prediction_1, team_position_prediction_2, team_position_prediction_csv_1, tour_training_dataset_test_1]


### Get All Metadata (Features, Feature groups, Training Datasets) for a Feature Store

In [94]:
Hops.getFeaturestoreMetadata().read()

res82: io.hops.util.featurestore.dtos.app.FeaturestoreMetadataDTO = FeaturestoreMetadataDTO{featurestore=FeaturestoreDTO{featurestoreId=67, featurestoreName='demo_featurestore_admin000_featurestore', created='Mon Apr 06 18:57:49 UTC 2020', hdfsStorePath='hopsfs://namenode.service.consul:8020/apps/hive/warehouse/demo_featurestore_admin000_featurestore.db', projectName='demo_featurestore_admin000', projectId=119, featurestoreDescription='Featurestore database for project: demo_featurestore_admin000', inodeId=10242}, featuregroups=[CachedFeaturegroupDTO{hiveTableId=4, hdfsStorePaths=[hopsfs://namenode.service.consul:8020/apps/hive/warehouse/demo_featurestore_admin000_featurestore.db/attendances_features_1], inputFormat='org.apache.hadoop.hive.ql.io.orc.OrcInputFormat', hiveTableType=MANAGE...

Override default parameters:

In [95]:
Hops.getFeaturestoreMetadata().setFeaturestore(Hops.getProjectFeaturestore.read).read()

res83: io.hops.util.featurestore.dtos.app.FeaturestoreMetadataDTO = FeaturestoreMetadataDTO{featurestore=FeaturestoreDTO{featurestoreId=67, featurestoreName='demo_featurestore_admin000_featurestore', created='Mon Apr 06 18:57:49 UTC 2020', hdfsStorePath='hopsfs://namenode.service.consul:8020/apps/hive/warehouse/demo_featurestore_admin000_featurestore.db', projectName='demo_featurestore_admin000', projectId=119, featurestoreDescription='Featurestore database for project: demo_featurestore_admin000', inodeId=10242}, featuregroups=[CachedFeaturegroupDTO{hiveTableId=4, hdfsStorePaths=[hopsfs://namenode.service.consul:8020/apps/hive/warehouse/demo_featurestore_admin000_featurestore.db/attendances_features_1], inputFormat='org.apache.hadoop.hive.ql.io.orc.OrcInputFormat', hiveTableType=MANAGE...

## Tagging featuregroups and training datasets - Only works in Enterprise edition

The feature store lets users attach tags to a featuregroup or training dataset to make it discoverable across projects. 
Within an organization, this helps data engineers discover featuregroups to work with and share them with colleagues, and it helps data scientists find existing training datasets, which increases productivity.

### Define tags as admin
A user with admin rights defines a set of organization-wide tags that may be attached to featuregroups and training datasets.
In the example below two tags have been defined, "Country" and "Sport".

### Step 1: Navigate to the admin UI

![To admin UI](./images/to_admin.png "To admin UI")

### Step 2: Navigate to tag creation UI

![To admin tags UI](./images/admin_fs_tags.png "To admin tags UI")

### Step 3: Create "Country" and "Sport" tags

![Create tags](./images/create_tags.png "Create tags")

### Attach tags to a Feature Group

In [3]:
Hops.setTagForFeaturegroup("teams_features").setTag("Country").setValue("Sweden").write()
Hops.setTagForFeaturegroup("teams_features").setTag("Sport").setValue("Football").write()

### Get all tags attached to a Feature Group

In [4]:
Hops.getTagsForFeaturegroup("teams_features").read()

res4: Object = {Sport=Football, Country=Sweden}
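
The result above is statically typed as `Object`, so it has to be narrowed before the individual tags can be used. The printed form `{Sport=Football, Country=Sweden}` looks like the `toString` of a `java.util.Map`, but that runtime type is an assumption here and is not confirmed by the API. A minimal sketch of a defensive conversion to a Scala `Map`, using a hand-built stand-in value:

```scala
import scala.collection.JavaConverters._

// Stand-in for the value returned by Hops.getTagsForFeaturegroup("teams_features").read().
// ASSUMPTION: at runtime the Object is a java.util.Map with string-like keys and values.
val rawTags: Object = {
  val m = new java.util.HashMap[String, String]()
  m.put("Country", "Sweden")
  m.put("Sport", "Football")
  m
}

// Narrow the Object to a Scala Map[String, String]; fail loudly on any other type.
val tags: Map[String, String] = rawTags match {
  case m: java.util.Map[_, _] =>
    m.asInstanceOf[java.util.Map[AnyRef, AnyRef]].asScala
      .map { case (k, v) => String.valueOf(k) -> String.valueOf(v) }
      .toMap
  case other =>
    sys.error(s"Unexpected tag container: ${other.getClass.getName}")
}

println(tags.getOrElse("Country", "<no Country tag>"))
```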


## From Raw Data to Features to Training Dataset to Model

Once a training dataset has been materialized, we can use it to train a model. In this section we will train an example model using the training dataset `team_position_prediction` that we just created. We will use the column **"team_position"** as the target to predict.

### Imports

In this example we will use Spark MLlib. However, the feature store is in principle agnostic to the framework or method used to train the model; it also works with PyTorch, TensorFlow, MXNet, etc.

In [98]:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression



### Constants and HyperParameters

In [99]:
val NUM_ITER = 1000
val ELASTIC_REG_PARAM = 0.8
val REG_LAMBDA_PARAM = 0.3

NUM_ITER: Int = 1000
ELASTIC_REG_PARAM: Double = 0.8
REG_LAMBDA_PARAM: Double = 0.3
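
To make the roles of the two regularization constants concrete: in Spark MLlib's linear methods, the elastic-net penalty is documented as λ(α‖w‖₁ + (1 − α)/2 ‖w‖₂²), where λ corresponds to `regParam` and α to `elasticNetParam`. The sketch below evaluates that formula in plain Scala for a made-up coefficient vector. `elasticNetPenalty` is a hypothetical helper, and the sketch ignores details such as Spark's default feature standardization.

```scala
// Hyperparameters repeated from the cell above so the sketch is self-contained.
val ELASTIC_REG_PARAM = 0.8 // alpha: 1.0 = pure L1 (lasso), 0.0 = pure L2 (ridge)
val REG_LAMBDA_PARAM = 0.3  // lambda: overall regularization strength

// Hypothetical helper: lambda * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)
def elasticNetPenalty(w: Seq[Double], lambda: Double, alpha: Double): Double = {
  val l1 = w.map(x => math.abs(x)).sum
  val l2Squared = w.map(x => x * x).sum
  lambda * (alpha * l1 + (1.0 - alpha) / 2.0 * l2Squared)
}

// Made-up coefficients, for illustration only.
val penalty = elasticNetPenalty(Seq(1.0, -2.0), REG_LAMBDA_PARAM, ELASTIC_REG_PARAM)
println(f"elastic-net penalty: $penalty%.4f")
```

With α = 0.8 the penalty is dominated by the L1 term, which tends to push some coefficients to exactly zero.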


### Read TFRecords Dataset into a Spark DataFrame

In [100]:
val dataset_df = Hops.getTrainingDataset("team_position_prediction").read()

dataset_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [average_player_rating: float, average_attendance: float ... 10 more fields]


In [101]:
dataset_df.show(5)

+---------------------+------------------+-----------------+------------+----------------+------------------+-------------+--------------------+-----------+----------------+--------------+--------------+
|average_player_rating|average_attendance|sum_player_rating|sum_position|sum_player_worth|average_player_age|team_position|average_player_worth|team_budget|average_position|sum_player_age|sum_attendance|
+---------------------+------------------+-----------------+------------+----------------+------------------+-------------+--------------------+-----------+----------------+--------------+--------------+
|            240.30573|         3587.5015|        24030.572|      1188.0|        23187.08|             24.63|           31|            231.8708|  12514.562|            59.4|        2463.0|      71750.03|
|            240.39302|         2532.1638|        24039.303|      1213.0|       22371.338|             25.71|           34|           223.71338|  1587.0897|           60.65|        257

You can also explicitly specify the arguments to the `getTrainingDataset()` method:

In [102]:
Hops.getTrainingDataset("team_position_prediction").setSpark(spark).setFeaturestore(Hops.getProjectFeaturestore.read).setVersion(1).read().show(5)

+---------------------+------------------+-----------------+------------+----------------+------------------+-------------+--------------------+-----------+----------------+--------------+--------------+
|average_player_rating|average_attendance|sum_player_rating|sum_position|sum_player_worth|average_player_age|team_position|average_player_worth|team_budget|average_position|sum_player_age|sum_attendance|
+---------------------+------------------+-----------------+------------+----------------+------------------+-------------+--------------------+-----------+----------------+--------------+--------------+
|            240.30573|         3587.5015|        24030.572|      1188.0|        23187.08|             24.63|           31|            231.8708|  12514.562|            59.4|        2463.0|      71750.03|
|            240.39302|         2532.1638|        24039.303|      1213.0|       22371.338|             25.71|           34|           223.71338|  1587.0897|           60.65|        257

### Convert the DataFrame into Spark MLlib data format

Spark MLlib models typically expect all features to be grouped into a single vector column instead of one column per feature. We can use Spark's `VectorAssembler` to group our features together:

In [103]:
dataset_df.printSchema

root
 |-- average_player_rating: float (nullable = true)
 |-- average_attendance: float (nullable = true)
 |-- sum_player_rating: float (nullable = true)
 |-- sum_position: float (nullable = true)
 |-- sum_player_worth: float (nullable = true)
 |-- average_player_age: float (nullable = true)
 |-- team_position: long (nullable = true)
 |-- average_player_worth: float (nullable = true)
 |-- team_budget: float (nullable = true)
 |-- average_position: float (nullable = true)
 |-- sum_player_age: float (nullable = true)
 |-- sum_attendance: float (nullable = true)



In [104]:
// Assemble the 11 numeric feature columns into a single vector column,
// then keep only the label and the assembled features.
val transformedDf = new VectorAssembler().
  setInputCols(Array("average_player_rating", "average_attendance", "sum_player_rating",
                     "sum_position", "sum_player_worth", "average_player_age", "average_player_worth",
                     "team_budget", "average_position", "sum_player_age", "sum_attendance")).
  setOutputCol("features").
  transform(dataset_df).
  select("team_position", "features")

transformedDf: org.apache.spark.sql.DataFrame = [team_position: bigint, features: vector]


In [105]:
transformedDf.printSchema

root
 |-- team_position: long (nullable = true)
 |-- features: vector (nullable = true)



### Define The Model Using Spark MLlib

We will use a linear regression model. In this tutorial we work with so little data that using a larger model does not make sense.

In [106]:
val lr = new LinearRegression().
    setLabelCol("team_position").
    setFeaturesCol("features").
    setMaxIter(NUM_ITER).
    setRegParam(REG_LAMBDA_PARAM).
    setElasticNetParam(ELASTIC_REG_PARAM)

lr: org.apache.spark.ml.regression.LinearRegression = linReg_a1bf3b70d821


### Train The Model using The Parsed Dataset

In [107]:
val lrModel = lr.fit(transformedDf)

lrModel: org.apache.spark.ml.regression.LinearRegressionModel = linReg_a1bf3b70d821


### Show Model Training Results

In [108]:
// Show the input features, the true label, and the model's prediction (computed on the training data)
lrModel.transform(transformedDf).select("features", "team_position", "prediction").show()

+--------------------+-------------+------------------+
|            features|team_position|        prediction|
+--------------------+-------------+------------------+
|[240.305725097656...|           31|33.705585537495324|
|[240.393020629882...|           34|34.796888112390135|
|[297.929351806640...|           28|28.590203155619367|
|[322.697967529296...|           26|29.921433049367838|
|[297.791961669921...|           27|  31.6461907936034|
|[160.234771728515...|           44|39.714082448977464|
|[602.1064453125,9...|           12|13.170646195550884|
|[401.686798095703...|           22| 25.74741768982263|
|[178.677520751953...|           47| 46.46419784609249|
|[7191.86328125,92...|            1|1.0090139372800984|
|[589.413146972656...|           13| 9.427614974389495|
|[1311.23840332031...|            6|  5.46625465494353|
|[502.241394042968...|           16|  18.3236884198956|
|[2814.01806640625...|            3| 4.737583036073698|
|[372.345794677734...|           20| 17.56639613
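
The RMSE and r2 metrics that a training summary reports can be reproduced by hand from (label, prediction) pairs. The following is a minimal sketch in plain Scala using the first five rows of the table above. `rmse` and `r2` are hypothetical helper names, not part of Spark or the Hops API, and r2 is computed as 1 − SS_res / SS_tot. Because only five rows are used, the numbers will differ from metrics computed over the full dataset.

```scala
// (team_position, prediction) pairs copied from the first five rows of the table above.
val rows: Seq[(Double, Double)] = Seq(
  (31.0, 33.705585537495324),
  (34.0, 34.796888112390135),
  (28.0, 28.590203155619367),
  (26.0, 29.921433049367838),
  (27.0, 31.6461907936034))

// Root mean squared error: sqrt(mean((label - prediction)^2))
def rmse(rows: Seq[(Double, Double)]): Double =
  math.sqrt(rows.map { case (y, p) => (y - p) * (y - p) }.sum / rows.size)

// Coefficient of determination: 1 - SS_res / SS_tot
def r2(rows: Seq[(Double, Double)]): Double = {
  val mean = rows.map(_._1).sum / rows.size
  val ssRes = rows.map { case (y, p) => (y - p) * (y - p) }.sum
  val ssTot = rows.map { case (y, _) => (y - mean) * (y - mean) }.sum
  1.0 - ssRes / ssTot
}

println(f"RMSE on 5 rows: ${rmse(rows)}%.3f, r2 on 5 rows: ${r2(rows)}%.3f")
```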

In [109]:
val trainingSummary = lrModel.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")

trainingSummary: org.apache.spark.ml.regression.LinearRegressionTrainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@67a4adca
numIterations: 56
objectiveHistory: [0.5,0.4112410289397031,0.12518994433439795,0.08777706436856202,0.03913658284734929,0.039023660048731916,0.039021631295586376,0.03901820891749818,0.03899877146854687,0.038996622590289465,0.038986570501616684,0.03898519714099797,0.03898517236997994,0.03898515439811265,0.03898512325511424,0.03898510959220573,0.03898506916169151,0.038985041128821236,0.0389850040260811,0.03898496522452977,0.03898488771326733,0.038984698375715776,0.038983155982812503,0.03898302661971874,0.03898181041531472,0.03898164470509283,0.03898063375150436,0.038980055488513875,0.038978856839723006,0.03897755337182653,0.03897735366407972,0.03897705829014805,0.03897700336935097,0.03897696008492972,0.038976909759634505,0.03897688838840345,0.038976872666552954,0.038976818625222676,0.038976773319025576,0.03897676438201338,0.038976761236