{ "cells": [ { "cell_type": "raw", "metadata": {}, "source": [ "---\n", "title: \"Data Validation with Python\"\n", "date: 2021-02-24\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Feature Validation with the Hopsworks Feature Store\n", "\n", "In this notebook we introduce feature validation operations with the Hopsworks Feature Store and its client API, hsfs." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Background" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Motivation\n", "\n", "Data ingested into the Feature Store forms the basis for the data fed as input to algorithms that develop machine learning models. The Feature Store is a place where curated feature data is stored, so it is important that this data is validated against rules to ensure it adheres to business requirements. \n", "\n", "For example, ingested features might be expected to never be empty or to lie within a certain range: a feature `age` should always be a non-negative number.\n", "\n", "The Hopsworks Feature Store provides users with an API to create `Expectations` on ingested feature data by utilizing the open source [Deequ](https://github.com/awslabs/deequ) library. Feature validation is part of the HSFS Java/Scala and Python APIs for working with Feature Groups. Users work with the following abstractions:\n", "\n", "- Rules: A set of validation rules applied on a Spark/PySpark dataframe that is inserted into a Feature Group. \n", "- Expectations: A set of rules that is applied on a set of features as provided by the user. Expectations are created at the feature store level and can be attached to multiple feature groups.\n", "- Validations: The results of expectations against the ingested dataframe are assigned a ValidationTime and are persisted within the Feature Store. 
Users can then retrieve validation results by validation time and by commit time for time-travel enabled feature groups.\n", "\n", "Feature validation is disabled by default: the `validation_type` attribute of a feature group is set to `NONE`. The allowed validation types are:\n", "- STRICT: Data validation is performed and the feature group is updated only if the validation status is \"Success\"\n", "- WARNING: Data validation is performed and the feature group is updated only if the validation status is \"Warning\" or \"Success\"\n", "- ALL: Data validation is performed and the feature group is updated regardless of the validation status, even if it is \"Failure\"\n", "- NONE: Data validation is not performed on the feature group" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Examples" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create time travel enabled feature group and Bulk Insert Sample Dataset\n", "\n", "For this demo we will use a small sample of the widely used Agrawal generator dataset. It contains hypothetical data of people applying for a loan. `Rakesh Agrawal, Tomasz Imielinski, and Arun Swami, \"Database Mining: A Performance Perspective\", IEEE Transactions on Knowledge and Data Engineering, 5(6), December 1993.`\n",
"\n", "##### For simplicity, we split the Agrawal dataset into 3 feature groups and demonstrate feature validation on the `economy_fg` feature group: \n", "* `economy_fg` with customer id, salary, loan, value of house, age of house, commission and type of car features." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Importing necessary libraries " ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
<table><tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th></tr><tr><td>6</td><td>application_1612535100309_0042</td><td>pyspark</td><td>idle</td><td>Link</td><td>Link</td></tr></table>" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n", "Connected. Call `.close()` to terminate connection gracefully." ] } ], "source": [ "import hsfs\n", "from hsfs.rule import Rule\n", "from pyspark.sql import Row\n", "from pyspark.sql.types import (\n", "    StructType, StructField, IntegerType, FloatType, StringType\n", ")\n", "\n", "connection = hsfs.connection()\n", "# Get a reference to the project's feature store. Shared feature stores can also be accessed by providing the feature store name.\n", "fs = connection.get_feature_store()" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "economy_fg_schema = StructType([\n", "    StructField(\"id\", IntegerType(), True),\n", "    StructField(\"salary\", FloatType(), True),\n", "    StructField(\"commission\", FloatType(), True),\n", "    StructField(\"car\", StringType(), True),\n", "    StructField(\"hvalue\", FloatType(), True),\n", "    StructField(\"hyears\", IntegerType(), True),\n", "    StructField(\"loan\", FloatType(), True),\n", "    StructField(\"year\", IntegerType(), True)\n", "])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create a Spark dataframe for the feature group" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "economy_bulk_insert_data = [\n", "    Row(1, 110499.73, 0.0, \"car15\", 235000.0, 30, 354724.18, 2020),\n", "    Row(2, 140893.77, 0.0, \"car20\", 135000.0, 2, 395015.33, 2020),\n", "    Row(3, 119159.65, 0.0, \"car1\", 145000.0, 22, 122025.08, 2020),\n", "    Row(4, 20000.0, 52593.63, \"car9\", 185000.0, 30, 99629.62, 2020)\n", "]\n", "\n", "economy_bulk_insert_df = spark.createDataFrame(economy_bulk_insert_data, economy_fg_schema)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [
"+---+---------+----------+-----+--------+------+---------+----+\n", "| id|   salary|commission|  car|  hvalue|hyears|     loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "|  1|110499.73|       0.0|car15|235000.0|    30| 354724.2|2020|\n", "|  2|140893.77|       0.0|car20|135000.0|     2|395015.34|2020|\n", "|  3|119159.65|       0.0| car1|145000.0|    22|122025.08|2020|\n", "|  4|  20000.0|  52593.63| car9|185000.0|    30| 99629.62|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+" ] } ], "source": [ "economy_bulk_insert_df.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Data Validation\n", "\n", "The next sections show you how to create feature store expectations, attach them to feature groups, and apply them to dataframes being appended to the feature group. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Discover data validation rules supported in Hopsworks\n", "Hopsworks ships with a set of data validation rules. These rules are **immutable**, uniquely identified by **name**, and available across all feature stores. They are used to create feature store expectations, which can then be attached to feature groups."
] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'name': 'HAS_SIZE', 'predicate': 'VALUE', 'valueType': 'Integral', 'description': 'A rule that asserts the number of rows of the dataframe'}\n", "{'name': 'HAS_MEAN', 'predicate': 'VALUE', 'valueType': 'Fractional', 'description': 'A rule that asserts on the mean of the feature'}\n", "{'name': 'HAS_DATATYPE', 'predicate': 'ACCEPTED_TYPE', 'valueType': 'String', 'description': ''}\n", "{'name': 'HAS_SUM', 'predicate': 'VALUE', 'valueType': 'Fractional', 'description': 'A rule that asserts on the sum of the feature'}\n", "{'name': 'IS_LESS_THAN', 'predicate': 'LEGAL_VALUES', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'IS_GREATER_THAN_OR_EQUAL_TO', 'predicate': 'LEGAL_VALUES', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'HAS_STANDARD_DEVIATION', 'predicate': 'VALUE', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'HAS_PATTERN', 'predicate': 'PATTERN', 'valueType': 'String', 'description': ''}\n", "{'name': 'HAS_NUMBER_OF_DISTINCT_VALUES', 'predicate': 'VALUE', 'valueType': 'Integral', 'description': ''}\n", "{'name': 'HAS_MAX', 'predicate': 'VALUE', 'valueType': 'Fractional', 'description': 'A rule that asserts on the max of the feature'}\n", "{'name': 'IS_CONTAINED_IN', 'predicate': 'LEGAL_VALUES', 'valueType': 'String', 'description': ''}\n", "{'name': 'IS_NON_NEGATIVE', 'predicate': 'VALUE', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'IS_POSITIVE', 'predicate': 'VALUE', 'valueType': 'Boolean', 'description': ''}\n", "{'name': 'HAS_MUTUAL_INFORMATION', 'predicate': 'LEGAL_VALUES', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'HAS_UNIQUE_VALUE_RATIO', 'predicate': 'VALUE', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'IS_GREATER_THAN', 'predicate': 'LEGAL_VALUES', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'HAS_COMPLETENESS', 
'predicate': 'VALUE', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'HAS_ENTROPY', 'predicate': 'VALUE', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'HAS_MIN', 'predicate': 'VALUE', 'valueType': 'Fractional', 'description': 'A rule that asserts on the min of the feature'}\n", "{'name': 'HAS_UNIQUENESS', 'predicate': 'VALUE', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'HAS_DISTINCTNESS', 'predicate': 'VALUE', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'HAS_CORRELATION', 'predicate': 'LEGAL_VALUES', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'HAS_APPROX_QUANTILE', 'predicate': 'VALUE', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'HAS_APPROX_COUNT_DISTINCT', 'predicate': 'VALUE', 'valueType': 'Fractional', 'description': ''}\n", "{'name': 'IS_LESS_THAN_OR_EQUAL_TO', 'predicate': 'LEGAL_VALUES', 'valueType': 'Fractional', 'description': ''}\n" ] } ], "source": [ "# Get all rule definitions available in Hopsworks\n", "rules = connection.get_rules()\n", "for rule in rules:\n", "    print(rule.to_dict())" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'name': 'HAS_MAX', 'predicate': 'VALUE', 'valueType': 'Fractional', 'description': 'A rule that asserts on the max of the feature'}" ] } ], "source": [ "# Get a rule definition by name\n", "rule_max = connection.get_rule(\"HAS_MAX\")\n", "print(rule_max.to_dict())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Expectations based on Hopsworks rules\n", "\n", "Expectations are created at the feature store level. Multiple expectations can be created per feature store.\n", "\n", "An expectation is composed of one or more rules and can refer to one or more features. 
An expectation is used by attaching it to a feature group, as shown in the next sections." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "expectation.rules[0].to_dict(){'name': 'HAS_MIN', 'level': 'WARNING', 'min': 0, 'max': None, 'pattern': None, 'acceptedType': None, 'legalValues': None}\n", "ExpectationsApi.expectation.to_dict(){'name': 'sales', 'description': 'min and max sales limits', 'features': ['salary', 'commission'], 'rules': [, ]}\n", "ExpectationsApi.expectation.rules[0].to_dict(){'name': 'HAS_MIN', 'level': 'WARNING', 'min': 0, 'max': None, 'pattern': None, 'acceptedType': None, 'legalValues': None}\n", "ExpectationsApi.expectation.payload{\"name\": \"sales\", \"description\": \"min and max sales limits\", \"features\": [\"salary\", \"commission\"], \"rules\": [{\"name\": \"HAS_MIN\", \"level\": \"WARNING\", \"min\": 0, \"max\": null, \"pattern\": null, \"acceptedType\": null, \"legalValues\": null}, {\"name\": \"HAS_MAX\", \"level\": \"ERROR\", \"min\": null, \"max\": 1000000, \"pattern\": null, \"acceptedType\": null, \"legalValues\": null}]}\n", "expectation.rules[0].to_dict(){'name': 'HAS_MIN', 'level': 'ERROR', 'min': 2018, 'max': None, 'pattern': None, 'acceptedType': None, 'legalValues': None}\n", "ExpectationsApi.expectation.to_dict(){'name': 'year', 'description': 'validate year correctness', 'features': ['year'], 'rules': [, ]}\n", "ExpectationsApi.expectation.rules[0].to_dict(){'name': 'HAS_MIN', 'level': 'ERROR', 'min': 2018, 'max': None, 'pattern': None, 'acceptedType': None, 'legalValues': None}\n", "ExpectationsApi.expectation.payload{\"name\": \"year\", \"description\": \"validate year correctness\", \"features\": [\"year\"], \"rules\": [{\"name\": \"HAS_MIN\", \"level\": \"ERROR\", \"min\": 2018, \"max\": null, \"pattern\": null, \"acceptedType\": null, \"legalValues\": null}, {\"name\": \"HAS_MAX\", \"level\": \"WARNING\", \"min\": null, \"max\": 2021, \"pattern\": null, \"acceptedType\": null, \"legalValues\": null}]}" ] } ], "source": [ "expectation_sales = fs.create_expectation(\"sales\",\n", "    description=\"min and max sales limits\",\n", "    features=[\"salary\", \"commission\"],\n", "    rules=[Rule(name=\"HAS_MIN\", level=\"WARNING\", min=0), Rule(name=\"HAS_MAX\", level=\"ERROR\", max=1000000)])\n", "expectation_sales.save()\n", "\n", "expectation_year = fs.create_expectation(\"year\",\n", "    features=[\"year\"],\n", "    description=\"validate year correctness\",\n", "    rules=[Rule(name=\"HAS_MIN\", level=\"ERROR\", min=2018), Rule(name=\"HAS_MAX\", level=\"WARNING\", max=2021)])\n", "\n", "expectation_year.save()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Discover Feature Store Expectations\n", "\n", "Using the Python API you can easily find out which expectations are available in this feature store." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'name': 'year', 'description': 'validate year correctness', 'features': ['year'], 'rules': [{'level': 'ERROR', 'min': 2018.0, 'name': 'HAS_MIN'}, {'level': 'WARNING', 'max': 2021.0, 'name': 'HAS_MAX'}]}\n", "{'name': 'sales', 'description': 'min and max sales limits', 'features': ['salary', 'commission'], 'rules': [{'level': 'WARNING', 'min': 0.0, 'name': 'HAS_MIN'}, {'level': 'ERROR', 'max': 1000000.0, 'name': 'HAS_MAX'}]}\n" ] } ], "source": [ "# Get all Feature Store expectations\n", "fs_expectations = fs.get_expectations()\n", "for expectation in fs_expectations:\n", "    print(expectation.to_dict())" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'name': 'year', 'description': 'validate year correctness', 'features': ['year'], 'rules': [{'level': 'ERROR', 'min': 2018.0, 'name': 'HAS_MIN'}, {'level': 'WARNING', 'max': 2021.0, 'name': 
'HAS_MAX'}]}" ] } ], "source": [ "# Get an expectation by its unique name\n", "fs_expectation = fs.get_expectation(\"year\")\n", "print(fs_expectation.to_dict())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create feature group with expectations and validation type\n", "\n", "Feature store expectations can be attached to and detached from feature groups. This enables ingestion pipelines to validate incoming data against expectations. Expectations can also be set when creating a feature group. \n", "The possible validation type values, and what they mean for feature group ingestion, are described later in this notebook. For the moment, we set the validation type to STRICT." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "economy_fg = fs.create_feature_group(\n", "    name=\"economy_fg_p43\",\n", "    description=\"Hudi Household Economy Feature Group\",\n", "    version=1,\n", "    primary_key=[\"id\"],\n", "    partition_key=[\"year\"],\n", "    time_travel_format=\"HUDI\",\n", "    validation_type=\"STRICT\",\n", "    expectations=[expectation_sales, expectation_year]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Bulk insert data into the feature group\n", "Since we have not yet saved any data into the newly created feature group, we use Apache Hudi terminology and `Bulk Insert` the data. In HSFS this is done by calling the `save` method.\n", "\n", "Data will be validated prior to being persisted into the Feature Store." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "economy_fg.save(economy_bulk_insert_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Attach expectations to Feature Groups\n", "\n", "Expectations can be attached to and detached from feature groups even after the latter are created. 
If an expectation is attached to a feature group, it is used whenever inserted data is validated. An expectation can be attached to multiple feature groups, as long as the expectation's features exist in each of those feature groups." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "# Detach an expectation by using its name or the metadata object; this example uses the metadata object\n", "economy_fg.detach_expectation(expectation_year)" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "# Attach an expectation by using its name or the metadata object; this example also uses the metadata object\n", "economy_fg.attach_expectation(expectation_year)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Validations" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### You can also validate the dataframe without having to insert the data into a feature group" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "" ] } ], "source": [ "economy_fg.validate(economy_bulk_insert_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### You can retrieve all the validations of a feature group" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'validationId': 15, 'validationTime': 1612890460069, 'expectationResults': [{'expectation': {'features': ['salary', 'commission'], 'rules': [{'level': 'WARNING', 'min': 0.0, 'name': 'HAS_MIN'}, {'level': 'ERROR', 'max': 1000000.0, 'name': 'HAS_MAX'}], 'description': 'min and max sales limits', 'name': 'sales'}, 'results': [{'feature': 'salary', 'message': 'Success', 'rule': {'level': 'ERROR', 'max': 1000000.0, 'name': 'HAS_MAX'}, 'status': 'SUCCESS', 'value': '140893.765625'}, {'feature': 'commission', 'message': 'Success', 'rule': {'level': 'ERROR', 'max': 1000000.0, 'name': 'HAS_MAX'}, 
'status': 'SUCCESS', 'value': '52593.62890625'}, {'feature': 'salary', 'message': 'Success', 'rule': {'level': 'WARNING', 'min': 0.0, 'name': 'HAS_MIN'}, 'status': 'SUCCESS', 'value': '20000.0'}, {'feature': 'commission', 'message': 'Success', 'rule': {'level': 'WARNING', 'min': 0.0, 'name': 'HAS_MIN'}, 'status': 'SUCCESS', 'value': '0.0'}], 'status': 'SUCCESS'}, {'expectation': {'features': ['year'], 'rules': [{'level': 'ERROR', 'min': 2018.0, 'name': 'HAS_MIN'}, {'level': 'WARNING', 'max': 2021.0, 'name': 'HAS_MAX'}], 'description': 'validate year correctness', 'name': 'year'}, 'results': [{'feature': 'year', 'message': 'Success', 'rule': {'level': 'ERROR', 'min': 2018.0, 'name': 'HAS_MIN'}, 'status': 'SUCCESS', 'value': '2020.0'}, {'feature': 'year', 'message': 'Success', 'rule': {'level': 'WARNING', 'max': 2021.0, 'name': 'HAS_MAX'}, 'status': 'SUCCESS', 'value': '2020.0'}], 'status': 'SUCCESS'}]}\n", "{'validationId': 16, 'validationTime': 1612890570580, 'expectationResults': [{'expectation': {'features': ['salary', 'commission'], 'rules': [{'level': 'WARNING', 'min': 0.0, 'name': 'HAS_MIN'}, {'level': 'ERROR', 'max': 1000000.0, 'name': 'HAS_MAX'}], 'description': 'min and max sales limits', 'name': 'sales'}, 'results': [{'feature': 'salary', 'message': 'Success', 'rule': {'level': 'ERROR', 'max': 1000000.0, 'name': 'HAS_MAX'}, 'status': 'SUCCESS', 'value': '140893.765625'}, {'feature': 'commission', 'message': 'Success', 'rule': {'level': 'ERROR', 'max': 1000000.0, 'name': 'HAS_MAX'}, 'status': 'SUCCESS', 'value': '52593.62890625'}, {'feature': 'salary', 'message': 'Success', 'rule': {'level': 'WARNING', 'min': 0.0, 'name': 'HAS_MIN'}, 'status': 'SUCCESS', 'value': '20000.0'}, {'feature': 'commission', 'message': 'Success', 'rule': {'level': 'WARNING', 'min': 0.0, 'name': 'HAS_MIN'}, 'status': 'SUCCESS', 'value': '0.0'}], 'status': 'SUCCESS'}, {'expectation': {'features': ['year'], 'rules': [{'level': 'ERROR', 'min': 2018.0, 'name': 'HAS_MIN'}, {'level': 
'WARNING', 'max': 2021.0, 'name': 'HAS_MAX'}], 'description': 'validate year correctness', 'name': 'year'}, 'results': [{'feature': 'year', 'message': 'Success', 'rule': {'level': 'ERROR', 'min': 2018.0, 'name': 'HAS_MIN'}, 'status': 'SUCCESS', 'value': '2020.0'}, {'feature': 'year', 'message': 'Success', 'rule': {'level': 'WARNING', 'max': 2021.0, 'name': 'HAS_MAX'}, 'status': 'SUCCESS', 'value': '2020.0'}], 'status': 'SUCCESS'}]}\n" ] } ], "source": [ "economy_fg_validations = economy_fg.get_validations()\n", "for validation in economy_fg_validations:\n", "    print(validation.to_dict())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ... or retrieve a validation by validation or commit time. \n", "\n", "Validation time is the timestamp when the validation started.\n", "\n", "Commit time is the time the data was persisted in the time travel enabled feature group." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "commit_time = economy_fg.get_validations()[0].commit_time" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "validation_time = economy_fg.get_validations()[0].validation_time" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'validationId': 15, 'validationTime': 1612890460069, 'expectationResults': [{'expectation': {'features': ['salary', 'commission'], 'rules': [{'level': 'WARNING', 'min': 0.0, 'name': 'HAS_MIN'}, {'level': 'ERROR', 'max': 1000000.0, 'name': 'HAS_MAX'}], 'description': 'min and max sales limits', 'name': 'sales'}, 'results': [{'feature': 'salary', 'message': 'Success', 'rule': {'level': 'ERROR', 'max': 1000000.0, 'name': 'HAS_MAX'}, 'status': 'SUCCESS', 'value': '140893.765625'}, {'feature': 'commission', 'message': 'Success', 'rule': {'level': 'ERROR', 'max': 1000000.0, 'name': 'HAS_MAX'}, 'status': 'SUCCESS', 'value': '52593.62890625'}, 
{'feature': 'salary', 'message': 'Success', 'rule': {'level': 'WARNING', 'min': 0.0, 'name': 'HAS_MIN'}, 'status': 'SUCCESS', 'value': '20000.0'}, {'feature': 'commission', 'message': 'Success', 'rule': {'level': 'WARNING', 'min': 0.0, 'name': 'HAS_MIN'}, 'status': 'SUCCESS', 'value': '0.0'}], 'status': 'SUCCESS'}, {'expectation': {'features': ['year'], 'rules': [{'level': 'ERROR', 'min': 2018.0, 'name': 'HAS_MIN'}, {'level': 'WARNING', 'max': 2021.0, 'name': 'HAS_MAX'}], 'description': 'validate year correctness', 'name': 'year'}, 'results': [{'feature': 'year', 'message': 'Success', 'rule': {'level': 'ERROR', 'min': 2018.0, 'name': 'HAS_MIN'}, 'status': 'SUCCESS', 'value': '2020.0'}, {'feature': 'year', 'message': 'Success', 'rule': {'level': 'WARNING', 'max': 2021.0, 'name': 'HAS_MAX'}, 'status': 'SUCCESS', 'value': '2020.0'}], 'status': 'SUCCESS'}]}" ] } ], "source": [ "# Get validation by validation time\n", "validation = economy_fg.get_validations(validation_time=validation_time)[0]\n", "print(validation.to_dict())" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'validationId': 15, 'validationTime': 1612890460069, 'expectationResults': [{'expectation': {'features': ['salary', 'commission'], 'rules': [{'level': 'WARNING', 'min': 0.0, 'name': 'HAS_MIN'}, {'level': 'ERROR', 'max': 1000000.0, 'name': 'HAS_MAX'}], 'description': 'min and max sales limits', 'name': 'sales'}, 'results': [{'feature': 'salary', 'message': 'Success', 'rule': {'level': 'ERROR', 'max': 1000000.0, 'name': 'HAS_MAX'}, 'status': 'SUCCESS', 'value': '140893.765625'}, {'feature': 'commission', 'message': 'Success', 'rule': {'level': 'ERROR', 'max': 1000000.0, 'name': 'HAS_MAX'}, 'status': 'SUCCESS', 'value': '52593.62890625'}, {'feature': 'salary', 'message': 'Success', 'rule': {'level': 'WARNING', 'min': 0.0, 'name': 'HAS_MIN'}, 'status': 'SUCCESS', 'value': '20000.0'}, {'feature': 'commission', 
'message': 'Success', 'rule': {'level': 'WARNING', 'min': 0.0, 'name': 'HAS_MIN'}, 'status': 'SUCCESS', 'value': '0.0'}], 'status': 'SUCCESS'}, {'expectation': {'features': ['year'], 'rules': [{'level': 'ERROR', 'min': 2018.0, 'name': 'HAS_MIN'}, {'level': 'WARNING', 'max': 2021.0, 'name': 'HAS_MAX'}], 'description': 'validate year correctness', 'name': 'year'}, 'results': [{'feature': 'year', 'message': 'Success', 'rule': {'level': 'ERROR', 'min': 2018.0, 'name': 'HAS_MIN'}, 'status': 'SUCCESS', 'value': '2020.0'}, {'feature': 'year', 'message': 'Success', 'rule': {'level': 'WARNING', 'max': 2021.0, 'name': 'HAS_MAX'}, 'status': 'SUCCESS', 'value': '2020.0'}], 'status': 'SUCCESS'}]}" ] } ], "source": [ "# Get validation by commit time\n", "validation = economy_fg.get_validations(commit_time=commit_time)[0]\n", "print(validation.to_dict())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get the status of a validation" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Validation status: SUCCESS" ] } ], "source": [ "print(\"Validation status: {}\".format(validation.status))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upsert new invalid data into a Feature Group\n", "\n", "Now we will try to upsert some invalid data (year feature does not meet the maximum expectation). 
An error is returned to the client along with the failed expectation." ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---------+----------+-----+--------+------+---------+----+\n", "| id|   salary|commission|  car|  hvalue|hyears|     loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "|  1|120499.73|       0.0|car17|205000.0|    30| 564724.2|2022|\n", "|  2|160893.77|       0.0|car10|179000.0|     2|455015.34|2020|\n", "|  5| 93956.32|       0.0|car15|135000.0|     1| 458679.8|2020|\n", "|  6| 41365.43|  52809.15| car7|135000.0|    19| 216839.7|2020|\n", "|  7| 94805.61|       0.0|car17|135000.0|    23|233216.06|2022|\n", "+---+---------+----------+-----+--------+------+---------+----+" ] } ], "source": [ "economy_upsert_data = [\n", "    Row(1, 120499.73, 0.0, \"car17\", 205000.0, 30, 564724.18, 2022),  # update, year 2022 exceeds the HAS_MAX limit of 2021\n", "    Row(2, 160893.77, 0.0, \"car10\", 179000.0, 2, 455015.33, 2020),  # update\n", "    Row(5, 93956.32, 0.0, \"car15\", 135000.0, 1, 458679.82, 2020),  # insert\n", "    Row(6, 41365.43, 52809.15, \"car7\", 135000.0, 19, 216839.71, 2020),  # insert\n", "    Row(7, 94805.61, 0.0, \"car17\", 135000.0, 23, 233216.07, 2022)  # insert, year 2022 exceeds the HAS_MAX limit of 2021\n", "]\n", "\n", "economy_upsert_df = spark.createDataFrame(economy_upsert_data, economy_fg_schema)\n", "\n", "economy_upsert_df.show(5)" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "An error was encountered:\n", "Metadata operation error: (url: https://hopsworks.glassfish.service.consul:8182/hopsworks-api/api/project/150/featurestores/98/featuregroups/64/validations). 
Server response: \n", "HTTP code: 417, HTTP reason: Expectation Failed, error code: 270149, error msg: Feature group validation checks did not pass, will not persist validation results., user msg: Results: [ExpectationResult{status=Failure, results=[ValidationResult{status=Success, message='Success', value='2020.0', feature='year', rule=Rule{name=HAS_MIN, level=ERROR, min=2018.0, max=null, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Failure, message='Value: 2022.0 does not meet the constraint requirement! HAS_MAX', value='2022.0', feature='year', rule=Rule{name=HAS_MAX, level=WARNING, min=null, max=2021.0, pattern='null', acceptedType=null, legalValues=null}}], expectation=Expectation{name='year', features=[year], rules=[Rule{name=HAS_MIN, level=ERROR, min=2018.0, max=null, pattern='null', acceptedType=null, legalValues=null}, Rule{name=HAS_MAX, level=WARNING, min=null, max=2021.0, pattern='null', acceptedType=null, legalValues=null}]}}]\n", "Traceback (most recent call last):\n", " File \"/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/feature_group.py\", line 690, in insert\n", " write_options,\n", " File \"/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/feature_group_engine.py\", line 93, in insert\n", " validation = feature_group.validate(feature_dataframe)\n", " File \"/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/feature_group.py\", line 857, in validate\n", " return self._data_validation_engine.validate(self, dataframe)\n", " File \"/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/data_validation_engine.py\", line 115, in validate\n", " return self._feature_group_validation_api.put(feature_group, validation_python)\n", " File \"/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/validations_api.py\", line 49, in put\n", " data=feature_group_validation.json(),\n", " File 
\"/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/decorators.py\", line 35, in if_connected\n", " return fn(inst, *args, **kwargs)\n", " File \"/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/client/base.py\", line 147, in _send_request\n", " raise exceptions.RestAPIError(url, response)\n", "hsfs.client.exceptions.RestAPIError: Metadata operation error: (url: https://hopsworks.glassfish.service.consul:8182/hopsworks-api/api/project/150/featurestores/98/featuregroups/64/validations). Server response: \n", "HTTP code: 417, HTTP reason: Expectation Failed, error code: 270149, error msg: Feature group validation checks did not pass, will not persist validation results., user msg: Results: [ExpectationResult{status=Failure, results=[ValidationResult{status=Success, message='Success', value='2020.0', feature='year', rule=Rule{name=HAS_MIN, level=ERROR, min=2018.0, max=null, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Failure, message='Value: 2022.0 does not meet the constraint requirement! HAS_MAX', value='2022.0', feature='year', rule=Rule{name=HAS_MAX, level=WARNING, min=null, max=2021.0, pattern='null', acceptedType=null, legalValues=null}}], expectation=Expectation{name='year', features=[year], rules=[Rule{name=HAS_MIN, level=ERROR, min=2018.0, max=null, pattern='null', acceptedType=null, legalValues=null}, Rule{name=HAS_MAX, level=WARNING, min=null, max=2021.0, pattern='null', acceptedType=null, legalValues=null}]}}]\n", "\n" ] } ], "source": [ "# Insert call will fail as invalid data (year feature) is about to be ingested. Error shows the expectation that was not met\n", "economy_fg.insert(economy_upsert_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Validation type\n", "The validation type determines the validation behavior. 
Available types are:\n", "- STRICT: Data validation is performed and data is ingested into the feature group only if the validation status is \"SUCCESS\"\n", "- WARNING: Data validation is performed and data is ingested into the feature group only if the validation status is \"WARNING\" or \"SUCCESS\"\n", "- ALL: Data validation is performed and data is ingested into the feature group regardless of the validation status\n", "- NONE: Data validation is not performed on the feature group\n", "\n", "The validation type of a feature group can easily be changed." ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "# The previous economy_upsert_df contains invalid data, but we still want to persist it, so we change the validation type from STRICT to ALL\n", "economy_fg.validation_type = \"ALL\"" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "# Insert the invalid dataframe again; with validation type ALL it is ingested despite the failed expectation\n", "economy_fg.insert(economy_upsert_df)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.9" } }, "nbformat": 4, "nbformat_minor": 4 }