{ "cells": [ { "cell_type": "markdown", "id": "a986564a", "metadata": {}, "source": [ "---\n", "title: \"Create empty feature groups for Online Feature Store\"\n", "date: 2021-04-25\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "code", "execution_count": 1, "id": "3ef8e3f0", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
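<table><tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th></tr>
<tr><td>1</td><td>application_1619309085643_0002</td><td>pyspark</td><td>idle</td><td>Link</td><td>Link</td></tr></table>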
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] } ], "source": [ "import json\n", "from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType" ] }, { "cell_type": "markdown", "id": "efdd9239", "metadata": {}, "source": [ "# Create empty feature groups\n", "In this demo we expect to receive data from a Kafka topic, read it using Spark streaming, compute streaming aggregations, and ingest the aggregated data into feature groups. We therefore first create empty feature groups into which the streaming data will be ingested." ] }, { "cell_type": "markdown", "id": "f330cabe", "metadata": {}, "source": [ "### Define schema for feature groups" ] }, { "cell_type": "code", "execution_count": 2, "id": "8ffcf548", "metadata": {}, "outputs": [], "source": [ "card_schema = StructType([StructField('tid', StringType(), True),\n", " StructField('datetime', StringType(), True),\n", " StructField('cc_num', LongType(), True),\n", " StructField('amount', DoubleType(), True)])\n", "\n", "schema_10m = StructType([StructField('cc_num', LongType(), True),\n", " StructField('num_trans_per_10m', LongType(), True),\n", " StructField('avg_amt_per_10m', DoubleType(), True),\n", " StructField('stdev_amt_per_10m', DoubleType(), True)])\n", "\n", "schema_1h = StructType([StructField('cc_num', LongType(), True),\n", " StructField('num_trans_per_1h', LongType(), True),\n", " StructField('avg_amt_per_1h', DoubleType(), True),\n", " StructField('stdev_amt_per_1h', DoubleType(), True)])\n", "\n", "schema_12h = StructType([StructField('cc_num', LongType(), True),\n", " StructField('num_trans_per_12h', LongType(), True),\n", " StructField('avg_amt_per_12h', DoubleType(), True),\n", " StructField('stdev_amt_per_12h', DoubleType(), True)])" ] }, { "cell_type": "markdown", "id": "d9f70f3b", "metadata": {}, "source": [ "### Create empty Spark 
dataframes " ] }, { "cell_type": "code", "execution_count": 3, "id": "2d9e7ee9", "metadata": {}, "outputs": [], "source": [ "empty_card_df = sqlContext.createDataFrame(sc.emptyRDD(), card_schema)\n", "empty_10m_agg_df = sqlContext.createDataFrame(sc.emptyRDD(), schema_10m)\n", "empty_1h_agg_df = sqlContext.createDataFrame(sc.emptyRDD(), schema_1h)\n", "empty_12h_agg_df = sqlContext.createDataFrame(sc.emptyRDD(), schema_12h)" ] }, { "cell_type": "markdown", "id": "1df1bc68", "metadata": {}, "source": [ "### Establish a connection with your Hopsworks feature store." ] }, { "cell_type": "code", "execution_count": 4, "id": "334fbb72", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Connected. Call `.close()` to terminate connection gracefully." ] } ], "source": [ "import hsfs\n", "connection = hsfs.connection()\n", "# get a reference to the feature store; you can also access shared feature stores by providing the feature store name\n", "fs = connection.get_feature_store()" ] }, { "cell_type": "markdown", "id": "b9925e1a", "metadata": {}, "source": [ "### Create feature group metadata objects and save the empty Spark dataframes to materialise them in the Hopsworks feature store.\n", "\n", "Now we will create each feature group and enable the online feature store for the aggregated feature groups. Since we plan to use these feature groups during online model serving, primary key(s) are required to retrieve feature vectors from the online feature store. 
" ] }, { "cell_type": "code", "execution_count": 5, "id": "e9dc8f4e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "card_transactions = fs.create_feature_group(\"card_transactions\", \n", " version = 1,\n", " online_enabled=False, \n", " statistics_config=False, \n", " primary_key=[\"tid\"])\n", "\n", "card_transactions.save(empty_card_df)" ] }, { "cell_type": "code", "execution_count": 6, "id": "4cd9d834", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "card_transactions_10m_agg = fs.create_feature_group(\"card_transactions_10m_agg\", \n", " version = 1,\n", " online_enabled=True, \n", " statistics_config=False, \n", " primary_key=[\"cc_num\"])\n", "\n", "card_transactions_10m_agg.save(empty_10m_agg_df)" ] }, { "cell_type": "code", "execution_count": 7, "id": "1b269be0", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "card_transactions_1h_agg = fs.create_feature_group(\"card_transactions_1h_agg\", \n", " version = 1,\n", " online_enabled=True, \n", " statistics_config=False, \n", " primary_key=[\"cc_num\"])\n", "\n", "card_transactions_1h_agg.save(empty_1h_agg_df)" ] }, { "cell_type": "code", "execution_count": 8, "id": "4700fb09", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "card_transactions_12h_agg = fs.create_feature_group(\"card_transactions_12h_agg\", \n", " version = 1,\n", " online_enabled=True, \n", " statistics_config=False, \n", " primary_key=[\"cc_num\"])\n", "\n", "card_transactions_12h_agg.save(empty_12h_agg_df)" ] }, { "cell_type": "code", "execution_count": null, "id": "243c7b91", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, 
"mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }