## 1. Churn Feature Engineering with PySpark

In [97]:
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.linalg import Vectors

In [None]:
import hsfs

# Name and type of the storage connector that is looked up in the feature store.
SNOWFLAKE_CONNECTOR_NAME = "snowflake_spark_connector"
SNOWFLAKE_CONNECTOR_TYPE = "JDBC"

# Open a connection to the feature store service.
connection = hsfs.connection()

# Handle to the default feature store. A shared feature store can be reached
# instead by passing its name to get_feature_store().
fs = connection.get_feature_store()

# The connector's `arguments` value is used as the Snowflake password when the
# data is read further down in this notebook.
connector = fs.get_storage_connector(SNOWFLAKE_CONNECTOR_NAME, SNOWFLAKE_CONNECTOR_TYPE)

### 1.1 Data

In [99]:
# Settings handed to the Snowflake Spark data source.
snowflake_options = {
    "sfURL": "https://ra96958.eu-central-1.snowflakecomputing.com",
    "sfUser": "HOPSWORKS",
    # The password comes from the storage connector instead of being written here.
    "sfPassword": connector.arguments,
    "sfDatabase": "ML_WORKSHOP",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "HOPSWORKS_WH",
    "sfRole": "HOPSWORKS_ROLE",
    "dbtable": "CUSTOMER_CHURN",
}

# Read the CUSTOMER_CHURN table into a Spark DataFrame.
df = (
    spark.read
    .format("net.snowflake.spark.snowflake")
    .options(**snowflake_options)
    .load()
)

In [100]:
# New snake_case column names. toDF() renames by position, so this list must
# stay in the same order as the columns of the loaded table.
column_names = [
    'customer_id', 'gender', 'senior_citizen', 'partner', 'dependents',
    'tenure', 'phone_service', 'multiple_lines', 'internet_service',
    'online_security', 'online_backup', 'device_protection', 'tech_support',
    'streaming_tv', 'streaming_movies', 'contract', 'paperless_billing',
    'payment_method', 'monthly_charges', 'total_charges', 'churn',
]
df = df.toDF(*column_names)

# Store tenure as a double.
df = df.withColumn("tenure", df["tenure"].cast(DoubleType()))

### 1.2 Data Manipulation and Processing

In [101]:
# Columns that are string-indexed.
categoricalColumns = [
    'gender', 'senior_citizen', 'partner', 'dependents', 'phone_service',
    'multiple_lines', 'internet_service', 'online_security', 'online_backup',
    'device_protection', 'tech_support', 'streaming_tv', 'streaming_movies',
    'contract', 'paperless_billing', 'payment_method',
]

# Columns that are assembled into one vector and scaled.
numericalColumns = ['tenure', 'monthly_charges', 'total_charges']

# Replace missing values in the numerical columns with 0.
df = df.fillna(0, subset=numericalColumns)

# Output column name for every categorical column: "<column>_Index".
out_cat_col = [name + "_Index" for name in categoricalColumns]

# Pipeline stages, starting with one StringIndexer per categorical column.
stages = [
    StringIndexer(inputCol=source, outputCol=target)
    for source, target in zip(categoricalColumns, out_cat_col)
]

# Gather the numerical columns into the single vector column "numFeatures".
assembler = VectorAssembler(inputCols=numericalColumns, outputCol="numFeatures")

# Scale "numFeatures" by its standard deviation; the mean is not subtracted.
standardScaler = StandardScaler(
    inputCol="numFeatures",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=False,
)

# Index the "churn" column into the numeric "label" column.
label_stringIdx = StringIndexer(inputCol="churn", outputCol="label")

stages += [assembler, standardScaler, label_stringIdx]

In [102]:
# Bundle every stage built above into a single Pipeline.
pipeline = Pipeline().setStages(stages)

# fit() learns whatever each stage needs from df (e.g. category indices and
# scaling statistics); transform() then appends the derived columns to df.
pipelineModel = pipeline.fit(df)
dataset = pipelineModel.transform(df)

In [104]:
def extract(row):
    """Flatten one transformed row into a plain tuple of feature values.

    Args:
        row: A row of `dataset`. It must provide "customer_id", "label", every
            column listed in `out_cat_col`, and the "scaledFeatures" vector.

    Returns:
        A tuple laid out as
        (customer_id, label, <indexed categorical values...>, <scaled numerical values...>).
    """
    # Read the actual label value of the row; appending the literal string
    # "label" would give every row the same constant text.
    key_and_label = (row["customer_id"], row["label"])
    indexed_values = tuple(row[name] for name in out_cat_col)
    # Unpack the scaled vector into individual Python floats.
    scaled_values = tuple(row["scaledFeatures"].toArray().tolist())
    return key_and_label + indexed_values + scaled_values


# customer_id is carried through because the feature group created below uses
# it as primary key. The remaining columns reuse the original feature names
# for the indexed / scaled values.
telcom_churn_features = dataset.rdd.map(extract).toDF(
    ["customer_id", "label"] + categoricalColumns + numericalColumns
)

### 1.3 Create Feature Groups

In [None]:
# Define the feature group through the Python hsfs API. The rest of this
# notebook is PySpark, so the Scala builder syntax cannot run in this kernel.
# NOTE: `telcom_churn_features` must contain the `customer_id` column, because
# it is declared as the primary key here.
telcoFg = fs.create_feature_group(
    name="telco_customer_features",
    version=1,
    description="Telecom customer features",
    online_enabled=True,
    time_travel_format="HUDI",
    primary_key=["customer_id"],
    statistics_config=True,
)

# Write the engineered features to the feature store.
telcoFg.save(telcom_churn_features)