---
title: "Windowed aggregations using spark streaming and ingestion to the online feature store"
date: 2021-04-25
type: technical_note
draft: false
---

## Import necessary libraries

In [1]:
import json

from pyspark.sql.functions import from_json, window, avg,count, stddev, explode, date_format,col
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType

from hops import kafka, tls, hdfs

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log
1,application_1620686564138_0002,pyspark,idle,Link,Link


SparkSession available as 'spark'.


In [2]:
# Name of the Kafka topic that the card-transaction stream is read from.
KAFKA_TOPIC_NAME = "credit_card_transactions"

## Create a stream from the kafka topic


In [3]:
# Open a streaming reader on the card-transactions Kafka topic over TLS.
# Each store is paired with its own password getter: the trust store with
# get_trust_store_pwd(), the key store and its private key with
# get_key_store_pwd().
# NOTE(review): on Hopsworks both getters are understood to return the same
# project password, so this pairing should not change behaviour - confirm.
df_read = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka.get_broker_endpoints())
    .option("kafka.security.protocol", kafka.get_security_protocol())
    .option("kafka.ssl.truststore.location", tls.get_trust_store())
    .option("kafka.ssl.truststore.password", tls.get_trust_store_pwd())
    .option("kafka.ssl.keystore.location", tls.get_key_store())
    .option("kafka.ssl.keystore.password", tls.get_key_store_pwd())
    .option("kafka.ssl.key.password", tls.get_key_store_pwd())
    # An empty algorithm turns off broker hostname verification.
    .option("kafka.ssl.endpoint.identification.algorithm", "")
    # Start from the oldest retained offset when the query has no saved offsets.
    .option("startingOffsets", "earliest")
    .option("subscribe", KAFKA_TOPIC_NAME)
    .load()
)

In [4]:
# Schema of the JSON payload carried in each Kafka message value.
# Every field is declared as a nullable string here; the columns are cast to
# their working types after parsing.
parse_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ('tid', 'datetime', 'cc_num', 'amount')
    ]
)

In [5]:
# Deserialise the Kafka message value: read it as a string, parse the JSON
# with parse_schema, flatten the parsed struct into top-level columns, and
# cast cc_num and amount to numeric types.
df_deser = (
    df_read
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", parse_schema).alias("value"))
    .select("value.tid", "value.datetime", "value.cc_num", "value.amount")
    .selectExpr(
        "CAST(tid as string)",
        "CAST(datetime as string)",
        "CAST(cc_num as long)",
        "CAST(amount as double)",
    )
)

In [6]:
df_deser.isStreaming

True

In [7]:
df_deser.printSchema()

root
 |-- tid: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- amount: double (nullable = true)

## Create windowing aggregations over different time windows using spark streaming.

In [8]:
# 10 minute window: per-card transaction count, mean and standard deviation
# of the amount.
windowed10mSignalDF = (
    df_deser
    # `datetime` is cast to a timestamp so it can serve as the event-time column.
    .selectExpr(
        "CAST(tid as string)",
        "CAST(datetime as timestamp)",
        "CAST(cc_num as long)",
        "CAST(amount as double)",
    )
    .withWatermark("datetime", "60 minutes")
    .groupBy(window("datetime", "10 minutes"), "cc_num")
    .agg(
        avg("amount").alias("avg_amt_per_10m"),
        stddev("amount").alias("stdev_amt_per_10m"),
        count("cc_num").alias("num_trans_per_10m"),
    )
    # The window column is not selected; only the card number and its
    # aggregates are kept.
    .select("cc_num", "num_trans_per_10m", "avg_amt_per_10m", "stdev_amt_per_10m")
)

In [9]:
windowed10mSignalDF.isStreaming

True

In [10]:
windowed10mSignalDF.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- num_trans_per_10m: long (nullable = false)
 |-- avg_amt_per_10m: double (nullable = true)
 |-- stdev_amt_per_10m: double (nullable = true)

In [11]:
# 1 hour window: per-card transaction count, mean and standard deviation
# of the amount.
windowed1hSignalDF = (
    df_deser
    # `datetime` is cast to a timestamp so it can serve as the event-time column.
    .selectExpr(
        "CAST(tid as string)",
        "CAST(datetime as timestamp)",
        "CAST(cc_num as long)",
        "CAST(amount as double)",
    )
    .withWatermark("datetime", "60 minutes")
    .groupBy(window("datetime", "60 minutes"), "cc_num")
    .agg(
        avg("amount").alias("avg_amt_per_1h"),
        stddev("amount").alias("stdev_amt_per_1h"),
        count("cc_num").alias("num_trans_per_1h"),
    )
    # The window column is not selected; only the card number and its
    # aggregates are kept.
    .select("cc_num", "num_trans_per_1h", "avg_amt_per_1h", "stdev_amt_per_1h")
)

In [12]:
windowed1hSignalDF.isStreaming

True

In [13]:
windowed1hSignalDF.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- num_trans_per_1h: long (nullable = false)
 |-- avg_amt_per_1h: double (nullable = true)
 |-- stdev_amt_per_1h: double (nullable = true)

In [14]:
# 12 hour window: per-card transaction count, mean and standard deviation
# of the amount.
windowed12hSignalDF = (
    df_deser
    # `datetime` is cast to a timestamp so it can serve as the event-time column.
    .selectExpr(
        "CAST(tid as string)",
        "CAST(datetime as timestamp)",
        "CAST(cc_num as long)",
        "CAST(amount as double)",
    )
    .withWatermark("datetime", "60 minutes")
    .groupBy(window("datetime", "12 hours"), "cc_num")
    .agg(
        avg("amount").alias("avg_amt_per_12h"),
        stddev("amount").alias("stdev_amt_per_12h"),
        count("cc_num").alias("num_trans_per_12h"),
    )
    # The window column is not selected; only the card number and its
    # aggregates are kept.
    .select("cc_num", "num_trans_per_12h", "avg_amt_per_12h", "stdev_amt_per_12h")
)

In [15]:
windowed12hSignalDF.isStreaming

True

In [16]:
windowed12hSignalDF.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- num_trans_per_12h: long (nullable = false)
 |-- avg_amt_per_12h: double (nullable = true)
 |-- stdev_amt_per_12h: double (nullable = true)

### Establish a connection with your Hopsworks feature store.

In [17]:
import hsfs
# Open a connection to the Hopsworks feature store service.
connection = hsfs.connection()
# Get a reference to the feature store. Shared feature stores can also be
# accessed by passing the feature store name.
fs = connection.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

## Get feature groups from hopsworks feature store.

In [18]:
# Look up version 1 of the raw-transaction feature group and of the three
# windowed-aggregate feature groups.
card_transactions = fs.get_feature_group("card_transactions", version=1)
card_transactions_10m_agg = fs.get_feature_group("card_transactions_10m_agg", version=1)
card_transactions_1h_agg = fs.get_feature_group("card_transactions_1h_agg", version=1)
card_transactions_12h_agg = fs.get_feature_group("card_transactions_12h_agg", version=1)

## Insert streaming dataframes to the online feature group

Now we are ready to write each streaming dataframe, as a long-running application, to the online storage of its corresponding feature group.

In [19]:
# Start streaming the 10-minute aggregates into their feature group; the
# returned query handle is used later to monitor and stop the stream.
query_10m = card_transactions_10m_agg.insert_stream(windowed10mSignalDF)



In [20]:
# Start streaming the 1-hour aggregates into their feature group; the
# returned query handle is used later to monitor and stop the stream.
query_1h = card_transactions_1h_agg.insert_stream(windowed1hSignalDF)



In [21]:
# Start streaming the 12-hour aggregates into their feature group; the
# returned query handle is used later to monitor and stop the stream.
query_12h = card_transactions_12h_agg.insert_stream(windowed12hSignalDF)



### Check if spark streaming query is active

In [22]:
query_10m.isActive

True

In [23]:
query_1h.isActive

True

In [24]:
query_12h.isActive

True

#### We can also check the status of a query and whether any exceptions were thrown.

In [27]:
# Current status of the 10-minute query: a message plus data-available and
# trigger-active flags.
query_10m.status

{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}

In [28]:
# Ask the query for the exception that terminated it, if any; no output is
# shown for this cell.
query_10m.exception()

### Let's check whether data was ingested into the online feature store

In [31]:
# Query the online store and print up to 20 rows without truncating values.
# NOTE(review): the table name looks like <feature group name>_<version> - confirm.
fs.sql("SELECT * FROM card_transactions_12h_agg_1",online=True).show(20,False)

+----------------+-----------------+------------------+------------------+
|cc_num          |num_trans_per_12h|avg_amt_per_12h   |stdev_amt_per_12h |
+----------------+-----------------+------------------+------------------+
|4226063306212844|5                |1134.8            |2370.804468076185 |
|4467512729899486|9                |205.1577777777778 |303.5732691945133 |
|4232153519700594|12               |129.64416666666668|250.8604352463132 |
|4376360021712050|8                |78.41125          |122.69854702102805|
|4867010117638802|8                |1224.9825         |3033.6029145826396|
|4956860373932956|8                |598.58125         |1242.149174184319 |
|4997591057565538|7                |127.6014285714286 |211.0837646866421 |
|4965123463794391|9                |96.02777777777777 |179.76414115291306|
|4671096685272336|5                |146.296           |199.37470458912284|
|4001837582277998|7                |193.3242857142857 |292.91272988480955|
|4135449811055770|6      

In [32]:
# Count the rows currently held in the online table for the 12-hour aggregates.
fs.sql("SELECT * FROM card_transactions_12h_agg_1",online=True).count()

100

## Insert data into the offline feature group.
The Hopsworks online feature store keeps only the latest available value of each feature, for low-latency model serving. However, we also want to write the data to the offline feature store so that historical values are retained.

In [33]:
def foreach_batch_function_card(batchDF, epoch_id):
    """Write one micro-batch of card transactions to the offline feature group.

    The batch is cached for the duration of the write and is always released
    afterwards, including when the insert raises.

    Args:
        batchDF: DataFrame holding the rows of the current micro-batch.
        epoch_id: Identifier of the micro-batch; printed to trace progress.
    """
    # Hudi write options: a single shuffle partition for each write operation.
    # NOTE(review): presumably sized for small demo batches - confirm before
    # reusing on larger volumes.
    extra_hudi_options = {
        "hoodie.bulkinsert.shuffle.parallelism": "1",
        "hoodie.insert.shuffle.parallelism": "1",
        "hoodie.upsert.shuffle.parallelism": "1",
        "hoodie.parquet.compression.ratio": "0.5",
    }
    batchDF.persist()
    try:
        print(epoch_id)
        card_transactions.insert(batchDF, write_options=extra_hudi_options, storage="offline")
    finally:
        # Release the cached batch even if the insert fails, so a failed write
        # does not leave the batch cached.
        batchDF.unpersist()

# Start the streaming query that hands every micro-batch of parsed
# transactions to foreach_batch_function_card, checkpointing under the
# project's Resources directory.
hudi_card = (
    df_deser.writeStream
    .foreachBatch(foreach_batch_function_card)
    .option("checkpointLocation", hdfs.project_path() + "/Resources/checkpoint-card")
    .start()
)

In [34]:
def foreach_batch_function_10m(batchDF, epoch_id):
    """Write one micro-batch of 10-minute aggregates to the offline feature group.

    The batch is cached for the duration of the write and is always released
    afterwards, including when the insert raises.

    Args:
        batchDF: DataFrame holding the rows of the current micro-batch.
        epoch_id: Identifier of the micro-batch; printed to trace progress.
    """
    # Hudi write options: a single shuffle partition for each write operation.
    # NOTE(review): presumably sized for small demo batches - confirm before
    # reusing on larger volumes.
    extra_hudi_options = {
        "hoodie.bulkinsert.shuffle.parallelism": "1",
        "hoodie.insert.shuffle.parallelism": "1",
        "hoodie.upsert.shuffle.parallelism": "1",
        "hoodie.parquet.compression.ratio": "0.5",
    }
    batchDF.persist()
    try:
        print(epoch_id)
        card_transactions_10m_agg.insert(batchDF, write_options=extra_hudi_options, storage="offline")
    finally:
        # Release the cached batch even if the insert fails, so a failed write
        # does not leave the batch cached.
        batchDF.unpersist()

