How to develop Pulsar connectors

Pulsar connectors are special , so creating a Pulsar connector is similar to creating a Pulsar function.

Pulsar connectors come in two types:

You can develop Pulsar source connectors and sink connectors.

Developing a source connector is to implement the Source interface, which means you need to implement the method and the read method.

  1. Implement the method.

  1. In this method, you can retrieve all connector specific settings through the passed-in `config` parameter and initialize all necessary resources.
  2. For example, a Kafka connector can create a Kafka client in this `open` method.
  3. Besides, Pulsar runtime also provides a `SourceContext` for the connector to access runtime resources for tasks like collecting metrics. The implementation can save the `SourceContext` for future use.
  1. Implement the read method.

    1. /**
    2. * Reads the next message from source.
    3. * If source does not have any new messages, this call should block.
    4. * @return next message from source. The return result should never be null
    5. * @throws Exception
    6. */
    7. Record<T> read() throws Exception;
  1. If nothing to return, the implementation should be blocking rather than returning `null`.
  2. The returned [Record](https://github.com/apache/pulsar/tree/master//pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java) should encapsulate the following information, which is needed by Pulsar IO runtime.
  3. - [Record](https://github.com/apache/pulsar/tree/master//pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java) should provide the following variables:
  4. <table><thead><tr><th>Variable</th><th>Required</th><th>Description</th></tr></thead><tbody><tr><td><code>TopicName</code></td><td>No</td><td>Pulsar topic name from which the record is originated from.</td></tr><tr><td><code>Key</code></td><td>No</td><td>Messages can optionally be tagged with keys.<br><br>For more information, see <a href="$bd071df184d9bdaa.md#routing-modes">Routing modes</a>.</td></tr><tr><td><code>Value</code></td><td>Yes</td><td>Actual data of the record.</td></tr><tr><td><code>EventTime</code></td><td>No</td><td>Event time of the record from the source.</td></tr><tr><td><code>PartitionId</code></td><td>No</td><td>If the record is originated from a partitioned source, it returns its <code>PartitionId</code>.<br><br><code>PartitionId</code> is used as a part of the unique identifier by Pulsar IO runtime to deduplicate messages and achieve exactly-once processing guarantee.</td></tr><tr><td><code>RecordSequence</code></td><td>No</td><td>If the record is originated from a sequential source, it returns its <code>RecordSequence</code>.<br><br><code>RecordSequence</code> is used as a part of the unique identifier by Pulsar IO runtime to deduplicate messages and achieve exactly-once processing guarantee.</td></tr><tr><td><code>Properties</code></td><td>No</td><td>If the record carries user-defined properties, it returns those properties.</td></tr><tr><td><code>DestinationTopic</code></td><td>No</td><td>Topic to which message should be written.</td></tr><tr><td><code>Message</code></td><td>No</td><td>A class which carries data sent by users.<br><br>For more information, see <a href="https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java" target="_blank" rel="noopener noreferrer">Message.java</a>.</td></tr></tbody></table>
  5. - [Record](https://github.com/apache/pulsar/tree/master//pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java) should provide the following methods:

Handle schema information

Pulsar IO automatically handles the schema and provides a strongly typed API based on Java generics. If you know the schema type that you are producing, you can declare the Java class relative to that type in your sink declaration.

  1. public class MySource implements Source<String> {
  2. public Record<String> read() {}
  3. }

If you want to implement a source that works with any schema, you can go with byte[] (of ByteBuffer) and use Schema.AUTO_PRODUCE_BYTES().

To handle the KeyValue type properly, follow the guidelines for your record implementation:

  • It must implement Record interface and implement getKeySchema,getValueSchema, and getKeyValueEncodingType
  • It must return a KeyValue object as Record.getValue()
  • It may return null in Record.getSchema()

When Pulsar IO runtime encounters a KVRecord, it brings the following changes automatically:

  • Set properly the KeyValueSchema
  • Encode the Message Key and the Message Value according to the (SEPARATED or INLINE)
tip

For more information about how to create a source connector, see KafkaSource.

Sink

Developing a sink connector is similar to developing a source connector, that is, you need to implement the Sink interface, which means implementing the method and the write method.

  1. Implement the method.

    1. /**
    2. * Open connector with configuration
    3. *
    4. * @param config initialization config
    5. * @param sinkContext
    6. * @throws Exception IO type exceptions when opening a connector
    7. */
    8. void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;
  1. Implement the write method.

    1. /**
    2. * Write a message to Sink
    3. * @param record record to write to sink
    4. * @throws Exception
    5. */
    6. void write(Record<T> record) throws Exception;
  1. During the implementation, you can decide how to write the `Value` and the `Key` to the actual source, and leverage all the provided information such as `PartitionId` and `RecordSequence` to achieve different processing guarantees.
  2. You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send).
  1. public class MySink implements Sink<String> {
  2. public void write(Record<String> record) {}
  3. }

If you want to implement a sink that works with any schema, you can you go with the special GenericObject interface.

In the case of AVRO, JSON, and Protobuf records (schemaType=AVRO,JSON,PROTOBUF_NATIVE), you can cast the genericObject variable to GenericRecord and use getFields() and getField() API. You are able to access the native AVRO record using genericObject.getNativeObject().

In the case of KeyValue type, you can access both the schema for the key and the schema for the value using this code.

  1. public class MySink implements Sink<GenericObject> {
  2. public void write(Record<GenericObject> record) {
  3. Schema schema = record.getSchema();
  4. GenericObject genericObject = record.getValue();
  5. SchemaType type = genericObject.getSchemaType();
  6. Object nativeObject = genericObject.getNativeObject();
  7. if (type == SchemaType.KEY_VALUE) {
  8. KeyValue keyValue = (KeyValue) nativeObject;
  9. Object key = keyValue.getKey();
  10. Schema keySchema = keyValueSchema.getKeySchema();
  11. Schema valueSchema = keyValueSchema.getValueSchema();
  12. }
  13. ....
  14. }
  15. }

Test

Testing connectors can be challenging because Pulsar IO connectors interact with two systems that may be difficult to mock—Pulsar and the system to which the connector is connecting.

It is recommended writing special tests to test the connector functionalities as below while mocking the external service.

You can create unit tests for your connector.

Integration test

Once you have written sufficient unit tests, you can add separate integration tests to verify end-to-end functionality.

Pulsar uses for all integration tests.

tip

For more information about how to create integration tests for Pulsar connectors, see .

Once you’ve developed and tested your connector, you need to package it so that it can be submitted to a Pulsar Functions cluster.

There are two methods to work with Pulsar Functions’ runtime, that is, and uber JAR.

note

If you plan to package and distribute your connector for others to use, you are obligated to

license and copyright your own code properly. Remember to add the license and copyright to all libraries your code uses and to your distribution.

tip

For more information about how NAR works, see .

Pulsar uses the same mechanism for packaging all built-in connectors.

The easiest approach to package a Pulsar connector is to create a NAR package using .

Include this nifi-nar-maven-plugin in your maven project for your connector as below.

  1. <plugins>
  2. <plugin>
  3. <groupId>org.apache.nifi</groupId>
  4. <artifactId>nifi-nar-maven-plugin</artifactId>
  5. <version>1.2.0</version>
  6. </plugin>
  7. </plugins>

You must also create a resources/META-INF/services/pulsar-io.yaml file with the following contents:

  1. name: connector name
  2. description: connector description
  3. sourceClass: fully qualified class name (only if source connector)
  4. sinkClass: fully qualified class name (only if sink connector)

For Gradle users, there is a .

tip

For more information about an how to use NAR for Pulsar connectors, see .

Uber JAR

An alternative approach is to create an uber JAR that contains all of the connector’s JAR files and other resource files. No directory internal structure is necessary.

You can use to create a uber JAR as below:

  1. <plugin>
  2. <groupId>org.apache.maven.plugins</groupId>
  3. <artifactId>maven-shade-plugin</artifactId>
  4. <version>3.1.1</version>
  5. <executions>
  6. <execution>
  7. <phase>package</phase>
  8. <goals>
  9. <goal>shade</goal>
  10. </goals>
  11. <configuration>
  12. <filters>
  13. <filter>
  14. <artifact>*:*</artifact>
  15. </filter>
  16. </filters>
  17. </configuration>
  18. </execution>
  19. </plugin>

Monitor

Pulsar connectors enable you to move data in and out of Pulsar easily. It is important to ensure that the running connectors are healthy at any time. You can monitor Pulsar connectors that have been deployed with the following methods:

  • Check the metrics provided by Pulsar.

    Pulsar connectors expose the metrics that can be collected and used for monitoring the health of Java connectors. You can check the metrics by following the guide.

  • Set and check your customized metrics.

    In addition to the metrics provided by Pulsar, Pulsar allows you to customize metrics for Java connectors. Function workers collect user-defined metrics to Prometheus automatically and you can check them in Grafana.