{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "title: \"Online transformation functions\"\n", "date: 2021-05-18\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Connection to HSFS" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
IDApplication IDKindStateSpark UIDriver log
3application_1650453136484_0004pysparkidleLinkLink
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n", "Connected. Call `.close()` to terminate connection gracefully." ] } ], "source": [ "import hsfs\n", "connection = hsfs.connection()\n", "# get a reference to the feature store, you can access also shared feature stores by providing the feature store name\n", "fs = connection.get_feature_store();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Define Online Transformation\n", "To be able to attach transformation function to training datasets it has to be either part of the library\n", "[installed](https://hopsworks.readthedocs.io/en/stable/user_guide/hopsworks/python.html?highlight=install#installing-libraries) in Hopsworks\n", "or attached when starting a [Jupyter notebook](https://hopsworks.readthedocs.io/en/stable/user_guide/hopsworks/jupyter.html?highlight=jupyter)\n", "or [Hopsworks job](https://hopsworks.readthedocs.io/en/stable/user_guide/hopsworks/jobs.html).\n", "\n", "Don't decorate the transformation function with Pyspark `@udf` or `@pandas_udf`, as well as don't use any Pyspark dependencies.\n", "HSFS will decorate transformation function only if it is used inside Pyspark application.\n", " \n", "To successfully execute this example please install `transformation_fn_template` library from https://github.com/logicalclocks/transformation_fn_template\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from custom_functions import transformations\n", "plus_one_float_meta = fs.create_transformation_function(transformation_function=transformations.plus_one, \n", " output_type=float, \n", " version=1)\n", "plus_one_float_meta.save()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "plus_one_int_meta = fs.create_transformation_function(transformation_function=transformations.plus_one, \n", " output_type=int, \n", " version=2)\n", "plus_one_int_meta.save()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "plus_one_double_meta = fs.create_transformation_function(transformation_function=transformations.plus_one, \n", " output_type=\"double\", version=3)\n", "plus_one_double_meta.save()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "date_string_2_timestamp_meta = fs.create_transformation_function(\n", " transformation_function=transformations.date_string_to_timestamp,\n", " output_type=\"long\", version=1)\n", "date_string_2_timestamp_meta.save()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "plus_one\n", "plus_one\n", "date_string_to_timestamp" ] } ], "source": [ "print(plus_one_float_meta.name)\n", "print(plus_one_int_meta.name)\n", "print(date_string_2_timestamp_meta.name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get all online transformations available in the feature store" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[, , , , , , , ]" ] } ], "source": [ "fs.get_transformation_functions()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get online transformation by name and version" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "plus_one\n", "1" ] } ], "source": [ "plus_one_meta = fs.get_transformation_function(name=\"plus_one\")\n", "print(plus_one_meta.name)\n", "print(plus_one_meta.version)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "plus_one\n", "1" ] } ], "source": [ "plus_one_float_meta = fs.get_transformation_function(name=\"plus_one\", version=1)\n", "print(plus_one_float_meta.name)\n", "print(plus_one_float_meta.version)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "plus_one\n", "2" ] } ], "source": [ "plus_one_int_meta = fs.get_transformation_function(name=\"plus_one\", version=2)\n", "print(plus_one_int_meta.name)\n", "print(plus_one_int_meta.version)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "date_string_to_timestamp\n", "1" ] } ], "source": [ "date_string_2_timestamp_meta = fs.get_transformation_function(name=\"date_string_to_timestamp\", version=1)\n", "print(date_string_2_timestamp_meta.name)\n", "print(date_string_2_timestamp_meta.version)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# View online transformation source code\n", "##### Since we are using pyspark kernel hsfs will add udf decorator " ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "from datetime import datetime\n", "\n", "def plus_one(value):\n", " return value + 1" ] } ], "source": [ "print(plus_one_float_meta.transformer_code)" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "from datetime import datetime\n", "\n", "def plus_one(value):\n", " return value + 1" ] } ], "source": [ "print(plus_one_int_meta.transformer_code)" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "from datetime import datetime\n", "\n", "def date_string_to_timestamp(input_date):\n", " date_format = \"%Y%m%d%H%M%S\"\n", " return int(float(datetime.strptime(input_date, date_format).timestamp()) * 1000)" ] } ], "source": [ "print(date_string_2_timestamp_meta.transformer_code)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Delete transformation function" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "plus_one_double_meta = fs.get_transformation_function(name=\"plus_one\", version=3)\n", "plus_one_double_meta.delete()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create training dataset with online transformation\n", "#### To use online transoformation function for training dataset it must be created from hsfs `Query` object. Following example assumes that you already craeted features groups using this notebook [time_travel_python.ipynb](../time_travel/time_travel_python.ipynb)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "economy_fg = fs.get_feature_group('economy_fg',2)\n", "demography_fg = fs.get_feature_group('demography_fg',2)" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 1|110499.73| 0.0|car15|235000.0| 30| 354724.2|2020|\n", "| 2|140893.77| 0.0|car20|135000.0| 2|395015.34|2020|\n", "| 3|119159.65| 0.0| car1|145000.0| 22|122025.08|2020|\n", "| 4| 20000.0| 52593.63| car9|185000.0| 30| 99629.62|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+" ] } ], "source": [ "economy_fg.read().show()" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- id: integer (nullable = true)\n", " |-- salary: float (nullable = true)\n", " |-- commission: float (nullable = true)\n", " |-- car: string (nullable = true)\n", " |-- hvalue: float (nullable = true)\n", " |-- hyears: integer (nullable = true)\n", " |-- loan: float (nullable = true)\n", " |-- year: integer (nullable = true)" ] } ], "source": [ "economy_fg.read().printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Training dataset needs to be created from hsfs `Query` object " ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "query = demography_fg.select(['age','elevel','zipcode']).join(economy_fg.select_all())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Provide transformation functions as dict, where key is feature name and value is online transformation function name " ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "td = fs.create_training_dataset(name=\"economy_td\",\n", " description=\"Dataset to train the some model\",\n", " data_format=\"csv\",\n", " transformation_functions={\"hyears\":plus_one_int_meta, \n", " \"loan\":plus_one_float_meta},\n", " statistics_config=None, \n", " version=1)" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "td.save(query)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Online tranformation functions are now attached to training dataset as medadata and contain information to which feature groups they will be applied " ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "VersionWarning: No version provided for getting training dataset `economy_td`, defaulting to `1`." ] } ], "source": [ "td = fs.get_training_dataset(\"economy_td\")" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'hyears': , 'loan': }" ] } ], "source": [ "td.transformation_functions" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+--------+---+---------+----------+-----+--------+------+---------+----+\n", "|age|elevel| zipcode| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+------+--------+---+---------+----------+-----+--------+------+---------+----+\n", "| 56|level0|zipcode2| 4| 20000.0| 52593.63| car9|185000.0| 31| 99630.62|2020|\n", "| 54|level3|zipcode5| 1|110499.73| 0.0|car15|235000.0| 31| 354725.2|2020|\n", "| 49|level2|zipcode4| 3|119159.65| 0.0| car1|145000.0| 23|122026.08|2020|\n", "| 44|level4|zipcode8| 2|140893.77| 0.0|car20|135000.0| 3|395016.34|2020|\n", "+---+------+--------+---+---------+----------+-----+--------+------+---------+----+" ] } ], "source": [ "td.read().show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### transformation functions will be also applied to feature vectores retrieved by `get_serving_vector` method" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'id'}" ] } ], "source": [ "td_meta = fs.get_training_dataset(\"economy_td\", 1)\n", "#`init_prepared_statement` method is needed to get serving_keys in case `get_serving_vector` has not beed called yet. This is not necessary for `get_serving_vector` method itself\n", "td_meta.init_prepared_statement() \n", "td_meta.serving_keys" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[54, 'level3', 'zipcode5', 1, 110500.0, 0.0, 'car15', 235000.0, 31, 354725.0, 2020]" ] } ], "source": [ "td_meta.get_serving_vector({'id': 1})" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }