{ "cells": [ { "cell_type": "markdown", "id": "18b4fb84", "metadata": {}, "source": [ "---\n", "title: \"3. Windowed aggregations using spark streaming and ingestion to the online feature store\"\n", "date: 2021-04-25\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "markdown", "id": "52052278", "metadata": {}, "source": [ "## Import necessary libraries" ] }, { "cell_type": "code", "execution_count": 1, "id": "62e29526", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
IDApplication IDKindStateSpark UIDriver log
18application_1648485762103_0044pysparkidleLinkLink
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] } ], "source": [ "import json\n", "\n", "from pyspark.sql.functions import from_json, window, avg,count, stddev, explode, date_format,col\n", "from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType\n", "\n", "from hops import kafka, tls, hdfs" ] }, { "cell_type": "code", "execution_count": 2, "id": "791d222a", "metadata": {}, "outputs": [], "source": [ "# name of the kafka topic to read card transactions from\n", "KAFKA_TOPIC_NAME = \"credit_card_transactions\"" ] }, { "cell_type": "markdown", "id": "db664e63", "metadata": {}, "source": [ "## Create a stream from the kafka topic\n" ] }, { "cell_type": "code", "execution_count": 3, "id": "7a60b596", "metadata": {}, "outputs": [], "source": [ "df_read = spark \\\n", " .readStream \\\n", " .format(\"kafka\") \\\n", " .option(\"kafka.bootstrap.servers\", kafka.get_broker_endpoints()) \\\n", " .option(\"kafka.security.protocol\",kafka.get_security_protocol()) \\\n", " .option(\"kafka.ssl.truststore.location\", tls.get_trust_store()) \\\n", " .option(\"kafka.ssl.truststore.password\", tls.get_key_store_pwd()) \\\n", " .option(\"kafka.ssl.keystore.location\", tls.get_key_store()) \\\n", " .option(\"kafka.ssl.keystore.password\", tls.get_key_store_pwd()) \\\n", " .option(\"kafka.ssl.key.password\", tls.get_trust_store_pwd()) \\\n", " .option(\"kafka.ssl.endpoint.identification.algorithm\", \"\") \\\n", " .option(\"startingOffsets\", \"earliest\")\\\n", " .option(\"subscribe\", KAFKA_TOPIC_NAME) \\\n", " .load()" ] }, { "cell_type": "code", "execution_count": 4, "id": "413c818e", "metadata": {}, "outputs": [], "source": [ "# Define schema to read from kafka topic \n", "parse_schema = StructType([StructField('tid', StringType(), True),\n", " StructField('datetime', StringType(), True),\n", " 
StructField('cc_num', StringType(), True),\n", " StructField('amount', StringType(), True)])" ] }, { "cell_type": "code", "execution_count": 5, "id": "19f9287b", "metadata": {}, "outputs": [], "source": [ "# Deserialise data from and create streaming query\n", "df_deser = df_read.selectExpr(\"CAST(value AS STRING)\")\\\n", " .select(from_json(\"value\", parse_schema).alias(\"value\"))\\\n", " .select(\"value.tid\", \"value.datetime\", \"value.cc_num\", \"value.amount\")\\\n", " .selectExpr(\"CAST(tid as string)\", \"CAST(datetime as string)\", \"CAST(cc_num as long)\", \"CAST(amount as double)\")" ] }, { "cell_type": "code", "execution_count": 6, "id": "ef6286d8", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "df_deser.isStreaming" ] }, { "cell_type": "code", "execution_count": 7, "id": "33e6498c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- tid: string (nullable = true)\n", " |-- datetime: string (nullable = true)\n", " |-- cc_num: long (nullable = true)\n", " |-- amount: double (nullable = true)" ] } ], "source": [ "df_deser.printSchema()" ] }, { "cell_type": "markdown", "id": "56bf8db4", "metadata": {}, "source": [ "## Create windowing aggregations over different time windows using spark streaming." 
] }, { "cell_type": "code", "execution_count": 8, "id": "851f3469", "metadata": {}, "outputs": [], "source": [ "# 10 minute window\n", "windowed10mSignalDF =df_deser \\\n", " .selectExpr(\"CAST(tid as string)\", \"CAST(datetime as timestamp)\", \"CAST(cc_num as long)\", \"CAST(amount as double)\")\\\n", " .withWatermark(\"datetime\", \"60 minutes\") \\\n", " .groupBy(window(\"datetime\", \"10 minutes\"), \"cc_num\") \\\n", " .agg(avg(\"amount\").alias(\"avg_amt_per_10m\"), stddev(\"amount\").alias(\"stdev_amt_per_10m\"), count(\"cc_num\").alias(\"num_trans_per_10m\"))\\\n", " .select(\"cc_num\", \"num_trans_per_10m\", \"avg_amt_per_10m\", \"stdev_amt_per_10m\")" ] }, { "cell_type": "code", "execution_count": 9, "id": "291505a3", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "windowed10mSignalDF.isStreaming" ] }, { "cell_type": "code", "execution_count": 10, "id": "9151e68f", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- cc_num: long (nullable = true)\n", " |-- num_trans_per_10m: long (nullable = false)\n", " |-- avg_amt_per_10m: double (nullable = true)\n", " |-- stdev_amt_per_10m: double (nullable = true)" ] } ], "source": [ "windowed10mSignalDF.printSchema()" ] }, { "cell_type": "code", "execution_count": 11, "id": "d4b866d8", "metadata": {}, "outputs": [], "source": [ "# 1 hour window\n", "windowed1hSignalDF = \\\n", " df_deser \\\n", " .selectExpr(\"CAST(tid as string)\", \"CAST(datetime as timestamp)\", \"CAST(cc_num as long)\", \"CAST(amount as double)\")\\\n", " .withWatermark(\"datetime\", \"60 minutes\") \\\n", " .groupBy(window(\"datetime\", \"60 minutes\"), \"cc_num\") \\\n", " .agg(avg(\"amount\").alias(\"avg_amt_per_1h\"), stddev(\"amount\").alias(\"stdev_amt_per_1h\"), count(\"cc_num\").alias(\"num_trans_per_1h\"))\\\n", " .select(\"cc_num\", \"num_trans_per_1h\", \"avg_amt_per_1h\", \"stdev_amt_per_1h\")" ] }, { "cell_type": "code", 
"execution_count": 12, "id": "0477252b", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "windowed1hSignalDF.isStreaming" ] }, { "cell_type": "code", "execution_count": 13, "id": "0454f0f4", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- cc_num: long (nullable = true)\n", " |-- num_trans_per_1h: long (nullable = false)\n", " |-- avg_amt_per_1h: double (nullable = true)\n", " |-- stdev_amt_per_1h: double (nullable = true)" ] } ], "source": [ "windowed1hSignalDF.printSchema()" ] }, { "cell_type": "code", "execution_count": 14, "id": "f268e9a5", "metadata": {}, "outputs": [], "source": [ "# 12 hour window\n", "windowed12hSignalDF = \\\n", " df_deser \\\n", " .selectExpr(\"CAST(tid as string)\", \"CAST(datetime as timestamp)\", \"CAST(cc_num as long)\", \"CAST(amount as double)\")\\\n", " .withWatermark(\"datetime\", \"60 minutes\") \\\n", " .groupBy(window(\"datetime\", \"12 hours\"), \"cc_num\") \\\n", " .agg(avg(\"amount\").alias(\"avg_amt_per_12h\"), stddev(\"amount\").alias(\"stdev_amt_per_12h\"), count(\"cc_num\").alias(\"num_trans_per_12h\"))\\\n", " .select(\"cc_num\", \"num_trans_per_12h\", \"avg_amt_per_12h\", \"stdev_amt_per_12h\")" ] }, { "cell_type": "code", "execution_count": 15, "id": "62de2b0c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "windowed12hSignalDF.isStreaming" ] }, { "cell_type": "code", "execution_count": 16, "id": "3d13fa2b", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- cc_num: long (nullable = true)\n", " |-- num_trans_per_12h: long (nullable = false)\n", " |-- avg_amt_per_12h: double (nullable = true)\n", " |-- stdev_amt_per_12h: double (nullable = true)" ] } ], "source": [ "windowed12hSignalDF.printSchema()" ] }, { "cell_type": "markdown", "id": "7eb08ee3", "metadata": {}, "source": [ "### Establish a 
connection with your Hopsworks feature store." ] }, { "cell_type": "code", "execution_count": 17, "id": "28b5fcdc", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Connected. Call `.close()` to terminate connection gracefully." ] } ], "source": [ "import hsfs\n", "connection = hsfs.connection()\n", "# get a reference to the feature store, you can access also shared feature stores by providing the feature store name\n", "fs = connection.get_feature_store()" ] }, { "cell_type": "markdown", "id": "326cbda0", "metadata": {}, "source": [ "## Get feature groups from the Hopsworks feature store." ] }, { "cell_type": "code", "execution_count": 18, "id": "8ce8b25f", "metadata": {}, "outputs": [], "source": [ "card_transactions = fs.get_feature_group(\"card_transactions\", version = 1)\n", "card_transactions_10m_agg = fs.get_feature_group(\"card_transactions_10m_agg\", version = 1)\n", "card_transactions_1h_agg = fs.get_feature_group(\"card_transactions_1h_agg\", version = 1)\n", "card_transactions_12h_agg = fs.get_feature_group(\"card_transactions_12h_agg\", version = 1)" ] }, { "cell_type": "markdown", "id": "13835bd5", "metadata": {}, "source": [ "## Insert streaming DataFrames into the online feature groups\n", "\n", "Now we are ready to write each windowed streaming DataFrame, as a long-running streaming query, to the online storage of its corresponding aggregation feature group." ] }, { "cell_type": "code", "execution_count": 19, "id": "14118480", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "StatisticsWarning: Stream ingestion for feature group `card_transactions_10m_agg`, with version `1` will not compute statistics."
] } ], "source": [ "query_10m = card_transactions_10m_agg.insert_stream(windowed10mSignalDF)" ] }, { "cell_type": "code", "execution_count": 20, "id": "50e6abb2", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "False" ] } ], "source": [ "not card_transactions_10m_agg.online_enabled and not card_transactions_10m_agg.stream" ] }, { "cell_type": "code", "execution_count": 21, "id": "5ae38299", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "StatisticsWarning: Stream ingestion for feature group `card_transactions_1h_agg`, with version `1` will not compute statistics." ] } ], "source": [ "query_1h = card_transactions_1h_agg.insert_stream(windowed1hSignalDF)" ] }, { "cell_type": "code", "execution_count": 22, "id": "2c68ff6c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "StatisticsWarning: Stream ingestion for feature group `card_transactions_12h_agg`, with version `1` will not compute statistics." ] } ], "source": [ "query_12h = card_transactions_12h_agg.insert_stream(windowed12hSignalDF)" ] }, { "cell_type": "markdown", "id": "808c5492", "metadata": {}, "source": [ "### Check whether the Spark streaming queries are active" ] }, { "cell_type": "code", "execution_count": 23, "id": "033fb191", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "query_10m.isActive" ] }, { "cell_type": "code", "execution_count": 24, "id": "a86a0469", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "query_1h.isActive" ] }, { "cell_type": "code", "execution_count": 25, "id": "a8414efd", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True" ] } ], "source": [ "query_12h.isActive" ] }, { "cell_type": "markdown", "id": "57efd1be", "metadata": {}, "source": [ "#### We can also check the status of a query and whether any exceptions were thrown."
] }, { "cell_type": "code", "execution_count": 31, "id": "82866234", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'message': 'Waiting for data to arrive', 'isDataAvailable': False, 'isTriggerActive': False}" ] } ], "source": [ "query_1h.status" ] }, { "cell_type": "code", "execution_count": 32, "id": "e81dd25d", "metadata": {}, "outputs": [], "source": [ "query_1h.exception()" ] }, { "cell_type": "markdown", "id": "4771cc3e", "metadata": {}, "source": [ "### Let's check whether data was ingested into the online feature store" ] }, { "cell_type": "code", "execution_count": 33, "id": "466a0b55", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------------+-----------------+------------------+------------------+\n", "|cc_num |num_trans_per_12h|avg_amt_per_12h |stdev_amt_per_12h |\n", "+----------------+-----------------+------------------+------------------+\n", "|4444037300542691|7 |154.51857142857145|238.93552871214524|\n", "|4609072304828342|7 |176.02714285714288|263.06316920176454|\n", "|4161715127983823|10 |928.3030000000001 |1809.7934375689888|\n", "|4223253728365626|13 |1201.686153846154 |2724.0564739389993|\n", "|4572259224622748|9 |1291.5500000000002|2495.189283160699 |\n", "|4436298663019939|11 |149.78636363636366|235.75729924109365|\n", "|4159210768503456|6 |37.303333333333335|26.403001092047596|\n", "|4231082502226286|10 |977.8430000000001 |2071.1095165208753|\n", "|4090612125343330|15 |646.7259999999999 |1336.9214811370616|\n", "|4416410688550228|11 |663.0627272727273 |1631.6188600717442|\n", "|4853206196105715|10 |1077.6439999999998|2793.72050986255 |\n", "|4032763187099525|10 |425.006 |1204.4018071612704|\n", "|4811343280984688|6 |237.08166666666662|305.3312340666554 |\n", "|4815447301191763|6 |78.35833333333333 |29.206839895248287|\n", "|4645884898081724|8 |171.48125000000002|200.51632676222223|\n", "|4872287670027309|3 |260.63666666666666|427.3684126527525 |\n", 
"|4524584153018280|7 |160.18714285714285|223.19407330200963|\n", "|4734811798843814|7 |122.81714285714285|166.56810466135522|\n", "|4609746692923340|9 |1401.2866666666669|3007.164227935514 |\n", "|4526611032294580|6 |109.52166666666666|150.55966530471125|\n", "+----------------+-----------------+------------------+------------------+\n", "only showing top 20 rows" ] } ], "source": [ "fs.sql(\"SELECT * FROM card_transactions_12h_agg_1\",online=True).show(20,False)" ] }, { "cell_type": "code", "execution_count": 34, "id": "5d72f7a8", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "100" ] } ], "source": [ "fs.sql(\"SELECT * FROM card_transactions_12h_agg_1\",online=True).count()" ] }, { "cell_type": "markdown", "id": "1ed60609", "metadata": {}, "source": [ "### Stop queries\n", "If you are running this from a notebook, you can kill the Spark Structured Streaming Query by stopping the Kernel or by calling its `.stop()` method." ] }, { "cell_type": "code", "execution_count": 43, "id": "86a26cbe", "metadata": {}, "outputs": [], "source": [ "query_10m.stop()\n", "query_1h.stop()\n", "query_12h.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }