---
title: "Feature Ingestion from Redshift with PySpark"
date: 2021-02-24
type: technical_note
draft: false
---

<h2 style="color: #1EB382;font-weight: bold;">Redshift Integration</h2>

This notebook walks through ingesting Redshift data into the Hopsworks feature store. To follow along, you need an existing Redshift cluster; if you do not have one, you can create it by following the AWS [documentation](https://docs.aws.amazon.com/ses/latest/DeveloperGuide/event-publishing-redshift-cluster.html).

The data for this tutorial is available in CSV format [here](../data/Sacramentorealestatetransactions.csv).
Users should create the following table in Redshift:
```sql
CREATE TABLE telco(
    customer_id varchar(200),
    gender varchar(200),
    senior_citizen integer,
    partner varchar(200),
    dependents varchar(200),
    tenure integer,
    phone_service varchar(200),
    multiple_lines varchar(200),
    internet_service varchar(200),
    online_security varchar(200),
    online_backup varchar(200),
    device_protection varchar(200),
    tech_support varchar(200),
    streaming_tv varchar(200),
    streaming_movies varchar(200),
    contract varchar(200),
    paperless_billing varchar(200),
    payment_method varchar(200),
    monthly_charges double precision,
    total_charges varchar(200),
    churn varchar(200)
)
```

Then populate the table using the `COPY` command:
```sql
COPY telco
FROM 's3://bucket/telco_customer_churn.csv'
IAM_ROLE 'arn:aws:iam::xxxxxxxxx:role/role_name'
FORMAT as CSV
FILLRECORD
```

Once the data has been imported into Redshift, we can start ingesting it into the Hopsworks Feature Store. 

<h3 style="color: #1EB382;font-weight: bold;">Storage Connector</h3>

The first step in ingesting Redshift data into the feature store is to configure a storage connector. The Redshift connector requires the following properties, most of which are available in the properties area of your cluster in the Redshift UI.


<img src="images/connector_ui.png" alt="Redshift Connector UI" style="margin: auto; height: 600px; width:550px;"/>

- Cluster identifier: The name of the cluster

- Database driver: You can use the default JDBC Redshift driver `com.amazon.redshift.jdbc42.Driver` (more on this below)

- Database endpoint: The endpoint for the database, in the format `[UUID].eu-west-1.redshift.amazonaws.com`

- Database name: The name of the database to query

- Database port: The port of the cluster. Defaults to 5439
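
The endpoint, port, and database name above are the pieces of a standard Redshift JDBC URL. The following is a minimal sketch of how they fit together, not the code HSFS uses internally; the function name `build_redshift_jdbc_url` and the example endpoint are hypothetical.

```python
def build_redshift_jdbc_url(endpoint, database, port=5439):
    """Compose a Redshift JDBC URL from connector-style properties.

    Hypothetical helper for illustration only; HSFS builds its own
    connection options from the storage connector.
    """
    if not endpoint or not database:
        raise ValueError("endpoint and database must be non-empty")
    port = int(port)
    if not 1 <= port <= 65535:
        raise ValueError(f"invalid port: {port}")
    # Redshift JDBC URLs follow jdbc:redshift://<endpoint>:<port>/<database>
    return f"jdbc:redshift://{endpoint}:{port}/{database}"


# Example with a made-up endpoint and database name
url = build_redshift_jdbc_url(
    "example-cluster.abc123.eu-west-1.redshift.amazonaws.com", "dev"
)
print(url)
```

Checking the assembled URL against the one shown in the Redshift console is a quick way to catch a mistyped endpoint or port before saving the connector.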

There are two options for authenticating with the Redshift cluster. The first option is to configure a username and a password. The password is stored in the secret store and made available to all members of the project.
The second option is to configure an IAM role. With IAM roles, jobs or notebooks launched on Hopsworks do not need to authenticate with Redshift explicitly, because the HSFS library transparently uses the IAM role to acquire a temporary credential for the specified user.
In Hopsworks, there are two ways to configure an IAM role: a per-cluster IAM role or a federated IAM role (role chaining). For the per-cluster IAM role, you select an instance profile for your Hopsworks cluster when launching it in hopsworks.ai, and all jobs and notebooks run with the selected IAM role. For the federated IAM role, you create a head IAM role for the cluster that enables Hopsworks to assume a potentially different IAM role in each project. You can even restrict it so that only certain roles within a project (such as a data owner) can assume a given IAM role.
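
To illustrate what acquiring a temporary credential involves, here is a minimal sketch built on the AWS `GetClusterCredentials` API as exposed by a boto3 Redshift client. It is an assumption about the general mechanism, not HSFS's actual implementation; the helper name `fetch_temporary_credentials` and the argument values in the usage comment are hypothetical.

```python
def fetch_temporary_credentials(redshift_client, cluster_identifier,
                                db_user, db_name, duration_seconds=900):
    """Request short-lived database credentials for an existing Redshift user.

    `redshift_client` is expected to behave like boto3.client("redshift");
    the IAM role in effect must be allowed to call GetClusterCredentials.
    """
    response = redshift_client.get_cluster_credentials(
        DbUser=db_user,
        DbName=db_name,
        ClusterIdentifier=cluster_identifier,
        DurationSeconds=duration_seconds,  # AWS accepts 900 to 3600 seconds
        AutoCreate=False,                  # do not create the database user
    )
    return {
        "user": response["DbUser"],
        "password": response["DbPassword"],
        "expires_at": response["Expiration"],
    }


# Usage (requires boto3 and AWS credentials; all values are placeholders):
# import boto3
# client = boto3.client("redshift", region_name="eu-west-1")
# creds = fetch_temporary_credentials(client, "my-cluster", "my_user", "dev")
```

With HSFS you never call this yourself; the sketch only shows why no password needs to be stored when an IAM role is configured.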

With regard to the database driver, the library to interact with Redshift *is not* included in Hopsworks, so you need to upload the driver yourself. First, download the library from the AWS documentation: https://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html#download-jdbc-driver. Then upload the driver files to the “Resources” dataset in your project. Finally, add the file to your notebook or job before launching it, as shown in the screenshots below.

```python
import hsfs

# Connect to the Hopsworks feature store
connection = hsfs.connection()
# Retrieve the metadata handle
fs = connection.get_feature_store()
```

```
Connected. Call `.close()` to terminate connection gracefully.
```

<h3 style="color: #1EB382;font-weight: bold;">External (On-Demand) Feature Group</h3>

Hopsworks supports the creation of (a) cached feature groups and (b) external (on-demand) feature groups. For cached feature groups, the features are stored in the Hopsworks feature store. For external feature groups, only the feature metadata is stored in the feature store; the actual feature data stays in the external database or object store. When an external feature group is accessed from a Spark or Python job, the feature data is read on demand from the external store through a connector. On AWS, Hopsworks supports the creation of external feature groups from a large number of data stores, including Redshift, RDS, Snowflake, S3, and any JDBC-enabled source.

In this example, we will define an external feature group for a table in Redshift. External feature groups in Hopsworks support “provenance”: in the Hopsworks Web UI, you can track which features are stored on which external systems and how they are computed. Additionally, HSFS (the Python/Scala library used to interact with the feature store) provides the same APIs for external feature groups as for cached feature groups.

An external (on-demand) feature group can be defined as follows:

```python
# Retrieve the storage connector defined before
redshift_conn = fs.get_storage_connector("telco_redshift_cluster")
```

```python
# Define the on-demand feature group backed by a query on the Redshift table
telco_on_dmd = fs.create_on_demand_feature_group(
    name="telco_redshift",
    version=2,
    query="select * from telco",
    description="On-demand feature group for telecom customer data",
    storage_connector=redshift_conn,
    statistics_config=True,
)
```

```python
# Register the feature group metadata with the feature store
telco_on_dmd.save()
```

<h3 style="color: #1EB382;font-weight: bold;">Engineer features and save to the Feature Store</h3>

On-demand feature groups can be used directly as a source for creating training datasets. This is often the case if a company is migrating to Hopsworks and there are already feature engineering pipelines in production writing data to Redshift.

This flexibility provided by Hopsworks allows users to hit the ground running from day 1, without having to rewrite their pipelines to take advantage of the benefits the Hopsworks feature store provides.

```python
telco_on_dmd.select(['customer_id', 'internet_service', 'phone_service', 'total_charges', 'churn']).show(5)
```

```
+-----------+----------------+-------------+-------------+-----+
|customer_id|internet_service|phone_service|total_charges|churn|
+-----------+----------------+-------------+-------------+-----+
| 7590-VHVEG|             DSL|           No|        29.85|   No|
| 5575-GNVDE|             DSL|          Yes|       1889.5|   No|
| 3668-QPYBK|             DSL|          Yes|       108.15|  Yes|
| 7795-CFOCW|             DSL|           No|      1840.75|   No|
| 9237-HQITU|     Fiber optic|          Yes|       151.65|  Yes|
+-----------+----------------+-------------+-------------+-----+
only showing top 5 rows
```

On-demand feature groups can also be joined with cached feature groups in Hopsworks to create training datasets. [This helper guide](https://docs.hopsworks.ai/latest/generated/query_vs_dataframe/) explains in detail how the HSFS joining APIs work and how they can be used to create training datasets.
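
As a rough sketch of such a join, the helper below combines a feature selection on the on-demand feature group with all features of a cached feature group, using the HSFS `select`, `select_all`, and `join` query methods. The helper name `build_training_query` and the cached feature group `customer_support_features` in the usage comment are hypothetical and not part of this tutorial.

```python
def build_training_query(on_demand_fg, cached_fg, join_key="customer_id"):
    """Join selected on-demand features with all features of a cached group.

    Both arguments are expected to be HSFS feature group objects that share
    the column named by `join_key`.
    """
    on_demand_query = on_demand_fg.select(
        ["customer_id", "internet_service", "phone_service",
         "total_charges", "churn"]
    )
    # Join on the shared key; the result is a query object, not yet data
    return on_demand_query.join(cached_fg.select_all(), on=[join_key])


# Usage (assumes a cached feature group with this hypothetical name exists):
# support_fg = fs.get_feature_group("customer_support_features", version=1)
# query = build_training_query(telco_on_dmd, support_fg)
# query.show(5)
```

Because the result is a query rather than a DataFrame, no data is read from Redshift until the query is executed.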

If, however, Redshift contains raw data that needs to be feature engineered, you can retrieve a Spark DataFrame backed by the Redshift table using the HSFS API.

```python
# Read the on-demand feature group into a Spark DataFrame
spark_df = telco_on_dmd.read()
```

```python
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
```

```python
categoricalColumns = ['gender', 'senior_citizen', 'partner', 'dependents', 'phone_service', 'multiple_lines',
                      'internet_service', 'online_security', 'online_backup', 'device_protection', 'tech_support',
                      'streaming_tv', 'streaming_movies', 'contract', 'paperless_billing', 'payment_method', 'churn']

# total_charges is stored as varchar: cast it to double and replace nulls in numeric columns with 0
spark_df = spark_df.withColumn("total_charges", F.col("total_charges").cast(DoubleType()))\
                   .fillna(0)

stages = []  # stages in our Pipeline
output_cols = [('customer_id', 'customer_id')]  # (final name, source column) pairs
for categoricalCol in categoricalColumns:
    # Category indexing with StringIndexer
    output_col = categoricalCol + "_Index"
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=output_col)
    stages += [stringIndexer]
    output_cols += [(categoricalCol, output_col)]

pipeline = Pipeline(stages=stages)
dataset = pipeline.fit(spark_df).transform(spark_df)
# Keep only the indexed columns, renamed back to the original column names
telco_fg_df = dataset.selectExpr(["{} as {}".format(col[1], col[0]) for col in output_cols])
```
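
To make the two transformations in this cell concrete, the following pure-Python sketch mimics them on a handful of values. It assumes `StringIndexer`'s default ordering (most frequent label gets index 0.0) and breaks ties alphabetically, which is how Spark 3 documents it; other Spark versions may order ties differently. The function names are hypothetical and no Spark session is required.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(str(v) for v in values)
    # Sort by descending frequency, then alphabetically to break ties
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def cast_to_double_or_zero(raw):
    """Approximate cast-to-double followed by fillna(0) for a single value."""
    try:
        return float(raw)
    except (TypeError, ValueError):
        # Spark's cast yields null for unparsable strings; fillna(0) then gives 0
        return 0.0


churn = ["No", "No", "Yes", "No", "Yes"]
mapping = fit_string_index(churn)
print(mapping)                        # {'No': 0.0, 'Yes': 1.0}
print([mapping[v] for v in churn])    # [0.0, 0.0, 1.0, 0.0, 1.0]
print(cast_to_double_or_zero(" "))    # 0.0
```

This is why the indexed columns contain values such as `0.0` and `1.0`: each number is the rank of the original category by frequency, not a meaningful quantity.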

```python
telco_fg_df.show(5)
```

```
+-----------+------+--------------+-------+----------+-------------+--------------+----------------+---------------+-------------+-----------------+------------+------------+----------------+--------+-----------------+--------------+-----+
|customer_id|gender|senior_citizen|partner|dependents|phone_service|multiple_lines|internet_service|online_security|online_backup|device_protection|tech_support|streaming_tv|streaming_movies|contract|paperless_billing|payment_method|churn|
+-----------+------+--------------+-------+----------+-------------+--------------+----------------+---------------+-------------+-----------------+------------+------------+----------------+--------+-----------------+--------------+-----+
| 7590-VHVEG|   1.0|           0.0|    1.0|       0.0|          1.0|           2.0|             1.0|            0.0|          1.0|              0.0|         0.0|         0.0|             0.0|     0.0|              0.0|           0.0|  0.0|
| 5575-GNVDE|   0.0|           0.0|    0
```

Storing feature groups as cached feature groups within Hopsworks provides several benefits over on-demand feature groups. First, it allows users to leverage Hudi for incremental ingestion (with ACID properties that ensure the integrity of the feature group) and time travel. As new data is ingested, Hopsworks tracks each new commit, so users can see what has changed over time. Statistics are also computed and tracked on each commit, which helps users understand how the data itself has evolved.

Cached feature groups can also be stored in the online feature store (`online_enabled=True`), thus enabling low latency access to the features using the online feature store API.

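```python
# Define a cached feature group, stored both offline (Hudi) and online
telco_fg = fs.create_feature_group(
    name="telco_customer_features",
    version=1,
    description="Telecom customer features",
    online_enabled=True,
    time_travel_format="HUDI",
    primary_key=["customer_id"],
    statistics_config=True,
)
```

```python
# Write the engineered features to the feature store
telco_fg.save(telco_fg_df)
```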