By Rafael Natali

Unlocking Kafka Security with Access Control Lists (ACLs)

Updated: Mar 28


Apache Kafka employs Access Control Lists (ACLs) as a vital component of its security infrastructure. ACLs regulate access to Kafka resources by specifying which users or applications can perform specific actions. By implementing ACLs, Kafka administrators can enforce granular access control policies, ensuring data integrity and confidentiality within their Kafka clusters.


ACLs serve as a robust defence mechanism against unauthorised access, safeguarding sensitive data and preventing malicious activities within Kafka clusters. With ACLs, administrators can restrict topics, partitions, consumer groups, and administrative operations, empowering them to enforce compliance standards and protect against data breaches.


Photo by Patrick Robert Doyle on Unsplash

Kafka with ZooKeeper vs. KRaft-based Kafka Clusters


Configuring ACLs in Kafka differs based on the underlying cluster architecture. In traditional Kafka clusters with ZooKeeper, ACLs are stored in ZooKeeper nodes and managed by the following authorizer class:

authorizer.class.name=kafka.security.authorizer.AclAuthorizer

However, in KRaft-based Kafka clusters, ACLs are integrated directly into the broker configuration, simplifying management and ensuring consistency across the cluster. KRaft-based clusters use a different class:

authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

Building on my previous article, Running Kafka in Kubernetes with Kraft mode and SASL authentication, I will demonstrate how to configure the broker to use ACLs, how to troubleshoot ACL-related issues, and walk through some ACL examples.


Configuring Kafka for ACLs


Below are the parameters we need to add to the Kafka broker from the previous article to enable ACLs, along with a new admin client configuration.


Broker


  • Enable SASL/PLAIN mechanism for the CONTROLLER

- name: KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG
  value: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_kafkaclient1="kafkaclient1-secret"; 
- name: KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL
  value: PLAIN
- name: KAFKA_CONTROLLER_ENABLED_MECHANISMS
  value: PLAIN     
  • Update the listener.security.protocol.map so the CONTROLLER listener uses SASL_PLAINTEXT

- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
  value: "CONTROLLER:SASL_PLAINTEXT,SASL:SASL_PLAINTEXT"
  • Configure the authorizer and super.user

- name: KAFKA_SUPER_USERS
  value: User:admin
- name: KAFKA_AUTHORIZER_CLASS_NAME
  value: org.apache.kafka.metadata.authorizer.StandardAuthorizer
- name: KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND
  value: "false"
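If you are wondering how these variables reach Kafka: the Confluent images convert each KAFKA_* environment variable into a server.properties entry by dropping the prefix, lowercasing it, and turning underscores into dots. For example, the authorizer settings above should land in the broker configuration roughly as follows (a sketch, assuming the cp-kafka image from the previous article):

```properties
# KAFKA_AUTHORIZER_CLASS_NAME becomes:
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
# KAFKA_SUPER_USERS becomes:
super.users=User:admin
# KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND becomes:
allow.everyone.if.no.acl.found=false
```

Setting allow.everyone.if.no.acl.found to false means any request that matches no ACL is denied, which is why the non-admin client fails later in this article until an ACL is added.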

Admin Client
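The admin client authenticates with a sasl_admin.properties file along these lines (a sketch; the credentials match the admin user defined in the broker's JAAS configuration above):

```properties
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" \
  password="admin-secret";
```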


  • Create a ConfigMap based on the sasl_admin.properties file:

kubectl create configmap kafka-admin --from-file sasl_admin.properties -n kafka    
kubectl describe configmaps -n kafka kafka-admin

Output:

configmap/kafka-admin created
Name:         kafka-admin
Namespace:    kafka
Labels:       <none>
Annotations:  <none>
    
Data
====
sasl_admin.properties:
----
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" \
  password="admin-secret";
BinaryData
====

Events:  <none>
  • Mount the ConfigMap as a volume:

    volumeMounts:
        - mountPath: /etc/kafka/secrets/
          name: kafka-admin
    ...
    volumes:
    - name: kafka-admin
      configMap: 
        name: kafka-admin

Creating the deployment


Clone the repo:

git clone https://github.com/rafaelmnatali/kafka-k8s.git
cd kafka-k8s/acls

You can deploy Kafka using the following commands:

kubectl apply -f 00-namespace.yaml
kubectl apply -f 01-kafka.yaml

Verify communication across brokers


There should now be three Kafka brokers each running on separate pods within your cluster. Name resolution for the headless service and the three pods within the StatefulSet is automatically configured by Kubernetes as they are created, allowing for communication across brokers. See the related documentation for more details on this feature.


You can check the first pod's logs with the following command:

kubectl logs kafka-0

Name resolution for the three pods can take longer than the pods themselves take to start, so you may initially see UnknownHostException warnings in the pod logs:

WARN [RaftManager nodeId=2] Error connecting to node kafka-1.kafka-headless.kafka.svc.cluster.local:29093 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient) java.net.UnknownHostException: kafka-1.kafka-headless.kafka.svc.cluster.local         ... 

Eventually, each pod will successfully resolve pod hostnames and end with a message stating the broker has been unfenced:

INFO [Controller 0] Unfenced broker: UnfenceBrokerRecord(id=1, epoch=176) (org.apache.kafka.controller.ClusterControlManager)

Testing Kafka ACLs


To test the ACLs, we will deploy two clients: admin and non-admin. Open two terminal windows and deploy the admin client in one and the non-admin client in the other.


Commands:

kubectl apply -f 02-kafka-client.yaml
kubectl apply -f 03-kafka-admin.yaml

Let's use the operation CREATE as an example of how to troubleshoot and solve permission errors in Kafka.


First, create a topic from the admin client:

kubectl exec -it kafka-admin -- bash -c "kafka-topics --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 --create --topic kafka-admin --command-config /etc/kafka/secrets/sasl_admin.properties"

Output:

Created topic kafka-admin

Second, try to create a topic from the non-admin client:

kubectl exec -it kafka-client -- bash -c "kafka-topics --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 --create --topic kafka-client --command-config /etc/kafka/secrets/sasl_client.properties"

Output:

Error while executing topic command : Authorization failed.

Our client does not have permission. Looking at the Kafka logs, we can see the following entry:

kafka-0 kafka [2024-03-13 20:10:17,259] INFO Principal = User:kafkaclient1 is Denied operation = CREATE from host = 10.244.0.9 on resource = Topic:LITERAL:kafka-admin for request = CreateTopics with resourceRefCount = 1 based on rule DefaultDeny (kafka.authorizer.logger)
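When scanning broker logs for denied requests, the key fields of an authorizer entry can be pulled out with standard shell tools. The snippet below uses the sample deny entry from above as input; the sed patterns are just one way to slice it:

```shell
# Sample authorizer deny entry (fields as in the log line above)
log='[2024-03-13 20:10:17,259] INFO Principal = User:kafkaclient1 is Denied operation = CREATE from host = 10.244.0.9 on resource = Topic:LITERAL:kafka-admin for request = CreateTopics with resourceRefCount = 1 based on rule DefaultDeny (kafka.authorizer.logger)'

# Extract the principal, operation, and host with sed
principal=$(echo "$log" | sed -n 's/.*Principal = \([^ ]*\).*/\1/p')
operation=$(echo "$log" | sed -n 's/.*operation = \([^ ]*\).*/\1/p')
host=$(echo "$log" | sed -n 's/.*from host = \([^ ]*\).*/\1/p')

echo "$principal denied $operation from $host"
```

These three fields (principal, operation, host) are exactly what the kafka-acls command in the next step needs as its --allow-principal, --operation, and --allow-host arguments.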

⚠️ Note that in the log this is an INFO entry and not an ERROR.


Create the ACL from the admin client:

kubectl exec -it kafka-admin -- bash -c "kafka-acls --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 --add --allow-principal User:kafkaclient1 --operation Create --allow-host 10.244.0.9 --cluster --command-config /etc/kafka/secrets/sasl_admin.properties"

Output:

Adding ACLs for resource `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`: 
        (principal=User:kafkaclient1, host=10.244.0.9, operation=CREATE, permissionType=ALLOW)

Try to create a topic from the non-admin client again:

kubectl exec -it kafka-client -- bash -c "kafka-topics --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 --create --topic kafka-client --command-config /etc/kafka/secrets/sasl_client.properties"

Output:

Created topic kafka-client.

More Kafka ACLs examples


Read/Write in a Topic:

kafka-acls --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 --add --allow-principal User:kafkaclient1 --operation Read --operation Write --allow-host 10.244.0.9 --topic kafka-client --command-config /etc/kafka/secrets/sasl_admin.properties

List ACLs:

kafka-acls --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 --list --topic kafka-client --command-config /etc/kafka/secrets/sasl_admin.properties

Remove ACLs:

kafka-acls --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 --remove --topic kafka-client --command-config /etc/kafka/secrets/sasl_admin.properties

Summary


Access Control Lists (ACLs) are indispensable for securing Apache Kafka deployments, enabling administrators to enforce fine-grained access controls and mitigate security risks. Whether managing Kafka with Zookeeper or adopting KRaft-based Kafka clusters, ACLs play a pivotal role in ensuring data protection, compliance, and overall cluster security. By understanding and leveraging ACLs effectively, organisations can fortify their Kafka infrastructure against unauthorised access and potential threats, bolstering the integrity and confidentiality of their event streaming pipelines.

