# Online Feature Store

This notebook includes examples of how to interact with the **Online** Feature Store in Hopsworks. The online feature store holds a subset of the feature data for real-time queries, which makes it suited for serving client-facing models.

The online feature store contrasts with the **offline** feature store, which contains historical data. The offline feature data is stored in Hive, a storage engine suited for large-scale batch processing of data (such as *training* a machine learning model). The online feature store instead uses a MySQL Cluster database as the backend, a storage engine suited for smaller datasets that need to be queried in real time.

### Imports

In [1]:
import io.hops.util.Hops
import scala.collection.JavaConversions._
import collection.JavaConverters._
import org.apache.spark.sql.Row
import java.sql.Date
import java.sql.Timestamp
import spark.implicits._
import org.apache.spark.sql.types._

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
26,application_1569258228210_0030,spark,idle,Link,Link,✔


SparkSession available as 'spark'.
import io.hops.util.Hops
import scala.collection.JavaConversions._
import collection.JavaConverters._
import org.apache.spark.sql.Row
import java.sql.Date
import java.sql.Timestamp
import spark.implicits._
import org.apache.spark.sql.types._


### Get JDBC Connection to the Online Feature Store

If your project's feature store has the online feature store enabled, each user gets a storage connector for accessing it. The storage connector can be retrieved with the utility method `getOnlineFeaturestoreConnector()` in the Scala SDK. It holds the information needed for the JDBC connection, such as the host, port, username, and password.

In [2]:
Hops.getOnlineFeaturestoreConnector.read

res1: io.hops.util.featurestore.dtos.storageconnector.FeaturestoreJdbcConnectorDTO = FeaturestoreJdbcConnectorDTO{connectionString='jdbc:mysql://10.0.2.15:3306/demo_featurestore_admin000', arguments='password=YTnQHFxNHwMlEpZboyJCgpZFSqyyKgQHXnUHJzSrVNhOslGRqKifTmzvRnhudipF,user=demo_featurestore_admin000_meb1'}
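The connector output above shows the credentials packed into a single `arguments` string of comma-separated `key=value` pairs. A minimal sketch of turning such a string into JDBC connection properties follows. The helper names `parseConnectorArguments` and `toJdbcProperties` are hypothetical and not part of the Hops SDK; the sketch assumes values contain no commas, and it uses placeholder credentials rather than real ones. How the DTO exposes its fields is not shown in this notebook, so the helper takes the raw string.

```scala
import java.util.Properties

// Hypothetical helper: split "key=value,key=value" into a Map.
// Assumes values contain no commas; only the first '=' of each pair separates key and value.
def parseConnectorArguments(arguments: String): Map[String, String] =
  arguments
    .split(",")
    .toList
    .map(_.trim)
    .filter(_.nonEmpty)
    .map(_.split("=", 2))
    .collect { case Array(key, value) => key.trim -> value } // pairs without '=' are skipped
    .toMap

// Hypothetical helper: copy the parsed arguments into JDBC connection properties.
def toJdbcProperties(arguments: Map[String, String]): Properties = {
  val props = new Properties()
  arguments.foreach { case (key, value) => props.setProperty(key, value) }
  props
}

// Placeholder values, not real credentials.
val exampleArgs = parseConnectorArguments("password=secret,user=demo_user")
val exampleProps = toJdbcProperties(exampleArgs)
```

Properties built this way could then be passed, together with the connection string, to `java.sql.DriverManager.getConnection(url, props)` when a plain JDBC client is needed outside of Spark.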


### Create a Feature Group with Online Feature Serving Enabled

When a feature group has online feature serving enabled, its data is stored in both Hive (for historical queries) and MySQL Cluster (for online queries). To enable online feature serving for a feature group, call `.setOnline(true)` on the builder when creating the feature group, as illustrated below.

#### Create Sample Data

In [3]:
val sampleData = Seq(
    Row(1, Date.valueOf("2019-03-02"), 0.4151f, "Sweden"),
    Row(2, Date.valueOf("2019-05-01"), 1.2151f, "Ireland"),
    Row(3, Date.valueOf("2019-08-06"), 0.2151f, "Belgium"),
    Row(4, Date.valueOf("2019-08-06"), 0.8151f, "Russia")
)
val schema = 
 scala.collection.immutable.List(
  StructField("id", IntegerType, true),
  StructField("date", DateType, true),
  StructField("value", FloatType, true),
  StructField("country", StringType, true) 
)
val sampleDf = spark.createDataFrame(
  spark.sparkContext.parallelize(sampleData),
  StructType(schema)
)

sampleData: Seq[org.apache.spark.sql.Row] = List([1,2019-03-02,0.4151,Sweden], [2,2019-05-01,1.2151,Ireland], [3,2019-08-06,0.2151,Belgium], [4,2019-08-06,0.8151,Russia])
schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(date,DateType,true), StructField(value,FloatType,true), StructField(country,StringType,true))
sampleDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]


In [4]:
sampleDf.show(5)

+---+----------+------+-------+
| id|      date| value|country|
+---+----------+------+-------+
|  1|2019-03-02|0.4151| Sweden|
|  2|2019-05-01|1.2151|Ireland|
|  3|2019-08-06|0.2151|Belgium|
|  4|2019-08-06|0.8151| Russia|
+---+----------+------+-------+
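`java.sql.Date.valueOf` can be lenient about out-of-range day values on some JVM versions, silently rolling a string such as `2019-02-30` forward into March instead of rejecting it. A small sketch of a stricter parse for sample data is given below. The helper name `parseStrictDate` is hypothetical; it relies on `java.time.LocalDate.parse`, which expects the ISO `yyyy-MM-dd` format and rejects dates that do not exist in the calendar.

```scala
import java.sql.Date
import java.time.LocalDate
import scala.util.Try

// Hypothetical helper: returns None for strings that are not real calendar dates.
// LocalDate.parse throws for dates such as February 30, which Try turns into None.
def parseStrictDate(text: String): Option[Date] =
  Try(LocalDate.parse(text)).toOption.map(d => Date.valueOf(d))

val validDate = parseStrictDate("2019-05-01")
val invalidDate = parseStrictDate("2019-02-30")
```

Here `validDate` holds a date and `invalidDate` is `None`, so a typo in the input surfaces before the row reaches the feature store.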



#### Save the Sample Data as a Feature Group with Online Feature Serving Enabled


In [5]:
(Hops.createFeaturegroup("test_online_fg_scala_sdk")
     .setDataframe(sampleDf)
     .setOnline(true)
     .setPrimaryKey(List("id"))
     .write())

When creating a feature group, the Spark DataFrame is used to infer the data schema and the feature types. The data schema is then used to create a Hive table (for offline data) and a MySQL table (for online data). If you want more control over the feature types of the MySQL table (e.g. the length of a varchar column), you can pass the types through the optional builder method `setOnlineTypes`, which takes a map of the form `feature_name --> feature_type`.

In [6]:
val onlineTypes = Map[String, String](
    "value" -> "DECIMAL"
)
val sampleDf2 = sampleDf.withColumnRenamed(
    "value", "value_test").withColumnRenamed(
    "country", "country_test").withColumnRenamed(
    "date", "date_test")
(Hops.createFeaturegroup("test_online_fg_scala_sdk_types")
     .setDataframe(sampleDf2)
     .setOnline(true)
     .setPrimaryKey(List("id"))
     .setOnlineTypes(onlineTypes)
     .write())

onlineTypes: scala.collection.immutable.Map[String,String] = Map(value -> DECIMAL)
sampleDf2: org.apache.spark.sql.DataFrame = [id: int, date_test: date ... 2 more fields]
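The keys of the `onlineTypes` map presumably need to match column names of the DataFrame being written. In the cell above the key is `value`, while the renamed DataFrame contains `value_test`; how the SDK treats a key that matches no column is not shown in this notebook. A minimal sketch of a pre-flight check is given below, where `unmatchedOnlineTypes` is a hypothetical helper that works on plain column names.

```scala
// Hypothetical helper: keys in the online-type map that do not name any column.
def unmatchedOnlineTypes(onlineTypes: Map[String, String], columns: Seq[String]): Set[String] =
  onlineTypes.keySet.diff(columns.toSet)

// Column names of the renamed DataFrame from the cell above.
val renamedColumns = Seq("id", "date_test", "value_test", "country_test")

val unmatched = unmatchedOnlineTypes(Map("value" -> "DECIMAL"), renamedColumns)
val matched = unmatchedOnlineTypes(Map("value_test" -> "DECIMAL"), renamedColumns)
```

In the notebook session, the column names could be taken from `sampleDf2.columns` instead of being written out by hand.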


### Read Features from Online Feature Store

The same methods for reading the offline feature store can be used to read from the online feature store by calling `.setOnline(true)` on the reader. **However, NOTE**: the online feature store is meant for feature serving, so it should be queried with primary-key lookups to get the best performance. Using online feature serving for full-table scans is highly discouraged. If you find yourself frequently calling `Hops.getFeaturegroup(...).setOnline(true)` to fetch an entire feature group (a full-table scan), you are probably better off using the offline feature store. The online feature store is intended for quick primary-key lookups, not data analysis.

To make the migration from the regular offline-featurestore API to the online-featurestore simple, for each example of reading from the online featurestore below, there is an accompanying example of reading from the offline feature store.

#### Free-Text SQL Query for the Online Feature Store

Feature groups are stored as tables named `featuregroupname_version`: Hive tables for offline features and MySQL tables for online features.

In [9]:
//primary key lookup in MySQL
val df = (Hops.queryFeaturestore("SELECT country FROM test_online_fg_scala_sdk_1 WHERE id=1")
     .setOnline(true).read)

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string]


In [10]:
df.show(5)

+-------+
|country|
+-------+
| Sweden|
+-------+
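The query above follows the `featuregroupname_version` table naming and filters on the primary key. A small sketch of building such a lookup string is given below. Both helper names are hypothetical and not part of the Hops SDK, and the key is restricted to an `Int` so that no free text is interpolated into the SQL.

```scala
// Hypothetical helper: table name following the featuregroupname_version convention.
def featuregroupTable(name: String, version: Int): String =
  s"${name}_$version"

// Hypothetical helper: single-feature primary-key lookup against one table.
def primaryKeyLookup(table: String, feature: String, keyColumn: String, key: Int): String =
  s"SELECT $feature FROM $table WHERE $keyColumn=$key"

val lookupQuery = primaryKeyLookup(
  featuregroupTable("test_online_fg_scala_sdk", 1),
  feature = "country",
  keyColumn = "id",
  key = 1
)
```

The resulting string matches the query used in the cell above and could be passed to `Hops.queryFeaturestore(...)` in the same way.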



#### Free-Text SQL Query for the Offline Feature Store

In [11]:
//primary key lookup in Hive
val df = (Hops.queryFeaturestore("SELECT country FROM test_online_fg_scala_sdk_1 WHERE id=1")
     .setOnline(false).read)

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string]


In [12]:
df.show(5)

+-------+
|country|
+-------+
| Sweden|
+-------+



#### Read Online Version of Feature Group

In [13]:
val df = Hops.getFeaturegroup("test_online_fg_scala_sdk").setOnline(true).read

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, date: date ... 2 more fields]


In [14]:
df.show(5)

+---+----------+------+-------+
| id|      date| value|country|
+---+----------+------+-------+
|  1|2019-03-02|0.4151| Sweden|
|  3|2019-08-06|0.2151|Belgium|
|  4|2019-08-06|0.8151| Russia|
|  2|2019-05-01|1.2151|Ireland|
+---+----------+------+-------+



#### Read Offline Version of Feature Group

In [15]:
val df = Hops.getFeaturegroup("test_online_fg_scala_sdk").setOnline(false).read

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, date: date ... 2 more fields]


In [16]:
df.show(5)

+---+----------+------+-------+
| id|      date| value|country|
+---+----------+------+-------+
|  1|2019-03-02|0.4151| Sweden|
|  2|2019-05-01|1.2151|Ireland|
|  3|2019-08-06|0.2151|Belgium|
|  4|2019-08-06|0.8151| Russia|
+---+----------+------+-------+
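The online and offline reads above return the same four rows in a different order, so a positional comparison would report a mismatch. A minimal sketch of an order-insensitive comparison on plain Scala collections is given below. The helper `sameRows` is hypothetical, and the rows are written out as tuples of `(id, date, country)` rather than read from the feature store.

```scala
// Hypothetical helper: true when both sequences hold the same rows, ignoring order
// but respecting how often each row occurs.
def sameRows[A](left: Seq[A], right: Seq[A]): Boolean = {
  def counts(rows: Seq[A]): Map[A, Int] =
    rows.groupBy(identity).map { case (row, same) => row -> same.size }
  counts(left) == counts(right)
}

// Rows in the order printed by the online read.
val onlineRows = Seq(
  (1, "2019-03-02", "Sweden"),
  (3, "2019-08-06", "Belgium"),
  (4, "2019-08-06", "Russia"),
  (2, "2019-05-01", "Ireland")
)

// Rows in the order printed by the offline read.
val offlineRows = Seq(
  (1, "2019-03-02", "Sweden"),
  (2, "2019-05-01", "Ireland"),
  (3, "2019-08-06", "Belgium"),
  (4, "2019-08-06", "Russia")
)

val storesAgree = sameRows(onlineRows, offlineRows)
```

For Spark DataFrames, a comparable check could probably be built on `except` applied in both directions, keeping in mind that it compares distinct rows.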



#### Read Online Version of individual Feature

In [17]:
val df = Hops.getFeature("country").setOnline(true).read

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string]


In [18]:
df.show(5)

+-------+
|country|
+-------+
| Sweden|
|Belgium|
| Russia|
|Ireland|
+-------+



#### Read Offline Version of individual Feature

In [19]:
val df = Hops.getFeature("country").setOnline(false).read

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string]


In [20]:
df.show(5)

+-------+
|country|
+-------+
| Sweden|
|Ireland|
|Belgium|
| Russia|
+-------+



#### Read Online Version of a list of individual Features

The features can span multiple feature groups. As long as all of those feature groups have online serving enabled, the feature store query planner will join the features on the fly.

In [21]:
val features = List("country", "date")
val df = Hops.getFeatures(features).setOnline(true).read

features: List[String] = List(country, date)
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string, date: date]


In [22]:
df.show(5)

+-------+----------+
|country|      date|
+-------+----------+
| Sweden|2019-03-02|
|Belgium|2019-08-06|
| Russia|2019-08-06|
|Ireland|2019-05-01|
+-------+----------+



#### Read Offline Version of a list of individual Features

In [23]:
val features = List("country", "date")
val df = Hops.getFeatures(features).setOnline(false).read

features: List[String] = List(country, date)
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string, date: date]


In [24]:
df.show(5)

+-------+----------+
|country|      date|
+-------+----------+
| Sweden|2019-03-02|
|Ireland|2019-05-01|
|Belgium|2019-08-06|
| Russia|2019-08-06|
+-------+----------+



#### List all feature groups that have online feature serving enabled

In [2]:
Hops.getFeaturegroups.setOnline(true).read

res1: java.util.List[String] = [test_online_fg_scala_sdk_types_1, online_featuregroup_test_types_1, test_online_fg_scala_sdk_1]


#### List all features that have online feature serving enabled

In [3]:
Hops.getFeaturesList.setOnline(true).read

res2: java.util.List[String] = [country_test, date_test, id, value_test, id, val_1_type_test, val_2_type_test, country, date, id, value]
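The list above contains `id` three times, apparently once per feature group. A feature name that occurs in several feature groups may be ambiguous when it is requested by name alone; how the SDK resolves such a name is not covered in this notebook. A minimal sketch for spotting repeated names is given below, with `repeatedFeatures` as a hypothetical helper.

```scala
// Hypothetical helper: feature names that occur more than once, with their counts.
def repeatedFeatures(features: Seq[String]): Map[String, Int] =
  features
    .groupBy(identity)
    .map { case (name, occurrences) => name -> occurrences.size }
    .filter { case (_, count) => count > 1 }

// Feature names copied from the output above.
val listedFeatures = Seq(
  "country_test", "date_test", "id", "value_test",
  "id", "val_1_type_test", "val_2_type_test",
  "country", "date", "id", "value"
)

val repeated = repeatedFeatures(listedFeatures)
```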


### Enable Online Feature Serving for a Feature Group that is Offline-Only

By default, a feature group created with `Hops.createFeaturegroup()` does not have online serving enabled, and all data is stored in the offline feature group (Hive). To create a feature group with online serving, call `.setOnline(true)` on the builder (an example is provided at the beginning of this notebook).

If you want to enable online feature serving for a feature group after it has been created, you can use the API call `Hops.enableFeaturegroupOnline` (this will create a MySQL table in the backend). Conversely, if you want to disable online feature serving, use the API call `Hops.disableFeaturegroupOnline` (this will drop the MySQL table in the backend).

#### Create Feature Group without Online Feature Serving Enabled

In [3]:
val sampleData = Seq(
    Row(1, 0.4151f, 0.915f),
    Row(2, 1.2151f, 0.151f),
    Row(3, 0.2151f, 0.7511f),
    Row(4, 0.8151f, 0.12541f)
)
val schema = 
 scala.collection.immutable.List(
  StructField("id", IntegerType, true),
  StructField("test_col_3", FloatType, true),
  StructField("test_col_4", FloatType, true)
)
val sampleDf = spark.createDataFrame(
  spark.sparkContext.parallelize(sampleData),
  StructType(schema)
)

(Hops.createFeaturegroup("enable_online_features_test_scala_sdk")
                         .setDataframe(sampleDf)
                         .setPrimaryKey(List("id"))
                         .write())

sampleData: Seq[org.apache.spark.sql.Row] = List([1,0.4151,0.915], [2,1.2151,0.151], [3,0.2151,0.7511], [4,0.8151,0.12541])
schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(test_col_3,FloatType,true), StructField(test_col_4,FloatType,true))
sampleDf: org.apache.spark.sql.DataFrame = [id: int, test_col_3: float ... 1 more field]


#### Enable Online Feature Serving for Offline-Only feature group

In [2]:
Hops.enableFeaturegroupOnline("enable_online_features_test_scala_sdk").write

#### Disable Online Feature Serving for a feature group

In [2]:
Hops.disableFeaturegroupOnline("enable_online_features_test_scala_sdk").write

### Insert into Offline/Online Feature Groups

When inserting data into a feature group, you can control whether the data is written only to the offline feature group, only to the online feature group, or to both, using the builder methods `setOnline` and `setOffline`:

#### Generate Sample Data

In [2]:
val sampleData = Seq(
    Row(5, Date.valueOf("2015-03-02"), 2.001f, "Iran"),
    Row(6, Date.valueOf("2016-05-01"), 3.2171f, "Canada"))
val schema = 
 scala.collection.immutable.List(
  StructField("id", IntegerType, true),
  StructField("date", DateType, true),
  StructField("value", FloatType, true),
  StructField("country", StringType, true) 
)
val sampleDf = spark.createDataFrame(
  spark.sparkContext.parallelize(sampleData),
  StructType(schema)
)

sampleData: Seq[org.apache.spark.sql.Row] = List([5,2015-03-02,2.001,Iran], [6,2016-05-01,3.2171,Canada])
schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(date,DateType,true), StructField(value,FloatType,true), StructField(country,StringType,true))
sampleDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]


#### Insert into Online Feature Group Only

In [3]:
(Hops.insertIntoFeaturegroup("test_online_fg_scala_sdk")
                         .setDataframe(sampleDf)
                         .setMode("append")
                         .setOnline(true)
                         .setOffline(false)
                         .write())

#### Insert into Offline Feature Group Only

In [4]:
(Hops.insertIntoFeaturegroup("test_online_fg_scala_sdk")
                         .setDataframe(sampleDf)
                         .setMode("append")
                         .setOnline(false)
                         .setOffline(true)
                         .write())

#### Insert into Online and Offline Feature Group

In [5]:
(Hops.insertIntoFeaturegroup("test_online_fg_scala_sdk")
                         .setDataframe(sampleDf)
                         .setMode("overwrite")
                         .setOnline(true)
                         .setOffline(true)
                         .write())
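
Writing to only one of the two stores lets them drift apart until the other store receives the same rows. The toy model below illustrates this with in-memory maps. It is a sketch under stated assumptions and not the SDK's implementation: `Stores` and `insert` are hypothetical names, "append" is assumed to add rows to the existing contents, and "overwrite" is assumed to replace them.

```scala
// Toy model: each store maps a primary key to a country.
final case class Stores(online: Map[Int, String], offline: Map[Int, String])

// Hypothetical helper mirroring the mode/online/offline switches of the insert cells.
def insert(stores: Stores, rows: Map[Int, String], mode: String,
           online: Boolean, offline: Boolean): Stores = {
  def write(existing: Map[Int, String]): Map[Int, String] = mode match {
    case "append"    => existing ++ rows // assumed: existing rows are kept
    case "overwrite" => rows             // assumed: existing rows are replaced
    case other       => throw new IllegalArgumentException(s"unknown mode: $other")
  }
  Stores(
    online = if (online) write(stores.online) else stores.online,
    offline = if (offline) write(stores.offline) else stores.offline
  )
}

val initialRows = Map(1 -> "Sweden", 2 -> "Ireland", 3 -> "Belgium", 4 -> "Russia")
val newRows = Map(5 -> "Iran", 6 -> "Canada")
val start = Stores(initialRows, initialRows)

// Same sequence as the three insert cells above.
val afterOnlineOnly = insert(start, newRows, "append", online = true, offline = false)
val afterOfflineOnly = insert(afterOnlineOnly, newRows, "append", online = false, offline = true)
val afterOverwrite = insert(afterOfflineOnly, newRows, "overwrite", online = true, offline = true)
```

In this model the stores disagree after the online-only insert, agree again after the offline-only insert, and hold only the two new rows after the final overwrite.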