{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Feature Store Quick Start\n", "\n", "This notebook gives you a quick overview of how you can integrate the feature store service on Hopsworks into your machine learning pipeline. We'll go over five steps:\n", "\n", "1. Generate some sample data (rather than reading data from disk, just to keep this notebook stand-alone)\n", "2. Do some feature engineering on the data\n", "3. **Save the engineered features to the feature store**\n", "4. **Select a group of features from the feature store and create a managed training dataset of TFRecords in the feature store**\n", "5. Train a model on the training dataset" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Imports\n", "\n", "We'll use numpy and pandas for data generation, pyspark for feature engineering, tensorflow and keras for model training, and the hops `featurestore` library for interacting with the feature store." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
13application_1549128638243_0017pysparkidleLinkLink
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] } ], "source": [ "import numpy as np\n", "import random\n", "import pandas as pd\n", "from pyspark.sql import SQLContext\n", "sqlContext = SQLContext(sc)\n", "from pyspark.sql import Row\n", "from hops import featurestore\n", "import tensorflow as tf\n", "from tensorflow import keras\n", "from tensorflow.keras import layers\n", "from hops import experiment\n", "from tensorflow.python.keras.callbacks import TensorBoard\n", "from hops import tensorboard" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Generate Sample Data\n", "\n", "Let's generate two sample datasets:\n", "\n", "1. `houses_for_sale_data`:\n", "\n", "```bash\n", "+-------+--------+------------------+------------------+------------------+\n", "|area_id|house_id| house_worth| house_age| house_size|\n", "+-------+--------+------------------+------------------+------------------+\n", "| 1| 0| 11678.15482418699|133.88670106643886|366.80067322738535|\n", "| 1| 1| 2290.436167500643|15994.969706808222|195.84014889823976|\n", "| 1| 2| 8380.774578431328|1994.8576926471007|1544.5164614303735|\n", "| 1| 3|11641.224696102923|23104.501275562343|1673.7222604337876|\n", "| 1| 4| 5382.089422436954| 13903.43637058141| 274.2912104765028|\n", "+-------+--------+------------------+------------------+------------------+\n", "\n", " |-- area_id: long (nullable = true)\n", " |-- house_id: long (nullable = true)\n", " |-- house_worth: double (nullable = true)\n", " |-- house_age: double (nullable = true)\n", " |-- house_size: double (nullable = true)\n", "```\n", "\n", "2. `houses_sold_data`:\n", "\n", "```bash\n", "+-------+-----------------+-----------------+------------------+\n", "|area_id|house_purchase_id|number_of_bidders| sold_for_amount|\n", "+-------+-----------------+-----------------+------------------+\n", "| 1| 0| 0| 70073.06059070028|\n", "| 1| 1| 15| 146.9198329740602|\n", "| 1| 2| 6| 594.802165433149|\n", "| 1| 3| 10| 77187.84123130841|\n", "| 1| 4| 1|110627.48922722359|\n", "+-------+-----------------+-----------------+------------------+\n", "\n", " |-- area_id: long (nullable = true)\n", " |-- house_purchase_id: long (nullable = true)\n", " |-- number_of_bidders: long (nullable = true)\n", " |-- sold_for_amount: double (nullable = true)\n", "```\n", "\n", "We'll use this data to predict what a house is sold for, based on features of the **area** where the house is located."
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Generation of `houses_for_sale_data`" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "area_ids = list(range(1,51))\n", "house_sizes = []\n", "house_worths = []\n", "house_ages = []\n", "house_area_ids = []\n", "for i in area_ids:\n", " for j in list(range(1,100)):\n", " house_sizes.append(abs(np.random.normal()*1000)/i)\n", " house_worths.append(abs(np.random.normal()*10000)/i)\n", " house_ages.append(abs(np.random.normal()*10000)/i)\n", " house_area_ids.append(i)\n", "house_ids = list(range(len(house_area_ids)))\n", "houses_for_sale_data = pd.DataFrame({\n", " 'area_id':house_area_ids,\n", " 'house_id':house_ids,\n", " 'house_worth': house_worths,\n", " 'house_age': house_ages,\n", " 'house_size': house_sizes\n", " })\n", "houses_for_sale_data_spark_df = sqlContext.createDataFrame(houses_for_sale_data)" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+------------------+--------+------------------+------------------+\n", "|area_id| house_age|house_id| house_size| house_worth|\n", "+-------+------------------+--------+------------------+------------------+\n", "| 1|2197.6538783413644| 0|1540.6164197194921| 936.4639471590823|\n", "| 1| 4575.640957556092| 1| 291.299158164445| 3827.151581426811|\n", "| 1|3056.2417991681514| 2|1051.5145743403905|22857.846837349654|\n", "| 1| 12768.38793285203| 3| 944.4714646085961| 3590.890018337794|\n", "| 1| 4722.828106705723| 4| 259.1621123268429|10525.711844055819|\n", "+-------+------------------+--------+------------------+------------------+\n", "only showing top 5 rows" ] } ], "source": [ "houses_for_sale_data_spark_df.show(5)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- area_id: long (nullable = true)\n", " |-- house_age: double (nullable = true)\n", " |-- house_id: long (nullable = true)\n", " |-- house_size: double (nullable = true)\n", " |-- house_worth: double (nullable = true)" ] } ], "source": [ "houses_for_sale_data_spark_df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Generation of `houses_sold_data`" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "house_purchased_amounts = []\n", "house_purchases_bidders = []\n", "house_purchases_area_ids = []\n", "for i in area_ids:\n", " for j in list(range(1,1000)):\n", " house_purchased_amounts.append(abs(np.random.exponential()*100000)/i)\n", " house_purchases_bidders.append(int(abs(np.random.exponential()*10)/i))\n", " house_purchases_area_ids.append(i)\n", "house_purchase_ids = list(range(len(house_purchases_bidders)))\n", "houses_sold_data = pd.DataFrame({\n", " 'area_id':house_purchases_area_ids,\n", " 'house_purchase_id':house_purchase_ids,\n", " 'number_of_bidders': house_purchases_bidders,\n", " 'sold_for_amount': house_purchased_amounts\n", " })\n", "houses_sold_data_spark_df = sqlContext.createDataFrame(houses_sold_data)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+-----------------+-----------------+------------------+\n", "|area_id|house_purchase_id|number_of_bidders| sold_for_amount|\n", "+-------+-----------------+-----------------+------------------+\n", "| 1| 0| 8|103136.86683870589|\n", "| 1| 1| 4| 
106496.0570260969|\n", "| 1| 2| 5|27771.654964553803|\n", "| 1| 3| 3| 195575.7653004581|\n", "| 1| 4| 20| 62427.45592116985|\n", "+-------+-----------------+-----------------+------------------+\n", "only showing top 5 rows" ] } ], "source": [ "houses_sold_data_spark_df.show(5)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- area_id: long (nullable = true)\n", " |-- house_purchase_id: long (nullable = true)\n", " |-- number_of_bidders: long (nullable = true)\n", " |-- sold_for_amount: double (nullable = true)" ] } ], "source": [ "houses_sold_data_spark_df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Feature Engineering\n", "\n", "Let's generate some aggregate features, such as sums and averages, from our datasets.\n", "\n", "1. `houses_for_sale_features`:\n", "\n", "```bash\n", " |-- area_id: long (nullable = true)\n", " |-- avg_house_age: double (nullable = true)\n", " |-- avg_house_size: double (nullable = true)\n", " |-- avg_house_worth: double (nullable = true)\n", " |-- sum_house_age: double (nullable = true)\n", " |-- sum_house_size: double (nullable = true)\n", " |-- sum_house_worth: double (nullable = true)\n", "```\n", "\n", "2. `houses_sold_features`:\n", "\n", "```bash\n", " |-- area_id: long (nullable = true)\n", " |-- avg_num_bidders: double (nullable = true)\n", " |-- avg_sold_for: double (nullable = true)\n", " |-- sum_number_of_bidders: long (nullable = true)\n", " |-- sum_sold_for_amount: double (nullable = true)\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Generate Features From `houses_for_sale_data`" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "sum_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy(\"area_id\").sum()\n", "count_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy(\"area_id\").count()\n", "sum_count_houses_for_sale_df = sum_houses_for_sale_df.join(count_houses_for_sale_df, \"area_id\")\n", "sum_count_houses_for_sale_df = sum_count_houses_for_sale_df \\\n", " .withColumnRenamed(\"sum(house_age)\", \"sum_house_age\") \\\n", " .withColumnRenamed(\"sum(house_worth)\", \"sum_house_worth\") \\\n", " .withColumnRenamed(\"sum(house_size)\", \"sum_house_size\") \\\n", " .withColumnRenamed(\"count\", \"num_rows\")\n", "def compute_average_features_house_for_sale(row):\n", " avg_house_worth = row.sum_house_worth/float(row.num_rows)\n", " avg_house_size = row.sum_house_size/float(row.num_rows)\n", " avg_house_age = row.sum_house_age/float(row.num_rows)\n", " return Row(\n", " sum_house_worth=row.sum_house_worth, \n", " sum_house_age=row.sum_house_age,\n", " sum_house_size=row.sum_house_size,\n", " area_id = row.area_id,\n", " avg_house_worth = avg_house_worth,\n", " avg_house_size = avg_house_size,\n", " avg_house_age = avg_house_age\n", " )\n", "houses_for_sale_features_df = sum_count_houses_for_sale_df.rdd.map(\n", " lambda row: compute_average_features_house_for_sale(row)\n", ").toDF()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- area_id: long (nullable = true)\n", " |-- avg_house_age: double (nullable = true)\n", " |-- avg_house_size: double (nullable = true)\n", " |-- avg_house_worth: double (nullable = true)\n", " |-- sum_house_age: double (nullable = true)\n", " |-- sum_house_size: double (nullable = true)\n", " |-- sum_house_worth: 
double (nullable = true)" ] } ], "source": [ "houses_for_sale_features_df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Generate Features from `houses_sold_data`" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "sum_houses_sold_df = houses_sold_data_spark_df.groupBy(\"area_id\").sum()\n", "count_houses_sold_df = houses_sold_data_spark_df.groupBy(\"area_id\").count()\n", "sum_count_houses_sold_df = sum_houses_sold_df.join(count_houses_sold_df, \"area_id\")\n", "sum_count_houses_sold_df = sum_count_houses_sold_df \\\n", " .withColumnRenamed(\"sum(number_of_bidders)\", \"sum_number_of_bidders\") \\\n", " .withColumnRenamed(\"sum(sold_for_amount)\", \"sum_sold_for_amount\") \\\n", " .withColumnRenamed(\"count\", \"num_rows\")\n", "def compute_average_features_houses_sold(row):\n", " avg_num_bidders = row.sum_number_of_bidders/float(row.num_rows)\n", " avg_sold_for = row.sum_sold_for_amount/float(row.num_rows)\n", " return Row(\n", " sum_number_of_bidders=row.sum_number_of_bidders, \n", " sum_sold_for_amount=row.sum_sold_for_amount,\n", " area_id = row.area_id,\n", " avg_num_bidders = avg_num_bidders,\n", " avg_sold_for = avg_sold_for\n", " )\n", "houses_sold_features_df = sum_count_houses_sold_df.rdd.map(\n", " lambda row: compute_average_features_houses_sold(row)\n", ").toDF()" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- area_id: long (nullable = true)\n", " |-- avg_num_bidders: double (nullable = true)\n", " |-- avg_sold_for: double (nullable = true)\n", " |-- sum_number_of_bidders: long (nullable = true)\n", " |-- sum_sold_for_amount: double (nullable = true)" ] } ], "source": [ "houses_sold_features_df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Save Features to the Feature Store\n", "\n", "The feature store has an abstraction of a **feature group**: a set of features that naturally belong together and that are typically computed by the same feature engineering job from the same raw dataset.\n", "\n", "Let's create two feature groups:\n", "\n", "1. `houses_for_sale_featuregroup`\n", "\n", "2. 
`houses_sold_featuregroup`" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Running sql: use demo_featurestore_admin000_featurestore" ] } ], "source": [ "featurestore.create_featuregroup(\n", " houses_for_sale_features_df,\n", " \"houses_for_sale_featuregroup\",\n", " description=\"aggregate features of houses for sale per area\",\n", " descriptive_statistics=False,\n", " feature_correlation=False,\n", " feature_histograms=False,\n", " cluster_analysis=False\n", ")" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Running sql: use demo_featurestore_admin000_featurestore" ] } ], "source": [ "featurestore.create_featuregroup(\n", " houses_sold_features_df,\n", " \"houses_sold_featuregroup\",\n", " description=\"aggregate features of sold houses per area\",\n", " descriptive_statistics=False,\n", " feature_correlation=False,\n", " feature_histograms=False,\n", " cluster_analysis=False\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a Training Dataset\n", "\n", "The feature store has an abstraction of a **training dataset**, which is a dataset with a set of features (potentially from many different feature groups) and labels (in case of supervised learning). \n", "\n", "Let's create a training dataset called *predict_house_sold_for_dataset* using the following features:\n", "\n", "- avg_house_age\n", "- avg_house_size\n", "- avg_house_worth\n", "- avg_num_bidders\n", "\n", "and the target variable is:\n", "\n", "- avg_sold_for" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Running sql: use demo_featurestore_admin000_featurestore\n", "Running sql: SELECT avg_house_age, avg_house_size, avg_house_worth, avg_num_bidders, avg_sold_for FROM houses_for_sale_featuregroup_1 JOIN houses_sold_featuregroup_1 ON houses_for_sale_featuregroup_1.`area_id`=houses_sold_featuregroup_1.`area_id`" ] } ], "source": [ "features_df = featurestore.get_features([\"avg_house_age\", \"avg_house_size\", \n", " \"avg_house_worth\", \"avg_num_bidders\", \n", " \"avg_sold_for\"])\n", "featurestore.create_training_dataset(\n", " features_df, \"predict_house_sold_for_dataset\",\n", " data_format=\"tfrecords\",\n", " descriptive_statistics=False,\n", " feature_correlation=False,\n", " feature_histograms=False,\n", " cluster_analysis=False\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use the Training Dataset to Train a Model\n", "\n", "When creating training datasets through the feature store API, the training dataset becomes *managed* by Hopsworks, meaning that it will get automatic versioning, documentation, API support, and analysis. \n", "\n", "Let's create a simple neural network and train it for the regression task of predicting the target variable `avg_sold_for`." 
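, "\n", "\n", "As a quick sanity check, a managed training dataset can also be read back from the feature store as a Spark DataFrame. A minimal sketch (the `featurestore.get_training_dataset` call below is an assumption based on the hops `featurestore` API and is not executed in this notebook):\n", "\n", "```python\n", "# Assumed hops API: read the managed training dataset back as a Spark DataFrame\n", "td_df = featurestore.get_training_dataset(\"predict_house_sold_for_dataset\")\n", "td_df.show(5)\n", "```\n", "\n", "In the cells below we instead read the training dataset's TFRecord files directly with `tf.data`, since tfrecords is the format we requested when creating it."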
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Constants" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "NUM_EPOCHS = 30\n", "BATCH_SIZE = 10\n", "LEARNING_RATE = 0.001\n", "SHUFFLE_BUFFER_SIZE = 10000\n", "INPUT_SHAPE=4" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create TensorFlow Dataset" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "def create_tf_dataset():\n", " dataset_dir = featurestore.get_training_dataset_path(\"predict_house_sold_for_dataset\")\n", " input_files = tf.gfile.Glob(dataset_dir + \"/part-r-*\")\n", " dataset = tf.data.TFRecordDataset(input_files)\n", " tf_record_schema = featurestore.get_training_dataset_tf_record_schema(\"predict_house_sold_for_dataset\")\n", " feature_names = [\"avg_house_age\", \"avg_house_size\", \"avg_house_worth\", \"avg_num_bidders\"]\n", " label_name = \"avg_sold_for\"\n", " def decode(example_proto):\n", " example = tf.parse_single_example(example_proto, tf_record_schema)\n", " x = []\n", " for feature_name in feature_names:\n", " x.append(example[feature_name])\n", " y = [tf.cast(example[label_name], tf.float32)]\n", " return x,y\n", " dataset = dataset.map(decode).shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE).repeat(NUM_EPOCHS)\n", " return dataset" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define Model" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "def create_model():\n", " model = tf.keras.Sequential([\n", " layers.Dense(64, activation='relu', input_shape = (INPUT_SHAPE,), batch_size=BATCH_SIZE),\n", " layers.Dense(64, activation='relu'),\n", " layers.Dense(1)])\n", " return model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define the Train Function\n", "\n", "Refactoring our code into functions makes it easier to manage and also means that we can utilize the **Experiments** service in Hopsworks."
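, "\n", "\n", "The `train_fn` defined below takes no arguments and returns a single number (the negated final training loss, so that a larger value means a better model), which the Experiments service can record as the metric of the run. The same pattern extends to hyperparameter tuning; a hedged sketch (the `experiment.grid_search` call and its exact arguments are assumptions about the hops API and are not executed in this notebook):\n", "\n", "```python\n", "# Hypothetical variant of train_fn that takes its hyperparameters as arguments\n", "def train_fn_with_params(learning_rate, batch_size):\n", "    # ... build the dataset and model as below, using the supplied values ...\n", "    return -1.0  # the metric the experiment should maximize\n", "\n", "# Assumed hops API: run every combination in the grid, maximizing the returned metric\n", "experiment.grid_search(train_fn_with_params,\n", "                       {\"learning_rate\": [0.001, 0.0001], \"batch_size\": [10, 20]},\n", "                       direction=\"max\")\n", "```"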
] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "def train_fn():\n", " dataset = create_tf_dataset()\n", " model = create_model()\n", " model.compile(optimizer=tf.train.AdamOptimizer(LEARNING_RATE), loss='mse', metrics=['accuracy'])\n", " tb_callback = TensorBoard(log_dir=tensorboard.logdir(), histogram_freq=0,\n", " write_graph=True, write_images=True)\n", " callbacks = [tb_callback]\n", " callbacks.append(keras.callbacks.ModelCheckpoint(tensorboard.logdir() + '/checkpoint-{epoch}.h5',\n", " monitor='acc', verbose=0, save_best_only=True))\n", " history = model.fit(dataset, epochs=NUM_EPOCHS, steps_per_epoch = 5, callbacks=callbacks)\n", " return -float(history.history[\"loss\"][-1])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start The Training Process Using a Reproducible Experiment" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Finished Experiment \n", "\n", "u'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/Experiments/application_1549128638243_0017/launcher/run.2'" ] } ], "source": [ "experiment.launch(train_fn, \n", " name=\"feature store quick start example\", \n", " local_logdir=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 2 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "ipython3" } }, "nbformat": 4, "nbformat_minor": 2 }