{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "title: \"Feature Engineering/Ingestion\"\n", "date: 2021-02-24\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Feature Store Tour - Python API\n", "\n", "This set of notebooks contain a tour/reference for the Hopsworks feature store Scala/Java API. The notebook is meant to be run from feature store demo projects on Hopsworks. We will go over best practices for using the API as well as common pitfalls.\n", "\n", "There are 3 notebooks:\n", "- Feature groups: Discover how to work with features and feature groups, both offline and online\n", "- [Feature Exploration](./feature_exploration.ipynb): Discover how to join features from different feature groups\n", "- [Training datasets](./training_datasets.ipynb): Discover how to save training datasets to be used by ML models\n", "\n", "The data required to run this tour is located in a zip file called `archive.zip` in the same directory as the notebooks. Head to the Dataset browser on Hopsworks and unzip it.\n", "\n", "## Features and Feature Groups\n", "\n", "The Hopsworks feature store is a centralized repository, within an organization, to manage machine learning features. A feature is a measurable property of a phenomenon. It could be a simple value such as the age of a customer, or it could be an aggregated value, such as the number of transactions made by a customer in the last 30 days.\n", "\n", "A feature is not restricted to an numeric value, it could be a string representing an address, or an image.\n", "\n", "![Feature Store Overview](../images/overview.svg \"Feature Store Overview\")\n", "\n", "A feature store is not a pure storage service, it goes hand-in-hand with feature computation. Feature engineering is the process of transforming raw data into a format that is compatible and understandable for predictive models.\n", "\n", "In this notebook we are going to focus on the left side of the picture above. In particular how data engeneers can create features and push them to the Hopsworks feature store so that they are available to the data scientists\n", "\n", "### HSFS library\n", "\n", "The Hopsworks feature feature store library is called `hsfs` (**H**opswork**s** **F**eature **S**tore). \n", "The library is Apache V2 licensed and available [here](https://github.com/logicalclocks/feature-store-api). The library is currently available for Python and JVM languages such as Scala and Java.\n", "In this notebook we are going to cover Python part.\n", "\n", "You can find the complete documentation of the library here: \n", "\n", "The first step is to establish a connection with your Hopsworks feature store instance and retrieve the object that represents the feature store you'll be working with. \n", "\n", "By default `connection.get_feature_store()` returns the feature store of the project you are working with. However, it accepts also a project name as parameter to select a different feature store. " ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
<table><tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th></tr>
<tr><td>0</td><td>application_1612782748969_0002</td><td>pyspark</td><td>idle</td><td>Link</td><td>Link</td></tr></table>
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n", "Connected. Call `.close()` to terminate connection gracefully." ] } ], "source": [ "import hsfs\n", "# Create a connection\n", "connection = hsfs.connection()\n", "# Get the feature store handle for the project's feature store\n", "fs = connection.get_feature_store()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data Engineering\n", "\n", "We are going to use a dataset containing information related to a chain of deparment stores. The dataset is taken from [Kaggle](https://www.kaggle.com/manjeetsingh/retaildataset?select=Features+data+set.csv).\n", "\n", "We are going to create 3 feature groups:\n", "- `stores_fg`: it's going to contain features related to the store itself. Mainly the category, the number of deparmetns and the size.\n", "- `sales_fg`: it's going to contain sales features for each store/deparment over the weeks. \n", "- `exogenous_fg`: it's going to contain features which are not related to the stores themselves, but they have an effect on sales. These features are, for instance, the gas price, the unemployment rate, temperature in the area and so on." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from hops import hdfs\n", "from pyspark.sql import functions as F\n", "\n", "stores_csv = spark.read\\\n", " .option(\"inferSchema\", \"true\")\\\n", " .option(\"header\", \"true\")\\\n", " .format(\"csv\")\\\n", " .load(\"hdfs:///Projects/{}/Jupyter/hsfs/archive/stores data-set.csv\".format(hdfs.project_name()))\n", "\n", "exogenous_csv = spark.read\\\n", " .option(\"inferSchema\", \"true\")\\\n", " .option(\"header\", \"true\")\\\n", " .format(\"csv\")\\\n", " .load(\"hdfs:///Projects/{}/Jupyter/hsfs/archive/Features data set.csv\".format(hdfs.project_name()))\n", "\n", "sales_csv = spark.read\\\n", " .option(\"inferSchema\", \"true\")\\\n", " .option(\"header\", \"true\")\\\n", " .format(\"csv\")\\\n", " .load(\"hdfs:///Projects/{}/Jupyter/hsfs/archive/sales data-set.csv\".format(hdfs.project_name()))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Feature Engineering Stores" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "stores_depts_count = stores_csv\\\n", " .join(sales_csv, \"store\")\\\n", " .groupBy(\"store\")\\\n", " .agg(F.countDistinct(\"dept\"))\\\n", " .withColumnRenamed(\"count(dept)\", \"num_depts\")\n", "\n", "stores_fg = stores_csv\\\n", " .join(stores_depts_count, \"store\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create `store_fg` feature group\n", "\n", "Create a feature group named `store_fg`. The store is the primary key uniquely identifying all the remaining features in this feature group." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "store_fg_meta = fs.create_feature_group(name=\"store_fg\",\n", " version=1,\n", " primary_key=['store'],\n", " description=\"Store related features\",\n", " time_travel_format=None,\n", " statistics_config={\"enabled\": True, \"histograms\": True, \"correlations\": True})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Up to this point we have just created the metadata object representing the feature group. However, we haven't saved the feature group in the feature store yet. 
To do so, we can call the method `save` on the metadata object created in the cell above." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "store_fg_meta.save(stores_fg)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Feature Engineering Sales" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import Window\n", "# Convert a number of days into seconds, for use in range-based windows\n", "days = lambda i: i * 86400\n", "\n", "sales_df = sales_csv.withColumn('date', F.to_date(\"date\", 'dd/MM/yyyy'))\\\n", " .withColumn('timestamp', F.unix_timestamp(\"date\"))\n", "\n", "# Define aggregation windows to compute sales performance over past periods of time\n", "last_month_window_store_dep = Window.partitionBy(['store', 'dept']).orderBy(F.col(\"timestamp\").cast(\"long\")).rangeBetween(days(-30), days(-1))\n", "last_quarter_window_store_dep = Window.partitionBy(['store', 'dept']).orderBy(F.col(\"timestamp\").cast(\"long\")).rangeBetween(days(-90), days(-1))\n", "last_six_month_window_store_dep = Window.partitionBy(['store', 'dept']).orderBy(F.col(\"timestamp\").cast(\"long\")).rangeBetween(days(-180), days(-1))\n", "last_year_window_store_dep = Window.partitionBy(['store', 'dept']).orderBy(F.col(\"timestamp\").cast(\"long\")).rangeBetween(days(-365), days(-1))\n", "\n", "last_month_window_store = Window.partitionBy('store').orderBy(F.col(\"timestamp\").cast(\"long\")).rangeBetween(days(-30), days(-1))\n", "last_quarter_window_store = Window.partitionBy('store').orderBy(F.col(\"timestamp\").cast(\"long\")).rangeBetween(days(-90), days(-1))\n", "last_six_month_window_store = Window.partitionBy('store').orderBy(F.col(\"timestamp\").cast(\"long\")).rangeBetween(days(-180), days(-1))\n", "last_year_window_store = Window.partitionBy('store').orderBy(F.col(\"timestamp\").cast(\"long\")).rangeBetween(days(-365), days(-1))\n", "\n", "# Build feature group dataframe\n", "sales_fg = sales_df.withColumn(\"sales_last_month_store_dep\", F.sum(\"weekly_sales\").over(last_month_window_store_dep))\\\n", " .withColumn(\"sales_last_quarter_store_dep\", F.sum(\"weekly_sales\").over(last_quarter_window_store_dep))\\\n", " .withColumn(\"sales_last_six_month_store_dep\", F.sum(\"weekly_sales\").over(last_six_month_window_store_dep))\\\n", " .withColumn(\"sales_last_year_store_dep\", F.sum(\"weekly_sales\").over(last_year_window_store_dep))\\\n", " .withColumn(\"sales_last_month_store\", F.sum(\"weekly_sales\").over(last_month_window_store))\\\n", " .withColumn(\"sales_last_quarter_store\", F.sum(\"weekly_sales\").over(last_quarter_window_store))\\\n", " .withColumn(\"sales_last_six_month_store\", F.sum(\"weekly_sales\").over(last_six_month_window_store))\\\n", " .withColumn(\"sales_last_year_store\", F.sum(\"weekly_sales\").over(last_year_window_store))\\\n", " .drop(\"timestamp\")\\\n", " .fillna(0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create `sales_fg` feature group\n", "\n", "Unlike the `store_fg`, for the `sales_fg` we are going to define a composite primary key. This means that each entry in the `sales_fg` is going to be uniquely identified by the store, the department and the date. Further down we are also going to specify a partition key: partitioning is a tool at your disposal to improve the performance of queries on a feature group."
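 ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before creating the feature group, it can be worth verifying that the composite key (`store`, `dept`, `date`) really does identify each row uniquely. The cell below is a minimal sanity-check sketch in plain Spark (the `total_rows`/`distinct_keys` names are just illustrative); it is not part of the `hsfs` API flow." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sanity check (sketch): if the composite key is unique, the total row count\n", "# and the distinct key count should match.\n", "total_rows = sales_fg.count()\n", "distinct_keys = sales_fg.select(\"store\", \"dept\", \"date\").distinct().count()\n", "print(\"rows: {}, distinct keys: {}\".format(total_rows, distinct_keys))"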
] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "sales_fg_meta = fs.create_feature_group(name=\"sales_fg\",\n", " version=1,\n", " primary_key=['store', 'dept', 'date'],\n", " description=\"Sales related features\",\n", " time_travel_format=None, \n", " statistics_config=False)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "sales_fg_meta.save(sales_fg)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When creating a feature group we can also specify a `partition key`. Partition keys help organize the feature data on the file system and improve perfomances when reading the feature group data. As for the `primary key`, also `partition key` can be a composite one." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "sales_part_fg_meta = fs.create_feature_group(name=\"sales_fg\",\n", " version=2,\n", " partition_key=['store'],\n", " description=\"Sales related features\",\n", " time_travel_format=None, \n", " statistics_config=False)\n", "sales_part_fg_meta.save(sales_fg)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can enable a feature group to be online by setting the `online_enabled` flag to true. \n", "\n", "By default `HSFS` configures the feature group such that new feature data that gets saved or inserted is written to the offline feature store. If `online_enabled=True`, additionally, the data is saved to the online storage of the feature store. Note that the insert and save to both storages is not transactional.\n", "\n", "If you want to create a purely online feature group. Save the feature group with `online_enabled=True` but with an empty dataframe and subsequently use the insert with `storage=\"online\"` to overwrite the default and write to the online feature store only." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sales_part_fg_meta = fs.create_feature_group(name=\"sales_fg\",\n", " version=3,\n", " primary_key=['store', 'dept', 'date'],\n", " online_enabled=True,\n", " description=\"Sales related features\",\n", " time_travel_format=None, \n", " statistics_config=False)\n", "sales_part_fg_meta.save(sales_fg)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Feature Engineering Exogenous features\n", "\n", "This feature group will contain exogenous features that can influence sales, but are not under the control of the distribution chain. 
Examples include the unemployment rate, the consumer price index (CPI) and so on.\n", "We are going to write these features to the feature store as they are." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "exogenous_fg = exogenous_csv.withColumn('date', F.to_date(\"date\", 'dd/MM/yyyy'))\n", "\n", "exogenous_fg_meta = fs.create_feature_group(name=\"exogenous_fg\",\n", " version=1,\n", " primary_key=['store', 'date'],\n", " description=\"External features that influence sales, but are not under the control of the distribution chain\",\n", " time_travel_format=None, \n", " statistics_config={\"enabled\": True, \"histograms\": True, \"correlations\": True})\n", "exogenous_fg_meta.save(exogenous_fg)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Append additional data\n", "\n", "You can add additional data to a feature group by calling the `insert` method. In the example below we assume that we also got the data for 2013, and we are going to append it to the existing `exogenous_fg`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Simulate the 2013 data by shifting the existing dates forward by one year\n", "exogenous_fg_2013 = exogenous_fg.withColumn('date', F.date_add('date', 365))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "exogenous_fg_meta = fs.get_feature_group('exogenous_fg', 1)\n", "exogenous_fg_meta.insert(exogenous_fg_2013)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This will also recompute statistics after inserting the new data. The new statistics will be saved along with the metadata under a new commit time." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Append an additional feature" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Appending features to a feature group is a non-breaking schema change; removing features, on the other hand, requires creating a new version of the feature group.\n", "\n", "You can append a feature by specifying a data type and a default value for the new feature. The default value is necessary for the data that is already in the feature group." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from hsfs.feature import Feature" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "exogenous_fg_meta.append_features([Feature(\"appended_feature\", type=\"double\", default_value=\"10.0\")])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Delete a feature group\n", "\n", "You can call the `delete` method on a feature group to delete the entire feature group."
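 ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Deletion is irreversible, so it can be useful to read a feature group back and inspect its contents before removing it. The cell below is a minimal sketch, assuming the Spark engine, where `read()` returns a Spark dataframe; `fg_to_check` is just an illustrative name." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: fetch the feature group handle and read a few rows back\n", "# to double-check what is about to be removed.\n", "fg_to_check = fs.get_feature_group('exogenous_fg', 1)\n", "fg_to_check.read().show(5)"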
] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "exogenous_fg_meta = fs.create_feature_group(name=\"exogenous_fg\",\n", " version=2,\n", " primary_key=['store', 'date'],\n", " description=\"External features that influence sales, but are not under the control of the distribution chain\",\n", " time_travel_format=None, \n", " statistics_config=False)\n", "exogenous_fg_meta.save(exogenous_fg)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "exogenous_fg_meta = fs.get_feature_group('exogenous_fg', 2)\n", "exogenous_fg_meta.delete()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }