{ "cells": [ { "cell_type": "markdown", "id": "158cf7c4", "metadata": {}, "source": [ "---\n", "title: \"3. Windowed aggregations using spark streaming and ingestion to the online feature store\"\n", "date: 2021-04-25\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "markdown", "id": "3f623b8d", "metadata": {}, "source": [ "## Import necessary libraries" ] }, { "cell_type": "code", "execution_count": 1, "id": "c0e74a65", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
"<table>\n",
"<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th></tr>\n",
"<tr><td>1</td><td>application_1620686564138_0002</td><td>pyspark</td><td>idle</td><td>Link</td><td>Link</td></tr>\n",
"</table>
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] } ], "source": [ "import json\n", "\n", "from pyspark.sql.functions import from_json, window, avg,count, stddev, explode, date_format,col\n", "from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType\n", "\n", "from hops import kafka, tls, hdfs" ] }, { "cell_type": "code", "execution_count": 2, "id": "f7d32ea3", "metadata": {}, "outputs": [], "source": [ "# name of the kafka topic to read card transactions from\n", "KAFKA_TOPIC_NAME = \"credit_card_transactions\"" ] }, { "cell_type": "markdown", "id": "98131074", "metadata": {}, "source": [ "## Create a stream from the kafka topic\n" ] }, { "cell_type": "code", "execution_count": 3, "id": "5d5c4653", "metadata": {}, "outputs": [], "source": [ "df_read = spark \\\n", " .readStream \\\n", " .format(\"kafka\") \\\n", " .option(\"kafka.bootstrap.servers\", kafka.get_broker_endpoints()) \\\n", " .option(\"kafka.security.protocol\",kafka.get_security_protocol()) \\\n", " .option(\"kafka.ssl.truststore.location\", tls.get_trust_store()) \\\n", " .option(\"kafka.ssl.truststore.password\", tls.get_key_store_pwd()) \\\n", " .option(\"kafka.ssl.keystore.location\", tls.get_key_store()) \\\n", " .option(\"kafka.ssl.keystore.password\", tls.get_key_store_pwd()) \\\n", " .option(\"kafka.ssl.key.password\", tls.get_trust_store_pwd()) \\\n", " .option(\"kafka.ssl.endpoint.identification.algorithm\", \"\") \\\n", " .option(\"startingOffsets\", \"earliest\")\\\n", " .option(\"subscribe\", KAFKA_TOPIC_NAME) \\\n", " .load()" ] }, { "cell_type": "code", "execution_count": 4, "id": "ac139428", "metadata": {}, "outputs": [], "source": [ "# Define schema to read from kafka topic \n", "parse_schema = StructType([StructField('tid', StringType(), True),\n", " StructField('datetime', StringType(), True),\n", " StructField('cc_num', StringType(), True),\n", " StructField('amount', StringType(), True)])" ] }, { "cell_type": "code", "execution_count": 5, "id": "a1a46500", "metadata": {}, "outputs": [], "source": [ "# Deserialise data from and create streaming query\n", "df_deser = df_read.selectExpr(\"CAST(value AS STRING)\")\\\n", " .select(from_json(\"value\", parse_schema).alias(\"value\"))\\\n", " .select(\"value.tid\", \"value.datetime\", \"value.cc_num\", \"value.amount\")\\\n", " .selectExpr(\"CAST(tid as string)\", \"CAST(datetime as string)\", \"CAST(cc_num as long)\", \"CAST(amount as double)\")" ] }, { "cell_type": "code", "execution_count": 6, "id": "974d5d78", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "df_deser.isStreaming" ] }, { "cell_type": "code", "execution_count": 7, "id": "fc8fb713", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- tid: string (nullable = true)\n", " |-- datetime: string (nullable = true)\n", " |-- cc_num: long (nullable = true)\n", " |-- amount: double (nullable = true)" ] } ], "source": [ "df_deser.printSchema()" ] }, { "cell_type": "markdown", "id": "e8569b9e", "metadata": {}, "source": [ "## Create windowing aggregations over different time windows using spark streaming." 
] }, { "cell_type": "code", "execution_count": 8, "id": "cd25fb4c", "metadata": {}, "outputs": [], "source": [ "# 10 minute window\n", "windowed10mSignalDF =df_deser \\\n", " .selectExpr(\"CAST(tid as string)\", \"CAST(datetime as timestamp)\", \"CAST(cc_num as long)\", \"CAST(amount as double)\")\\\n", " .withWatermark(\"datetime\", \"60 minutes\") \\\n", " .groupBy(window(\"datetime\", \"10 minutes\"), \"cc_num\") \\\n", " .agg(avg(\"amount\").alias(\"avg_amt_per_10m\"), stddev(\"amount\").alias(\"stdev_amt_per_10m\"), count(\"cc_num\").alias(\"num_trans_per_10m\"))\\\n", " .select(\"cc_num\", \"num_trans_per_10m\", \"avg_amt_per_10m\", \"stdev_amt_per_10m\")" ] }, { "cell_type": "code", "execution_count": 9, "id": "c17039c1", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "windowed10mSignalDF.isStreaming" ] }, { "cell_type": "code", "execution_count": 10, "id": "0d24ae78", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- cc_num: long (nullable = true)\n", " |-- num_trans_per_10m: long (nullable = false)\n", " |-- avg_amt_per_10m: double (nullable = true)\n", " |-- stdev_amt_per_10m: double (nullable = true)" ] } ], "source": [ "windowed10mSignalDF.printSchema()" ] }, { "cell_type": "code", "execution_count": 11, "id": "2f17da8d", "metadata": {}, "outputs": [], "source": [ "# 1 hour window\n", "windowed1hSignalDF = \\\n", " df_deser \\\n", " .selectExpr(\"CAST(tid as string)\", \"CAST(datetime as timestamp)\", \"CAST(cc_num as long)\", \"CAST(amount as double)\")\\\n", " .withWatermark(\"datetime\", \"60 minutes\") \\\n", " .groupBy(window(\"datetime\", \"60 minutes\"), \"cc_num\") \\\n", " .agg(avg(\"amount\").alias(\"avg_amt_per_1h\"), stddev(\"amount\").alias(\"stdev_amt_per_1h\"), count(\"cc_num\").alias(\"num_trans_per_1h\"))\\\n", " .select(\"cc_num\", \"num_trans_per_1h\", \"avg_amt_per_1h\", \"stdev_amt_per_1h\")" ] }, { "cell_type": "code", "execution_count": 12, "id": "d4e79d45", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "windowed1hSignalDF.isStreaming" ] }, { "cell_type": "code", "execution_count": 13, "id": "f75ee2b0", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- cc_num: long (nullable = true)\n", " |-- num_trans_per_1h: long (nullable = false)\n", " |-- avg_amt_per_1h: double (nullable = true)\n", " |-- stdev_amt_per_1h: double (nullable = true)" ] } ], "source": [ "windowed1hSignalDF.printSchema()" ] }, { "cell_type": "code", "execution_count": 14, "id": "cac528ba", "metadata": {}, "outputs": [], "source": [ "# 12 hour window\n", "windowed12hSignalDF = \\\n", " df_deser \\\n", " .selectExpr(\"CAST(tid as string)\", \"CAST(datetime as timestamp)\", \"CAST(cc_num as long)\", \"CAST(amount as double)\")\\\n", " .withWatermark(\"datetime\", \"60 minutes\") \\\n", " .groupBy(window(\"datetime\", \"12 hours\"), \"cc_num\") \\\n", " .agg(avg(\"amount\").alias(\"avg_amt_per_12h\"), stddev(\"amount\").alias(\"stdev_amt_per_12h\"), count(\"cc_num\").alias(\"num_trans_per_12h\"))\\\n", " .select(\"cc_num\", \"num_trans_per_12h\", \"avg_amt_per_12h\", \"stdev_amt_per_12h\")" ] }, { "cell_type": "code", "execution_count": 15, "id": "0f3885e7", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "windowed12hSignalDF.isStreaming" ] }, { "cell_type": "code", "execution_count": 16, "id": "67215fe2", 
"metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- cc_num: long (nullable = true)\n", " |-- num_trans_per_12h: long (nullable = false)\n", " |-- avg_amt_per_12h: double (nullable = true)\n", " |-- stdev_amt_per_12h: double (nullable = true)" ] } ], "source": [ "windowed12hSignalDF.printSchema()" ] }, { "cell_type": "markdown", "id": "b3ec05c8", "metadata": {}, "source": [ "### Establish a connection with your Hopsworks feature store." ] }, { "cell_type": "code", "execution_count": 17, "id": "3a8b8bec", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Connected. Call `.close()` to terminate connection gracefully." ] } ], "source": [ "import hsfs\n", "connection = hsfs.connection()\n", "# get a reference to the feature store, you can access also shared feature stores by providing the feature store name\n", "fs = connection.get_feature_store()" ] }, { "cell_type": "markdown", "id": "bcde7ec5", "metadata": {}, "source": [ "## Get feature groups from hopsworks feature store." ] }, { "cell_type": "code", "execution_count": 18, "id": "6c7fe2bc", "metadata": {}, "outputs": [], "source": [ "card_transactions = fs.get_feature_group(\"card_transactions\", version = 1)\n", "card_transactions_10m_agg = fs.get_feature_group(\"card_transactions_10m_agg\", version = 1)\n", "card_transactions_1h_agg = fs.get_feature_group(\"card_transactions_1h_agg\", version = 1)\n", "card_transactions_12h_agg = fs.get_feature_group(\"card_transactions_12h_agg\", version = 1)" ] }, { "cell_type": "markdown", "id": "c53a8d30", "metadata": {}, "source": [ "## Insert streaming dataframes to the online feature group\n", "\n", "Now we are ready to write this streaming dataframe as a long living application to the online storage of the other feature group." ] }, { "cell_type": "code", "execution_count": 19, "id": "c46f1c20", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "StatisticsWarning: Stream ingestion for feature group `card_transactions_10m_agg`, with version `1` will not compute statistics." ] } ], "source": [ "query_10m = card_transactions_10m_agg.insert_stream(windowed10mSignalDF)" ] }, { "cell_type": "code", "execution_count": 20, "id": "00df46c5", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "StatisticsWarning: Stream ingestion for feature group `card_transactions_1h_agg`, with version `1` will not compute statistics." ] } ], "source": [ "query_1h = card_transactions_1h_agg.insert_stream(windowed1hSignalDF)" ] }, { "cell_type": "code", "execution_count": 21, "id": "146a5566", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "StatisticsWarning: Stream ingestion for feature group `card_transactions_12h_agg`, with version `1` will not compute statistics." 
] } ], "source": [ "query_12h = card_transactions_12h_agg.insert_stream(windowed12hSignalDF)" ] }, { "cell_type": "markdown", "id": "73e0f3d9", "metadata": {}, "source": [ "### Check if spark streaming query is active" ] }, { "cell_type": "code", "execution_count": 22, "id": "385e30e6", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "query_10m.isActive" ] }, { "cell_type": "code", "execution_count": 23, "id": "781683d3", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "query_1h.isActive" ] }, { "cell_type": "code", "execution_count": 24, "id": "ad3f8ed2", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "query_12h.isActive" ] }, { "cell_type": "markdown", "id": "83583bae", "metadata": {}, "source": [ "#### We can also check status of a query and if there are any exceptions trown." ] }, { "cell_type": "code", "execution_count": 27, "id": "06d36130", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}" ] } ], "source": [ "query_10m.status" ] }, { "cell_type": "code", "execution_count": 28, "id": "b79635f3", "metadata": {}, "outputs": [], "source": [ "query_10m.exception()" ] }, { "cell_type": "markdown", "id": "f9ffde32", "metadata": {}, "source": [ "### Lets check if data was ingested in to the online feature store" ] }, { "cell_type": "code", "execution_count": 31, "id": "da596366", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------------+-----------------+------------------+------------------+\n", "|cc_num |num_trans_per_12h|avg_amt_per_12h |stdev_amt_per_12h |\n", "+----------------+-----------------+------------------+------------------+\n", "|4226063306212844|5 |1134.8 |2370.804468076185 |\n", "|4467512729899486|9 |205.1577777777778 |303.5732691945133 |\n", "|4232153519700594|12 |129.64416666666668|250.8604352463132 |\n", "|4376360021712050|8 |78.41125 |122.69854702102805|\n", "|4867010117638802|8 |1224.9825 |3033.6029145826396|\n", "|4956860373932956|8 |598.58125 |1242.149174184319 |\n", "|4997591057565538|7 |127.6014285714286 |211.0837646866421 |\n", "|4965123463794391|9 |96.02777777777777 |179.76414115291306|\n", "|4671096685272336|5 |146.296 |199.37470458912284|\n", "|4001837582277998|7 |193.3242857142857 |292.91272988480955|\n", "|4135449811055770|6 |1212.2166666666667|1507.44335556155 |\n", "|4136262720215016|11 |811.387272727273 |2476.740706473291 |\n", "|4208317936968510|7 |267.33285714285716|330.5734197176318 |\n", "|4893308344742860|8 |290.56 |333.44267401930256|\n", "|4213741526478791|5 |137.602 |185.38636659689945|\n", "|4260567335033291|9 |329.4222222222222 |306.2573898609541 |\n", "|4683617042712171|8 |32.47 |32.10560031254005 |\n", "|4802174255861762|9 |709.7955555555556 |1933.2421975602997|\n", "|4925013053127624|11 |855.5918181818183 |1638.8087694714 |\n", "|4830349319515887|12 |975.3983333333332 |2249.727648986684 |\n", "+----------------+-----------------+------------------+------------------+\n", "only showing top 20 rows" ] } ], "source": [ "fs.sql(\"SELECT * FROM card_transactions_12h_agg_1\",online=True).show(20,False)" ] }, { "cell_type": "code", "execution_count": 32, "id": "f9d58f4a", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "100" ] } ], "source": [ "fs.sql(\"SELECT * FROM 
card_transactions_12h_agg_1\",online=True).count()" ] }, { "cell_type": "markdown", "id": "7918c363", "metadata": {}, "source": [ "## Insert data in to offline feature group.\n", "Hopsworks online feature store will store latest avaible value of feature for low latency model serving. However, we also want to store data in to the offline feature store to store historical data. " ] }, { "cell_type": "code", "execution_count": 33, "id": "84addd50", "metadata": {}, "outputs": [], "source": [ "def foreach_batch_function_card(batchDF, epoch_id):\n", " batchDF.persist()\n", " print(epoch_id)\n", " extra_hudi_options = {\n", " \"hoodie.bulkinsert.shuffle.parallelism\":\"1\", \n", " \"hoodie.insert.shuffle.parallelism\":\"1\", \n", " \"hoodie.upsert.shuffle.parallelism\":\"1\",\n", " \"hoodie.parquet.compression.ratio\":\"0.5\"\n", " }\n", " # Transform and write batchDF\n", " card_transactions.insert(batchDF,write_options=extra_hudi_options, storage=\"offline\")\n", " batchDF.unpersist()\n", "\n", "hudi_card = df_deser.writeStream.foreachBatch(foreach_batch_function_card)\\\n", " .option(\"checkpointLocation\", hdfs.project_path() + \"/Resources/checkpoint-card\")\\\n", " .start() " ] }, { "cell_type": "code", "execution_count": 34, "id": "e30bfb38", "metadata": {}, "outputs": [], "source": [ "def foreach_batch_function_10m(batchDF, epoch_id):\n", " batchDF.persist()\n", " print(epoch_id)\n", " extra_hudi_options = {\n", " \"hoodie.bulkinsert.shuffle.parallelism\":\"1\", \n", " \"hoodie.insert.shuffle.parallelism\":\"1\", \n", " \"hoodie.upsert.shuffle.parallelism\":\"1\",\n", " \"hoodie.parquet.compression.ratio\":\"0.5\"\n", " }\n", " # Transform and write batchDF\n", " card_transactions_10m_agg.insert(batchDF,write_options=extra_hudi_options, storage=\"offline\")\n", " batchDF.unpersist()\n", "\n", "hudi_10m = windowed10mSignalDF.writeStream.foreachBatch(foreach_batch_function_10m)\\\n", " .option(\"checkpointLocation\", hdfs.project_path() + \"/Resources/checkpoint-data10m\")\\\n", " .start() " ] }, { "cell_type": "code", "execution_count": 35, "id": "b9f0b4f3", "metadata": {}, "outputs": [], "source": [ "def foreach_batch_function_1h(batchDF, epoch_id):\n", " batchDF.persist()\n", " print(epoch_id)\n", " extra_hudi_options = {\n", " \"hoodie.bulkinsert.shuffle.parallelism\":\"1\", \n", " \"hoodie.insert.shuffle.parallelism\":\"1\", \n", " \"hoodie.upsert.shuffle.parallelism\":\"1\",\n", " \"hoodie.parquet.compression.ratio\":\"0.5\"\n", " }\n", " # Transform and write batchDF\n", " card_transactions_1h_agg.insert(batchDF,write_options=extra_hudi_options, storage=\"offline\")\n", " batchDF.unpersist()\n", "\n", "hudi_1h = windowed1hSignalDF.writeStream.foreachBatch(foreach_batch_function_1h)\\\n", " .option(\"checkpointLocation\", hdfs.project_path() + \"/Resources/checkpoint-1h\")\\\n", " .start()" ] }, { "cell_type": "code", "execution_count": 36, "id": "9ae34a9d", "metadata": {}, "outputs": [], "source": [ "def foreach_batch_function_12h(batchDF, epoch_id):\n", " batchDF.persist()\n", " print(epoch_id)\n", " extra_hudi_options = {\n", " \"hoodie.bulkinsert.shuffle.parallelism\":\"1\", \n", " \"hoodie.insert.shuffle.parallelism\":\"1\", \n", " \"hoodie.upsert.shuffle.parallelism\":\"1\",\n", " \"hoodie.parquet.compression.ratio\":\"0.5\"\n", " }\n", " # Transform and write batchDF\n", " card_transactions_12h_agg.insert(batchDF,write_options=extra_hudi_options, storage=\"offline\")\n", " batchDF.unpersist()\n", "\n", "hudi_12h = 
windowed12hSignalDF.writeStream.foreachBatch(foreach_batch_function_12h)\\\n", "    .option(\"checkpointLocation\", hdfs.project_path() + \"/Resources/checkpoint-12h\")\\\n", "    .start()" ] }, { "cell_type": "markdown", "id": "f5123cb6", "metadata": {}, "source": [ "### Check if queries are still active" ] }, { "cell_type": "code", "execution_count": 45, "id": "eec17344", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "hudi_card.isActive" ] }, { "cell_type": "code", "execution_count": 46, "id": "fb6fb102", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "hudi_10m.isActive" ] }, { "cell_type": "code", "execution_count": 47, "id": "94c18663", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "hudi_1h.isActive" ] }, { "cell_type": "code", "execution_count": 48, "id": "76dbf96d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "hudi_12h.isActive" ] }, { "cell_type": "markdown", "id": "dac57bf0", "metadata": {}, "source": [ "### Stop queries\n", "If you are running this from a notebook, you can stop a Spark Structured Streaming query by stopping the kernel or by calling the query's `.stop()` method." ] }, { "cell_type": "code", "execution_count": 43, "id": "c8ac956e", "metadata": {}, "outputs": [], "source": [ "query_10m.stop()\n", "query_1h.stop()\n", "query_12h.stop()" ] }, { "cell_type": "code", "execution_count": 44, "id": "7400dcbd", "metadata": {}, "outputs": [], "source": [ "hudi_card.stop()\n", "hudi_10m.stop()\n", "hudi_1h.stop()\n", "hudi_12h.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }