Knative Kafka Broker

    Notable features are:

    • Control plane High Availability
    • Horizontally scalable data plane
    • Extensively configurable
    • Ordered delivery of events based on the CloudEvents partitioning extension
    • Support for any Kafka version above 0.11

    The Knative Kafka Broker stores incoming CloudEvents as Kafka records, using the binary content mode. This means that all CloudEvent attributes and extensions are mapped as headers on the Kafka record, while the data of the CloudEvent corresponds to the value of the Kafka record.
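
    For illustration, an event with id 123 and type com.example.event would land in Kafka roughly as follows. This is a sketch: the ce_-prefixed header names follow the CloudEvents Kafka protocol binding, and all values shown are hypothetical.

      # Kafka record (sketch)
      headers:
        ce_specversion: "1.0"
        ce_id: "123"
        ce_type: "com.example.event"
        ce_source: "/my/source"
        content-type: "application/json"
      value: '{"hello":"world"}'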

    Prerequisites

    1. You have installed Knative Eventing.
    2. You have access to an Apache Kafka cluster.

    Tip

    If you need to set up a Kafka cluster, you can do this by following the instructions on the Strimzi Quickstart page.
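
    For example, a minimal single-node setup with Strimzi might look like the following. The URLs are taken from the Strimzi quickstart; verify them against the current quickstart before use.

      # Create a namespace for the Kafka cluster
      kubectl create namespace kafka
      # Install the Strimzi operator
      kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
      # Create a single-node Kafka cluster
      kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka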

    Installation

    1. Install the Kafka controller by entering the following command:
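
        # The manifest name is assumed to follow the release assets of the same
        # version as the data plane manifest in step 2.
        kubectl apply --filename https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.7.6/eventing-kafka-controller.yaml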

    2. Install the Kafka Broker data plane by entering the following command:

        kubectl apply --filename https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.7.6/eventing-kafka-broker.yaml
    3. Verify that kafka-controller, kafka-broker-receiver and kafka-broker-dispatcher are running by entering the following command:

        kubectl get deployments.apps -n knative-eventing

      Example output:

        NAME                      READY   UP-TO-DATE   AVAILABLE   AGE
        eventing-controller       1/1     1            1           10s
        eventing-webhook          1/1     1            1           9s
        kafka-controller          1/1     1            1           3s
        kafka-broker-dispatcher   1/1     1            1           4s
        kafka-broker-receiver     1/1     1            1           5s

    Create a Kafka Broker

    A Kafka Broker object looks like this:

      apiVersion: eventing.knative.dev/v1
      kind: Broker
      metadata:
        annotations:
          # case-sensitive
          eventing.knative.dev/broker.class: Kafka
          # Optional annotation to point to an externally managed kafka topic:
          # kafka.eventing.knative.dev/external.topic: <topic-name>
        name: default
        namespace: default
      spec:
        # Configuration specific to this broker.
        config:
          apiVersion: v1
          kind: ConfigMap
          name: kafka-broker-config
          namespace: knative-eventing
        # Optional dead letter sink; you can specify either:
        #  - deadLetterSink.ref, which is a reference to a Callable
        #  - deadLetterSink.uri, which is an absolute URI to a Callable (it can potentially be outside the Kubernetes cluster)
        delivery:
          deadLetterSink:
            ref:
              apiVersion: serving.knative.dev/v1
              kind: Service
              name: dlq-service
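
    You can then create the Broker by applying the file with kubectl; the file name kafka-broker.yaml is illustrative:

      kubectl apply -f kafka-broker.yaml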

    The spec.config should reference a ConfigMap that looks like the following; it can live in any namespace:

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: kafka-broker-config
        namespace: knative-eventing
      data:
        # Number of topic partitions
        default.topic.partitions: "10"
        # Replication factor of topic messages.
        default.topic.replication.factor: "3"
        # A comma-separated list of bootstrap servers. (They can be inside or outside the k8s cluster.)
        bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

    This ConfigMap is installed in the Knative Eventing SYSTEM_NAMESPACE in the cluster. You can edit this global configuration depending on your needs. You can also override these settings on a per-broker basis by referencing, in your Kafka Broker’s spec.config field, a ConfigMap with a different name or in a different namespace.

    Note

    The default.topic.replication.factor value must be less than or equal to the number of Kafka broker instances in your cluster. For example, if you only have one Kafka broker, the default.topic.replication.factor value should not be more than 1.
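
    For example, on a single-broker development cluster you can lower the replication factor to 1. A minimal sketch, assuming the default kafka-broker-config ConfigMap shown above:

      kubectl patch configmap kafka-broker-config -n knative-eventing \
        --type merge --patch '{"data":{"default.topic.replication.factor":"1"}}'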

    To set the Kafka broker as the default implementation for all brokers in the Knative deployment, you can apply global settings by modifying the config-br-defaults ConfigMap in the knative-eventing namespace.

    The following YAML is an example of a config-br-defaults ConfigMap using Kafka broker as the default implementation.
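
      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: config-br-defaults
        namespace: knative-eventing
      data:
        default-br-config: |
          clusterDefault:
            brokerClass: Kafka
            apiVersion: v1
            kind: ConfigMap
            name: kafka-broker-config
            namespace: knative-eventing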

    Security

    Apache Kafka supports different security features; Knative supports the following:

    • Authentication using SASL without encryption
    • Authentication using SASL and encryption using SSL
    • Authentication and encryption using SSL
    • Encryption using SSL without client authentication

    To enable security features, you can reference a Secret in the ConfigMap referenced by broker.spec.config:

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: kafka-broker-config
        namespace: knative-eventing
      data:
        # Other configurations
        # ...

        # Reference a Secret called my_secret
        auth.secret.ref.name: my_secret

    The Secret my_secret must exist in the same namespace as the ConfigMap referenced by broker.spec.config, in this case: knative-eventing.

    Note

    Certificates and keys must be in PEM format.

    Authentication using SASL

    Knative supports the following SASL mechanisms:

    • PLAIN
    • SCRAM-SHA-256
    • SCRAM-SHA-512

    To use a specific SASL mechanism, replace <sasl_mechanism> with the mechanism of your choice.

      kubectl create secret --namespace <namespace> generic <my_secret> \
        --from-literal=protocol=SASL_PLAINTEXT \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-literal=user=<my_user> \
        --from-literal=password=<my_password>

    Authentication using SASL and encryption using SSL

      kubectl create secret --namespace <namespace> generic <my_secret> \
        --from-literal=protocol=SASL_SSL \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-file=ca.crt=caroot.pem \
        --from-literal=user=<my_user> \
        --from-literal=password=<my_password>

    Encryption using SSL without client authentication

      kubectl create secret --namespace <namespace> generic <my_secret> \
        --from-literal=protocol=SSL \
        --from-file=ca.crt=<my_caroot.pem_file_path> \
        --from-literal=user.skip=true

    Authentication and encryption using SSL

      kubectl create secret --namespace <namespace> generic <my_secret> \
        --from-literal=protocol=SSL \
        --from-file=ca.crt=<my_caroot.pem_file_path> \
        --from-file=user.crt=<my_cert.pem_file_path> \
        --from-file=user.key=<my_key.pem_file_path>

    Note

    ca.crt can be omitted to fall back to the system’s root CA set.

    Bring your own topic

    By default, the Knative Kafka Broker creates its own internal topic. However, it is possible to point to an externally managed topic using the kafka.eventing.knative.dev/external.topic annotation:
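
    For example, a minimal sketch; the topic name my-existing-topic is illustrative:

      apiVersion: eventing.knative.dev/v1
      kind: Broker
      metadata:
        annotations:
          # case-sensitive
          eventing.knative.dev/broker.class: Kafka
          # Point the broker at an existing, externally managed topic
          kafka.eventing.knative.dev/external.topic: my-existing-topic
        name: default
        namespace: default
      spec:
        config:
          apiVersion: v1
          kind: ConfigMap
          name: kafka-broker-config
          namespace: knative-eventing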

    Note

    When using an external topic, the Knative Kafka Broker does not own the topic and is not responsible for managing it, including the topic lifecycle and its general validity. Other restrictions on access to the topic, such as Kafka ACLs, may apply.

    Consumer Offsets Commit Interval

    Kafka consumers keep track of the last successfully sent events by committing offsets.

    Note

    To prevent a negative impact on performance, committing offsets every time an event is successfully sent to a subscriber is not recommended.

    The interval can be changed by modifying the auto.commit.interval.ms parameter in the config-kafka-broker-data-plane ConfigMap in the knative-eventing namespace, as follows:

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: config-kafka-broker-data-plane
        namespace: knative-eventing
      data:
        # Some configurations omitted ...
        config-kafka-broker-consumer.properties: |
          # Some configurations omitted ...

          # Commit the offset every 5000 milliseconds (5 seconds)
          auto.commit.interval.ms=5000
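
    After modifying the ConfigMap, restart the kafka-broker-dispatcher deployment (the component that consumes from Kafka) so the new interval is picked up. This restart step is an assumption, mirroring the procedure used for logging changes later on this page:

      kubectl rollout restart deployment -n knative-eventing kafka-broker-dispatcher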

    Note

    The Knative Kafka Broker guarantees at-least-once delivery, which means that your applications may receive duplicate events. A higher commit interval means a higher probability of receiving duplicate events, because when a consumer restarts, it resumes from the last committed offset.

    Kafka Producer and Consumer configurations

    Knative exposes all available Kafka producer and consumer configurations that can be modified to suit your workloads.

    You can change these configurations by modifying the config-kafka-broker-data-plane ConfigMap in the knative-eventing namespace.
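
    For example, to tune the producer you can add or override properties under the config-kafka-broker-producer.properties key. A minimal sketch; the compression.type value is illustrative:

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: config-kafka-broker-data-plane
        namespace: knative-eventing
      data:
        # Some configurations omitted ...
        config-kafka-broker-producer.properties: |
          # Compress record batches before sending them to the Kafka cluster
          compression.type=snappy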

    Documentation for the settings available in this ConfigMap is available on the Apache Kafka website, in particular the Producer configurations and Consumer configurations.

    Enable debug logging for data plane components

    The following YAML shows the default logging configuration for data plane components, which is created during the installation step:

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: kafka-config-logging
        namespace: knative-eventing
      data:
        config.xml: |
          <configuration>
            <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
              <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
            </appender>
            <root level="INFO">
              <appender-ref ref="jsonConsoleAppender"/>
            </root>
          </configuration>

    To change the logging level to DEBUG, you must:

    1. Apply the following kafka-config-logging ConfigMap, or replace level="INFO" with level="DEBUG" in the existing kafka-config-logging ConfigMap:

        apiVersion: v1
        kind: ConfigMap
        metadata:
          name: kafka-config-logging
          namespace: knative-eventing
        data:
          config.xml: |
            <configuration>
              <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
                <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
              </appender>
              <root level="DEBUG">
                <appender-ref ref="jsonConsoleAppender"/>
              </root>
            </configuration>
    2. Restart the kafka-broker-receiver and the kafka-broker-dispatcher deployments by entering the following commands:

        kubectl rollout restart deployment -n knative-eventing kafka-broker-receiver
        kubectl rollout restart deployment -n knative-eventing kafka-broker-dispatcher

    Configuring the order of delivered events

    When dispatching events, the Kafka broker can be configured to support different delivery ordering guarantees.

    You can configure the delivery order of events using the kafka.eventing.knative.dev/delivery.order annotation on the Trigger object:

      apiVersion: eventing.knative.dev/v1
      kind: Trigger
      metadata:
        name: my-service-trigger
        annotations:
          kafka.eventing.knative.dev/delivery.order: ordered
      spec:
        broker: my-kafka-broker
        subscriber:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: my-service

    The supported consumer delivery guarantees are:

    • unordered: An unordered consumer is a non-blocking consumer that delivers messages unordered, while preserving proper offset management.
    • ordered: An ordered consumer is a per-partition blocking consumer that waits for a successful response from the CloudEvent subscriber before it delivers the next message of the partition.

    unordered is the default ordering guarantee.