\n",
"\n",
"\n",
"\n",
"\n",
"## The `hops` python module\n",
"\n",
"`hops` is a helper library for Hops that facilitates development by hiding the complexity of running applications and iteracting with services.\n",
"\n",
"Have a feature request or encountered an issue? Please let us know on github.\n",
"\n",
"### Using the `experiment` module\n",
"\n",
"To be able to run your Machine Learning code in Hopsworks, the code for the whole program needs to be provided and put inside a wrapper function. Everything, from importing libraries to reading data and defining the model and running the program needs to be put inside a wrapper function.\n",
"\n",
"The `experiment` module provides an api to Python programs such as TensorFlow, Keras and PyTorch on a Hopsworks on any number of machines and GPUs.\n",
"\n",
"An Experiment could be a single Python program, which we refer to as an **Experiment**. \n",
"\n",
"Grid search or genetic hyperparameter optimization such as differential evolution which runs several Experiments in parallel, which we refer to as **Parallel Experiment**. \n",
"\n",
"ParameterServerStrategy, CollectiveAllReduceStrategy and MultiworkerMirroredStrategy making multi-machine/multi-gpu training as simple as invoking a function for orchestration. This mode is referred to as **Distributed Training**.\n",
"\n",
"### Using the `tensorboard` module\n",
"The `tensorboard` module allow us to get the log directory for summaries and checkpoints to be written to the TensorBoard we will see in a bit. The only function that we currently need to call is `tensorboard.logdir()`, which returns the path to the TensorBoard log directory. Furthermore, the content of this directory will be put in as a Dataset in your project's Experiments folder.\n",
"\n",
"The directory could in practice be used to store other data that should be accessible after the experiment is finished.\n",
"```python\n",
"# Use this module to get the TensorBoard logdir\n",
"from hops import tensorboard\n",
"tensorboard_logdir = tensorboard.logdir()\n",
"```\n",
"\n",
"### Using the `hdfs` module\n",
"The `hdfs` module provides a method to get the path in HopsFS where your data is stored, namely by calling `hdfs.project_path()`. The path resolves to the root path for your project, which is the view that you see when you click `Data Sets` in HopsWorks. To point where your actual data resides in the project you to append the full path from there to your Dataset. For example if you create a mnist folder in your Resources Dataset, the path to the mnist data would be `hdfs.project_path() + 'Resources/mnist'`\n",
"\n",
"```python\n",
"# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project\n",
"from hops import hdfs\n",
"project_path = hdfs.project_path()\n",
"```\n",
"\n",
"```python\n",
"# Downloading the mnist dataset to the current working directory\n",
"from hops import hdfs\n",
"mnist_hdfs_path = hdfs.project_path() + \"Resources/mnist\"\n",
"local_mnist_path = hdfs.copy_to_local(mnist_hdfs_path)\n",
"```\n",
"\n",
"### Documentation\n",
"See the following links to learn more about running experiments in Hopsworks\n",
"\n",
"- Learn more about experiments\n",
" \n",
"- Building End-To-End pipelines\n",
" \n",
"- Give us a star, create an issue or a feature request on Hopsworks github\n",
"\n",
"### Managing experiments\n",
"Experiments service provides a unified view of all the experiments run using the `experiment` module.\n",
" \n",
"As demonstrated in the gif it provides general information about the experiment and the resulting metric. Experiments can be visualized meanwhile or after training in a TensorBoard.\n",
" \n",
" \n",
""
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Finished Experiment \n",
"\n",
"('hdfs://rpc.namenode.service.consul:8020/Projects/demo/Experiments/application_1594231828166_0177_4', {'accuracy': 0.828125, 'log': 'Experiments/application_1594231828166_0177_4/chief_0_output.log'})"
]
}
],
"source": [
"def mirrored_training():\n",
"\n",
" import sys\n",
" \n",
" import numpy as np\n",
" import tensorflow as tf\n",
" \n",
" from hops import tensorboard\n",
" from hops import devices\n",
" from hops import hdfs\n",
" \n",
" import pydoop.hdfs as pydoop\n",
" \n",
" log_dir = tensorboard.logdir()\n",
" \n",
" # Define distribution strategy\n",
" strategy = tf.distribute.MirroredStrategy()\n",
"\n",
" # Define per device batch size\n",
" batch_size_per_replica = 32\n",
" \n",
" # Define global batch size\n",
" batch_size = batch_size_per_replica * strategy.num_replicas_in_sync\n",
" \n",
" # Define model hyper parameters\n",
" epochs = 10\n",
" steps_per_epoch = 5\n",
" validation_steps = 2\n",
" \n",
" kernel = 2\n",
" pool = 2\n",
" dropout = 0.01\n",
"\n",
" num_classes = 10\n",
" \n",
" # Input image dimensions\n",
" img_rows, img_cols = 28, 28\n",
" input_shape = (28, 28, 1)\n",
" \n",
" \n",
" train_filenames = [hdfs.project_path() + \"TourData/mnist/train/train.tfrecords\"]\n",
" validation_filenames = [hdfs.project_path() + \"TourData/mnist/validation/validation.tfrecords\"]\n",
" \n",
" # Create an iterator over the dataset\n",
" def data_input(filenames, batch_size=128, shuffle=False, repeat=None):\n",
"\n",
" def parser(serialized_example):\n",
" \"\"\"Parses a single tf.Example into image and label tensors.\"\"\"\n",
" features = tf.io.parse_single_example(\n",
" serialized_example,\n",
" features={\n",
" 'image_raw': tf.io.FixedLenFeature([], tf.string),\n",
" 'label': tf.io.FixedLenFeature([], tf.int64),\n",
" })\n",
" image = tf.io.decode_raw(features['image_raw'], tf.uint8)\n",
" image.set_shape([28 * 28])\n",
"\n",
" # Normalize the values of the image from the range [0, 255] to [-0.5, 0.5]\n",
" image = tf.cast(image, tf.float32) / 255 - 0.5\n",
" label = tf.cast(features['label'], tf.int32)\n",
" # Reshape the tensor\n",
" image = tf.reshape(image, [img_rows, img_cols, 1])\n",
" \n",
" # Create a one hot array for your labels\n",
" label = tf.one_hot(label, num_classes)\n",
" \n",
" return image, label\n",
"\n",
" # Import MNIST data\n",
" dataset = tf.data.TFRecordDataset(filenames)\n",
"\n",
" # Map the parser over dataset, and batch results by up to batch_size\n",
" dataset = dataset.map(parser)\n",
" if shuffle:\n",
" dataset = dataset.shuffle(buffer_size=128)\n",
" dataset = dataset.batch(batch_size, drop_remainder=True)\n",
" dataset = dataset.repeat(repeat)\n",
" return dataset\n",
" \n",
" # construct model under distribution strategy scope\n",
" with strategy.scope():\n",
" \n",
" model = tf.keras.Sequential()\n",
" model.add(tf.keras.layers.Conv2D(32, kernel_size=(kernel, kernel),\n",
" activation='relu',\n",
" input_shape=input_shape))\n",
" model.add(tf.keras.layers.Conv2D(64, (kernel, kernel), activation='relu'))\n",
" model.add(tf.keras.layers.MaxPooling2D(pool_size=(pool, pool)))\n",
" model.add(tf.keras.layers.Dropout(dropout))\n",
" model.add(tf.keras.layers.Flatten())\n",
" model.add(tf.keras.layers.Dense(128, activation='relu'))\n",
" model.add(tf.keras.layers.Dropout(dropout))\n",
" model.add(tf.keras.layers.Dense(num_classes, activation='softmax'))\n",
"\n",
" opt = tf.keras.optimizers.Adadelta(1.0)\n",
"\n",
" model.compile(loss=tf.keras.losses.categorical_crossentropy,\n",
" optimizer=opt,\n",
" metrics=['accuracy'])\n",
" \n",
" \n",
" callbacks = [\n",
" tf.keras.callbacks.TensorBoard(log_dir=log_dir),\n",
" tf.keras.callbacks.ModelCheckpoint(filepath=log_dir),\n",
" ]\n",
" \n",
" model.fit(data_input(train_filenames, batch_size), \n",
" verbose=0, \n",
" epochs=epochs, \n",
" steps_per_epoch=steps_per_epoch,\n",
" validation_data=data_input(validation_filenames, batch_size),\n",
" validation_steps=validation_steps, \n",
" callbacks=callbacks\n",
" )\n",
" \n",
" score = model.evaluate(data_input(validation_filenames, batch_size), steps=1)\n",
" metrics = {'accuracy': score[1]}\n",
"\n",
" return metrics \n",
"\n",
" \n",
"from hops import experiment\n",
"experiment.mirrored(mirrored_training, name='mnist model', local_logdir=True, metric_key='accuracy')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "PySpark",
"language": "python",
"name": "pysparkkernel"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 3
},
"mimetype": "text/x-python",
"name": "pyspark",
"pygments_lexer": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}