# ParameterServerStrategy on Hopsworks
---

<font color='red'> <h3>Tested with TensorFlow 1.15.0</h3></font>

<p>
<h1>Machine Learning on <a href="https://github.com/logicalclocks/hopsworks">Hopsworks
</a></h1> 
</p>

![hops.png](../../images/hops.png)

## The `hops` python module

`hops` is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.

Have a feature request or encountered an issue? Please let us know on <a href="https://github.com/logicalclocks/hops-util-py">GitHub</a>.

### Using the `experiment` module

To run your Machine Learning code in Hopsworks, the whole program must be placed inside a wrapper function. Everything, from importing libraries and reading data to defining the model and running training, goes inside that function.
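
Because the wrapper runs in separate processes, a common mistake is to read a notebook-level variable from inside it. As a rough local check (our own heuristic, not part of the `hops` API), Python's standard `inspect.getclosurevars` lists the module-level globals a function reads directly. It only inspects the function's own body, not functions nested inside it, so treat an empty result as a hint rather than a guarantee. The function names below are hypothetical.

```python
import inspect


def global_dependencies(fn):
    """Return the module-level names that fn reads directly (builtins excluded).

    Heuristic only: names used solely inside nested functions are not reported.
    """
    return set(inspect.getclosurevars(fn).globals)


# A notebook-level variable that a wrapper might accidentally depend on.
learning_rate = 0.002


def leaky_wrapper():
    # Reads the notebook global, which may not exist in the remote process.
    return learning_rate * 10


def self_contained_wrapper():
    # Imports and data live inside the wrapper, so nothing leaks in from outside.
    import statistics
    batch_losses = [0.5, 0.25, 0.75]
    return statistics.mean(batch_losses)


print(global_dependencies(leaky_wrapper))           # {'learning_rate'}
print(global_dependencies(self_contained_wrapper))  # set()
```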

The `experiment` module provides an API to run Python programs written with frameworks such as TensorFlow, Keras and PyTorch on a Hopsworks cluster, on any number of machines and GPUs.

An experiment can be a single Python program, which we refer to as an **Experiment**. 

It can also be a grid search or a genetic hyperparameter optimization such as differential evolution, which runs several Experiments in parallel; we refer to this as a **Parallel Experiment**. 

Finally, it can use ParameterServerStrategy, CollectiveAllReduceStrategy or MultiWorkerMirroredStrategy, which make multi-machine/multi-GPU training as simple as invoking a function for orchestration. This mode is referred to as **Distributed Training**.
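
To make the **Parallel Experiment** idea concrete, the sketch below expands a grid of hyperparameter values into the independent trial configurations that a grid search would run. It is plain Python built on `itertools.product`, not the `hops` implementation, and the parameter names and values are only illustrative.

```python
from itertools import product


def expand_grid(grid):
    """Turn {"param": [v1, v2, ...], ...} into a list with one dict per trial."""
    names = sorted(grid)  # fixed order so the trial list is reproducible
    return [dict(zip(names, values)) for values in product(*(grid[n] for n in names))]


# Illustrative search space: 2 x 2 = 4 trials, each of which could run as its own Experiment.
grid = {"learning_rate": [0.001, 0.002], "batch_size": [64, 128]}
for trial in expand_grid(grid):
    print(trial)
```

Every trial is independent of the others, which is what allows them to run in parallel on different machines.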

### Using the `tensorboard` module
The `tensorboard` module allows us to get the log directory that summaries and checkpoints are written to, which is the directory the TensorBoard shown later reads from. The only function we currently need to call is `tensorboard.logdir()`, which returns the path to the TensorBoard log directory. Furthermore, the contents of this directory will be saved as a Dataset in your project's Experiments folder.

The directory could in practice be used to store other data that should be accessible after the experiment is finished.
```python
# Use this module to get the TensorBoard logdir
from hops import tensorboard
tensorboard_logdir = tensorboard.logdir()
```
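
Since the log directory is kept after the run, it is a convenient place for small artifacts such as a final metrics summary. The sketch below writes a JSON file into whatever directory it is given. It assumes the log directory is a local filesystem path, as when the experiment is launched with `local_logdir=True` like the one at the end of this notebook; an HDFS path would need an HDFS-aware writer instead. Here a temporary directory stands in for `tensorboard.logdir()` so the snippet runs anywhere, and the helper `save_metrics`, the file name and the metric values are placeholders of our own, not part of `hops`.

```python
import json
import os
import tempfile


def save_metrics(logdir, metrics, filename="metrics.json"):
    """Write a dict of metrics as JSON into logdir and return the file path."""
    path = os.path.join(logdir, filename)
    with open(path, "w") as f:
        json.dump(metrics, f, indent=2)
    return path


# Stand-in for tensorboard.logdir() so this runs outside Hopsworks.
logdir = tempfile.mkdtemp()
# Placeholder values, not real results.
path = save_metrics(logdir, {"accuracy": 0.98, "loss": 0.05})

with open(path) as f:
    print(json.load(f))
```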

### Using the `hdfs` module
The `hdfs` module provides a method to get the path in HopsFS where your data is stored, namely `hdfs.project_path()`. The path resolves to the root path of your project, which is the view that you see when you click `Data Sets` in Hopsworks. To point to where your actual data resides in the project, you need to append the full path from there to your Dataset. For example, if you create a mnist folder in your Resources Dataset, the path to the mnist data would be `hdfs.project_path() + 'Resources/mnist'`.

```python
# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project
from hops import hdfs
project_path = hdfs.project_path()
```

```python
# Downloading the mnist dataset to the current working directory
from hops import hdfs
mnist_hdfs_path = hdfs.project_path() + "Resources/mnist"
local_mnist_path = hdfs.copy_to_local(mnist_hdfs_path)
```
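
The string concatenation above works because the project path ends with a slash. When building paths from several pieces, `posixpath.join` from the standard library inserts the separators for you, and it handles `hdfs://` URIs because they use forward slashes. The project path below is a made-up example of what `hdfs.project_path()` might return; substitute the real value from your project.

```python
import posixpath

# Hypothetical value standing in for hdfs.project_path().
project_path = "hdfs://namenode:8020/Projects/demo/"

mnist_path = posixpath.join(project_path, "Resources", "mnist")
print(mnist_path)  # hdfs://namenode:8020/Projects/demo/Resources/mnist

# The same call also works if the base path has no trailing slash.
print(posixpath.join(project_path.rstrip("/"), "Resources", "mnist"))
```

Avoid a leading slash on the later pieces: `posixpath.join` treats a component starting with `/` as absolute and discards everything before it.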

### Documentation
See the following links to learn more about running experiments in Hopsworks:

- <a href="https://hopsworks.readthedocs.io/en/latest/hopsml/experiment.html">Learn more about experiments</a>
<br>
- <a href="https://hopsworks.readthedocs.io/en/latest/hopsml/hopsML.html">Building End-To-End pipelines</a>
<br>
- Give us a star, create an issue or a feature request on <a href="https://github.com/logicalclocks/hopsworks">Hopsworks GitHub</a>

### Managing experiments
The Experiments service provides a unified view of all the experiments run using the `experiment` module.
<br>
As demonstrated in the GIF, it provides general information about each experiment and its resulting metric. Experiments can be visualized in TensorBoard during or after training.
<br>
<br>
![Image7-Monitor.png](../../images/experiments.gif)

```python
def parameter_server_mnist():

    import os
    import json
    import tensorflow as tf
    from hops import devices
    from hops import hdfs
    from hops import tensorboard

    # Extract information about the cluster this process belongs to
    tf_cluster = json.loads(os.environ["TF_CONFIG"])

    # Workers + chief is the number of processes reading from HDFS
    num_workers = len(tf_cluster["cluster"]["worker"]) + 1

    # Both the chief and the workers need a unique task_index for sharding:
    # workers keep their own index and the chief takes the last one
    if tf_cluster["task"]["type"] == 'chief':
        task_index = num_workers - 1
    else:
        task_index = tf_cluster["task"]["index"]

    PREDICT = tf.estimator.ModeKeys.PREDICT
    EVAL = tf.estimator.ModeKeys.EVAL
    TRAIN = tf.estimator.ModeKeys.TRAIN
    learning_rate = 0.002
    batch_size = 128
    training_steps = 5000

    train_filenames = [hdfs.project_path() + "TourData/mnist/train/train.tfrecords"]
    validation_filenames = [hdfs.project_path() + "TourData/mnist/validation/validation.tfrecords"]

    def build_estimator(config):
        """Build the Estimator based on the given config.
        Args:
            config (RunConfig): RunConfig object that defines how to run the Estimator.
        Returns:
            (Estimator): Estimator wrapping model_fn.
        """
        return tf.estimator.Estimator(
            model_fn=model_fn,
            config=config,
        )

    def model_fn(features, labels, mode):
        """Model function used in the estimator.
        Args:
            features (Tensor): Input features to the model.
            labels (Tensor): Labels tensor for training and evaluation.
            mode (ModeKeys): Specifies if training, evaluation or prediction.
        Returns:
            (EstimatorSpec): Model to be run by Estimator.
        """
        features = tf.cast(features, tf.float32)
        # Define model's architecture
        logits = architecture(features, mode)
        class_predictions = tf.argmax(logits, axis=-1)
        # Set up the estimator according to the phase (train, eval, predict)
        loss = None
        train_op = None
        eval_metric_ops = {}
        predictions = class_predictions
        # Loss is only tracked during training or evaluation
        if mode in (TRAIN, EVAL):
            loss = tf.losses.sparse_softmax_cross_entropy(
                labels=tf.cast(labels, tf.int32),
                logits=logits)
        # Training operator only needed during training
        if mode == TRAIN:
            train_op = get_train_op_fn(loss)
        # Evaluation metrics only needed during evaluation
        if mode == EVAL:
            eval_metric_ops = {
                'accuracy': tf.metrics.accuracy(
                    labels=labels,
                    predictions=class_predictions,
                    name='accuracy')
            }
        # Class predictions and probabilities only needed during inference
        if mode == PREDICT:
            predictions = {
                'classes': class_predictions,
                'probabilities': tf.nn.softmax(logits, name='softmax_tensor')
            }
        return tf.estimator.EstimatorSpec(
            mode=mode,
            predictions=predictions,
            loss=loss,
            train_op=train_op,
            eval_metric_ops=eval_metric_ops
        )

    def architecture(inputs, mode, scope='MnistConvNet'):
        """Return the output operation following the network architecture.
        Args:
            inputs (Tensor): Input Tensor
            mode (ModeKeys): Runtime mode (train, eval, predict)
            scope (str): Name of the scope of the architecture
        Returns:
            Logits output Op for the network.
        """
        with tf.variable_scope(scope):
            # Inputs are already scaled to [-0.5, 0.5] by parse_example,
            # so they are not divided by 255 a second time here
            input_layer = tf.reshape(inputs, [-1, 28, 28, 1])
            conv1 = tf.layers.conv2d(
                inputs=input_layer,
                filters=20,
                kernel_size=[5, 5],
                padding='valid',
                activation=tf.nn.relu)
            pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)
            conv2 = tf.layers.conv2d(
                inputs=pool1,
                filters=40,
                kernel_size=[5, 5],
                padding='valid',
                activation=tf.nn.relu)
            pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)
            flatten = tf.reshape(pool2, [-1, 4 * 4 * 40])
            dense1 = tf.layers.dense(inputs=flatten, units=256, activation=tf.nn.relu)
            # Dropout is only active during training
            dropout = tf.layers.dropout(
                inputs=dense1, rate=0.5, training=(mode == TRAIN))
            dense2 = tf.layers.dense(inputs=dropout, units=10)
            return dense2

    def get_train_op_fn(loss):
        """Get the training Op.
        Args:
            loss (Tensor): Scalar Tensor that represents the loss function.
        Returns:
            Training Op
        """
        optimizer = tf.train.AdamOptimizer(learning_rate)
        train_op = optimizer.minimize(
            loss=loss,
            global_step=tf.train.get_global_step())
        return train_op

    def parse_example(serialized_example):
        """Parses a single tf.Example into image and label tensors."""
        features = tf.parse_single_example(
            serialized_example,
            features={
                'image_raw': tf.FixedLenFeature([], tf.string),
                'label': tf.FixedLenFeature([], tf.int64),
            })
        image = tf.decode_raw(features['image_raw'], tf.uint8)
        image.set_shape([28 * 28])
        # Normalize the values of the image from the range [0, 255] to [-0.5, 0.5]
        image = tf.cast(image, tf.float32) / 255 - 0.5
        label = tf.cast(features['label'], tf.int32)
        return image, label

    def get_train_inputs(filenames, batch_size):
        # Import MNIST training data
        dataset = tf.data.TFRecordDataset(filenames)
        # Shard the dataset on each worker so it gets unique samples;
        # shard() returns a new dataset, so the result must be reassigned
        dataset = dataset.shard(num_workers, task_index)
        # Parse the records, repeat so training can run for max_steps
        # instead of stopping after one pass, and batch by up to batch_size
        dataset = dataset.map(parse_example)
        dataset = dataset.repeat()
        dataset = dataset.batch(batch_size)
        return dataset

    def get_eval_inputs(filenames, batch_size):
        # Import MNIST validation data; evaluation reads the full, unsharded set once
        dataset = tf.data.TFRecordDataset(filenames)
        # Map the parser over the dataset, and batch results by up to batch_size
        dataset = dataset.map(parse_example)
        dataset = dataset.batch(batch_size)
        return dataset

    # ParameterServerStrategy for training, MirroredStrategy for evaluation
    config = tf.estimator.RunConfig(
        experimental_distribute=tf.contrib.distribute.DistributeConfig(
            train_distribute=tf.contrib.distribute.ParameterServerStrategy(
                num_gpus_per_worker=devices.get_num_gpus()),
            eval_distribute=tf.contrib.distribute.MirroredStrategy(
                num_gpus_per_worker=devices.get_num_gpus())),
        model_dir=tensorboard.logdir(),
        save_summary_steps=100,
        log_step_count_steps=100,
        save_checkpoints_steps=500)
    # Set up the Estimator
    model_estimator = build_estimator(config)
    # Set up and start training and validation
    train_spec = tf.estimator.TrainSpec(
        input_fn=lambda: get_train_inputs(train_filenames, batch_size),
        max_steps=training_steps)
    eval_spec = tf.estimator.EvalSpec(
        input_fn=lambda: get_eval_inputs(validation_filenames, batch_size),
        steps=None,
        start_delay_secs=10,  # Start evaluating after 10 sec.
        throttle_secs=30  # Evaluate only every 30 sec
    )

    tf.estimator.train_and_evaluate(model_estimator, train_spec, eval_spec)
```
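
The `4 * 4 * 40` in the flatten step of `architecture` follows from the layer sizes. With `'valid'` padding, a window of size `k` and stride `s` maps a spatial side of length `n` to `(n - k) // s + 1`, which is `n - k + 1` for the stride-1 convolutions and `n // 2` for the 2x2, stride-2 max-pools on even sizes. The short check below walks a 28x28 MNIST image through the two conv/pool stages; it is plain arithmetic, not TensorFlow code.

```python
def valid_output_size(n, kernel, stride=1):
    """Spatial size after a 'valid' (no padding) convolution or pooling window."""
    return (n - kernel) // stride + 1


side = 28                             # MNIST images are 28x28
side = valid_output_size(side, 5)     # conv1, 5x5 kernel    -> 24
side = valid_output_size(side, 2, 2)  # pool1, 2x2 stride 2  -> 12
side = valid_output_size(side, 5)     # conv2, 5x5 kernel    -> 8
side = valid_output_size(side, 2, 2)  # pool2, 2x2 stride 2  -> 4

filters = 40                          # conv2 has 40 filters
print(side, side * side * filters)    # 4 640
```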


```python
from hops import experiment
from hops import hdfs

notebook = hdfs.project_path() + "Jupyter/Distributed_Training/parameter_server_strategy/mnist.ipynb"
experiment.parameter_server(parameter_server_mnist,
                            name='mnist estimator',
                            description='A minimal mnist example with two hidden layers',
                            local_logdir=True)
```
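
When `experiment.parameter_server` starts the wrapper, each process reads its role from the `TF_CONFIG` environment variable, and the wrapper turns that role into a shard index so the chief and every worker read different records. The sketch below replays that logic in plain Python. The `TF_CONFIG` layout follows TensorFlow's `cluster`/`task` convention, the host names are invented, and `records[index::num_shards]` mimics `tf.data.Dataset.shard`, which keeps every element whose position modulo `num_shards` equals `index`.

```python
import json


def shard_index(tf_config):
    """Same rule as the wrapper: workers keep their index, the chief takes the last slot."""
    num_shards = len(tf_config["cluster"]["worker"]) + 1  # workers + chief
    task = tf_config["task"]
    index = num_shards - 1 if task["type"] == "chief" else task["index"]
    return num_shards, index


# Hypothetical cluster: one chief, two workers and one parameter server (ps reads no data).
cluster = {
    "chief": ["host0:2222"],
    "worker": ["host1:2222", "host2:2222"],
    "ps": ["host3:2222"],
}
roles = [("chief", 0), ("worker", 0), ("worker", 1)]

records = list(range(10))  # stand-in for the TFRecord entries
shards = {}
for task_type, task_idx in roles:
    # Each process would find a JSON string like this one in os.environ["TF_CONFIG"].
    env_value = json.dumps({"cluster": cluster, "task": {"type": task_type, "index": task_idx}})
    num_shards, index = shard_index(json.loads(env_value))
    shards[(task_type, task_idx)] = records[index::num_shards]  # same rule as Dataset.shard

print(shards)
```

Sharding also shrinks each process's pass over the data to roughly `len(records) / num_shards` examples, which matters when choosing `max_steps` for a training dataset that is not repeated.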