In [1]:
from hops import hdfs
import hopsworks
import hsfs
from hsfs.client import exceptions

Starting Spark application


ID,Application ID,Kind,State,Spark UI,Driver log
6,application_1669569214209_0015,pyspark,idle,Link,Link


SparkSession available as 'spark'.


In [2]:
# Open a Hopsworks connection and resolve the project handle; no arguments are
# passed, so connection details come from defaults (presumably the in-cluster
# environment - confirm when running elsewhere).
hw_connection = hopsworks.connection()
project = hw_connection.get_project()
# The dataset API is used later to check for, and upload, the sample CSV file.
dataset_api = project.get_dataset_api()

Connected. Call `.close()` to terminate connection gracefully.

In [6]:
# Separate hsfs connection; `fs` is the feature store handle every later cell
# uses to look up or create feature groups, feature views and training data.
fs_connection = hsfs.connection()
fs = fs_connection.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

In [7]:
# Names of every artifact created by this notebook, grouped by kind.

# Base feature groups (external, streaming, cached).
external_fg_1_name = "external_fg1"
stream_fg_1_name = "stream_fg1"
cached_fg_1_name = "cached_fg1"
cached_fg_2_name = "cached_fg2"

# Feature groups derived from the base ones.
derived_stream_fg_1_name = "derived_stream_fg1"
derived_cached_fg_1_name = "derived_cached_fg1"
derived_cached_fg_2_name = "derived_cached_fg2"

# Feature view and the training dataset looked up for it.
fv_1_name = "feature_view1"
td_1_name = "feature_view1_1"

## Preparation steps

### Setup HopsFS Connector
The HopsFS connector is required to create external feature groups.

Create a dataset named `my-data` and an `external-dir` directory inside it.

In [5]:
# Location of the sample CSV that backs the external feature group.
external_dataset = "my-data"
external_dir = "external-dir"
data_file = "test.csv"
# Project path of the directory that should hold the sample file.
path = hdfs.project_path() + "/" + external_dataset + "/" + external_dir
if dataset_api.exists(path):
    # Upload only when the file is missing so the cell can be re-run safely.
    if not dataset_api.exists(path + "/" + data_file):
        import requests

        url = "http://repo.hops.works/dev/alex/" + data_file
        r = requests.get(url)
        # Fail loudly on an HTTP error instead of uploading an error page as CSV.
        r.raise_for_status()
        # The context manager flushes and closes the local file before upload.
        with open(data_file, "wb") as local_file:
            local_file.write(r.content)
        dataset_api.upload(data_file, path)
else:
    # The dataset/directory must be created by hand; this cell does not create it.
    print("!!! please set up external dataset !!!")

Create a HopsFS connector named `data` (connector_name) and point it to the `my-data` (external_dataset) dataset.

In [6]:
# Name of the HopsFS storage connector that must already exist in the project.
connector_name="data"
try: 
    # Lookup only: the result is not bound here; a later cell fetches it again.
    fs.get_storage_connector(connector_name)
except exceptions.RestAPIError:
    # The lookup failed with a REST error; the connector has to be set up manually.
    print("!!! please set up connector !!!")

<hsfs.storage_connector.HopsFSConnector object at 0x7f92efc82610>

## Feature Groups
The use cases considered for feature groups are twofold:
* types of feature groups:
    * cached
    * external
    * streaming
* meta state of a feature group:
    * base - feature group is in your project and is alive
    * deleted

### 1. External Feature Groups - Base

In [7]:
try:
    # Bind the connector so the external feature group cell below can use it.
    connector = fs.get_storage_connector(connector_name)
    # Read the CSV directory through the connector; the options ask the reader
    # to treat the first row as a header and to infer the column types.
    df = connector.read(
        path="/".join([hdfs.project_path(), external_dataset, external_dir]),
        options={"header": "true", "inferSchema": "true"},
        data_format="csv",
    )
    df.show()
except exceptions.RestAPIError:
    print("!!! please set up connector !!!")

+---+-----------+-----------+
| id|ext_fg_col1|ext_fg_col2|
+---+-----------+-----------+
|  1|          1|          1|
|  2|          2|          2|
+---+-----------+-----------+

In [8]:
try:
    # Reuse the external feature group when it is already registered. Binding
    # the result keeps `external_fg_1` defined on both paths of this cell.
    external_fg_1 = fs.get_external_feature_group(name=external_fg_1_name, version=1)
except exceptions.RestAPIError:
    # Lookup failed: register the CSV directory as an external feature group.
    external_fg_1 = fs.create_external_feature_group(
        name=external_fg_1_name,
        version=1,
        storage_connector=connector,
        data_format="csv",
        path=external_dir,
        primary_key=['id'],
        statistics_config=False,
        options={"header": "true", "inferSchema": "true"},
    )
    external_fg_1.save()

### 2. Streaming Feature Group - Base

In [9]:
# Fetch version 1 of the streaming feature group, creating its metadata object
# if needed; statistics are switched off.
stream_fg_1 = fs.get_or_create_feature_group(
    name=stream_fg_1_name,
    version=1,
    primary_key=["id"],
    statistics_config=False,
    stream=True,
)

In [10]:
# Feature column names carry the feature group name as a prefix.
fg_col1 = f"{stream_fg_1_name}_col1"
fg_col2 = f"{stream_fg_1_name}_col2"
# One sample row: (id, col1, col2).
fg_data = [(1, 1, 1)]
fg_spark_df = spark.createDataFrame(fg_data, ["id", fg_col1, fg_col2])

In [11]:
# Write the sample row into the streaming feature group; the return value is
# left as the cell's last expression so it is displayed.
stream_fg_1.insert(fg_spark_df)

Feature Group created successfully, explore it at 
https://hopsworks0.logicalclocks.com/p/119/fs/67/fg/30
(None, None)

#### Run Backfilling Job

In [12]:
# Look up the job named "<feature group name>_1_offline_fg_backfill"
# (presumably the "_1" is the feature group version - confirm) and run it,
# blocking until the execution terminates.
job = project.get_jobs_api().get_job(f"{stream_fg_1_name}_1_offline_fg_backfill")
job.run(await_termination=True)

Execution started, explore it at https://hopsworks0.logicalclocks.com/p/119/jobs/named/stream_fg1_1_offline_fg_backfill/executions
Execution('SUCCEEDED', 'AGGREGATING_LOGS', '2022-11-28T13:29:50.000Z', '-op offline_fg_backfill -path hdfs:///Projects/test/Resources/jobs/stream_fg1_1_offline_fg_backfill/config_1669642177749')

### 3. Cached Feature Group - Base 

In [13]:
# Fetch or create version 1 of the first cached feature group; no `stream`
# flag is passed here, and statistics are switched off.
cached_fg_1 = fs.get_or_create_feature_group(
    name=cached_fg_1_name,
    version=1,
    primary_key=["id"],
    statistics_config=False,
)

In [14]:
# Feature column names carry the feature group name as a prefix.
fg_col1 = f"{cached_fg_1_name}_col1"
fg_col2 = f"{cached_fg_1_name}_col2"
# One sample row: (id, col1, col2).
fg_data = [(1, 1, 1)]
fg_spark_df = spark.createDataFrame(fg_data, ["id", fg_col1, fg_col2])

In [15]:
# Write the sample row into cached_fg1; the return value is left as the cell's
# last expression so it is displayed.
cached_fg_1.insert(fg_spark_df)

Feature Group created successfully, explore it at 
https://hopsworks0.logicalclocks.com/p/119/fs/67/fg/31
(None, None)

### 4. Cached Feature Group - Base - To Be Deleted

In [16]:
# Fetch or create version 1 of the second cached feature group; a later cell
# deletes it after it has been used as a parent.
cached_fg_2 = fs.get_or_create_feature_group(
    name=cached_fg_2_name,
    version=1,
    primary_key=["id"],
    statistics_config=False,
)

