Apache Kafka Sink

    You must have access to a Kubernetes cluster with Knative Eventing installed.

    Installation

    1. Install the Kafka controller:

    2. Install the KafkaSink data plane:

    3. Verify that the kafka-controller and kafka-sink-receiver Deployments are running:

      kubectl get deployments.apps -n knative-eventing

      Example output:

      NAME                  READY   UP-TO-DATE   AVAILABLE   AGE
      eventing-controller   1/1     1            1           10s
      eventing-webhook      1/1     1            1           9s
      kafka-controller      1/1     1            1           3s
      kafka-sink-receiver   1/1     1            1           5s
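
    The verification step above can also be scripted. The following is a minimal sketch, not part of the Knative tooling: check_ready is a hypothetical helper name, and it assumes the column layout shown in the example output (name in the first column, READY in the second).

```shell
# check_ready reads `kubectl get deployments` output on stdin and succeeds
# only if every Deployment named in the arguments is listed with all of its
# replicas ready (READY column of the form n/n with n > 0).
# Hypothetical helper for illustration; it is not shipped with Knative.
check_ready() {
  awk -v names="$*" '
    BEGIN {
      n = split(names, want, " ")
      for (i = 1; i <= n; i++) need[want[i]] = 1
    }
    ($1 in need) {
      split($2, r, "/")
      if (r[1] == r[2] && r[1] > 0) ok[$1] = 1
    }
    END {
      for (d in need) if (!(d in ok)) exit 1
    }
  '
}

# Example (requires cluster access):
#   kubectl get deployments.apps -n knative-eventing \
#     | check_ready kafka-controller kafka-sink-receiver
```

    Running kubectl wait --for=condition=Available on the two Deployments is another way to block until they are ready.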

    A KafkaSink object looks similar to the following:
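
    As a minimal sketch of such an object, the snippet below writes a manifest to a local file. The sink name, namespace, topic, and bootstrap server are the same placeholder values used in the other examples in this document; the file name kafka-sink.yaml is an assumption made for illustration.

```shell
# Write a minimal KafkaSink manifest to a local file.
# All values are placeholders; replace them with your own topic and brokers.
cat > kafka-sink.yaml <<'EOF'
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
EOF

# Then apply it (requires cluster access):
#   kubectl apply -f kafka-sink.yaml
```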

    Output Topic Content Mode

    The CloudEvent specification defines two modes for transporting a CloudEvent: structured and binary.

    A KafkaSink object with a specified contentMode looks similar to the following:

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
      name: my-kafka-sink
      namespace: default
    spec:
      topic: mytopic
      bootstrapServers:
        - my-cluster-kafka-bootstrap.kafka:9092
      # CloudEvent content mode of Kafka messages sent to the topic.
      # Possible values:
      # - structured
      # - binary
      #
      # default: binary.
      #
      # CloudEvent spec references:
      # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#message
      # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#32-binary-content-mode
      contentMode: binary # or structured

    Knative supports Apache Kafka security features, including SASL authentication and SSL encryption.

    Enabling security features

    To enable security features, reference a secret in the KafkaSink spec:

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
      name: my-kafka-sink
      namespace: default
    spec:
      topic: mytopic
      bootstrapServers:
        - my-cluster-kafka-bootstrap.kafka:9092
      auth:
        secret:
          ref:
            name: my_secret

    Note

    The secret my_secret must exist in the same namespace as the KafkaSink. Certificates and keys must be in PEM format.

    Knative supports the following SASL mechanisms:

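    • PLAIN
    • SCRAM-SHA-256
    • SCRAM-SHA-512

    To create a secret for SASL authentication without encryption:

    kubectl create secret --namespace <namespace> generic <my_secret> \
      --from-literal=protocol=SASL_PLAINTEXT \
      --from-literal=sasl.mechanism=<sasl_mechanism> \
      --from-literal=user=<my_user> \
      --from-literal=password=<my_password>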

    To create a secret for encryption using SSL without client authentication:

    kubectl create secret --namespace <namespace> generic <my_secret> \
      --from-literal=protocol=SSL \
      --from-file=ca.crt=<my_caroot.pem_file_path> \
      --from-literal=user.skip=true

    To create a secret for authentication and encryption using SSL:

    kubectl create secret --namespace <namespace> generic <my_secret> \
      --from-literal=protocol=SSL \
      --from-file=ca.crt=<my_caroot.pem_file_path> \
      --from-file=user.crt=<my_cert.pem_file_path> \
      --from-file=user.key=<my_key.pem_file_path>

    Note

    The ca.crt can be omitted to enable fallback and use the system’s root CA set.
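
    The secret commands above take .pem files as input, so a quick pre-check can catch a wrong file before the secret is created. The sketch below is only a heuristic: it looks for a PEM header line and does not validate the certificate or key itself. The function names is_pem and check_pem_files are hypothetical helpers introduced for illustration.

```shell
# is_pem succeeds if the file contains a PEM header line such as
# "-----BEGIN CERTIFICATE-----" or "-----BEGIN PRIVATE KEY-----".
# Heuristic only: the contents are not parsed or verified.
is_pem() {
  grep -q -e '^-----BEGIN [A-Z0-9 ]*-----' "$1"
}

# check_pem_files reports every argument that does not look like PEM on
# stderr and returns a non-zero status if any such file was found.
check_pem_files() {
  rc=0
  for f in "$@"; do
    if ! is_pem "$f"; then
      echo "not PEM: $f" >&2
      rc=1
    fi
  done
  return "$rc"
}

# Example, using the placeholder paths from the commands above:
#   check_pem_files <my_caroot.pem_file_path> <my_cert.pem_file_path> <my_key.pem_file_path>
```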

    A Kafka Producer is the component responsible for sending events to the Apache Kafka cluster. You can change the configuration for Kafka Producers in your cluster by modifying the config-kafka-sink-data-plane ConfigMap in the knative-eventing namespace.

    Documentation for the settings available in this ConfigMap is on the Apache Kafka website, in particular in the producer configuration reference.
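
    To see which producer settings are currently in effect before changing them, the ConfigMap can be inspected and edited with standard kubectl commands. The sketch below wraps them in two helper functions; the function names are hypothetical, and the keys inside the ConfigMap are not listed here because they are best read from your own cluster.

```shell
# Print the current data plane settings, including the Kafka Producer
# properties, as stored in the cluster. Hypothetical helper name.
sink_config_show() {
  kubectl get configmap config-kafka-sink-data-plane -n knative-eventing -o yaml
}

# Open the same ConfigMap in your editor to change a producer setting.
# Hypothetical helper name.
sink_config_edit() {
  kubectl edit configmap config-kafka-sink-data-plane -n knative-eventing
}
```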

    Enable debug logging for data plane components

    To enable debug logging for data plane components, change the logging level to DEBUG in the kafka-config-logging ConfigMap.

    1. Create the kafka-config-logging ConfigMap as a YAML file that contains the following:

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: kafka-config-logging
        namespace: knative-eventing
      data:
        config.xml: |
          <configuration>
            <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
              <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
            </appender>
            <root level="DEBUG">
              <appender-ref ref="jsonConsoleAppender"/>
            </root>
          </configuration>
    2. Apply the YAML file by running the command:

      kubectl apply -f <filename>.yaml

      Where <filename> is the name of the file you created in the previous step.
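
    This section does not say whether the data plane picks up the new logging level without a restart. Assuming it does not, the following sketch restarts the kafka-sink-receiver Deployment named in the installation steps and waits for the rollout to finish. The function name restart_receiver and the 120-second timeout are illustrative choices.

```shell
# restart_receiver restarts the KafkaSink receiver and waits for the new
# Pods to become ready. The namespace defaults to knative-eventing.
# Assumption: a restart is needed for the logging change to take effect.
restart_receiver() {
  ns="${1:-knative-eventing}"
  kubectl rollout restart deployment/kafka-sink-receiver -n "$ns" &&
    kubectl rollout status deployment/kafka-sink-receiver -n "$ns" --timeout=120s
}

# Example (requires cluster access):
#   restart_receiver
```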