# Iris Flower Classification and Serving Using SkLearn, HopsML, and the Hopsworks Feature Store

In this notebook we will:

1. Load the Iris Flower dataset from HopsFS
2. Do feature engineering on the dataset
3. Save the features to the feature store
4. Read the feature data from the feature store
5. Train a KNN Model using SkLearn
6. Save the trained model to HopsFS
7. Launch a serving instance to serve the trained model
8. Send some prediction requests to the served model
9. Monitor the predictions through Kafka


### Imports

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import joblib
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType
import numpy as np
import time
import json
from hops import kafka, hdfs, featurestore, serving, model, tls
from confluent_kafka import Producer, Consumer, KafkaError
import random

### Load Dataset

In [None]:
project_path = hdfs.project_path()
iris_df = spark.read.format("csv").option("header", "true").option("inferSchema", True).load(
    project_path + "TourData/iris/iris.csv")

In [None]:
iris_df.printSchema()

### Feature Engineering

The dataset is already well prepared. The only feature engineering needed is to convert the `variety` column to a numeric label and to save a lookup table, so that the numeric representation can later be converted back to the categorical one.

In [None]:
encoder = StringIndexer(inputCol="variety", outputCol="label")
fit_model = encoder.fit(iris_df)
iris_df1 = fit_model.transform(iris_df)
lookup_df = iris_df1.select(["variety", "label"]).distinct()
iris_df2 = iris_df1.drop("variety")
iris_df3 = iris_df2.withColumn("label", iris_df2["label"].cast(IntegerType()))
iris_df3.printSchema()

In [None]:
iris_df3.show(5)

In [None]:
lookup_df.show(3)
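
The encode-and-keep-a-lookup idea can also be sketched without Spark. This is a minimal stdlib illustration, not what `StringIndexer` does internally: `StringIndexer` orders labels by frequency by default, whereas the hypothetical `build_lookup` helper below assigns indices in sorted order, and the sample rows are made up.

```python
def build_lookup(values):
    """Map each distinct category to an integer index (sorted order is an assumption)."""
    return {category: index for index, category in enumerate(sorted(set(values)))}


def encode(values, lookup):
    """Replace each category with its integer index."""
    return [lookup[v] for v in values]


def decode(labels, lookup):
    """Invert the lookup to turn integer indices back into categories."""
    reverse = {index: category for category, index in lookup.items()}
    return [reverse[label] for label in labels]


# Made-up sample of the `variety` column
varieties = ["Setosa", "Virginica", "Setosa", "Versicolor"]
lookup = build_lookup(varieties)
labels = encode(varieties, lookup)
restored = decode(labels, lookup)
print(lookup, labels, restored)
```

Keeping the lookup as its own table is what makes the round trip possible after the `variety` column has been dropped from the features.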

### Save Features to the Feature Store

We save two feature groups (Hive tables): one called `iris_features`, which contains the iris features and the corresponding numeric label, and another called `iris_labels_lookup`, which is used to convert the numeric iris label back to its categorical name.

**Note**: To be able to run the feature store code, you first have to enable the Feature Store Service in your project. To do this, go to the "Settings" tab in your project, select the feature store service and click "Save". 

In [None]:
featurestore.create_featuregroup(iris_df3, "iris_features")

In [None]:
featurestore.create_featuregroup(lookup_df, "iris_labels_lookup", feature_correlation=False, 
                                 feature_histograms=False, cluster_analysis=False)

### Read the Iris Training Dataset from the Feature Store

In [None]:
df = featurestore.get_featuregroup("iris_features", dataframe_type="pandas")
df.head(10)

In [None]:
df.describe()

In [None]:
x_df = df[['sepal_length', 'sepal_width', 'petal_length', 'petal_width']]
y_df = df[["label"]]
X = x_df.values
y = y_df.values.ravel()

### Train a KNN Model using the Feature Data

In [None]:
neighbors = random.randint(3, 30)
iris_knn = KNeighborsClassifier(n_neighbors=neighbors)
iris_knn.fit(X, y)
y_pred = iris_knn.predict(X)
acc = accuracy_score(y, y_pred)
print(acc)
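
For intuition about what the classifier does, here is a minimal pure-Python sketch of k-nearest-neighbour voting with Euclidean distance. It is an illustration under simplifying assumptions (brute-force search, ties resolved by whichever label `Counter` saw first), not scikit-learn's implementation; `knn_predict` and the toy points are hypothetical.

```python
import math
from collections import Counter


def knn_predict(train_X, train_y, query, k):
    """Predict the label of `query` by majority vote among its k nearest training points."""
    # Pair each training point's distance to the query with its label
    distances = [(math.dist(x, query), label) for x, label in zip(train_X, train_y)]
    # Keep the k closest points
    nearest = sorted(distances, key=lambda pair: pair[0])[:k]
    # Majority vote over the neighbours' labels
    votes = Counter(label for _, label in nearest)
    return votes.most_common(1)[0][0]


# Toy data: two well-separated clusters (made up for illustration)
train_X = [[1.0, 1.0], [1.2, 0.9], [0.8, 1.1], [5.0, 5.0], [5.1, 4.9], [4.9, 5.2]]
train_y = [0, 0, 0, 1, 1, 1]

print(knn_predict(train_X, train_y, [1.1, 1.0], k=3))
print(knn_predict(train_X, train_y, [5.0, 5.1], k=3))
```

Note that the notebook scores the model on the same rows it was fitted on, so the printed accuracy is an optimistic estimate of how the model performs on unseen data.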

### Save the Trained Model to HopsFS

In [None]:
joblib.dump(iris_knn, "iris_knn.pkl")
hdfs.mkdir("Resources/sklearn_model")
hdfs.copy_to_hdfs("iris_knn.pkl", "Resources/sklearn_model", overwrite=True)

### Serve the Trained Model

To serve a SkLearn model, write a Python script that defines a `Predict` class. Its constructor downloads the model from HDFS and stores it as an instance attribute, and the class implements the methods `predict`, `classify` and `regress`, like this:

```python
import joblib  # sklearn.externals.joblib was removed in recent scikit-learn releases
from hops import hdfs
import os

class Predict(object):

    def __init__(self):
        """ Initializes the serving state, reads a trained model from HDFS"""
        self.model_path = "Models/iris_knn.pkl"
        print("Copying SKLearn model from HDFS to local directory")
        hdfs.copy_to_local(self.model_path)
        print("Reading local SkLearn model for serving")
        self.model = joblib.load("./iris_knn.pkl")
        print("Initialization Complete")


    def predict(self, inputs):
        """ Serves a prediction request using a trained model"""
        return self.model.predict(inputs).tolist() # Numpy Arrays are not JSON serializable

    def classify(self, inputs):
        """ Serves a classification request using a trained model"""
        return "not implemented"

    def regress(self, inputs):
        """ Serves a regression request using a trained model"""
        return "not implemented"
```

Then upload this Python script to a folder in your project and go to the "Model Serving" service in Hopsworks:

![sklearn_serving1.png](./../../images/sklearn_serving1.png)

Then click on "create serving" and configure your serving:

![sklearn_serving2.png](./../../images/sklearn_serving2.png)

Once the serving is created, you can start it and view information such as the server logs and the Kafka topic to which it logs inference requests.

![sklearn_serving3.png](./../../images/sklearn_serving3.png)

It is best practice to keep the serving script together with the trained model. The code below copies the script into `Resources/sklearn_model`, next to the saved model, so that the export step that follows places both files under the same version directory in `Models/`.

In [None]:
script_path = "Jupyter/End_To_End_Pipeline/sklearn/iris_flower_classifier.py"
hdfs.cp(script_path, "Resources/sklearn_model/iris_flower_classifier.py", overwrite=True)

### Export the Trained Model to the Models Repository

In [None]:
MODEL_NAME = "IrisFlowerClassifier"
model.export("Resources/sklearn_model", MODEL_NAME, metrics={'accuracy': acc})

In [None]:
for p in hdfs.ls("Models/" + MODEL_NAME, recursive=True):
    print(p)

### Query the Model Repository for the Best IrisFlowerClassifier Model

In [None]:
EVALUATION_METRIC="accuracy"
from hops.model import Metric

best_model = model.get_best_model(MODEL_NAME, EVALUATION_METRIC, Metric.MAX)

print('Model name: ' + best_model['name'])
print('Model version: ' + str(best_model['version']))
print(best_model['metrics'])
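
Conceptually, selecting the best model means picking the version whose metric is largest (as with `Metric.MAX`) or smallest. The sketch below illustrates that idea on plain dictionaries; it is not the `hops` implementation, and `pick_best` and the sample records are hypothetical.

```python
def pick_best(models, metric_name, maximize=True):
    """Return the model record with the best value for `metric_name`."""
    # Ignore records that did not report the metric
    candidates = [m for m in models if metric_name in m["metrics"]]
    if not candidates:
        raise ValueError("no model reports metric " + metric_name)
    chooser = max if maximize else min
    return chooser(candidates, key=lambda m: float(m["metrics"][metric_name]))


# Made-up records shaped like the fields printed above (name, version, metrics)
models = [
    {"name": "IrisFlowerClassifier", "version": 1, "metrics": {"accuracy": "0.95"}},
    {"name": "IrisFlowerClassifier", "version": 2, "metrics": {"accuracy": "0.98"}},
    {"name": "IrisFlowerClassifier", "version": 3, "metrics": {"accuracy": "0.96"}},
]
best = pick_best(models, "accuracy")
print(best["version"])
```

Because the notebook draws `n_neighbors` at random, repeated runs export versions with different accuracies, which is what makes a best-model query meaningful here.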

Once all the files have been exported to the model directory, we can create a serving instance that points to the model files using `serving.create_or_update()`.

In [None]:
script_path = "Models/" + MODEL_NAME + "/" + str(best_model['version']) + "/iris_flower_classifier.py"
serving.create_or_update(script_path, MODEL_NAME, model_version=best_model['version'], serving_type="SKLEARN")

After the serving has been created, you can find it in the Hopsworks UI under the "Model Serving" tab. You can also use the `serving` Python module to query the Hopsworks REST API for information about existing servings, using methods like:

- `get_all()`
- `get_id(serving_name)`
- `get_artifact_path(serving_name)`
- `get_type(serving_name)`
- `get_version(serving_name)`
- `get_kafka_topic(serving_name)`
- `get_status(serving_name)`
- `exists(serving_name)`


In [None]:
for s in serving.get_all():
    print(s.name)

In [None]:
serving.get_id(MODEL_NAME)

In [None]:
serving.get_artifact_path(MODEL_NAME)

In [None]:
serving.get_type(MODEL_NAME)

In [None]:
serving.get_version(MODEL_NAME)

In [None]:
serving.get_kafka_topic(MODEL_NAME)

In [None]:
serving.get_status(MODEL_NAME)

You can start or stop the serving instance either from the Hopsworks UI or from the Python/REST API, as demonstrated below.

Shut down currently running serving

In [None]:
if serving.get_status(MODEL_NAME) == "Running":
    serving.stop(MODEL_NAME)
time.sleep(10)

Start new serving

In [None]:
serving.start(MODEL_NAME)

Wait until serving is up and running

In [None]:
while serving.get_status(MODEL_NAME) != "Running":
    time.sleep(5) # Let the serving startup correctly
time.sleep(5)
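
The loop above waits indefinitely if the serving never reaches the `Running` state. A bounded variant might look like the sketch below. `wait_for_status` is a hypothetical helper that accepts any zero-argument status function, so in this notebook one could pass `lambda: serving.get_status(MODEL_NAME)`. The fake status sequence exists only to make the example runnable on its own.

```python
import time


def wait_for_status(get_status, target="Running", timeout=120.0, interval=5.0):
    """Poll `get_status()` until it returns `target`; return False if `timeout` seconds pass first."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if get_status() == target:
            return True
        time.sleep(interval)
    return False


# Stand-in for serving.get_status: reports "Starting" twice, then "Running"
states = iter(["Starting", "Starting", "Running"])
reached = wait_for_status(lambda: next(states), timeout=5.0, interval=0.01)
print(reached)
```

Returning a boolean instead of raising leaves it to the caller to decide whether a timeout should abort the notebook or merely log a warning.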

### Send Prediction Requests to the Served Model using Hopsworks REST API

#### Constants

In [None]:
TOPIC_NAME = serving.get_kafka_topic(MODEL_NAME)
NUM_FEATURES = 4

To make inference requests, you can use the utility method `serving.make_inference_request`.

In [None]:
for i in range(20):
    data = {"inputs" : [[random.uniform(1, 8) for _ in range(NUM_FEATURES)]]}
    response = serving.make_inference_request(MODEL_NAME, data)
    print(response)
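
Each request body is a JSON object whose `inputs` field holds a list of feature rows, as in the loop above. The hypothetical `build_request` helper below sketches one way to check the row width before sending; the feature values are made up.

```python
import json


def build_request(rows, num_features=4):
    """Wrap feature rows in the {"inputs": [...]} shape used above, checking each row's width."""
    for row in rows:
        if len(row) != num_features:
            raise ValueError("expected %d features, got %d" % (num_features, len(row)))
    return {"inputs": [[float(v) for v in row] for row in rows]}


# One made-up flower: sepal_length, sepal_width, petal_length, petal_width
data = build_request([[5.1, 3.5, 1.4, 0.2]])
print(json.dumps(data))
```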

### Monitor Prediction Requests and Responses using Kafka

All prediction requests are automatically logged to Kafka, which means that you can keep track of your model's performance and its predictions in a scalable manner.


##### Set Up a Kafka Consumer and Subscribe to the Topic Containing the Inference Logs

In [None]:
config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

##### Read the Kafka Avro Schema from Hopsworks and Set Up an Avro Reader

In [None]:
json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

##### Read Lookup Table from the Feature Store for Converting Numerical Labels to Categorical

In [None]:
iris_labels_lookup_df = featurestore.get_featuregroup("iris_labels_lookup", dataframe_type="pandas")

##### Read 10 Messages from the Kafka Topic, parse them with the Avro Schema and print the results

In [None]:
for i in range(0, 10):
    msg = consumer.poll(timeout=1.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)
            prediction = json.loads(event_dict["inferenceResponse"])["predictions"][0]
            prediction_label = iris_labels_lookup_df.loc[iris_labels_lookup_df['label'] == prediction, 
                                                         'variety'].iloc[0]
            print("serving: {}, version: {}, timestamp: {},"\
                  "\nrequest: {},\nprediction:{}, prediction_label:{}, http_response_code: {},"\
                  " serving_type: {}\n".format(
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["inferenceRequest"],
                       prediction,
                       prediction_label,
                       event_dict["responseHttpCode"],
                       event_dict["servingType"]
            ))
        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
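
Once events are decoded, simple monitoring statistics follow directly. The sketch below counts predicted labels over a batch of events. The sample events are made up and carry only the `inferenceResponse` and `responseHttpCode` fields used in the cell above (the response code is assumed to be an integer), and `LABELS` is a hypothetical stand-in for the lookup table, since the actual index-to-variety mapping depends on the fitted `StringIndexer`.

```python
import json
from collections import Counter

# Hypothetical label lookup; the real mapping comes from iris_labels_lookup
LABELS = {0: "Setosa", 1: "Versicolor", 2: "Virginica"}


def summarize(events, labels):
    """Count predicted labels across successful inference events."""
    counts = Counter()
    for event in events:
        if event["responseHttpCode"] != 200:
            continue  # skip failed requests
        for prediction in json.loads(event["inferenceResponse"])["predictions"]:
            counts[labels[prediction]] += 1
    return counts


events = [
    {"responseHttpCode": 200, "inferenceResponse": '{"predictions": [0]}'},
    {"responseHttpCode": 200, "inferenceResponse": '{"predictions": [2]}'},
    {"responseHttpCode": 500, "inferenceResponse": '{"predictions": []}'},
    {"responseHttpCode": 200, "inferenceResponse": '{"predictions": [0]}'},
]
summary = summarize(events, LABELS)
print(summary)
```

A sudden shift in these counts between time windows is one cheap signal that the input distribution or the model's behaviour has changed.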