This tutorial shows how to enforce fine-grained access control over Kafka topics. In this tutorial you will use OPA to define and enforce an authorization policy stating:

  • Consumers of topics containing Personally Identifiable Information (PII) must be whitelisted.
  • Producers to topics with high fanout must be whitelisted.

In addition, this tutorial shows how to break up a policy with small helper rules to reuse logic and improve overall readability.

This tutorial requires Docker Compose to run Kafka, ZooKeeper, and OPA.

First, create an OPA policy that allows all requests. You will update this policy later in the tutorial.

policies/tutorial.rego:

  package kafka.authz

  allow = true

Next, create a docker-compose.yaml file that runs OPA, ZooKeeper, and Kafka.

docker-compose.yaml:

  version: "2"
  services:
    opa:
      hostname: opa
      image: openpolicyagent/opa:0.30.2
      ports:
        - 8181:8181
      # WARNING: OPA is NOT running with an authorization policy configured. This
      # means that clients can read and write policies in OPA. If you are deploying
      # OPA in an insecure environment, you should configure authentication and
      # authorization on the daemon. See the Security page for details:
      # https://www.openpolicyagent.org/docs/security.html.
      command: "run --server --watch /policies"
      volumes:
        - ./policies:/policies
    zookeeper:
      image: confluentinc/cp-zookeeper:4.0.0-3
      environment:
        ZOOKEEPER_CLIENT_PORT: 2181
        zk_id: "1"
    kafka:
      hostname: kafka
      image: openpolicyagent/demo-kafka:1.0
      links:
        - zookeeper
        - opa
      ports:
        - "9092:9092"
      environment:
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
        KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
        KAFKA_ADVERTISED_LISTENERS: "SSL://:9093"
        KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
        KAFKA_SSL_CLIENT_AUTH: required
        KAFKA_SSL_KEYSTORE_FILENAME: kafka.broker.keystore.jks
        KAFKA_SSL_KEYSTORE_CREDENTIALS: broker_keystore_creds
        KAFKA_SSL_KEY_CREDENTIALS: broker_sslkey_creds
        KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.broker.truststore.jks
        KAFKA_SSL_TRUSTSTORE_CREDENTIALS: broker_truststore_creds
        KAFKA_AUTHORIZER_CLASS_NAME: com.lbg.kafka.opa.OpaAuthorizer
        KAFKA_OPA_AUTHORIZER_URL: "http://opa:8181/v1/data/kafka/authz/allow"
        KAFKA_OPA_AUTHORIZER_ALLOW_ON_ERROR: "false"
        KAFKA_OPA_AUTHORIZER_CACHE_INITIAL_CAPACITY: 100
        KAFKA_OPA_AUTHORIZER_CACHE_MAXIMUM_SIZE: 100
        KAFKA_OPA_AUTHORIZER_CACHE_EXPIRE_AFTER_MS: 600000

For more information on how to configure the OPA plugin for Kafka, see the github.com/open-policy-agent/contrib repository.

Once you have created the file, launch the containers for this tutorial.

  docker-compose --project-name opa-kafka-tutorial up
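
Once the containers are up, you can quickly confirm that OPA is serving the bootstrap policy. The check below is a minimal sketch that assumes port 8181 is published on localhost as in the Compose file above; it queries the same decision path the Kafka plugin uses and, with the initial allow-everything policy, should return {"result": true}.

  # Query the kafka/authz/allow decision via OPA's Data API.
  curl -s localhost:8181/v1/data/kafka/authz/allow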

Now that the tutorial environment is running, we can define an authorization policy using OPA and test it.

Authentication

The Docker Compose file defined above requires SSL client authentication for clients that connect to the broker. Enabling SSL client authentication allows for service identities to be provided as input to your policy. The example below shows the input structure.

  {
    "operation": {
      "name": "Write"
    },
    "resource": {
      "resourceType": {
        "name": "Topic"
      },
      "name": "credit-scores"
    },
    "session": {
      "principal": {
        "principalType": "User"
      },
      "clientAddress": "172.21.0.5",
      "sanitizedUser": "CN%3Danon_producer.tutorial.openpolicyagent.org%2COU%3DTUTORIAL%2CO%3DOPA%2CL%3DSF%2CST%3DCA%2CC%3DUS"
    }
  }

The client identity is extracted from the SSL certificate that the client presents when it connects to the broker. The identity information is encoded in the input.session.sanitizedUser field and can be URL-decoded inside the policy.
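
For example, you can see what this encoded field looks like after decoding by asking OPA itself to evaluate the urlquery.decode built-in. This is only a sketch for illustration: it assumes the OPA container from the Compose file is reachable on localhost:8181 and uses OPA's ad-hoc Query API rather than anything defined by the tutorial policy.

  # Decode the example sanitizedUser value with OPA's urlquery.decode built-in.
  curl -s -H 'Content-Type: application/json' localhost:8181/v1/query \
    -d '{"query": "x = urlquery.decode(\"CN%3Danon_producer.tutorial.openpolicyagent.org%2COU%3DTUTORIAL%2CO%3DOPA%2CL%3DSF%2CST%3DCA%2CC%3DUS\")"}'

The response binds x to the decoded distinguished name, CN=anon_producer.tutorial.openpolicyagent.org,OU=TUTORIAL,O=OPA,L=SF,ST=CA,C=US, which the policy below splits into its components.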

Do not rely on these pre-generated SSL certificates in real-world scenarios. They are provided only for convenience and testing.

Kafka Authorizer JAR File

The Kafka image used in this tutorial includes a pre-installed JAR file that implements the Kafka Authorizer interface. For more information on the authorizer, see open-policy-agent/contrib/kafka_authorizer.

Update policies/tutorial.rego with the following content.

  #-----------------------------------------------------------------------------
  # High level policy for controlling access to Kafka.
  #
  # * Deny operations by default.
  # * Allow operations if no explicit denial.
  #
  # The kafka-authorizer-opa plugin will query OPA for decisions at
  # /kafka/authz/allow. If the policy decision is _false_ the request is denied.
  #-----------------------------------------------------------------------------
  package kafka.authz

  default allow = false

  allow {
      not deny
  }

  deny {
      is_read_operation
      topic_contains_pii
      not consumer_is_whitelisted_for_pii
  }

  #-----------------------------------------------------------------------------
  # Data structures for controlling access to topics. In real-world deployments,
  # these data structures could be loaded into OPA as raw JSON data. The JSON
  # data could be pulled from external sources like AD, Git, etc.
  #-----------------------------------------------------------------------------

  consumer_whitelist = {"pii": {"pii_consumer"}}

  topic_metadata = {"credit-scores": {"tags": ["pii"]}}

  #-----------------------------------
  # Helpers for checking topic access.
  #-----------------------------------

  topic_contains_pii {
      topic_metadata[topic_name].tags[_] == "pii"
  }

  consumer_is_whitelisted_for_pii {
      consumer_whitelist.pii[_] == principal.name
  }

  #-----------------------------------------------------------------------------
  # Helpers for processing Kafka operation input. This logic could be split out
  # into a separate file and shared. For conciseness, we have kept it all in one
  # place.
  #-----------------------------------------------------------------------------

  is_write_operation {
      input.operation.name == "Write"
  }

  is_read_operation {
      input.operation.name == "Read"
  }

  is_topic_resource {
      input.resource.resourceType.name == "Topic"
  }

  topic_name = input.resource.name {
      is_topic_resource
  }

  principal = {"fqn": parsed.CN, "name": cn_parts[0]} {
      parsed := parse_user(urlquery.decode(input.session.sanitizedUser))
      cn_parts := split(parsed.CN, ".")
  }

  parse_user(user) = {key: value |
      parts := split(user, ",")
      [key, value] := split(parts[_], "=")
  }

The Kafka authorization plugin is configured to query for the data.kafka.authz.allow decision. If the response is true the operation is allowed; otherwise the operation is denied. When the integration queries OPA, it supplies a JSON representation of the operation, the resource, and the session that identifies the client principal.

  {
    "operation": {
      "name": "Write"
    },
    "resource": {
      "resourceType": {
        "name": "Topic"
      },
      "name": "credit-scores"
    },
    "session": {
      "principal": {
        "principalType": "User"
      },
      "clientAddress": "172.21.0.5",
      "sanitizedUser": "CN%3Danon_producer.tutorial.openpolicyagent.org%2COU%3DTUTORIAL%2CO%3DOPA%2CL%3DSF%2CST%3DCA%2CC%3DUS"
    }
  }

With the input value above, the answer is:

  true
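
If you like, you can reproduce this decision by hand with curl. The command below is a sketch that assumes OPA is still reachable on localhost:8181; it POSTs the same input shown above to the decision path the broker plugin queries and should return {"result": true}.

  # Ask OPA for the same decision the broker plugin would request.
  curl -s -H 'Content-Type: application/json' localhost:8181/v1/data/kafka/authz/allow -d '{
    "input": {
      "operation": {"name": "Write"},
      "resource": {"resourceType": {"name": "Topic"}, "name": "credit-scores"},
      "session": {
        "principal": {"principalType": "User"},
        "clientAddress": "172.21.0.5",
        "sanitizedUser": "CN%3Danon_producer.tutorial.openpolicyagent.org%2COU%3DTUTORIAL%2CO%3DOPA%2CL%3DSF%2CST%3DCA%2CC%3DUS"
      }
    }
  }'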

The ./policies directory is mounted into the Docker container running OPA. When the files under this directory change, OPA is notified and the policies are automatically reloaded.
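
To confirm that OPA has picked up an edit, you can list the policy modules it currently has loaded (a quick sanity check, again assuming the API is exposed on localhost:8181). The response includes the source of each loaded module, so your latest changes to tutorial.rego should be visible.

  # List the policy modules currently loaded into OPA.
  curl -s localhost:8181/v1/policies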

At this point, you can exercise the policy.

This step shows how you can grant fine-grained access to services that use Kafka. In this scenario, some services are allowed to read PII data while others are not.

First, run kafka-console-producer to generate some data on the credit-scores topic.

  docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    bash -c 'for i in {1..10}; do echo "{\"user\": \"bob\", \"score\": $i}"; done | kafka-console-producer --topic credit-scores --broker-list kafka:9093 --producer.config /etc/kafka/secrets/anon_producer.ssl.config'

This command will send 10 messages to the credit-scores topic. Bob’s credit score seems to be improving.

Next, run kafka-console-consumer and try to read data off the topic. Use the pii_consumer credentials to simulate a service that is allowed to read PII data.

  docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    kafka-console-consumer --bootstrap-server kafka:9093 --topic credit-scores --from-beginning --consumer.config /etc/kafka/secrets/pii_consumer.ssl.config

Finally, run kafka-console-consumer again but this time try to use the anon_consumer credentials. The anon_consumer credentials simulate a service that has not been explicitly granted access to PII data.

  docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    kafka-console-consumer --bootstrap-server kafka:9093 --topic credit-scores --from-beginning --consumer.config /etc/kafka/secrets/anon_consumer.ssl.config

Because the anon_consumer is not allowed to read PII data, the request will be denied and the consumer will output an error message.
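
You can also see this denial directly in OPA. The check below is a sketch: the sanitizedUser value is an assumed, shortened subject whose CN starts with anon_consumer (the policy only looks at the first dot-separated component of the CN), so adjust it if your certificate subject differs. It should return {"result": false}.

  # Simulate the anon_consumer reading the PII topic; the DN below is an assumed, shortened subject.
  curl -s -H 'Content-Type: application/json' localhost:8181/v1/data/kafka/authz/allow -d '{
    "input": {
      "operation": {"name": "Read"},
      "resource": {"resourceType": {"name": "Topic"}, "name": "credit-scores"},
      "session": {"principal": {"principalType": "User"}, "sanitizedUser": "CN%3Danon_consumer.tutorial.openpolicyagent.org"}
    }
  }'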

Next, extend the policy so that producers to topics with high fanout must be whitelisted. Add the following content to the policy file (./policies/tutorial.rego):

  deny {
      is_write_operation
      topic_has_large_fanout
      not producer_is_whitelisted_for_large_fanout
  }

  producer_whitelist = {
      "large-fanout": {
          "fanout_producer",
      }
  }

  topic_has_large_fanout {
      topic_metadata[topic_name].tags[_] == "large-fanout"
  }

  producer_is_whitelisted_for_large_fanout {
      producer_whitelist["large-fanout"][_] == principal.name
  }

Next, update the topic_metadata data structure in the same file to indicate that the click-stream topic has a high fanout.

  topic_metadata = {
      "click-stream": {
          "tags": ["large-fanout"],
      },
      "credit-scores": {
          "tags": ["pii"],
      }
  }
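
Before exercising the new rule with the Kafka clients, you can check the decision OPA will make for the whitelisted producer. As before, this is a sketch: the sanitizedUser value assumes a shortened subject whose CN starts with fanout_producer, and the command should return {"result": true}.

  # Simulate the whitelisted producer writing to the high fanout topic.
  curl -s -H 'Content-Type: application/json' localhost:8181/v1/data/kafka/authz/allow -d '{
    "input": {
      "operation": {"name": "Write"},
      "resource": {"resourceType": {"name": "Topic"}, "name": "click-stream"},
      "session": {"principal": {"principalType": "User"}, "sanitizedUser": "CN%3Dfanout_producer.tutorial.openpolicyagent.org"}
    }
  }'

Swapping in a non-whitelisted identity such as anon_producer should instead return {"result": false}.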

First, run kafka-console-producer and simulate a service with access to the click-stream topic.

  docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    bash -c 'for i in {1..10}; do echo "{\"user\": \"alice\", \"button\": $i}"; done | kafka-console-producer --topic click-stream --broker-list kafka:9093 --producer.config /etc/kafka/secrets/fanout_producer.ssl.config'

Next, run the kafka-console-consumer to confirm that the messages were published.

  docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    kafka-console-consumer --bootstrap-server kafka:9093 --topic click-stream --from-beginning --consumer.config /etc/kafka/secrets/anon_consumer.ssl.config

Once you see the 10 messages produced by the first part of this step, exit the console consumer (^C).

Lastly, run kafka-console-producer to simulate a service that should not have access to high fanout topics.

  docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    bash -c 'for i in {1..10}; do echo "{\"user\": \"alice\", \"button\": $i}"; done | kafka-console-producer --topic click-stream --broker-list kafka:9093 --producer.config /etc/kafka/secrets/anon_producer.ssl.config'

Because anon_producer is not authorized to write to high fanout topics, the request will be denied and the producer will output an error message.

Congratulations on finishing the tutorial!

At this point you have learned how to enforce fine-grained access control over Kafka topics. You have also seen how to break a policy into smaller helper rules that can be reused, improving the overall readability of the policy.

If you want to use the Kafka Authorizer plugin that integrates Kafka with OPA, see the build and install instructions in the repository.