# HudiOnHops

In this notebook we introduce the Apache Hudi storage abstraction/library (http://hudi.apache.org/) for **incremental** data ingestion into data lakes stored on Hops (e.g., a Hopsworks Feature Store).

TL;DR: Hudi is a storage abstraction/library built on top of Spark. A Hudi dataset stores data in Parquet files and maintains additional metadata to make upserts efficient. A Hudi ingest job is intended to run as a streaming ingest job at a regular interval, such as every 15 minutes, reading deltas from a message bus like Kafka and ingesting them **incrementally** into a data lake.

![Incremental ETL](./../images/incr_load.png "Incremental ETL")

## Background

### Motivation

Hudi is an open-source library for incremental ingestion of large analytical datasets stored on distributed file systems. The library was originally developed at Uber to reduce data latency and is now an Apache project.

The main motivation for Hudi is to reduce the **data latency** of ingesting large datasets into data lakes. Traditional ETL typically takes a snapshot of a production database and does a full load into a data lake (typically stored on a distributed file system). The snapshot approach is simple, since the snapshot is immutable and can be loaded into the data lake as an atomic unit. Its drawback is that it is *slow*: even if only a single record has been updated since the last ingestion, the entire table has to be rewritten. With Big Data (TB- or PB-sized datasets), this introduces significant *data latency* (up to 24 hours in Uber's case) and *wasted resources* (most of the writes are redundant, since most records have not changed since the last ETL run).

This motivates the use-case for **incremental** data ingestion. Incremental data ingestion means that only deltas/changelogs since the last ingestion are inserted. 

Incremental ingestion lies between traditional batch ingestion and streaming. It can provide data latency as low as *minutes* for petabyte-scale datasets, and it comes with its own trade-offs: its data latency is lower than that of traditional batch processing but somewhat higher than that of stream processing. Why not go fully streaming instead? It comes down to your requirements. If you need data latency on the order of seconds (e.g., fraud detection), you have to use stream processing. However, if your business can live with data latency on the order of, say, 5 minutes (e.g., feature engineering pipelines, dashboards, or near-real-time analytics), incremental processing really shines.

With incremental processing, you process data in *mini-batches* and run the Spark job frequently, e.g., every 15 minutes. Using mini-batches rather than record-by-record streaming makes better use of resources and makes it easier to do complex processing and joins, which are better suited to batch-style processing than to stream processing.

![Near Real Time](./../images/near_real_time.jpg "Near Real Time")

If the data is immutable by design, incremental processing can be done without any additional ingestion library: simply use the *append* primitive that HDFS supports, through an HDFS client such as Spark:

```scala
val newRecordsDf: DataFrame = ??? // the new records to ingest (elided)
newRecordsDf.write.format("hive").mode("append").insertInto(tableName)
```

Unfortunately, data is rarely immutable in practice: a bank transaction might be reverted, a customer might change their home address, or a customer review might be updated. This is where Hudi comes into the picture. Hudi stands for `Hadoop Upserts anD Incrementals` and brings two new primitives for data engineering on distributed file systems (in addition to append/read):

- `Upsert`: the ability to do insertions (appends) and updates efficiently. 
- `Incremental reads`: the ability to read datasets incrementally using the notion of "commits".

![Upserts](./../images/upsert_illustration.png "Upserts")

Let's consider the process of updating a single record in a data lake of Parquet files stored on a distributed file system. Without Hudi, this entails scanning the entire dataset to find the record, applying the update, and then rewriting the entire dataset:

```scala
// updatedRecordsDf must hold the *entire* table with the update applied (elided)
val updatedRecordsDf: DataFrame = ???
updatedRecordsDf.write.format("hive").mode("overwrite").insertInto(tableName)
```

This does not scale, and HDFS/Parquet is not designed for this use-case. With Hudi, upsert is a first-class primitive of the ingestion framework, optimized to be fast through index lookups and atomic updates. We will see how to use Hudi for this later in the notebook, but essentially it is as simple as:

```scala
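val upsertDf: DataFrame = ??? // inserts and updates to apply (elided)
(upsertDf.write.format("org.apache.hudi")
  .option("hoodie.datasource.write.operation", "upsert")
  // ... other Hudi options (table name, record key, etc.) elided
  .mode("append")
  .save(basePath)) // basePath: hypothetical name for the dataset path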
```
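Conceptually, an upsert merges a batch of incoming records into the existing ones by record key: keys that already exist are updated and new keys are inserted. The minimal sketch below shows only these semantics on in-memory Scala collections, assuming `id` is the record key. The `Rec` type and `upsert` function are hypothetical, and this is not how Hudi implements upserts internally (Hudi uses its index to locate the files holding existing keys instead of scanning the whole dataset).

```scala
// Hypothetical record type standing in for a table row; `id` is the record key.
final case class Rec(id: Int, value: Float, country: String)

// Keyed merge: incoming records replace existing ones with the same key,
// and records with new keys are added.
def upsert(existing: Seq[Rec], incoming: Seq[Rec]): Seq[Rec] = {
  val byKey = existing.map(r => r.id -> r).toMap ++ incoming.map(r => r.id -> r)
  byKey.values.toSeq.sortBy(_.id)
}

val table = Seq(Rec(1, 0.4151f, "Sweden"), Rec(2, 1.2151f, "Ireland"))
val batch = Seq(Rec(1, 1.151f, "Norway"), Rec(5, 0.7921f, "Northern Ireland"))
upsert(table, batch).foreach(println)
```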

### What is Hudi

Hudi is a Spark library intended to run as a streaming ingest job that ingests data in mini-batches (typically on the order of one to two minutes). A Hudi job generally reads delta updates from a message bus like Kafka and upserts them into a data lake stored on a distributed file system. By maintaining Bloom filter indexes and commit logs, Hudi provides ACID transactions, time travel, and scalable upserts.

![Hudi Dataset](./../images/hudi_dataset.png "Hudi Dataset")

### How Hudi can be used for ML and Feature Pipelines

Hudi is integrated in the Hopsworks Feature Store for doing incremental feature computation and for point-in-time correctness and backfilling of feature data.

![Incremental Feature Engineering](./../images/featurestore_incremental_pull.png "Incremental Feature Engineering")

## Examples

### Imports

In [1]:
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.hudi.NonpartitionedKeyGenerator
import org.apache.hudi.SimpleKeyGenerator
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.NonPartitionedExtractor
import org.apache.log4j.LogManager
import org.apache.log4j.Logger
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.DataFrameWriter
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import io.hops.util.Hops
import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.types._
import java.sql.Date
import java.sql.Timestamp

Starting Spark application




SparkSession available as 'spark'.

### Bulk Insert of Sample Dataset into a Hudi Dataset

Let's first ingest some sample data into a new Hudi dataset. As this is the first ingestion, we don't have to consider whether it contains any updates. Hudi calls this type of ingestion a **bulk insert**, to distinguish it from **upserts** (updates and inserts) and **inserts** (append-only inserts).



#### Generate the sample data

In [2]:
val bulkInsertData = Seq(
    Row(1, Date.valueOf("2019-03-02"), 0.4151f, "Sweden"),
    Row(2, Date.valueOf("2019-05-01"), 1.2151f, "Ireland"),
    Row(3, Date.valueOf("2019-08-06"), 0.2151f, "Belgium"),
    Row(4, Date.valueOf("2019-08-06"), 0.8151f, "Russia")
)
val schema = 
 scala.collection.immutable.List(
  StructField("id", IntegerType, true),
  StructField("date", DateType, true),
  StructField("value", FloatType, true),
  StructField("country", StringType, true) 
)
val bulkInsertDf = spark.createDataFrame(
  spark.sparkContext.parallelize(bulkInsertData),
  StructType(schema)
)
bulkInsertDf.show(5)

bulkInsertData: Seq[org.apache.spark.sql.Row] = List([1,2019-03-02,0.4151,Sweden], [2,2019-05-01,1.2151,Ireland], [3,2019-08-06,0.2151,Belgium], [4,2019-08-06,0.8151,Russia])
schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(date,DateType,true), StructField(value,FloatType,true), StructField(country,StringType,true))
bulkInsertDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]
+---+----------+------+-------+
| id|      date| value|country|
+---+----------+------+-------+
|  1|2019-03-02|0.4151| Sweden|
|  2|2019-05-01|1.2151|Ireland|
|  3|2019-08-06|0.2151|Belgium|
|  4|2019-08-06|0.8151| Russia|
+---+----------+------+-------+



#### Bulk load the sample data into a new Hudi dataset using the Hudi DataSource API (http://hudi.apache.org/writing_data.html)

We will create a new Hudi dataset/table called `hello_hudi_1` (the Hopsworks Feature Store naming convention is `<table_name>_<version>`) with the schema:

```
+---+----------+------+-------+
| id|      date| value|country|
+---+----------+------+-------+
|  1|2019-03-02|0.4151| Sweden|
|  2|2019-05-01|1.2151|Ireland|
|  3|2019-08-06|0.2151|Belgium|
|  4|2019-08-06|0.8151| Russia|
+---+----------+------+-------+
```
The dataset will be partitioned on the `date` column. Moreover, we will register the Hudi dataset with the project's Hive database as an external table.

When creating a Hudi dataset, there are many options you can tune; to override a default value, chain `.option(parameter, value)` on the Spark writer. A list of all available options is at http://hudi.apache.org/configurations.html

The most important options we will provide are the following:

- `format`: the format of the Hudi dataset; this should be set to `org.apache.hudi`. A Hudi dataset consists of Parquet files, a Bloom filter index, and timeline metadata (more about the metadata later in this notebook).
- `hoodie.table.name`: the name of the Hudi dataset. It is also used to register the table with query engines like Hive, Presto, and SparkSQL.
- `hoodie.datasource.write.storage.type`: the storage type, either `COPY_ON_WRITE` or `MERGE_ON_READ` (this relates to Hudi internals that we will discuss later in this notebook).
- `hoodie.datasource.write.operation`: the operation to perform. Since this is the first time we insert into the table, we can use `bulk_insert` and skip the extra processing that upserts require.
- `hoodie.datasource.write.recordkey.field`: the field that uniquely identifies a record in the dataset. Hudi uses it to decide whether an incoming record is an update or an insert.
- `hoodie.datasource.write.partitionpath.field`: the field to partition the dataset on. When Hudi looks up a record, it first locates the partition (if the dataset is partitioned) and then uses an index to find which file inside the partition contains the record.
- `hoodie.datasource.write.precombine.field`: the field used to deduplicate records before the write. When two incoming records have the same key, Hudi keeps the one with the largest value of the precombine field.
- `hoodie.datasource.hive_sync.enable`: whether to sync the Hudi dataset with the Hive metastore as an external table.
- `hoodie.datasource.hive_sync.table`: the Hive table name to sync the Hudi dataset with (external table).
- `hoodie.datasource.hive_sync.database`: the Hive database to sync the Hudi dataset with.
- `hoodie.datasource.hive_sync.jdbcurl`: the JDBC URL of the HiveServer2 instance used for the sync.
- `hoodie.datasource.hive_sync.partition_fields`: the field(s) in the dataset to use as Hive partition columns.
- `hoodie.datasource.hive_sync.partition_extractor_class`: the class that extracts Hive partition values from the partition path; here `org.apache.hudi.hive.MultiPartKeysValueExtractor`.
- `mode`: the Spark save mode; `overwrite` replaces any existing dataset at the target path.
- `save`: the path where the Hudi dataset is stored on HopsFS.
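The precombine behavior described for `hoodie.datasource.write.precombine.field` can be sketched in plain Scala as a group-by on the record key that keeps the row with the largest precombine value. This is a hypothetical illustration on in-memory data (the `Incoming` type, its `ts` field, and `preCombine` are our own names), not Hudi's implementation.

```scala
// Hypothetical incoming row: `id` is the record key, `ts` plays the role of the precombine field.
final case class Incoming(id: Int, ts: Long, country: String)

// Keep a single row per key within the batch: the one with the largest precombine value.
def preCombine(batch: Seq[Incoming]): Seq[Incoming] =
  batch.groupBy(_.id).values.map(_.maxBy(_.ts)).toSeq.sortBy(_.id)

val batch = Seq(
  Incoming(1, 100L, "Sweden"),
  Incoming(1, 200L, "Norway"),  // same key with a larger ts: this row wins
  Incoming(2, 150L, "Ireland")
)
preCombine(batch).foreach(println)
```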

In [3]:
val trustStore = Hops.getTrustStore
val pw = Hops.getKeystorePwd
val keyStore = Hops.getKeyStore
val hiveDb = Hops.getProjectFeaturestore.read
val jdbcUrl = (s"jdbc:hive2://10.0.2.15:9085/$hiveDb;" 
                + s"auth=noSasl;ssl=true;twoWay=true;sslTrustStore=$trustStore;"
                + s"trustStorePassword=$pw;sslKeyStore=$keyStore;keyStorePassword=$pw"
                )
val writer = (bulkInsertDf.write.format("org.apache.hudi")
              .option("hoodie.table.name", "hello_hudi_1")
              .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE")
              .option("hoodie.datasource.write.operation", "bulk_insert")
              .option("hoodie.datasource.write.recordkey.field","id")
              .option("hoodie.datasource.write.partitionpath.field", "date")
              .option("hoodie.datasource.write.precombine.field", "date")
              .option("hoodie.datasource.hive_sync.enable", "true")              
              .option("hoodie.datasource.hive_sync.table", "hello_hudi_1")
              .option("hoodie.datasource.hive_sync.database", hiveDb)
              .option("hoodie.datasource.hive_sync.jdbcurl", jdbcUrl)
              .option("hoodie.datasource.hive_sync.partition_fields", "date")
              .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor")
              .mode("overwrite"))
writer.save(s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1")

trustStore: String = t_certificate
pw: String = EJBVJ7UBVK9O0ZFHQAGPMACAYF01PPWQU470BDIMCQAFYLW6G98ACVYKK0B9NRU3
keyStore: String = k_certificate
hiveDb: String = demo_featurestore_admin000_featurestore
jdbcUrl: String = jdbc:hive2://10.0.2.15:9085/demo_featurestore_admin000_featurestore;auth=noSasl;ssl=true;twoWay=true;sslTrustStore=t_certificate;trustStorePassword=EJBVJ7UBVK9O0ZFHQAGPMACAYF01PPWQU470BDIMCQAFYLW6G98ACVYKK0B9NRU3;sslKeyStore=k_certificate;keyStorePassword=EJBVJ7UBVK9O0ZFHQAGPMACAYF01PPWQU470BDIMCQAFYLW6G98ACVYKK0B9NRU3
writer: org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row] = org.apache.spark.sql.DataFrameWriter@6dacfc3f


#### Inspect the results

If the Hudi bulk insert succeeded, we should now see a dataset at the path `hdfs:///Projects/<projectName>/Resources/hello_hudi_1`. Listing that directory shows three partitions, one per distinct `date` value (recall that we specified `date` as the partition field), since the DataFrame we inserted had the contents:

```
+---+----------+------+-------+
| id|      date| value|country|
+---+----------+------+-------+
|  1|2019-03-02|0.4151| Sweden|
|  2|2019-05-01|1.2151|Ireland|
|  3|2019-08-06|0.2151|Belgium|
|  4|2019-08-06|0.8151| Russia|
+---+----------+------+-------+
```

We can also see a directory called `.hoodie`, which contains Hudi-specific metadata. For example, Hudi maintains timeline metadata for all commits made to a Hudi dataset, which enables incremental reads as well as *time travel* (more on this later). After the bulk insert, `.hoodie` contains a file named after the commit's instant time (`20190904114951.commit` in this run), which holds metadata about the commit, such as the paths of all Parquet files written by the commit in each partition.

In [4]:
(FileSystem.get(sc.hadoopConfiguration)
 .listStatus(new Path(s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1"))
 .map(_.getPath).foreach(println)
)

hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/Resources/hello_hudi_1/.hoodie
hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/Resources/hello_hudi_1/1551484800000
hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/Resources/hello_hudi_1/1556668800000
hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/Resources/hello_hudi_1/1565049600000
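The partition directory names in the listing are the `date` values encoded as epoch milliseconds (the Parquet schema stores `date` as a `long`, which is why it later shows up as `bigint` in Hive). A small `java.time` sketch decodes them; it assumes the values represent midnight UTC, which matches the dates we inserted in this run but depends on the session time zone.

```scala
import java.time.{Instant, LocalDate, ZoneOffset}

// Partition directory names from the listing above (epoch milliseconds).
val partitionDirs = Seq("1551484800000", "1556668800000", "1565049600000")

// Interpret a directory name as epoch milliseconds and take its calendar date in UTC.
def partitionToDate(dir: String): LocalDate =
  Instant.ofEpochMilli(dir.toLong).atZone(ZoneOffset.UTC).toLocalDate

partitionDirs.foreach(d => println(s"$d -> ${partitionToDate(d)}"))
```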


Inside each partition, the data is stored in regular Parquet files, alongside a small `.hoodie_partition_metadata` file:

In [5]:
(FileSystem.get(sc.hadoopConfiguration)
 .listStatus(new Path(s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1/1551484800000/"))
 .map(_.getPath).foreach(println)
)

hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/Resources/hello_hudi_1/1551484800000/.hoodie_partition_metadata
hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/Resources/hello_hudi_1/1551484800000/e4224951-7ca6-4760-8585-8443f5da18a3-0_0-5-7_20190904114951.parquet


If we inspect the metadata stored with the data in the Parquet files, using a tool such as parquet-tools (https://github.com/apache/parquet-mr/tree/master/parquet-tools):

```
/srv/hops/hadoop/bin/hadoop jar /tmp/parquet-tools-1.9.0.jar meta hdfs:///Projects/demo_featurestore_admin000/Resources/hello_hudi_1/1551484800000/1592f902-da1f-44c3-976b-035aebc93278-0_0-37-75_20190830101505.parquet
```
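we can see that Hudi stores a Bloom filter (`org.apache.hudi.bloomfilter`) and the minimum and maximum record keys (`hoodie_min_record_key`, `hoodie_max_record_key`) in the Parquet footer, so that it can quickly determine whether a record key might be contained in a file without reading the data.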

Sample metadata in the parquet file might be:

```
file:                   hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/Resources/hello_hudi/1551484800000/1592f902-da1f-44c3-976b-035aebc93278-0_0-37-75_20190830101505.parquet 
creator:                parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a) 
extra:                  org.apache.hudi.bloomfilter = /////wAAAB4BACd9PgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA...
extra:                  hoodie_min_record_key = 1 
extra:                  parquet.avro.schema = {"type":"record","name":"hello_hudi_record","namespace":"hoodie.hello_hudi","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"id","type":["int","null"]},{"name":"date","type":["long","null"]},{"name":"value","type":["float","null"]},{"name":"country","type":["string","null"]}]} 
extra:                  writer.model.name = avro 
extra:                  hoodie_max_record_key = 1 

file schema:            hoodie.hello_hudi.hello_hudi_record 
--------------------------------------------------------------------------------
_hoodie_commit_time:    OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_commit_seqno:   OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_record_key:     OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_partition_path: OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_file_name:      OPTIONAL BINARY O:UTF8 R:0 D:1
id:                     OPTIONAL INT32 R:0 D:1
date:                   OPTIONAL INT64 R:0 D:1
value:                  OPTIONAL FLOAT R:0 D:1
country:                OPTIONAL BINARY O:UTF8 R:0 D:1

row group 1:            RC:1 TS:1031 OFFSET:4 
--------------------------------------------------------------------------------
_hoodie_commit_time:     BINARY GZIP DO:0 FPO:4 SZ:127/109/0.86 VC:1 ENC:PLAIN,RLE,BIT_PACKED
_hoodie_commit_seqno:    BINARY GZIP DO:0 FPO:131 SZ:152/134/0.88 VC:1 ENC:PLAIN,RLE,BIT_PACKED
_hoodie_record_key:      BINARY GZIP DO:0 FPO:283 SZ:62/44/0.71 VC:1 ENC:PLAIN,RLE,BIT_PACKED
_hoodie_partition_path:  BINARY GZIP DO:0 FPO:345 SZ:120/104/0.87 VC:1 ENC:PLAIN,RLE,BIT_PACKED
_hoodie_file_name:       BINARY GZIP DO:0 FPO:465 SZ:397/386/0.97 VC:1 ENC:PLAIN,RLE,BIT_PACKED
id:                      INT32 GZIP DO:0 FPO:862 SZ:73/55/0.75 VC:1 ENC:PLAIN,RLE,BIT_PACKED
date:                    INT64 GZIP DO:0 FPO:935 SZ:95/75/0.79 VC:1 ENC:PLAIN,RLE,BIT_PACKED
value:                   FLOAT GZIP DO:0 FPO:1030 SZ:75/55/0.73 VC:1 ENC:PLAIN,RLE,BIT_PACKED
country:                 BINARY GZIP DO:0 FPO:1105 SZ:87/69/0.79 VC:1 ENC:PLAIN,RLE,BIT_PACKED
```
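The footer entries above (`org.apache.hudi.bloomfilter`, `hoodie_min_record_key`, `hoodie_max_record_key`) suggest a two-step lookup: first skip files whose key range cannot contain the key, then consult the file's Bloom filter, which can say "definitely not here" but only "maybe here". The toy sketch below illustrates that idea. `ToyBloomFilter`, `FileFooter`, and `candidateFiles` are hypothetical, and the hash functions and sizes are arbitrary choices rather than Hudi's. Keys are compared as strings, since the record keys in the footer are strings.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

// Toy Bloom filter over string keys: numHashes seeded hashes into numBits bits.
final class ToyBloomFilter(numBits: Int, numHashes: Int) {
  private val bits = new BitSet(numBits)
  private def positions(key: String): Seq[Int] =
    (0 until numHashes).map(seed => Math.floorMod(MurmurHash3.stringHash(key, seed), numBits))
  def add(key: String): Unit = positions(key).foreach(i => bits.set(i))
  // false: the key is definitely absent; true: the key may be present (false positives possible)
  def mightContain(key: String): Boolean = positions(key).forall(i => bits.get(i))
}

// Hypothetical per-file summary: min/max record key plus a Bloom filter.
final case class FileFooter(file: String, minKey: String, maxKey: String, bloom: ToyBloomFilter)

def footerFor(file: String, keys: Seq[String]): FileFooter = {
  val bf = new ToyBloomFilter(numBits = 1024, numHashes = 3)
  keys.foreach(bf.add)
  FileFooter(file, keys.min, keys.max, bf)
}

// Files that may contain `key`: prune by key range first, then by Bloom filter.
def candidateFiles(footers: Seq[FileFooter], key: String): Seq[String] =
  footers.filter { f =>
    key.compareTo(f.minKey) >= 0 && key.compareTo(f.maxKey) <= 0 && f.bloom.mightContain(key)
  }.map(_.file)

val footers = Seq(
  footerFor("file-a.parquet", Seq("1", "2", "3")),
  footerFor("file-b.parquet", Seq("4", "5", "6"))
)
println(candidateFiles(footers, "5"))
```

Because a Bloom filter has no false negatives, a file it rejects never needs to be opened; a file it accepts still has to be checked, since the answer may be a false positive.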

#### Hudi Commits

Hudi introduces the notion of *commits*, which gives it certain properties of traditional databases, such as single-table transactions, snapshot isolation, atomic upserts, and savepoints for data recovery. If an ingestion fails, no partial results become visible; instead, the ingestion is rolled back. Commits are made atomic using the atomic rename (`mv`) operation in HDFS.
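The atomic-commit idea can be illustrated on a local file system with `java.nio`: write the commit metadata under a temporary name, then publish it with a single atomic rename, so readers that only look for final names never see a half-written commit. This is only an analogy for what Hudi does on HDFS; the `.inflight`/`.commit` naming here and the `atomicCommit` helper are hypothetical, and `ATOMIC_MOVE` may be unsupported on some file systems.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Write metadata under a temporary name, then atomically rename it to its final name.
def atomicCommit(timelineDir: Path, instant: String, metadata: String): Path = {
  val inflight = timelineDir.resolve(s"$instant.inflight") // hypothetical naming
  val committed = timelineDir.resolve(s"$instant.commit")
  Files.write(inflight, metadata.getBytes(StandardCharsets.UTF_8))
  Files.move(inflight, committed, StandardCopyOption.ATOMIC_MOVE)
}

val dir = Files.createTempDirectory("timeline")
val done = atomicCommit(dir, "20190904114951", """{"partitions": ["1551484800000"]}""")
println(done.getFileName)
```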

Currently, the hudi dataset contains only a single commit as we've just done a single bulk-insert:

In [6]:
HoodieDataSourceHelpers.latestCommit(FileSystem.get(sc.hadoopConfiguration), 
                                     s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1")

res5: String = 20190904114951


In [7]:
HoodieDataSourceHelpers.allCompletedCommitsCompactions(FileSystem.get(sc.hadoopConfiguration), 
                                     s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1").toString

res6: String = org.apache.hudi.common.table.timeline.HoodieDefaultTimeline: [20190904114951__commit__COMPLETED]
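The instant times in the timeline (such as `20190904114951`) look like timestamps in `yyyyMMddHHmmss` form. Assuming that format, they can be parsed with `java.time`, and because the strings are fixed-width, sorting them as strings also sorts them chronologically, which is handy when picking a begin instant for incremental reads. The time zone is not encoded in the string, so this sketch parses it as a `LocalDateTime`.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Assumed instant format: yyyyMMddHHmmss.
val instantFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")

def parseInstant(instant: String): LocalDateTime =
  LocalDateTime.parse(instant, instantFormat)

val first = "20190904114951"
println(parseInstant(first)) // 2019-09-04T11:49:51

// Fixed-width strings: lexicographic order equals chronological order.
val instants = Seq("20190904115157", "20190904114951")
println(instants.sorted)
```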


#### Query the Hudi Dataset

Since we synced the Hudi dataset with Hive (table name: `hello_hudi_1`), we can query it using SparkSQL or any other Hive client.

In [8]:
spark.sql(s"use ${Hops.getProjectFeaturestore.read}")

res7: org.apache.spark.sql.DataFrame = []


In [9]:
spark.sql("show tables").show(5)

+--------------------+--------------------+-----------+
|            database|           tableName|isTemporary|
+--------------------+--------------------+-----------+
|demo_featurestore...|attendances_featu...|      false|
|demo_featurestore...|    games_features_1|      false|
|demo_featurestore...|        hello_hudi_1|      false|
|demo_featurestore...|  players_features_1|      false|
|demo_featurestore...|season_scores_fea...|      false|
+--------------------+--------------------+-----------+
only showing top 5 rows



Inspecting the Hive table shows that Hudi added several metadata columns to track the lineage of the data. For example, filtering on `_hoodie_commit_time` lets us make temporal queries and see which records were written at different points in time.

In [10]:
spark.sql("describe hello_hudi_1").show(20, false)

+-----------------------+---------+-------+
|col_name               |data_type|comment|
+-----------------------+---------+-------+
|_hoodie_commit_time    |string   |null   |
|_hoodie_commit_seqno   |string   |null   |
|_hoodie_record_key     |string   |null   |
|_hoodie_partition_path |string   |null   |
|_hoodie_file_name      |string   |null   |
|id                     |int      |null   |
|value                  |float    |null   |
|country                |string   |null   |
|date                   |bigint   |null   |
|# Partition Information|         |       |
|# col_name             |data_type|comment|
|date                   |bigint   |null   |
+-----------------------+---------+-------+



To query the dataset directly from its path, we specify the format `org.apache.hudi` so that Spark uses the Hudi data source, which filters the Parquet files and returns only the latest version of each record.
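For a copy-on-write dataset, an update produces a new version of the affected Parquet file in the same file group, and a read returns only the newest version of each file group. The sketch below illustrates that selection, assuming base file names follow the `<fileId>_<writeToken>_<instantTime>.parquet` pattern seen in the partition listing earlier (e.g. `e4224951-7ca6-4760-8585-8443f5da18a3-0_0-5-7_20190904114951.parquet`). The listing used here is made up, and `BaseFile`, `parseBaseFile`, and `latestFiles` are hypothetical helpers, not Hudi APIs.

```scala
// Parsed pieces of a base file name: <fileId>_<writeToken>_<instantTime>.parquet
final case class BaseFile(fileId: String, writeToken: String, instant: String, name: String)

def parseBaseFile(name: String): BaseFile =
  name.stripSuffix(".parquet").split("_") match {
    case Array(fileId, writeToken, instant) => BaseFile(fileId, writeToken, instant, name)
    case _ => throw new IllegalArgumentException(s"Unexpected base file name: $name")
  }

// Keep only the newest version (largest instant time) of each file group.
def latestFiles(names: Seq[String]): Seq[String] =
  names.map(parseBaseFile).groupBy(_.fileId).values.map(_.maxBy(_.instant).name).toSeq.sorted

val listing = Seq(
  "f1-0_0-1-1_20190904114951.parquet",
  "f1-0_0-2-2_20190904115157.parquet", // newer version of file group f1-0
  "f2-0_0-1-2_20190904114951.parquet"
)
latestFiles(listing).foreach(println)
```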

In [11]:
val hello_hudi_df = (spark.read.format("org.apache.hudi")
                     .load(s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1/*/*"))
hello_hudi_df.createOrReplaceTempView("hello_hudi_df")
spark.sql("describe hello_hudi_df").show()

hello_hudi_df: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 7 more fields]
+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
| _hoodie_commit_time|   string|   null|
|_hoodie_commit_seqno|   string|   null|
|  _hoodie_record_key|   string|   null|
|_hoodie_partition...|   string|   null|
|   _hoodie_file_name|   string|   null|
|                  id|      int|   null|
|                date|   bigint|   null|
|               value|    float|   null|
|             country|   string|   null|
+--------------------+---------+-------+



In [12]:
spark.sql("select id, value, date, country from hello_hudi_df").show()

+---+------+-------------+-------+
| id| value|         date|country|
+---+------+-------------+-------+
|  2|1.2151|1556668800000|Ireland|
|  4|0.8151|1565049600000| Russia|
|  3|0.2151|1565049600000|Belgium|
|  1|0.4151|1551484800000| Sweden|
+---+------+-------------+-------+



### Upsert into a Hudi Dataset

So far we have not done anything Hudi-specific: we simply bulk-inserted some data into a Hudi dataset, which we could also have done with plain Spark. Now we will look at how to do upserts, and how Hudi makes them efficient.

#### Generate Sample Upserts Data

In [13]:
val upsertData = Seq(
    Row(5, Date.valueOf("2019-03-02"), 0.7921f, "Northern Ireland"), // Insert
    Row(1, Date.valueOf("2019-05-01"), 1.151f, "Norway"), //Update
    Row(3, Date.valueOf("2019-08-06"), 0.999f, "Belgium"), //Update
    Row(6, Date.valueOf("2019-08-06"), 0.0151f, "France") //Insert
)
val upsertDf = spark.createDataFrame(
  spark.sparkContext.parallelize(upsertData),
  StructType(schema)
)
upsertDf.show(5)

upsertData: Seq[org.apache.spark.sql.Row] = List([5,2019-03-02,0.7921,Northern Ireland], [1,2019-05-01,1.151,Norway], [3,2019-08-06,0.999,Belgium], [6,2019-08-06,0.0151,France])
upsertDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]
+---+----------+------+----------------+
| id|      date| value|         country|
+---+----------+------+----------------+
|  5|2019-03-02|0.7921|Northern Ireland|
|  1|2019-05-01| 1.151|          Norway|
|  3|2019-08-06| 0.999|         Belgium|
|  6|2019-08-06|0.0151|          France|
+---+----------+------+----------------+



#### Make the Upsert using Hudi

1. Change `hoodie.datasource.write.operation` from `bulk_insert` to `upsert`.
2. Change the Spark write mode from `overwrite` to `append`.
3. Write `upsertDf` instead of `bulkInsertDf`.

In [14]:
val trustStore = Hops.getTrustStore
val pw = Hops.getKeystorePwd
val keyStore = Hops.getKeyStore
val hiveDb = Hops.getProjectFeaturestore.read
val jdbcUrl = (s"jdbc:hive2://10.0.2.15:9085/$hiveDb;" 
                + s"auth=noSasl;ssl=true;twoWay=true;sslTrustStore=$trustStore;"
                + s"trustStorePassword=$pw;sslKeyStore=$keyStore;keyStorePassword=$pw"
                )
val writer = (upsertDf.write.format("org.apache.hudi")
              .option("hoodie.table.name", "hello_hudi_1")
              .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE")
              .option("hoodie.datasource.write.operation", "upsert")
              .option("hoodie.datasource.write.recordkey.field","id")
              .option("hoodie.datasource.write.partitionpath.field", "date")
              .option("hoodie.datasource.write.precombine.field", "date")
              .option("hoodie.datasource.hive_sync.enable", "true")              
              .option("hoodie.datasource.hive_sync.table", "hello_hudi_1")
              .option("hoodie.datasource.hive_sync.database", hiveDb)
              .option("hoodie.datasource.hive_sync.jdbcurl", jdbcUrl)
              .option("hoodie.datasource.hive_sync.partition_fields", "date")
              .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor")
              .mode("append"))
writer.save(s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1")

trustStore: String = t_certificate
pw: String = EJBVJ7UBVK9O0ZFHQAGPMACAYF01PPWQU470BDIMCQAFYLW6G98ACVYKK0B9NRU3
keyStore: String = k_certificate
hiveDb: String = demo_featurestore_admin000_featurestore
jdbcUrl: String = jdbc:hive2://10.0.2.15:9085/demo_featurestore_admin000_featurestore;auth=noSasl;ssl=true;twoWay=true;sslTrustStore=t_certificate;trustStorePassword=EJBVJ7UBVK9O0ZFHQAGPMACAYF01PPWQU470BDIMCQAFYLW6G98ACVYKK0B9NRU3;sslKeyStore=k_certificate;keyStorePassword=EJBVJ7UBVK9O0ZFHQAGPMACAYF01PPWQU470BDIMCQAFYLW6G98ACVYKK0B9NRU3
writer: org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row] = org.apache.spark.sql.DataFrameWriter@6e378681


#### Inspect the results

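Notice that although Hudi keeps the old versions of records from the previous commit, querying the Hudi-synced Hive table returns only the latest version of each record: `id` 3 now has the updated value `0.999`. Note, however, that `id` 1 appears twice. The upsert changed its `date`, so it lands in a different partition, and with Hudi's default (non-global) index, record keys are only unique within a partition; the record was therefore treated as an insert into the `2019-05-01` partition rather than as an update. A global index (for example `hoodie.index.type=GLOBAL_BLOOM`) enforces key uniqueness across partitions.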

In [15]:
spark.sql("select id, value, date, country from hello_hudi_1").show(20)

+---+------+-------------+----------------+
| id| value|         date|         country|
+---+------+-------------+----------------+
|  3| 0.999|1565049600000|         Belgium|
|  1|0.4151|1551484800000|          Sweden|
|  5|0.7921|1551484800000|Northern Ireland|
|  2|1.2151|1556668800000|         Ireland|
|  1| 1.151|1556668800000|          Norway|
|  4|0.8151|1565049600000|          Russia|
|  6|0.0151|1565049600000|          France|
+---+------+-------------+----------------+
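To make the duplicate `id` 1 in the output above concrete, here is a minimal sketch contrasting a partition-scoped record identity `(partition, key)` with a global one (`key` only). It models only key uniqueness on in-memory collections; the `KeyedRow` type and both functions are hypothetical, and it does not model where Hudi physically writes an updated record when a global index is used.

```scala
// Hypothetical row: record key `id`, partition path `date`.
final case class KeyedRow(id: Int, date: String, country: String)

// Partition-scoped identity: the same key in a new partition counts as a new record.
def upsertPartitionScoped(existing: Seq[KeyedRow], incoming: Seq[KeyedRow]): Seq[KeyedRow] =
  (existing.map(r => (r.date, r.id) -> r).toMap ++ incoming.map(r => (r.date, r.id) -> r)).values.toSeq

// Global identity: a key identifies one record across all partitions.
def upsertGlobal(existing: Seq[KeyedRow], incoming: Seq[KeyedRow]): Seq[KeyedRow] =
  (existing.map(r => r.id -> r).toMap ++ incoming.map(r => r.id -> r)).values.toSeq

val before = Seq(KeyedRow(1, "2019-03-02", "Sweden"))
val update = Seq(KeyedRow(1, "2019-05-01", "Norway"))
println(upsertPartitionScoped(before, update).size) // 2: id 1 now exists in two partitions
println(upsertGlobal(before, update).size)          // 1
```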



#### Inspect the updated commit timeline

In [16]:
HoodieDataSourceHelpers.latestCommit(FileSystem.get(sc.hadoopConfiguration), 
                                     s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1")

res16: String = 20190904115157


In [17]:
HoodieDataSourceHelpers.allCompletedCommitsCompactions(FileSystem.get(sc.hadoopConfiguration), 
                                     s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1").toString

res17: String = org.apache.hudi.common.table.timeline.HoodieDefaultTimeline: [20190904114951__commit__COMPLETED],[20190904115157__commit__COMPLETED]


In [18]:
val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(FileSystem.get(sc.hadoopConfiguration), 
                                     s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1")
val firstTimestamp = timeline.firstInstant.get.getTimestamp
val secondTimestamp = timeline.nthInstant(1).get.getTimestamp

timeline: org.apache.hudi.common.table.HoodieTimeline = org.apache.hudi.common.table.timeline.HoodieDefaultTimeline: [20190904114951__commit__COMPLETED],[20190904115157__commit__COMPLETED]
firstTimestamp: String = 20190904114951
secondTimestamp: String = 20190904115157


### Time Travel

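Using the commit metadata columns, we can see which records were written by each commit. Note that filtering on `_hoodie_commit_time` returns the records whose *current* version was written by that commit; it does not reconstruct the whole table as of that commit (for example, `id` 3 is missing from the first query because the second commit overwrote it). Hudi can also pull changes incrementally.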

In [19]:
spark.sql(s"select id, value, date, country from hello_hudi_1 where _hoodie_commit_time='$firstTimestamp'").show()

+---+------+-------------+-------+
| id| value|         date|country|
+---+------+-------------+-------+
|  1|0.4151|1551484800000| Sweden|
|  2|1.2151|1556668800000|Ireland|
|  4|0.8151|1565049600000| Russia|
+---+------+-------------+-------+



In [20]:
spark.sql(s"select id, value, date, country from hello_hudi_1 where _hoodie_commit_time='$secondTimestamp'").show()

+---+------+-------------+----------------+
| id| value|         date|         country|
+---+------+-------------+----------------+
|  3| 0.999|1565049600000|         Belgium|
|  5|0.7921|1551484800000|Northern Ireland|
|  1| 1.151|1556668800000|          Norway|
|  6|0.0151|1565049600000|          France|
+---+------+-------------+----------------+
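To make the two filters above concrete, here is a plain-Scala model of the two commits to `hello_hudi_1`. It is not the Hudi API and not how Hudi is implemented. The model assumes that a record is identified by its date partition plus `id`, which matches the outputs above: id 1 appears once as Sweden and once as Norway. It also assumes that every write stamps the record with the commit's instant. `Rec`, `Table`, `commit` and `writtenBy` are hypothetical names.

```scala
// Plain-Scala model (not the Hudi API): each stored record keeps the instant
// of the commit that last wrote it, like the _hoodie_commit_time column.
case class Rec(id: Int, date: String, value: Double, country: String)

type Table = Map[(String, Int), (Rec, String)]

// Hypothetical helper: upsert a batch keyed by (date partition, id) and stamp
// every written record with the commit's instant time.
def commit(table: Table, batch: Seq[Rec], instant: String): Table =
  table ++ batch.map(r => (r.date, r.id) -> (r, instant))

val firstInstant  = "20190904114951"
val secondInstant = "20190904115157"

val afterFirst = commit(Map.empty, Seq(
  Rec(1, "2019-03-02", 0.4151, "Sweden"),
  Rec(2, "2019-05-01", 1.2151, "Ireland"),
  Rec(3, "2019-08-06", 0.2151, "Belgium"),
  Rec(4, "2019-08-06", 0.8151, "Russia")), firstInstant)

val afterSecond = commit(afterFirst, Seq(
  Rec(5, "2019-03-02", 0.7921, "Northern Ireland"),
  Rec(1, "2019-05-01", 1.151, "Norway"),
  Rec(3, "2019-08-06", 0.999, "Belgium"),
  Rec(6, "2019-08-06", 0.0151, "France")), secondInstant)

// Equivalent of "where _hoodie_commit_time = <instant>" on a table state
def writtenBy(table: Table, instant: String): Set[String] =
  table.values.collect { case (r, t) if t == instant => r.country }.toSet
```

Belgium's first-commit version (value 0.2151) is overwritten by the second commit. The filter on the first instant therefore no longer returns it, even though it was part of that commit.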



Hudi also supports incremental reads. To use them, we change the view type from the default, "read optimized", to "incremental" with the configuration parameter `hoodie.datasource.view.type`. We also specify which commits to pull changes from. The property `hoodie.datasource.read.begin.instanttime` sets the start, and the optional property `hoodie.datasource.read.end.instanttime` sets the end. The begin instant is exclusive, so only commits made after it are returned. The end instant is inclusive.
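The begin/end semantics can be sketched without Spark. The hypothetical helper below only selects commit instants. It assumes an exclusive begin and an inclusive, optional end, which is consistent with the four reads that follow. Instants are fixed-width `yyyyMMddHHmmss` strings, so plain string comparison orders them chronologically.

```scala
// Commit instants from the timeline of hello_hudi_1 (see the cells above)
val first   = "20190904114951"
val second  = "20190904115157"
val commits = Seq(first, second)

// Hypothetical helper: the commits an incremental read would cover, assuming
// begin is exclusive and end (if given) is inclusive.
def pulledCommits(begin: String, end: Option[String] = None): Seq[String] =
  commits.filter { c =>
    c.compareTo(begin) > 0 && end.forall(e => c.compareTo(e) <= 0)
  }

// Changes after the first commit
val afterFirst = pulledCommits(first)
// Begin at an instant in 2017, before both commits
val both = pulledCommits("20170830115554")
// Only the first commit
val onlyFirst = pulledCommits("20170830115554", Some(first))
// Only the second commit
val onlySecond = pulledCommits(first, Some(second))
```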


In [21]:
// Pull changes that happened *after* the first commit
val incrementalDf = (spark.read.format("org.apache.hudi")
             .option("hoodie.datasource.view.type", "incremental")
             .option("hoodie.datasource.read.begin.instanttime", firstTimestamp) 
             .load(s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1"))
incrementalDf.createOrReplaceTempView("incremental_df")
spark.sql("select id, value, date, country from incremental_df").show(20)

incrementalDf: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 7 more fields]
+---+------+-------------+----------------+
| id| value|         date|         country|
+---+------+-------------+----------------+
|  3| 0.999|1565049600000|         Belgium|
|  5|0.7921|1551484800000|Northern Ireland|
|  1| 1.151|1556668800000|          Norway|
|  6|0.0151|1565049600000|          France|
+---+------+-------------+----------------+



In [22]:
// Pull all changes after an instant in 2017, i.e. both commits:
val incrementalDf = (spark.read.format("org.apache.hudi")
             .option("hoodie.datasource.view.type", "incremental")
             .option("hoodie.datasource.read.begin.instanttime", "20170830115554") 
             .load(s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1"))
incrementalDf.createOrReplaceTempView("incremental_df")
spark.sql("select id, value, date, country from incremental_df").show(20)

incrementalDf: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 7 more fields]
+---+------+-------------+----------------+
| id| value|         date|         country|
+---+------+-------------+----------------+
|  3| 0.999|1565049600000|         Belgium|
|  1|0.4151|1551484800000|          Sweden|
|  5|0.7921|1551484800000|Northern Ireland|
|  2|1.2151|1556668800000|         Ireland|
|  1| 1.151|1556668800000|          Norway|
|  4|0.8151|1565049600000|          Russia|
|  6|0.0151|1565049600000|          France|
+---+------+-------------+----------------+



In [23]:
// Pull only the first commit (the end instant is inclusive)
val incrementalDf = (spark.read.format("org.apache.hudi")
             .option("hoodie.datasource.view.type", "incremental")
             .option("hoodie.datasource.read.begin.instanttime", "20170830115554")
             .option("hoodie.datasource.read.end.instanttime", firstTimestamp)
             .load(s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1"))
incrementalDf.createOrReplaceTempView("incremental_df")
spark.sql("select id, value, date, country from incremental_df").show(20)

incrementalDf: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 7 more fields]
+---+------+-------------+-------+
| id| value|         date|country|
+---+------+-------------+-------+
|  2|1.2151|1556668800000|Ireland|
|  4|0.8151|1565049600000| Russia|
|  3|0.2151|1565049600000|Belgium|
|  1|0.4151|1551484800000| Sweden|
+---+------+-------------+-------+



In [24]:
// Pull only the second commit (begin is exclusive, end is inclusive)
val incrementalDf = (spark.read.format("org.apache.hudi")
             .option("hoodie.datasource.view.type", "incremental")
             .option("hoodie.datasource.read.begin.instanttime", firstTimestamp)
             .option("hoodie.datasource.read.end.instanttime", secondTimestamp)
             .load(s"hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1"))
incrementalDf.createOrReplaceTempView("incremental_df")
spark.sql("select id, value, date, country from incremental_df").show(20)

incrementalDf: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 7 more fields]
+---+------+-------------+----------------+
| id| value|         date|         country|
+---+------+-------------+----------------+
|  3| 0.999|1565049600000|         Belgium|
|  5|0.7921|1551484800000|Northern Ireland|
|  1| 1.151|1556668800000|          Norway|
|  6|0.0151|1565049600000|          France|
+---+------+-------------+----------------+



### Integration with Hopsworks Feature Store

So far we have created a Hudi dataset at the path `hdfs:///Projects/${Hops.getProjectName}/Resources/hello_hudi_1` and registered it with the Hive metastore as an external table named `hello_hudi_1` in the feature store's Hive database (`Hops.getProjectFeaturestore.read`).

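However, the dataset has not yet been registered with the feature store. The cells below list the existing feature groups and the tables in the feature store's Hive database. `hello_hudi_1` appears as a Hive table but not as a feature group. To register the table with the feature store, we can use the `syncHiveTableWithFeaturestore` method of the Featurestore Scala SDK: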

In [25]:
import scala.collection.JavaConverters._
Hops.getFeaturegroups.read.asScala.foreach(println)

import scala.collection.JavaConverters._
games_features_1
games_features_on_demand_tour_1
season_scores_features_1
attendances_features_1
players_features_1
teams_features_1


In [26]:
spark.sql("show tables").show(5, false)

+---------------------------------------+------------------------+-----------+
|database                               |tableName               |isTemporary|
+---------------------------------------+------------------------+-----------+
|demo_featurestore_admin000_featurestore|attendances_features_1  |false      |
|demo_featurestore_admin000_featurestore|games_features_1        |false      |
|demo_featurestore_admin000_featurestore|hello_hudi_1            |false      |
|demo_featurestore_admin000_featurestore|players_features_1      |false      |
|demo_featurestore_admin000_featurestore|season_scores_features_1|false      |
+---------------------------------------+------------------------+-----------+
only showing top 5 rows



In [27]:
Hops.syncHiveTableWithFeaturestore("hello_hudi").setVersion(1).setDescription("test").write()

We can verify that the Hudi dataset is now registered with the feature store by opening the Feature Store UI.

We can also list the names of all available feature groups using the method `getFeaturegroups`:

In [28]:
import scala.collection.JavaConverters._
Hops.getFeaturegroups.read.asScala.foreach(println)

import scala.collection.JavaConverters._
games_features_1
games_features_on_demand_tour_1
season_scores_features_1
attendances_features_1
players_features_1
teams_features_1
hello_hudi_1
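In this notebook, every listed feature group name ends in `_<version>`. For example, `hello_hudi` registered with version 1 shows up as `hello_hudi_1`. The sketch below splits such names back into name and version. It assumes the `<name>_<version>` convention holds, which we infer from this notebook's output and which is not confirmed as an SDK contract. `splitFeaturegroupName` is a hypothetical helper, not part of the Featurestore SDK.

```scala
// Hypothetical helper: split a "<name>_<version>" table name at the last
// underscore. Returns None when there is no numeric version suffix.
def splitFeaturegroupName(table: String): Option[(String, Int)] = {
  val idx = table.lastIndexOf('_')
  if (idx <= 0 || idx == table.length - 1) None
  else {
    val suffix = table.substring(idx + 1)
    if (suffix.forall(_.isDigit)) Some((table.substring(0, idx), suffix.toInt))
    else None
  }
}

// A few of the names listed above
val listed = Seq("games_features_1", "games_features_on_demand_tour_1", "hello_hudi_1")
val parsed = listed.flatMap(t => splitFeaturegroupName(t).toList)
```

The inverse direction is simply `s"${name}_$version"`.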


Once the Hudi dataset has been registered with the feature store, it can be read using `getFeaturegroup`:

In [29]:
Hops.getFeaturegroup("hello_hudi").setVersion(1).read().show(5)

+-------------------+--------------------+------------------+----------------------+--------------------+---+------+----------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id| value|         country|         date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+------+----------------+-------------+
|     20190904115157|  20190904115157_0_5|                 3|         1565049600000|9d24f72d-e7da-497...|  3| 0.999|         Belgium|1565049600000|
|     20190904114951|  20190904114951_0_1|                 1|         1551484800000|e4224951-7ca6-476...|  1|0.4151|          Sweden|1551484800000|
|     20190904115157|  20190904115157_1_6|                 5|         1551484800000|e4224951-7ca6-476...|  5|0.7921|Northern Ireland|1551484800000|
|     20190904114951|  20190904114951_1_2|                 2|         1556668800000|7b71c5fc-73e3-481...|  2|1.2

We can also query the Hudi dataset directly with SQL through the feature store SDK:

In [30]:
Hops.queryFeaturestore("select id, value, date, country from hello_hudi_1").read.show(5)

+---+------+-------------+----------------+
| id| value|         date|         country|
+---+------+-------------+----------------+
|  3| 0.999|1565049600000|         Belgium|
|  1|0.4151|1551484800000|          Sweden|
|  5|0.7921|1551484800000|Northern Ireland|
|  2|1.2151|1556668800000|         Ireland|
|  1| 1.151|1556668800000|          Norway|
+---+------+-------------+----------------+
only showing top 5 rows



It is also possible to create Hudi feature groups directly with the feature store API, using `Hops.createFeaturegroup().setHudi(true)`. This creates the Hudi dataset and registers it with both Hive and the feature store. Sensible Hudi defaults are set, but you can override them by passing your own `Map<String,String>` of Hudi arguments: `Hops.createFeaturegroup().setHudi(true).setHudiArgs(map)`.
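Overriding defaults with a user-supplied map amounts to a merge in which the user's entries win. The sketch below shows this with a plain Scala `Map` and then copies the result into a `java.util.Map`, the type given above for the argument. The default entries are illustrative placeholders, not the defaults the SDK actually sets, and this is not a description of how the SDK merges arguments internally.

```scala
import java.util.{HashMap => JHashMap}

// Illustrative defaults (placeholders, NOT the SDK's real defaults)
val illustrativeDefaults = Map(
  "hoodie.datasource.write.operation"     -> "upsert",
  "hoodie.datasource.write.payload.class" -> "org.apache.hudi.OverwriteWithLatestAvroPayload"
)

// User-supplied overrides
val userArgs = Map("hoodie.datasource.write.operation" -> "insert")

// Right-biased merge: on duplicate keys, the entry from userArgs wins
val merged = illustrativeDefaults ++ userArgs

// Copy into a java.util.Map<String,String>; in the notebook, JavaConverters'
// .asJava would instead give a java.util.Map view over the Scala map.
val javaArgs = new JHashMap[String, String]()
merged.foreach { case (k, v) => javaArgs.put(k, v) }
```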

In [2]:
// Assumes Row, java.sql.Date and org.apache.spark.sql.types._ were imported in an earlier cell.
// JavaConversions provides implicit Scala-to-Java collection conversions, so Scala Lists
// can be passed to the SDK's Java setters (e.g. setPartitionBy, setPrimaryKey).
import scala.collection.JavaConversions._
import collection.JavaConverters._
val sampleData = Seq(
    Row(1, Date.valueOf("2019-03-02"), 0.4151f, "Sweden"),
    Row(2, Date.valueOf("2019-05-01"), 1.2151f, "Ireland"),
    Row(3, Date.valueOf("2019-08-06"), 0.2151f, "Belgium"),
    Row(4, Date.valueOf("2019-08-06"), 0.8151f, "Russia")
)
// Schema matching the column order of the rows above
val schema = 
 scala.collection.immutable.List(
  StructField("id", IntegerType, true),
  StructField("date", DateType, true),
  StructField("value", FloatType, true),
  StructField("country", StringType, true) 
)
val sampleDf = spark.createDataFrame(
  spark.sparkContext.parallelize(sampleData),
  StructType(schema)
)
// Partition the Hudi dataset by date and use id as the record key
val partitionCols = List("date")
(Hops.createFeaturegroup("hudi_featuregroup_test")
                         .setHudi(true)
                         .setPartitionBy(partitionCols)
                         .setDataframe(sampleDf)
                         .setPrimaryKey(List("id")).write())

import scala.collection.JavaConversions._
import collection.JavaConverters._
sampleData: Seq[org.apache.spark.sql.Row] = List([1,2019-03-02,0.4151,Sweden], [2,2019-05-01,1.2151,Ireland], [3,2019-08-06,0.2151,Belgium], [4,2019-08-06,0.8151,Russia])
schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(date,DateType,true), StructField(value,FloatType,true), StructField(country,StringType,true))
sampleDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]
partitionCols: List[String] = List(date)


In [5]:
Hops.getFeaturegroup("hudi_featuregroup_test").read.show(20)

+-------------------+--------------------+------------------+----------------------+--------------------+---+------+-------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id| value|country|         date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+------+-------+-------------+
|     20190904130344|  20190904130344_1_6|                 2|         1556668800000|882d1680-3553-4be...|  2|1.2151|Ireland|1556668800000|
|     20190904130344|  20190904130344_2_7|                 3|         1565049600000|98dc24de-d156-4eb...|  3|0.2151|Belgium|1565049600000|
|     20190904130344|  20190904130344_0_5|                 1|         1551484800000|38d72e06-07e3-444...|  1|0.4151| Sweden|1551484800000|
|     20190904130344|  20190904130344_3_8|                 4|         1565049600000|935c8005-58d6-463...|  4|0.8151| Russia|1565049600000|
+-------------------+------

You can also override Hudi-specific arguments with `setHudiArgs`, and choose the HDFS path where the Hudi dataset is stored with `setHudiBasePath`:

In [4]:
val hudiArgs = Map[String, String](
    "hoodie.datasource.write.payload.class"-> "org.apache.hudi.OverwriteWithLatestAvroPayload"
)

hudiArgs: scala.collection.immutable.Map[String,String] = Map(hoodie.datasource.write.payload.class -> org.apache.hudi.OverwriteWithLatestAvroPayload)


In [5]:
(Hops.createFeaturegroup("hudi_featuregroup_test_second")
                         .setHudi(true)
                         .setPartitionBy(partitionCols)
                         .setDataframe(sampleDf)
                         .setHudiBasePath(s"hdfs:///Projects/${Hops.getProjectName}/Resources/hudi_featuregroup_test_second")
                         .setHudiArgs(hudiArgs)
                         .setPrimaryKey(List("id")).write())

In [6]:
Hops.getFeaturegroup("hudi_featuregroup_test_second").read.show(20)

+-------------------+--------------------+------------------+----------------------+--------------------+---+------+-------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id| value|country|         date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+------+-------+-------------+
|     20191029123951| 20191029123951_2_11|                 3|         1565049600000|54cbcf61-8a4b-413...|  3|0.2151|Belgium|1565049600000|
|     20191029123951| 20191029123951_1_10|                 2|         1556668800000|12da84f8-4b8e-4cb...|  2|1.2151|Ireland|1556668800000|
|     20191029123951| 20191029123951_3_12|                 4|         1565049600000|9b9ed741-eafb-4ca...|  4|0.8151| Russia|1565049600000|
|     20191029123951|  20191029123951_0_9|                 1|         1551484800000|5101dd1d-6735-47e...|  1|0.4151| Sweden|1551484800000|
+-------------------+------

We can also use the `insertIntoFeaturegroup` wrapper to upsert data into Hudi datasets:
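Whether an upserted row updates an existing record depends on both its record key and its partition path. The plain-Scala sketch below classifies the rows of the upsert that follows. It assumes a partition-scoped (non-global) index, so a row counts as an update only if the same `id` already exists in the same date partition. This assumption is consistent with id 1 appearing as both Sweden and Norway in the `hello_hudi_1` outputs. The sketch also assumes the partition path is the date as epoch milliseconds at midnight UTC, which matches the `_hoodie_partition_path` values shown in this notebook. `partitionPath` and `classified` are hypothetical names.

```scala
import java.time.{LocalDate, ZoneOffset}

// Assumed partition path value: the date as epoch milliseconds at midnight UTC
def partitionPath(date: String): Long =
  LocalDate.parse(date).atStartOfDay(ZoneOffset.UTC).toInstant.toEpochMilli

// (partition path, record key) pairs already in hudi_featuregroup_test
val existing: Set[(Long, Int)] = Set(
  (partitionPath("2019-03-02"), 1), (partitionPath("2019-05-01"), 2),
  (partitionPath("2019-08-06"), 3), (partitionPath("2019-08-06"), 4))

// (id, date) of each row in the upsert below
val upserts = Seq((5, "2019-03-02"), (1, "2019-05-01"), (3, "2019-08-06"), (6, "2019-08-06"))

// A row is an update only if its key already exists in the same partition
val classified: Map[Int, String] = upserts.map { case (id, date) =>
  id -> (if (existing((partitionPath(date), id))) "update" else "insert")
}.toMap
```

Under these assumptions, only Belgium (id 3) is an update. Norway (id 1) lands in a different partition from Sweden and is stored as a new record, which is why Sweden still appears in the query after the upsert.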

In [3]:
import scala.collection.JavaConversions._
import collection.JavaConverters._
val upsertData = Seq(
    Row(5, Date.valueOf("2019-03-02"), 0.7921f, "Northern Ireland"), // Insert
    Row(1, Date.valueOf("2019-05-01"), 1.151f, "Norway"), // Same id as Sweden but a different date partition: stored as a new record
    Row(3, Date.valueOf("2019-08-06"), 0.999f, "Belgium"), // Update (same id and partition)
    Row(6, Date.valueOf("2019-08-06"), 0.0151f, "France") // Insert
)
val schema = 
 scala.collection.immutable.List(
  StructField("id", IntegerType, true),
  StructField("date", DateType, true),
  StructField("value", FloatType, true),
  StructField("country", StringType, true) 
)
val upsertDf = spark.createDataFrame(
  spark.sparkContext.parallelize(upsertData),
  StructType(schema)
)
val partitionCols = List("date")
(Hops.insertIntoFeaturegroup("hudi_featuregroup_test")
                         .setPartitionBy(partitionCols)
                         .setDataframe(upsertDf)
                         .setMode("append")
                         .setPrimaryKey(List("id")).write())

import scala.collection.JavaConversions._
import collection.JavaConverters._
upsertData: Seq[org.apache.spark.sql.Row] = List([5,2019-03-02,0.7921,Northern Ireland], [1,2019-05-01,1.151,Norway], [3,2019-08-06,0.999,Belgium], [6,2019-08-06,0.0151,France])
schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(date,DateType,true), StructField(value,FloatType,true), StructField(country,StringType,true))
upsertDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]
partitionCols: List[String] = List(date)


In [4]:
Hops.queryFeaturestore("select id, value, date, country from hudi_featuregroup_test_1").read.show(5)

+---+------+-------------+----------------+
| id| value|         date|         country|
+---+------+-------------+----------------+
|  3| 0.999|1565049600000|         Belgium|
|  6|0.0151|1565049600000|          France|
|  4|0.8151|1565049600000|          Russia|
|  1|0.4151|1551484800000|          Sweden|
|  5|0.7921|1551484800000|Northern Ireland|
+---+------+-------------+----------------+
only showing top 5 rows



In [5]:
val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(FileSystem.get(sc.hadoopConfiguration), 
                                     s"hdfs:///Projects/${Hops.getProjectName}/Resources/hudi_featuregroup_test_1")

timeline: org.apache.hudi.common.table.HoodieTimeline = org.apache.hudi.common.table.timeline.HoodieDefaultTimeline: [20190904140311__commit__COMPLETED],[20190904140651__commit__COMPLETED]


In [8]:
val firstTimestamp = timeline.firstInstant.get.getTimestamp

firstTimestamp: String = 20190904140311


In [7]:
val secondTimestamp = timeline.nthInstant(1).get.getTimestamp

secondTimestamp: String = 20190904140651
