{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "title: \"Online Feature Serving\"\n", "date: 2021-02-24\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
<table><tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th></tr>
<tr><td>4</td><td>application_1614033055547_0006</td><td>pyspark</td><td>idle</td><td>Link</td><td>Link</td></tr></table>
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n", "Connected. Call `.close()` to terminate connection gracefully." ] } ], "source": [ "import hsfs\n", "# Create a connection\n", "connection = hsfs.connection()\n", "# Get the feature store handle for the project's feature store\n", "fs = connection.get_feature_store()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "An inference vector is only available for training datasets generated by online enabled feature groups each with at least 1 primary key.\n", "In the notebook [training_datasets.ipynb](../basics/training_datasets.ipynb), we have already created online enabled feature group `sales_fg` with version 3. \n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "sales_fg_meta = fs.get_feature_group(name=\"sales_fg\", version=3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`store_fg` and `exogenous_fg` are not yet online enabled. Lets create new, online enabled version of these feature groups " ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "store_fg = fs.get_feature_group(name=\"store_fg\", version=1).read()\n", "store_fg_meta = fs.create_feature_group(name=\"store_fg\",\n", " version=2,\n", " primary_key=['store'],\n", " online_enabled=True,\n", " description=\"Store related features\",\n", " time_travel_format=None,\n", " statistics_config={\"enabled\": True, \"histograms\": True, \"correlations\": True, \"exact_uniqueness\": True})\n", "store_fg_meta.save(store_fg)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "exogenous_fg = fs.get_feature_group(name=\"exogenous_fg\", version=1).read()\n", "exogenous_fg_meta = fs.create_feature_group(name=\"exogenous_fg\",\n", " version=2,\n", " primary_key=['store', 'date'],\n", " online_enabled=True,\n", " description=\"External features that influence sales, but are not under the control of the distribution chain\",\n", " time_travel_format=None)\n", "exogenous_fg_meta.save(exogenous_fg)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In addition to containing only online enabled feature groups each with at least 1 primary key, training datasets must me generated from hsfs query object to be able to build inference vector during model serving. 
" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "sales_fg_meta = fs.get_feature_group(name=\"sales_fg\", version=3)\n", "store_fg_meta = fs.get_feature_group(name=\"store_fg\", version=2)\n", "exogenous_fg_meta = fs.get_feature_group(name=\"exogenous_fg\", version=2)\n", "\n", "\n", "query = sales_fg_meta.select([\"weekly_sales\", \"sales_last_month_store\", \"sales_last_quarter_store\", \n", " \"sales_last_year_store_dep\", \"sales_last_month_store_dep\", \"sales_last_quarter_store_dep\", \n", " \"sales_last_six_month_store_dep\", \"sales_last_six_month_store\", \"sales_last_year_store\"])\\\n", " .join(store_fg_meta.select([\"num_depts\", \"size\"]))\\\n", " .join(exogenous_fg_meta.select(['fuel_price']))\n", "\n", "td_meta = fs.create_training_dataset(name=\"sales_model\",\n", " description=\"Dataset to train the sales model\",\n", " data_format=\"tfrecord\",\n", " splits={'train': 0.7, 'test': 0.2, 'validate': 0.1}, \n", " version=8)\n", "\n", "td_meta.save(query)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "hsfs TrainingDataset object provides utility method `get_serving_vector` to build serving vector from online feature store. This method method expects dict object where keys are feature group primary key names. \n", "#### To identify which primary key names are used for this training dataset query use `serving_keys` method" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'store', 'date', 'dept'}" ] } ], "source": [ "td_meta = fs.get_training_dataset(\"sales_model\", 8)\n", "#`init_prepared_statement` method is needed to get serving_keys in case `get_serving_vector` has not beed called yet. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "For demo purposes, let's prepare a list of primary key values for which we want to build feature vectors from the online feature store." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "incoming_data = [(31,\"2010-02-05\",47),\n", " (2,\"2010-02-12\",92),\n", " (20,\"2010-03-05\",11),\n", " (4,\"2010-04-02\",52),\n", " (12,\"2010-05-07\",27)\n", " ]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get the feature vectors for the primary keys in `incoming_data`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Iterate over `incoming_data` and use `td_meta.get_serving_vector` to retrieve the serving vector for each primary key combination" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0.0, 0.0, 0.0, 0.0, 0.0, 86.0, 0.0, 0.0, 0.0, 76, 203750, 2.572]\n", "[2136989.4600000004, 178982.89, 178982.89, 178982.89, 2136989.4600000004, 182142.89, 178982.89, 2136989.4600000004, 2136989.4600000004, 78, 202307, 2.548]\n", "[8570247.079999998, 119627.54000000001, 119627.54000000001, 119627.54000000001, 8570247.079999998, 21309.54, 119627.54000000001, 8570247.079999998, 8570247.079999998, 78, 203742, 2.777]\n", "[15824390.549999991, 31183.239999999998, 15091.66, 31183.239999999998, 15824390.549999991, 3801.43, 31183.239999999998, 7525350.19, 15824390.549999991, 78, 205863, 2.74]\n", "[12237331.150000002, 30995.239999999998, 7391.76, 30995.239999999998, 13337377.520000005, 1764.75, 27481.239999999998, 3947290.049999999, 13337377.520000005, 75, 112238, 3.127]" ] } ], "source": [ "for i in incoming_data:\n", " serving_vector = td_meta.get_serving_vector({'store': i[0], 'date': i[1], 'dept': i[2]})\n", " print(serving_vector)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }