Apache Kafka Binding Example

    In the following example, a Kubernetes Job uses the KafkaBinding to produce messages on a Kafka topic, which are then received by the Event Display service via the KafkaSource.

    1. You must ensure that you meet the prerequisites listed in the Apache Kafka overview.
    2. This feature is available in Knative Eventing 0.15 and later.

    Creating a KafkaSource source CRD

    1. Install the KafkaSource sub-component to your Knative cluster:
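       For example (a hedged sketch; the release manifest URL below is an assumption that depends on your Knative Eventing version, so check the Apache Kafka overview for the correct one):

           # Hypothetical release URL; substitute the manifest matching your Knative version.
           $ kubectl apply --filename https://github.com/knative/eventing-contrib/releases/download/v0.15.0/kafka-source.yaml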
    2. Check that the kafka-controller-manager-0 pod is running:

           $ kubectl get pods --namespace knative-sources
           NAME                         READY   STATUS    RESTARTS   AGE
           kafka-controller-manager-0   1/1     Running   0          42m
    3. (Optional) Source code for the Event Display service

       Get the source code of the Event Display container image from here.

    4. Deploy the Event Display Service via kubectl. Save the following YAML as event-display.yaml:

           apiVersion: serving.knative.dev/v1
           kind: Service
           metadata:
             name: event-display
           spec:
             template:
               spec:
                 containers:
                   - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display

       Then apply it:

           $ kubectl apply --filename event-display.yaml
           ...
           service.serving.knative.dev/event-display created
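       Optionally, confirm that the Service has become ready (ksvc is the kubectl shortname for Knative Services; the READY column should show True once the revision is up):

           $ kubectl get ksvc event-display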
    5. (Optional) Deploy the Event Display Service via the kn CLI:

       Alternatively, you can create the Knative service by running the following kn command:

           $ kn service create event-display --image=gcr.io/knative-releases/knative.dev/eventing/cmd/event_display

    Apache Kafka Event Source

    1. Modify event-source.yaml with your bootstrap servers, topics, and so on:

           apiVersion: sources.knative.dev/v1beta1
           kind: KafkaSource
           metadata:
             name: kafka-source
           spec:
             consumerGroup: knative-group
             bootstrapServers:
               - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
             topics:
               - logs
             sink:
               ref:
                 apiVersion: serving.knative.dev/v1
                 kind: Service
                 name: event-display
    2. Deploy the event source:

           $ kubectl apply -f event-source.yaml
           kafkasource.sources.knative.dev/kafka-source created
    3. Check that the event source pod is running. The pod name will be prefixed with kafka-source:

           $ kubectl get pods
           NAME                                  READY   STATUS    RESTARTS   AGE
           kafka-source-xlnhq-5544766765-dnl5s   1/1     Running   0          40m
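       You can also confirm that the KafkaSource itself reports Ready (a quick check; the output columns vary slightly across versions):

           $ kubectl get kafkasource kafka-source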

    Create the KafkaBinding that will inject Kafka bootstrap information into selected Jobs:

    1. Modify kafka-binding.yaml with your bootstrap servers and so on:

           apiVersion: bindings.knative.dev/v1beta1
           kind: KafkaBinding
           metadata:
             name: kafka-binding-test
           spec:
             subject:
               apiVersion: batch/v1
               kind: Job
               selector:
                 matchLabels:
                   kafka.topic: "logs"
             bootstrapServers:
               - my-cluster-kafka-bootstrap.kafka:9092

    In this case, we will bind any Job with the label kafka.topic: "logs".
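    As a hedged illustration of the binding's effect (KAFKA_BOOTSTRAP_SERVERS is the environment variable the KafkaBinding is expected to inject into the subject's containers; verify against your version), you can inspect a bound Job's pod:

        # <kafka-publisher-pod> is a placeholder for the pod created by the bound Job below.
        $ kubectl exec <kafka-publisher-pod> -- env | grep KAFKA_BOOTSTRAP_SERVERS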

    Create Kubernetes Job

    1. Source code for the kafka-publisher service

    2. Now we will use the kafka-publisher container to send events to the Kafka topic when the Job runs; an example manifest is sketched below.
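       A minimal sketch of such a Job, assuming a kafka-publisher image (the image reference below is a placeholder for the image you build from the source above; the Job name and the kafka.topic: "logs" label are the ones used in this example so that the KafkaBinding selects it):

           apiVersion: batch/v1
           kind: Job
           metadata:
             name: kafka-publisher-job
             labels:
               kafka.topic: "logs"   # must match the KafkaBinding selector
           spec:
             template:
               spec:
                 restartPolicy: Never
                 containers:
                   - name: kafka-publisher
                     # Placeholder image reference.
                     image: example.com/kafka-publisher

       When the Job is created, the KafkaBinding is expected to inject the bootstrap server information (for example, the KAFKA_BOOTSTRAP_SERVERS environment variable) into this container.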

    3. Check that the Job has run successfully:

           $ kubectl get jobs
           NAME                  COMPLETIONS   DURATION   AGE
           kafka-publisher-job   1/1           7s         7s
    4. Ensure the Event Display received the message sent to it by the Event Source:

           $ kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
           ☁️ cloudevents.Event
           Validation: valid
           Context Attributes,
             specversion: 1.0
             type: dev.knative.kafka.event
             source: /apis/v1/namespaces/default/kafkasources/kafka-source#logs
             subject: partition:0#1
             id: partition:0/offset:1
             time: 2020-05-17T19:45:02.7Z
             datacontenttype: application/json
             kafkaheadercontenttype: application/json
             traceparent: 00-f383b779f512358b24ffbf6556a6d6da-cacdbe78ef9b5ad3-00
           Data,
             {
               "msg": "This is a test!"
             }

    Connecting to a TLS-enabled Kafka broker

    The KafkaBinding supports TLS and SASL authentication methods. For injecting TLS authentication, you must have the following files:

    • CA Certificate
    • Client Certificate and Key

    These files are expected to be in PEM format; if they are in another format, such as JKS, please convert them to PEM.
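    If you are starting from a JKS keystore, a hedged conversion sketch could look like this, using keytool and openssl (the keystore, truststore, and alias names below are placeholders; the output file names match the secrets created in the next step):

        # Convert the JKS keystore to PKCS#12 first (placeholder file names).
        $ keytool -importkeystore -srckeystore keystore.jks \
            -destkeystore keystore.p12 -deststoretype PKCS12

        # Extract the client certificate and key in PEM format.
        $ openssl pkcs12 -in keystore.p12 -nokeys -out certificate.pem
        $ openssl pkcs12 -in keystore.p12 -nocerts -nodes -out key.pem

        # Export the CA certificate from the truststore as PEM (placeholder alias).
        $ keytool -exportcert -rfc -keystore truststore.jks -alias ca -file caroot.pem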

    1. Create the certificate files as secrets in the namespace where the KafkaBinding is going to be set up:

           $ kubectl create secret generic cacert --from-file=caroot.pem
           secret/cacert created

           $ kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
           secret/kafka-secret created
    2. Apply kafkabinding-tls.yaml, changing bootstrapServers accordingly:

           apiVersion: bindings.knative.dev/v1beta1
           kind: KafkaBinding
           metadata:
             name: kafka-source-with-tls
           spec:
             subject:
               apiVersion: batch/v1
               kind: Job
               selector:
                 matchLabels:
                   kafka.topic: "logs"
             net:
               tls:
                 enable: true
                 cert:
                   secretKeyRef:
                     key: tls.crt
                     name: kafka-secret
                 key:
                   secretKeyRef:
                     key: tls.key
                     name: kafka-secret
                 caCert:
                   secretKeyRef:
                     key: caroot.pem
                     name: cacert
             consumerGroup: knative-group
             bootstrapServers:
               - my-cluster-kafka-bootstrap.kafka:9092
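       Then apply the manifest:

           $ kubectl apply -f kafkabinding-tls.yaml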