{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "### HSFS training datasets\n", "\n", "Training datasets is the third building block of the Hopsworks Feature Store. Data scientists can query the feature store (see [feature_exploration](./feature_exploration.ipynb) notebook) and materialize their query in training datasets.\n", "\n", "Training datasets can be saved in a ML framework friendly format (eg. TfRecords, CSV, Numpy) and then be fed to a machine learning model for training.\n", "\n", "Training datasets can also be stored on external storage systems like Amazon S3 or GCS to be read by external model training platforms.\n", "\n", "As with the previous notebooks, the first step is to establish a connection with the Hopsworks feature store and get the feature store handle" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
IDYARN Application IDKindStateSpark UIDriver log
6application_1604957327609_0010pysparkidleLinkLink
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n", "Connected. Call `.close()` to terminate connection gracefully." ] } ], "source": [ "import hsfs\n", "# Create a connection\n", "connection = hsfs.connection()\n", "# Get the feature store handle for the project's feature store\n", "fs = connection.get_feature_store()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create a training dataset from a query\n", "\n", "In the previous notebook ([feature_exploration](./feature_exploration.ipynb)) we walked through how to explore and query the Hopsworks feature store using HSFS. We can use the queries produced in the previous notebook to create a training dataset." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "VersionWarning: No version provided for getting feature group `sales_fg`, defaulting to `1`.\n", "VersionWarning: No version provided for getting feature group `exogenous_fg`, defaulting to `1`." ] } ], "source": [ "sales_fg = fs.get_feature_group('sales_fg')\n", "exogenous_fg = fs.get_feature_group('exogenous_fg')\n", "\n", "query = sales_fg.select_all()\\\n", " .join(exogenous_fg.select(['fuel_price', 'unemployment', 'cpi']))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As for the feature groups, we first need to generate a metadata object representing the training dataset. After that, we can call the `save()` method to persist it in the Hopsworks feature store.\n", "Different file formats are available: `csv`, `tfrecord`, `npy`, `hdf5`, `avro`, `parquet`, `orc`." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "td = fs.create_training_dataset(name=\"sales_model\",\n", " description=\"Dataset to train the sales model\",\n", " data_format=\"csv\",\n", " version=1)\n", "\n", "td.save(query)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Pass write options\n", "\n", "When you save a training dataset, you have the possibility of specifying additional parameters to the Spark writer. For instance, in the example below, we are adding the headers to the CSV file." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "td = fs.create_training_dataset(name=\"sales_model\",\n", " description=\"Dataset to train the sales model\",\n", " data_format=\"csv\",\n", " version=2)\n", "\n", "td.save(query, {'hearder': 'true'})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Split the training dataset\n", "\n", "If you are training a model, you might want to split the training datasets into different slices (training, test and validation). HSFS allows you to specify the split sizes. You can also provide a seed for the random splitter, if you want to reproduce a training dataset." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "td = fs.create_training_dataset(name=\"sales_model\",\n", " description=\"Dataset to train the sales model\",\n", " data_format=\"csv\",\n", " splits={'train': 0.7, 'test': 0.2, 'validate': 0.1},\n", " version=3)\n", "\n", "td.save(query, {'hearder': 'true'})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Save the dataset on an external storage system\n", "\n", "If you are training your model on an external machine learning platform (e.g. SageMaker), you might want to save the training dataset on an external storage system (e.g. S3). You can take advantage of the Hopsworks storage connectors (see [documentation](https://hopsworks.readthedocs.io/en/latest/featurestore/guides/featurestore.html#configuring-storage-connectors-for-the-feature-store)).\n", "\n", "Assuming you have created an S3 storage connector name `td_bucket_connector`, you can create an external training dataset as follows:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "td_bucket_connector = fs.get_storage_connector(\"td_bucket_connector\", \"S3\")\n", "\n", "td = fs.create_training_dataset(name=\"sales_model\",\n", " description=\"Dataset to train the sales model\",\n", " data_format=\"csv\",\n", " storage_connector=td_bucket_connector,\n", " version=4)\n", "\n", "### This code is expected to fail if you connector is not configured properly\n", "td.save(query)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Replay the query that generated the training dataset\n", "\n", "If you created a training dataset from a query object, then you can ask the feature store to return the set of features (in order) and the set of joins that generated. \n", "This feature is useful if you are serving a model in production and you want to augment the inference vector with features taken from the online feature store" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "SELECT `fg0`.`sales_last_quarter_store_dep`, `fg0`.`store`, `fg0`.`sales_last_month_store_dep`, `fg0`.`sales_last_year_store`, `fg0`.`sales_last_quarter_store`, `fg0`.`sales_last_six_month_store_dep`, `fg0`.`sales_last_year_store_dep`, `fg0`.`sales_last_month_store`, `fg0`.`dept`, `fg0`.`sales_last_six_month_store`, `fg0`.`weekly_sales`, `fg0`.`is_holiday`, `fg0`.`date`, `fg1`.`fuel_price`, `fg1`.`unemployment`, `fg1`.`cpi`\n", "FROM `demo_fs_meb10000`.`sales_fg_1` `fg0`\n", "INNER JOIN `demo_fs_meb10000`.`exogenous_fg_1` `fg1` ON `fg0`.`date` = `fg1`.`date` AND `fg0`.`store` = `fg1`.`store`\n", "VersionWarning: No version provided for getting training dataset `sales_model`, defaulting to `1`." ] } ], "source": [ "td = fs.get_training_dataset(name=\"sales_model\")\n", "print(td.query)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create a training dataset from a DataFrame\n", "\n", "If you need to apply additional transformations before creating a training dataset, you can create one from a Spark DataFrame instead of using a query.\n", "The `create_training_dataset` part stays the same, the difference is that we are going to pass a DataFrame to the `save()` method.\n", "\n", "As you have applied additional transformations between the query object and the training dataset, we won't be able to re-play the query for this specific training dataset." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import functions as F\n", "df = query.read()\n", "# Apply additional transformations\n", "df = (df.withColumn(\"is_holiday\", F.when(F.col(\"is_holiday\") == \"true\", 1 ).otherwise(0))\n", " .withColumn(\"unemployment\", F.col(\"unemployment\").cast(\"double\")) \n", " .withColumn(\"cpi\", F.col(\"cpi\").cast(\"double\"))\n", " .drop(\"date\"))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### save as `csv` format" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "td = fs.create_training_dataset(name=\"sales_model\",\n", " description=\"Dataset to train the sales model\",\n", " data_format=\"csv\",\n", " splits={'train': 0.7, 'test': 0.2, 'validate': 0.1}, \n", " version=5)\n", "\n", "td.save(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### save as `tfrecord` format" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "td = fs.create_training_dataset(name=\"sales_model\",\n", " description=\"Dataset to train the sales model\",\n", " data_format=\"tfrecord\",\n", " splits={'train': 0.7, 'test': 0.2, 'validate': 0.1}, \n", " version=6)\n", "\n", "td.save(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Add a tag to a training dataset\n", "\n", "As for feature groups, you can add tags to a training dataset. Tags are indexed and you can search for them in the Hopsworks feature store UI. Tags are an useful tool to catalog the feature store. The `value` field can be omitted. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Tagging Training Datasets\n", "The feature store enables users to attach tags to training dataset in order to make them discoverable across feature\n", "stores. A tag is a simple {key: value} association, providing additional information about the data, such as for\n", "example geographic origin. This is useful in an organization as it makes easier to discover for data scientists, reduces\n", "duplicated work in terms of for example data preparation. The tagging feature is only available in the enterprise version." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Define tags that can be attached\n", "The first step is to define a set of tags that can be attached. Such as for example “Country” to tag data as being from\n", "a certain geographic location and “Sport” to further associate a type of Sport with the data.\n", "\n", "![Define tags that can be attached](images/creating_tags.gif)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Attach tags using the UI\n", "Tags can then be attached using the feature store UI or programmatically using the API.\n", "Attaching tags to feature group.\n", "\n", "![Attach tags using the UI](images/attach_tags.gif)" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "td = fs.get_training_dataset(\"sales_model\", 5)\n", "td.add_tag(\"model\", value=\"sales\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "From the HSFS API you can also list all the tags associated with a specific training dataset" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[{'model': 'sales'}]" ] } ], "source": [ "td = fs.get_training_dataset(\"sales_model\", 5)\n", "td.get_tag()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read a training dataset\n", "\n", "As for feature groups, you can call the methods `show()` method to get a preview of the training dataset and `read()` to get a Spark DataFrame of it." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------------------------+-----+--------------------------+---------------------+------------------------+------------------------------+-------------------------+----------------------+----+--------------------------+------------+----------+-------------------+----------+------------+-----------+\n", "|sales_last_quarter_store_dep|store|sales_last_month_store_dep|sales_last_year_store|sales_last_quarter_store|sales_last_six_month_store_dep|sales_last_year_store_dep|sales_last_month_store|dept|sales_last_six_month_store|weekly_sales|is_holiday| date|fuel_price|unemployment| cpi|\n", "+----------------------------+-----+--------------------------+---------------------+------------------------+------------------------------+-------------------------+----------------------+----+--------------------------+------------+----------+-------------------+----------+------------+-----------+\n", "| 0.0| 20| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 55| 0.0| 32362.95| false|2010-02-05 00:00:00| 2.784| 8.187|204.2471935|\n", "| 0.0| 20| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 94| 0.0| 63787.83| false|2010-02-05 00:00:00| 2.784| 8.187|204.2471935|\n", "| 0.0| 20| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 22| 0.0| 17597.83| false|2010-02-05 00:00:00| 2.784| 8.187|204.2471935|\n", "| 0.0| 20| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 30| 0.0| 9488.37| false|2010-02-05 00:00:00| 2.784| 8.187|204.2471935|\n", "| 0.0| 20| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 2| 0.0| 85812.69| false|2010-02-05 00:00:00| 2.784| 8.187|204.2471935|\n", "+----------------------------+-----+--------------------------+---------------------+------------------------+------------------------------+-------------------------+----------------------+----+--------------------------+------------+----------+-------------------+----------+------------+-----------+\n", "only showing top 5 rows" ] } ], "source": [ "td = fs.get_training_dataset(\"sales_model\", 2)\n", "td.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you have splitted your training dataset, you can also read a single split" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "295125" ] } ], "source": [ "td = fs.get_training_dataset(\"sales_model\", 6)\n", "td.read(\"train\").count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Input the training dataset to a model training loop\n", "If you are training a model, HSFS provides `tf_data` method that returns `TFDataEngine` object with utility methods to read training dataset as `tf.data.Dataset` object to read the training dataset and feed it to a model training loop efficiently. \n", "* Currently `TFDataEngine` provides 2 utility methods `tf_record_dataset` and `tf_csv_dataset` for reading `.tfrecord` and `.csv` files, respectivelly.\n", "* Both methods support only following feature types `string`, `short`, `int`, `long`, `float` and `double`.\n", "* In both methods you can set `process` argument to `True` and they will return `PrefetchDataset` ready to input to model training loop.\n", "* If you would like to apply your own logic to feature transformation using `tf.data.Dataset` then set `process` argument to `False`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### proccess using `tf_record_dataset`:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "train_input = td.tf_data(target_name='weekly_sales', split='train', is_training=True)\n", "train_input_processed = train_input.tf_record_dataset(process=True, batch_size =32, num_epochs=1)" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "train_input_processed" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Apply custom logic to `tf_record_dataset`:" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "td = fs.get_training_dataset(\"sales_model\", 6)\n", "\n", "train_input = td.tf_data(target_name=None, split='train', is_training=True)\n", "train_input_not_processed = train_input.tf_record_dataset()" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "train_input_not_processed" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "import tensorflow as tf\n", "\n", "batch_size = 32\n", "num_epochs = 1 \n", "\n", "def custom_impl(example):\n", " feature_names = [td_feature.name for td_feature in td.schema] \n", " label_name = feature_names.pop(feature_names.index('weekly_sales'))\n", " x = [tf.cast(example[feature_name], tf.float32) for feature_name in feature_names]\n", " y = example[label_name]\n", " return x,y\n", "\n", "train_input_custum_processed = train_input_not_processed.map(lambda value: custom_impl(value))\\\n", " .shuffle(num_epochs * batch_size)\\\n", " .repeat(num_epochs * batch_size)\\\n", " .cache()\\\n", " .batch(batch_size, drop_remainder=True)\\\n", " .prefetch(tf.data.experimental.AUTOTUNE)\n" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "train_input_custum_processed" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }