---
title: "3. Windowed aggregations using spark streaming and ingestion to the online feature store"
date: 2021-04-25
type: technical_note
draft: false
---

## Import necessary libraries

In [1]:
import json

from pyspark.sql.functions import from_json, window, avg,count, stddev, explode, date_format,col
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType

from hops import kafka, tls, hdfs

Starting Spark application


ID,Application ID,Kind,State,Spark UI,Driver log
18,application_1648485762103_0044,pyspark,idle,Link,Link


SparkSession available as 'spark'.


In [2]:
# Name of the Kafka topic the credit card transaction events are read from;
# it is passed to the `subscribe` option of the streaming source.
KAFKA_TOPIC_NAME = "credit_card_transactions"

## Create a stream from the kafka topic


In [3]:
# Subscribe to the Kafka topic as an unbounded streaming source.
# TLS material comes from the hops `tls` helper. Each *.password option must be
# paired with the password of the store it unlocks. The previous version passed
# the keystore password for the truststore and vice versa, which only worked
# while both passwords happened to be identical.
df_read = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka.get_broker_endpoints())
    .option("kafka.security.protocol", kafka.get_security_protocol())
    .option("kafka.ssl.truststore.location", tls.get_trust_store())
    .option("kafka.ssl.truststore.password", tls.get_trust_store_pwd())
    .option("kafka.ssl.keystore.location", tls.get_key_store())
    .option("kafka.ssl.keystore.password", tls.get_key_store_pwd())
    # The client's private key lives in the keystore, so it is unlocked with
    # the keystore password.
    .option("kafka.ssl.key.password", tls.get_key_store_pwd())
    # An empty algorithm disables broker hostname verification.
    .option("kafka.ssl.endpoint.identification.algorithm", "")
    # Read from the earliest available offset when the query starts fresh.
    .option("startingOffsets", "earliest")
    .option("subscribe", KAFKA_TOPIC_NAME)
    .load()
)

In [4]:
# Schema of the JSON payload carried in each Kafka message. Every field is
# parsed as a nullable string; proper types are applied after parsing.
parse_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("tid", "datetime", "cc_num", "amount")
    ]
)

In [5]:
# Deserialise the Kafka records from the stream:
# - decode the binary `value` as a string and parse it as JSON with `parse_schema`;
# - flatten the struct and cast each field.
# `datetime` stays a string here; the windowing cells cast it to a timestamp.
json_payload = from_json(col("value").cast("string"), parse_schema)
df_deser = (
    df_read
    .select(json_payload.alias("value"))
    .select(
        col("value.tid").cast("string").alias("tid"),
        col("value.datetime").cast("string").alias("datetime"),
        col("value.cc_num").cast("long").alias("cc_num"),
        col("value.amount").cast("double").alias("amount"),
    )
)

In [6]:
# Sanity check: the deserialised DataFrame is still backed by a streaming source.
df_deser.isStreaming

True

In [7]:
# Inspect the column names and types produced by the deserialisation step.
df_deser.printSchema()

root
 |-- tid: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- amount: double (nullable = true)

## Create windowed aggregations over different time windows using Spark Structured Streaming

In [8]:
# Per-card aggregates over tumbling 10-minute event-time windows.
# The 60-minute watermark bounds how late an event may arrive and still be
# counted (later events may be dropped), and lets Spark discard old window state.
# The window column itself is not selected: each row carries only cc_num and
# the three aggregates.
windowed10mSignalDF = (
    df_deser
    .withColumn("datetime", col("datetime").cast("timestamp"))
    .withWatermark("datetime", "60 minutes")
    .groupBy(window("datetime", "10 minutes"), "cc_num")
    .agg(
        count("cc_num").alias("num_trans_per_10m"),
        avg("amount").alias("avg_amt_per_10m"),
        stddev("amount").alias("stdev_amt_per_10m"),
    )
    .select("cc_num", "num_trans_per_10m", "avg_amt_per_10m", "stdev_amt_per_10m")
)

In [9]:
# The 10-minute aggregation is still a streaming DataFrame.
windowed10mSignalDF.isStreaming

True

In [10]:
# Columns written to the 10-minute aggregation feature group.
windowed10mSignalDF.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- num_trans_per_10m: long (nullable = false)
 |-- avg_amt_per_10m: double (nullable = true)
 |-- stdev_amt_per_10m: double (nullable = true)

In [11]:
# Per-card aggregates over tumbling 1-hour event-time windows.
# The 60-minute watermark bounds how late an event may arrive and still be
# counted (later events may be dropped), and lets Spark discard old window state.
# The window column itself is not selected: each row carries only cc_num and
# the three aggregates.
windowed1hSignalDF = (
    df_deser
    .withColumn("datetime", col("datetime").cast("timestamp"))
    .withWatermark("datetime", "60 minutes")
    .groupBy(window("datetime", "60 minutes"), "cc_num")
    .agg(
        count("cc_num").alias("num_trans_per_1h"),
        avg("amount").alias("avg_amt_per_1h"),
        stddev("amount").alias("stdev_amt_per_1h"),
    )
    .select("cc_num", "num_trans_per_1h", "avg_amt_per_1h", "stdev_amt_per_1h")
)

In [12]:
# The 1-hour aggregation is still a streaming DataFrame.
windowed1hSignalDF.isStreaming

True

In [13]:
# Columns written to the 1-hour aggregation feature group.
windowed1hSignalDF.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- num_trans_per_1h: long (nullable = false)
 |-- avg_amt_per_1h: double (nullable = true)
 |-- stdev_amt_per_1h: double (nullable = true)

