{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "title: \"End-To-End Example with Scikit-Learn, Hops and the Feature Store - Iris Flower Classification\"\n", "date: 2021-02-24\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# End-To-End Example with Scikit-Learn, Hops and the Feature Store - Iris Flower Classification\n", "---\n", "*FEATURE ENGINEERING --> MODEL TRAINING --> MODEL SERVING --> MODEL MONITORING*\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook we will, \n", "\n", "1. Load the Iris Flower dataset from HopsFS\n", "2. Do feature engineering on the dataset\n", "3. Save the features to the feature store\n", "4. Read the feature data from the feature store\n", "5. Train a KNN Model using SkLearn\n", "6. Save the trained model to HopsFS\n", "7. Launch a serving instance to serve the trained model\n", "8. Send some prediction requests to the served model\n", "9. Monitor the predictions through Kafka" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Model Serving on [Hopsworks](https://github.com/logicalclocks/hopsworks)\n", "\n", "![hops.png](../../images/hops.png)\n", "\n", "## The `hops` python library\n", "\n", "`hops` is a helper library for Hops that facilitates development by hiding the complexity of running applications and iteracting with services.\n", "\n", "Have a feature request or encountered an issue? Please let us know on github.\n", "\n", "### Using the `experiment` module\n", "\n", "To be able to run your Machine Learning code in Hopsworks, the code for the whole program needs to be provided and put inside a wrapper function. Everything, from importing libraries to reading data and defining the model and running the program needs to be put inside a wrapper function.\n", "\n", "The `experiment` module provides an api to Python programs such as TensorFlow, Keras and PyTorch on a Hopsworks on any number of machines and GPUs.\n", "\n", "An Experiment could be a single Python program, which we refer to as an **Experiment**. \n", "\n", "Grid search or genetic hyperparameter optimization such as differential evolution which runs several Experiments in parallel, which we refer to as **Parallel Experiment**. \n", "\n", "ParameterServerStrategy, CollectiveAllReduceStrategy and MultiworkerMirroredStrategy making multi-machine/multi-gpu training as simple as invoking a function for orchestration. This mode is referred to as **Distributed Training**.\n", "\n", "### Using the `hdfs` module\n", "The `hdfs` module provides a method to get the path in HopsFS where your data is stored, namely by calling `hdfs.project_path()`. The path resolves to the root path for your project, which is the view that you see when you click `Data Sets` in HopsWorks. To point where your actual data resides in the project you to append the full path from there to your Dataset. For example if you create a mnist folder in your Resources Dataset, the path to the mnist data would be `hdfs.project_path() + 'Resources/mnist'`\n", "\n", "```python\n", "# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project\n", "from hops import hdfs\n", "project_path = hdfs.project_path()\n", "```\n", "\n", "```python\n", "from hops import hdfs\n", "# Uploading the traied model to hdfs\n", "hdfs.copy_to_hdfs(\"iris_knn.pkl\", \"Resources/iris_flower\", overwrite=True)\n", "\n", "# Downloading the iris flower model to the current working directory\n", "iris_flower_model_hdfs_path = hdfs.project_path() + \"Resources/iris_flower/iris_knn.pkl\"\n", "local_iris_flower_model_path = hdfs.copy_to_local(iris_flower_model_hdfs_path)\n", "```\n", "\n", "### Documentation\n", "See the following links to learn more about running experiments in Hopsworks\n", "\n", "- Learn more about experiments\n", "
\n", "- Building end-to-end pipelines\n", "
\n", "- Give us a star, create an issue or a feature request on Hopsworks github\n", "\n", "### Managing experiments\n", "Experiments service provides a unified view of all the experiments run using the `experiment` module.\n", "
\n", "As demonstrated in the gif it provides general information about the experiment and the resulting metric. Experiments can be visualized meanwhile or after training in a TensorBoard.\n", "
\n", "
\n", "![Image7-Monitor.png](../../images/experiments.gif)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Import libraries" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
IDYARN Application IDKindStateSpark UIDriver log
2application_1623896729676_0003pysparkidleLinkLink
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] } ], "source": [ "from sklearn.neighbors import KNeighborsClassifier\n", "from sklearn.metrics import accuracy_score\n", "import joblib\n", "from pyspark.ml.feature import StringIndexer\n", "from pyspark.sql.types import IntegerType\n", "import numpy as np\n", "import time\n", "import json\n", "from hops import kafka, hdfs, serving, model, tls\n", "from confluent_kafka import Producer, Consumer, KafkaError\n", "import random" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prepare Training Dataset\n", "\n", "### Load Iris Dataset (csv)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "project_path = hdfs.project_path()\n", "iris_df = spark.read.format(\"csv\").option(\"header\", \"true\").option(\"inferSchema\", True).load(\n", " project_path + \"TourData/iris/iris.csv\")" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- sepal_length: double (nullable = true)\n", " |-- sepal_width: double (nullable = true)\n", " |-- petal_length: double (nullable = true)\n", " |-- petal_width: double (nullable = true)\n", " |-- variety: string (nullable = true)" ] } ], "source": [ "iris_df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Feature Engineering\n", "\n", "The dataset is already quite well prepared, the only thing we need to for feature engineering is to convert the `variety` column to numeric and save a lookup table so that we later on can convert the numeric representation back to the categorical representation." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- sepal_length: double (nullable = true)\n", " |-- sepal_width: double (nullable = true)\n", " |-- petal_length: double (nullable = true)\n", " |-- petal_width: double (nullable = true)\n", " |-- label: integer (nullable = true)" ] } ], "source": [ "encoder = StringIndexer(inputCol=\"variety\", outputCol=\"label\")\n", "fit_model = encoder.fit(iris_df)\n", "iris_df1 = fit_model.transform(iris_df)\n", "lookup_df = iris_df1.select([\"variety\", \"label\"]).distinct()\n", "iris_df2 = iris_df1.drop(\"variety\")\n", "iris_df3 = iris_df2.withColumn(\"label\", iris_df2[\"label\"].cast(IntegerType()))\n", "iris_df3.printSchema()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------+-----------+------------+-----------+-----+\n", "|sepal_length|sepal_width|petal_length|petal_width|label|\n", "+------------+-----------+------------+-----------+-----+\n", "| 5.1| 3.5| 1.4| 0.2| 0|\n", "| 4.9| 3.0| 1.4| 0.2| 0|\n", "| 4.7| 3.2| 1.3| 0.2| 0|\n", "| 4.6| 3.1| 1.5| 0.2| 0|\n", "| 5.0| 3.6| 1.4| 0.2| 0|\n", "+------------+-----------+------------+-----------+-----+\n", "only showing top 5 rows" ] } ], "source": [ "iris_df3.show(5)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+-----+\n", "| variety|label|\n", "+----------+-----+\n", "| Virginica| 2.0|\n", "|Versicolor| 1.0|\n", "| Setosa| 0.0|\n", "+----------+-----+" ] } ], "source": [ "lookup_df.show(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Save Features to the Feature Store\n", "\n", "We can save two feature groups (hive tables), one called `iris_features` that contains the iris features and the corresponding numeric label, and another feature group called `iris_labels_lookup` for converting the numeric iris label back to categorical.\n", "\n", "**Note**: To be able to run the feature store code, you first have to enable the Feature Store Service in your project. To do this, go to the \"Settings\" tab in your project, select the feature store service and click \"Save\". " ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Connected. Call `.close()` to terminate connection gracefully." ] } ], "source": [ "import hsfs\n", "# Create a connection\n", "connection = hsfs.connection()\n", "# Get the feature store handle for the project's feature store\n", "fs = connection.get_feature_store()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "iris_features = fs.create_feature_group(name=\"iris_features\", version=1, time_travel_format=None)\n", "iris_features.save(iris_df3)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "iris_labels_lookup = fs.create_feature_group(name=\"iris_labels_lookup\", version=1, time_travel_format=None)\n", "iris_labels_lookup.save(lookup_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read the Iris Training Dataset from the Feature Store" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " sepal_length sepal_width petal_width petal_length label\n", "0 5.1 3.5 0.2 1.4 0\n", "1 4.9 3.0 0.2 1.4 0\n", "2 4.7 3.2 0.2 1.3 0\n", "3 4.6 3.1 0.2 1.5 0\n", "4 5.0 3.6 0.2 1.4 0\n", "5 5.4 3.9 0.4 1.7 0\n", "6 4.6 3.4 0.3 1.4 0\n", "7 5.0 3.4 0.2 1.5 0\n", "8 4.4 2.9 0.2 1.4 0\n", "9 4.9 3.1 0.1 1.5 0" ] } ], "source": [ "iris_features = fs.get_feature_group(\"iris_features\", 1)\n", "df = iris_features.read().toPandas()\n", "df.head(10)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " sepal_length sepal_width petal_width petal_length label\n", "count 150.000000 150.000000 150.000000 150.000000 150.000000\n", "mean 5.843333 3.057333 1.199333 3.758000 1.000000\n", "std 0.828066 0.435866 0.762238 1.765298 0.819232\n", "min 4.300000 2.000000 0.100000 1.000000 0.000000\n", "25% 5.100000 2.800000 0.300000 1.600000 0.000000\n", "50% 5.800000 3.000000 1.300000 4.350000 1.000000\n", "75% 6.400000 3.300000 1.800000 5.100000 2.000000\n", "max 7.900000 4.400000 2.500000 6.900000 2.000000" ] } ], "source": [ "df.describe()" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "x_df = df[['sepal_length', 'sepal_width', 'petal_length', 'petal_width']]\n", "y_df = df[[\"label\"]]\n", "X = x_df.values\n", "y = y_df.values.ravel()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train an Iris Flower Classifier" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "IRIS_FLOWER_RESOURCES_PATH = \"/Resources/iris_flower\"\n", "IRIS_FLOWER_MODEL_PKL = \"iris_knn.pkl\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Generate a KNN Model using the Feature Data" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.98" ] } ], "source": [ "neighbors = random.randint(3, 30)\n", "iris_knn = KNeighborsClassifier(n_neighbors=neighbors)\n", "iris_knn.fit(X, y)\n", "y_pred = iris_knn.predict(X)\n", "acc = accuracy_score(y, y_pred)\n", "print(acc)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Save the Trained Model to HopsFS" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Started copying local path iris_knn.pkl to hdfs path hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000//Resources/iris_flower/iris_knn.pkl\n", "\n", "Finished copying" ] } ], "source": [ "joblib.dump(iris_knn, IRIS_FLOWER_MODEL_PKL)\n", "hdfs.mkdir(IRIS_FLOWER_RESOURCES_PATH)\n", "hdfs.copy_to_hdfs(IRIS_FLOWER_MODEL_PKL, IRIS_FLOWER_RESOURCES_PATH, overwrite=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Export the Trained Model to the Models Repository" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is a best-practice to put the python script that loads and run the model together with the trained model files. Below is the code for copying the script into the iris flower resources folder, which will be used when exporting the model." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "IRIS_FLOWER_CLASSIFIER_SCRIPT = \"iris_flower_classifier.py\"\n", "\n", "script_path = \"Jupyter/end_to_end_pipeline/sklearn/\" + IRIS_FLOWER_CLASSIFIER_SCRIPT\n", "hdfs.cp(script_path, IRIS_FLOWER_RESOURCES_PATH + \"/\" + IRIS_FLOWER_CLASSIFIER_SCRIPT, overwrite=True)" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Exported model irisflowerclassifier as version 1 successfully.\n", "Polling irisflowerclassifier version 1 for model availability.\n", "get model:/hopsworks-api/api/project/120/models/irisflowerclassifier_1?filter_by=endpoint_id:120\n", "Model now available." ] } ], "source": [ "MODEL_NAME = \"irisflowerclassifier\"\n", "\n", "model.export(IRIS_FLOWER_RESOURCES_PATH, MODEL_NAME, metrics={'accuracy': acc})" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1\n", "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py\n", "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_knn.pkl\n", "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_knn.pkl\n", "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/program.ipynb" ] } ], "source": [ "for p in hdfs.ls(\"Models/\" + MODEL_NAME, recursive=True):\n", " print(p)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Serve the Trained Model\n", "\n", "To serve a SkLearn Model, write a python script that downloads the HDFS model in the constructor and saves it as a class variable and then implements the `Predict` class and the methods `predict`, `classify` and `regress`, like this:\n", "\n", "```python\n", "from sklearn.externals import joblib\n", "from hops import hdfs\n", "import os\n", "\n", "class Predict(object):\n", "\n", " def __init__(self):\n", " \"\"\" Initializes the serving state, reads a trained model from HDFS\"\"\"\n", " self.model_path = \"Models/irisflowerclassifier/1/iris_knn.pkl\"\n", " print(\"Copying SKLearn model from HDFS to local directory\")\n", " hdfs.copy_to_local(self.model_path)\n", " print(\"Reading local SkLearn model for serving\")\n", " self.model = joblib.load(\"./iris_knn.pkl\")\n", " print(\"Initialization Complete\")\n", "\n", "\n", " def predict(self, inputs):\n", " \"\"\" Serves a prediction request usign a trained model\"\"\"\n", " return self.model.predict(inputs).tolist() # Numpy Arrays are note JSON serializable\n", "\n", " def classify(self, inputs):\n", " \"\"\" Serves a classification request using a trained model\"\"\"\n", " return \"not implemented\"\n", "\n", " def regress(self, inputs):\n", " \"\"\" Serves a regression request using a trained model\"\"\"\n", " return \"not implemented\"\n", "```\n", "\n", "Then upload this python script to some folder in your project and go to the \"Model Serving\" service in Hopsworks:\n", "\n", "![sklearn_serving1.png](./../../images/sklearn_serving1.png)\n", "\n", "Then click on \"create serving\" and configure your serving:\n", "\n", "![sklearn_serving2.png](./../../images/sklearn_serving2.png)\n", "\n", "Once the serving is created, you can start it and view information like server-logs and which kafka topic it is logging inference requests to.\n", "\n", "![sklearn_serving3.png](./../../images/sklearn_serving3.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Query Model Repository for best IrisFlowerClassifier Model" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Model name: irisflowerclassifier\n", "Model version: 1\n", "{'accuracy': '0.98'}" ] } ], "source": [ "from hops.model import Metric\n", "EVALUATION_METRIC=\"accuracy\"\n", "\n", "best_model = model.get_best_model(MODEL_NAME, EVALUATION_METRIC, Metric.MAX)\n", "\n", "print('Model name: ' + best_model['name'])\n", "print('Model version: ' + str(best_model['version']))\n", "print(best_model['metrics'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once all the files have been exported to the model directory, we can create a serving instance that points to the model files using `serving.create_or_update()`" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Inferring model server from artifact files: FLASK\n", "Creating serving irisflowerclassifier for artifact /Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py ...\n", "Serving irisflowerclassifier successfully created" ] } ], "source": [ "# Create serving\n", "SERVING_NAME = MODEL_NAME\n", "SCRIPT_PATH = \"Models/\" + MODEL_NAME + \"/\" + str(best_model['version']) + \"/\" + IRIS_FLOWER_CLASSIFIER_SCRIPT\n", "\n", "serving.create_or_update(SERVING_NAME, # define a name for the serving instance\n", " SCRIPT_PATH, model_version=best_model['version'], # set the path and version of the model to be deployed\n", " topic_name=\"CREATE\", # topic name or CREATE to create a new topic for inference logging, otherwise NONE\n", " instances=1 # number of replicas\n", " )" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "irisflowerclassifier" ] } ], "source": [ "for s in serving.get_all():\n", " print(s.name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After the serving have been created, you can find it in the Hopsworks UI by going to the \"Model Serving\" tab. You can also use the python module to query the Hopsworks REST API about information on the existing servings using methods like: \n", "\n", "- `get_servings()`\n", "- `get_serving_id(serving_name)`\n", "- `get_serving_model_path(serving_name)`\n", "- `get_serving_type(serving_name)`\n", "- `get_serving_version(serving_name)`\n", "- `get_serving_kafka_topic(serving_name)`\n", "- `get_serving_status(serving_name)`\n", "- `exist(serving_name)`\n" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Info: \tid: 788,\n", " model_path: /Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py,\n", " model_version: 1,\n", " artifact_version: 0,\n", " model_server: FLASK,\n", " serving_tool: DEFAULT" ] } ], "source": [ "print(\"Info: \\tid: {},\\n \\\n", " model_path: {},\\n \\\n", " model_version: {},\\n \\\n", " artifact_version: {},\\n \\\n", " model_server: {},\\n \\\n", " serving_tool: {}\".format(\n", " serving.get_id(SERVING_NAME),\n", " serving.get_model_path(SERVING_NAME),\n", " serving.get_model_version(SERVING_NAME),\n", " serving.get_artifact_version(SERVING_NAME),\n", " serving.get_model_server(SERVING_NAME),\n", " serving.get_serving_tool(SERVING_NAME)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Or describe a serving using\n", "- `describe(serving_name)`" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "id: 788, name: irisflowerclassifier, model_path: /Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py, model_version: 1, artifact_version: 0, transformer: None, model_server: FLASK, serving_tool: DEFAULT, requested_instances: 1, available_instances: 0, available_transformer_instances: 0, predictor_resource_config: {'cores': 1, 'memory': 1024, 'gpus': 0}, creator: Admin Admin, created: 2021-07-07T09:38:40Z, status: Stopped, kafka_topic_dto: " ] } ], "source": [ "serving.describe(SERVING_NAME)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Classify Iris Flowers using the Trained Model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can start/stop the serving instance either from the Hopsworks UI or from the python/REST API as demonstrated below" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Shut down currently running serving" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "if serving.get_status(SERVING_NAME) == \"Running\":\n", " serving.stop(SERVING_NAME)\n", "time.sleep(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start new serving" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting serving with name: irisflowerclassifier...\n", "Serving with name: irisflowerclassifier successfully started" ] } ], "source": [ "serving.start(SERVING_NAME)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Wait until serving is up and running" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "while serving.get_status(SERVING_NAME) != \"Running\":\n", " time.sleep(5) # Let the serving startup correctly\n", "time.sleep(15)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Send Prediction Requests to the Served Model using Hopsworks REST API" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For making inference requests you can use the utility method `serving.make_inference_request`" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'predictions': [1]}\n", "{'predictions': [2]}\n", "{'predictions': [1]}\n", "{'predictions': [1]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [1]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [1]}\n", "{'predictions': [2]}" ] } ], "source": [ "NUM_FEATURES = 4\n", "\n", "for i in range(20):\n", " data = {\"inputs\" : [[random.uniform(1, 8) for i in range(NUM_FEATURES)]]}\n", " response = serving.make_inference_request(SERVING_NAME, data)\n", " print(response)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Monitor Prediction Logs\n", "\n", "### Consume Prediction Requests and Responses using Kafka\n", "\n", "All prediction requestst are automatically logged to Kafka which means that you can keep track for your model's performance and its predictions in a scalable manner." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Setup a Kafka consumer and subscribe to the topic containing the prediction logs" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [], "source": [ "TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)\n", "\n", "config = kafka.get_kafka_default_config()\n", "config['default.topic.config'] = {'auto.offset.reset': 'earliest'}\n", "consumer = Consumer(config)\n", "topics = [TOPIC_NAME]\n", "consumer.subscribe(topics)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Read the Kafka Avro schema from Hopsworks and setup an Avro reader" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": [ "json_schema = kafka.get_schema(TOPIC_NAME)\n", "avro_schema = kafka.convert_json_schema_to_avro(json_schema)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Read the lookup table from the Feature Store for converting numerical labels to categorical" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [], "source": [ "iris_labels_lookup = fs.get_feature_group(\"iris_labels_lookup\", 1)\n", "iris_labels_lookup_df = iris_labels_lookup.read().toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Read messages from the Kafka topic, parse them with the Avro schema and print the results" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "serving: irisflowerclassifier, version: 1, timestamp: 1625650784109,\n", " http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "predictions: 1, prediction_label:Versicolor\n", "\n", "serving: irisflowerclassifier, version: 1, timestamp: 1625650784277,\n", " http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "predictions: 2, prediction_label:Virginica\n", "\n", "serving: irisflowerclassifier, version: 1, timestamp: 1625650784416,\n", " http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "predictions: 1, prediction_label:Versicolor\n", "\n", "serving: irisflowerclassifier, version: 1, timestamp: 1625650784596,\n", " http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "predictions: 1, prediction_label:Versicolor\n", "\n", "serving: irisflowerclassifier, version: 1, timestamp: 1625650784732,\n", " http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "predictions: 2, prediction_label:Virginica" ] } ], "source": [ "PRINT_INSTANCES=False\n", "PRINT_PREDICTIONS=True\n", "\n", "for i in range(0, 5):\n", " msg = consumer.poll(timeout=1.0)\n", " if msg is not None:\n", " value = msg.value()\n", " try:\n", " event_dict = kafka.parse_avro_msg(value, avro_schema)\n", " \n", " print(\"serving: {}, version: {}, timestamp: {},\\n\"\\\n", " \" http_response_code: {}, model_server: {}, serving_tool: {}\".format(\n", " event_dict[\"modelName\"],\n", " event_dict[\"modelVersion\"],\n", " event_dict[\"requestTimestamp\"],\n", " event_dict[\"responseHttpCode\"],\n", " event_dict[\"modelServer\"],\n", " event_dict[\"servingTool\"]))\n", " \n", " if PRINT_INSTANCES:\n", " print(\"instances: {}\\n\".format(event_dict[\"inferenceRequest\"]))\n", " if PRINT_PREDICTIONS:\n", " prediction = json.loads(event_dict[\"inferenceResponse\"])[\"predictions\"][0]\n", " prediction_label = iris_labels_lookup_df.loc[iris_labels_lookup_df['label'] == prediction, 'variety'].iloc[0]\n", " print(\"predictions: {}, prediction_label:{}\\n\".format(prediction, prediction_label))\n", "\n", " except Exception as e:\n", " print(\"A message was read but there was an error parsing it\")\n", " print(e)\n", " else:\n", " print(\"timeout.. no more messages to read from topic\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }