Apache Kafka employs Access Control Lists (ACLs) as a vital component of its security infrastructure. ACLs regulate access to Kafka resources by specifying which users or applications can perform specific actions. By implementing ACLs, Kafka administrators can enforce granular access control policies, ensuring data integrity and confidentiality within their Kafka clusters.
ACLs serve as a robust defence mechanism against unauthorised access, safeguarding sensitive data and preventing malicious activities within Kafka clusters. With ACLs, administrators can restrict topics, partitions, consumer groups, and administrative operations, empowering them to enforce compliance standards and protect against data breaches.
Kafka with Zookeeper vs. KRaft-based Kafka Clusters
Configuring ACLs in Kafka differs based on the underlying cluster architecture. In traditional Kafka clusters with Zookeeper, ACLs are managed through Zookeeper nodes, where access control settings are stored using the following class:
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
However, in KRaft-based Kafka clusters, ACLs are integrated directly into the broker configuration, simplifying management and ensuring consistency across the cluster. KRaft-based cluster uses a different class:
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
Building on the my previous article Running Kafka in Kubernetes with Kraft mode and SASL authentication, I will demonstrate how to configure the broker to use ACLs, how to troubleshoot ACL-related issues, and some ACL examples.
Configuring Kafka for ACLs
Next are the parameters we need to add our previous Kafka Broker to enable ACLs and a new admin client configuration.
Broker
Enable SASL/PLAIN mechanism for the CONTROLLER
- name: KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG
value: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_kafkaclient1="kafkaclient1-secret";
- name: KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL
value: PLAIN
- name: KAFKA_CONTROLLER_ENABLED_MECHANISMS
value: PLAIN
Update the security.protocol.map of the CONTROLLER to use SASL_PLAINTEXT
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: "CONTROLLER:SASL_PLAINTEXT,SASL:SASL_PLAINTEXT"
Configure the authorizer and super.user
- name: KAFKA_SUPER_USERS
value: User:admin
- name: KAFKA_AUTHORIZER_CLASS_NAME
value: org.apache.kafka.metadata.authorizer.StandardAuthorizer
- name: KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND
value: "false"
Admin Client
Create a ConfigMap based on the sasl_client.properties file:
kubectl create configmap kafka-admin --from-file sasl_admin.properties -n kafka
kubectl describe configmaps -n kafka kafka-admin
Output:
configmap/kafka-admin created
Name: kafka-admin
Namespace: kafka
Labels: <none>
Annotations: <none>
Data
====
sasl_admin.properties:
----
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret";
BinaryData
====
Events: <none>
Mount the ConfigMap as a volume:
volumeMounts:
- mountPath: /etc/kafka/secrets/
name: kafka-admin
...
volumes:
- name: kafka-admin
configMap:
name: kafka-admin
Creating the deployment
Clone the repo:
git clone https://github.com/rafaelmnatali/kafka-k8s.git
cd acls
You can deploy Kafka using the following commands:
kubectl apply -f 00-namespace.yaml
kubectl apply -f 01-kafka.yaml
Verify communication across brokers
There should now be three Kafka brokers each running on separate pods within your cluster. Name resolution for the headless service and the three pods within the StatefulSet is automatically configured by Kubernetes as they are created, allowing for communication across brokers. See the related documentation for more details on this feature.
You can check the first pod's logs with the following command:
kubectl logs kafka-0
The name resolution of the three pods can take more time to work than it takes the pods to start, so you may see UnknownHostException warnings in the pod logs initially:
WARN [RaftManager nodeId=2] Error connecting to node kafka-1.kafka-headless.kafka.svc.cluster.local:29093 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient) java.net.UnknownHostException: kafka-1.kafka-headless.kafka.svc.cluster.local ...
Eventually, each pod will successfully resolve pod hostnames and end with a message stating the broker has been unfenced:
INFO [Controller 0] Unfenced broker: UnfenceBrokerRecord(id=1, epoch=176) (org.apache.kafka.controller.ClusterControlManager)
Testing Kafka ACLs
To test the ACLs, we will deploy two clients: admin and non-admin. Open two terminal windows and deploy the admin client in one and the non-admin in the another one.
Commands:
kubectl apply -f 02-kafka-client.yaml
kubectl apply -f 03-kafka-admin.yaml
Let's use the operation CREATE as an example of how to troubleshoot and solve permission errors in Kafka.
First, create a topic from the admin client:
kubectl exec -it kafka-admin -- bash -c "kafka-topics --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 -create --topic kafka-admin --command-config /etc/kafka/secrets/sasl_admin.properties"
Output:
Created topic kafka-admin
Second, try to create a topic from the non-admin client:
kubectl exec -it kafka-client -- bash -c "kafka-topics --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 -create --topic kafka-client --command-config /etc/kafka/secrets/sasl_client.properties"
Output:
Error while executing topic command : Authorization failed.
Our client does not have permission. Looking into the Kafka logs it's possible to see the following entry:
kafka-0 kafka [2024-03-13 20:10:17,259] INFO Principal = User:kafkaclient1 is Denied operation = CREATE from host = 10.244.0.9 on resource = Topic:LITERAL:kafka-admin for request = CreateTopics with resourceRefCount = 1 based on rule DefaultDeny (kafka.authorizer.logger)
⚠️ Note that in the log this is an INFO entry and not an ERROR.
Create the ACL from the admin client:
kubectl exec -it kafka-admin -- bash -c "kafka-acls --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 --add --allow-principal User:kafkaclient1 --operation Create --allow-host 10.244.0.9 --cluster --command-config /etc/kafka/secrets/sasl_admin.properties"
Output:
Adding ACLs for resource `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`:
(principal=User:kafkaclient1, host=10.244.0.9, operation=CREATE, permissionType=ALLOW)
Try to create a topic from the non-admin client again:
kubectl exec -it kafka-client -- bash -c "kafka-topics --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 -create --topic kafka-client --command-config /etc/kafka/secrets/sasl_client.properties"
Output:
Created topic kafka-client.
More Kafka ACLs examples
Read/Write in a Topic:
kafka-acls --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 --add --allow-principal User:kafkaclient1 --operation Read --operation Write --allow-host 10.244.0.9 --topic kafka-client --command-config /etc/kafka/secrets/sasl_admin.properties
List ACLs:
kafka-acls --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 --list --topic kafka-client --command-config /etc/kafka/secrets/sasl_admin.properties
Remove ACLs:
kafka-acls --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9092 --remove --topic kafka-client --command-config /etc/kafka/secrets/sasl_admin.properties
Summary
Access Control Lists (ACLs) are indispensable for securing Apache Kafka deployments, enabling administrators to enforce fine-grained access controls and mitigate security risks. Whether managing Kafka with Zookeeper or adopting KRaft-based Kafka clusters, ACLs play a pivotal role in ensuring data protection, compliance, and overall cluster security. By understanding and leveraging ACLs effectively, organisations can fortify their Kafka infrastructure against unauthorised access and potential threats, bolstering the integrity and confidentiality of their event streaming pipelines.
Comments