{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "title: \"Titanic Dataset with the Feature Store\"\n", "date: 2021-02-24\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Titanic Dataset for the Feature Store" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook prepares the Titanic dataset to be used with the feature store.\n", "\n", "The Titanic dataset contains information about the passengers of the famous Titanic ship. The training and test data come in form of two CSV files, which can be downloaded from the Titanic Competition page on [Kaggle](https://www.kaggle.com/c/titanic/data).\n", "\n", "Download the `train.csv` and `test.csv` files, and upload them to the `Resources` folder of your Hopsworks Project. If you prefer doing things using GUIs, then you can find the `Resources` by opening the **Data Sets** tab on the left menu bar.\n", "\n", "Once you have the two files uploaded on the `Resources` folder, you can proceed with the rest of the notebook." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
{ "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "<table>\n<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th></tr>\n<tr><td>0</td><td>application_1614293057610_0001</td><td>pyspark</td><td>idle</td><td>Link</td><td>Link</td></tr></table>
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] } ], "source": [ "import tensorflow as tf\n", "from hops import hdfs\n", "from pyspark.sql import functions as F\n", "import hsfs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's begin by reading the training data into a Spark DataFrame:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# read the training data csv into a Spark DataFrame\n", "\n", "training_csv_path = hdfs.project_path() + 'Resources/' + 'train.csv'\n", "raw_df = spark.read.csv(training_csv_path, header=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we can do some simple preprocessing. Rather than registering the whole dataset with the Feature Store, we just select a few of the columns, and cast all columns to `int`. Since the values of the `sex` column are either `male` or `female`, we also convert them to `0` or `1`, respectively. We also fill the missing values of the `age` column with `30`." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "# simple preprocessing:\n", "# 1 - selecting a few of the columns\n", "# 2 - Filling the missing 'age' values with 30\n", "# 3 - changing sex to 0 or 1\n", "# 4 - casting all columns to int\n", "\n", "clean_train_df = raw_df.select('survived', 'pclass', 'sex', 'fare', 'age', 'sibsp', 'parch') \\\n", " .fillna({'age': 30}) \\\n", " .withColumn('sex',\n", " F.when(F.col('sex')=='male', 0)\n", " .otherwise(1))\\\n", " .withColumn('survived',\n", " F.col('survived').cast('int')) \\\n", " .withColumn('pclass',\n", " F.col('pclass').cast('int')) \\\n", " .withColumn('fare',\n", " F.col('fare').cast('int')) \\\n", " .withColumn('age',\n", " F.col('age').cast('int')) \\\n", " .withColumn('sibsp',\n", " F.col('sibsp').cast('int')) \\\n", " .withColumn('parch',\n", " F.col('parch').cast('int'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's see how our \"clean\" dataframe looks like now:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+------+---+----+---+-----+-----+\n", "|survived|pclass|sex|fare|age|sibsp|parch|\n", "+--------+------+---+----+---+-----+-----+\n", "| 0| 3| 0| 7| 22| 1| 0|\n", "| 1| 1| 1| 71| 38| 1| 0|\n", "| 1| 3| 1| 7| 26| 0| 0|\n", "| 1| 1| 1| 53| 35| 1| 0|\n", "| 0| 3| 0| 8| 35| 0| 0|\n", "| 0| 3| 0| 8| 30| 0| 0|\n", "| 0| 1| 0| 51| 54| 0| 0|\n", "| 0| 3| 0| 21| 2| 3| 1|\n", "| 1| 3| 1| 11| 27| 0| 2|\n", "| 1| 2| 1| 30| 14| 1| 0|\n", "| 1| 3| 1| 16| 4| 1| 1|\n", "| 1| 1| 1| 26| 58| 0| 0|\n", "| 0| 3| 0| 8| 20| 0| 0|\n", "| 0| 3| 0| 31| 39| 1| 5|\n", "| 0| 3| 1| 7| 14| 0| 0|\n", "| 1| 2| 1| 16| 55| 0| 0|\n", "| 0| 3| 0| 29| 2| 4| 1|\n", "| 1| 2| 0| 13| 30| 0| 0|\n", "| 0| 3| 1| 18| 31| 1| 0|\n", "| 1| 3| 1| 7| 30| 0| 0|\n", "+--------+------+---+----+---+-----+-----+\n", "only showing top 20 rows" ] } ], "source": [ "clean_train_df.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The next step would be to create a *feature group* from our clean dataframe, so as to register it with the Project's Feature Store:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Connected. Call `.close()` to terminate connection gracefully." 
] } ], "source": [ "connection = hsfs.connection()\n", "fs = connection.get_feature_store()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "# create a feature group from the training data DataFrame\n", "titantic_fg = fs.create_feature_group(name=\"titanic_training_all_features\",\n", " version=1,\n", " description=\"titanic training dataset with clean features\",\n", " time_travel_format=None,\n", " statistics_config={\"enabled\": True, \"histograms\": True, \"correlations\": True})\n", "titantic_fg.save(clean_train_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we can forget about our previous \"clean\" dataframe that we read directly from the CSV file, and retrieve the training dataframe from the feature store:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "# retrieve dataframe from feature store\n", "titanic_df = fs.get_feature_group('titanic_training_all_features',version=1)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+---+----+--------+------+---+-----+\n", "|sibsp|sex|fare|survived|pclass|age|parch|\n", "+-----+---+----+--------+------+---+-----+\n", "| 1| 0| 7| 0| 3| 22| 0|\n", "| 1| 1| 71| 1| 1| 38| 0|\n", "| 0| 1| 7| 1| 3| 26| 0|\n", "| 1| 1| 53| 1| 1| 35| 0|\n", "+-----+---+----+--------+------+---+-----+\n", "only showing top 4 rows" ] } ], "source": [ "titanic_df.show(4)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, we create a *training dataset* from the feature group. This is a very simple task using the Feature Store API. You can provide a name, and the data format for the dataset. For now, let's stick with `tfrecord`, TensorFlow's own file format." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "td = fs.create_training_dataset(name=\"titanic_train_dataset\",\n", " description=\"Dataset to train Titantic survival model\",\n", " data_format=\"tfrecord\",\n", " version=1)\n", "td.save(titanic_df.read())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Done! you can now use the titanic training data in your Projects!" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }