{ "cells": [ { "cell_type": "markdown", "source": [ "---\n", "title: \"Creating a Petastorm dataset from MNIST example\"\n", "date: 2021-05-03\n", "type: technical_note\n", "draft: false\n", "---" ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "## Creating a Petastorm MNIST dataset\n", "In this notebook we create a Petastorm dataset from the famous MNIST dataset. Compared to ImageNette, it has the advantage of being easily available through PyTorch's torchvision package. It is also considerably smaller, which makes it easier to experiment with." ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
ID | YARN Application ID | Kind | State | Spark UI | Driver log
181 | application_1617699042861_0008 | pyspark | idle | Link | Link
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] } ], "source": [ "from hops import hdfs\n", "import numpy as np\n", "from torchvision.datasets import MNIST" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "path = hdfs.project_path() + \"Resources/Petastorm\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Downloading the dataset with torchvision\n", "Torchvision provides a simple interface for downloading the MNIST dataset. Note that the MNIST download is broken in torchvision versions prior to 0.9.1. If you run into problems, upgrade your installation to the latest version. For other workarounds, see [this Stack Overflow question](https://stackoverflow.com/questions/66577151/http-error-when-trying-to-download-mnist-data)." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz\n", "Failed to download (trying next):\n", "HTTP Error 503: Service Unavailable\n", "\n", "Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz\n", "Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/train-images-idx3-ubyte.gz\n", "Extracting hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/train-images-idx3-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw\n", "\n", "Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz\n", "Failed to download (trying next):\n", "HTTP Error 503: Service Unavailable\n", "\n", "Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz\n", "Downloading 
https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/train-labels-idx1-ubyte.gz\n", "Extracting hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/train-labels-idx1-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw\n", "\n", "Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz\n", "Failed to download (trying next):\n", "HTTP Error 503: Service Unavailable\n", "\n", "Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz\n", "Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/t10k-images-idx3-ubyte.gz\n", "Extracting hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/t10k-images-idx3-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw\n", "\n", "Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz\n", "Failed to download (trying next):\n", "HTTP Error 503: Service Unavailable\n", "\n", "Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz\n", "Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/t10k-labels-idx1-ubyte.gz\n", "Extracting hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/t10k-labels-idx1-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw\n", "\n", "Processing...\n", "Done!\n", "9913344it [00:00, 11845783.99it/s] \n", "29696it [00:00, 357933.48it/s] \n", "1649664it [00:00, 
2811158.81it/s] \n", "5120it [00:00, 4901811.57it/s] \n", "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/torchvision/datasets/mnist.py:502: UserWarning: The given NumPy array is not writeable, and PyTorch does not support non-writeable tensors. This means you can write to the underlying (supposedly non-writeable) NumPy array using the tensor. You may want to copy the array to protect its data or make it writeable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:143.)\n", " return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)" ] } ], "source": [ "path = hdfs.project_path() + \"DataSets/MNIST\"\n", "train_dataset = MNIST(path, download=True)\n", "test_dataset = MNIST(path, download=True, train=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Setting up the Petastorm dataset generation\n", "Now that we have our dataset, creating the Petastorm dataset works exactly as it does with ImageNette. Note that distributed training needs an even dataset, meaning that each node sees the same number of examples. If your dataset is not even, you can increase the number of Parquet files to allow a more fine-grained distribution among nodes." 
] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "from petastorm.codecs import NdarrayCodec, ScalarCodec\n", "from petastorm.etl.dataset_metadata import materialize_dataset\n", "from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row\n", "from pyspark.sql.types import IntegerType\n", "\n", "\n", "# One uint8 image of shape (1, 28, 28) and one integer label per row.\n", "MNISTSchema = Unischema('ScalarSchema', [\n", "    UnischemaField('image', np.uint8, (1,28,28), NdarrayCodec(), False),\n", "    UnischemaField('label', np.int8, (), ScalarCodec(IntegerType()), False)])\n", "\n", "def row_generator(idx, dataset):\n", "    # dataset[idx] yields a PIL image and its label; add a leading channel axis.\n", "    img, label = dataset[idx]\n", "    return {'image': np.expand_dims(np.array(img, dtype=np.uint8), axis=0), 'label': label}\n", "\n", "\n", "def generate_MNIST_dataset(output_url, dataset):\n", "    rowgroup_size_mb = 1\n", "    rows_count = len(dataset)\n", "    parquet_files_count = 100\n", "\n", "    sc = spark.sparkContext\n", "    # Wrap dataset materialization portion. Will take care of setting up spark environment variables as\n", "    # well as save petastorm specific metadata\n", "    with materialize_dataset(spark, output_url, MNISTSchema, rowgroup_size_mb):\n", "        rows_rdd = sc.parallelize(range(rows_count))\\\n", "            .map(lambda x: row_generator(x, dataset))\\\n", "            .map(lambda x: dict_to_spark_row(MNISTSchema, x))\n", "\n", "        spark.createDataFrame(rows_rdd, MNISTSchema.as_spark_schema()) \\\n", "            .repartition(parquet_files_count) \\\n", "            .write \\\n", "            .mode('overwrite') \\\n", "            .parquet(output_url)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Generating the dataset\n", "Now that everything is set up, we can define our output paths and generate the datasets." 
] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "train_path = hdfs.project_path() + \"DataSets/MNIST/PetastormMNIST/train_set\"\n", "test_path = hdfs.project_path() + \"DataSets/MNIST/PetastormMNIST/test_set\"" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "generate_MNIST_dataset(train_path, train_dataset)\n", "generate_MNIST_dataset(test_path, test_dataset)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }