---
title: "Model Serving with Docker/Kubernetes and Scikit-Learn - Iris Flower Classification"
date: 2021-02-24
type: technical_note
draft: false
---

# Model Serving with Docker/Kubernetes and Scikit-Learn - Iris Flower Classification
---
*INPUT --> MODEL --> PREDICTION*

> **NOTE:** It is assumed that a model called *irisflowerclassifier* is already available in Hopsworks. An example of training a model for the *Iris flower classification problem* is available in `Jupyter/end_to_end_pipelines/sklearn/end_to_end_sklearn.ipynb`.

## Model Serving on [Hopsworks](https://github.com/logicalclocks/hopsworks)

![hops.png](../../../images/hops.png)

## The `hops` python library

`hops` is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.

Have a feature request or encountered an issue? Please let us know on [GitHub](https://github.com/logicalclocks/hops-util-py).

### Using the `hdfs` module
The `hdfs` module provides a method to get the path in HopsFS where your data is stored, namely `hdfs.project_path()`. The path resolves to the root path of your project, which is the view that you see when you click `Data Sets` in Hopsworks. To point to where your data actually resides in the project, append the path of your Dataset to the project root. For example, if you create an `mnist` folder in your Resources Dataset, the path to the mnist data is `hdfs.project_path() + 'Resources/mnist'`.

```python
# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project
from hops import hdfs
project_path = hdfs.project_path()
```
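Because the example above builds paths by string concatenation, a missing or doubled `/` is easy to introduce. The following is a minimal sketch of a joining helper. `dataset_path` and the example project root are hypothetical and not part of `hops`; in a notebook you would pass the value of `hdfs.project_path()` as the root.

```python
def dataset_path(project_root, *parts):
    """Join a project root and path components with exactly one '/' between them.

    Hypothetical helper for illustration; it is not part of the hops library.
    """
    cleaned = [p.strip("/") for p in parts if p.strip("/")]
    return project_root.rstrip("/") + "/" + "/".join(cleaned)


# Assumed example root; in Hopsworks this would come from hdfs.project_path().
example_root = "hdfs://namenode:8020/Projects/demo/"
mnist_path = dataset_path(example_root, "Resources", "mnist")
print(mnist_path)
```

The helper gives the same result whether or not the root or the components carry leading or trailing slashes.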

```python
from hops import hdfs
# Uploading the trained model to HDFS
hdfs.copy_to_hdfs("iris_flower_knn.pkl", "Resources/iris_flower", overwrite=True)

# Downloading the iris flower model to the current working directory
iris_flower_model_hdfs_path = hdfs.project_path() + "Resources/iris_flower/iris_flower_knn.pkl"
local_iris_flower_model_path = hdfs.copy_to_local(iris_flower_model_hdfs_path)
```

## Serve the Iris Flower classifier

To serve a scikit-learn model, write a Python script that implements a `Predict` class. The constructor downloads the model from HDFS and stores it as an instance attribute, and the class provides the methods `predict`, `classify` and `regress`, like this:

```python
import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23
from hops import hdfs

class Predict(object):

    def __init__(self):
        """ Initializes the serving state, reads a trained model from HDFS"""
        self.model_path = "Models/irisflowerclassifier/1/iris_flower_knn.pkl"
        print("Copying SKLearn model from HDFS to local directory")
        hdfs.copy_to_local(self.model_path)
        print("Reading local SkLearn model for serving")
        self.model = joblib.load("./iris_flower_knn.pkl")
        print("Initialization Complete")


    def predict(self, inputs):
        """ Serves a prediction request using a trained model"""
        return self.model.predict(inputs).tolist() # NumPy arrays are not JSON serializable

    def classify(self, inputs):
        """ Serves a classification request using a trained model"""
        return "not implemented"

    def regress(self, inputs):
        """ Serves a regression request using a trained model"""
        return "not implemented"
```
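The request and response shapes used later in this note (`{"inputs": [...]}` in, `{"predictions": [...]}` out) suggest how the server calls `predict`. As a rough local check of that contract, assuming the server simply decodes the JSON body, passes `inputs` to `predict`, and wraps the result, a sketch might look like the following. `StubPredict` and `handle_request` are hypothetical names, and the threshold rule only stands in for a trained model.

```python
import json


class StubPredict(object):
    """Hypothetical stand-in for the Predict class; needs no HDFS or scikit-learn."""

    def __init__(self):
        # Toy rule instead of a trained model: class 0 if petal length < 2.5, else 1.
        self.threshold = 2.5

    def predict(self, inputs):
        # inputs is a list of rows: [sepal_length, sepal_width, petal_length, petal_width]
        return [0 if row[2] < self.threshold else 1 for row in inputs]


def handle_request(predictor, body):
    """Assumed request handling: decode JSON, call predict, wrap the result."""
    payload = json.loads(body)
    predictions = predictor.predict(payload["inputs"])
    # json.dumps fails on NumPy arrays, which is why predict must return a list.
    return json.dumps({"predictions": predictions})


body = '{"inputs": [[5.1, 3.5, 1.4, 0.2], [6.7, 3.0, 5.2, 2.3]]}'
response = handle_request(StubPredict(), body)
print(response)
```

Swapping `StubPredict` for the real `Predict` class exercises the same path, provided the model file is reachable from where the script runs.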

Then upload this python script to some folder in your project and go to the "Model Serving" service in Hopsworks:

![sklearn_serving1.png](./../../../images/sklearn_serving1.png)

Then click on "create serving" and configure your serving:

![sklearn_serving2.png](./../../../images/sklearn_serving2.png)

Once the serving is created, you can start it and view information such as the server logs and the Kafka topic to which it logs inference requests.

![sklearn_serving3.png](./../../../images/sklearn_serving3.png)

### Query Model Repository for best IrisFlowerClassifier Model

```python
from hops import model
from hops.model import Metric

MODEL_NAME = "irisflowerclassifier"
EVALUATION_METRIC = "accuracy"
IRIS_FLOWER_CLASSIFIER_SCRIPT = "iris_flower_classifier.py"

best_model = model.get_best_model(MODEL_NAME, EVALUATION_METRIC, Metric.MAX)

print('Model name: ' + best_model['name'])
print('Model version: ' + str(best_model['version']))
print(best_model['metrics'])
```

```
Model name: irisflowerclassifier
Model version: 1
{'accuracy': '0.98'}
```
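To make the selection rule concrete, here is a minimal sketch of choosing the best entry from a list of model records by one metric. It is not the implementation behind `model.get_best_model`; `pick_best` and the example records are hypothetical. The metric is parsed with `float()` because the output above shows it stored as a string.

```python
def pick_best(models, metric, maximize=True):
    """Return the model entry with the best value for `metric`.

    Values are converted with float() so that '0.98' and '0.95' are
    compared numerically rather than as strings.
    """
    candidates = [m for m in models if metric in m["metrics"]]
    if not candidates:
        raise ValueError("no model reports metric '{}'".format(metric))
    chooser = max if maximize else min
    return chooser(candidates, key=lambda m: float(m["metrics"][metric]))


# Hypothetical registry contents, for illustration only.
example_models = [
    {"name": "irisflowerclassifier", "version": 1, "metrics": {"accuracy": "0.98"}},
    {"name": "irisflowerclassifier", "version": 2, "metrics": {"accuracy": "0.95"}},
    {"name": "irisflowerclassifier", "version": 3, "metrics": {}},
]
best = pick_best(example_models, "accuracy")
print(best["version"])
```

Entries that do not report the metric are skipped rather than treated as zero, which avoids picking an unevaluated model when minimizing.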


### Create Model Serving of Exported Model

Once all the files have been exported to the model directory, we can create a serving instance that points to the model files using `serving.create_or_update()`.

```python
from hops import serving

# Create serving
SERVING_NAME = MODEL_NAME
SCRIPT_PATH = "Models/" + MODEL_NAME + "/" + str(best_model['version']) + "/" + IRIS_FLOWER_CLASSIFIER_SCRIPT

serving.create_or_update(SERVING_NAME, # define a name for the serving instance
                         SCRIPT_PATH, model_version=best_model['version'], # set the path and version of the model to be deployed
                         topic_name="CREATE", # topic name or CREATE to create a new topic for inference logging, otherwise NONE
                         instances=1 # number of replicas
                         )
```

```
Inferring model server from artifact files: FLASK
Creating serving irisflowerclassifier for artifact /Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py ...
Serving irisflowerclassifier successfully created
```


```python
for s in serving.get_all():
    print(s.name)
```

```
irisflowerclassifier
```


After the serving has been created, you can find it in the Hopsworks UI by going to the "Model Serving" tab. You can also use the Python module to query the Hopsworks REST API for information about the existing servings, using methods like:

- `get_all()`
- `get_id(serving_name)`
- `get_model_path(serving_name)`
- `get_model_version(serving_name)`
- `get_artifact_version(serving_name)`
- `get_model_server(serving_name)`
- `get_serving_tool(serving_name)`
- `get_kafka_topic(serving_name)`
- `get_status(serving_name)`
- `exists(serving_name)`


```python
print("Info: \tid: {},\n \
       model_path: {},\n \
       model_version: {},\n \
       artifact_version: {},\n \
       model_server: {},\n \
       serving_tool: {}".format(
    serving.get_id(SERVING_NAME),
    serving.get_model_path(SERVING_NAME),
    serving.get_model_version(SERVING_NAME),
    serving.get_artifact_version(SERVING_NAME),
    serving.get_model_server(SERVING_NAME),
    serving.get_serving_tool(SERVING_NAME)))
```

```
Info: 	id: 789,
        model_path: /Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py,
        model_version: 1,
        artifact_version: 0,
        model_server: FLASK,
        serving_tool: DEFAULT
```


Or describe a serving using `describe(serving_name)`:

```python
serving.describe(SERVING_NAME)
```

```
id: 789, name: irisflowerclassifier, model_path: /Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py, model_version: 1, artifact_version: 0, transformer: None, model_server: FLASK, serving_tool: DEFAULT, requested_instances: 1, available_instances: 0, available_transformer_instances: 0, predictor_resource_config: {'cores': 1, 'memory': 1024, 'gpus': 0}, creator: Admin Admin, created: 2021-07-07T09:44:24Z, status: Stopped, kafka_topic_dto: <hops.kafka.KafkaTopicDTO object at 0x7f5a10ff13d0>
```


## Classify flowers with the Iris Flower classifier

### Start Model Serving Server

You can start or stop the serving instance either from the Hopsworks UI or from the Python/REST API, as demonstrated below.

Shut down the serving if it is currently running:

```python
import time
if serving.get_status(SERVING_NAME) == "Running":
    serving.stop(SERVING_NAME)
time.sleep(10)
```

Start the serving:

```python
serving.start(SERVING_NAME)
```

```
Starting serving with name: irisflowerclassifier...
Serving with name: irisflowerclassifier successfully started
```


Wait until the serving is up and running:

```python
while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5) # Let the serving start up correctly
time.sleep(15)
```
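The loop above never exits if the serving fails to reach the `Running` state. A minimal sketch of the same polling idea with a timeout follows. `wait_for_status` is a hypothetical helper, not part of `hops`; it takes the status lookup as a callable, so in a notebook you could pass `lambda: serving.get_status(SERVING_NAME)`.

```python
import time


def wait_for_status(get_status, target="Running", timeout=120.0, interval=5.0):
    """Poll get_status() until it returns `target` or `timeout` seconds pass.

    get_status is any zero-argument callable. Returns True if the target
    status was observed, False if the timeout expired first.
    """
    deadline = time.monotonic() + timeout
    while True:
        if get_status() == target:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Stand-in status source so the sketch runs without a Hopsworks cluster.
fake_statuses = iter(["Stopped", "Stopped", "Running"])
started = wait_for_status(lambda: next(fake_statuses), timeout=1.0, interval=0.01)
print(started)
```

Returning a boolean instead of raising leaves it to the caller to decide whether a slow start is an error or just a reason to check the server logs.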

### Send Prediction Requests to the Served Model using Hopsworks REST API

To make inference requests, you can use the utility method `serving.make_inference_request`.

```python
import json
import random

NUM_FEATURES = 4

for i in range(20):
    # One request with a single random 4-feature instance
    data = {"inputs" : [[random.uniform(1, 8) for _ in range(NUM_FEATURES)]]}
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
```

```
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [1]}
{'predictions': [0]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [0]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [1]}
```
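The responses contain class indices rather than species names. Assuming the model was trained with the label encoding of scikit-learn's bundled Iris dataset (0 = setosa, 1 = versicolor, 2 = virginica), a small sketch for translating a response could look like this. `label_predictions` is a hypothetical helper, and the mapping must be adjusted if the training notebook encoded labels differently.

```python
# Assumed label order: the same as scikit-learn's bundled Iris dataset.
IRIS_SPECIES = ["setosa", "versicolor", "virginica"]


def label_predictions(response, class_names=IRIS_SPECIES):
    """Translate the class indices in a response dict into class names."""
    return [class_names[int(index)] for index in response["predictions"]]


example_response = {"predictions": [2, 0, 1]}
print(label_predictions(example_response))
```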


## Monitor Prediction Logs

### Consume Prediction Requests and Responses using Kafka

All prediction requests are automatically logged to Kafka, which means that you can keep track of your model's performance and its predictions in a scalable manner.

```python
from hops import kafka
from confluent_kafka import Producer, Consumer, KafkaError
```

Set up a Kafka consumer and subscribe to the topic containing the prediction logs:

```python
TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)

config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)
```

Read the Kafka Avro schema from Hopsworks and set up an Avro reader:

```python
json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)
```

Read messages from the Kafka topic, parse them with the Avro schema and print the results:

```python
PRINT_INSTANCES=False
PRINT_PREDICTIONS=True

for i in range(0, 5):
    msg = consumer.poll(timeout=1.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)

            print("serving: {}, version: {}, timestamp: {},\n"\
                  "        http_response_code: {}, model_server: {}, serving_tool: {}".format(
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["responseHttpCode"],
                       event_dict["modelServer"],
                       event_dict["servingTool"]))

            if PRINT_INSTANCES:
                print("instances: {}\n".format(event_dict["inferenceRequest"]))
            if PRINT_PREDICTIONS:
                prediction = json.loads(event_dict["inferenceResponse"])["predictions"][0]
                print("predictions: {}\n".format(prediction))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
```

```
serving: irisflowerclassifier, version: 1, timestamp: 1625651538595,
        http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2

serving: irisflowerclassifier, version: 1, timestamp: 1625651538772,
        http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2

serving: irisflowerclassifier, version: 1, timestamp: 1625651538928,
        http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2

serving: irisflowerclassifier, version: 1, timestamp: 1625651539069,
        http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2

serving: irisflowerclassifier, version: 1, timestamp: 1625651539230,
        http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 1
```
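Once events are parsed into dictionaries, simple monitoring statistics can be derived from them. The sketch below counts predicted classes and failed requests. It assumes that each event exposes `responseHttpCode` as an integer and `inferenceResponse` as a JSON string, as the fields printed above suggest. `summarize_events` and the hand-written events, including the failing one, are hypothetical.

```python
import json
from collections import Counter


def summarize_events(events):
    """Count predicted classes and non-200 responses in parsed log events."""
    predicted = Counter()
    failures = 0
    for event in events:
        if event["responseHttpCode"] != 200:
            failures += 1
            continue
        for value in json.loads(event["inferenceResponse"])["predictions"]:
            predicted[value] += 1
    return predicted, failures


# Hand-written events that mimic the fields printed above.
example_events = [
    {"responseHttpCode": 200, "inferenceResponse": '{"predictions": [2]}'},
    {"responseHttpCode": 200, "inferenceResponse": '{"predictions": [2]}'},
    {"responseHttpCode": 200, "inferenceResponse": '{"predictions": [1]}'},
    {"responseHttpCode": 500, "inferenceResponse": ""},
]
counts, failed = summarize_events(example_events)
print(dict(counts), failed)
```

A class distribution that drifts far from what was seen during training, or a rising failure count, is a cheap first signal that the serving needs attention.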

