# Train Telecom Customer Churn Prediction with XGBoost

This tutorial is based on [this Kaggle notebook](https://www.kaggle.com/pavanraj159/telecom-customer-churn-prediction/comments#6.-Model-Performances) and [this Feast notebook](https://github.com/gojek/feast/tree/master/examples/feast-xgboost-churn-prediction-tutorial).

In [1]:
from hops import featurestore

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log
3,application_1592283535818_0004,pyspark,idle,Link,Link


SparkSession available as 'spark'.


In [2]:
from maggy import Searchspace

# Declare the whole search space in one constructor call: every keyword maps a
# hyperparameter name to a (type, feasible region) pair.
sp = Searchspace(
    max_depth=('INTEGER', [2, 50]),
    learning_rate=('DISCRETE', [0.9, 0.5, 0.2, 0.1, 0.01, 0.001]),
    gamma=('DOUBLE', [0, 5]),
    reg_lambda=('DOUBLE', [0, 5]),
)

Hyperparameter added: max_depth
Hyperparameter added: learning_rate
Hyperparameter added: gamma
Hyperparameter added: reg_lambda

Define the training logic in a wrapper function:

In [3]:
def train(max_depth, learning_rate, gamma, reg_lambda):
    """Train and evaluate one XGBoost churn classifier for a single trial.

    Reads the ``telco_churn`` training dataset, holds out 25% of the rows for
    evaluation, fits an ``XGBClassifier`` with the supplied hyperparameters,
    prints a classification report, logs the hold-out accuracy to TensorBoard
    and returns it.

    Args:
        max_depth: Forwarded to ``XGBClassifier(max_depth=...)``.
        learning_rate: Forwarded to ``XGBClassifier(learning_rate=...)``.
        gamma: Forwarded to ``XGBClassifier(gamma=...)``.
        reg_lambda: Forwarded to ``XGBClassifier(reg_lambda=...)``.

    Returns:
        The accuracy of the fitted model on the held-out 25% split.

    Raises:
        FileNotFoundError: If the training dataset directory lists no path
            containing ``".csv"``.
    """
    # Imports live inside the function body, as in the original cell
    # (presumably because the function is shipped to remote executors and must
    # be self-contained -- confirm against the Maggy documentation).
    from hops import hdfs, featurestore
    from hops import pandas_helper as pandas
    import warnings
    warnings.filterwarnings("ignore")
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score, classification_report
    from xgboost import XGBClassifier
    from torch.utils.tensorboard import SummaryWriter
    from maggy import tensorboard

    # Locate the CSV file of the training dataset and fail with a clear
    # message (instead of a bare IndexError) when none is listed.
    file_path = featurestore.get_training_dataset_path("telco_churn")
    csv_paths = [path for path in hdfs.ls(file_path) if ".csv" in path]
    if not csv_paths:
        raise FileNotFoundError(
            "No .csv file found in training dataset path: {}".format(file_path))
    telecom_df = pandas.read_csv(hdfs.get_plain_path(csv_paths[0]))

    id_col = "customer_id"
    target_col = "churn"

    # Split into a train and test set. The frames are named *_df so that the
    # local variable no longer shadows this function's own name.
    train_df, test_df = train_test_split(telecom_df, test_size=0.25, random_state=111)

    # Separate dependent and independent variables. The target is selected by
    # column name (not by a one-element list) so the estimator and the metrics
    # receive a 1-D label vector rather than an (n, 1) frame.
    feature_cols = [col for col in telecom_df.columns if col not in (id_col, target_col)]
    training_x = train_df[feature_cols]
    training_y = train_df[target_col]
    testing_x = test_df[feature_cols]
    testing_y = test_df[target_col]

    # Instantiate classifier with hyperparameters as variables.
    # NOTE(review): `silent`, `nthread` and `seed` are reported as deprecated
    # in newer xgboost releases; they are kept unchanged here because the
    # installed version is not visible from this notebook -- confirm before
    # modernising.
    model = XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1,
                          colsample_bytree=1, gamma=gamma, learning_rate=learning_rate, max_delta_step=0,
                          max_depth=max_depth, min_child_weight=1, missing=None, n_estimators=100,
                          n_jobs=1, nthread=None, objective='binary:logistic', random_state=0,
                          reg_alpha=0, reg_lambda=reg_lambda, scale_pos_weight=1, seed=None,
                          silent=True, subsample=1)

    # Train the model and score it on the held-out rows.
    model.fit(training_x, training_y)
    predictions = model.predict(testing_x)

    accuracy = accuracy_score(testing_y, predictions)

    print("\n Classification report : \n", classification_report(testing_y, predictions))
    print("Accuracy   Score : ", accuracy)

    # Log the metric with a TensorBoard writer; close it even if logging fails.
    writer = SummaryWriter(tensorboard.logdir()+"/validation")
    try:
        writer.add_scalar('epoch_acc', accuracy, 1)
    finally:
        writer.close()

    return accuracy

In [4]:
from maggy import experiment

# Run the hyperparameter search: `train` is evaluated with values drawn from
# `sp` by the 'randomsearch' optimizer for 10 trials, and the returned metric
# is maximised (direction='max'). `hb_interval`, `es_interval` and `es_min`
# are passed through to Maggy as its heartbeat / early-stopping settings.
result = experiment.lagom(
    train,
    searchspace=sp,
    optimizer='randomsearch',
    direction='max',
    num_trials=10,
    name='CHURN',
    hb_interval=5,
    es_interval=5,
    es_min=5,
)

WARN: Can't reach Maggy server. No progress information and logs available. Job continues running anyway.


An error was encountered:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, ip-10-0-0-154.eu-west-1.compute.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func
  File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func
  [Previous line repeated 1 more time]
  File "/srv/hops/spar