# Start the streaming query that hands every micro-batch of 10-minute
# aggregates to foreach_batch_function_10m, checkpointing under the
# project's Resources directory.
hudi_10m = (
    windowed10mSignalDF.writeStream
    .foreachBatch(foreach_batch_function_10m)
    .option("checkpointLocation", hdfs.project_path() + "/Resources/checkpoint-data10m")
    .start()
)

In [35]:
def foreach_batch_function_1h(batchDF, epoch_id):
    """Write one micro-batch of 1-hour aggregates to the offline feature group.

    The batch is cached for the duration of the write and is always released
    afterwards, including when the insert raises.

    Args:
        batchDF: DataFrame holding the rows of the current micro-batch.
        epoch_id: Identifier of the micro-batch; printed to trace progress.
    """
    # Hudi write options: a single shuffle partition for each write operation.
    # NOTE(review): presumably sized for small demo batches - confirm before
    # reusing on larger volumes.
    extra_hudi_options = {
        "hoodie.bulkinsert.shuffle.parallelism": "1",
        "hoodie.insert.shuffle.parallelism": "1",
        "hoodie.upsert.shuffle.parallelism": "1",
        "hoodie.parquet.compression.ratio": "0.5",
    }
    batchDF.persist()
    try:
        print(epoch_id)
        card_transactions_1h_agg.insert(batchDF, write_options=extra_hudi_options, storage="offline")
    finally:
        # Release the cached batch even if the insert fails, so a failed write
        # does not leave the batch cached.
        batchDF.unpersist()

# Start the streaming query that hands every micro-batch of 1-hour
# aggregates to foreach_batch_function_1h, checkpointing under the
# project's Resources directory.
hudi_1h = (
    windowed1hSignalDF.writeStream
    .foreachBatch(foreach_batch_function_1h)
    .option("checkpointLocation", hdfs.project_path() + "/Resources/checkpoint-1h")
    .start()
)

In [36]:
def foreach_batch_function_12h(batchDF, epoch_id):
    """Write one micro-batch of 12-hour aggregates to the offline feature group.

    The batch is cached for the duration of the write and is always released
    afterwards, including when the insert raises.

    Args:
        batchDF: DataFrame holding the rows of the current micro-batch.
        epoch_id: Identifier of the micro-batch; printed to trace progress.
    """
    # Hudi write options: a single shuffle partition for each write operation.
    # NOTE(review): presumably sized for small demo batches - confirm before
    # reusing on larger volumes.
    extra_hudi_options = {
        "hoodie.bulkinsert.shuffle.parallelism": "1",
        "hoodie.insert.shuffle.parallelism": "1",
        "hoodie.upsert.shuffle.parallelism": "1",
        "hoodie.parquet.compression.ratio": "0.5",
    }
    batchDF.persist()
    try:
        print(epoch_id)
        card_transactions_12h_agg.insert(batchDF, write_options=extra_hudi_options, storage="offline")
    finally:
        # Release the cached batch even if the insert fails, so a failed write
        # does not leave the batch cached.
        batchDF.unpersist()

# Start the streaming query that hands every micro-batch of 12-hour
# aggregates to foreach_batch_function_12h, checkpointing under the
# project's Resources directory.
hudi_12h = (
    windowed12hSignalDF.writeStream
    .foreachBatch(foreach_batch_function_12h)
    .option("checkpointLocation", hdfs.project_path() + "/Resources/checkpoint-12h")
    .start()
)

### Check if queries are still active

In [45]:
hudi_card.isActive

True

In [46]:
hudi_10m.isActive

True

In [47]:
hudi_1h.isActive

True

In [48]:
hudi_12h.isActive

True

### Stop queries
If you are running this from a notebook, you can kill the Spark Structured Streaming Query by stopping the Kernel or by calling its `.stop()` method.

In [43]:
# Stop the three streaming queries that were started with insert_stream.
query_10m.stop()
query_1h.stop()
query_12h.stop()

In [44]:
# Stop the four foreachBatch streaming queries that write to the offline
# feature groups.
hudi_card.stop()
hudi_10m.stop()
hudi_1h.stop()
hudi_12h.stop()