Apache Kafka Source

    The KafkaSource reads messages stored in existing Apache Kafka topics and sends those messages as CloudEvents through HTTP to its configured sink. The KafkaSource preserves the order of the messages stored in the topic partitions. It does this by waiting for a successful response from the sink before it delivers the next message in the same partition.
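    The ordering guarantee described above can be illustrated with a small simulation. This is a minimal sketch, not the KafkaSource implementation: the Record type, the send callback, and the max_attempts retry limit are hypothetical names introduced here, and the retry policy is an assumption made only to show that a later offset never overtakes an earlier one in the same partition.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, NamedTuple


class Record(NamedTuple):
    """Hypothetical stand-in for a Kafka message."""
    partition: int
    offset: int
    value: str


def dispatch_in_order(
    records: Iterable[Record],
    send: Callable[[Record], bool],
    max_attempts: int = 3,  # assumed retry limit, for illustration only
) -> List[Record]:
    """Deliver records so that, within a partition, the next record is sent
    only after the sink acknowledged the previous one."""
    by_partition: Dict[int, List[Record]] = defaultdict(list)
    for record in records:
        by_partition[record.partition].append(record)

    delivered: List[Record] = []
    for partition_records in by_partition.values():
        for record in sorted(partition_records, key=lambda r: r.offset):
            # any() stops at the first successful response from the sink.
            acknowledged = any(send(record) for _ in range(max_attempts))
            if not acknowledged:
                # Later offsets in this partition must wait.
                break
            delivered.append(record)
    return delivered
```

    In this sketch, a sink that keeps rejecting offset 1 of a partition blocks offsets 2 and later in that partition, while other partitions continue independently.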

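    1. Install the KafkaSource controller by entering the following command:

      kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.8.4/eventing-kafka-controller.yaml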

    2. Install the Kafka Source data plane by entering the following command:

      kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.8.4/eventing-kafka-source.yaml

    3. Verify that kafka-controller and kafka-source-dispatcher are running, by entering the following command:

      kubectl get deployments.apps -n knative-eventing

      Example output:

      NAME                      READY   UP-TO-DATE   AVAILABLE   AGE
      kafka-controller          1/1     1            1           3s
      kafka-source-dispatcher   1/1     1            1           4s

    Optional: Create a Kafka topic

    Note

    This section assumes that you are using Strimzi to operate Apache Kafka. You can perform equivalent operations with the Apache Kafka CLI or any other tool.

    If you are using Strimzi:

    1. Create a KafkaTopic YAML file:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaTopic
      metadata:
        name: knative-demo-topic
        namespace: kafka
        labels:
          strimzi.io/cluster: my-cluster
      spec:
        partitions: 3
        replicas: 1
        config:
          retention.ms: 7200000
          segment.bytes: 1073741824

    2. Deploy the KafkaTopic YAML file by running the command:

      kubectl apply -f <filename>.yaml

      Where <filename> is the name of your KafkaTopic YAML file.

      Example output:

      kafkatopic.kafka.strimzi.io/knative-demo-topic created

    3. Verify that the KafkaTopic was created by running the command:

      kubectl -n kafka get kafkatopics.kafka.strimzi.io

      Example output:

      NAME                 CLUSTER      PARTITIONS   REPLICATION FACTOR
      knative-demo-topic   my-cluster   3            1
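
    The raw numbers in the KafkaTopic config are easier to review in human units. The following sketch only converts the two values from the YAML file above; the variable names are introduced here for illustration.

```python
from datetime import timedelta

# Values copied from the KafkaTopic example.
retention = timedelta(milliseconds=7_200_000)   # retention.ms
segment_gib = 1_073_741_824 / 2**30             # segment.bytes in GiB

print(retention)    # 2:00:00
print(segment_gib)  # 1.0
```

    In other words, the example topic retains messages for two hours and rolls log segments at 1 GiB.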

    Create a Service

    1. Create the event-display Service as a YAML file:

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
        namespace: default
      spec:
        template:
          spec:
            containers:
              - # This corresponds to
                # https://github.com/knative/eventing/tree/main/cmd/event_display/main.go
                image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display

    2. Apply the YAML file by running the command:

      kubectl apply -f <filename>.yaml

      Example output:

      service.serving.knative.dev/event-display created

    3. Ensure that the Service Pod is running by running the command:

      kubectl get pods

      The Pod name is prefixed with event-display:

      NAME                                            READY   STATUS    RESTARTS   AGE
      event-display-00001-deployment-5d5df6c7-gv2j4   2/2     Running   0          72s

    Create the KafkaSource

    1. Edit source/event-source.yaml to match your environment (bootstrap servers, topics, and so on):

      apiVersion: sources.knative.dev/v1beta1
      kind: KafkaSource
      metadata:
        name: kafka-source
      spec:
        consumerGroup: knative-group
        bootstrapServers:
          - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
        topics:
          - knative-demo-topic
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display

    2. Deploy the event source:

      kubectl apply -f event-source.yaml

      Example output:

      kafkasource.sources.knative.dev/kafka-source created

    3. Verify that the KafkaSource is ready by running the command:

      kubectl get kafkasource kafka-source

      Example output:

      NAME           TOPICS                   BOOTSTRAPSERVERS                            READY   REASON   AGE
      kafka-source   ["knative-demo-topic"]   ["my-cluster-kafka-bootstrap.kafka:9092"]   True             26h

    Verify the KafkaSource

    1. Produce a message ({"msg": "This is a test!"}) to the Apache Kafka topic, as shown in the following example:

      kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic

      Tip

      If you don’t see a command prompt, try pressing Enter.

    2. Verify that the Service received the message from the event source:

      kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container

      Example output:

      ☁️  cloudevents.Event
      Validation: valid
      Context Attributes,
        specversion: 1.0
        type: dev.knative.kafka.event
        source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic
        subject: partition:0#564
        id: partition:0/offset:564
        time: 2020-02-10T18:10:23.861866615Z
        datacontenttype: application/json
      Extensions,
        key:
      Data,
        {
          "msg": "This is a test!"
        }
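
    The context attributes in the example output follow a visible pattern built from the KafkaSource location, the topic, the partition, and the offset. The following sketch reproduces that pattern. It is inferred only from the example output above, not from a documented contract, and the function name kafka_event_attributes and its parameters are hypothetical.

```python
from typing import Dict


def kafka_event_attributes(
    namespace: str, source_name: str, topic: str, partition: int, offset: int
) -> Dict[str, str]:
    """Build CloudEvent context attributes in the shape shown in the
    example output (pattern inferred from that output, not from a spec)."""
    return {
        "specversion": "1.0",
        "type": "dev.knative.kafka.event",
        "source": f"/apis/v1/namespaces/{namespace}/kafkasources/{source_name}#{topic}",
        "subject": f"partition:{partition}#{offset}",
        "id": f"partition:{partition}/offset:{offset}",
    }


attrs = kafka_event_attributes("default", "kafka-source", "my-topic", 0, 564)
print(attrs["id"])  # partition:0/offset:564
```

    Because the id combines partition and offset, a consumer could use it to detect redelivered events, assuming the pattern holds in your version.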

    Optional: Specify the key deserializer

    When the KafkaSource receives a message from Kafka, it stores the message key in the CloudEvent extension named key and stores the Kafka message headers in extensions whose names start with kafkaheader.

    You can choose one of four key deserializer types:

    • string (default) for UTF-8 encoded strings
    • int for 32-bit and 64-bit signed integers
    • float for 32-bit and 64-bit floating-point numbers
    • byte-array for a Base64-encoded byte array

    To specify the key deserializer, add the label kafkasources.sources.knative.dev/key-type to the KafkaSource definition, as shown in the following example:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
      labels:
        kafkasources.sources.knative.dev/key-type: int
    spec:
      consumerGroup: knative-group
      bootstrapServers:
        - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
        - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
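
    To make the four key types concrete, the following sketch decodes raw key bytes the way each type name suggests. It is an illustration under stated assumptions, not the KafkaSource code: the function deserialize_key is hypothetical, big-endian byte order is assumed because Kafka's built-in numeric serializers use it, and the 32-bit or 64-bit width is chosen from the length of the key.

```python
import base64
import struct


def deserialize_key(raw: bytes, key_type: str = "string"):
    """Decode raw Kafka key bytes according to a key-type label value."""
    if key_type == "string":
        return raw.decode("utf-8")
    if key_type == "int":
        # 4 bytes -> 32-bit signed, 8 bytes -> 64-bit signed (big-endian assumed).
        formats = {4: ">i", 8: ">q"}
    elif key_type == "float":
        # 4 bytes -> 32-bit float, 8 bytes -> 64-bit float (big-endian assumed).
        formats = {4: ">f", 8: ">d"}
    elif key_type == "byte-array":
        return base64.b64encode(raw).decode("ascii")
    else:
        raise ValueError(f"unknown key type: {key_type}")
    if len(raw) not in formats:
        raise ValueError(f"{key_type} key must be 4 or 8 bytes, got {len(raw)}")
    return struct.unpack(formats[len(raw)], raw)[0]


print(deserialize_key(b"order-17"))                       # order-17
print(deserialize_key((42).to_bytes(4, "big"), "int"))    # 42
print(deserialize_key(b"\x00\x01", "byte-array"))         # AAE=
```

    Choosing the wrong type for the producer's serializer typically yields a meaningless value rather than an error, so the label should match how keys are written to the topic.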

    Optional: Specify the initial offset

    To control where the KafkaSource starts consuming when the consumer group has no committed offsets, set the initialOffset field, as shown in the following example:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      initialOffset: earliest
      bootstrapServers:
        - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
        - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

    Note

    The valid values for initialOffset are earliest and latest. Any other value results in a validation error. This field is honored only if there are no committed offsets for that consumer group.
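
    The rules in the note can be summarized as a small decision function. This is a sketch of the documented behavior only; the function starting_offset and its parameters are hypothetical and do not correspond to a Knative API.

```python
from typing import Optional

VALID_INITIAL_OFFSETS = ("earliest", "latest")


def starting_offset(
    initial_offset: str,
    committed: Optional[int],
    earliest: int,
    latest: int,
) -> int:
    """Pick the offset at which a partition is first read."""
    if initial_offset not in VALID_INITIAL_OFFSETS:
        # Mirrors the validation error described in the note.
        raise ValueError(f"initialOffset must be one of {VALID_INITIAL_OFFSETS}")
    if committed is not None:
        # A committed offset for the consumer group always wins.
        return committed
    return earliest if initial_offset == "earliest" else latest


print(starting_offset("earliest", None, earliest=10, latest=50))  # 10
print(starting_offset("earliest", 30, earliest=10, latest=50))    # 30
```

    The second call shows why changing initialOffset on an existing consumer group has no visible effect: the committed offset takes precedence.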

    Connecting to a TLS-enabled Kafka Broker

    The KafkaSource supports TLS and SASL authentication methods. To enable TLS authentication, you must have the following files:

    • CA Certificate
    • Client Certificate and Key

    KafkaSource expects these files to be in PEM format. If they are in another format, such as JKS, convert them to PEM.
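
    Before creating the secrets, it can help to confirm that a file really is PEM-framed. The following sketch only checks for matching BEGIN and END markers; it does not validate the certificate or key itself. The function name pem_labels is hypothetical.

```python
import re
from typing import List

# A PEM block is text framed by matching BEGIN/END lines.
_PEM_BLOCK = re.compile(
    rb"-----BEGIN ([A-Z0-9 ]+)-----\r?\n.+?\r?\n-----END \1-----",
    re.DOTALL,
)


def pem_labels(data: bytes) -> List[str]:
    """Return the labels of all PEM blocks found in data (empty if none)."""
    return [m.group(1).decode("ascii") for m in _PEM_BLOCK.finditer(data)]


sample = b"-----BEGIN CERTIFICATE-----\nMIIB\n-----END CERTIFICATE-----\n"
print(pem_labels(sample))                       # ['CERTIFICATE']
print(pem_labels(b"\xfe\xed\xfe\xed\x00\x00"))  # []
```

    An empty result for a file such as a JKS keystore is a signal that the file still needs to be converted to PEM.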

    1. Create the certificate files as secrets in the namespace where the KafkaSource will be set up by running the commands:

      kubectl create secret generic cacert --from-file=caroot.pem
      kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem

    2. Apply the KafkaSource. Modify the bootstrapServers and topics fields accordingly.

      apiVersion: sources.knative.dev/v1beta1
      kind: KafkaSource
      metadata:
        name: kafka-source-with-tls
      spec:
        net:
          tls:
            enable: true
            cert:
              secretKeyRef:
                key: tls.crt
                name: kafka-secret
            key:
              secretKeyRef:
                key: tls.key
                name: kafka-secret
            caCert:
              secretKeyRef:
                key: caroot.pem
                name: cacert
        consumerGroup: knative-group
        bootstrapServers:
          - my-secure-kafka-bootstrap.kafka:443
        topics:
          - knative-demo-topic
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display

    Apache Kafka uses Simple Authentication and Security Layer (SASL) for authentication. If your cluster uses SASL authentication, you must provide credentials to Knative so that it can communicate with the Kafka cluster. Otherwise, events cannot be produced or consumed.

    • Prerequisite: You have access to a Kafka cluster that has SASL enabled.

    1. Create a secret that uses the Kafka cluster’s SASL information by running the following commands:

      STRIMZI_CRT=$(kubectl -n kafka get secret example-cluster-cluster-ca-cert --template='{{index .data "ca.crt"}}' | base64 --decode)
      SASL_PASSWD=$(kubectl -n kafka get secret example-user --template='{{index .data "password"}}' | base64 --decode)
      kubectl create secret -n default generic <secret_name> \
        --from-literal=ca.crt="$STRIMZI_CRT" \
        --from-literal=password="$SASL_PASSWD" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="example-user"

    2. Create or modify a KafkaSource so that it contains the following spec options:

      apiVersion: sources.knative.dev/v1beta1
      kind: KafkaSource
      metadata:
        name: example-source
      spec:
      ...
        net:
          sasl:
            enable: true
            user:
              secretKeyRef:
                name: <secret_name>
                key: user
            password:
              secretKeyRef:
                name: <secret_name>
                key: password
            type:
              secretKeyRef:
                name: <secret_name>
                key: saslType
          tls:
            enable: true
            caCert:
              secretKeyRef:
                name: <secret_name>
                key: ca.crt
      ...

      Where <secret_name> is the name of the secret generated in the previous step.
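
    The base64 --decode step in the first command is needed because Kubernetes stores Secret values Base64-encoded in the data field. The following sketch shows the same decoding on the JSON form of a Secret. The function decode_secret_field is hypothetical, and the sample Secret contents are made up for illustration.

```python
import base64
import json


def decode_secret_field(secret_json: str, field: str) -> str:
    """Return the decoded value of one key in a Secret's data map."""
    secret = json.loads(secret_json)
    return base64.b64decode(secret["data"][field]).decode("utf-8")


# Made-up Secret in the shape of JSON output for a Secret object.
sample = json.dumps({
    "kind": "Secret",
    "metadata": {"name": "example-user"},
    "data": {"password": base64.b64encode(b"s3cr3t").decode("ascii")},
})
print(decode_secret_field(sample, "password"))  # s3cr3t
```

    Passing the still-encoded value to --from-literal would store a doubly encoded password, which is a common cause of SASL authentication failures.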

    Clean up steps

    1. Delete the Kafka event source:

      kubectl delete -f source/source.yaml

      Example output:

      kafkasource.sources.knative.dev "kafka-source" deleted

    2. Delete the event-display Service:

      kubectl delete -f source/event-display.yaml

      Example output:

      service.serving.knative.dev "event-display" deleted

    3. Optional: Remove the Apache Kafka topic:

      kubectl delete -f kafka-topic.yaml

      Example output: