Apache Kafka Sink
You must have access to a Kubernetes cluster with .
Installation
Install the Kafka controller:
Install the KafkaSink data plane:
Verify that
kafka-controller
andkafka-sink-receiver
Deployments are running:kubectl get deployments.apps -n knative-eventing
Example output:
NAME READY UP-TO-DATE AVAILABLE AGE
eventing-controller 1/1 1 1 10s
eventing-webhook 1/1 1 1 9s
kafka-controller 1/1 1 1 3s
kafka-sink-receiver 1/1 1 1 5s
A KafkaSink object looks similar to the following:
Output Topic Content Mode
The CloudEvent specification defines 2 modes to transport a CloudEvent: structured and binary.
A KafkaSink object with a specified contentMode
looks similar to the following:
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: my-kafka-sink
namespace: default
spec:
topic: mytopic
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
# CloudEvent content mode of Kafka messages sent to the topic.
# Possible values:
# - structured
# - binary
#
# default: binary.
#
# CloudEvent spec references:
# - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#message
# - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#32-binary-content-mode
contentMode: binary # or structured
Knative supports the following Apache Kafka security features:
Enabling security features
To enable security features, in the KafkaSink spec, you can reference a secret:
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: my-kafka-sink
namespace: default
spec:
topic: mytopic
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
auth:
secret:
ref:
name: my_secret
Note
The secret my_secret
must exist in the same namespace of the KafkaSink. Certificates and keys must be in ._
Knative supports the following SASL mechanisms:
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
--from-literal=protocol=SASL_PLAINTEXT \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-file=user.crt=<my_cert.pem_file_path> \
--from-file=user.key=<my_key.pem_file_path>
Note
The ca.crt
can be omitted to enable fallback and use the system’s root CA set.
A Kafka Producer is the component responsible for sending events to the Apache Kafka cluster. You can change the configuration for Kafka Producers in your cluster by modifying the config-kafka-sink-data-plane
ConfigMap in the knative-eventing
namespace.
Documentation for the settings available in this ConfigMap is available on the Apache Kafka website, in particular, .
Enable debug logging for data plane components
To enable debug logging for data plane components change the logging level to DEBUG
in the kafka-config-logging
ConfigMap.
Create the
kafka-config-logging
ConfigMap as a YAML file that contains the following:apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-config-logging
namespace: knative-eventing
data:
config.xml: |
<configuration>
<appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<root level="DEBUG">
<appender-ref ref="jsonConsoleAppender"/>
</root>
</configuration>
Apply the YAML file by running the command:
Where
<filename>
is the name of the file you created in the previous step.-