## Tensorflow 2 Keras example with differential evolution on Hopsworks
---

<font color='red'> <h3>Tested with TensorFlow 2.3.0</h3></font>

<p>
<h1>Machine Learning on <a href="https://github.com/logicalclocks/hopsworks">Hopsworks
</a></h1> 
</p>

![hops.png](../../../images/hops.png)

## The `hops` python module

`hops` is a helper library for Hops that facilitates development by hiding the complexity of running applications and iteracting with services.

Have a feature request or encountered an issue? Please let us know on <a href="https://github.com/logicalclocks/hops-util-py">github</a>.

### Using the `experiment` module

To be able to run your Machine Learning code in Hopsworks, the code for the whole program needs to be provided and put inside a wrapper function. Everything, from importing libraries to reading data and defining the model and running the program needs to be put inside a wrapper function.

The `experiment` module provides an api to Python programs such as TensorFlow, Keras and PyTorch on a Hopsworks on any number of machines and GPUs.

An Experiment could be a single Python program, which we refer to as an **Experiment**. 

Grid search or genetic hyperparameter optimization such as differential evolution which runs several Experiments in parallel, which we refer to as **Parallel Experiment**. 

ParameterServerStrategy, CollectiveAllReduceStrategy and MultiworkerMirroredStrategy making multi-machine/multi-gpu training as simple as invoking a function for orchestration. This mode is referred to as **Distributed Training**.

### Using the `tensorboard` module
The `tensorboard` module allow us to get the log directory for summaries and checkpoints to be written to the TensorBoard we will see in a bit. The only function that we currently need to call is `tensorboard.logdir()`, which returns the path to the TensorBoard log directory. Furthermore, the content of this directory will be put in as a Dataset in your project's Experiments folder.

The directory could in practice be used to store other data that should be accessible after the experiment is finished.
```python
# Use this module to get the TensorBoard logdir
from hops import tensorboard
tensorboard_logdir = tensorboard.logdir()
```

### Using the `hdfs` module
The `hdfs` module provides a method to get the path in HopsFS where your data is stored, namely by calling `hdfs.project_path()`. The path resolves to the root path for your project, which is the view that you see when you click `Data Sets` in HopsWorks. To point where your actual data resides in the project you to append the full path from there to your Dataset. For example if you create a mnist folder in your Resources Dataset, the path to the mnist data would be `hdfs.project_path() + 'Resources/mnist'`

```python
# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project
from hops import hdfs
project_path = hdfs.project_path()
```

```python
# Downloading the mnist dataset to the current working directory
from hops import hdfs
mnist_hdfs_path = hdfs.project_path() + "Resources/mnist"
local_mnist_path = hdfs.copy_to_local(mnist_hdfs_path)
```

### Documentation
See the following links to learn more about running experiments in Hopsworks

- <a href="https://hopsworks.readthedocs.io/en/latest/hopsml/experiment.html">Learn more about experiments</a>
<br>
- <a href="https://hopsworks.readthedocs.io/en/latest/hopsml/hopsML.html">Building End-To-End pipelines</a>
<br>
- Give us a star, create an issue or a feature request on  <a href="https://github.com/logicalclocks/hopsworks">Hopsworks github</a>

### Managing experiments
Experiments service provides a unified view of all the experiments run using the `experiment` module.
<br>
As demonstrated in the gif it provides general information about the experiment and the resulting metric. Experiments can be visualized meanwhile or after training in a TensorBoard.
<br>
<br>
![Image7-Monitor.png](../../../images/experiments.gif)

In [1]:
def tf2_keras_differential_evolution(kernel, pool, dropout):

    import sys
    
    import numpy as np
    import tensorflow as tf
    
    from hops import tensorboard
    from hops import devices
    from hops import hdfs
    
    import pydoop.hdfs as pydoop
    
    data_dir = hdfs.project_path()
    
    epochs=1 
    steps_per_epoch=5
    validation_steps=2          
    batch_size = 32
    shuffle_size = batch_size * 4
    num_classes = 10
    
    # Input image dimensions
    img_rows, img_cols = 28, 28
    input_shape = (img_rows, img_cols, 1)
    

    # Provide path to train and validation datasets
    train_filenames = [hdfs.project_path() + "TourData/mnist/train/train.tfrecords"]
    validation_filenames = [hdfs.project_path() + "TourData/mnist/validation/validation.tfrecords"]
    
    
    # Define input function
    def input_fn(filenames, batch_size):
      
    
      def _parser(serialized_example):
            """Parses a single tf.Example into image and label tensors."""
            features = tf.io.parse_single_example(
                serialized_example,
                features={
                    'image_raw': tf.io.FixedLenFeature([], tf.string),
                    'label': tf.io.FixedLenFeature([], tf.int64),
                })
            
            image = tf.io.decode_raw(features['image_raw'], tf.uint8)
            image.set_shape([img_rows * img_cols])
            
            label = features['label']   
                
            return image, label
      
    
      def _normalize_img(image, label):
            """Normalizes images"""
            image = tf.cast(image, tf.float32) / 255
            label = tf.cast(label, tf.int32)        
            return image, label

      def _reshape_img(image, label):
        image = tf.reshape(image, [28, 28, 1])
        return image, label
                
      # Import MNIST data
      dataset = tf.data.TFRecordDataset(filenames)
        
      # Map the parser over dataset, and batch results by up to batch_size
      dataset = dataset.map(_parser, num_parallel_calls=tf.data.experimental.AUTOTUNE)
        
        
      dataset = dataset.map(
        _reshape_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
        
      dataset = dataset.repeat(epochs * steps_per_epoch)
      dataset = dataset.cache()
      dataset = dataset.shuffle(shuffle_size)
      dataset = dataset.batch(batch_size)   
  
      dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)

      return dataset
    
    model_dir = tensorboard.logdir()
    print('Using %s to store checkpoints.' % model_dir)
    
    # Define a Keras Model.
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Conv2D(32, kernel_size=kernel, padding='same',
                        activation='relu',
                         input_shape=input_shape))
    model.add(tf.keras.layers.Conv2D(64, kernel,  padding='same',activation='relu'))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=pool))
    model.add(tf.keras.layers.Dropout(dropout))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(128, activation='relu'))
    model.add(tf.keras.layers.Dropout(dropout))
    model.add(tf.keras.layers.Dense(num_classes))

    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(0.001),
        metrics=['accuracy'],
    )

    callbacks = [
        tf.keras.callbacks.TensorBoard(log_dir=model_dir),
        tf.keras.callbacks.ModelCheckpoint(filepath=model_dir),
    ]

    model.fit(input_fn(train_filenames, batch_size), 
        verbose=0, 
        epochs=epochs, 
        steps_per_epoch=steps_per_epoch,
        validation_data=input_fn(validation_filenames, batch_size),
        validation_steps=validation_steps,      
        callbacks=callbacks
    )

    score = model.evaluate(input_fn(validation_filenames, batch_size), steps=1, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])
    return {'accuracy': score[1], 'loss': score[0]}         

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log
26,application_1596813411095_0002,pyspark,idle,Link,Link


SparkSession available as 'spark'.


In [2]:
from hops import experiment
search_dict = {'kernel': [2,8], 'pool': [2,8], 'dropout': [0.01, 0.99]}
# local_logdir starts the TensorBoard with a logdir on the local filesystem.
# when the job is finished the contents of the logdir will be put automatically in your project
experiment.differential_evolution(tf2_keras_differential_evolution, search_dict, name='tf2 keras mnist diff evo', local_logdir=True, optimization_key='accuracy')

Generation 1 || average metric: 0.3958333333333333, best metric: 0.6875, best parameter combination: ['kernel=4', 'pool=3', 'dropout=0.01']

Generation 2 || average metric: 0.4635416666666667, best metric: 0.6875, best parameter combination: ['kernel=4', 'pool=3', 'dropout=0.01']

Generation 3 || average metric: 0.46875, best metric: 0.6875, best parameter combination: ['kernel=4', 'pool=3', 'dropout=0.01']

Generation 4 || average metric: 0.484375, best metric: 0.6875, best parameter combination: ['kernel=4', 'pool=3', 'dropout=0.01']

Finished Experiment 

('hdfs://rpc.namenode.service.consul:8020/Projects/test/Experiments/application_1596813411095_0002_1/generation.1/kernel=4&pool=3&dropout=0.01', {'kernel': 4, 'pool': 3, 'dropout': 0.01}, {'accuracy': 0.6875, 'loss': 11.728448867797852, 'log': 'Experiments/application_1596813411095_0002_1/generation.1/kernel=4&pool=3&dropout=0.01/output.log'})