Secure Kafka Java Producer with Kerberos

The most recent release of Kafka 0.9 with it’s comprehensive security implementation has reached an important milestone. In his blog post Kafka Security 101 Ismael from Confluent describes the security features part of the release very well.

As a part II of the here published post about Kafka Security with Kerberos this post discussed a sample implementation of a Java Kafka producer with authentication. It is part of a mini series of posts discussing secure HDP clients, connecting services to a secured cluster, and kerberizing the HDP Sandbox (Download HDP Sandbox). In this effort at the end of this post we will also create a Kafka Servlet to publish messages to a secured broker.

Kafka provides SSL and Kerberos authentication. Only Kerberos is discussed here.

Kafka from now on supports four different communication protocols between Consumers, Producers, and Brokers. Each protocol considers different security aspects, while PLAINTEXT is the old insecure communication protocol.

  • PLAINTEXT (non-authenticated, non-encrypted)
  • SSL (SSL authentication, encrypted)
  • PLAINTEXT+SASL (authentication, non-encrypted)
  • SSL+SASL (encrypted authentication, encrypted transport)

A Kafka client needs to be configured to use the protocol of the corresponding broker. This tells the client to use authentication for communication with the broker:

Making use of Kerberos authentication in Java is provided by the Java Authentication and Authorization Service (JAAS) which is a pluggable authentication method similar to PAM supporting multiple authentication methods. In this case the authentication method being used is GSS-API for Kerberos.

Demo Setup

For JAAS a proper configuration of GSS would be needed in addition to being in possession of proper credentials, obviously. Some credentials can be created with MIT Kerberos like this:

The last line is not necessarily needed as it creates us a so called keytab – basically an encrypted password of the user – that can be used for password less authentication for example for automated services. We will make use of that here as well.

First we need to prepare a test topic to publish messages with proper privileges for our kafka-user:

As a sample producer we will use this:

With this setup we can go ahead demonstrating two ways to use a JAAS context to authenticate with the Kafka broker. At first we will configure a context to use the existing privileges possessed by the executing user. Next we use a so called keytab to demonstrate a password-less login for automated producer processes. At last we will look at a Servlet implementation provided here.

Authentication with User Login

To configure a JAAS config with userKeyTab set to false and useTicketCache to true, so that the privileges of the current users are being used.

We store this in a file under /home/kafka-user/kafka-jaas.conf and exeute the broker like this:

Using Keytab to Login

Next we will configure the JAAS context to use a generated keytab file instead of the security context of the executing user. Before we can do this we need to create the keytab storing it also under /home/kafka-user/kafka-user.keytab.

The JAAS configuration can now be changed to look like this:

This will use the keytab stored under /home/kafka-user/kafka-user.keytab while the user executing the producer must not be logged in to any security controller:

Kafka Producer Servlet

In a last example we will add a Kafka Servlet to the hdp-web-sample project previously described in this post. Our Servlet will get the topic and message as a GET parameter. The Servlet looks as follwoing:

Again we are changing the JAAS config of the Tomcat service to be able to make use of the previously generated keytab. The jaas.conf of Tomcat will contain now this:

After deploying the web app and restarting tomcat with this newly adapted JAAS config you should be able to publish message to a secured broker be triggering the following GET address from a browser http://one.hdp:8099/hdp-web/kafka?topic=test&msg=Test1 . The response should be a 200 OK like this:

tomcat_kafka_1

You might be having some issues and in particular seeing this Exception:

If are seeing the message javax.security.auth.login.LoginException: Unable to obtain password from user it likely refers to your keytab file, as being the users password. So make sure that the tomcat user is able to read that file stored under /home/kafka-user/kafka-user.keytab for example.

Further Readings

3 thoughts on “Secure Kafka Java Producer with Kerberos”

  1. Hi, I followed same thing but it gives me javax.security.auth.login.LoginException: Exception.
    Can you share your email id.

    1. How to give access through java. As we are creating topic through java api, we need to run acl throgh java only.
      It throws zookeeper exception. $NoAuthException: KeeperErrorCode.
      What would be the changes in jass file, to resolve this issue.

  2. I get below Exception
    kafka.common.KafkaException: fetching topic metadata for topics [Set(test001)] from broker [ArrayBuffer(BrokerEndPoint(2,cdh55.cs1hypers.com,9092), BrokerEndPoint(1,cdh54.cs1hypers.com,9092), BrokerEndPoint(0,cdh53.cs1hypers.com,9092))] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at kafka.krb.KafkaProducer.main(KafkaProducer.java:27)
    Caused by: java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)

Leave a Reply

Your email address will not be published. Required fields are marked *