---
title: "Online transformation functions"
date: 2021-05-18
type: technical_note
draft: false
---

### Create Connection to HSFS

In [1]:
import hsfs
connection = hsfs.connection()
# Get a reference to the project's feature store. A shared feature store can also be accessed by passing its name.
fs = connection.get_feature_store()

Starting Spark application


ID,Application ID,Kind,State,Spark UI,Driver log
3,application_1650453136484_0004,pyspark,idle,Link,Link


SparkSession available as 'spark'.
Connected. Call `.close()` to terminate connection gracefully.

# Define Online Transformation
To attach a transformation function to a training dataset, the function must either be part of a library
[installed](https://hopsworks.readthedocs.io/en/stable/user_guide/hopsworks/python.html?highlight=install#installing-libraries) in Hopsworks
or be attached when starting a [Jupyter notebook](https://hopsworks.readthedocs.io/en/stable/user_guide/hopsworks/jupyter.html?highlight=jupyter)
or [Hopsworks job](https://hopsworks.readthedocs.io/en/stable/user_guide/hopsworks/jobs.html).

Don't decorate the transformation function with the PySpark `@udf` or `@pandas_udf` decorators, and don't use any PySpark dependencies in it.
HSFS decorates the transformation function itself, and only when it is used inside a PySpark application.

To run this example, install the `transformation_fn_template` library from https://github.com/logicalclocks/transformation_fn_template
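
As a rough sketch of what such a library module might contain, the two functions used in this notebook (`plus_one` and `date_string_to_timestamp`) are plain Python with only standard-library imports. The module path is an assumption based on the import used in the next cell. Because no PySpark decorator is involved, the functions can be checked locally before they are registered:

```python
# Sketch of custom_functions/transformations.py (module path assumed from the
# import in the next cell). Plain Python only: no @udf, no @pandas_udf,
# no PySpark imports.
from datetime import datetime


def plus_one(value):
    """Add one to a numeric feature value."""
    return value + 1


def date_string_to_timestamp(input_date):
    """Convert a 'YYYYmmddHHMMSS' string to epoch milliseconds (local time)."""
    date_format = "%Y%m%d%H%M%S"
    return int(float(datetime.strptime(input_date, date_format).timestamp()) * 1000)


# Quick local check without a Spark session.
print(plus_one(30))
one_second_ms = (date_string_to_timestamp("20210518120001")
                 - date_string_to_timestamp("20210518120000"))
print(one_second_ms)
```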


In [2]:
from custom_functions import transformations
plus_one_float_meta = fs.create_transformation_function(transformation_function=transformations.plus_one, 
                                                        output_type=float, 
                                                        version=1)
plus_one_float_meta.save()

In [3]:
plus_one_int_meta = fs.create_transformation_function(transformation_function=transformations.plus_one, 
                                                      output_type=int, 
                                                      version=2)
plus_one_int_meta.save()

In [4]:
plus_one_double_meta = fs.create_transformation_function(transformation_function=transformations.plus_one, 
                                                    output_type="double", version=3)
plus_one_double_meta.save()

In [5]:
date_string_2_timestamp_meta = fs.create_transformation_function(
                                            transformation_function=transformations.date_string_to_timestamp,
                                            output_type="long", version=1)
date_string_2_timestamp_meta.save()

In [6]:
print(plus_one_float_meta.name)
print(plus_one_int_meta.name)
print(date_string_2_timestamp_meta.name)

plus_one
plus_one
date_string_to_timestamp

## Get all online transformations available in the feature store

In [7]:
fs.get_transformation_functions()

[<hsfs.transformation_function.TransformationFunction object at 0x7f98892abcd0>, <hsfs.transformation_function.TransformationFunction object at 0x7f9889291070>, <hsfs.transformation_function.TransformationFunction object at 0x7f98892abc40>, <hsfs.transformation_function.TransformationFunction object at 0x7f9889291550>, <hsfs.transformation_function.TransformationFunction object at 0x7f98892916a0>, <hsfs.transformation_function.TransformationFunction object at 0x7f98892918e0>, <hsfs.transformation_function.TransformationFunction object at 0x7f98892919a0>, <hsfs.transformation_function.TransformationFunction object at 0x7f98892abd60>]

## Get online transformation by name and version

In [8]:
plus_one_meta = fs.get_transformation_function(name="plus_one")
print(plus_one_meta.name)
print(plus_one_meta.version)

plus_one
1

In [9]:
plus_one_float_meta = fs.get_transformation_function(name="plus_one", version=1)
print(plus_one_float_meta.name)
print(plus_one_float_meta.version)

plus_one
1

In [10]:
plus_one_int_meta = fs.get_transformation_function(name="plus_one", version=2)
print(plus_one_int_meta.name)
print(plus_one_int_meta.version)

plus_one
2

In [11]:
date_string_2_timestamp_meta = fs.get_transformation_function(name="date_string_to_timestamp", version=1)
print(date_string_2_timestamp_meta.name)
print(date_string_2_timestamp_meta.version)

date_string_to_timestamp
1
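
The lookups above show that the same function name can be registered under several versions with different output types, and that omitting `version` returned version 1 in this run. A hypothetical in-memory model of that behavior is sketched below. The class `TransformationRegistry` and its methods are invented for illustration and are not part of HSFS:

```python
# Hypothetical in-memory model of name/version lookup; not the HSFS implementation.
class TransformationRegistry:
    def __init__(self):
        self._functions = {}  # (name, version) -> (callable, output_type)

    def save(self, name, version, fn, output_type):
        key = (name, version)
        if key in self._functions:
            raise ValueError(f"{name} version {version} already exists")
        self._functions[key] = (fn, output_type)

    def get(self, name, version=1):
        # Assumption: a missing version falls back to 1, matching the output above.
        return self._functions[(name, version)]


def plus_one(value):
    return value + 1


registry = TransformationRegistry()
registry.save("plus_one", 1, plus_one, float)
registry.save("plus_one", 2, plus_one, int)

fn, output_type = registry.get("plus_one")
print(output_type.__name__)
fn_v2, output_type_v2 = registry.get("plus_one", version=2)
print(output_type_v2.__name__)
```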

# View online transformation source code
##### Since we are using the PySpark kernel, HSFS will add the udf decorator; the source printed below is the undecorated function

In [12]:
print(plus_one_float_meta.transformer_code)

from datetime import datetime

def plus_one(value):
    return value + 1

In [13]:
print(plus_one_int_meta.transformer_code)

from datetime import datetime

def plus_one(value):
    return value + 1

In [14]:
print(date_string_2_timestamp_meta.transformer_code)

from datetime import datetime

def date_string_to_timestamp(input_date):
    date_format = "%Y%m%d%H%M%S"
    return int(float(datetime.strptime(input_date, date_format).timestamp()) * 1000)
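
`datetime.strptime` returns a naive `datetime`, and `timestamp()` interprets a naive value in the local time zone of the machine running the code, so the same input string can map to different epoch values on hosts with different time zone settings. If the date strings are known to be in UTC, a variant along these lines pins the zone explicitly. The function name `date_string_to_timestamp_utc` is hypothetical and is not part of the `transformation_fn_template` library:

```python
from datetime import datetime, timezone


def date_string_to_timestamp_utc(input_date):
    """Hypothetical variant: parse 'YYYYmmddHHMMSS' as UTC, return epoch milliseconds."""
    date_format = "%Y%m%d%H%M%S"
    parsed = datetime.strptime(input_date, date_format).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


# One second after the Unix epoch is 1000 milliseconds.
print(date_string_to_timestamp_utc("19700101000001"))
```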

## Delete transformation function

In [15]:
plus_one_double_meta = fs.get_transformation_function(name="plus_one", version=3)
plus_one_double_meta.delete()

# Create training dataset with online transformation
#### To use an online transformation function for a training dataset, the dataset must be created from an HSFS `Query` object. The following example assumes that you have already created the feature groups using the notebook [time_travel_python.ipynb](../time_travel/time_travel_python.ipynb)

In [16]:
economy_fg = fs.get_feature_group('economy_fg', 2)
demography_fg = fs.get_feature_group('demography_fg', 2)

In [17]:
economy_fg.read().show()

+---+---------+----------+-----+--------+------+---------+----+
| id|   salary|commission|  car|  hvalue|hyears|     loan|year|
+---+---------+----------+-----+--------+------+---------+----+
|  1|110499.73|       0.0|car15|235000.0|    30| 354724.2|2020|
|  2|140893.77|       0.0|car20|135000.0|     2|395015.34|2020|
|  3|119159.65|       0.0| car1|145000.0|    22|122025.08|2020|
|  4|  20000.0|  52593.63| car9|185000.0|    30| 99629.62|2020|
+---+---------+----------+-----+--------+------+---------+----+

In [18]:
economy_fg.read().printSchema()

root
 |-- id: integer (nullable = true)
 |-- salary: float (nullable = true)
 |-- commission: float (nullable = true)
 |-- car: string (nullable = true)
 |-- hvalue: float (nullable = true)
 |-- hyears: integer (nullable = true)
 |-- loan: float (nullable = true)
 |-- year: integer (nullable = true)

## The training dataset needs to be created from an HSFS `Query` object

In [19]:
query = demography_fg.select(['age','elevel','zipcode']).join(economy_fg.select_all())

#### Provide transformation functions as a dict, where the key is the feature name and the value is the online transformation function

In [20]:
td = fs.create_training_dataset(name="economy_td",
                               description="Dataset to train some model",
                               data_format="csv",
                               transformation_functions={"hyears":plus_one_int_meta, 
                                                         "loan":plus_one_float_meta},
                               statistics_config=None, 
                               version=1)

In [21]:
td.save(query)

<hsfs.training_dataset.TrainingDataset object at 0x7fa7d84f6050>

### Online transformation functions are now attached to the training dataset as metadata and contain information about which feature groups they will be applied to

In [22]:
td = fs.get_training_dataset("economy_td")



In [23]:
td.transformation_functions

{'hyears': <hsfs.transformation_function.TransformationFunction object at 0x7fa7d849b690>, 'loan': <hsfs.transformation_function.TransformationFunction object at 0x7fa7d849bc50>}

In [24]:
td.read().show()

+---+------+--------+---+---------+----------+-----+--------+------+---------+----+
|age|elevel| zipcode| id|   salary|commission|  car|  hvalue|hyears|     loan|year|
+---+------+--------+---+---------+----------+-----+--------+------+---------+----+
| 56|level0|zipcode2|  4|  20000.0|  52593.63| car9|185000.0|    31| 99630.62|2020|
| 54|level3|zipcode5|  1|110499.73|       0.0|car15|235000.0|    31| 354725.2|2020|
| 49|level2|zipcode4|  3|119159.65|       0.0| car1|145000.0|    23|122026.08|2020|
| 44|level4|zipcode8|  2|140893.77|       0.0|car20|135000.0|     3|395016.34|2020|
+---+------+--------+---+---------+----------+-----+--------+------+---------+----+
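
Comparing this output with the `economy_fg` rows shown earlier, `hyears` and `loan` are each one greater while the other columns are unchanged. The per-feature mapping can be pictured with the following pure-Python sketch. The helper `apply_transformations` is hypothetical and only mimics the effect; it is not how HSFS executes the functions in Spark:

```python
def plus_one(value):
    return value + 1


def apply_transformations(row, transformation_functions):
    """Return a copy of row with each mapped feature transformed and cast."""
    transformed = dict(row)
    for feature, (fn, output_type) in transformation_functions.items():
        transformed[feature] = output_type(fn(row[feature]))
    return transformed


# Feature name -> (function, output type), mirroring the dict passed to
# create_training_dataset above.
transformation_functions = {"hyears": (plus_one, int), "loan": (plus_one, float)}

# A subset of the columns of the row with id 1 from economy_fg.
row = {"id": 1, "salary": 110499.73, "hyears": 30, "loan": 354724.2, "year": 2020}
result = apply_transformations(row, transformation_functions)
print(result["hyears"], round(result["loan"], 2), result["year"])
```

Features without an entry in the dict pass through untouched, which matches the unchanged `salary` and `year` columns in the table.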

#### Transformation functions are also applied to feature vectors retrieved by the `get_serving_vector` method

In [25]:
td_meta = fs.get_training_dataset("economy_td", 1)
# `init_prepared_statement` is needed to populate `serving_keys` if `get_serving_vector` has not been called yet.
# It is not required before calling `get_serving_vector` itself.
td_meta.init_prepared_statement() 
td_meta.serving_keys

{'id'}

In [27]:
td_meta.get_serving_vector({'id': 1})

[54, 'level3', 'zipcode5', 1, 110500.0, 0.0, 'car15', 235000.0, 31, 354725.0, 2020]
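
The values at the `hyears` and `loan` positions (31 and 354725.0) again reflect `plus_one`. A minimal sketch of the same idea for a positional vector follows. The raw values, the feature order, and the helper `transform_vector` are assumptions made for illustration, not the HSFS serving code:

```python
def plus_one(value):
    return value + 1


def transform_vector(vector, feature_names, transformation_functions):
    """Apply per-feature functions to a positional feature vector."""
    return [
        transformation_functions[name](value) if name in transformation_functions else value
        for name, value in zip(feature_names, vector)
    ]


# Column order taken from the training dataset output shown earlier.
feature_names = ["age", "elevel", "zipcode", "id", "salary", "commission",
                 "car", "hvalue", "hyears", "loan", "year"]
# Assumed untransformed values as they might be stored online.
raw_vector = [54, "level3", "zipcode5", 1, 110500.0, 0.0, "car15", 235000.0, 30, 354724.0, 2020]

serving_vector = transform_vector(
    raw_vector, feature_names, {"hyears": plus_one, "loan": plus_one}
)
print(serving_vector)
```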