In [17]:
# Feature column names carry the feature group name as a prefix.
fg_col1 = f"{cached_fg_2_name}_col1"
fg_col2 = f"{cached_fg_2_name}_col2"
# One sample row: (id, col1, col2).
fg_data = [(1, 1, 1)]
fg_spark_df = spark.createDataFrame(fg_data, ["id", fg_col1, fg_col2])

In [18]:
# Write the sample row into cached_fg2; the return value is left as the cell's
# last expression so it is displayed.
cached_fg_2.insert(fg_spark_df)

Feature Group created successfully, explore it at 
https://hopsworks0.logicalclocks.com/p/119/fs/67/fg/32
(None, None)

### 5. Streaming Feature Group - Derived - Lvl1
stream_fg1 -> derived_stream_fg1

In [19]:
try:
    # The parent must be resolvable before the derived group is declared.
    input_fg = fs.get_feature_group(stream_fg_1_name, version=1)
    parents = [input_fg]
    # `parents` records stream_fg1 as the source of the derived group.
    derived_stream_fg_1 = fs.get_or_create_feature_group(
        name=derived_stream_fg_1_name,
        version=1,
        primary_key=["id"],
        statistics_config=False,
        stream=True,
        parents=parents,
    )
except exceptions.RestAPIError:
    print("!!! parent feature groups do not exist !!!")

In [20]:
# Prefixed column names; the insert below does not reference them.
fg_col1 = f"{derived_stream_fg_1_name}_col1"
fg_col2 = f"{derived_stream_fg_1_name}_col2"
# Select every feature of the parent and write the result into the derived group.
out_fg_query = input_fg.select_all()
derived_stream_fg_1.insert(out_fg_query.read())

Feature Group created successfully, explore it at 
https://hopsworks0.logicalclocks.com/p/119/fs/67/fg/33
(None, None)

#### Run Backfilling Job

In [21]:
# Look up the job named "<feature group name>_1_offline_fg_backfill" for the
# derived streaming feature group and run it, blocking until it terminates.
job = project.get_jobs_api().get_job(f"{derived_stream_fg_1_name}_1_offline_fg_backfill")
job.run(await_termination=True)

Execution started, explore it at https://hopsworks0.logicalclocks.com/p/119/jobs/named/derived_stream_fg1_1_offline_fg_backfill/executions
Execution('SUCCEEDED', 'AGGREGATING_LOGS', '2022-11-28T13:33:22.000Z', '-op offline_fg_backfill -path hdfs:///Projects/test/Resources/jobs/derived_stream_fg1_1_offline_fg_backfill/config_1669642392966')

### 6. Cached Feature Group - Derived - Lvl1
cached_fg1 + cached_fg2 -> derived_cached_fg1

In [22]:
try:
    # Both cached parents must be resolvable before the derived group is declared.
    input_fg1 = fs.get_feature_group(cached_fg_1_name, version=1)
    input_fg2 = fs.get_feature_group(cached_fg_2_name, version=1)
    parents = [input_fg1, input_fg2]
    derived_cached_fg_1 = fs.get_or_create_feature_group(
        name=derived_cached_fg_1_name,
        version=1,
        primary_key=["id"],
        statistics_config=False,
        parents=parents,
    )
except exceptions.RestAPIError:
    print("!!! parent feature groups do not exist !!!")

In [23]:
# Join all features of both parents and write the result into the derived group.
out_fg_query = (
    input_fg1.select_all()
    .join(input_fg2.select_all())
)
derived_cached_fg_1.insert(out_fg_query.read())

Feature Group created successfully, explore it at 
https://hopsworks0.logicalclocks.com/p/119/fs/67/fg/34
(None, None)

### 7. Cached Feature Group - Derived - Lvl2
derived_cached_fg1 + derived_stream_fg1 + external_fg1 -> derived_cached_fg2

