# Model Serving with KServe and Scikit-Learn - Iris Flower Classification
---

*INPUT --> [ TRANSFORMER --> ENRICHED INPUT ] --> MODEL --> PREDICTION*

<font color='red'> <h3>This notebook requires KServe to be installed</h3></font>

> **NOTE:** This notebook assumes that a model called *irisflowerclassifier* is already available in Hopsworks. An example of training a model for the *Iris flower classification problem* is available in `Jupyter/end_to_end_pipelines/sklearn/end_to_end_sklearn.ipynb`.

## Model Serving on [Hopsworks](https://github.com/logicalclocks/hopsworks)

![hops.png](../../../images/hops.png)

### Hopsworks Machine Learning (HSML) library

`hsml` is the library to interact with the Hopsworks Model Registry and Model Serving. The library makes it easy to export, manage and deploy models. To learn more about `hsml`, see the <a href="https://docs.hopsworks.ai/machine-learning-api/latest">Hopsworks Model Management</a> docs.

## Serve the Iris Flower classifier

Scikit-learn models can be served directly on KServe without a custom predictor script.

#### Transformer script (Optional) 

To serve a model with a transformer, write a Python script that implements a `Transformer` class with the methods `preprocess` and `postprocess`, like this:

```python
class Transformer(object):
    def __init__(self):
        """Initialization code goes here"""
        # NOTE: The env var ARTIFACT_FILES_PATH contains the path to the model artifact files

    def preprocess(self, inputs):
        """Transform the request inputs. The object returned by this method will be used as model input."""
        return inputs

    def postprocess(self, outputs):
        """Transform the predictions computed by the model before returning a response."""
        return outputs
```
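
As an illustration, a transformer for this model might pass the inputs through unchanged and map predicted class indices to species names in `postprocess`. This is a minimal sketch, not a required implementation: it assumes the response has the `{"predictions": [...]}` shape printed by the prediction cell in this notebook, and that the model was trained with scikit-learn's standard Iris label encoding (0 = setosa, 1 = versicolor, 2 = virginica). The `IRIS_SPECIES` constant and the added `species` field are hypothetical names.

```python
# Hypothetical label order; assumes scikit-learn's standard Iris encoding.
IRIS_SPECIES = ["setosa", "versicolor", "virginica"]


class Transformer(object):
    def __init__(self):
        """No state is needed for this sketch."""

    def preprocess(self, inputs):
        """Pass the request through unchanged."""
        return inputs

    def postprocess(self, outputs):
        """Add a human-readable species name for each predicted class index."""
        predictions = outputs["predictions"]
        outputs["species"] = [IRIS_SPECIES[int(p)] for p in predictions]
        return outputs


# Local check, no deployment required
transformer = Transformer()
result = transformer.postprocess({"predictions": [0, 2]})
print(result)
```

Because the class has no dependency on Hopsworks, it can be exercised locally like this before it is attached to a deployment.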

#### Create a connection to Hopsworks

In [1]:
import hsml

# Connect with Hopsworks
conn = hsml.connection()

# Retrieve the model registry handle
mr = conn.get_model_registry()

Connected. Call `.close()` to terminate connection gracefully.


### Query Model Registry for best Iris Classifier Model

In [2]:
MODEL_NAME = "irisflowerclassifier"

# Get the best version of the model
best_model = mr.get_best_model(MODEL_NAME, "accuracy", "max")

print('Model name: ' + best_model.name)
print('Model version: ' + str(best_model.version))
print(best_model.training_metrics)

Model name: irisflowerclassifier
Model version: 1
{'accuracy': '0.98'}
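
The selection made by `get_best_model(MODEL_NAME, "accuracy", "max")` can be pictured as picking the version whose metric value is largest. The sketch below reproduces that idea locally on plain dictionaries. It does not show how `hsml` implements the lookup, and the `pick_best` helper and the sample versions are hypothetical. Metric values are stored as strings, as in the output above, so they are converted to `float` before comparing.

```python
def pick_best(versions, metric, direction):
    """Return the entry with the highest ("max") or lowest ("min") metric value."""
    if direction not in ("max", "min"):
        raise ValueError("direction must be 'max' or 'min'")
    # Ignore versions that did not record the metric
    candidates = [v for v in versions if metric in v["training_metrics"]]
    if not candidates:
        return None

    def metric_value(v):
        return float(v["training_metrics"][metric])

    if direction == "max":
        return max(candidates, key=metric_value)
    return min(candidates, key=metric_value)


# Hypothetical registry contents, for illustration only
versions = [
    {"version": 1, "training_metrics": {"accuracy": "0.98"}},
    {"version": 2, "training_metrics": {"accuracy": "0.95"}},
    {"version": 3, "training_metrics": {}},
]
best = pick_best(versions, "accuracy", "max")
print(best["version"])
```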


### Create a Deployment for the Trained Model

In [3]:
# Deploy the trained model
irisclassifier = best_model.deploy(serving_tool="KSERVE")

After the deployment has been created, you can find it in the Hopsworks UI by going to the "Deployments" tab. You can also use the class attributes or the `.describe()` method of a deployment object to access its metadata.

In [4]:
print("Deployment: " + irisclassifier.name)
irisclassifier.describe()

Deployment: irisflowerclassifier
{
    "artifact_version": 0,
    "batching_enabled": false,
    "created": "2022-05-18T14:41:41.316Z",
    "creator": "Admin Admin",
    "id": 8,
    "inference_logging": "ALL",
    "kafka_topic_dto": {
        "name": "CREATE",
        "num_of_partitions": 1,
        "num_of_replicas": 1
    },
    "model_name": "irisflowerclassifier",
    "model_path": "/Projects/demo_ml_meb10000/Models/irisflowerclassifier",
    "model_server": "PYTHON",
    "model_version": 1,
    "name": "irisflowerclassifier",
    "predictor": null,
    "predictor_resource_config": {
        "cores": 1,
        "gpus": 0,
        "memory": 1024
    },
    "requested_instances": 1,
    "serving_tool": "KSERVE"
}


#### Use a transformer to enrich model inputs (optional)

When creating a deployment for the trained model, you can attach a transformer by setting a custom transformer script.

In [None]:
# from hsml.transformer import Transformer

# TRANSFORMER_SCRIPT_PATH = mr.project_path + "/Jupyter/serving/kserve/sklearn/transformer.py" # or .ipynb

# irisclassifier = best_model.deploy(serving_tool="KSERVE",
#                                    transformer=Transformer(script_file=TRANSFORMER_SCRIPT_PATH))

## Classify flowers with the Iris Flower classifier

### Start Deployment

In [5]:
irisclassifier.start()


### Send Prediction Requests to the Deployed Model

To make inference requests, use the `.predict()` method of the deployment object.

In [6]:
for i in range(20):
    data = {"instances" : [best_model.input_example]}
    predictions = irisclassifier.predict(data)
    print(predictions)

{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
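
The request body is a dictionary with an `instances` list that holds one entry per flower. To classify several flowers in a single request, a payload could be assembled as in the sketch below. The `build_payload` helper and the sample measurements are hypothetical, and the check for four features assumes the model was trained on the four standard Iris measurements (sepal length, sepal width, petal length, petal width).

```python
def build_payload(feature_rows, n_features=4):
    """Wrap feature vectors in the {"instances": [...]} request format."""
    instances = []
    for row in feature_rows:
        if len(row) != n_features:
            raise ValueError(f"expected {n_features} features, got {len(row)}")
        instances.append([float(x) for x in row])
    return {"instances": instances}


# Hypothetical measurements in cm
payload = build_payload([[5.1, 3.5, 1.4, 0.2], [6.7, 3.0, 5.2, 2.3]])
print(payload)

# With a running deployment, the payload would be sent as:
# predictions = irisclassifier.predict(payload)
```

Validating the row length on the client side gives a clear error message before the request reaches the model server.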


## Monitor Prediction Logs

### Consume Prediction Requests and Responses using Kafka

All prediction requests are automatically logged to Kafka, which means that you can keep track of your model's performance and its predictions in a scalable manner.

In [7]:
from hops import kafka
from confluent_kafka import Consumer

Set up a Kafka consumer and subscribe to the topic containing the prediction logs.

In [8]:
TOPIC_NAME = irisclassifier.inference_logger.kafka_topic.name

config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

Read the topic's schema from Hopsworks and convert it to an Avro schema for parsing the messages.

In [9]:
json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

Read messages from the Kafka topic, parse them with the Avro schema, and print the results.

In [10]:
import json

PRINT_INSTANCES=False
PRINT_PREDICTIONS=True

for i in range(0, 10):
    msg = consumer.poll(timeout=5.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)  
            payload = json.loads(event_dict["payload"])
            
            if (event_dict['messageType'] == "request" and not PRINT_INSTANCES) or \
                (event_dict['messageType'] == "response" and not PRINT_PREDICTIONS):
                continue
            
            print("INFO -> servingId: {}, modelName: {}, modelVersion: {},"\
                  "requestTimestamp: {}, inferenceId:{}, messageType:{}".format(
                       event_dict["servingId"],
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["inferenceId"],
                       event_dict["messageType"]))

            if event_dict['messageType'] == "request":
                print("Instances -> {}\n".format(payload['instances']))
                
            if event_dict['messageType'] == "response":
                print("Predictions -> {}\n".format(payload['predictions']))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")

INFO -> servingId: 8, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1652885019, inferenceId:2d23b009-f968-4f6b-995a-c79b4f4fa8af, messageType:response
Predictions -> [0]

INFO -> servingId: 8, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1652885020, inferenceId:63712ace-ab66-48bf-89b7-e523745bac99, messageType:response
Predictions -> [0]

INFO -> servingId: 8, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1652885021, inferenceId:9cd56df2-02f9-4227-ac30-fba1a2c06fdf, messageType:response
Predictions -> [0]

INFO -> servingId: 8, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1652885021, inferenceId:73214f35-3792-42b8-a9e1-6a2ac4d7a5c1, messageType:response
Predictions -> [0]

INFO -> servingId: 8, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1652885022, inferenceId:485152f2-ef5d-4949-8b7f-4f0460524879, messageType:response
Predictions -> [0]
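
Once the events are parsed, they can be aggregated for monitoring, for example by counting how often each class is predicted. The sketch below works on dictionaries with the same fields as the parsed events above (`messageType` and a JSON-encoded `payload`). The `count_predictions` helper and the sample events are hypothetical.

```python
import json
from collections import Counter


def count_predictions(events):
    """Count predicted classes across all "response" events."""
    counts = Counter()
    for event in events:
        if event["messageType"] != "response":
            continue
        payload = json.loads(event["payload"])
        counts.update(payload["predictions"])
    return counts


# Hypothetical parsed events, for illustration only
events = [
    {"messageType": "request", "payload": json.dumps({"instances": [[5.1, 3.5, 1.4, 0.2]]})},
    {"messageType": "response", "payload": json.dumps({"predictions": [0]})},
    {"messageType": "response", "payload": json.dumps({"predictions": [0, 2]})},
]
counts = count_predictions(events)
print(counts)
```

A class distribution that drifts away from what was seen during training can be an early sign that the incoming data has changed.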

