Apache Kafka Binding Example
In the following example a Kubernetes Job will be using the KafkaBinding to produce messages on a Kafka Topic, which will be received by the Event Display service via Kafka Source
- You must ensure that you meet the prerequisites listed in the Apache Kafka overview.
- This feature is available from Knative Eventing 0.15+
Creating a KafkaSource
source CRD
- Install the
KafkaSource
sub-component to your Knative cluster:
Check that the
kafka-controller-manager-0
pod is running.kubectl get pods --namespace knative-sources
NAME READY STATUS RESTARTS AGE
kafka-controller-manager-0 1/1 Running 0 42m
- (Optional) Source code for Event Display service
Get the source code of Event Display container image from here
- Deploy the Event Display Service via kubectl:
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: event-display
spec:
template:
spec:
containers:
- image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
$ kubectl apply --filename event-display.yaml
...
service.serving.knative.dev/event-display created
- (Optional) Deploy the Event Display Service via kn cli:
Alternatively, you can create the knative service by running the following command in the kn
CLI.
kn service create event-display --image=gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
Apache Kafka Event Source
- Modify
event-source.yaml
accordingly with bootstrap servers, topics, etc…:
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 #note the kafka namespace
topics:
- logs
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
Deploy the event source.
$ kubectl apply -f event-source.yaml
kafkasource.sources.knative.dev/kafka-source created
Check that the event source pod is running. The pod name will be prefixed with
kafka-source
.$ kubectl get pods
NAME READY STATUS RESTARTS AGE
kafka-source-xlnhq-5544766765-dnl5s 1/1 Running 0 40m
Create the KafkaBinding that will inject kafka bootstrap information into select Jobs
:
- Modify
kafka-binding.yaml
accordingly with bootstrap servers etc…:
apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
name: kafka-binding-test
spec:
subject:
apiVersion: batch/v1
kind: Job
selector:
matchLabels:
kafka.topic: "logs"
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
In this case, we will bind any Job
with the labels kafka.topic: "logs"
.
Create Kubernetes Job
- Source code for kafka-publisher service
- Now we will use the kafka-publisher container to send events to kafka topic when the Job runs.
1. Check that the Job has run successfully.
$ kubectl get jobs
NAME COMPLETIONS DURATION AGE
kafka-publisher-job 1/1 7s 7s
- Ensure the Event Display received the message sent to it by the Event Source.
$ kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
☁️ cloudevents.Event
Validation: valid
Context Attributes,
specversion: 1.0
type: dev.knative.kafka.event
source: /apis/v1/namespaces/default/kafkasources/kafka-source#logs
subject: partition:0#1
id: partition:0/offset:1
time: 2020-05-17T19:45:02.7Z
datacontenttype: application/json
kafkaheadercontenttype: application/json
traceparent: 00-f383b779f512358b24ffbf6556a6d6da-cacdbe78ef9b5ad3-00
Data,
{
"msg": "This is a test!"
}
Connecting to a TLS enabled Kafka broker
The KafkaBinding supports TLS and SASL authentication methods. For injecting TLS authentication, you must have the following files:
- CA Certificate
- Client Certificate and Key
These files are expected to be in pem format, if it is in other format like jks , please convert to pem.
- Create the certificate files as secrets in the namespace where KafkaBinding is going to be set up
$ kubectl create secret generic cacert --from-file=caroot.pem
secret/cacert created
$ kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
secret/key created
Apply the kafkabinding-tls.yaml, change bootstrapServers accordingly.
apiVersion: sources.knative.dev/v1beta1
kind: KafkaBinding
metadata:
name: kafka-source-with-tls
spec:
subject:
apiVersion: batch/v1
kind: Job
selector:
matchLabels:
kafka.topic: "logs"
net:
tls:
enable: true
cert:
secretKeyRef:
key: tls.crt
name: kafka-secret
key:
secretKeyRef:
key: tls.key
name: kafka-secret
caCert:
secretKeyRef:
key: caroot.pem
name: cacert
consumerGroup: knative-group
bootstrapServers: