# Model Serving with Docker/Kubernetes and Scikit-learn - Iris Flower Classification
---
*INPUT --> PREDICTOR (MODEL) --> PREDICTION*

Serving scikit-learn models with Docker or Kubernetes requires a **predictor** script: a Python script that implements the *Predict* class.

> **NOTE:** This example assumes that a model called *irisflowerclassifier* is already available in Hopsworks. An example of training a model for the *Iris flower classification problem* is available in `Jupyter/end_to_end_pipelines/sklearn/end_to_end_sklearn.ipynb`.

## Model Serving on [Hopsworks](https://github.com/logicalclocks/hopsworks) 
![hops.png](../../../images/hops.png)

### Hopsworks Machine Learning (HSML) library

`hsml` is the library for interacting with the Hopsworks Model Registry and Model Serving. It makes it easy to export, manage and deploy models. To learn more about `hsml`, see the <a href="https://docs.hopsworks.ai/machine-learning-api/latest">Hopsworks Model Management</a> docs.


## Serve the Iris Flower classifier

### Predictor script

To serve a scikit-learn model, write a predictor script in Python that implements the `Predict` class. Its constructor copies the model from HDFS to the local file system, loads it, and keeps it as an instance attribute. The class also implements the methods `predict`, `classify` and `regress`, like this:

```python
import joblib
from hops import hdfs
import os

class Predict(object):

    def __init__(self):
        """ Initializes the serving state, reads a trained model from HDFS"""
        self.model_path = "Models/irisflowerclassifier/1/iris_knn.pkl"
        print("Copying SKLearn model from HDFS to local directory")
        hdfs.copy_to_local(self.model_path)
        print("Reading local SkLearn model for serving")
        self.model = joblib.load("./iris_knn.pkl")
        print("Initialization Complete")


    def predict(self, inputs):
        """ Serves a prediction request using a trained model"""
        return self.model.predict(inputs).tolist() # NumPy arrays are not JSON serializable

    def classify(self, inputs):
        """ Serves a classification request using a trained model"""
        return "not implemented"

    def regress(self, inputs):
        """ Serves a regression request using a trained model"""
        return "not implemented"
```
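Before deploying, it can help to exercise the predictor contract locally. The following is a minimal sketch under stated assumptions: `StubModel` and `LocalPredict` are hypothetical names, no HDFS or scikit-learn is involved, and the stub's threshold rule is invented purely for illustration. It only shows the shape of the exchange: `predict` receives a list of instances and must return something that `json.dumps` can encode.

```python
import json


class StubModel:
    """Hypothetical stand-in for the unpickled scikit-learn model."""

    def predict(self, inputs):
        # Toy rule (not the real classifier): third feature below 2.5 -> class 0.
        return [0 if row[2] < 2.5 else 1 for row in inputs]


class LocalPredict:
    """Mirrors the predict() method of the predictor script, minus HDFS."""

    def __init__(self, model):
        self.model = model

    def predict(self, inputs):
        # list() keeps the result JSON serializable, as .tolist() does for a NumPy array.
        return list(self.model.predict(inputs))


predictor = LocalPredict(StubModel())
instances = [[5.1, 3.5, 1.4, 0.2], [6.7, 3.0, 5.2, 2.3]]  # illustrative feature rows
response = json.dumps({"predictions": predictor.predict(instances)})
print(response)
```

Swapping `StubModel()` for a model loaded with `joblib.load` would exercise the real classifier in the same way, assuming the pickle file is available locally.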


### Create a connection to Hopsworks

In [1]:
import hsml

# Connect with Hopsworks
conn = hsml.connection()

# Retrieve the model registry handle
mr = conn.get_model_registry()

Connected. Call `.close()` to terminate connection gracefully.


### Query Model Registry for best Iris Flower Classifier Model

In [2]:
MODEL_NAME = "irisflowerclassifier"

best_model = mr.get_best_model(MODEL_NAME, "accuracy", "max")

print('Model name: ' + best_model.name)
print('Model version: ' + str(best_model.version))
print('Model metrics: ' + str(best_model.training_metrics))

Model name: irisflowerclassifier
Model version: 1
Model metrics: {'accuracy': '0.98'}
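To make the metric and direction arguments concrete, here is a rough sketch of what "best model by metric" selection could look like over plain dictionaries. It is not the `hsml` implementation: `select_best` and the `registry` list are hypothetical, and only the field names mirror the attributes printed above.

```python
def select_best(models, metric, direction):
    """Return the model whose metric is highest ("max") or lowest ("min")."""
    if direction not in ("max", "min"):
        raise ValueError("direction must be 'max' or 'min'")
    # Ignore models that never reported the requested metric.
    candidates = [m for m in models if metric in m["training_metrics"]]
    if not candidates:
        return None
    pick = max if direction == "max" else min
    # Metrics may be stored as strings (the output above shows '0.98'),
    # so cast to float before comparing.
    return pick(candidates, key=lambda m: float(m["training_metrics"][metric]))


# Hypothetical registry contents, for illustration only.
registry = [
    {"name": "irisflowerclassifier", "version": 1, "training_metrics": {"accuracy": "0.98"}},
    {"name": "irisflowerclassifier", "version": 2, "training_metrics": {"accuracy": "0.95"}},
    {"name": "irisflowerclassifier", "version": 3, "training_metrics": {}},
]

best = select_best(registry, "accuracy", "max")
print(best["version"], best["training_metrics"])
```

The cast to `float` matters because comparing metric values as strings would order them lexicographically rather than numerically.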


### Create Deployment for the Trained Model

Deploy the model by passing the path of the predictor script to `.deploy()`. Once the deployment has been created, you can find it in the Hopsworks UI under the "Deployments" tab. You can also use the class attributes or the `.describe()` method of the deployment object to access its metadata.

In [3]:
PREDICTOR_SCRIPT_PATH = mr.project_path + "/Jupyter/serving/docker_and_kubernetes/python/predictor.py" # or .ipynb

# Deploy the trained model
irisclassifier = best_model.deploy(script_file=PREDICTOR_SCRIPT_PATH)

For example, the `name` attribute returns the deployment name and `.describe()` prints the full deployment metadata:

In [4]:
print("Deployment: " + irisclassifier.name)
irisclassifier.describe()

Deployment: irisflowerclassifier
{
    "artifact_version": 2,
    "batching_enabled": false,
    "created": "2022-05-18T13:18:37.853Z",
    "creator": "Admin Admin",
    "id": 4,
    "inference_logging": "ALL",
    "kafka_topic_dto": {
        "name": "CREATE",
        "num_of_partitions": 1,
        "num_of_replicas": 1
    },
    "model_name": "irisflowerclassifier",
    "model_path": "/Projects/demo_ml_meb10000/Models/irisflowerclassifier",
    "model_server": "PYTHON",
    "model_version": 1,
    "name": "irisflowerclassifier",
    "predictor": "predictor.py",
    "predictor_resource_config": {
        "cores": 1,
        "gpus": 0,
        "memory": 1024
    },
    "requested_instances": 1,
    "serving_tool": "DEFAULT"
}


## Classify flowers with the Iris Flower classifier

### Start Deployment

In [5]:
irisclassifier.start()


### Check the Model Server Logs

You can access the server logs in Kibana by clicking the 'Show logs' button in the action bar. There you can filter them by fields such as serving component (i.e., predictor or transformer) or container name.

![docker_sklearn_predictor_logs.gif](./../../../images/docker_sklearn_predictor_logs.gif)

### Send Prediction Requests to the Served Model

To make inference requests, use the `.predict()` method of the deployment metadata object.

In [7]:
for i in range(20):
    data = {"instances" : [best_model.input_example]}
    predictions = irisclassifier.predict(data)
    print(predictions)

{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
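The request and response shapes used above can be handled with two small helpers. This is a sketch only: `build_request`, `decode_response` and the sample feature row are hypothetical, and the class names assume the model was trained on labels encoded in the order used by scikit-learn's `load_iris` dataset (0 = setosa, 1 = versicolor, 2 = virginica).

```python
import json

# Assumption: labels follow the ordering of scikit-learn's load_iris target names.
IRIS_CLASSES = ["setosa", "versicolor", "virginica"]


def build_request(rows):
    """Wrap feature rows in the {"instances": [...]} payload used above."""
    return {"instances": [[float(value) for value in row] for row in rows]}


def decode_response(response):
    """Map each predicted class index to a species name."""
    return [IRIS_CLASSES[int(index)] for index in response["predictions"]]


request = build_request([[5.1, 3.5, 1.4, 0.2]])  # illustrative feature row
print(json.dumps(request))

# A response shaped like the output printed above.
sample_response = {"predictions": [0]}
print(decode_response(sample_response))
```

If the training pipeline encoded the labels differently, `IRIS_CLASSES` would need to be adjusted to match.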


## Monitor Prediction Logs

### Consume Prediction Requests and Responses using Kafka

All prediction requests are automatically logged to Kafka, which means that you can keep track of your model's performance and its predictions in a scalable manner.

In [8]:
from hops import kafka
from confluent_kafka import Producer, Consumer, KafkaError

Set up a Kafka consumer and subscribe to the topic containing the prediction logs.

In [9]:
TOPIC_NAME = irisclassifier.inference_logger.kafka_topic.name

config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

Read the Kafka Avro schema from Hopsworks and set up an Avro reader.

In [10]:
json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

Read messages from the Kafka topic, parse them with the Avro schema, and print the results.

In [12]:
import json

PRINT_INSTANCES=False
PRINT_PREDICTIONS=True

for i in range(0, 5):
    msg = consumer.poll(timeout=5.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)
            
            print("serving: {}, version: {}, timestamp: {},\n"\
                  "        http_response_code: {}, model_server: {}, serving_tool: {}".format(
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["responseHttpCode"],
                       event_dict["modelServer"],
                       event_dict["servingTool"]))
            
            if PRINT_INSTANCES:
                print("instances: {}\n".format(event_dict["inferenceRequest"]))
            if PRINT_PREDICTIONS:
                prediction = json.loads(event_dict["inferenceResponse"])["predictions"][0]
                print("predictions: {}\n".format(prediction))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")

serving: irisflowerclassifier, version: 1, timestamp: 1652883610404,
        http_response_code: 200, model_server: PYTHON, serving_tool: DEFAULT
predictions: 0

serving: irisflowerclassifier, version: 1, timestamp: 1652883611088,
        http_response_code: 200, model_server: PYTHON, serving_tool: DEFAULT
predictions: 0

serving: irisflowerclassifier, version: 1, timestamp: 1652883611898,
        http_response_code: 200, model_server: PYTHON, serving_tool: DEFAULT
predictions: 0

serving: irisflowerclassifier, version: 1, timestamp: 1652883612895,
        http_response_code: 200, model_server: PYTHON, serving_tool: DEFAULT
predictions: 0

serving: irisflowerclassifier, version: 1, timestamp: 1652883613715,
        http_response_code: 200, model_server: PYTHON, serving_tool: DEFAULT
predictions: 0
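Once events have been parsed, they can be aggregated to monitor the deployment. The sketch below works on hand-written dictionaries that reuse the field names from the loop above. The values, the integer type of `responseHttpCode`, and the reading of `requestTimestamp` as milliseconds since the Unix epoch are assumptions, and `summarize` and `to_utc` are hypothetical helpers.

```python
import json
from collections import Counter
from datetime import datetime, timezone


def summarize(events):
    """Count HTTP status codes and predicted classes across parsed log events."""
    codes = Counter(event["responseHttpCode"] for event in events)
    classes = Counter()
    for event in events:
        # Only successful responses are assumed to carry a predictions payload.
        if event["responseHttpCode"] == 200:
            classes.update(json.loads(event["inferenceResponse"])["predictions"])
    return codes, classes


def to_utc(timestamp_ms):
    """Convert a millisecond epoch timestamp to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)


# Illustrative events; in practice these would come from kafka.parse_avro_msg.
events = [
    {"requestTimestamp": 1652883610404, "responseHttpCode": 200,
     "inferenceResponse": '{"predictions": [0]}'},
    {"requestTimestamp": 1652883611088, "responseHttpCode": 200,
     "inferenceResponse": '{"predictions": [0]}'},
    {"requestTimestamp": 1652883611898, "responseHttpCode": 500,
     "inferenceResponse": ""},
]

codes, classes = summarize(events)
print(dict(codes), dict(classes))
print(to_utc(events[0]["requestTimestamp"]).isoformat())
```

Tracking the distribution of predicted classes over time is one simple way to notice drift, for instance if a classifier suddenly predicts a single class for every request.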