In [8]:
try:
    # Parents: one derived cached group, one derived streaming group and the
    # external group (fetched through the external-specific getter).
    input_fg1 = fs.get_feature_group(derived_cached_fg_1_name, version=1)
    input_fg2 = fs.get_feature_group(derived_stream_fg_1_name, version=1)
    input_fg3 = fs.get_external_feature_group(external_fg_1_name, version=1)
    parents = [input_fg1, input_fg2, input_fg3]
    derived_cached_fg_2 = fs.get_or_create_feature_group(
        name=derived_cached_fg_2_name,
        version=1,
        primary_key=["id"],
        statistics_config=False,
        parents=parents,
    )
except exceptions.RestAPIError:
    print("!!! parent feature groups do not exist !!!")

In [10]:
# Join all features of the three parents and write the result into the
# second-level derived group.
out_fg_query = (
    input_fg1.select_all()
    .join(input_fg2.select_all())
    .join(input_fg3.select_all())
)
derived_cached_fg_2.insert(out_fg_query.read())

(None, None)

### 8. Delete Parent Cached Feature Group
Delete cached_fg2

In [11]:
# Delete cached_fg2, which an earlier cell registered as a parent of
# derived_cached_fg1 (presumably to exercise the "deleted" state in the
# lineage queries - confirm).
delete_fg = fs.get_feature_group(cached_fg_2_name, version=1)
delete_fg.delete()

### 9. Create Feature View
derived_cached_fg2 -> feature_view1

In [12]:
try:
    # Pin the version explicitly, as every other lookup in this notebook does,
    # instead of relying on the implicit default.
    input_fg = fs.get_feature_group(derived_cached_fg_2_name, version=1)
    # The feature view is built from every feature of derived_cached_fg2.
    out_fv_query = input_fg.select_all()
    fv_1 = fs.get_or_create_feature_view(name=fv_1_name, version=1, query=out_fv_query)
except exceptions.RestAPIError:
    print("!!! parent feature groups do not exist !!!")

Feature view created successfully, explore it at 
https://hopsworks0.logicalclocks.com/p/119/fs/67/fv/feature_view1/version/1

### 10. Materialize training dataset
feature_view1 -> feature_view1_1 (training dataset)

In [13]:
try:
    # Probe for an existing training dataset; the returned object is not kept.
    fs.get_training_dataset(name=td_1_name, version=1)
except exceptions.RestAPIError:
    # The lookup raised a REST error (presumably because the dataset does not
    # exist yet - confirm), so create training data from the feature view.
    fv = fs.get_feature_view(fv_1_name, version=1)
    fv.create_training_data(statistics_config=False)
(1, None)

### 11. Get Parent Feature Groups

#### Cached Feature Group

In [14]:
# Query the parents of derived_cached_fg2; the result is left as the cell's
# last expression so it is displayed.
fg_name = derived_cached_fg_2_name
fg = fs.get_feature_group(fg_name, version=1)
fg.get_parent_feature_groups()

Artifact([<hsfs.feature_group.FeatureGroup object at 0x7f292f562640>, <hsfs.feature_group.FeatureGroup object at 0x7f292f556790>, <hsfs.feature_group.ExternalFeatureGroup object at 0x7f292f556100>], [], [])

In [19]:
# Opposite direction: query the feature groups generated from
# derived_cached_fg1 rather than its parents.
fg_name = derived_cached_fg_1_name
fg = fs.get_feature_group(fg_name, version=1)
fg.get_generated_feature_groups()

Artifact([<hsfs.feature_group.FeatureGroup object at 0x7f292f5b8df0>], [], [])

In [21]:
# Query the parent feature groups of the feature view (not of a feature group).
fv_name = fv_1_name
fv = fs.get_feature_view(fv_name, version=1)
fv.get_parent_feature_groups()

Artifact([<hsfs.feature_group.FeatureGroup object at 0x7f292f6139d0>], [], [])