{ "cells": [ { "cell_type": "raw", "metadata": {}, "source": [ "---\n", "title: \"Scikit-Learn End-to-End Example - Iris\"\n", "date: 2021-02-24\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Iris Flower Classification and Serving Using SkLearn, HopsML, and the Hopsworks Feature Store\n", "\n", "In this notebook we will, \n", "\n", "1. Load the Iris Flower dataset from HopsFS\n", "2. Do feature engineering on the dataset\n", "3. Save the features to the feature store\n", "4. Read the feature data from the feature store\n", "5. Train a KNN Model using SkLearn\n", "6. Save the trained model to HopsFS\n", "7. Launch a serving instance to serve the trained model\n", "8. Send some prediction requests to the served model\n", "9. Monitor the predictions through Kafka\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Imports" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
ID | YARN Application ID | Kind | State | Spark UI | Driver log
6 | application_1617283456172_0007 | pyspark | idle | Link | Link
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] } ], "source": [ "from sklearn.neighbors import KNeighborsClassifier\n", "from sklearn.metrics import accuracy_score\n", "import joblib\n", "from pyspark.ml.feature import StringIndexer\n", "from pyspark.sql.types import IntegerType\n", "import numpy as np\n", "import time\n", "import json\n", "from hops import kafka, hdfs, serving, model, tls\n", "from confluent_kafka import Producer, Consumer, KafkaError\n", "import random" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load Dataset" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "project_path = hdfs.project_path()\n", "iris_df = spark.read.format(\"csv\").option(\"header\", \"true\").option(\"inferSchema\", True).load(\n", "    project_path + \"TourData/iris/iris.csv\")" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- sepal_length: double (nullable = true)\n", " |-- sepal_width: double (nullable = true)\n", " |-- petal_length: double (nullable = true)\n", " |-- petal_width: double (nullable = true)\n", " |-- variety: string (nullable = true)" ] } ], "source": [ "iris_df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Feature Engineering\n", "\n", "The dataset is already well prepared. The only feature engineering we need is to convert the `variety` column to a numeric label and to save a lookup table, so that we can later convert the numeric representation back to the categorical one." 
] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- sepal_length: double (nullable = true)\n", " |-- sepal_width: double (nullable = true)\n", " |-- petal_length: double (nullable = true)\n", " |-- petal_width: double (nullable = true)\n", " |-- label: integer (nullable = true)" ] } ], "source": [ "encoder = StringIndexer(inputCol=\"variety\", outputCol=\"label\")\n", "fit_model = encoder.fit(iris_df)\n", "iris_df1 = fit_model.transform(iris_df)\n", "lookup_df = iris_df1.select([\"variety\", \"label\"]).distinct()\n", "iris_df2 = iris_df1.drop(\"variety\")\n", "iris_df3 = iris_df2.withColumn(\"label\", iris_df2[\"label\"].cast(IntegerType()))\n", "iris_df3.printSchema()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------+-----------+------------+-----------+-----+\n", "|sepal_length|sepal_width|petal_length|petal_width|label|\n", "+------------+-----------+------------+-----------+-----+\n", "| 5.1| 3.5| 1.4| 0.2| 2|\n", "| 4.9| 3.0| 1.4| 0.2| 2|\n", "| 4.7| 3.2| 1.3| 0.2| 2|\n", "| 4.6| 3.1| 1.5| 0.2| 2|\n", "| 5.0| 3.6| 1.4| 0.2| 2|\n", "+------------+-----------+------------+-----------+-----+\n", "only showing top 5 rows" ] } ], "source": [ "iris_df3.show(5)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+-----+\n", "| variety|label|\n", "+----------+-----+\n", "| Virginica| 0.0|\n", "|Versicolor| 1.0|\n", "| Setosa| 2.0|\n", "+----------+-----+" ] } ], "source": [ "lookup_df.show(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Save Features to the Feature Store\n", "\n", "We can save two feature groups (hive tables), one called `iris_features` that contains the iris features and the corresponding numeric label, and another feature group called 
`iris_labels_lookup` for converting the numeric iris label back to categorical.\n", "\n", "**Note**: To be able to run the feature store code, you first have to enable the Feature Store Service in your project. To do this, go to the \"Settings\" tab in your project, select the feature store service and click \"Save\". " ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Connected. Call `.close()` to terminate connection gracefully." ] } ], "source": [ "import hsfs\n", "# Create a connection\n", "connection = hsfs.connection()\n", "# Get the feature store handle for the project's feature store\n", "fs = connection.get_feature_store()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "iris_features = fs.create_feature_group(name=\"iris_features\", version=1, time_travel_format=None)\n", "iris_features.save(iris_df3)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "iris_labels_lookup = fs.create_feature_group(name=\"iris_labels_lookup\", version=1, time_travel_format=None)\n", "iris_labels_lookup.save(lookup_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read the Iris Training Dataset from the Feature Store" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " sepal_length sepal_width petal_width petal_length label\n", "0 5.1 3.5 0.2 1.4 2\n", "1 4.9 3.0 0.2 1.4 2\n", "2 4.7 3.2 0.2 1.3 2\n", "3 4.6 3.1 0.2 1.5 2\n", "4 5.0 3.6 0.2 1.4 2\n", "5 5.4 3.9 0.4 1.7 2\n", "6 4.6 3.4 0.3 1.4 2\n", "7 5.0 3.4 0.2 1.5 2\n", "8 4.4 2.9 0.2 1.4 2\n", "9 4.9 3.1 0.1 1.5 2" ] } ], "source": [ "iris_features = fs.get_feature_group(\"iris_features\", 1)\n", "df = 
iris_features.read().toPandas()\n", "df.head(10)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " sepal_length sepal_width petal_width petal_length label\n", "count 150.000000 150.000000 150.000000 150.000000 150.000000\n", "mean 5.843333 3.057333 1.199333 3.758000 1.000000\n", "std 0.828066 0.435866 0.762238 1.765298 0.819232\n", "min 4.300000 2.000000 0.100000 1.000000 0.000000\n", "25% 5.100000 2.800000 0.300000 1.600000 0.000000\n", "50% 5.800000 3.000000 1.300000 4.350000 1.000000\n", "75% 6.400000 3.300000 1.800000 5.100000 2.000000\n", "max 7.900000 4.400000 2.500000 6.900000 2.000000" ] } ], "source": [ "df.describe()" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "x_df = df[['sepal_length', 'sepal_width', 'petal_length', 'petal_width']]\n", "y_df = df[[\"label\"]]\n", "X = x_df.values\n", "y = y_df.values.ravel()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train a KNN Model using the Feature Data" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.98" ] } ], "source": [ "neighbors = random.randint(3, 30)\n", "iris_knn = KNeighborsClassifier(n_neighbors=neighbors)\n", "iris_knn.fit(X, y)\n", "y_pred = iris_knn.predict(X)\n", "# Note: this is accuracy on the training data itself, not on a held-out test set\n", "acc = accuracy_score(y, y_pred)\n", "print(acc)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Save the Trained Model to HopsFS" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Started copying local path iris_knn.pkl to hdfs path hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Resources/sklearn_model/iris_knn.pkl\n", "\n", "Finished copying" ] } ], "source": [ "joblib.dump(iris_knn, \"iris_knn.pkl\")\n", "hdfs.mkdir(\"Resources/sklearn_model\")\n", 
"hdfs.copy_to_hdfs(\"iris_knn.pkl\", \"Resources/sklearn_model\", overwrite=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Constants" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Serve the Trained Model\n", "\n", "To serve a scikit-learn model, write a Python script that defines a `Predict` class. Its constructor downloads the model from HDFS and stores it as an instance attribute, and the class implements the methods `predict`, `classify` and `regress`, like this:\n", "\n", "```python\n", "import joblib\n", "from hops import hdfs\n", "import os\n", "\n", "class Predict(object):\n", "\n", "    def __init__(self):\n", "        \"\"\" Initializes the serving state, reads a trained model from HDFS\"\"\"\n", "        self.model_path = \"Models/iris_knn.pkl\"\n", "        print(\"Copying SKLearn model from HDFS to local directory\")\n", "        hdfs.copy_to_local(self.model_path)\n", "        print(\"Reading local SkLearn model for serving\")\n", "        self.model = joblib.load(\"./iris_knn.pkl\")\n", "        print(\"Initialization Complete\")\n", "\n", "\n", "    def predict(self, inputs):\n", "        \"\"\" Serves a prediction request using a trained model\"\"\"\n", "        return self.model.predict(inputs).tolist() # Numpy Arrays are not JSON serializable\n", "\n", "    def classify(self, inputs):\n", "        \"\"\" Serves a classification request using a trained model\"\"\"\n", "        return \"not implemented\"\n", "\n", "    def regress(self, inputs):\n", "        \"\"\" Serves a regression request using a trained model\"\"\"\n", "        return \"not implemented\"\n", "```\n", "\n", "Then upload this Python script to a folder in your project and go to the \"Model Serving\" service in Hopsworks:\n", "\n", "![sklearn_serving1.png](./../../images/sklearn_serving1.png)\n", "\n", "Then click on \"create serving\" and configure your serving:\n", "\n", "![sklearn_serving2.png](./../../images/sklearn_serving2.png)\n", "\n", "Once the serving is created, you can start it and view information such as the server logs and the 
Kafka topic to which it logs inference requests.\n", "\n", "![sklearn_serving3.png](./../../images/sklearn_serving3.png)\n", "\n", "It is best practice to keep the script together with the trained model. The code below copies the script from `Jupyter/End_To_End_Pipeline/sklearn/iris_flower_classifier.py` to `Resources/sklearn_model/iris_flower_classifier.py`, next to the pickled model, so that both files end up in the same model directory when the model is exported." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "script_path = \"Jupyter/End_To_End_Pipeline/sklearn/iris_flower_classifier.py\"\n", "hdfs.cp(script_path, \"Resources/sklearn_model/iris_flower_classifier.py\", overwrite=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Export the Trained Model to the Models Repository" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Exported model IrisFlowerClassifier as version 1 successfully.\n", "Polling IrisFlowerClassifier version 1 for model availability.\n", "get model:/hopsworks-api/api/project/125/models/IrisFlowerClassifier_1?filter_by=endpoint_id:125\n", "Model now available." 
] } ], "source": [ "MODEL_NAME = \"IrisFlowerClassifier\"\n", "model.export(\"Resources/sklearn_model\", MODEL_NAME, metrics={'accuracy': acc})" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/IrisFlowerClassifier/1\n", "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/IrisFlowerClassifier/1/iris_flower_classifier.py\n", "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/IrisFlowerClassifier/1/iris_knn.pkl\n", "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/IrisFlowerClassifier/1/program.ipynb" ] } ], "source": [ "for p in hdfs.ls(\"Models/\" + MODEL_NAME, recursive=True):\n", "    print(p)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Query the Model Repository for the Best IrisFlowerClassifier Model" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Model name: IrisFlowerClassifier\n", "Model version: 1\n", "{'accuracy': '0.98'}" ] } ], "source": [ "from hops.model import Metric\n", "\n", "EVALUATION_METRIC = \"accuracy\"\n", "\n", "best_model = model.get_best_model(MODEL_NAME, EVALUATION_METRIC, Metric.MAX)\n", "\n", "print('Model name: ' + best_model['name'])\n", "print('Model version: ' + str(best_model['version']))\n", "print(best_model['metrics'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once all the files have been exported to the model directory, we can create a serving instance that points to the model files using `serving.create_or_update()`." ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Creating serving IrisFlowerClassifier for artifact /Projects/demo_ml_meb10000/Models/IrisFlowerClassifier/1/iris_flower_classifier.py ...\n", 
"Serving IrisFlowerClassifier successfully created" ] } ], "source": [ "# Create serving\n", "serving_name = MODEL_NAME\n", "script_path = \"Models/\" + MODEL_NAME + \"/\" + str(best_model['version']) + \"/iris_flower_classifier.py\"\n", "serving.create_or_update(serving_name, script_path, model_version=best_model['version'], model_server=\"FLASK\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After the serving has been created, you can find it in the Hopsworks UI by going to the \"Model Serving\" tab. You can also use the `serving` Python module to query the Hopsworks REST API for information about the existing servings, using methods like: \n", "\n", "- `get_all()`\n", "- `get_id(serving_name)`\n", "- `get_artifact_path(serving_name)`\n", "- `get_model_server(serving_name)`\n", "- `get_serving_tool(serving_name)`\n", "- `get_version(serving_name)`\n", "- `get_kafka_topic(serving_name)`\n", "- `get_status(serving_name)`\n", "- `exists(serving_name)`\n" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "IrisFlowerClassifier" ] } ], "source": [ "for s in serving.get_all():\n", "    print(s.name)" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "9" ] } ], "source": [ "serving.get_id(MODEL_NAME)" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "'/Projects/demo_ml_meb10000/Models/IrisFlowerClassifier/1/iris_flower_classifier.py'" ] } ], "source": [ "serving.get_artifact_path(MODEL_NAME)" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "'FLASK'" ] } ], "source": [ "serving.get_model_server(MODEL_NAME)" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": 
"stream", "text": [ "'DEFAULT'" ] } ], "source": [ "serving.get_serving_tool(MODEL_NAME)" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1" ] } ], "source": [ "serving.get_version(MODEL_NAME)" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "'IrisFlowerClassifier-inf2074'" ] } ], "source": [ "serving.get_kafka_topic(MODEL_NAME)" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "'Stopped'" ] } ], "source": [ "serving.get_status(MODEL_NAME)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can start/stop the serving instance either from the Hopsworks UI or from the python/REST API as demonstrated below" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Shut down currently running serving" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "if serving.get_status(MODEL_NAME) == \"Running\":\n", " serving.stop(MODEL_NAME)\n", "time.sleep(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start new serving" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting serving with name: IrisFlowerClassifier...\n", "Serving with name: IrisFlowerClassifier successfully started" ] } ], "source": [ "serving.start(MODEL_NAME)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Wait until serving is up and running" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": [ "while serving.get_status(MODEL_NAME) != \"Running\":\n", " time.sleep(5) # Let the serving startup correctly\n", "time.sleep(15)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Send Prediction Requests to the Served Model 
using Hopsworks REST API" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Constants" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [], "source": [ "TOPIC_NAME = serving.get_kafka_topic(MODEL_NAME)\n", "NUM_FEATURES = 4" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To make inference requests, you can use the utility method `serving.make_inference_request`." ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'predictions': [0]}\n", "{'predictions': [0]}\n", "{'predictions': [2]}\n", "{'predictions': [0]}\n", "{'predictions': [0]}\n", "{'predictions': [2]}\n", "{'predictions': [2]}\n", "{'predictions': [0]}\n", "{'predictions': [1]}\n", "{'predictions': [1]}\n", "{'predictions': [2]}\n", "{'predictions': [0]}\n", "{'predictions': [1]}\n", "{'predictions': [1]}\n", "{'predictions': [0]}\n", "{'predictions': [0]}\n", "{'predictions': [0]}\n", "{'predictions': [0]}\n", "{'predictions': [0]}\n", "{'predictions': [0]}" ] } ], "source": [ "for i in range(20):\n", "    # Synthetic request: one row of NUM_FEATURES random values in [1, 8]\n", "    data = {\"inputs\" : [[random.uniform(1, 8) for _ in range(NUM_FEATURES)]]}\n", "    response = serving.make_inference_request(MODEL_NAME, data)\n", "    print(response)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Monitor Prediction Requests and Responses using Kafka\n", "\n", "All prediction requests are automatically logged to Kafka, which means that you can keep track of your model's performance and its predictions in a scalable manner.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Setup Kafka Consumer and Subscribe to the Topic containing the Inference Logs" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [], "source": [ "config = kafka.get_kafka_default_config()\n", "config['default.topic.config'] = {'auto.offset.reset': 'earliest'}\n", "consumer = Consumer(config)\n", "topics = 
[TOPIC_NAME]\n", "consumer.subscribe(topics)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Read Kafka Avro Schema From Hopsworks and setup an Avro Reader" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [], "source": [ "json_schema = kafka.get_schema(TOPIC_NAME)\n", "avro_schema = kafka.convert_json_schema_to_avro(json_schema)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Read Lookup Table from the Feature Store for Converting Numerical Labels to Categorical" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [], "source": [ "iris_labels_lookup = fs.get_feature_group(\"iris_labels_lookup\", 1)\n", "iris_labels_lookup_df = iris_labels_lookup.read().toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Read 10 Messages from the Kafka Topic, parse them with the Avro Schema and print the results" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "serving: IrisFlowerClassifier, version: 1, timestamp: 1617320082599,\n", "request: {\"inputs\": [[2.6821998564049085, 3.1347181390893635, 4.1322824869673305, 7.664782802694536]]},\n", "prediction:0, prediction_label:Virginica, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "\n", "serving: IrisFlowerClassifier, version: 1, timestamp: 1617320082999,\n", "request: {\"inputs\": [[4.385168444555928, 4.6666622629029275, 4.221137886915623, 7.136881856956582]]},\n", "prediction:0, prediction_label:Virginica, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "\n", "serving: IrisFlowerClassifier, version: 1, timestamp: 1617320083197,\n", "request: {\"inputs\": [[1.4671917800589505, 3.292041049413281, 2.6843134988434856, 1.7075522843514763]]},\n", "prediction:2, prediction_label:Setosa, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "\n", "serving: 
IrisFlowerClassifier, version: 1, timestamp: 1617320083368,\n", "request: {\"inputs\": [[2.132378619200442, 2.7227978968761213, 3.36357273541271, 6.953955052737287]]},\n", "prediction:0, prediction_label:Virginica, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "\n", "serving: IrisFlowerClassifier, version: 1, timestamp: 1617320083482,\n", "request: {\"inputs\": [[6.422035759974221, 1.3941244796178403, 5.795176814006259, 7.644451300594337]]},\n", "prediction:0, prediction_label:Virginica, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "\n", "serving: IrisFlowerClassifier, version: 1, timestamp: 1617320083600,\n", "request: {\"inputs\": [[2.1564223865426646, 7.031899327566185, 2.1543742854961163, 5.763795334071594]]},\n", "prediction:2, prediction_label:Setosa, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "\n", "serving: IrisFlowerClassifier, version: 1, timestamp: 1617320083741,\n", "request: {\"inputs\": [[1.0107102970232713, 3.5829579964245886, 1.6719853403773581, 1.8988368274073077]]},\n", "prediction:2, prediction_label:Setosa, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "\n", "serving: IrisFlowerClassifier, version: 1, timestamp: 1617320084025,\n", "request: {\"inputs\": [[3.1602715794043754, 2.765328539315829, 4.768382860726245, 4.27978944541886]]},\n", "prediction:0, prediction_label:Virginica, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "\n", "serving: IrisFlowerClassifier, version: 1, timestamp: 1617320084148,\n", "request: {\"inputs\": [[3.7311398289041318, 1.4845084981610497, 3.2241708173955157, 1.9971560636775902]]},\n", "prediction:1, prediction_label:Versicolor, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT\n", "\n", "serving: IrisFlowerClassifier, version: 1, timestamp: 1617320084240,\n", "request: {\"inputs\": [[5.5664113317734625, 1.1243826526542673, 2.2816725254317394, 4.297925101811321]]},\n", 
"prediction:1, prediction_label:Versicolor, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT" ] } ], "source": [ "for i in range(0, 10):\n", "    msg = consumer.poll(timeout=1.0)\n", "    if msg is not None:\n", "        value = msg.value()\n", "        try:\n", "            event_dict = kafka.parse_avro_msg(value, avro_schema)\n", "            prediction = json.loads(event_dict[\"inferenceResponse\"])[\"predictions\"][0]\n", "            prediction_label = iris_labels_lookup_df.loc[iris_labels_lookup_df['label'] == prediction, \n", "                                                         'variety'].iloc[0]\n", "            print(\"serving: {}, version: {}, timestamp: {},\"\\\n", "                  \"\\nrequest: {},\\nprediction:{}, prediction_label:{}, http_response_code: {},\"\\\n", "                  \" model_server: {}, serving_tool: {}\\n\".format(\n", "                      event_dict[\"modelName\"],\n", "                      event_dict[\"modelVersion\"],\n", "                      event_dict[\"requestTimestamp\"],\n", "                      event_dict[\"inferenceRequest\"],\n", "                      event_dict[\"inferenceResponse\"] and prediction,\n", "                      prediction_label,\n", "                      event_dict[\"responseHttpCode\"],\n", "                      event_dict[\"modelServer\"],\n", "                      event_dict[\"servingTool\"]\n", "            ))\n", "        except Exception as e:\n", "            print(\"A message was read but there was an error parsing it\")\n", "            print(e)\n", "    else:\n", "        print(\"timeout.. no more messages to read from topic\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }