
Hey everyone! Recently, I was working on building a Spark Streaming application to communicate with a Kafka cluster that has either the SASL_PLAINTEXT or SASL_SSL protocol enabled, in other words, a secure cluster.

Before we go any further, I would like to share that this demo application uses the Cloudera Distribution of Apache Spark (CDS) and the Cloudera Distribution of Apache Kafka (CDK).


The Cloudera Distribution of Apache Kafka 2.0.0 (based on Apache Kafka 0.9.0) introduced a new consumer API that allowed consumers to read data from a secure Kafka cluster.

So for a Kerberos-enabled cluster, we need CDS version 2.1+ and CDK version 2.1+ to make the combination work:

Cloudera Distribution of Spark 2.1 release 1 or higher
Cloudera Distribution of Kafka 2.1.0 or higher

You also need the Kafka 0-10 integration (export SPARK_KAFKA_VERSION=0.10 before launching spark2-submit).

This demo is built to get the counts of the various words in every batch interval, fetched from a secured Kafka cluster using this Spark Streaming application.
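Under the hood, such an application creates a direct Kafka stream with the security protocol set in the consumer parameters and counts words per batch. Below is a minimal sketch of what a DirectKafkaWordCount class can look like; it is a reconstruction based on the spark2-submit command shown later in this post, so the group id, batch interval, and argument order are assumptions rather than the repo's exact code.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Arguments: broker list, topic, and whether SSL is enabled (assumed order)
    val Array(brokers, topic, sslEnabled) = args

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "secure-spark-streaming-demo",
      // SASL_SSL when the last argument is "true", SASL_PLAINTEXT otherwise
      "security.protocol" -> (if (sslEnabled.toBoolean) "SASL_SSL" else "SASL_PLAINTEXT")
    )

    val ssc = new StreamingContext(new SparkConf().setAppName("DirectKafkaWordCount"), Seconds(10))
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set(topic), kafkaParams))

    // Count each word per batch and print the counts to the driver's stdout
    stream.map(_.value)
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}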

Please follow the steps below to build and execute this sample Spark Streaming application against a secure Kafka cluster.

Download & Build the Demo App:

1. Download or clone the Git repo available here: Demo App GitHub Link.

2. Build this application using Maven:

$ cd secure-spark-streaming-kafka

$ mvn clean package

3. If your application builds successfully, you will see output like the following:

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 24.061 s

4. After building your application, copy the generated uber jar from target/secure-spark-streaming-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar to the Spark client node (the node from which you are going to launch the application).
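For example, assuming SSH access from the build machine to the client node (the user, hostname, and destination path are placeholders):

$ scp target/secure-spark-streaming-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar <user>@<client_node>:/<path>/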

Create the required files & permissions:

5. Once you have the uber jar available on the client node, the next step is to create a jaas.conf file containing the JAAS configuration for Kerberos access. As per the Apache Kafka documentation, Kafka uses the Java Authentication and Authorization Service (JAAS) for SASL configuration.

Sample jaas.conf (the login module and its options below follow the standard Kerberos client configuration; adjust the keytab path and principal to your environment):

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/<path>/user.kafka.keytab"
  principal="<user>@<REALM>"
  serviceName="kafka";
};

Modify the keyTab and principal values to your actual values. KafkaClient is the section name in the JAAS file used by Kafka clients such as this Spark application; it provides the SASL configuration options for the client's connections to the brokers.

6. Also, check the permissions on the created jaas.conf and keytab files to avoid file-permission exceptions.
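For example, to restrict the files to the submitting user and verify the result (a reasonable default; adjust to your environment):

$ chmod 600 /<path>/jaas.conf /<path>/user.kafka.keytab
$ ls -l /<path>/jaas.conf /<path>/user.kafka.keytab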

Execute the Spark Application:

7. Once all the above steps are completed successfully, launch the application with the spark2-submit command below.

SPARK_KAFKA_VERSION=0.10 spark2-submit \
--num-executors 2 \
--master yarn \
--deploy-mode client \
--files /<path>/jaas.conf,/<path>/user.kafka.keytab \
--driver-java-options "-Djava.security.auth.login.config=/<path>/jaas.conf" \
--class com.github.ankit.spark.example.DirectKafkaWordCount \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
/<path>/secure-spark-streaming-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar \
<broker_hostname>:9092 \
<topic_name> \
false

Note: Please replace the below-mentioned parameters with your actual values.

<broker_hostname> :: Broker hostname.

<path> :: actual file path.

Modify the last parameter ("false" above) to "true" if you have SSL enabled, and change the port accordingly.
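For example, if your brokers listen for SASL_SSL on port 9093 (a common choice; confirm the port in your broker's listener configuration), the last three arguments would become:

<broker_hostname>:9093 \
<topic_name> \
true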

8. Now that your Spark Streaming application is running, in another terminal, start a producer that publishes to the topic using the kafka-console-producer client.
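On a Kerberized cluster, the console producer also needs the JAAS configuration and the security protocol. A minimal sketch, assuming the same jaas.conf as above (file names and paths are placeholders):

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/<path>/jaas.conf"
$ echo "security.protocol=SASL_PLAINTEXT" > producer.properties
$ kafka-console-producer --broker-list <broker_hostname>:9092 \
  --topic <topic_name> \
  --producer.config producer.properties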

9. You can observe the counts of the various words in every batch interval in your Spark Streaming driver's stdout.
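Each batch interval prints a block like the following (the words and counts here are illustrative):

-------------------------------------------
Time: 1500000000000 ms
-------------------------------------------
(hello,2)
(kafka,3)
(spark,1)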

I hope you were able to follow the steps and have successfully executed the Spark Streaming application. If you encounter errors, please go through the steps again and troubleshoot the issue. I will be happy to help you out, so comment on this blog with your queries.