In [14]:
# Per-card aggregates over tumbling 12-hour event-time windows.
# The 60-minute watermark bounds how late an event may arrive and still be
# counted (later events may be dropped), and lets Spark discard old window state.
# The window column itself is not selected: each row carries only cc_num and
# the three aggregates.
windowed12hSignalDF = (
    df_deser
    .withColumn("datetime", col("datetime").cast("timestamp"))
    .withWatermark("datetime", "60 minutes")
    .groupBy(window("datetime", "12 hours"), "cc_num")
    .agg(
        count("cc_num").alias("num_trans_per_12h"),
        avg("amount").alias("avg_amt_per_12h"),
        stddev("amount").alias("stdev_amt_per_12h"),
    )
    .select("cc_num", "num_trans_per_12h", "avg_amt_per_12h", "stdev_amt_per_12h")
)

In [15]:
# The 12-hour aggregation is still a streaming DataFrame.
windowed12hSignalDF.isStreaming

True

In [16]:
# Columns written to the 12-hour aggregation feature group.
windowed12hSignalDF.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- num_trans_per_12h: long (nullable = false)
 |-- avg_amt_per_12h: double (nullable = true)
 |-- stdev_amt_per_12h: double (nullable = true)

### Establish a connection with your Hopsworks feature store.

In [17]:
import hsfs

# Open a connection to Hopsworks; call `connection.close()` when finished.
connection = hsfs.connection()
# Get a reference to the default feature store. To access a feature store that
# is shared with this project, pass that feature store's name instead.
fs = connection.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

## Get feature groups from the Hopsworks feature store

In [18]:
# Fetch the feature groups used by this notebook:
# - the raw transactions feature group;
# - the three windowed-aggregation feature groups that the streaming queries write to.
FG_VERSION = 1
(
    card_transactions,
    card_transactions_10m_agg,
    card_transactions_1h_agg,
    card_transactions_12h_agg,
) = [
    fs.get_feature_group(fg_name, version=FG_VERSION)
    for fg_name in (
        "card_transactions",
        "card_transactions_10m_agg",
        "card_transactions_1h_agg",
        "card_transactions_12h_agg",
    )
]

## Insert streaming dataframes to the online feature group

Now we are ready to write each streaming dataframe, as a long-running streaming query, to the online storage of its corresponding aggregation feature group.

In [19]:
# Start a streaming query writing the 10-minute aggregates into
# card_transactions_10m_agg; keep the handle to monitor and stop it later.
query_10m = card_transactions_10m_agg.insert_stream(windowed10mSignalDF)



In [20]:
# Sanity check: True would mean the feature group is neither online-enabled nor
# a stream feature group. The expected result here is False.
not (card_transactions_10m_agg.online_enabled or card_transactions_10m_agg.stream)

False

In [21]:
# Start a streaming query writing the 1-hour aggregates into
# card_transactions_1h_agg; keep the handle to monitor and stop it later.
query_1h = card_transactions_1h_agg.insert_stream(windowed1hSignalDF)



In [22]:
# Start a streaming query writing the 12-hour aggregates into
# card_transactions_12h_agg; keep the handle to monitor and stop it later.
query_12h = card_transactions_12h_agg.insert_stream(windowed12hSignalDF)



### Check if spark streaming query is active

In [23]:
# True while the 10-minute streaming query is running.
query_10m.isActive

True

In [24]:
# True while the 1-hour streaming query is running.
query_1h.isActive

True

In [25]:
# True while the 12-hour streaming query is running.
query_12h.isActive

True

#### We can also check the status of a query and whether it has thrown any exceptions.

In [31]:
# Current status of the 1-hour query: message, whether data is available,
# and whether a trigger is currently executing.
query_1h.status

{'message': 'Waiting for data to arrive', 'isDataAvailable': False, 'isTriggerActive': False}

In [32]:
# Exception that terminated the 1-hour query, if any; None (no output) otherwise.
query_1h.exception()

### Let's check whether data was ingested into the online feature store

In [33]:
# Preview up to 20 rows of the 12-hour aggregates from the online store,
# without truncating column values. The table name is presumably
# <feature group name>_<version> — verify if the feature group version changes.
online_12h_agg = fs.sql("SELECT * FROM card_transactions_12h_agg_1", online=True)
online_12h_agg.show(n=20, truncate=False)

+----------------+-----------------+------------------+------------------+
|cc_num          |num_trans_per_12h|avg_amt_per_12h   |stdev_amt_per_12h |
+----------------+-----------------+------------------+------------------+
|4444037300542691|7                |154.51857142857145|238.93552871214524|
|4609072304828342|7                |176.02714285714288|263.06316920176454|
|4161715127983823|10               |928.3030000000001 |1809.7934375689888|
|4223253728365626|13               |1201.686153846154 |2724.0564739389993|
|4572259224622748|9                |1291.5500000000002|2495.189283160699 |
|4436298663019939|11               |149.78636363636366|235.75729924109365|
|4159210768503456|6                |37.303333333333335|26.403001092047596|
|4231082502226286|10               |977.8430000000001 |2071.1095165208753|
|4090612125343330|15               |646.7259999999999 |1336.9214811370616|
|4416410688550228|11               |663.0627272727273 |1631.6188600717442|
|4853206196105715|10     

In [34]:
# Count the rows currently stored in the online 12-hour aggregate table.
fs.sql("SELECT * FROM card_transactions_12h_agg_1",online=True).count()

100

### Stop queries
If you are running this from a notebook, you can stop a Spark Structured Streaming query either by stopping the kernel or by calling the query's `.stop()` method.

In [43]:
# Stop the three streaming queries started above, in the order they were started.
for streaming_query in (query_10m, query_1h, query_12h):
    streaming_query.stop()