{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# MultiWorkerMirroredStrategy on Hopsworks\n", "---\n", "\n", "

Tested with TensorFlow 1.15.0

" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "

\n", "

Machine Learning on Hopsworks\n", "

\n", "

\n", "\n", "![hops.png](../../images/hops.png)\n", "\n", "## The `hops` python module\n", "\n", "`hops` is a helper library for Hops that facilitates development by hiding the complexity of running applications and iteracting with services.\n", "\n", "Have a feature request or encountered an issue? Please let us know on github.\n", "\n", "### Using the `experiment` module\n", "\n", "To be able to run your Machine Learning code in Hopsworks, the code for the whole program needs to be provided and put inside a wrapper function. Everything, from importing libraries to reading data and defining the model and running the program needs to be put inside a wrapper function.\n", "\n", "The `experiment` module provides an api to Python programs such as TensorFlow, Keras and PyTorch on a Hopsworks on any number of machines and GPUs.\n", "\n", "An Experiment could be a single Python program, which we refer to as an **Experiment**. \n", "\n", "Grid search or genetic hyperparameter optimization such as differential evolution which runs several Experiments in parallel, which we refer to as **Parallel Experiment**. \n", "\n", "ParameterServerStrategy, CollectiveAllReduceStrategy and MultiworkerMirroredStrategy making multi-machine/multi-gpu training as simple as invoking a function for orchestration. This mode is referred to as **Distributed Training**.\n", "\n", "### Using the `tensorboard` module\n", "The `tensorboard` module allow us to get the log directory for summaries and checkpoints to be written to the TensorBoard we will see in a bit. The only function that we currently need to call is `tensorboard.logdir()`, which returns the path to the TensorBoard log directory. Furthermore, the content of this directory will be put in as a Dataset in your project's Experiments folder.\n", "\n", "The directory could in practice be used to store other data that should be accessible after the experiment is finished.\n", "```python\n", "# Use this module to get the TensorBoard logdir\n", "from hops import tensorboard\n", "tensorboard_logdir = tensorboard.logdir()\n", "```\n", "\n", "### Using the `hdfs` module\n", "The `hdfs` module provides a method to get the path in HopsFS where your data is stored, namely by calling `hdfs.project_path()`. The path resolves to the root path for your project, which is the view that you see when you click `Data Sets` in HopsWorks. To point where your actual data resides in the project you to append the full path from there to your Dataset. For example if you create a mnist folder in your Resources Dataset, the path to the mnist data would be `hdfs.project_path() + 'Resources/mnist'`\n", "\n", "```python\n", "# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project\n", "from hops import hdfs\n", "project_path = hdfs.project_path()\n", "```\n", "\n", "```python\n", "# Downloading the mnist dataset to the current working directory\n", "from hops import hdfs\n", "mnist_hdfs_path = hdfs.project_path() + \"Resources/mnist\"\n", "local_mnist_path = hdfs.copy_to_local(mnist_hdfs_path)\n", "```\n", "\n", "### Documentation\n", "See the following links to learn more about running experiments in Hopsworks\n", "\n", "- Learn more about experiments\n", "
\n", "- Building End-To-End pipelines\n", "
\n", "- Give us a star, create an issue or a feature request on Hopsworks github\n", "\n", "### Managing experiments\n", "Experiments service provides a unified view of all the experiments run using the `experiment` module.\n", "
\n", "As demonstrated in the gif it provides general information about the experiment and the resulting metric. Experiments can be visualized meanwhile or after training in a TensorBoard.\n", "
\n", "
\n", "![Image7-Monitor.png](../../images/experiments.gif)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def mirrored_mnist():\n", "\n", " import tensorflow as tf\n", " from hops import devices\n", " from hops import tensorboard\n", "\n", " PREDICT = tf.estimator.ModeKeys.PREDICT\n", " EVAL = tf.estimator.ModeKeys.EVAL\n", " TRAIN = tf.estimator.ModeKeys.TRAIN\n", " learning_rate=0.002\n", " batch_size=128\n", " training_steps=5000\n", " \n", " train_filenames = [hdfs.project_path() + \"TourData/mnist/train/train.tfrecords\"]\n", " validation_filenames = [hdfs.project_path() + \"TourData/mnist/validation/validation.tfrecords\"]\n", "\n", " def build_estimator(config):\n", " \"\"\"\n", " Build the estimator based on the given config and params.\n", " Args:\n", " config (RunConfig): RunConfig object that defines how to run the Estimator.\n", " params (object): hyper-parameters (can be argparse object).\n", " \"\"\"\n", " return tf.estimator.Estimator(\n", " model_fn=model_fn,\n", " config=config,\n", " )\n", "\n", "\n", " def model_fn(features, labels, mode):\n", " \"\"\"Model function used in the estimator.\n", " Args:\n", " features (Tensor): Input features to the model.\n", " labels (Tensor): Labels tensor for training and evaluation.\n", " mode (ModeKeys): Specifies if training, evaluation or prediction.\n", " params (object): hyper-parameters (can be argparse object).\n", " Returns:\n", " (EstimatorSpec): Model to be run by Estimator.\n", " \"\"\"\n", " \n", " features = tf.cast(features, tf.float32)\n", " # Define model's architecture\n", " logits = architecture(features, mode)\n", " class_predictions = tf.argmax(logits, axis=-1)\n", " # Setup the estimator according to the phase (Train, eval, predict)\n", " loss = None\n", " train_op = None\n", " eval_metric_ops = {}\n", " predictions = class_predictions\n", " # Loss will only be tracked during training or evaluation.\n", " if mode in (TRAIN, EVAL):\n", " loss = tf.losses.sparse_softmax_cross_entropy(\n", " labels=tf.cast(labels, tf.int32),\n", " logits=logits)\n", " # Training operator only needed during training.\n", " if mode == TRAIN:\n", " train_op = get_train_op_fn(loss)\n", " # Evaluation operator only needed during evaluation\n", " if mode == EVAL:\n", " eval_metric_ops = {\n", " 'accuracy': tf.metrics.accuracy(\n", " labels=labels,\n", " predictions=class_predictions,\n", " name='accuracy')\n", " }\n", " # Class predictions and probabilities only needed during inference.\n", " if mode == PREDICT:\n", " predictions = {\n", " 'classes': class_predictions,\n", " 'probabilities': tf.nn.softmax(logits, name='softmax_tensor')\n", " }\n", " return tf.estimator.EstimatorSpec(\n", " mode=mode,\n", " predictions=predictions,\n", " loss=loss,\n", " train_op=train_op,\n", " eval_metric_ops=eval_metric_ops\n", " )\n", "\n", "\n", " def architecture(inputs, mode, scope='MnistConvNet'):\n", " \"\"\"Return the output operation following the network architecture.\n", " Args:\n", " inputs (Tensor): Input Tensor\n", " mode (ModeKeys): Runtime mode (train, eval, predict)\n", " scope (str): Name of the scope of the architecture\n", " Returns:\n", " Logits output Op for the network.\n", " \"\"\"\n", " with tf.variable_scope(scope):\n", " inputs = inputs / 255\n", " input_layer = tf.reshape(inputs, [-1, 28, 28, 1])\n", " conv1 = tf.layers.conv2d(\n", " inputs=input_layer,\n", " filters=20,\n", " kernel_size=[5, 5],\n", " padding='valid',\n", " activation=tf.nn.relu)\n", " pool1 = 
tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)\n", " conv2 = tf.layers.conv2d(\n", " inputs=pool1,\n", " filters=40,\n", " kernel_size=[5, 5],\n", " padding='valid',\n", " activation=tf.nn.relu)\n", " pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)\n", " flatten = tf.reshape(pool2, [-1, 4 * 4 * 40])\n", " dense1 = tf.layers.dense(inputs=flatten, units=256, activation=tf.nn.relu)\n", " dropout = tf.layers.dropout(\n", " inputs=dense1, rate=0.5, training=mode==tf.estimator.ModeKeys.TRAIN)\n", " dense2 = tf.layers.dense(inputs=dropout, units=10)\n", " return dense2\n", "\n", "\n", " def get_train_op_fn(loss):\n", " \"\"\"Get the training Op.\n", " Args:\n", " loss (Tensor): Scalar Tensor that represents the loss function.\n", " params (object): Hyper-parameters (needs to have `learning_rate`)\n", " Returns:\n", " Training Op\n", " \"\"\"\n", " optimizer = tf.train.AdamOptimizer(learning_rate)\n", " train_op = optimizer.minimize(\n", " loss=loss,\n", " global_step=tf.train.get_global_step())\n", " return train_op\n", "\n", " def get_train_inputs(filenames, batch_size):\n", "\n", " def parser(serialized_example):\n", " \"\"\"Parses a single tf.Example into image and label tensors.\"\"\"\n", " features = tf.parse_single_example(\n", " serialized_example,\n", " features={\n", " 'image_raw': tf.FixedLenFeature([], tf.string),\n", " 'label': tf.FixedLenFeature([], tf.int64),\n", " })\n", " image = tf.decode_raw(features['image_raw'], tf.uint8)\n", " image.set_shape([28 * 28])\n", "\n", " # Normalize the values of the image from the range [0, 255] to [-0.5, 0.5]\n", " image = tf.cast(image, tf.float32) / 255 - 0.5\n", " label = tf.cast(features['label'], tf.int32)\n", " return image, label\n", "\n", " \n", " # Import MNIST data\n", " dataset = tf.data.TFRecordDataset(filenames)\n", " \n", " # Map the parser over dataset, and batch results by up to batch_size\n", " dataset = dataset.map(parser)\n", " dataset = dataset.batch(batch_size)\n", " return dataset\n", "\n", " def get_eval_inputs(filenames, batch_size):\n", "\n", " def parser(serialized_example):\n", " \"\"\"Parses a single tf.Example into image and label tensors.\"\"\"\n", " features = tf.parse_single_example(\n", " serialized_example,\n", " features={\n", " 'image_raw': tf.FixedLenFeature([], tf.string),\n", " 'label': tf.FixedLenFeature([], tf.int64),\n", " })\n", " image = tf.decode_raw(features['image_raw'], tf.uint8)\n", " image.set_shape([28 * 28])\n", "\n", " # Normalize the values of the image from the range [0, 255] to [-0.5, 0.5]\n", " image = tf.cast(image, tf.float32) / 255 - 0.5\n", " label = tf.cast(features['label'], tf.int32)\n", " return image, label\n", "\n", " \n", " # Import MNIST data\n", " dataset = tf.data.TFRecordDataset(filenames)\n", "\n", " # Map the parser over dataset, and batch results by up to batch_size\n", " dataset = dataset.map(parser)\n", " dataset = dataset.batch(batch_size)\n", " return dataset\n", " \n", "\n", " # Read parameters and input data\n", " mnist_train, mnist_test = tf.keras.datasets.mnist.load_data()\n", " \n", " config = tf.estimator.RunConfig(\n", " train_distribute=tf.distribute.experimental.MultiWorkerMirroredStrategy(),\n", " model_dir=tensorboard.logdir(),\n", " save_summary_steps=100,\n", " log_step_count_steps=100,\n", " save_checkpoints_steps=500)\n", " # Setup the Estimator\n", " model_estimator = build_estimator(config)\n", " # Setup and start training and validation\n", " train_spec = tf.estimator.TrainSpec(\n", " input_fn=lambda: 
get_train_inputs(train_filenames, batch_size),\n", " max_steps=training_steps)\n", " eval_spec = tf.estimator.EvalSpec(\n", " input_fn=lambda: get_eval_inputs(validation_filenames, batch_size),\n", " steps=None,\n", " start_delay_secs=10, # Start evaluating after 10 sec.\n", " throttle_secs=30 # Evaluate only every 30 sec\n", " )\n", " \n", " tf.estimator.train_and_evaluate(model_estimator, train_spec, eval_spec)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from hops import experiment\n", "from hops import hdfs\n", "\n", "experiment.mirrored(mirrored_mnist,\n", " name='mnist estimator', \n", " description='A minimal mnist example with two hidden layers', local_logdir=True)" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }