{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Tensorflow 2 Keras example with differential evolution on Hopsworks\n", "---\n", "\n", "

Tested with TensorFlow 2.3.0

\n", "\n", "

\n", "

Machine Learning on Hopsworks\n", "

\n", "

\n", "\n", "![hops.png](../../../images/hops.png)\n", "\n", "## The `hops` python module\n", "\n", "`hops` is a helper library for Hops that facilitates development by hiding the complexity of running applications and iteracting with services.\n", "\n", "Have a feature request or encountered an issue? Please let us know on github.\n", "\n", "### Using the `experiment` module\n", "\n", "To be able to run your Machine Learning code in Hopsworks, the code for the whole program needs to be provided and put inside a wrapper function. Everything, from importing libraries to reading data and defining the model and running the program needs to be put inside a wrapper function.\n", "\n", "The `experiment` module provides an api to Python programs such as TensorFlow, Keras and PyTorch on a Hopsworks on any number of machines and GPUs.\n", "\n", "An Experiment could be a single Python program, which we refer to as an **Experiment**. \n", "\n", "Grid search or genetic hyperparameter optimization such as differential evolution which runs several Experiments in parallel, which we refer to as **Parallel Experiment**. \n", "\n", "ParameterServerStrategy, CollectiveAllReduceStrategy and MultiworkerMirroredStrategy making multi-machine/multi-gpu training as simple as invoking a function for orchestration. This mode is referred to as **Distributed Training**.\n", "\n", "### Using the `tensorboard` module\n", "The `tensorboard` module allow us to get the log directory for summaries and checkpoints to be written to the TensorBoard we will see in a bit. The only function that we currently need to call is `tensorboard.logdir()`, which returns the path to the TensorBoard log directory. Furthermore, the content of this directory will be put in as a Dataset in your project's Experiments folder.\n", "\n", "The directory could in practice be used to store other data that should be accessible after the experiment is finished.\n", "```python\n", "# Use this module to get the TensorBoard logdir\n", "from hops import tensorboard\n", "tensorboard_logdir = tensorboard.logdir()\n", "```\n", "\n", "### Using the `hdfs` module\n", "The `hdfs` module provides a method to get the path in HopsFS where your data is stored, namely by calling `hdfs.project_path()`. The path resolves to the root path for your project, which is the view that you see when you click `Data Sets` in HopsWorks. To point where your actual data resides in the project you to append the full path from there to your Dataset. For example if you create a mnist folder in your Resources Dataset, the path to the mnist data would be `hdfs.project_path() + 'Resources/mnist'`\n", "\n", "```python\n", "# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project\n", "from hops import hdfs\n", "project_path = hdfs.project_path()\n", "```\n", "\n", "```python\n", "# Downloading the mnist dataset to the current working directory\n", "from hops import hdfs\n", "mnist_hdfs_path = hdfs.project_path() + \"Resources/mnist\"\n", "local_mnist_path = hdfs.copy_to_local(mnist_hdfs_path)\n", "```\n", "\n", "### Documentation\n", "See the following links to learn more about running experiments in Hopsworks\n", "\n", "- Learn more about experiments\n", "
\n", "- Building End-To-End pipelines\n", "
\n", "- Give us a star, create an issue or a feature request on Hopsworks github\n", "\n", "### Managing experiments\n", "Experiments service provides a unified view of all the experiments run using the `experiment` module.\n", "
\n", "As demonstrated in the gif it provides general information about the experiment and the resulting metric. Experiments can be visualized meanwhile or after training in a TensorBoard.\n", "
\n", "
\n", "![Image7-Monitor.png](../../../images/experiments.gif)" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
<table><tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th></tr><tr><td>26</td><td>application_1596813411095_0002</td><td>pyspark</td><td>idle</td><td>Link</td><td>Link</td></tr></table>
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] } ], "source": [ "def tf2_keras_differential_evolution(kernel, pool, dropout):\n", "\n", " import sys\n", " \n", " import numpy as np\n", " import tensorflow as tf\n", " \n", " from hops import tensorboard\n", " from hops import devices\n", " from hops import hdfs\n", " \n", " import pydoop.hdfs as pydoop\n", " \n", " data_dir = hdfs.project_path()\n", " \n", " epochs=1 \n", " steps_per_epoch=5\n", " validation_steps=2 \n", " batch_size = 32\n", " shuffle_size = batch_size * 4\n", " num_classes = 10\n", " \n", " # Input image dimensions\n", " img_rows, img_cols = 28, 28\n", " input_shape = (img_rows, img_cols, 1)\n", " \n", "\n", " # Provide path to train and validation datasets\n", " train_filenames = [hdfs.project_path() + \"TourData/mnist/train/train.tfrecords\"]\n", " validation_filenames = [hdfs.project_path() + \"TourData/mnist/validation/validation.tfrecords\"]\n", " \n", " \n", " # Define input function\n", " def input_fn(filenames, batch_size):\n", " \n", " \n", " def _parser(serialized_example):\n", " \"\"\"Parses a single tf.Example into image and label tensors.\"\"\"\n", " features = tf.io.parse_single_example(\n", " serialized_example,\n", " features={\n", " 'image_raw': tf.io.FixedLenFeature([], tf.string),\n", " 'label': tf.io.FixedLenFeature([], tf.int64),\n", " })\n", " \n", " image = tf.io.decode_raw(features['image_raw'], tf.uint8)\n", " image.set_shape([img_rows * img_cols])\n", " \n", " label = features['label'] \n", " \n", " return image, label\n", " \n", " \n", " def _normalize_img(image, label):\n", " \"\"\"Normalizes images\"\"\"\n", " image = tf.cast(image, tf.float32) / 255\n", " label = tf.cast(label, tf.int32) \n", " return image, label\n", "\n", " def _reshape_img(image, label):\n", " image = tf.reshape(image, [28, 28, 1])\n", " return image, label\n", " \n", " # Import MNIST data\n", " dataset = tf.data.TFRecordDataset(filenames)\n", " \n", " # Map the parser over dataset, and batch results by up to batch_size\n", " dataset = dataset.map(_parser, num_parallel_calls=tf.data.experimental.AUTOTUNE)\n", " \n", " \n", " dataset = dataset.map(\n", " _reshape_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)\n", " \n", " dataset = dataset.repeat(epochs * steps_per_epoch)\n", " dataset = dataset.cache()\n", " dataset = dataset.shuffle(shuffle_size)\n", " dataset = dataset.batch(batch_size) \n", " \n", " dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)\n", "\n", " return dataset\n", " \n", " model_dir = tensorboard.logdir()\n", " print('Using %s to store checkpoints.' 
% model_dir)\n", " \n", " # Define a Keras Model.\n", " model = tf.keras.Sequential()\n", " model.add(tf.keras.layers.Conv2D(32, kernel_size=kernel, padding='same',\n", " activation='relu',\n", " input_shape=input_shape))\n", " model.add(tf.keras.layers.Conv2D(64, kernel, padding='same',activation='relu'))\n", " model.add(tf.keras.layers.MaxPooling2D(pool_size=pool))\n", " model.add(tf.keras.layers.Dropout(dropout))\n", " model.add(tf.keras.layers.Flatten())\n", " model.add(tf.keras.layers.Dense(128, activation='relu'))\n", " model.add(tf.keras.layers.Dropout(dropout))\n", " model.add(tf.keras.layers.Dense(num_classes))\n", "\n", " model.compile(\n", " loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n", " optimizer=tf.keras.optimizers.Adam(0.001),\n", " metrics=['accuracy'],\n", " )\n", "\n", " callbacks = [\n", " tf.keras.callbacks.TensorBoard(log_dir=model_dir),\n", " tf.keras.callbacks.ModelCheckpoint(filepath=model_dir),\n", " ]\n", "\n", " model.fit(input_fn(train_filenames, batch_size), \n", " verbose=0, \n", " epochs=epochs, \n", " steps_per_epoch=steps_per_epoch,\n", " validation_data=input_fn(validation_filenames, batch_size),\n", " validation_steps=validation_steps, \n", " callbacks=callbacks\n", " )\n", "\n", " score = model.evaluate(input_fn(validation_filenames, batch_size), steps=1, verbose=0)\n", " print('Test loss:', score[0])\n", " print('Test accuracy:', score[1])\n", " return {'accuracy': score[1], 'loss': score[0]} " ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Generation 1 || average metric: 0.3958333333333333, best metric: 0.6875, best parameter combination: ['kernel=4', 'pool=3', 'dropout=0.01']\n", "\n", "Generation 2 || average metric: 0.4635416666666667, best metric: 0.6875, best parameter combination: ['kernel=4', 'pool=3', 'dropout=0.01']\n", "\n", "Generation 3 || average metric: 0.46875, best metric: 0.6875, best parameter combination: ['kernel=4', 'pool=3', 'dropout=0.01']\n", "\n", "Generation 4 || average metric: 0.484375, best metric: 0.6875, best parameter combination: ['kernel=4', 'pool=3', 'dropout=0.01']\n", "\n", "Finished Experiment \n", "\n", "('hdfs://rpc.namenode.service.consul:8020/Projects/test/Experiments/application_1596813411095_0002_1/generation.1/kernel=4&pool=3&dropout=0.01', {'kernel': 4, 'pool': 3, 'dropout': 0.01}, {'accuracy': 0.6875, 'loss': 11.728448867797852, 'log': 'Experiments/application_1596813411095_0002_1/generation.1/kernel=4&pool=3&dropout=0.01/output.log'})" ] } ], "source": [ "from hops import experiment\n", "search_dict = {'kernel': [2,8], 'pool': [2,8], 'dropout': [0.01, 0.99]}\n", "# local_logdir starts the TensorBoard with a logdir on the local filesystem.\n", "# when the job is finished the contents of the logdir will be put automatically in your project\n", "experiment.differential_evolution(tf2_keras_differential_evolution, search_dict, name='tf2 keras mnist diff evo', local_logdir=True, optimization_key='accuracy')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }