{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Train Telecom Customer Churn Prediction with XGBoost" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This tutorial is based on [this](https://www.kaggle.com/pavanraj159/telecom-customer-churn-prediction/comments#6.-Model-Performances) Kaggle notebook and [this](https://github.com/gojek/feast/tree/master/examples/feast-xgboost-churn-prediction-tutorial) Feast notebook" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
IDYARN Application IDKindStateSpark UIDriver log
3application_1592283535818_0004pysparkidleLinkLink
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] } ], "source": [ "from hops import featurestore" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Hyperparameter added: max_depth\n", "Hyperparameter added: learning_rate\n", "Hyperparameter added: gamma\n", "Hyperparameter added: reg_lambda" ] } ], "source": [ "from maggy import Searchspace\n", "\n", "# The searchspace can be instantiated with parameters\n", "sp = Searchspace(max_depth=('INTEGER', [2, 50]), learning_rate=('DISCRETE', [0.9, 0.5, 0.2, 0.1, 0.01, 0.001]), gamma=('DOUBLE', [0, 5]), reg_lambda=('DOUBLE', [0, 5]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define training logic in wrapper function:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "_cell_guid": "b1076dfc-b9ad-4769-8c92-a6c4dae69d19", "_kg_hide-input": false, "_uuid": "8f2839f25d086af736a60e9eeb907d3b93b6e0e5" }, "outputs": [], "source": [ "def train(max_depth, learning_rate, gamma, reg_lambda):\n", " import pandas as pd\n", " from hops import hdfs, featurestore\n", " from hops import pandas_helper as pandas\n", " import warnings\n", " warnings.filterwarnings(\"ignore\")\n", " from sklearn.model_selection import train_test_split\n", " from sklearn.metrics import accuracy_score, classification_report\n", " from xgboost import XGBClassifier\n", " from torch.utils.tensorboard import SummaryWriter\n", " from maggy import tensorboard\n", "\n", " # Get path to training dataset\n", " file_path = featurestore.get_training_dataset_path(\"telco_churn\")\n", " telecom_df = pandas.read_csv(hdfs.get_plain_path([path for path in hdfs.ls(file_path) if \".csv\" in path][0]))\n", " Id_col = ['customer_id']\n", " target_col = [\"churn\"]\n", " \n", " # Split into a train and test set\n", " train, test = train_test_split(telecom_df,test_size = .25 ,random_state = 111)\n", "\n", " # Seperating dependent and independent variables\n", " cols = [i for i in telecom_df.columns if i not in Id_col + target_col]\n", " training_x = train[cols]\n", " training_y = train[target_col]\n", " testing_x = test[cols]\n", " testing_y = test[target_col] \n", "\n", " # Instantiate classifier with hyperparameters as variables\n", " model = XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1,\n", " colsample_bytree=1, gamma=gamma, learning_rate=learning_rate, max_delta_step=0,\n", " max_depth=max_depth, min_child_weight=1, missing=None, n_estimators=100,\n", " n_jobs=1, nthread=None, objective='binary:logistic', random_state=0,\n", " reg_alpha=0, reg_lambda=reg_lambda, scale_pos_weight=1, seed=None,\n", " silent=True, subsample=1)\n", "\n", " # Train model\n", " model.fit(training_x, training_y)\n", " predictions = model.predict(testing_x)\n", " probabilities = model.predict_proba(testing_x) \n", " \n", " coefficients = pd.DataFrame(model.feature_importances_)\n", " column_df = pd.DataFrame(cols)\n", " coef_sumry = (pd.merge(coefficients, column_df, left_index=True,\n", " right_index=True, how=\"left\"))\n", " coef_sumry.columns = [\"coefficients\", \"features\"]\n", " coef_sumry = coef_sumry.sort_values(by=\"coefficients\", ascending=False)\n", "\n", " accuracy=accuracy_score(testing_y, predictions)\n", "\n", " print(\"\\n Classification report : \\n\", classification_report(testing_y, predictions))\n", " print(\"Accuracy Score : \", accuracy)\n", 
"\n", " # use any tensorboard writer\n", " writer = SummaryWriter(tensorboard.logdir()+\"/validation\")\n", " writer.add_scalar('epoch_acc', accuracy, 1)\n", " writer.close()\n", " \n", " return accuracy" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "WARN: Can't reach Maggy server. No progress information and logs available. Job continues running anyway.\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "An error was encountered:\n", "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n", ": org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, ip-10-0-0-154.eu-west-1.compute.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n", " File \"/srv/hops/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 377, in main\n", " process()\n", " File \"/srv/hops/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 372, in process\n", " serializer.dump_stream(func(split_index, iterator), outfile)\n", " File \"/srv/hops/spark/python/lib/pyspark.zip/pyspark/rdd.py\", line 2499, in pipeline_func\n", " File \"/srv/hops/spark/python/lib/pyspark.zip/pyspark/rdd.py\", line 2499, in pipeline_func\n", " File \"/srv/hops/spark/python/lib/pyspark.zip/pyspark/rdd.py\", line 2499, in pipeline_func\n", " [Previous line repeated 1 more time]\n", " File \"/srv/hops/spark/python/lib/pyspark.zip/pyspark/rdd.py\", line 352, in func\n", " File \"/srv/hops/spark/python/lib/pyspark.zip/pyspark/rdd.py\", line 801, in func\n", " File \"/srv/hops/anaconda/anaconda/envs/churn/lib/python3.6/site-packages/maggy/core/trialexecutor.py\", line 149, in _wrapper_fun\n", " retval = map_fun(**parameters)\n", " File \"\", line 15, in train\n", " File \"/srv/hops/anaconda/anaconda/envs/churn/lib/python3.6/site-packages/hops/hdfs.py\", line 734, in ls\n", " hdfs_path = _expand_path(hdfs_path)\n", " File \"/srv/hops/anaconda/anaconda/envs/churn/lib/python3.6/site-packages/hops/hdfs.py\", line 157, in _expand_path\n", " raise IOError(\"path %s must be a full hopsfs path or a relative path\" % hdfs_path)\n", "OSError: path s3://sagemakerjim/TRAINING_DATASETS/telco_churn_1/telco_churn must be a full hopsfs path or a relative path\n", "\n", "\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)\n", "\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)\n", "\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)\n", "\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)\n", "\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n", "\tat scala.collection.Iterator$class.foreach(Iterator.scala:891)\n", "\tat org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)\n", "\tat scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)\n", "\tat scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)\n", "\tat scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)\n", "\tat scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)\n", "\tat org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)\n", "\tat 
{ "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "WARN: Can't reach Maggy server. No progress information and logs available. Job continues running anyway.\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "An error was encountered:\n", "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n", ": org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, ip-10-0-0-154.eu-west-1.compute.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n", "  File \"/srv/hops/anaconda/anaconda/envs/churn/lib/python3.6/site-packages/maggy/core/trialexecutor.py\", line 149, in _wrapper_fun\n", "    retval = map_fun(**parameters)\n", "  File \"\", line 15, in train\n", "  File \"/srv/hops/anaconda/anaconda/envs/churn/lib/python3.6/site-packages/hops/hdfs.py\", line 734, in ls\n", "    hdfs_path = _expand_path(hdfs_path)\n", "  File \"/srv/hops/anaconda/anaconda/envs/churn/lib/python3.6/site-packages/hops/hdfs.py\", line 157, in _expand_path\n", "    raise IOError(\"path %s must be a full hopsfs path or a relative path\" % hdfs_path)\n", "OSError: path s3://sagemakerjim/TRAINING_DATASETS/telco_churn_1/telco_churn must be a full hopsfs path or a relative path\n" ] } ], 
1 more\n", "\n", "\n" ] } ], "source": [ "from maggy import experiment\n", "result = experiment.lagom(train, \n", " searchspace=sp, \n", " optimizer='randomsearch', \n", " direction='max',\n", " num_trials=10, \n", " name='CHURN',\n", " hb_interval=5, \n", " es_interval=5,\n", " es_min=5\n", " )" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" }, "pycharm": { "stem_cell": { "cell_type": "raw", "metadata": { "collapsed": false }, "source": [] } } }, "nbformat": 4, "nbformat_minor": 4 }