{ "cells": [ { "cell_type": "raw", "metadata": {}, "source": [ "---\n", "title: \"Model Serving with KFServing and Scikit-learn - Iris Flower Classification\"\n", "date: 2021-01-10\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Model Serving with KFServing and Scikit-Learn - Iris Flower Classification\n", "---\n", "*INPUT --> MODEL --> PREDICTION*\n", "\n", "
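As a rough sketch of the INPUT --> MODEL --> PREDICTION flow above, the snippet below builds the kind of `instances` payload this notebook later sends to the serving and decodes a `predictions` response into species names. The helpers `build_request` and `decode_response` are hypothetical names and not part of the `hops` library, and the label order is an assumption borrowed from scikit-learn's bundled iris dataset, so check it against how your model was trained.

```python
# Hypothetical helpers for illustration only; not part of the hops library.
NUM_FEATURES = 4

# Assumed label order, taken from scikit-learn's bundled iris dataset.
IRIS_SPECIES = ['setosa', 'versicolor', 'virginica']


def build_request(rows):
    # Wrap feature rows in the instances payload expected by the serving.
    for row in rows:
        if len(row) != NUM_FEATURES:
            raise ValueError('each instance needs %d features' % NUM_FEATURES)
    return {'instances': [[float(x) for x in row] for row in rows]}


def decode_response(response):
    # Map predicted class indices to species names.
    return [IRIS_SPECIES[i] for i in response['predictions']]


request = build_request([[5.1, 3.5, 1.4, 0.2]])
print(request)
```

Under that label-order assumption, calling `decode_response` on a reply such as `{'predictions': [2]}` gives `['virginica']`.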

This notebook requires KFServing to be installed.

" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "> **NOTE:** This notebook assumes that a model called *irisflowerclassifier* is already available in Hopsworks. An example of training a model for the *Iris flower classification problem* is available in `Jupyter/end_to_end_pipelines/sklearn/end_to_end_sklearn.ipynb`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Model Serving on [Hopsworks](https://github.com/logicalclocks/hopsworks)\n", "\n", "![hops.png](../../../images/hops.png)\n", "\n", "### The `hops` Python library\n", "\n", "`hops` is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.\n", "\n", "Have a feature request or encountered an issue? Please let us know on GitHub." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Serve the Iris Flower classifier" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Query Model Registry for best Iris Classifier Model" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Connected. 
Call `.close()` to terminate connection gracefully.\n" ] } ], "source": [ "import hsml\n", "\n", "conn = hsml.connection()\n", "mr = conn.get_model_registry()" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Model name: irisflowerclassifier\n", "Model version: 1\n", "{'accuracy': '0.98'}\n" ] } ], "source": [ "MODEL_NAME = \"irisflowerclassifier\"\n", "EVALUATION_METRIC = \"accuracy\"\n", "\n", "# Pick the registered model version with the highest accuracy\n", "best_model = mr.get_best_model(MODEL_NAME, EVALUATION_METRIC, \"max\")\n", "\n", "print('Model name: ' + best_model.name)\n", "print('Model version: ' + str(best_model.version))\n", "print(best_model.training_metrics)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Model Serving of Exported Model" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "from hops import serving" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2022-01-17 15:14:07,102 INFO: Serving irisflowerclassifier successfully created\n" ] } ], "source": [ "# Create serving instance\n", "SERVING_NAME = MODEL_NAME\n", "\n", "response = serving.create_or_update(SERVING_NAME, # define a name for the serving instance\n", " model_path=best_model.model_path, # set the path of the model to be deployed\n", " model_server=\"PYTHON\", # set the model server to run the model\n", " kfserving=True, # whether to serve the model using KFServing or the default tool in the current Hopsworks version\n", " # optional arguments\n", " model_version=best_model.version, # set the version of the model to be deployed\n", " topic_name=\"CREATE\", # (optional) set the topic name or CREATE to create a new topic for inference logging\n", " inference_logging=\"ALL\", # with KFServing, select the type of inference data to log into Kafka, e.g., MODEL_INPUTS, PREDICTIONS or ALL\n", " instances=1, # with 
KFServing, set 0 instances to leverage scale-to-zero capabilities\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the serving instance is created, it appears in the \"Model Serving\" tab of the Hopsworks UI, where you can view details such as the server logs and the Kafka topic that inference requests are logged to.\n", "\n", "![kfserving_sklearn_modelonly_details.gif](../../../images/kfserving_sklearn_modelonly_details.gif)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can also use the `serving` Python module to query the Hopsworks REST API for information about existing servings, using methods such as:\n", "\n", "- `get_all()`\n", "- `get_id(serving_name)`\n", "- `get_model_path(serving_name)`\n", "- `get_model_version(serving_name)`\n", "- `get_artifact_version(serving_name)`\n", "- `get_kafka_topic(serving_name)`\n", "- `...`" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Info: \tid: 2523,\n", " model_path: /Projects/demo_ml_meb10000/Models/irisflowerclassifier,\n", " model_version: 1,\n", " artifact_version: 0,\n", " model_server: PYTHON,\n", " serving_tool: KFSERVING\n" ] } ], "source": [ "print(\"Info: \\tid: {},\\n \\\n", " model_path: {},\\n \\\n", " model_version: {},\\n \\\n", " artifact_version: {},\\n \\\n", " model_server: {},\\n \\\n", " serving_tool: {}\".format(\n", " serving.get_id(SERVING_NAME),\n", " serving.get_model_path(SERVING_NAME),\n", " serving.get_model_version(SERVING_NAME),\n", " serving.get_artifact_version(SERVING_NAME),\n", " serving.get_model_server(SERVING_NAME),\n", " serving.get_serving_tool(SERVING_NAME)))" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "irisflowerclassifier\n" ] } ], "source": [ "# List the names of all servings in the project\n", "for s in serving.get_all():\n", "    print(s.name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 
Classify flowers with the Iris Flower classifier" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start Model Serving Server" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting serving with name: irisflowerclassifier...\n", "Serving with name: irisflowerclassifier successfully started\n" ] } ], "source": [ "if serving.get_status(SERVING_NAME) == 'Stopped':\n", "    serving.start(SERVING_NAME)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "import time\n", "\n", "# Poll until the serving reports that it is running\n", "while serving.get_status(SERVING_NAME) != \"Running\":\n", "    time.sleep(5)  # let the serving start up correctly\n", "time.sleep(10)  # extra grace period before sending requests" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Check the Server Logs\n", "\n", "You can access the server logs in Kibana by clicking the 'Show logs' button in the action bar. There you can filter them by fields such as the serving component (i.e., predictor or transformer) or the container name." 
 ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![kfserving_sklearn_modelonly_logs.gif](./../../../images/kfserving_sklearn_modelonly_logs.gif)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Send Prediction Requests to the Served Model using Hopsworks REST API" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'predictions': [2]}\n", "{'predictions': [1]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [0]}\n", "{'predictions': [0]}\n", "{'predictions': [2]}\n", "{'predictions': [1]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [0]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [0]}\n" ] } ], "source": [ "import json\n", "import random\n", "\n", "NUM_FEATURES = 4\n", "\n", "# Send 20 requests, each with one instance of random feature values\n", "for i in range(20):\n", "    data = {\"instances\" : [[random.uniform(1, 8) for _ in range(NUM_FEATURES)]]}\n", "    response = serving.make_inference_request(SERVING_NAME, data)\n", "    print(response)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Monitor Prediction Requests and Responses using Kafka" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "from hops import kafka\n", "from confluent_kafka import Producer, Consumer, KafkaError" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Set up a Kafka consumer and subscribe to the topic containing the prediction logs" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)\n", "\n", "config = kafka.get_kafka_default_config()\n", "config['default.topic.config'] = {'auto.offset.reset': 'earliest'}\n", "consumer = Consumer(config)\n", "topics = 
[TOPIC_NAME]\n", "consumer.subscribe(topics)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Read the Kafka Avro schema from Hopsworks and set up an Avro reader" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "json_schema = kafka.get_schema(TOPIC_NAME)\n", "avro_schema = kafka.convert_json_schema_to_avro(json_schema)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Read messages from the Kafka topic, parse them with the Avro schema and print the results" ] },
{ "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO -> servingId: 2523, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1641839298, inferenceId:9fff76ad-55e1-4c52-b0e9-7019ce79a249, messageType:response\n", "Predictions -> [2]\n", "\n", "INFO -> servingId: 2523, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1641839298, inferenceId:03c8d155-cbb9-4907-bfc2-630d3777e56f, messageType:response\n", "Predictions -> [1]\n", "\n", "INFO -> servingId: 2523, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1641839298, inferenceId:cd45feb2-d1c1-43b6-a612-f0dffeaf0e01, messageType:response\n", "Predictions -> [2]\n", "\n", "INFO -> servingId: 2523, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1641839298, inferenceId:4b14a25a-fed1-45cc-a994-a1b37ba55543, messageType:response\n", "Predictions -> [2]\n", "\n", "INFO -> servingId: 2523, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1641839299, inferenceId:dbe84132-b0ce-4196-bd5c-da72f1607f45, messageType:response\n", "Predictions -> [0]\n", "\n" ] } ], "source": [ "PRINT_INSTANCES=False\n", "PRINT_PREDICTIONS=True\n", "\n", "for i in range(0, 10):\n", "    msg = consumer.poll(timeout=5.0)\n", "    if msg is not None:\n", "        value = msg.value()\n", "        try:\n", "            event_dict = kafka.parse_avro_msg(value, avro_schema)\n", "            payload = json.loads(event_dict[\"payload\"])\n", "\n", "            if (event_dict['messageType'] == \"request\" and not PRINT_INSTANCES) or \\\n", "               (event_dict['messageType'] == \"response\" and not PRINT_PREDICTIONS):\n", "                continue\n", "\n", "            print(\"INFO -> servingId: {}, modelName: {}, modelVersion: {},\"\\\n", "                  \"requestTimestamp: {}, inferenceId:{}, messageType:{}\".format(\n", "                      event_dict[\"servingId\"],\n", "                      event_dict[\"modelName\"],\n", "                      event_dict[\"modelVersion\"],\n", "                      event_dict[\"requestTimestamp\"],\n", "                      event_dict[\"inferenceId\"],\n", "                      event_dict[\"messageType\"]))\n", "\n", "            if event_dict['messageType'] == \"request\":\n", "                print(\"Instances -> {}\\n\".format(payload['instances']))\n", "\n", "            if event_dict['messageType'] == \"response\":\n", "                print(\"Predictions -> {}\\n\".format(payload['predictions']))\n", "\n", "        except Exception as e:\n", "            print(\"A message was read but there was an error parsing it\")\n", "            print(e)\n", "    else:\n", "        print(\"timeout.. no more messages to read from topic\")" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.11" } }, "nbformat": 4, "nbformat_minor": 4 }