{ "cells": [ { "cell_type": "raw", "metadata": {}, "source": [ "---\n", "title: \"Data Validation with Scala\"\n", "date: 2021-02-24\n", "type: technical_note\n", "draft: false\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Feature Validation with the Hopsworks Feature Store\n", "\n", "In this notebook we introduce feature validation operations with the Hopsworks Feature Store and its client API, hsfs." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Background" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Motivation\n", "\n", "Data ingested into the Feature Store form the basis for the data fed as input to algorithms that develope machine learning models. The Feature store is a place where curated feature data is stored, therefore it is important that this data is validated against different rules to it adheres to business requirements. \n", "\n", "For example, ingested features might be expected to never be empty or to lie within a certain range, for example a feature `age` should always be a non-negative number.\n", "\n", "The Hopsworks Feature Store provides users with an API to create `Expectations` on ingested feature data by utilizing the `Deequ` https://github.com/awslabs/deequ open source library. Feature validation is part of the HSFS Java/Scala and Python API for working with Feature Groups. Users work with the abstractions:\n", "\n", "- Rules: A set of validation rules applied on a Spark/PySpark dataframe that is inserted into a Feature Group. \n", "- Expectations: A set of rules that is applied on a set of features as provided by the user. Expecations are created at the feature store level and can be attached to multiple feature groups.\n", "- Validations: The results of expectations against the ingested dataframe are assigned a ValidationTime and are persisted within the Feature Store. Users can then retrieve validation results by validation time and by commit time for time-travel enabled feature groups.\n", "\n", "Feature Validation is disabled by default, by having the `validation_type` feature group attribute set to `NONE`. The list of allowed validation types are:\n", "- STRICT: Data validation is performed and feature group is updated only if validation status is \"Success\"\n", "- WARNING: Data validation is performed and feature group is updated only if validation status is \"Warning\" or lower\n", "- ALL: Data validation is performed and feature group is updated only if validation status is \"Failure\" or lower\n", "- NONE: Data validation not performed on feature group" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Examples" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create time travel enabled feature group and Bulk Insert Sample Dataset\n", "\n", "For this demo we will use small sample of the Agarwal Generator that is a widely used dataset. It contains the hypothetical data of people applying for a loan. `Rakesh Agrawal, Tomasz Imielinksi, and Arun Swami, \"Database Mining: A Performance Perspective\", IEEE Transactions on Knowledge and Data Engineering, 5(6), December 1993.

`\n", "\n", "##### For simplicity of demo purposes we split Agarwal dataset into 3 freature groups and demostrate feature validaton on the economy_fg feature group: \n", "* `economy_fg` with customer id, salary, loan, value of house, age of house, commission and type of car features; " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Importing necessary libraries " ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", "
<table><tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th></tr><tr><td>7</td><td>application_1612535100309_0043</td><td>spark</td><td>idle</td><td>Link</td><td>Link</td></tr></table>
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n", "import com.logicalclocks.hsfs._\n", "import com.logicalclocks.hsfs.engine._\n", "import com.logicalclocks.hsfs.metadata.validation._\n", "import com.logicalclocks.hsfs.metadata.Expectation\n", "import scala.collection.JavaConversions._\n", "import collection.JavaConverters._\n", "import org.apache.spark.sql.{DataFrame, Row}\n", "import org.apache.spark.sql.catalyst.expressions.GenericRow\n", "import org.apache.spark.sql.types._\n", "import java.sql.Date\n", "import java.sql.Timestamp\n", "connection: com.logicalclocks.hsfs.HopsworksConnection = com.logicalclocks.hsfs.HopsworksConnection@409f5003\n", "fs: com.logicalclocks.hsfs.FeatureStore = FeatureStore{id=98, name='demo_fs_meb10000_featurestore', projectId=150, featureGroupApi=com.logicalclocks.hsfs.metadata.FeatureGroupApi@3c222139}\n" ] } ], "source": [ "import com.logicalclocks.hsfs._\n", "import com.logicalclocks.hsfs.engine._\n", "import com.logicalclocks.hsfs.metadata.validation._\n", "import com.logicalclocks.hsfs.metadata.Expectation\n", "import scala.collection.JavaConversions._\n", "import collection.JavaConverters._\n", "\n", "import org.apache.spark.sql.{ DataFrame, Row }\n", "import org.apache.spark.sql.catalyst.expressions.GenericRow\n", "import org.apache.spark.sql.types._\n", "\n", "import java.sql.Date\n", "import java.sql.Timestamp\n", "\n", "val connection = HopsworksConnection.builder().build();\n", "val fs = connection.getFeatureStore();" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "economyFgSchema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(salary,FloatType,true), StructField(commission,FloatType,true), StructField(car,StringType,true), StructField(hvalue,FloatType,true), StructField(hyears,IntegerType,true), StructField(loan,FloatType,true), StructField(year,IntegerType,true))\n" ] } ], "source": [ "val economyFgSchema = \n", " scala.collection.immutable.List(\n", " StructField(\"id\", IntegerType, true),\n", " StructField(\"salary\", FloatType, true),\n", " StructField(\"commission\", FloatType, true),\n", " StructField(\"car\", StringType, true), \n", " StructField(\"hvalue\", FloatType, true), \n", " StructField(\"hyears\", IntegerType, true), \n", " StructField(\"loan\", FloatType, true),\n", " StructField(\"year\", IntegerType, true) \n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create spark dataframes for each Feature groups" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "economyBulkInsertData: Seq[org.apache.spark.sql.Row] = List([1,110499.73,0.0,car15,235000.0,30,354724.2,2020], [2,140893.77,0.0,car20,135000.0,2,395015.34,2020], [3,119159.65,0.0,car1,145000.0,22,122025.08,2020], [4,20000.0,52593.63,car9,185000.0,30,99629.62,2020])\n", "economyBulkInsertDf: org.apache.spark.sql.DataFrame = [id: int, salary: float ... 
6 more fields]\n" ] } ], "source": [ "val economyBulkInsertData = Seq(\n", " Row(1, 110499.73f, 0.0f, \"car15\", 235000.0f, 30, 354724.18f, 2020),\n", " Row(2, 140893.77f, 0.0f, \"car20\", 135000.0f, 2, 395015.33f, 2020),\n", " Row(3, 119159.65f, 0.0f, \"car1\", 145000.0f, 22, 122025.08f, 2020),\n", " Row(4, 20000.0f, 52593.63f, \"car9\", 185000.0f, 30, 99629.62f, 2020)\n", ")\n", "\n", "val economyBulkInsertDf = spark.createDataFrame(\n", " spark.sparkContext.parallelize(economyBulkInsertData),\n", " StructType(economyFgSchema)\n", ")" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 1|110499.73| 0.0|car15|235000.0| 30| 354724.2|2020|\n", "| 2|140893.77| 0.0|car20|135000.0| 2|395015.34|2020|\n", "| 3|119159.65| 0.0| car1|145000.0| 22|122025.08|2020|\n", "| 4| 20000.0| 52593.63| car9|185000.0| 30| 99629.62|2020|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "\n" ] } ], "source": [ "economyBulkInsertDf.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Data Validation\n", "\n", "The next sections show you how to create feature store expectations, attach them to feature groups, and apply them to dataframes being appended to the feature group. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Discover data validation rules supported in Hopsworks\n", "Hopsworks ships with a set of data validation rules. These rules are **immutable**, uniquely identified by **name** and are available across all feature stores. These rules are used to create feature store expectations which can then be attached to feature groups." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "rules: Seq[com.logicalclocks.hsfs.metadata.RuleDefinition] = Buffer(RuleDefinition(name=HAS_UNIQUENESS, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=HAS_DISTINCTNESS, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=HAS_CORRELATION, predicate=LEGAL_VALUES, valueType=Fractional, description=), RuleDefinition(name=HAS_APPROX_QUANTILE, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=HAS_APPROX_COUNT_DISTINCT, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=IS_LESS_THAN_OR_EQUAL_TO, predicate=LEGAL_VALUES, valueType=Fractional, description=), RuleDefinition(name=HAS_ENTROPY, predicate=VALUE, valueType=Fractional, description=), RuleDefinition(name=HAS_MIN, predicate=VALUE, valueType=Fra..." ] } ], "source": [ "// Get all available rule definitions\n", "val rules = connection.getRules()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "ruleMax: com.logicalclocks.hsfs.metadata.RuleDefinition = RuleDefinition(name=HAS_MAX, predicate=VALUE, valueType=Fractional, description=A rule that asserts on the max of the feature)\n" ] } ], "source": [ "// Get a rule definition by name\n", "val ruleMax = connection.getRule(RuleName.HAS_MAX)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Expectations based on Hopsworks rules\n", "\n", "Expectations are created at the feature store level. 
Multiple expectations can be created per feature store.\n", "\n", "An expectation is composed of one or more rules and can refer to one or more features. An expectation can be utilized by attaching it to a feature group, as shown in the next sections." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "expectationSales: com.logicalclocks.hsfs.metadata.Expectation = Expectation(name=sales, description=min and max sales limits, features=[salary, commission], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@52bc8329, name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@2e84fc2d, name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=FeatureStore{id=98, name='demo_fs_meb10000_featurestore', projectId=150, featureGroupApi=com.logicalclocks.hsfs.metadata.FeatureGroupApi@3c222139}, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@4188c5c1)\n", "expectationYear: com.logicalclocks.hsfs.metadata.Expectation = Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@5dd73ee4, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@3d7c2e0, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=FeatureStore{id=98, name='demo_fs_meb10000_featurestore', projectId=150, featureGroupApi=com.logicalclocks.hsfs.metadata.FeatureGroupApi@3c222139}, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@3940336c)\n" ] } ], "source": [ "// Create an expectation for the \"salary\" and \"commission\" features so that their min value is 0 and their max is 1000000\n", "val expectationSales = (fs.createExpectation()\n", " .rules(Seq(\n", " Rule.createRule(RuleName.HAS_MIN).min(0).level(Level.WARNING).build(), //Set rule by name\n", " Rule.createRule(ruleMax).max(1000000).level(Level.ERROR).build())) //Set rule by passing the RuleDefinition metadata\n", " .name(\"sales\")\n", " .description(\"min and max sales limits\")\n", " .features(Seq(\"salary\", \"commission\"))\n", " .build())\n", "expectationSales.save()\n", "\n", "// Create an expectation for the \"year\" feature so that its min value is 2018 and its max value is 2021\n", "val expectationYear = (fs.createExpectation()\n", " .rules(Seq(\n", " Rule.createRule(RuleName.HAS_MIN).min(2018).level(Level.WARNING).build(),\n", " Rule.createRule(RuleName.HAS_MAX).max(2021).level(Level.ERROR).build()))\n", " .name(\"year\")\n", " .description(\"min and max limits\")\n", " .features(Seq(\"year\"))\n", " .build())\n", "expectationYear.save()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Discover Feature Store Expectations\n", "\n", "Using the HSFS API you can easily find out which expectations are available in this feature store."
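] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a quick, illustrative sketch of how the retrieved metadata can be inspected, the cell below loops over the expectations returned by `fs.getExpectations()` and prints a short summary of each. It assumes the conventional getter methods (`getName`, `getDescription`, `getFeatures`) on the `Expectation` metadata class, so treat it as a sketch rather than verified API usage." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "// Sketch: print a one-line summary of each expectation registered in this feature store.\n", "// Assumes the conventional getters (getName, getDescription, getFeatures) on the Expectation metadata class.\n", "for (exp <- fs.getExpectations()) {\n", "  println(s\"${exp.getName}: ${exp.getDescription} -> features ${exp.getFeatures}\")\n", "}"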
] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "fsExpectations: Seq[com.logicalclocks.hsfs.metadata.Expectation] = Buffer(Expectation(name=sales, description=min and max sales limits, features=[salary, commission], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@73440814, name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@68b15078, name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@64bcfca4), Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@7d545a36, name=HAS_M..." ] } ], "source": [ "// Retrieve all Feature Store expectations\n", "val fsExpectations = fs.getExpectations()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "yearExp: com.logicalclocks.hsfs.metadata.Expectation = Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@130559a2, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@4bdf1a6, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@2fbd7c66)\n" ] } ], "source": [ "// Retrieve a Feature Store expectation by name\n", "val yearExp = fs.getExpectation(\"year\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create feature group with expectations and validation type\n", "\n", "Feature store expectations can be attached and detached from feature groups. That enables ingestions pipelines to validate incoming data against expectations. Expectations can be set when creating a feature group. \n", "Later in the notebook we describe the possible validation type values and what that means for the feature group ingestion. For the moment, we initialize the validation type to STRICT" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "economyFg: com.logicalclocks.hsfs.FeatureGroup = com.logicalclocks.hsfs.FeatureGroup@1f3c778c\n" ] } ], "source": [ "val economyFg = (fs.createFeatureGroup()\n", " .name(\"economy_fg48\")\n", " .description(\"Hudi Household Economy Feature Group\")\n", " .expectations(Seq(expectationSales, expectationYear))\n", " .validationType(ValidationType.STRICT)\n", " .version(1)\n", " .primaryKeys(Seq(\"id\"))\n", " .partitionKeys(Seq(\"year\"))\n", " .timeTravelFormat(TimeTravelFormat.HUDI)\n", " .build())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Bulk insert data into the feature group\n", "Since we have not yet saved any data into newly created feature groups we will use Apache hudi terminology and `Bulk Insert` data. In HSFS its just issuing `save` method.\n", "\n", "Data will be validated prior to being persisted into the Feature Store." 
] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "economyFg.save(economyBulkInsertDf)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Attach expectations to Feature Groups\n", "\n", "Expectations can be attached and detached from feature groups even after the latter are created. If an expectation is attached to a feature group, it will be used when inserted data is validated. An expectation can be attached to multiple feature groups, as long as the expectation's features exist in that feature group." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "// Detach expectation by using its name or the metadata object, example shows the latter\n", "economyFg.detachExpectation(expectationYear)" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "res19: com.logicalclocks.hsfs.metadata.Expectation = Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@6b767f68, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@dd6947c, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@8221927)\n" ] } ], "source": [ "// Attach expectation by using its name or the metadata object, example shows the former\n", "economyFg.attachExpectation(\"year\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Validations" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### You can also validate the dataframe without having to insert the data into a feature group" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "res20: com.logicalclocks.hsfs.metadata.FeatureGroupValidation = FeatureGroupValidation(validationId=19, validationTime=1612891168467, commitTime=null, expectationResults=[ExpectationResult(status=Success, expectation=Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@39a295cf, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@36fdf3b6, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@444ead48), results=[ValidationResult(status=Success, me..." 
] } ], "source": [ "economyFg.validate(economyBulkInsertDf)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### You get retrieve all the validations of a feature group" ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "validations: java.util.List[com.logicalclocks.hsfs.metadata.FeatureGroupValidation] = [FeatureGroupValidation(validationId=18, validationTime=1612891077406, commitTime=1612891080000, expectationResults=[ExpectationResult(status=Success, expectation=Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@6c213ee3, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@3f20aaab, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@db7cd28), results=[Vali..." ] } ], "source": [ "// Get all validations of the feature group\n", "val validations = economyFg.getValidations()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ... or retrieve a validation by validation or commit time. \n", "\n", "Validation time is the timestamp when the validation started.\n", "\n", "Commit time is the time data was peristed in the time travel enabled feature group" ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "validationTime: Long = 1612891077406\n" ] } ], "source": [ "val validationTime = validations(0).getValidationTime()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Commit time associated with the validation time when the data is committed to time-travel enabled feature group." ] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "commitTime: Long = 1612891080000\n" ] } ], "source": [ "val commitTime = validations(0).getCommitTime()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get validation by Feature Group Commit Time" ] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "validation: com.logicalclocks.hsfs.metadata.FeatureGroupValidation = FeatureGroupValidation(validationId=18, validationTime=1612891077406, commitTime=1612891080000, expectationResults=[ExpectationResult(status=Success, expectation=Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@26d38c44, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@4ca150b, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@196348ed), results=[ValidationResult(statu..." 
] } ], "source": [ "// Get a validation by commitTime\n", "val validation = economyFg.getValidation(commitTime, DataValidationEngine.ValidationTimeType.COMMIT_TIME)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get validation by Feature Group Validation Time" ] }, { "cell_type": "code", "execution_count": 42, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "validation: com.logicalclocks.hsfs.metadata.FeatureGroupValidation = FeatureGroupValidation(validationId=18, validationTime=1612891077406, commitTime=1612891080000, expectationResults=[ExpectationResult(status=Success, expectation=Expectation(name=year, description=min and max limits, features=[year], rules=[Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@72764f97, name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern=null, acceptedType=null, legalValues=null), Rule(featureStoreRulesApi=com.logicalclocks.hsfs.metadata.RulesApi@5c1d4d62, name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern=null, acceptedType=null, legalValues=null)], featureStore=null, expectationsEngine=com.logicalclocks.hsfs.engine.ExpectationsEngine@15e6c418), results=[ValidationResult(stat..." ] } ], "source": [ "// Get a validation by validationTyme\n", "val validation = economyFg.getValidation(validationTime, com.logicalclocks.hsfs.engine.DataValidationEngine.ValidationTimeType.VALIDATION_TIME)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get the status of a validation" ] }, { "cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "res40: com.logicalclocks.hsfs.metadata.ExpectationResult.Status = Success\n" ] } ], "source": [ "validation.getStatus()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upsert new invalid data into a Feature Group\n", "\n", "Now we will try to upsert some invalid data (year feature does not meet the maximum expectation). An error is returned to the client along with the failed expectation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Generate Sample Upserts Data" ] }, { "cell_type": "code", "execution_count": 44, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "economyUpsertData: Seq[org.apache.spark.sql.Row] = List([1,120499.73,0.0,car17,205000.0,30,564724.2,2022], [2,160893.77,0.0,car10,179000.0,2,455015.34,2020], [5,93956.32,0.0,car15,135000.0,1,458679.8,2020], [6,41365.43,52809.15,car7,135000.0,19,216839.7,2020], [7,94805.61,0.0,car17,135000.0,23,233216.06,2022])\n", "economyUpsertDf: org.apache.spark.sql.DataFrame = [id: int, salary: float ... 
6 more fields]\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| id| salary|commission| car| hvalue|hyears| loan|year|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "| 1|120499.73| 0.0|car17|205000.0| 30| 564724.2|2022|\n", "| 2|160893.77| 0.0|car10|179000.0| 2|455015.34|2020|\n", "| 5| 93956.32| 0.0|car15|135000.0| 1| 458679.8|2020|\n", "| 6| 41365.43| 52809.15| car7|135000.0| 19| 216839.7|2020|\n", "| 7| 94805.61| 0.0|car17|135000.0| 23|233216.06|2022|\n", "+---+---------+----------+-----+--------+------+---------+----+\n", "\n" ] } ], "source": [ "val economyUpsertData = Seq(\n", " Row(1, 120499.73f, 0.0f, \"car17\", 205000.0f, 30, 564724.18f, 2022), //update\n", " Row(2, 160893.77f, 0.0f, \"car10\", 179000.0f, 2, 455015.33f, 2020), //update\n", " Row(5, 93956.32f, 0.0f, \"car15\", 135000.0f, 1, 458679.82f, 2020), //insert\n", " Row(6, 41365.43f, 52809.15f, \"car7\", 135000.0f, 19, 216839.71f, 2020), //insert\n", " Row(7, 94805.61f, 0.0f, \"car17\", 135000.0f, 23, 233216.07f, 2022) //insert\n", ")\n", "\n", "val economyUpsertDf = spark.createDataFrame(\n", " spark.sparkContext.parallelize(economyUpsertData),\n", " StructType(economyFgSchema)\n", ")\n", "\n", "economyUpsertDf.show(5)" ] }, { "cell_type": "code", "execution_count": 45, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "An error was encountered:\n", "java.io.IOException: Error: 417{\"type\":\"restApiJsonResponse\",\"errorCode\":270149,\"errorMsg\":\"Feature group validation checks did not pass, will not persist validation results.\",\"usrMsg\":\"Results: [ExpectationResult{status=Failure, results=[ValidationResult{status=Success, message='Success', value='2020.0', feature='year', rule=Rule{name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Failure, message='Value: 2022.0 does not meet the constraint requirement! 
HAS_MAX', value='2022.0', feature='year', rule=Rule{name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern='null', acceptedType=null, legalValues=null}}], expectation=Expectation{name='year', features=[year], rules=[Rule{name=HAS_MIN, level=WARNING, min=2018.0, max=null, pattern='null', acceptedType=null, legalValues=null}, Rule{name=HAS_MAX, level=ERROR, min=null, max=2021.0, pattern='null', acceptedType=null, legalValues=null}]}}, ExpectationResult{status=Failure, results=[ValidationResult{status=Success, message='Success', value='41365.4296875', feature='salary', rule=Rule{name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Success, message='Success', value='0.0', feature='commission', rule=Rule{name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Success, message='Success', value='160893.765625', feature='salary', rule=Rule{name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern='null', acceptedType=null, legalValues=null}}, ValidationResult{status=Success, message='Success', value='52809.1484375', feature='commission', rule=Rule{name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern='null', acceptedType=null, legalValues=null}}], expectation=Expectation{name='sales', features=[salary, commission], rules=[Rule{name=HAS_MIN, level=WARNING, min=0.0, max=null, pattern='null', acceptedType=null, legalValues=null}, Rule{name=HAS_MAX, level=ERROR, min=null, max=1000000.0, pattern='null', acceptedType=null, legalValues=null}]}}]\"}\n", " at com.logicalclocks.hsfs.metadata.AuthorizationHandler.handleResponse(AuthorizationHandler.java:45)\n", " at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:223)\n", " at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:191)\n", " at com.logicalclocks.hsfs.metadata.HopsworksInternalClient.handleRequest(HopsworksInternalClient.java:148)\n", " at com.logicalclocks.hsfs.metadata.HopsworksClient.handleRequest(HopsworksClient.java:151)\n", " at com.logicalclocks.hsfs.metadata.FeatureGroupValidationsApi.put(FeatureGroupValidationsApi.java:135)\n", " at com.logicalclocks.hsfs.metadata.FeatureGroupValidationsApi.put(FeatureGroupValidationsApi.java:106)\n", " at com.logicalclocks.hsfs.engine.DataValidationEngine.validate(DataValidationEngine.java:64)\n", " at com.logicalclocks.hsfs.FeatureGroup.validate(FeatureGroup.java:394)\n", " at com.logicalclocks.hsfs.engine.FeatureGroupEngine.saveDataframe(FeatureGroupEngine.java:134)\n", " at com.logicalclocks.hsfs.FeatureGroup.insert(FeatureGroup.java:294)\n", " at com.logicalclocks.hsfs.FeatureGroup.insert(FeatureGroup.java:256)\n", " at com.logicalclocks.hsfs.FeatureGroup.insert(FeatureGroup.java:238)\n", " ... 62 elided\n", "\n" ] } ], "source": [ "// Insert call will fail as invalid data (year feature) is about to be ingested. Error shows the expectation that was not met\n", "economyFg.insert(economyUpsertDf)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Validation type\n", "The validation type determines the validation behavior. 
Available types are:\n", "- STRICT: Data validation is performed and data is ingested into the feature group only if the validation status is \"SUCCESS\"\n", "- WARNING: Data validation is performed and data is ingested into the feature group only if the validation status is \"WARNING\" or \"SUCCESS\"\n", "- ALL: Data validation is performed and data is ingested into the feature group regardless of the validation status\n", "- NONE: Data validation is not performed on the feature group\n", "\n", "The validation type can easily be changed for a feature group." ] }, { "cell_type": "code", "execution_count": 46, "metadata": {}, "outputs": [], "source": [ "// The previous economyUpsertDf contains invalid data but we still want to persist the data, so we set the validation type from STRICT to ALL\n", "economyFg.updateValidationType(ValidationType.ALL)" ] }, { "cell_type": "code", "execution_count": 47, "metadata": {}, "outputs": [], "source": [ "// We try to insert the invalid df again\n", "economyFg.insert(economyUpsertDf)" ] } ], "metadata": { "kernelspec": { "display_name": "Spark", "language": "scala", "name": "sparkkernel" }, "language_info": { "codemirror_mode": "text/x-scala", "mimetype": "text/x-scala", "name": "scala", "pygments_lexer": "scala" } }, "nbformat": 4, "nbformat_minor": 4 }