How to use Pulsar connectors
Pulsar 自带了几个常用的数据交互系统(如数据库、消息系统等)相关的 内置连接器。 当然,你也可以创建和使用你想要用到的非内置连接器。
若要设置内置连接器,请按照这里 。
设置完成后,内置连接器会自动被 Pulsar broker(或 function-workers) 发现,因此不需要额外安装步骤。
配置连接器
配置以下信息:
要配置内置连接器的默认文件夹,请在 ./conf/functions_worker.yml
文件中设置 connectorsDirectory
参数。
示例
将 ./connectors
文件夹设为内置连接器的默认存储位置。
配置连接器的 YAML 文件
要配置连接器,需要在创建连接器时提供一个 YAML 配置文件。
YAML 配置文件告诉 Pulsar 在哪里找到连接器,以及如何将连接器与 Pulsar 主题连接起来。
示例 1
下面是 Cassandra sink 的 YAML 配置文件:
要连接哪个 Cassandra 集群
Cassandra 中用于收集数据的
keyspace
和columnFamily
是什么如何将 Pulsar 消息映射到 Cassandra 的表键(table key)和列(column)
tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
roots: "localhost:9042"
keyspace: "pulsar_test_keyspace"
columnFamily: "pulsar_test_table"
keyname: "key"
示例 2
下面是 Kafka source 的 YAML 配置文件。
configs:
bootstrapServers: "pulsar-kafka:9092"
groupId: "test-pulsar-io"
topic: "my-topic"
sessionTimeoutMs: "10000"
autoCommitEnabled: "false"
示例 3
下面是 PostgreSQL JDBC sink 的 YAML 配置文件。
configs:
userName: "postgres"
password: "password"
jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
tableName: "test_jdbc"
获取可用的连接器
在开始使用连接器之前,可以执行以下操作:
reload
如果你在连接器文件夹中添加或删除了 nar 文件,在使用连接器之前要重新载入已有内置连接器。
Source
使用 reload
子命令。
$ pulsar-admin sources reload
更多信息,可参考。
Sink
使用 reload
子命令。
$ pulsar-admin sinks reload
更多信息,可参考。
available
重新加载连接器后(可选),你可以获得可用连接器列表。
Source
使用 available-sources
子命令。
$ pulsar-admin sources available-sources
Sink
使用 available-sinks
子命令。
$ pulsar-admin sinks available-sinks
要运行连接器,你可以执行以下操作:
You can create a connector using Admin CLI, REST API or JAVA admin API.f
Source
创建一个源连接器。
Admin CLI
REST API
Java Admin API
使用 create
子命令。
$ pulsar-admin sources create options
更多信息,可参阅。
向如下端点发送一个 POST
请求: POST /admin/v3/sources/:tenant/:namespace/:sourceName
Create a source connector with a local file.
void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException
参数名
`sourceConfig` | 源配置对象
异常
| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`createSource`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#createSource-SourceConfig-java.lang.String-)。
Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException
支持的 URL 包括
http
和file
。示例
HTTP:
File: file:///dir/fileName.jar
参数名
参数 | 描述 |—-|——
sourceConfig
| 源配置对象pkgUrl
| 可以从其中下载 pkg 的 URL异常
配置项 说明 PulsarAdminException
未知错误 欲了解更多信息,请参阅 createSourceWithUrl。
Sink
创建 sink 连接器。
Admin CLI
REST API
Java Admin API
使用 create
子命令。
$ pulsar-admin sinks create options
更多信息,可参阅此处。
向如下端点发送一个 POST
请求:
Create a sink connector with a local file.
void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException
参数名
配置项 说明
`sinkConfig` | The sink configuration object**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`createSink`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#createSink-SinkConfig-java.lang.String-)。
Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException
支持的 URL 包括
http
和file
。示例
File: file:///dir/fileName.jar
参数名
参数 | 描述 |—-|——
sinkConfig
| sink 配置对象pkgUrl
| 可以从其中下载 pkg 的 URL异常
配置项 说明 PulsarAdminException
未知错误 欲了解更多信息,请参阅 。
start
You can start a connector using Admin CLI or REST API.
Source
启动一个源连接器。
Admin CLI
REST API
使用 start
子命令。
$ pulsar-admin sources start options
更多信息,可参阅此处。
Start all source connectors.
向如下端点发送一个
POST
请求:Start a specified source connector.
向如下端点发送一个
POST
请求:POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/start
Sink
启动 sink 连接器。
Admin CLI
REST API
使用 start
子命令。
更多信息,可参阅此处。
Start all sink connectors.
向如下端点发送一个
POST
请求:Start a specified sink connector.
向如下端点发送一个
POST
请求:POST /admin/v3/sinks/:tenant/:namespace/:sourceName/:instanceId/start
localrun
You can run a connector locally rather than deploying it on a Pulsar cluster using Admin CLI.
Source
本地运行一个源连接器。
Admin CLI
使用 localrun
子命令。
$ pulsar-admin sources localrun options
更多信息,可参阅。
Sink
本地运行 sink 连接器。
Admin CLI
使用 localrun
子命令。
$ pulsar-admin sinks localrun options
更多信息,可参阅。
监控连接器
要监控一个连接器,你可以执行以下操作:
get
You can get the information of a connector using Admin CLI, REST API or JAVA admin API.
Source
Admin CLI
REST API
Java Admin API
使用 get
子命令。
$ pulsar-admin sources get options
更多信息,可参阅此处。
向如下端点发送一个 GET
请求:
SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException
示例
这是 sourceConfig。
这是一个源配置示例。
{ "tenant": "public", "namespace": "default", "name": "debezium-mysql-source", "className": "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource", "topicName": "debezium-mysql-topic", "configs": { "database.user": "debezium", "database.server.id": "184054", "database.server.name": "dbserver1", "database.port": "3306", "database.hostname": "localhost", "database.password": "dbz", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "database.whitelist": "inventory", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "pulsar.service.url": "pulsar://127.0.0.1:6650", "database.history.pulsar.topic": "history-topic2" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "resources": { "cpu": 1.0, "ram": 1073741824, "disk": 10737418240 }}
异常
异常名称 | 描述 |—-|—- PulsarAdminException.NotAuthorizedException
| 你没有 admin 权限 PulsarAdminException.NotFoundException
| 集群不存在 PulsarAdminException
| 意外错误
欲了解更多信息,请参阅 getSource。
Sink
获取 sink 连接器的信息。
Admin CLI
REST API
Java Admin API
使用 get
子命令。
$ pulsar-admin sinks get options
更多信息,可参阅此处。
向如下端点发送一个 GET
请求:
SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException
示例
这是一个 sinkConfig。
{"tenant": "tenantName","namespace": "namespaceName","name": "sinkName","className": "className","inputSpecs": {"topicName": { "isRegexPattern": false}},"configs": {},"parallelism": 1,"processingGuarantees": "ATLEAST_ONCE","retainOrdering": false,"autoAck": true}
这是一个 sinkConfig 示例。
{ "tenant": "public", "namespace": "default", "name": "pulsar-postgres-jdbc-sink", "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink", "inputSpecs": { "pulsar-postgres-jdbc-sink-topic": { "isRegexPattern": false } }, "configs": { "password": "password", "jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink", "userName": "postgres", "tableName": "pulsar_postgres_jdbc_sink" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "retainOrdering": false, "autoAck": true}
参数描述
名称|描述 |—-|—— tenant
| 租户名称 namespace
| 命名空间名称 sink
| Sink 名称
欲了解更多信息,请参阅 getSink。
You can get the list of all running connectors using Admin CLI, REST API or JAVA admin API.
Source
获取所有正在运行的源连接器列表。
Admin CLI
REST API
Java Admin API
使用 list
子命令。
$ pulsar-admin sources list options
更多信息,可参阅此处。
向如下端点发送一个 GET
请求:
List<String> listSources(String tenant, String namespace) throws PulsarAdminException
响应示例
["f1", "f2", "f3"]
异常
异常名称 | 描述 |—-|—- PulsarAdminException.NotAuthorizedException
| 你没有 admin 权限 PulsarAdminException
| 意外错误
欲了解更多信息,见 listSource。
Sink
获取所有正在运行的 sink 连接器列表。
Admin CLI
REST API
Java Admin API
使用 list
子命令。
$ pulsar-admin sinks list options
更多信息,可参阅此处。
向如下端点发送一个 GET
请求:
List<String> listSinks(String tenant, String namespace) throws PulsarAdminException
响应示例
["f1", "f2", "f3"]
异常
异常名称 | 描述 |—-|—- PulsarAdminException.NotAuthorizedException
| 你没有 admin 权限 PulsarAdminException
| 意外错误
欲了解更多信息,见 listSource。
status
You can get the current status of a connector using Admin CLI, REST API or JAVA admin API.
Source
获取源连接器的当前状态。
Admin CLI
REST API
Java Admin API
使用 status
子命令。
$ pulsar-admin sources status options
更多信息,可参阅。
Get the current status of all source connectors.
向如下端点发送一个
GET
请求:GET /admin/v3/sources/:tenant/:namespace/:sourceName/statusGet the current status of all source connectors.
SourceStatus getSourceStatus(String tenant, String namespace, String source) throws PulsarAdminException
参数名
参数|描述 |—-|——
tenant
| 租户名称namespace
| 命名空间名称sink
| 源名称异常
名称 | 描述 |—-|——
PulsarAdminException
| 意外错误欲了解更多信息,请参阅 。
Gets the current status of a specified source connector.
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant, String namespace, String source, int id) throws PulsarAdminException
参数名
Parameter| Description |—-|—-
tenant
| Tenant namenamespace
| Namespace namesink
| Source nameid
| Source instanceID异常
异常名称|描述 | —- | —-
PulsarAdminException
|意外的错误欲了解更多信息,请参阅 getSourceStatus。
Sink
获取 Pulsar sink 连接器的当前状态。
Admin CLI
REST API
Java Admin API
使用 status
子命令。
$ pulsar-admin sinks status options
更多信息,可参阅此处。
Get the current status of all sink connectors.
向如下端点发送一个
GET
请求:Gets the current status of a specified sink connector.
发送
GET
请求到此端点: GET /admin/v3/sinks/:tenant/:namespace/:sourceName/:instanceId/statusGet the current status of all sink connectors.
SinkStatus getSinkStatus(String tenant, String namespace, String sink) throws PulsarAdminException
参数名
参数|描述 |—-|——
tenant
| 租户名称namespace
| 命名空间名称sink
| 源名称异常
异常名称|描述 | —- | —-
PulsarAdminException
|意外的错误欲了解更多信息,请参阅 。
Gets the current status of a specified source connector.
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant, String namespace, String sink, int id) throws PulsarAdminException
参数名
Parameter| Description |—-|—-
tenant
| Tenant namenamespace
| Namespace namesink
| Source nameid
| Sink instanceID异常
异常名称|描述 | —- | —-
PulsarAdminException
|意外的错误欲了解更多信息,请参阅 getSinkStatusWithInstanceID。
更新连接器
update
You can update a running connector using Admin CLI, REST API or JAVA admin API.
Source
更新正在运行的 Pulsar 源连接器。
Admin CLI
REST API
Java Admin API
使用 update
子命令。
$ pulsar-admin sources update options
更多信息,可参阅此处。
向如下端点发送一个 PUT
请求:
Update a running source connector with a local file.
void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException
参数名
配置项 说明 sourceConfig
源配置对象 异常
配置项 说明 PulsarAdminException.NotAuthorizedException
没有管理员权限 集群不存在 PulsarAdminException
未知错误 欲了解更多信息,请参阅 updateSource。
Update a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException
支持的 URL 包括
http
和file
。HTTP:
File: file:///dir/fileName.jar
参数名
配置项 说明 sourceConfig
源配置对象 pkgUrl
下载 pkg 的 URL 异常
配置项 说明 PulsarAdminException.NotAuthorizedException
没有管理员权限 PulsarAdminException.NotFoundException
集群不存在 PulsarAdminException
未知错误
欲了解更多信息,请参阅 createSourceWithUrl。
Sink
更新正在运行的 Pulsar sink 连接器。
Admin CLI
REST API
Java Admin API
使用 update
子命令。
$ pulsar-admin sinks update options
更多信息,可参阅此处。
向如下端点发送一个 PUT
请求:
Update a running sink connector with a local file.
void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException
参数名
异常
配置项 说明 PulsarAdminException.NotAuthorizedException
没有管理员权限 PulsarAdminException.NotFoundException
集群不存在 PulsarAdminException
未知错误 欲了解更多信息,请参阅 updateSink。
Update a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
支持的 URL 包括
http
和file
。示例
HTTP:
File: file:///dir/fileName.jar
参数名
配置项 说明 sinkConfig
sink 配置对象 pkgUrl
下载 pkg 的 URL 异常
配置项 说明 PulsarAdminException.NotAuthorizedException
没有管理员权限 PulsarAdminException.NotFoundException
集群不存在 PulsarAdminException
未知错误
欲了解更多信息,请参阅 updateSinkWithUrl。
stop
You can stop a connector using Admin CLI, REST API or JAVA admin API.
Source
停止一个源连接器。
Admin CLI
REST API
Java Admin API
使用 stop
子命令。
$ pulsar-admin sources stop options
更多信息,可参阅。
Stop all source connectors.
向如下端点发送一个
POST
请求: POST /admin/v3/sources/:tenant/:namespace/:sourceNameStop a specified source connector.
向如下端点发送一个
POST
请求:Stop all source connectors.
void stopSource(String tenant, String namespace, String source) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`stopSource`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-)。
Stop a specified source connector.
void stopSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`stopSource`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-int-)。
Sink
停止 sink 连接器。
Admin CLI
REST API
Java Admin API
使用 stop
子命令。
$ pulsar-admin sinks stop options
更多信息,可参阅。
Stop all sink connectors.
向如下端点发送一个
POST
请求: POST /admin/v3/sinks/:tenant/:namespace/:sinkName/stopStop a specified sink connector.
向如下端点发送一个
POST
请求:Stop all sink connectors.
void stopSink(String tenant, String namespace, String sink) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`stopSink`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-)。
Stop a specified sink connector.
void stopSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`stopSink`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-int-)。
重新启动连接器
You can restart a connector using Admin CLI, REST API or JAVA admin API.
Source
重新启动一个源连接器。
Admin CLI
REST API
Java Admin API
使用 restart
子命令。
$ pulsar-admin sources restart options
更多信息,可参阅此处。
Restart all source connectors.
向如下端点发送一个
POST
请求:Restart a specified source connector.
向如下端点发送一个
POST
请求: POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/restartRestart all source connectors.
void restartSource(String tenant, String namespace, String source) throws PulsarAdminException
参数名
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`restartSource`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-)。
Restart a specified source connector.
void restartSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`restartSource`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-int-)。
Sink
重新启动 sink 连接器。
Admin CLI
REST API
Java Admin API
使用 restart
子命令。
$ pulsar-admin sinks restart options
更多信息,可参阅此处。
Restart all sink connectors.
向如下端点发送一个
POST
请求:Restart a specified sink connector.
向如下端点发送一个
POST
请求:POST /admin/v3/sources/:tenant/:namespace/:sinkName/:instanceId/restart重启所有 Pulsar sink 连接器。
void restartSink(String tenant, String namespace, String sink) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `sink` | Sink name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`restartSink`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-)。
Restart a specified sink connector.
void restartSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Sink instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`restartSink`](https://pulsar.apache.org/api/admin/2.9.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-int-)。
删除连接器
delete
You can delete a connector using Admin CLI, REST API or JAVA admin API.
Source
删除源连接器。
Admin CLI
REST API
Java Admin API
使用 delete
子命令。
$ pulsar-admin sources delete options
更多信息,可参阅此处。
删除 Pulsar 源连接器。
向如下端点发送一个 DELETE
请求: ]}
删除源连接器。
void deleteSource(String tenant, String namespace, String source) throws PulsarAdminException
参数名
配置项 | 说明 |
---|---|
tenant
| 租户名称 namespace
| 命名空间名称 source
| 源名称
异常
配置项 | 说明 |
---|---|
PulsarAdminException.NotAuthorizedException | 没有管理员权限 |
PulsarAdminException.NotFoundException | 集群不存在 |
PulsarAdminException.PreconditionFailedException | 集群不是空的 |
PulsarAdminException | 未知错误 |
欲了解更多信息,请参阅 deleteSource。
Sink
删除 sink 连接器。
Admin CLI
REST API
Java Admin API
使用 delete
子命令。
更多信息,可参阅此处。
删除 sink 连接器。
向如下端点发送一个 DELETE
请求:
删除 Pulsar sink 连接器。
void deleteSink(String tenant, String namespace, String source) throws PulsarAdminException
参数名
配置项 | 说明 |
---|---|
tenant
| 租户名称 namespace
| 命名空间名称 sink
| Sink 名称
配置项 | 说明 |
---|---|
PulsarAdminException.NotAuthorizedException | 没有管理员权限 |
PulsarAdminException.NotFoundException | 集群不存在 |
集群不是空的 | |
PulsarAdminException | 未知错误 |
欲了解更多信息,请参阅 deleteSource。