{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Image Classification with MNIST Using a Petastorm Dataset\n", "\n", "In this notebook we read a training dataset stored in the Petastorm format in the project's feature store and use it to train a deep CNN, defined in Keras/TensorFlow, to classify images of digits from the MNIST dataset.\n", "\n", "This notebook assumes that you have already created the training datasets in the feature store, which you can do by running this notebook:\n", "\n", "[Create Petastorm MNIST Dataset Notebook](PetastormMNIST_CreateDataset.ipynb)\n", "\n", "![Petastorm 6](./../images/petastorm6.png \"Petastorm 6\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Imports" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
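<table><tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr>
<tr><td>4</td><td>application_1559565096638_0006</td><td>pyspark</td><td>idle</td><td>Link</td><td>Link</td><td></td></tr></table>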
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] } ], "source": [ "from hops import hdfs, featurestore, tensorboard, experiment\n", "\n", "# IMPORTANT: must import tensorflow before petastorm.tf_utils due to a bug in petastorm\n", "import tensorflow as tf\n", "from tensorflow import keras\n", "from tensorflow.python.keras.callbacks import TensorBoard\n", "import json\n", "import numpy as np\n", "import pydoop\n", "from petastorm import make_reader\n", "from petastorm.tf_utils import tf_tensors, make_petastorm_dataset" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Constants\n", "\n", "In this tutorial we will just use static hyperparameters, you can potentially achieve better accuracy by optimizing the hyperparameters using hyperparameter search ([docs](https://hopsworks.readthedocs.io/en/latest/hopsml/hopsML.html))" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "TRAIN_DATASET_NAME = \"MNIST_train_petastorm\"\n", "TEST_DATASET_NAME = \"MNIST_test_petastorm\"\n", "BATCH_SIZE = 64\n", "SHUFFLE_BUFFER_SIZE = 3*BATCH_SIZE\n", "NUM_EPOCHS = 5\n", "STEPS_PER_EPOCH = 80" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1: Define The Model" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "def create_model():\n", " \"\"\"\n", " Defines a three-layer CNN with batch normalization, dropout and max pooling, relu activation, \n", " and softmax output\n", " \"\"\"\n", " model = keras.Sequential()\n", " model.add(keras.layers.Conv2D(filters=32, kernel_size=3, padding='same', activation='relu', input_shape=(28,28,1))) \n", " model.add(keras.layers.BatchNormalization())\n", " model.add(keras.layers.MaxPooling2D(pool_size=2))\n", " model.add(keras.layers.Dropout(0.3))\n", "\n", " model.add(keras.layers.Conv2D(filters=64, 
kernel_size=3, padding='same', activation='relu'))\n", " model.add(keras.layers.BatchNormalization())\n", " model.add(keras.layers.MaxPooling2D(pool_size=2))\n", " model.add(keras.layers.Dropout(0.3))\n", "\n", " model.add(keras.layers.Flatten())\n", " model.add(keras.layers.Dense(128, activation='relu'))\n", " model.add(keras.layers.Dropout(0.5))\n", " model.add(keras.layers.Dense(10, activation='softmax'))\n", " return model" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "_________________________________________________________________\n", "Layer (type) Output Shape Param # \n", "=================================================================\n", "conv2d (Conv2D) (None, 28, 28, 32) 320 \n", "_________________________________________________________________\n", "batch_normalization (BatchNo (None, 28, 28, 32) 128 \n", "_________________________________________________________________\n", "max_pooling2d (MaxPooling2D) (None, 14, 14, 32) 0 \n", "_________________________________________________________________\n", "dropout (Dropout) (None, 14, 14, 32) 0 \n", "_________________________________________________________________\n", "conv2d_1 (Conv2D) (None, 14, 14, 64) 18496 \n", "_________________________________________________________________\n", "batch_normalization_1 (Batch (None, 14, 14, 64) 256 \n", "_________________________________________________________________\n", "max_pooling2d_1 (MaxPooling2 (None, 7, 7, 64) 0 \n", "_________________________________________________________________\n", "dropout_1 (Dropout) (None, 7, 7, 64) 0 \n", "_________________________________________________________________\n", "flatten (Flatten) (None, 3136) 0 \n", "_________________________________________________________________\n", "dense (Dense) (None, 128) 401536 \n", "_________________________________________________________________\n", "dropout_2 (Dropout) (None, 128) 0 \n", 
"_________________________________________________________________\n", "dense_1 (Dense) (None, 10) 1290 \n", "=================================================================\n", "Total params: 422,026\n", "Trainable params: 421,834\n", "Non-trainable params: 192\n", "_________________________________________________________________" ] } ], "source": [ "create_model().summary()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2: Define Tensorflow Dataset\n", "\n", "Petastorm datasets can be read directly with tensorflow by using `make_reader` and `make_petastorm_dataset` from the Petastorm library" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "def create_tf_dataset(dataset_url, shuffle_buffer_size, batch_size, num_epochs):\n", " \"\"\"\n", " Defines the Tensorflow Dataset Abstraction from the Petastorm Dataset.\n", " One-hot encodes the labels.\n", " \"\"\"\n", " with make_reader(dataset_url, num_epochs=None, hdfs_driver='libhdfs',\n", " workers_count=1, shuffle_row_groups=False) as train_reader:\n", " train_dataset = make_petastorm_dataset(train_reader)\n", " def preprocess(sample):\n", " return sample.image, tf.one_hot(sample.digit, 10)\n", " return train_dataset.map(preprocess).shuffle(shuffle_buffer_size).batch(batch_size).repeat(num_epochs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3: Put it All Together in a Training Function\n" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "def train_fn():\n", " # get dataset path from the featurestore\n", " train_dataset_path = featurestore.get_training_dataset_path(TRAIN_DATASET_NAME)\n", " # get dataset\n", " dataset = create_tf_dataset(train_dataset_path, SHUFFLE_BUFFER_SIZE, BATCH_SIZE, NUM_EPOCHS)\n", " # define model\n", " model = create_model()\n", " # define optimizer\n", " 
model.compile(loss=keras.losses.categorical_crossentropy,optimizer=keras.optimizers.Adam(),metrics=['accuracy'])\n", " # setup tensorboard\n", " tb_callback = TensorBoard(log_dir=tensorboard.logdir(), histogram_freq=0,write_graph=True, write_images=True)\n", " # setup model checkpointing\n", " model_ckpt_callback = keras.callbacks.ModelCheckpoint(tensorboard.logdir() + '/checkpoint-{epoch}.h5',monitor='acc', verbose=0, save_best_only=True)\n", " callbacks = [tb_callback, model_ckpt_callback]\n", " # train model\n", " history = model.fit(dataset, epochs=NUM_EPOCHS, steps_per_epoch=STEPS_PER_EPOCH, callbacks=callbacks)\n", " # save training history to HDFS\n", " results_path = hdfs.project_path() + \"mnist/mnist_train_results.txt\"\n", " hdfs.dump(json.dumps(history.history), results_path)\n", " # save trained model\n", " model.save(\"mnist_tf_ps.h5\") #Keras can't save to HDFS in the current version so save to local fs first\n", " hdfs.copy_to_hdfs(\"mnist_tf_ps.h5\", hdfs.project_path() + \"mnist\", overwrite=True) # copy from local fs to hdfs\n", " # return latest accuracy\n", " return history.history[\"acc\"][-1]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 4: Training Experiments \n", "\n", "We can use the experiments service to run our train_fn function and handle things like Tensorboard, logging, versioning etc. While the experiment is running you can monitor it using Tensorboard and the logs, it is explained in the README [here](https://github.com/logicalclocks/hops-util-py)." 
] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Finished Experiment \n", "\n", "'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/Experiments/application_1559565096638_0006/launcher/run.2'" ] } ], "source": [ "experiment.launch(train_fn, name=\"mnist_tf_ps\", \n", " description=\"Petastorm MNIST Tensorflow Example\", local_logdir=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 5: Plot Training Results\n", "\n", "Inside the `train_fn` function we saved the training history to HDFS, which means we can later read it in %%local mode for plotting." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load Training Results From HDFS" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "%%local\n", "import json\n", "from hops import hdfs\n", "import matplotlib.pyplot as plt\n", "from pylab import rcParams\n", "results_path = hdfs.project_path() + \"mnist/mnist_train_results.txt\"\n", "results = json.loads(hdfs.load(results_path))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Plot Loss/Epoch During Training" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[]" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "%%local\n", "%matplotlib inline\n", "y = results[\"loss\"] #loss\n", "x = list(range(1, len(y)+1))#epoch\n", "plt.title(\"Loss per Epoch - MNIST Training\")\n", "plt.xlabel(\"Epoch\")\n", "plt.ylabel(\"Loss\")\n", "plt.plot(x,y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Plot Accuracy/Epoch During Training" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[]" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "%%local\n", "%matplotlib inline\n", "y = results[\"acc\"] #acc\n", "x = list(range(1, len(y)+1))#epoch\n", "plt.title(\"Accuracy per Epoch - MNIST Training\")\n", "plt.xlabel(\"Epoch\")\n", "plt.ylabel(\"Accuracy\")\n", "plt.plot(x,y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 6: Evaluation Using Trained Model and Test Dataset\n", "\n", "Inside the `train_fn` function we saved the trained model to HDFS in the hdf5 format. We can load the weights of this model and use for serving predictions or for evaluation, in this example we will evaluate the model against the test set. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load Model Weights" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "model_path_hdfs = hdfs.project_path() + \"mnist/\" + \"mnist_tf_ps.h5\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In future releases of Tensorflow, Keras will be able to read directly from HDFS, but currently it is not supported. To get around this we can download the hdf5 model in the local file system and load it from there using `model.load_weights()`. 
" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Started copying hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/mnist/mnist_tf_ps.h5 to local disk on path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/FatxKv_Lnvnybr5ulVBa6ZBxZaETIhWRCL9ga70hOV8/appcache/application_1559565096638_0006/container_e01_1559565096638_0006_01_000001/\n", "\n", "Finished copying" ] } ], "source": [ "local_path = hdfs.copy_to_local(model_path_hdfs, overwrite=True)" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "'/srv/hops/hopsdata/tmp/nm-local-dir/usercache/FatxKv_Lnvnybr5ulVBa6ZBxZaETIhWRCL9ga70hOV8/appcache/application_1559565096638_0006/container_e01_1559565096638_0006_01_000001//mnist_tf_ps.h5'" ] } ], "source": [ "local_path" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "loaded_model = create_model()\n", "loaded_model.compile(loss=keras.losses.categorical_crossentropy,optimizer=keras.optimizers.Adam(),metrics=['accuracy'])\n", "loaded_model.load_weights(local_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Evaluate Loaded Model" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "# We have 10 000 Test examples, we can evaluate in batches of 100 to speed up the process\n", "BATCH_SIZE = 100\n", "NUM_EPOCHS = 100 \n", "# get dataset path from the featurestore\n", "test_dataset_path = featurestore.get_training_dataset_path(TEST_DATASET_NAME)\n", "test_dataset = create_tf_dataset(test_dataset_path, SHUFFLE_BUFFER_SIZE, BATCH_SIZE, NUM_EPOCHS)" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\r", "1/1 [==============================] - 1s 768ms/step\n", "Test loss: 0.0319652184844017\n", "Test accuracy: 
0.9900000095367432" ] } ], "source": [ "# steps=1 evaluates a single batch of BATCH_SIZE (100) examples, not the full test set\n", "score = loaded_model.evaluate(test_dataset, verbose=1, steps=1)\n", "print('Test loss:', score[0])\n", "print('Test accuracy:', score[1])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 2 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "ipython3" } }, "nbformat": 4, "nbformat_minor": 2 }