Pulsar connectors are special Pulsar Functions, so creating a Pulsar connector is similar to creating a Pulsar function.
Pulsar connectors come in two types:
Type | Description | Example
Source (pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java) | Imports data from another system into Pulsar. | The RabbitMQ source connector imports the messages of a RabbitMQ queue into a Pulsar topic.
Sink | Exports data from Pulsar to another system. | The Kinesis sink connector exports the messages of a Pulsar topic to a Kinesis stream.
You can develop both Pulsar source connectors and sink connectors.
Developing a source connector means implementing the Source interface, that is, implementing the open method and the read method.
Implement the open method. This method is called when the source connector is initialized.
In this method, you can retrieve all connector-specific settings through the passed-in config parameter and initialize all necessary resources. For example, a Kafka connector can create a Kafka client in the open method.
Besides, the Pulsar runtime also provides a SourceContext for the connector to access runtime resources for tasks like collecting metrics. The implementation can save the SourceContext for future use.
Implement the read method.
/**
* Reads the next message from source.
* If source does not have any new messages, this call should block.
* @return next message from source. The return result should never be null
*/
Record<T> read() throws Exception;
If there is nothing to return, the implementation should block rather than return null.
The returned Record should provide the following variables:
Variable | Required | Description
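The open and read steps described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the implementation of any real connector: the SourceContext, Record and Source types declared here are simplified stand-ins so that the sketch compiles on its own (a real connector would use the types shipped with Pulsar IO), and QueueSource, its offer method and the keyPrefix setting are hypothetical names. An in-memory queue plays the role of the external system.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-ins for the Pulsar IO types (assumption: reduced to what this sketch needs).
interface SourceContext {}

interface Record<T> {
    T getValue();
    default Optional<String> getKey() { return Optional.empty(); }
}

interface Source<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    Record<T> read() throws Exception;
}

// Hypothetical connector: an in-memory queue stands in for the external system.
class QueueSource implements Source<String> {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private SourceContext sourceContext;
    private String keyPrefix = "msg";
    private long sequence;

    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) {
        // "keyPrefix" is a made-up connector-specific setting used only for illustration.
        this.keyPrefix = String.valueOf(config.getOrDefault("keyPrefix", "msg"));
        // Keep the context so that later calls could use runtime resources.
        this.sourceContext = sourceContext;
    }

    // Simulates the external system delivering a message.
    void offer(String message) {
        queue.add(message);
    }

    @Override
    public Record<String> read() throws InterruptedException {
        // take() blocks until a message is available, so read() never returns null.
        final String value = queue.take();
        final String key = keyPrefix + "-" + sequence++;
        return new Record<String>() {
            @Override public String getValue() { return value; }
            @Override public Optional<String> getKey() { return Optional.of(key); }
        };
    }

    @Override
    public void close() {
        queue.clear();
    }
}

class QueueSourceDemo {
    public static void main(String[] args) throws Exception {
        QueueSource source = new QueueSource();
        source.open(Map.<String, Object>of("keyPrefix", "demo"), new SourceContext() {});
        source.offer("hello");
        Record<String> record = source.read();
        System.out.println(record.getKey().orElse("") + " -> " + record.getValue());
        source.close();
    }
}
```

Using a blocking queue is one simple way to satisfy the "block instead of returning null" contract; a real connector would typically block on its client library instead.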
Sink
Developing a sink connector is similar to developing a source connector: you need to implement the Sink interface, which means implementing the open method and the write method.
Implement the open method.
/**
* Open connector with configuration
*
* @param config initialization config
* @param sinkContext
* @throws Exception IO type exceptions when opening a connector
*/
void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;
Implement the write method.
During the implementation, you can decide how to write the Value and the Key to the external system, and leverage all the provided information such as PartitionId and RecordSequence to achieve different processing guarantees. You also need to ack records (if the messages are sent successfully) or fail records (if the messages fail to send).
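The write step, including the ack and fail handling, might look like the sketch below. It rests on several assumptions: the SinkContext, Record and Sink types are simplified stand-ins declared locally so that the example compiles on its own, and ListSink, its written method and the maxLength setting are hypothetical. An in-memory list stands in for the external system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified stand-ins for the Pulsar IO types (assumption: reduced to what this sketch needs).
interface SinkContext {}

interface Record<T> {
    T getValue();
    default Optional<String> getKey() { return Optional.empty(); }
    default void ack() {}
    default void fail() {}
}

interface Sink<T> extends AutoCloseable {
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;
    void write(Record<T> record) throws Exception;
}

// Hypothetical connector: an in-memory list stands in for the external system.
class ListSink implements Sink<String> {
    private final List<String> target = new ArrayList<>();
    private int maxLength = 16;

    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) {
        // "maxLength" is a made-up connector-specific setting used only for illustration.
        maxLength = Integer.parseInt(String.valueOf(config.getOrDefault("maxLength", "16")));
    }

    @Override
    public void write(Record<String> record) {
        try {
            String value = record.getValue();
            if (value.length() > maxLength) {
                throw new IllegalArgumentException("value too long");
            }
            target.add(record.getKey().orElse("no-key") + "=" + value);
            record.ack();  // the message was written successfully
        } catch (RuntimeException e) {
            record.fail(); // the message could not be written
        }
    }

    // Exposes what was written so that callers can inspect it.
    List<String> written() {
        return target;
    }

    @Override
    public void close() {
        target.clear();
    }
}

class ListSinkDemo {
    public static void main(String[] args) {
        ListSink sink = new ListSink();
        sink.open(Map.<String, Object>of("maxLength", 8), new SinkContext() {});
        sink.write(() -> "hello"); // Record has one abstract method, so a lambda works here
        System.out.println(sink.written());
        sink.close();
    }
}
```

Calling ack or fail exactly once per record, on every code path, is the design point of the sketch: it lets the runtime decide whether a message needs to be redelivered.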
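Testing connectors can be challenging because Pulsar IO connectors interact with two systems that may be difficult to mock: Pulsar and the system to which the connector is connecting.
It is recommended to write dedicated tests that exercise the connector's functionality while mocking the external service, as described below.
You can create unit tests for your connector.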
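One way to mock the external service is to hide it behind a small client interface and inject a fake in tests. The sketch below illustrates that idea with plain Java checks. Everything in it is an assumption made for illustration: the stand-in SourceContext, Record and Source types, the hypothetical MessageClient abstraction, the ClientSource connector and the FakeClient. A real project would more likely use a test framework and a mocking library.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Simplified stand-ins for the Pulsar IO types (assumption: reduced to what this sketch needs).
interface SourceContext {}

interface Record<T> {
    T getValue();
}

interface Source<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
    Record<T> read() throws Exception;
}

// Hypothetical abstraction over the external system's client.
interface MessageClient {
    String next() throws Exception; // a real client would block until a message arrives
    void close();
}

// Hypothetical connector under test; the client is injected so that tests can pass a fake.
class ClientSource implements Source<String> {
    private final MessageClient client;

    ClientSource(MessageClient client) {
        this.client = client;
    }

    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) {
        // Nothing to configure in this sketch.
    }

    @Override
    public Record<String> read() throws Exception {
        final String value = client.next();
        return () -> value;
    }

    @Override
    public void close() {
        client.close();
    }
}

// Fake client that replays scripted messages instead of contacting a real service.
class FakeClient implements MessageClient {
    private final Deque<String> scripted = new ArrayDeque<>();
    boolean closed;

    FakeClient(String... messages) {
        for (String message : messages) {
            scripted.add(message);
        }
    }

    @Override
    public String next() {
        return scripted.remove();
    }

    @Override
    public void close() {
        closed = true;
    }
}

class ClientSourceTest {
    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) throws Exception {
        FakeClient fake = new FakeClient("a", "b");
        ClientSource source = new ClientSource(fake);
        source.open(Map.<String, Object>of(), new SourceContext() {});
        check("a".equals(source.read().getValue()), "first message");
        check("b".equals(source.read().getValue()), "second message");
        source.close();
        check(fake.closed, "client closed with the connector");
        System.out.println("all checks passed");
    }
}
```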
Integration test
Once you have written sufficient unit tests, you can add separate integration tests to verify end-to-end functionality.
Pulsar uses testcontainers for all integration tests.
There are two ways to package a connector so that it works with the Pulsar Functions runtime: NAR and uber JAR.
NAR stands for NiFi Archive, which is a custom packaging mechanism used by Apache NiFi, to provide a bit of Java ClassLoader isolation.
Pulsar uses the same mechanism for packaging all built-in connectors.
The easiest way to package a Pulsar connector is to create a NAR package using nifi-nar-maven-plugin.
Include the plugin in the Maven project of your connector as shown below.
<plugins>
  <plugin>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-nar-maven-plugin</artifactId>
    <version>1.2.0</version>
  </plugin>
</plugins>
You must also create a resources/META-INF/services/pulsar-io.yaml
file with the following contents:
If you are using the Gradle NiFi plugin, you might need to create a directive to ensure your pulsar-io.yaml is copied into the NAR file correctly.
Uber JAR
An alternative approach is to create an uber JAR that contains all of the connector’s JAR files and other resource files. No internal directory structure is necessary.
You can use maven-shade-plugin to create an uber JAR, as shown below:
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.1.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <filters>
          <filter>
            <artifact>*:*</artifact>
          </filter>
        </filters>
      </configuration>
    </execution>
  </executions>
</plugin>