In [1]:
# Setup the feature groups for the Flink pipelines
import pandas as pd
import hopsworks
from hsfs.feature import Feature
from datetime import datetime, timedelta, timezone

project = hopsworks.login()
fs = project.get_feature_store()

# Column schema of the "interactions" feature group that the Flink pipeline
# writes into.
features = [
    Feature(name="interaction_month", type="string"),
    Feature(name="interaction_id", type="bigint"),
    Feature(name="user_id", type="bigint"),
    Feature(name="video_id", type="bigint"),
    Feature(name="category_id", type="bigint"),
    Feature(name="interaction_type", type="string"),
    Feature(name="watch_time", type="bigint"),
    Feature(name="interaction_date", type="timestamp"),
]

# Returns the already-registered feature group when name/version exist;
# otherwise returns a new, not-yet-persisted metadata object.
interactions_fg = fs.get_or_create_feature_group(
    name="interactions",
    description="Interactions data.",
    version=1,
    # NOTE(review): with user_id as the only primary key, the online store
    # presumably keeps a single row per user (each new interaction overwrites
    # the previous one). Confirm this is intended; otherwise consider adding
    # interaction_id to the key.
    primary_key=["user_id"],
    partition_key=["interaction_month"],
    online_enabled=True,
    event_time="interaction_date",
)

# Persist the schema only for a newly created feature group (no id yet).
# Re-saving an already registered group is not idempotent, so this guard
# keeps the cell safe under Restart & Run All.
if interactions_fg.id is None:
    interactions_fg.save(features)


Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://35.243.150.76/p/120
Connected. Call `.close()` to terminate connection gracefully.
Feature Group created successfully, explore it at 
https://35.243.150.76/p/120/fs/68/fg/14


(None, None)

In [2]:
import hopsworks

project = hopsworks.login()

# Kafka topic the Flink pipeline reads live interaction events from, and the
# schema-registry subject holding its Avro schema.
KAFKA_TOPIC_NAME = "live_interactions"
SCHEMA_NAME = "live_interactions_schema"
KAFKA_PARTITIONS = 16
KAFKA_REPLICAS = 1

kafka_api = project.get_kafka_api()
job_api = project.get_jobs_api()

# Avro record for one live interaction event; every field is a nullable union.
# NOTE(review): interaction_id, user_id and video_id are strings here but
# bigint in the "interactions" feature group (cell 1) - confirm the Flink job
# casts them before writing to the feature group.
schema = {
    "type": "record",
    "name": SCHEMA_NAME,
    "namespace": "io.hops.examples.flink.examples",
    "fields": [
        {"name": "interaction_id", "type": ["null", "string"]},
        {"name": "user_id", "type": ["null", "string"]},
        {"name": "video_id", "type": ["null", "string"]},
        {"name": "category_id", "type": ["null", "long"]},
        {"name": "interaction_type", "type": ["null", "string"]},
        {"name": "watch_time", "type": ["null", "long"]},
        {
            "name": "interaction_date",
            "type": [
                "null",
                {"type": "long", "logicalType": "timestamp-micros"},
            ],
        },
        {"name": "interaction_month", "type": ["null", "string"]},
    ],
}

# Register the schema and create the topic only if they are missing, so the
# cell can be re-run (mirrors the get-or-create pattern of cell 1).
if SCHEMA_NAME not in kafka_api.get_subjects():
    kafka_api.create_schema(SCHEMA_NAME, schema)

if KAFKA_TOPIC_NAME in [topic.name for topic in kafka_api.get_topics()]:
    live_topic = kafka_api.get_topic(KAFKA_TOPIC_NAME)
else:
    # Pins schema version 1, i.e. the first version registered for the subject.
    live_topic = kafka_api.create_topic(
        KAFKA_TOPIC_NAME,
        SCHEMA_NAME,
        1,
        replicas=KAFKA_REPLICAS,
        partitions=KAFKA_PARTITIONS,
    )

live_topic


Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://35.243.150.76/p/120


KafkaTopic('live_interactions')