{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "title: \"Time-Travel with Spark\"\n", "date: 2021-02-24\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Time travel operations in Hopsworks Feature Store\n", "\n", "In this notebook we will introduce time travel operations in Hopsworks Feature Store (HSFS). Currently HSFS supports Apache Hudi (http://hudi.apache.org/) a storage abstraction/library for doing **incremental** data ingestion to a Hopsworks Feature Store." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Background" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Motivation\n", "\n", "Traditional ETL typically involves taking a snapshot of a production database and doing a full load into a data lake (typically stored on a distributed file system). Using the snapshot approach for ETL is simple since the snapshot is immutable and can be loaded as an atomic unit into the data lake. However, the con of taking this approach to doing data ingestion is that it is *slow*. Even if just a single record have been updated since the last data ingestion, the entire table has to be re-written. If you are working with Big Data (TB or PB size datasets) then this will introduce significant *data latency* and *wasted resources* (majority of the writes when ingesting the snapshot is redundant as most of the records have not been updated since the last ETL step). \n", "\n", "This motivates the use-case for **incremental** data ingestion. Incremental data ingestion means that only deltas/changelogs since the last ingestion are inserted. With incremental processing, you process data in *mini-batches* and run the spark job frequently. The incremental model makes better use of resources and makes it easier to do complex processing and joins.\n", "\n", "In addition data is rarely immutable in practice. A bank transaction might be reverted, a customer might change his or her home adress, and a customer review might be updated, to give a few examples. This is where Hudi comes into the picture. Hudi stands for `Hadoop Upserts anD Incrementals` and brings two new primitives for data engineering on distributed file systems (in addition to append/read):\n", "\n", "- `Upsert`: the ability to do insertions (appends) and updates efficiently. \n", "- `Incremental reads`: the ability to read datasets incrementally using the notion of \"commits\"." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### How Hopsworks Feature Store time travel operations can be used for ML and Feature Pipelines\n", "\n", "Hudi is integrated in the Hopsworks Feature Store for doing incremental feature computation and for point-in-time correctness and backfilling of feature data.\n", "\n", "![Incremental Feature Engineering](../../images/featurestore_incremental_pull.png \"Incremetal Feature Engineering\")" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
IDYARN Application IDKindStateSpark UIDriver log
18application_1609813430371_0019sparkidleLinkLink
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n", "res1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4ee147e4\n" ] } ], "source": [ "spark" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Examples" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create HUDI time travel enabled feature group and Bulk Insert Sample Dataset\n", "\n", "For this demo we will use small sample of the Agarwal Generator that is a widely used dataset. It contains the hypothetical data of people applying for a loan. `Rakesh Agrawal, Tomasz Imielinksi, and Arun Swami, \"Database Mining: A Performance Perspective\", IEEE Transactions on Knowledge and Data Engineering, 5(6), December 1993.

`\n", "\n", "##### To keep this demo simple, we will split the Agrawal dataset into 3 feature groups and manually create the datasets: \n", "* `economy_fg` with customer id, salary, loan, value of house, age of house, commission and type of car features; \n", "* `demographic_fg` with customer id, age, education level, zip code;\n", "* `class_fg` which will contain labels indicating whether the loan was approved (`class B`) or rejected (`class A`)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Importing necessary libraries " ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "import com.logicalclocks.hsfs._\n", "import com.logicalclocks.hsfs.constructor._\n", "import scala.collection.JavaConversions._\n", "import collection.JavaConverters._\n", "import org.apache.spark.sql.{DataFrame, Row}\n", "import org.apache.spark.sql.catalyst.expressions.GenericRow\n", "import org.apache.spark.sql.types._\n", "import java.sql.Date\n", "import java.sql.Timestamp\n", "connection: com.logicalclocks.hsfs.HopsworksConnection = com.logicalclocks.hsfs.HopsworksConnection@6012fc8c\n", "fs: com.logicalclocks.hsfs.FeatureStore = FeatureStore{id=67, name='demo_fs_meb10000_featurestore', projectId=119, featureGroupApi=com.logicalclocks.hsfs.metadata.FeatureGroupApi@6cfc5ded}\n" ] } ], "source": [ "import com.logicalclocks.hsfs._\n", "import com.logicalclocks.hsfs.constructor._\n", "import scala.collection.JavaConversions._\n", "import collection.JavaConverters._\n", "\n", "import org.apache.spark.sql.{ DataFrame, Row }\n", "import org.apache.spark.sql.catalyst.expressions.GenericRow\n", "import org.apache.spark.sql.types._\n", "\n", "import java.sql.Date\n", "import java.sql.Timestamp\n", "\n", "val connection = HopsworksConnection.builder().build();\n", "val fs = connection.getFeatureStore();" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "economyFgSchema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(salary,FloatType,true), StructField(commission,FloatType,true), StructField(car,StringType,true), StructField(hvalue,FloatType,true), StructField(hyears,IntegerType,true), StructField(loan,FloatType,true), StructField(year,IntegerType,true))\n", "demographicFgSchema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(age,IntegerType,true), StructField(elevel,StringType,true), StructField(zipcode,StringType,true))\n", "classFgSchema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(class,StringType,true), StructField(year,IntegerType,true))\n" ] } ], "source": [ "val economyFgSchema = \n", " scala.collection.immutable.List(\n", " StructField(\"id\", IntegerType, true),\n", " StructField(\"salary\", FloatType, true),\n", " StructField(\"commission\", FloatType, true),\n", " StructField(\"car\", StringType, true), \n", " StructField(\"hvalue\", FloatType, true), \n", " StructField(\"hyears\", IntegerType, true), \n", " StructField(\"loan\", FloatType, true),\n", " StructField(\"year\", IntegerType, true) \n", ")\n", "\n", "val demographicFgSchema = \n", " scala.collection.immutable.List(\n", " StructField(\"id\", IntegerType, true),\n", " StructField(\"age\", IntegerType, true),\n", " StructField(\"elevel\", StringType, true), \n", " StructField(\"zipcode\", StringType, true) \n", ")\n", "\n", "val 
classFgSchema = \n", " scala.collection.immutable.List(\n", " StructField(\"id\", IntegerType, true),\n", " StructField(\"class\", StringType, true),\n", " StructField(\"year\", IntegerType, true) \n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create spark dataframes for each Feature groups" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "economyBulkInsertData: Seq[org.apache.spark.sql.Row] = List([1,110499.73,0.0,car15,235000.0,30,354724.2,2020], [2,140893.77,0.0,car20,135000.0,2,395015.34,2020], [3,119159.65,0.0,car1,145000.0,22,122025.08,2020], [4,20000.0,52593.63,car9,185000.0,30,99629.62,2020])\n", "economyBulkInsertDf: org.apache.spark.sql.DataFrame = [id: int, salary: float ... 6 more fields]\n" ] } ], "source": [ "val economyBulkInsertData = Seq(\n", " Row(1, 110499.73f, 0.0f, \"car15\", 235000.0f, 30, 354724.18f, 2020),\n", " Row(2, 140893.77f, 0.0f, \"car20\", 135000.0f, 2, 395015.33f, 2020),\n", " Row(3, 119159.65f, 0.0f, \"car1\", 145000.0f, 22, 122025.08f, 2020),\n", " Row(4, 20000.0f, 52593.63f, \"car9\", 185000.0f, 30, 99629.62f, 2020)\n", ")\n", "\n", "val economyBulkInsertDf = spark.createDataFrame(\n", " spark.sparkContext.parallelize(economyBulkInsertData),\n", " StructType(economyFgSchema)\n", ")" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 1|110499.73| 0.0|car15|235000.0| 30| 354724.2|2020|\n", "| 2|140893.77| 0.0|car20|135000.0| 2|395015.34|2020|\n", "| 3|119159.65| 0.0| car1|145000.0| 22|122025.08|2020|\n", "| 4| 20000.0| 52593.63| car9|185000.0| 30| 99629.62|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "\n" ] } ], "source": [ "economyBulkInsertDf.show()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "demographicBulkInsertData: Seq[org.apache.spark.sql.Row] = List([1,54,level3,zipcode5], [2,44,level4,zipcode8], [3,49,level2,zipcode4], [4,56,level0,zipcode2])\n", "demographicBulkInsertDf: org.apache.spark.sql.DataFrame = [id: int, age: int ... 
2 more fields]\n" ] } ], "source": [ "val demographicBulkInsertData = Seq(\n", " Row(1, 54, \"level3\", \"zipcode5\"),\n", " Row(2, 44, \"level4\", \"zipcode8\"),\n", " Row(3, 49, \"level2\", \"zipcode4\"),\n", " Row(4, 56, \"level0\", \"zipcode2\")\n", ")\n", "\n", "val demographicBulkInsertDf = spark.createDataFrame(\n", " spark.sparkContext.parallelize(demographicBulkInsertData),\n", " StructType(demographicFgSchema)\n", ")" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+------+--------+\n", "| id|age|elevel| zipcode|\n", "+---+---+------+--------+\n", "| 1| 54|level3|zipcode5|\n", "| 2| 44|level4|zipcode8|\n", "| 3| 49|level2|zipcode4|\n", "| 4| 56|level0|zipcode2|\n", "+---+---+------+--------+\n", "\n" ] } ], "source": [ "demographicBulkInsertDf.show()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "classBulkInsertData: Seq[org.apache.spark.sql.Row] = List([1,groupB,2020], [2,groupB,2020], [3,groupB,2020], [4,groupB,2020])\n", "classBulkInsertDf: org.apache.spark.sql.DataFrame = [id: int, class: string ... 1 more field]\n" ] } ], "source": [ "val classBulkInsertData = Seq(\n", " Row(1, \"groupB\", 2020),\n", " Row(2, \"groupB\", 2020),\n", " Row(3, \"groupB\", 2020),\n", " Row(4, \"groupB\", 2020)\n", ") \n", "\n", "val classBulkInsertDf = spark.createDataFrame(\n", " spark.sparkContext.parallelize(classBulkInsertData),\n", " StructType(classFgSchema)\n", ")" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+----+\n", "| id| class|year|\n", "+---+------+----+\n", "| 1|groupB|2020|\n", "| 2|groupB|2020|\n", "| 3|groupB|2020|\n", "| 4|groupB|2020|\n", "+---+------+----+\n", "\n" ] } ], "source": [ "classBulkInsertDf.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create feature groups \n", "\n", "Now we will create each feature group and enable the time travel format `TimeTravelFormat.HUDI`. In Hopsworks Feature Store, `primary` keys must be provided for HUDI-enabled feature groups, while `partition` keys are optional."
] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "economyFg: com.logicalclocks.hsfs.FeatureGroup = com.logicalclocks.hsfs.FeatureGroup@fe70f03\n" ] } ], "source": [ "val economyFg = (fs.createFeatureGroup()\n", " .name(\"economy_fg\")\n", " .description(\"Hudi Household Economy Feature Group\")\n", " .version(1)\n", " .primaryKeys(Seq(\"id\"))\n", " .partitionKeys(Seq(\"year\"))\n", " .hudiPrecombineKey(\"id\") \n", " .timeTravelFormat(TimeTravelFormat.HUDI)\n", " .onlineEnabled(true)\n", " .build())" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "demographyFg: com.logicalclocks.hsfs.FeatureGroup = com.logicalclocks.hsfs.FeatureGroup@12a910f3\n" ] } ], "source": [ "val demographyFg = (fs.createFeatureGroup()\n", " .name(\"demography_fg\")\n", " .description(\"Hudi Demographic Feature Group\")\n", " .version(1)\n", " .primaryKeys(Seq(\"id\"))\n", " .partitionKeys(Seq(\"zipcode\"))\n", " .timeTravelFormat(TimeTravelFormat.HUDI)\n", " .onlineEnabled(true)\n", " .build())" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "classFg: com.logicalclocks.hsfs.FeatureGroup = com.logicalclocks.hsfs.FeatureGroup@46b5bec3\n" ] } ], "source": [ "val classFg = (fs.createFeatureGroup()\n", " .name(\"class_fg\")\n", " .description(\"Hudi Class Feature Group\")\n", " .version(1)\n", " .primaryKeys(Seq(\"id\"))\n", " .hudiPrecombineKey(\"year\")\n", " .timeTravelFormat(TimeTravelFormat.HUDI)\n", " .onlineEnabled(true)\n", " .build())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define user-provided Hudi options \n", "By default, Hudi tends to over-partition input. The recommended shuffle parallelism for `hoodie.[insert|upsert|bulkinsert].shuffle.parallelism` is at least input_data_size/500MB (for example, roughly 20 for 10 GB of input; the toy dataset in this demo only needs 1)." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "extra_hudi_options: scala.collection.immutable.Map[String,String] = Map(hoodie.insert.shuffle.parallelism -> 1, hoodie.upsert.shuffle.parallelism -> 1, hoodie.parquet.compression.ratio -> 0.5)\n" ] } ], "source": [ "val extra_hudi_options = Map(\"hoodie.bulkinsert.shuffle.parallelism\" -> \"1\",\n", " \"hoodie.insert.shuffle.parallelism\" -> \"1\", \n", " \"hoodie.upsert.shuffle.parallelism\" -> \"1\",\n", " \"hoodie.parquet.compression.ratio\" -> \"0.5\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Bulk insert data into the feature group\n", "Since we have not yet saved any data into the newly created feature groups, the first write is, in Apache Hudi terminology, a `Bulk Insert`. In HSFS this is simply a call to the `save` method." ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "economyFg.save(economyBulkInsertDf, extra_hudi_options)" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "demographyFg.save(demographicBulkInsertDf, extra_hudi_options)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "classFg.save(classBulkInsertDf, extra_hudi_options)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Hopsworks Feature Store Commits\n", "\n", "As you have probably noticed, Hopsworks Feature Store uses Apache Hudi as its time travel engine. Hudi introduces the notion of `commits`, which means that it supports certain properties of traditional databases such as single-table transactions, snapshot isolation, atomic upserts and savepoints for data recovery. If an ingestion fails for some reason, no partial results are written; instead the ingestion is rolled back. The commit is implemented using an atomic `mv` operation in HDFS. \n", "\n", "Currently, the feature groups we created contain only a single commit each, as we have just done a single bulk-insert. Let's explore the timeline of `economyFg`:" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(1609945991000,{committedOn=20210106151311, rowsUpdated=0, rowsDeleted=0, rowsInserted=4})\n" ] } ], "source": [ "for ((k,v) <- economyFg.commitDetails()){\n", " println (k,v)\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Inspect results" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 2|140893.77| 0.0|car20|135000.0| 2|395015.34|2020|\n", "| 1|110499.73| 0.0|car15|235000.0| 30| 354724.2|2020|\n", "| 4| 20000.0| 52593.63| car9|185000.0| 30| 99629.62|2020|\n", "| 3|119159.65| 0.0| car1|145000.0| 22|122025.08|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "\n" ] } ], "source": [ "economyFg.read().show()" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+------+--------+\n", "|age| id|elevel| zipcode|\n", "+---+---+------+--------+\n", "| 44| 2|level4|zipcode8|\n", "| 56| 4|level0|zipcode2|\n", "| 54| 1|level3|zipcode5|\n", "| 49| 3|level2|zipcode4|\n", "+---+---+------+--------+\n", "\n" ] } ], "source": [ "demographyFg.read().show()" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+----+\n", "| id| class|year|\n", "+---+------+----+\n", "| 3|groupB|2020|\n", "| 4|groupB|2020|\n", "| 2|groupB|2020|\n", "| 1|groupB|2020|\n", "+---+------+----+\n", "\n" ] } ], "source": [ "classFg.read().show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upsert new data into a Feature Group\n", "\n", "So far we have not done anything time-travel specific; we simply did a regular bulk-insert of some data into a Hudi-enabled feature group. We could have done the same thing with a regular, non-Hudi-enabled feature group. 
However now we will look into how we can do upserts, and how Hopsworks Feature store enables us to do this efficiently." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Generate Sample Upserts Data" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "economyUpsertData: Seq[org.apache.spark.sql.Row] = List([1,120499.73,0.0,car17,205000.0,30,564724.2,2020], [2,160893.77,0.0,car10,179000.0,2,455015.34,2020], [5,93956.32,0.0,car15,135000.0,1,458679.8,2020], [6,41365.43,52809.15,car7,135000.0,19,216839.7,2020], [7,94805.61,0.0,car17,135000.0,23,233216.06,2020])\n", "economyUpsertDf: org.apache.spark.sql.DataFrame = [id: int, salary: float ... 6 more fields]\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 1|120499.73| 0.0|car17|205000.0| 30| 564724.2|2020|\n", "| 2|160893.77| 0.0|car10|179000.0| 2|455015.34|2020|\n", "| 5| 93956.32| 0.0|car15|135000.0| 1| 458679.8|2020|\n", "| 6| 41365.43| 52809.15| car7|135000.0| 19| 216839.7|2020|\n", "| 7| 94805.61| 0.0|car17|135000.0| 23|233216.06|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "\n" ] } ], "source": [ "val economyUpsertData = Seq(\n", " Row(1, 120499.73f, 0.0f, \"car17\", 205000.0f, 30, 564724.18f, 2020), //update\n", " Row(2, 160893.77f, 0.0f, \"car10\", 179000.0f, 2, 455015.33f, 2020), //update\n", " Row(5, 93956.32f, 0.0f, \"car15\", 135000.0f, 1, 458679.82f, 2020), //insert\n", " Row(6, 41365.43f, 52809.15f, \"car7\", 135000.0f, 19, 216839.71f, 2020), //insert\n", " Row(7, 94805.61f, 0.0f, \"car17\", 135000.0f, 23, 233216.07f, 2020) //insert\n", ")\n", "\n", "val economyUpsertDf = spark.createDataFrame(\n", " spark.sparkContext.parallelize(economyUpsertData),\n", " StructType(economyFgSchema)\n", ")\n", "\n", "economyUpsertDf.show(5)" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "demographicUpsertData: Seq[org.apache.spark.sql.Row] = List([2,44,level1,zipcode8], [5,59,level1,zipcode2], [6,71,level2,zipcode3], [7,32,level1,zipcode2])\n", "demographicUpsertDf: org.apache.spark.sql.DataFrame = [id: int, age: int ... 2 more fields]\n", "+---+---+------+--------+\n", "| id|age|elevel| zipcode|\n", "+---+---+------+--------+\n", "| 2| 44|level1|zipcode8|\n", "| 5| 59|level1|zipcode2|\n", "| 6| 71|level2|zipcode3|\n", "| 7| 32|level1|zipcode2|\n", "+---+---+------+--------+\n", "\n" ] } ], "source": [ "val demographicUpsertData = Seq(\n", " Row(2, 44, \"level1\", \"zipcode8\"), //update\n", " Row(5, 59, \"level1\", \"zipcode2\"), //insert\n", " Row(6, 71, \"level2\", \"zipcode3\"), //insert\n", " Row(7, 32, \"level1\", \"zipcode2\") //insert\n", ")\n", "\n", "val demographicUpsertDf = spark.createDataFrame(\n", " spark.sparkContext.parallelize(demographicUpsertData),\n", " StructType(demographicFgSchema)\n", ")\n", "\n", "demographicUpsertDf.show()" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "classUpsertData: Seq[org.apache.spark.sql.Row] = List([1,groupA,2020], [5,groupA,2020], [6,groupA,2020], [7,groupA,2020])\n", "classUpsertDf: org.apache.spark.sql.DataFrame = [id: int, class: string ... 
1 more field]\n", "+---+------+----+\n", "| id| class|year|\n", "+---+------+----+\n", "| 1|groupA|2020|\n", "| 5|groupA|2020|\n", "| 6|groupA|2020|\n", "| 7|groupA|2020|\n", "+---+------+----+\n", "\n" ] } ], "source": [ "val classUpsertData = Seq(\n", " Row(1, \"groupA\", 2020), //update\n", " Row(5, \"groupA\", 2020), //insert\n", " Row(6, \"groupA\", 2020), //insert\n", " Row(7, \"groupA\", 2020) //insert\n", ") \n", "\n", "val classUpsertDf = spark.createDataFrame(\n", " spark.sparkContext.parallelize(classUpsertData),\n", " StructType(classFgSchema)\n", ")\n", "\n", "classUpsertDf.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Make the Upsert using the Hopsworks Feature Store API\n", "In Hopsworks Feature Store, issuing the `insert` method on an Apache Hudi enabled feature group will by default perform an `Upsert` operation: each row is either inserted as a new record or, based on the `primary` and `partition` keys, used to update an already existing one." ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "economyFg.insert(economyUpsertDf, extra_hudi_options)" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "demographyFg.insert(demographicUpsertDf, extra_hudi_options)" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "classFg.insert(classUpsertDf, extra_hudi_options)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Inspect the results\n", "\n", "Notice that although a Hudi enabled feature group stores the old values of the records from previous commits, a query will only return the values of the latest commit." ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 1|120499.73| 0.0|car17|205000.0| 30| 564724.2|2020|\n", "| 5| 93956.32| 0.0|car15|135000.0| 1| 458679.8|2020|\n", "| 6| 41365.43| 52809.15| car7|135000.0| 19| 216839.7|2020|\n", "| 7| 94805.61| 0.0|car17|135000.0| 23|233216.06|2020|\n", "| 2|160893.77| 0.0|car10|179000.0| 2|455015.34|2020|\n", "| 4| 20000.0| 52593.63| car9|185000.0| 30| 99629.62|2020|\n", "| 3|119159.65| 0.0| car1|145000.0| 22|122025.08|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "\n" ] } ], "source": [ "economyFg.read().show()" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+------+--------+\n", "|age| id|elevel| zipcode|\n", "+---+---+------+--------+\n", "| 71| 6|level2|zipcode3|\n", "| 44| 2|level1|zipcode8|\n", "| 54| 1|level3|zipcode5|\n", "| 49| 3|level2|zipcode4|\n", "| 56| 4|level0|zipcode2|\n", "| 59| 5|level1|zipcode2|\n", "| 32| 7|level1|zipcode2|\n", "+---+---+------+--------+\n", "\n" ] } ], "source": [ "demographyFg.read().show()" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+----+\n", "| id| class|year|\n", "+---+------+----+\n", "| 1|groupA|2020|\n", "| 4|groupB|2020|\n", "| 2|groupB|2020|\n", "| 3|groupB|2020|\n", "| 5|groupA|2020|\n", "| 6|groupA|2020|\n", "| 7|groupA|2020|\n", "+---+------+----+\n", "\n" ] } ], "source": [ "classFg.read.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Inspect the updated commit timeline of `economyFg`" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(1609945991000,{committedOn=20210106151311, rowsUpdated=0, rowsDeleted=0, rowsInserted=4})\n", "(1609946149000,{committedOn=20210106151549, rowsUpdated=2, rowsDeleted=0, rowsInserted=3})\n" ] } ], "source": [ "for ((k,v) <- economyFg.commitDetails()){\n", " println (k,v)\n", "}" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(1609946224000,{committedOn=20210106151704, rowsUpdated=1, rowsDeleted=0, rowsInserted=3})\n", "(1609946058000,{committedOn=20210106151418, rowsUpdated=0, rowsDeleted=0, rowsInserted=4})\n" ] } ], "source": [ "for ((k,v) <- demographyFg.commitDetails()){\n", " println (k,v)\n", "}" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(1609946089000,{committedOn=20210106151449, rowsUpdated=0, rowsDeleted=0, rowsInserted=4})\n", "(1609946274000,{committedOn=20210106151754, rowsUpdated=1, rowsDeleted=0, rowsInserted=3})\n" ] } ], "source": [ "for ((k,v) <- classFg.commitDetails()){\n", " println (k,v)\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Let's make one more commit to better demonstrate the time travel capabilities of Hopsworks Feature Store" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "economyUpsertData: Seq[org.apache.spark.sql.Row] = List([8,64410.62,39884.39,car20,125000.0,6,350707.38,2020], [9,128298.82,0.0,car19,135000.0,12,20768.06,2020], [10,100806.92,0.0,car8,135000.0,6,293106.66,2020])\n", "economyUpsertDf: org.apache.spark.sql.DataFrame = [id: int, salary: float ... 6 more fields]\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 8| 64410.62| 39884.39|car20|125000.0| 6|350707.38|2020|\n", "| 9|128298.82| 0.0|car19|135000.0| 12| 20768.06|2020|\n", "| 10|100806.92| 0.0| car8|135000.0| 6|293106.66|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "\n" ] } ], "source": [ "val economyUpsertData = Seq( \n", " Row(8, 64410.62f, 39884.39f, \"car20\", 125000.0f, 6, 350707.38f, 2020), //insert\n", " Row(9, 128298.82f, 0.0f, \"car19\", 135000.0f, 12, 20768.06f, 2020), //insert\n", " Row(10,100806.92f, 0.0f, \"car8\", 135000.0f, 6, 293106.65f, 2020) //insert \n", " \n", ")\n", "\n", "val economyUpsertDf = spark.createDataFrame(\n", " spark.sparkContext.parallelize(economyUpsertData),\n", " StructType(economyFgSchema)\n", ")\n", "\n", "economyUpsertDf.show()" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "demographicUpsertData: Seq[org.apache.spark.sql.Row] = List([8,33,level2,zipcode1], [9,32,level1,zipcode3], [10,58,level2,zipcode5])\n", "demographicUpsertDf: org.apache.spark.sql.DataFrame = [id: int, age: int ... 
2 more fields]\n", "+---+---+------+--------+\n", "| id|age|elevel| zipcode|\n", "+---+---+------+--------+\n", "| 8| 33|level2|zipcode1|\n", "| 9| 32|level1|zipcode3|\n", "| 10| 58|level2|zipcode5|\n", "+---+---+------+--------+\n", "\n" ] } ], "source": [ "val demographicUpsertData = Seq( \n", " Row(8, 33, \"level2\", \"zipcode1\"), //insert\n", " Row(9, 32, \"level1\", \"zipcode3\"), //insert\n", " Row(10, 58, \"level2\", \"zipcode5\") //insert \n", ")\n", "\n", "val demographicUpsertDf = spark.createDataFrame(\n", " spark.sparkContext.parallelize(demographicUpsertData),\n", " StructType(demographicFgSchema)\n", ")\n", "\n", "demographicUpsertDf.show()" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "classUpsertData: Seq[org.apache.spark.sql.Row] = List([8,groupA,2020], [9,groupA,2020], [10,groupB,2020])\n", "classUpsertDf: org.apache.spark.sql.DataFrame = [id: int, class: string ... 1 more field]\n", "+---+------+----+\n", "| id| class|year|\n", "+---+------+----+\n", "| 8|groupA|2020|\n", "| 9|groupA|2020|\n", "| 10|groupB|2020|\n", "+---+------+----+\n", "\n" ] } ], "source": [ "val classUpsertData = Seq(\n", " Row(8, \"groupA\", 2020), //insert\n", " Row(9, \"groupA\", 2020), //insert\n", " Row(10, \"groupB\", 2020) //insert \n", ") \n", "\n", "val classUpsertDf = spark.createDataFrame(\n", " spark.sparkContext.parallelize(classUpsertData),\n", " StructType(classFgSchema)\n", ")\n", "\n", "classUpsertDf.show()" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [], "source": [ "economyFg.insert(economyUpsertDf, extra_hudi_options)" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [], "source": [ "demographyFg.insert(demographicUpsertDf, extra_hudi_options)" ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [], "source": [ "classFg.insert(classUpsertDf, extra_hudi_options)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Time Travel Queries\n", "When the `read` method is issued on a `FeatureGroup` object without any parameters, the most recent view of the feature group will be returned. " ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 1|120499.73| 0.0|car17|205000.0| 30| 564724.2|2020|\n", "| 5| 93956.32| 0.0|car15|135000.0| 1| 458679.8|2020|\n", "| 6| 41365.43| 52809.15| car7|135000.0| 19| 216839.7|2020|\n", "| 7| 94805.61| 0.0|car17|135000.0| 23|233216.06|2020|\n", "| 8| 64410.62| 39884.39|car20|125000.0| 6|350707.38|2020|\n", "| 9|128298.82| 0.0|car19|135000.0| 12| 20768.06|2020|\n", "| 10|100806.92| 0.0| car8|135000.0| 6|293106.66|2020|\n", "| 2|160893.77| 0.0|car10|179000.0| 2|455015.34|2020|\n", "| 4| 20000.0| 52593.63| car9|185000.0| 30| 99629.62|2020|\n", "| 3|119159.65| 0.0| car1|145000.0| 22|122025.08|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "\n" ] } ], "source": [ "economyFg.read().show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using the timeline metadata we can inspect the value of a table at a specific point in time, as well as pull changes incrementally."
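 ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a small illustrative sketch (added to this walkthrough, not part of the original demo): since the `committedOn` values are `yyyyMMddHHmmss` strings, lexicographic order matches chronological order, so one could pick the latest commit that is not newer than some target time as follows. The target timestamp below is an assumed example." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "// Sketch (assumed target time, not from the original demo): find the latest\n", "// commit of economyFg that is not newer than targetTime.\n", "val targetTime = \"20210106151600\"\n", "val commitsUpTo = economyFg.commitDetails().values()\n", " .map(c => c.get(\"committedOn\")).toList.sorted\n", " .filter(_ <= targetTime)\n", "val latestCommitBefore = commitsUpTo.lastOption"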
] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "commitTimeline: java.util.Map[String,java.util.Map[String,String]] = {1609945991000={committedOn=20210106151311, rowsUpdated=0, rowsDeleted=0, rowsInserted=4}, 1609946356000={committedOn=20210106151916, rowsUpdated=0, rowsDeleted=0, rowsInserted=3}, 1609946149000={committedOn=20210106151549, rowsUpdated=2, rowsDeleted=0, rowsInserted=3}}\n", "(1609945991000,{committedOn=20210106151311, rowsUpdated=0, rowsDeleted=0, rowsInserted=4})\n", "(1609946356000,{committedOn=20210106151916, rowsUpdated=0, rowsDeleted=0, rowsInserted=3})\n", "(1609946149000,{committedOn=20210106151549, rowsUpdated=2, rowsDeleted=0, rowsInserted=3})\n" ] } ], "source": [ "val commitTimeline = economyFg.commitDetails()\n", "for ((k,v) <- commitTimeline){\n", " println (k,v)\n", "}" ] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "economyFgCommitTimestamps: List[String] = List(20210106151311, 20210106151549, 20210106151916)\n" ] } ], "source": [ "val economyFgCommitTimestamps = economyFg.commitDetails().values().map(c => c.get(\"committedOn\")).toList.sorted" ] }, { "cell_type": "code", "execution_count": 42, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 2|140893.77| 0.0|car20|135000.0| 2|395015.34|2020|\n", "| 1|110499.73| 0.0|car15|235000.0| 30| 354724.2|2020|\n", "| 4| 20000.0| 52593.63| car9|185000.0| 30| 99629.62|2020|\n", "| 3|119159.65| 0.0| car1|145000.0| 22|122025.08|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "\n" ] } ], "source": [ "// pull 1st commit\n", "economyFg.read(economyFgCommitTimestamps(0)).show()" ] }, { "cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 1|120499.73| 0.0|car17|205000.0| 30| 564724.2|2020|\n", "| 5| 93956.32| 0.0|car15|135000.0| 1| 458679.8|2020|\n", "| 6| 41365.43| 52809.15| car7|135000.0| 19| 216839.7|2020|\n", "| 7| 94805.61| 0.0|car17|135000.0| 23|233216.06|2020|\n", "| 2|160893.77| 0.0|car10|179000.0| 2|455015.34|2020|\n", "| 4| 20000.0| 52593.63| car9|185000.0| 30| 99629.62|2020|\n", "| 3|119159.65| 0.0| car1|145000.0| 22|122025.08|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "\n" ] } ], "source": [ "// pull 2nd commit\n", "economyFg.read(economyFgCommitTimestamps(1)).show()" ] }, { "cell_type": "code", "execution_count": 44, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 1|120499.73| 0.0|car17|205000.0| 30| 564724.2|2020|\n", "| 5| 93956.32| 0.0|car15|135000.0| 1| 458679.8|2020|\n", "| 6| 41365.43| 52809.15| car7|135000.0| 19| 216839.7|2020|\n", "| 7| 94805.61| 0.0|car17|135000.0| 23|233216.06|2020|\n", "| 8| 64410.62| 39884.39|car20|125000.0| 
6|350707.38|2020|\n", "| 9|128298.82| 0.0|car19|135000.0| 12| 20768.06|2020|\n", "| 10|100806.92| 0.0| car8|135000.0| 6|293106.66|2020|\n", "| 2|160893.77| 0.0|car10|179000.0| 2|455015.34|2020|\n", "| 4| 20000.0| 52593.63| car9|185000.0| 30| 99629.62|2020|\n", "| 3|119159.65| 0.0| car1|145000.0| 22|122025.08|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "\n" ] } ], "source": [ "// pull 3rd commit\n", "economyFg.read(economyFgCommitTimestamps(2)).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Hopsworks Feature Store also provides a method for incremental reads:" ] }, { "cell_type": "code", "execution_count": 45, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 1|120499.73| 0.0|car17|205000.0| 30| 564724.2|2020|\n", "| 5| 93956.32| 0.0|car15|135000.0| 1| 458679.8|2020|\n", "| 6| 41365.43| 52809.15| car7|135000.0| 19| 216839.7|2020|\n", "| 7| 94805.61| 0.0|car17|135000.0| 23|233216.06|2020|\n", "| 2|160893.77| 0.0|car10|179000.0| 2|455015.34|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "\n" ] } ], "source": [ "// Pull changes that happened between the first and second commits\n", "economyFg.readChanges(economyFgCommitTimestamps(0), economyFgCommitTimestamps(1)).show()" ] }, { "cell_type": "code", "execution_count": 46, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 8| 64410.62| 39884.39|car20|125000.0| 6|350707.38|2020|\n", "| 9|128298.82| 0.0|car19|135000.0| 12| 20768.06|2020|\n", "| 10|100806.92| 0.0| car8|135000.0| 6|293106.66|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "\n" ] } ], "source": [ "// Pull changes that happened between the second and third commits \n", "economyFg.readChanges(economyFgCommitTimestamps(1), economyFgCommitTimestamps(2)).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Join Feature groups that correspond to a specific point in time\n", "If we are interested in joining Feature groups that all correspond to one specific point in time, we can issue the `asOf` method on the join `Query` object. 
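\n", "\n", "As a small optional illustration (added to this walkthrough, not part of the original demo): the timestamp passed to `asOf` can come straight from the commit metadata collected earlier. The next cell sketches this for a single feature group before we rebuild the feature group objects and the full join below." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "// Sketch: use the most recent economyFg commit timestamp with asOf.\n", "// economyFgCommitTimestamps was built above from commitDetails() and is sorted ascending.\n", "val latestEconomyCommit = economyFgCommitTimestamps.last\n", "val economyAsOfLatest = economyFg.selectAll().asOf(latestEconomyCommit)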
" ] }, { "cell_type": "code", "execution_count": 47, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "economyFg: com.logicalclocks.hsfs.FeatureGroup = com.logicalclocks.hsfs.FeatureGroup@5d5f7b53\n", "demographyFg: com.logicalclocks.hsfs.FeatureGroup = com.logicalclocks.hsfs.FeatureGroup@364e0911\n", "classFg: com.logicalclocks.hsfs.FeatureGroup = com.logicalclocks.hsfs.FeatureGroup@261952cf\n" ] } ], "source": [ "val economyFg = fs.getFeatureGroup(\"economy_fg\")\n", "val demographyFg = fs.getFeatureGroup(\"demography_fg\")\n", "val classFg = fs.getFeatureGroup(\"class_fg\")" ] }, { "cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "joined_features: com.logicalclocks.hsfs.constructor.Query = SELECT `fg2`.`hvalue`, `fg2`.`car`, `fg2`.`commission`, `fg2`.`id`, `fg2`.`loan`, `fg2`.`salary`, `fg2`.`hyears`, `fg2`.`year`, `fg0`.`age`, `fg0`.`id`, `fg0`.`elevel`, `fg0`.`zipcode`, `fg1`.`year`, `fg1`.`id`, `fg1`.`class` FROM `fg2` `fg2` INNER JOIN `fg0` `fg0` ON `fg2`.`id` = `fg0`.`id` INNER JOIN `fg1` `fg1` ON `fg2`.`id` = `fg1`.`id`\n" ] } ], "source": [ "val joined_features = ((economyFg.selectAll())\n", " .join(demographyFg.selectAll(), Seq(\"id\"), JoinType.INNER)\n", " .join(classFg.selectAll(), Seq(\"id\"), JoinType.INNER)\n", " .asOf(economyFgCommitTimestamps(2)))" ] }, { "cell_type": "code", "execution_count": 49, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+-----+----------+---+---------+---------+------+----+---+---+------+--------+----+---+------+\n", "| hvalue| car|commission| id| loan| salary|hyears|year|age| id|elevel| zipcode|year| id| class|\n", "+--------+-----+----------+---+---------+---------+------+----+---+---+------+--------+----+---+------+\n", "|205000.0|car17| 0.0| 1| 564724.2|120499.73| 30|2020| 54| 1|level3|zipcode5|2020| 1|groupA|\n", "|135000.0| car7| 52809.15| 6| 216839.7| 41365.43| 19|2020| 71| 6|level2|zipcode3|2020| 6|groupA|\n", "|145000.0| car1| 0.0| 3|122025.08|119159.65| 22|2020| 49| 3|level2|zipcode4|2020| 3|groupB|\n", "|135000.0|car15| 0.0| 5| 458679.8| 93956.32| 1|2020| 59| 5|level1|zipcode2|2020| 5|groupA|\n", "|185000.0| car9| 52593.63| 4| 99629.62| 20000.0| 30|2020| 56| 4|level0|zipcode2|2020| 4|groupB|\n", "|135000.0|car17| 0.0| 7|233216.06| 94805.61| 23|2020| 32| 7|level1|zipcode2|2020| 7|groupA|\n", "|179000.0|car10| 0.0| 2|455015.34|160893.77| 2|2020| 44| 2|level1|zipcode8|2020| 2|groupB|\n", "+--------+-----+----------+---+---------+---------+------+----+---+---+------+--------+----+---+------+\n", "\n" ] } ], "source": [ "joined_features.read().show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Join Feature groups that correspond to different points in time\n", "Hopswork Feature store also provides functionality to join Feature groups that correspond to different points in time. 
" ] }, { "cell_type": "code", "execution_count": 50, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "economyFgQuery: com.logicalclocks.hsfs.constructor.Query = SELECT `fg0`.`hvalue`, `fg0`.`car`, `fg0`.`commission`, `fg0`.`id`, `fg0`.`loan`, `fg0`.`salary`, `fg0`.`hyears`, `fg0`.`year` FROM `fg0` `fg0`\n", "demographyTimestamps: List[String] = List(20210106151418, 20210106151704, 20210106152022)\n", "demographyFgQuery: com.logicalclocks.hsfs.constructor.Query = SELECT `fg0`.`age`, `fg0`.`id`, `fg0`.`elevel`, `fg0`.`zipcode` FROM `fg0` `fg0`\n", "classTimestamps: List[String] = List(20210106151449, 20210106151754, 20210106152114)\n", "classFgQuery: com.logicalclocks.hsfs.constructor.Query = SELECT `fg0`.`year`, `fg0`.`id`, `fg0`.`class` FROM `fg0` `fg0`\n" ] } ], "source": [ "val economyFgQuery = economyFg.selectAll().asOf(economyFgCommitTimestamps(2))\n", "\n", "val demographyTimestamps = demographyFg.commitDetails().values().map(c => c.get(\"committedOn\")).toList.sorted\n", "val demographyFgQuery = demographyFg.selectAll().asOf(demographyTimestamps(1))\n", "\n", "\n", "val classTimestamps = classFg.commitDetails().values().map(c => c.get(\"committedOn\")).toList.sorted\n", "val classFgQuery = classFg.selectAll().asOf(classTimestamps(0))" ] }, { "cell_type": "code", "execution_count": 51, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "joined_features: com.logicalclocks.hsfs.constructor.Query = SELECT `fg2`.`hvalue`, `fg2`.`car`, `fg2`.`commission`, `fg2`.`id`, `fg2`.`loan`, `fg2`.`salary`, `fg2`.`hyears`, `fg2`.`year`, `fg0`.`age`, `fg0`.`id`, `fg0`.`elevel`, `fg0`.`zipcode`, `fg1`.`year`, `fg1`.`id`, `fg1`.`class` FROM `fg2` `fg2` INNER JOIN `fg0` `fg0` ON `fg2`.`id` = `fg0`.`id` INNER JOIN `fg1` `fg1` ON `fg2`.`id` = `fg1`.`id`\n" ] } ], "source": [ "val joined_features = economyFgQuery.join(demographyFgQuery, Seq(\"id\"), JoinType.INNER).join(classFgQuery, Seq(\"id\"), JoinType.INNER)" ] }, { "cell_type": "code", "execution_count": 52, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+-----+----------+---+---------+---------+------+----+---+---+------+--------+----+---+------+\n", "| hvalue| car|commission| id| loan| salary|hyears|year|age| id|elevel| zipcode|year| id| class|\n", "+--------+-----+----------+---+---------+---------+------+----+---+---+------+--------+----+---+------+\n", "|205000.0|car17| 0.0| 1| 564724.2|120499.73| 30|2020| 54| 1|level3|zipcode5|2020| 1|groupB|\n", "|145000.0| car1| 0.0| 3|122025.08|119159.65| 22|2020| 49| 3|level2|zipcode4|2020| 3|groupB|\n", "|185000.0| car9| 52593.63| 4| 99629.62| 20000.0| 30|2020| 56| 4|level0|zipcode2|2020| 4|groupB|\n", "|179000.0|car10| 0.0| 2|455015.34|160893.77| 2|2020| 44| 2|level1|zipcode8|2020| 2|groupB|\n", "+--------+-----+----------+---+---------+---------+------+----+---+---+------+--------+----+---+------+\n", "\n" ] } ], "source": [ "joined_features.read().show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Spark", "language": "scala", "name": "sparkkernel" }, "language_info": { "codemirror_mode": "text/x-scala", "mimetype": "text/x-scala", "name": "scala", "pygments_lexer": "scala" } }, "nbformat": 4, "nbformat_minor": 4 }