Kafka Security with Kerberos

Apache Kafka, developed as a durable and fast messaging queue handling real-time data feeds, originally did not come with any security approach. Similar to Hadoop, Kafka was at the beginning expected to be used in a trusted environment, focusing on functionality instead of compliance. With the ever-growing popularity and widespread use of Kafka, the community recently picked up traction around a complete security design including authentication with Kerberos and SSL, encryption, and authorization. Judging by the details of the security proposal found here, the complete security measures will be included with the 0.9 release of Apache Kafka.

The releases of HDP 2.3 already support a secure implementation of Kafka with authentication and authorization. Especially with the integration of the security framework Apache Ranger, this becomes a comprehensive security solution for any Hadoop deployment with real-time data demands. In this post we look, by example, at how working with a Kerberized Kafka broker differs from before, using the known shell tools as well as a custom Java producer.

A Secure Kafka Broker

A Kerberized HDP cluster or secured Kafka broker comes with some new parameters that are worth covering before continuing. Probably the most important change is the inter-broker protocol (security.inter.broker.protocol), which is now PLAINTEXTSASL instead of the previous PLAINTEXT. With the implementation of the security proposal, Kafka will support the following protocols:

  • PLAINTEXT (non-authenticated, non-encrypted)
  • SSL (SSL authentication, encrypted)
  • SASL+PLAINTEXT (authentication, non-encrypted)
  • SASL+SSL (encrypted authentication, encrypted transport)

The use of PLAINTEXTSASL indicates the use of Kerberos without transport encryption. Further, a secure broker defines a super user granted all required privileges; typically this user is intended to administer the broker. By default the only super user (super.users) is kafka (user:kafka).

Authorization in Kafka is a pluggable implementation which can, for example, be replaced with an Apache Ranger authorization implementation. By default Kafka comes with kafka.security.auth.SimpleAclAuthorizer, defined by the parameter authorizer.class.name.

Last but not least, as anyone somewhat familiar with Kerberos might know, a translation from principal to user name has to be configured. In Kafka this is defined by the parameter principal.to.local.class, which is set to kafka.security.auth.KerberosPrincipalToLocal by default.
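
Putting these parameters together, the relevant section of a secured broker's server.properties contains entries like the following (a minimal sketch based purely on the defaults named above):

# excerpt of a secured broker's server.properties
security.inter.broker.protocol=PLAINTEXTSASL
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=user:kafka
principal.to.local.class=kafka.security.auth.KerberosPrincipalToLocal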

Authentication and Authorization

Working with a secure Kafka broker going forward requires proper authentication as well as proper permissions being set up for the user. If, for example, authentication is not provided, the following GSSException would be the result when writing to a topic:

$ bin/kafka-console-producer.sh --broker-list one.hdp:6667 --topic test
dfadsf
[2015-11-14 15:06:34,520] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,one.hdp,6667)] failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.CoreUtils$.read(CoreUtils.scala:193)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-11-14 15:06:34,525] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,one.hdp,6667))] failed (kafka.utils.CoreUtils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,one.hdp,6667))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.CoreUtils$.read(CoreUtils.scala:193)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
... 12 more
[2015-11-14 15:07:10,159] ERROR Closing socket for /192.168.33.100 because of error (kafka.network.Processor)
java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: Defective token detected (Mechanism level: GSSHeader did not find the right tag)]
at org.apache.kafka.common.network.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:192)
at org.apache.kafka.common.network.Channel.connect(Channel.java:71)
at kafka.network.Processor.handshake(SocketServer.scala:518)
at kafka.network.Processor.run(SocketServer.scala:407)
at java.lang.Thread.run(Thread.java:745)
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: Defective token detected (Mechanism level: GSSHeader did not find the right tag)]
at com.sun.security.sasl.gsskerb.GssKrb5Server.evaluateResponse(GssKrb5Server.java:199)
at org.apache.kafka.common.network.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:182)
... 4 more
Caused by: GSSException: Defective token detected (Mechanism level: GSSHeader did not find the right tag)
at sun.security.jgss.GSSHeader.<init>(GSSHeader.java:97)
at sun.security.jgss.GSSContextImpl.acceptSecContext(GSSContextImpl.java:306)
at sun.security.jgss.GSSContextImpl.acceptSecContext(GSSContextImpl.java:285)
at com.sun.security.sasl.gsskerb.GssKrb5Server.evaluateResponse(GssKrb5Server.java:167)
... 5 more

Two things are required for proper authentication with the Kafka tools going forward. First, you need to kinit and obtain a valid TGT for a user principal existing in the KDC. That can be achieved like this for the user data_artist01:

$ kinit data_artist01
$ klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: data_artist01@MYCORP.NET

Valid starting     Expires            Service principal
11/13/15 15:06:57  11/14/15 15:06:57  krbtgt/MYCORP.NET@MYCORP.NET
    renew until 11/13/15 15:06:57
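
Apart from the ticket itself, the Kafka tools need a JAAS configuration that tells the JVM to authenticate via Kerberos using the ticket cache we just populated. A minimal sketch of such a client section follows; the file location and the serviceName value are assumptions based on a typical HDP setup:

// e.g. /usr/hdp/current/kafka-broker/conf/kafka_client_jaas.conf (path assumed)
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  serviceName="kafka";
};

The file is picked up when the JVM option -Djava.security.auth.login.config points to it, which can for example be passed to the shell tools through the KAFKA_OPTS environment variable.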

Next, it might be required to set the security protocol to be used explicitly, using the --security-protocol option as shown below:

$ bin/kafka-console-producer.sh --broker-list one.hdp:6667 --topic test --security-protocol PLAINTEXTSASL
fdsaf
[2015-11-14 15:12:55,734] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.AuthorizationException}] for topic [test]: 
class kafka.common.AuthorizationException  (kafka.producer.BrokerPartitionInfo)

The user now authenticates successfully, but is still missing the required permissions on the topic (for background on Kafka's authorization model see https://cwiki.apache.org/confluence/display/KAFKA/Security#Security-Authorization). We can verify this by listing the ACLs of the topic:

$ bin/kafka-acls.sh --list --topic test --config /usr/hdp/current/kafka-broker/conf/server.properties
Following is list of  acls for resource : TOPIC:test

$

As we can see from the command executed above, no ACLs are currently assigned to our topic. Kafka provides the kafka-acls.sh shell script to adjust the access permissions of topics, which we are going to use to grant our user data_artist01 the proper access rights to the topic test. The help section of the script already gives a good idea of how it can be used:

--operations                            comma separated list of operations,    
                                          allowed Operations are:              
                                            READ                                  
                                            WRITE                                 
                                            CREATE                                
                                            DELETE                                
                                            ALTER                                 
                                            DESCRIBE                              
                                            CLUSTER_ACTION                        
                                            ALL 


--allowhosts <allowhosts>               comma separated list of hosts , *      
                                          indicates all hosts.                 
--allowprincipals <allowprincipals>     comma separated list of principals     
                                          where principal is in principalType:
                                          principalName format i.e. user:      
                                          bob@EXAMPLE.COM. Specifying user:*   
                                          will allow access to all users.

For now only the super user kafka has the rights to make any changes to the permissions in Kafka. To become the kafka user we simply use its keytab:

$ klist -kt /etc/security/keytabs/kafka.service.keytab
Keytab name: FILE:/etc/security/keytabs/kafka.service.keytab
KVNO Timestamp         Principal
---- ----------------- --------------------------------------------------------
   1 11/13/15 12:09:02 kafka/one.hdp@MYCORP.NET
   1 11/13/15 12:09:02 kafka/one.hdp@MYCORP.NET
   1 11/13/15 12:09:02 kafka/one.hdp@MYCORP.NET
   1 11/13/15 12:09:02 kafka/one.hdp@MYCORP.NET
   1 11/13/15 12:09:02 kafka/one.hdp@MYCORP.NET
$ kinit -kt /etc/security/keytabs/kafka.service.keytab kafka/one.hdp@MYCORP.NET

Being the kafka user, we can now adjust the access rights of our test topic and give the user data_artist01 READ, WRITE, DELETE, and DESCRIBE privileges from any host (*).

$ /usr/hdp/current/kafka-broker/bin/kafka-acls.sh --add \
--allow-principals user:data_artist01 --operations READ,WRITE,DELETE,DESCRIBE \
--topic test --authorizer-properties zookeeper.connect=one.hdp:2181
Adding following acls for resource: Topic:test 
 	user:data_artist01 has Allow permission for operations: READ,WRITE,DELETE,DESCRIBE from hosts: * 

Following is list of acls for resource: Topic:test 
 	user:data_artist01 has Allow permission for operations: READ,WRITE,DELETE,DESCRIBE from hosts: *

Listing the permissions of the test topic again:

$ bin/kafka-acls.sh --list --topic test --config /usr/hdp/current/kafka-broker/conf/server.properties
Following is list of  acls for resource : TOPIC:test
    user:data_artist01 has ALLOW permission for operations: READ,WRITE,DELETE,DESCRIBE from hosts: *

Writing to the topic should now be feasible for the user data_artist01:

[root@one kafka-broker]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: data_artist01@MYCORP.NET

Valid starting     Expires            Service principal
11/13/15 16:06:23  11/14/15 16:06:23  krbtgt/MYCORP.NET@MYCORP.NET
    renew until 11/13/15 16:06:23
[root@one kafka-broker]# bin/kafka-console-producer.sh --broker-list one.hdp:6667 --topic test --security-protocol PLAINTEXTSASL
dfds
asf
asdf
asdf

# bin/kafka-simple-consumer-shell.sh --broker-list one.hdp:6667 --topic test --security-protocol PLAINTEXTSASL --partition 0
dfds
asf
asdf
asdf
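
The same applies to the custom Java producer mentioned at the beginning. A minimal sketch against the producer API shipped with the HDP Kafka client libraries could look like the one below; the class name and message are made up for illustration, the JVM running it again needs -Djava.security.auth.login.config pointing to the client JAAS file, and the security.protocol value matches what we passed to the console tools:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SecureProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker and topic as used throughout this post
        props.put("bootstrap.servers", "one.hdp:6667");
        // same protocol as with the console tools above
        props.put("security.protocol", "PLAINTEXTSASL");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // requires WRITE permission on the topic for the authenticated principal
            producer.send(new ProducerRecord<String, String>("test", "hello secure kafka"));
        } finally {
            producer.close();
        }
    }
}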

As already mentioned, the authorization can also easily be done by Apache Ranger, using a comprehensive UI and its auditing capabilities. For details see here.
