Hopsworks provides Kafka-as-a-Service for streaming applications. By default, Hopsworks provides the Hops Java/Scala library and the Hops Python library, which make programming easier by abstracting away configuration boilerplate such as Kafka endpoints and topics. Using these libraries, you can have a simple Kafka application up and running on Hopsworks in minutes.
The following sections demonstrate different ways of writing Kafka applications on Hopsworks.
Our service is tightly coupled with our project-based model, so only members of a project can use a specific Kafka topic, unless specified otherwise. The Kafka service on Hops is multi-tenant, allowing users to share topics between projects as desired.
If users prefer to be guided through the rest of the guide in Hopsworks, they can follow the Kafka Tour by selecting it from the available tours in the landing page.
Download and compile the example application
You can download and compile a sample Spark streaming application by following these steps:
Create a Kafka topic and schema
The next step is to create a Kafka topic that the sample Spark streaming application will produce to and consume from. To create a topic, we use the Kafka service available in Hopsworks. The topic uses the following Avro schema:
{
  "fields": [
    { "name": "timestamp", "type": "string" },
    { "name": "priority", "type": "string" },
    { "name": "logger", "type": "string" },
    { "name": "message", "type": "string" }
  ],
  "name": "myrecord",
  "type": "record"
}
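Once the topic and schema exist, a producer can publish Avro records that conform to the schema. The following is a minimal sketch using the confluent-kafka AvroProducer; the broker address, registry URL, schema file name and record values are assumptions for illustration:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Load the Avro schema shown above (the file name is an assumption).
value_schema = avro.loads(open("myrecord.avsc").read())

# Placeholder addresses: on Hopsworks, broker settings come from
# hops.kafka.get_kafka_default_config() and the registry URL is the
# project endpoint described later in this guide.
conf = {
    "bootstrap.servers": "<broker-host>:9091",
    "schema.registry.url": "http://<hopsworks-host>:8181/project/<projectId>/kafka",
}
avro_producer = AvroProducer(conf, default_value_schema=value_schema)

# Produce a record that conforms to the "myrecord" schema.
avro_producer.produce(topic="test", value={
    "timestamp": "2019-05-06 10:00:00",
    "priority": "INFO",
    "logger": "com.example.MyApp",
    "message": "hello Hopsworks",
})
avro_producer.flush()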
Advanced Kafka Topic Creation
By default, a Kafka topic is accessible only to members of the project in which it was created. To share the topic with another project, click on the Kafka service in the menu on the left. This brings you to the Kafka main page, as illustrated below. Then press the Share topic button on the appropriate topic and select the name of the project you would like to share it with.
You can also apply fine-grained access control to Kafka topics by adding ACLs through Hopsworks. Once you have created a Kafka topic, click on the Kafka service and then on the Add new ACL button.
When creating a new ACL, you are given the following options:
When you are done with the ACL parameters, click on the Create button.
As an example, assume that we have already created a Kafka topic for our project and shared it with another project named another_sample_project. We would like members of the other project NOT to be able to produce to this topic. The ACL would then look like the following.
If you would like to see more details about your Kafka topic, click on the Advanced view button. In the picture below we can see three ACLs: the first is the default ACL, applied when a topic is created; the second was created when we shared the topic with another project, allowing full access; and the third is the custom ACL we created above.
Upload the compiled sample application and use it to create Spark jobs on Hopsworks
Run the created producer/consumer jobs
Run both jobs. While the consumer is running, you can check its execution log. Use the Dataset browser to navigate to the directory /Resources/Data-APPLICATION_ID/. Right-click on the file part-00000 and select Preview to see its content.
A sample output would look like the following:
You can find several example notebooks using Kafka at hops_examples.
In this section we demonstrate how you can use a Jupyter notebook and Python to produce and consume Kafka messages. It is assumed that you have already created a Kafka topic named “test” to produce to and consume from, and that you have enabled Anaconda in your project (which comes with some pre-installed packages, including the Python package confluent-kafka).
Start Jupyter
Start Jupyter by going to the Jupyter tab, selecting Spark (static or dynamic), filling in the system properties and pressing “Start”.
Create the new notebook
Create a new notebook and paste in the following code:
from hops import kafka
from hops import tls
from confluent_kafka import Producer, Consumer

TOPIC_NAME = "test"

# The hops library fills in the configuration boilerplate for you:
# broker endpoints, security protocol and certificate locations.
config = kafka.get_kafka_default_config()

producer = Producer(config)
consumer = Consumer(config)
consumer.subscribe([TOPIC_NAME])

def delivery_callback(err, msg):
    """Report the delivery result of each produced message."""
    if err is not None:
        print("Message failed delivery: {}".format(err))
    else:
        print("Message delivered to topic: {} partition: {}".format(msg.topic(), msg.partition()))

# Wait a little while before executing the rest of the code (put it in a
# different Jupyter cell) so that the consumer gets a chance to subscribe
# (it is an asynchronous call).
for i in range(0, 10):
    producer.produce(TOPIC_NAME, "message {}".format(i), "key", callback=delivery_callback)

# Trigger the sending of all messages to the brokers, with a 10 second timeout.
producer.flush(10)

for i in range(0, 10):
    msg = consumer.poll(timeout=5.0)
    if msg is None:
        print("Topic empty, timeout when trying to consume message")
    elif msg.error():
        print("Consumer error: {}".format(msg.error()))
    else:
        print("Consumed Message: {} from topic: {}".format(msg.value(), msg.topic()))
Connecting your Java/Scala producers and consumers from an external cluster to the one shipped with Hopsworks requires exporting the project certificates, i.e. the keystore and truststore. These are used by the clients to securely connect and authenticate against the Hopsworks Kafka cluster. Exporting the certificates is done from the project’s Settings page, as shown in the gif below. This will download keyStore.jks and trustStore.jks, and display the certificates’ password.
The Kafka clients’ configuration needs to include the following properties:
security.protocol=SSL
ssl.truststore.location=trustStore.jks
ssl.truststore.password=<password>
ssl.keystore.location=keyStore.jks
ssl.keystore.password=<password>
ssl.key.password=<password>
ssl.endpoint.identification.algorithm=""
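Note that librdkafka-based clients, such as the Python confluent-kafka package, cannot read JKS stores directly. Below is a minimal sketch, assuming keyStore.jks and trustStore.jks have first been converted to PEM files (all file names and the broker host are placeholders):

from confluent_kafka import Producer

config = {
    "bootstrap.servers": "<broker-host>:9092",  # EXTERNAL listener, see below
    "security.protocol": "SSL",
    "ssl.ca.location": "ca_chain.pem",          # truststore, converted to PEM
    "ssl.certificate.location": "client.pem",   # client certificate from the keystore
    "ssl.key.location": "client_key.pem",       # client private key from the keystore
    "ssl.key.password": "<password>",
}

producer = Producer(config)
producer.produce("test", "hello from outside the cluster")
producer.flush(10)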
For further information, please refer to the Apache Kafka security docs http://kafka.apache.org/23/documentation.html#security_ssl
If the clients can connect directly to the Kafka cluster, then the Kafka INTERNAL advertised listener (default port 9091) needs to be set in the clients’ configuration. Otherwise the EXTERNAL advertised listener needs to be used (port 9092).
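For example, in the clients’ configuration (the broker host is a placeholder):

# client running inside the cluster network (INTERNAL listener)
bootstrap.servers=<broker-host>:9091
# client connecting from outside (EXTERNAL listener)
bootstrap.servers=<broker-host>:9092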
For further information, please refer to the Apache Kafka broker config docs http://kafka.apache.org/23/documentation.html#brokerconfigs
The management of Kafka Avro schemas in Hopsworks is compatible with Confluent Schema Registry v5.3.1. Clients can therefore replace the base URL used for the Schema Registry and manage their schemas without any disruption.
Change the base URL used by Confluent Schema Registry (localhost:8081) to localhost:8181/project/{projectId}/kafka.
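The standard Confluent REST endpoints then work against the Hopsworks-hosted registry. A short sketch (the host, project id, and the test-value subject name, which follows Confluent’s default <topic>-value naming, are assumptions):

import requests

# Base URL of the project's Schema Registry endpoint (placeholders).
base_url = "http://<hopsworks-host>:8181/project/<projectId>/kafka"

# List all registered subjects.
print(requests.get(base_url + "/subjects").json())

# Fetch the latest schema registered for the values of the "test" topic.
print(requests.get(base_url + "/subjects/test-value/versions/latest").json())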
Schema Compatibility
By default, all schemas in Hopsworks are backward compatible. This is configurable using the /config endpoint. For more details see https://docs.confluent.io/5.3.0/schema-registry/develop/api.html#config.
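For example, a sketch of setting the global compatibility level through that endpoint (host and project id are placeholders; accepted values are listed in the Confluent docs):

import requests

base_url = "http://<hopsworks-host>:8181/project/<projectId>/kafka"

# Set the global compatibility level to FULL.
resp = requests.put(base_url + "/config", json={"compatibility": "FULL"})
print(resp.json())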
Documentation
For a detailed description of the endpoints please see Confluent Schema Registry documentation v5.3.1: https://docs.confluent.io/5.3.0/schema-registry/develop/api.html