How to use Pulsar connectors
Pulsar 捆绑了几个用于移动数据进出常用系统的 内置连接器。 (可选)你可以创建和使用所需的非内置连接器。
若要设置内置连接器,请按照这里 。
安装后,内置连接器会自动被 Pulsar 代理(或 function-workers)发现,因此不需要额外安装步骤。
配置
配置以下信息:
要配置内置连接器的默认文件夹,请在 中设置 <code>连接目录
参考/conf/functions_worker.yml 配置文件。
示例
设置 ./connectors
文件夹作为内置连接器的默认存储位置。
配置一个 YAML 文件的连接器
要配置连接器,需要在创建连接器时提供一个 YAML 配置文件。
YAML 配置文件告诉 Pulsar 在哪里找到连接器,以及如何将连接器与 Pulsar 主题连接起来。
示例 1
下面是 Cassandra sink 的 YAML 配置文件
哪个 Cassandra 集群可以连接
什么是
keyspace
andcolumnFamily
用于收集数据如何将 Pulsar 消息映射到 Cassandra 桌面键和列
tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
roots: "localhost:9042"
keyspace: "pulsar_test_keyspace"
columnFamily: "pulsar_test_table"
keyname: "key"
示例 2
下面是 Kafka 源的 YAML 配置文件。
configs:
bootstrapServers: "pulsar-kafka:9092"
groupId: "test-pulsar-io"
topic: "my-topic"
sessionTimeoutMs: "10000"
autoCommitEnabled: "false"
示例 3
Below is a YAML configuration file of a PostgreSQL JDBC sink.
configs:
userName: "postgres"
password: "password"
jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
tableName: "test_jdbc"
获取可用的连接
在开始使用连接器之前,可以执行以下操作:
重新加载
如果你在连接器文件夹中添加或删除 nar 文件,在使用之前重新载入可用的内置连接器。
Source
使用 重新加载
子命令。
$ pulsar-admin 源刷新
For more information, see .
Sink
使用 重新加载
子命令。
$ pulsar-admin sinks reload
For more information, see .
可用
重新加载连接器后(可选),你可以获得可用连接器列表。
Source
使用 可用源
子命令。
$ pulsar-admin sources available-sources
Sink
使用 可用的集合
子命令。
$ pulsar-admin sinks available-sinks
要运行连接器,你可以执行以下操作:
You can create a connector using Admin CLI, REST API or JAVA admin API.f
Source
创建一个源连接器。
Admin CLI
REST API
Java Admin API
使用 创建
子命令。
$ pulsar-admin sources create options
For more information, see .
Send a POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName
Create a source connector with a local file.
void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException
参数名
`sourceConfig` | 源配置对象
异常
| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`CreateSource`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#createSource-SourceConfig-java.lang.String-)。
Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException
支持的 URL是
http
和文件
。示例
HTTP:
文件: file:///dir/fileName.jar
参数名
参数 | 描述 |—-|——
sourceConfig
| 源配置对象pkgUrl
| 可以从其中下载pkg 的 URL异常
配置项 说明 PulsarAdminException
未知错误 欲了解更多信息,请参阅
createSourceWell
。
Sink
创建源连接器。
Admin CLI
REST API
Java Admin API
使用 创建
子命令。
$ pulsar-admin sinks create options
For more information, see here.
Send a POST
request to this endpoint:
Create a sink connector with a local file.
void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException
参数名
配置项 说明
`sinkConfig` | The sink configuration object**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`createSink`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#createSink-SinkConfig-java.lang.String-)。
Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException
支持的 URL是
http
和文件
。示例
文件: file:///dir/fileName.jar
参数名
参数 | 描述 |—-|——
sourceConfig
| 源配置对象pkgUrl
| 可以从其中下载pkg 的 URL异常
配置项 说明 PulsarAdminException
未知错误 欲了解更多信息,请参阅 。
start
You can start a connector using Admin CLI or REST API.
Source
启动一个源连接器。
Admin CLI
REST API
使用 起始
子命令。
$ pulsar-admin sources start options
For more information, see here.
Start all source connectors.
Send a
POST
request to this endpoint:Start a specified source connector.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/start
Sink
启动 sink 连接器。
Admin CLI
REST API
使用 起始
子命令。
$ pulsar-admin sinks start options
For more information, see here.
Start all sink connectors.
Send a
POST
request to this endpoint:Start a specified sink connector.
Send a
POST
request to this endpoint: POST /admin/v3/sinks/:tenant/:namespace/:sourceName/:instanceId/start
localrun
You can run a connector locally rather than deploying it on a Pulsar cluster using Admin CLI.
Source
本地运行一个源连接器。
Admin CLI
使用 子命令。
$ pulsar-admin sources localrun options
For more information, see .
Sink
本地运行 sink 连接器。
Admin CLI
使用 localrun
子命令。
$ pulsar-admin sinks localrun options
For more information, see .
监视连接器
要监视连接器,你可以执行以下操作:
get
You can get the information of a connector using Admin CLI, REST API or JAVA admin API.
Source
Admin CLI
REST API
Java Admin API
使用 获得
子命令。
$ pulsar-admin sources get options
For more information, see here.
Send a GET
request to this endpoint:
SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException
示例
这是一个源配置
这是一个源配置示例。
{ "tenant": "public", "namespace": "default", "name": "debezium-mysql-source", "className": "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource", "topicName": "debezium-mysql-topic", "configs": { "database.user": "debezium", "database.server.id": "184054", "database.server.name": "dbserver1", "database.port": "3306", "database.hostname": "localhost", "database.password": "dbz", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "database.whitelist": "inventory", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "pulsar.service.url": "pulsar://127.0.0.1:6650", "database.history.pulsar.topic": "history-topic2" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "resources": { "cpu": 1.0, "ram": 1073741824, "disk": 10737418240 }}
异常
异常名称|描述| —- | —-PulsarAdminException.NotAuthorizedException
|你没有管理员权限PulsarAdminException.NotFoundException
|集群不存在PulsarAdminException
|意外错误
欲了解更多信息,请参阅 getSource
。
Sink
获取 sink 的信息。
Admin CLI
REST API
Java Admin API
使用 获得
子命令。
$ pulsar-admin sinks get options
For more information, see here.
Send a GET
request to this endpoint:
SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException
示例
这是一个 sinkConfig。
{"tenant": "tenantName","namespace": "namespaceName","name": "sinkName","className": "className","inputSpecs": {"topicName": { "isRegexPattern": false}},"configs": {},"parallelism": 1,"processingGuarantees": "ATLEAST_ONCE","retainOrdering": false,"autoAck": true}
这是一个 sinkConfig 示例。
{ "tenant": "public", "namespace": "default", "name": "pulsar-postgres-jdbc-sink", "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink", "inputSpecs": { "pulsar-postgres-jdbc-sink-topic": { "isRegexPattern": false } }, "configs": { "password": "password", "jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink", "userName": "postgres", "tableName": "pulsar_postgres_jdbc_sink" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "retainOrdering": false, "autoAck": true}
参数描述
名称|描述 |—-|—— 租户
| 租户名称 命名空间
| 命名空间名称 sink
| 唱名名称
欲了解更多信息,请参阅 getSink
。
You can get the list of all running connectors using Admin CLI, REST API or JAVA admin API.
Source
获取所有正在运行的源连接器列表。
Admin CLI
REST API
Java Admin API
使用 列表
子命令。
$ pulsar-admin sources list options
For more information, see here.
Send a GET
request to this endpoint:
List<String> listSources(String tenant, String namespace) throws PulsarAdminException
响应示例
["f1", "f2", "f3"]
异常
异常名称|描述| —- | —-PulsarAdminException.NotAuthorizedException
|你没有管理员权限PulsarAdminException.NotFoundException
|集群不存在PulsarAdminException
|意外错误
欲了解更多信息,见 listSource
。
Sink
获取所有正在运行的 sink 列表。
Admin CLI
REST API
Java Admin API
使用 列表
子命令。
$ pulsar-admin sinks list options
For more information, see here.
Send a GET
request to this endpoint:
List<String> listSinks(String tenant, String namespace) throws PulsarAdminException
响应示例
["f1", "f2", "f3"]
异常
异常名称|描述| —- | —-PulsarAdminException.NotAuthorizedException
|你没有管理员权限PulsarAdminException.NotFoundException
|集群不存在PulsarAdminException
|意外错误
欲了解更多信息,见 listSource
。
status
You can get the current status of a connector using Admin CLI, REST API or JAVA admin API.
Source
获取源连接器的当前状态。
Admin CLI
REST API
Java Admin API
使用 状态
子命令。
$ pulsar-admin sources status options
For more information, see .
Get the current status of all source connectors.
Send a
GET
request to this endpoint: GET /admin/v3/sources/:tenant/:namespace/:sourceName/statusGet the current status of all source connectors.
SourceStatus getSourceStatus(String tenant, String namespace, String source) throws PulsarAdminException
参数名
参数|描述 |—-|——
租户
| 租户名称命名空间
| 命名空间名称sink
| 源名称异常
名称 | 描述 |—-|——
PulsarAdminException
| 意外错误欲了解更多信息,请参阅 。
Gets the current status of a specified source connector.
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant, String namespace, String source, int id) throws PulsarAdminException
参数名
Parameter| Description |—-|—-
tenant
| Tenant namenamespace
| Namespace namesink
| Source nameid
| Source instanceID异常
异常名称|描述| —- | —-
PulsarAdminException
|意外的错误欲了解更多信息,请参阅
getSourceStatus
。
Sink
获取 Pulsar sink 连接器的当前状态。
Admin CLI
REST API
Java Admin API
使用 状态
子命令。
$ pulsar-admin sinks status options
For more information, see here.
Get the current status of all sink connectors.
Send a
GET
request to this endpoint:Gets the current status of a specified sink connector.
Send a
GET
request to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/:sourceName/:instanceId/statusGet the current status of all sink connectors.
SinkStatus getSinkStatus(String tenant, String namespace, String sink) throws PulsarAdminException
参数名
参数|描述 |—-|——
租户
| 租户名称命名空间
| 命名空间名称sink
| 源名称异常
异常名称|描述| —- | —-
PulsarAdminException
|意外的错误欲了解更多信息,请参阅 。
Gets the current status of a specified source connector.
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant, String namespace, String sink, int id) throws PulsarAdminException
参数名
Parameter| Description |—-|—-
tenant
| Tenant namenamespace
| Namespace namesink
| Source nameid
| Sink instanceID异常
异常名称|描述| —- | —-
PulsarAdminException
|意外的错误欲了解更多信息,请参阅
getSinkStatusWidstanceID
。
更新连接器
update
You can update a running connector using Admin CLI, REST API or JAVA admin API.
Source
更新正在运行的 Pulsar 源连接器。
Admin CLI
REST API
Java Admin API
使用 更新
子命令。
$ pulsar-admin sources update options
For more information, see here.
Send a PUT
request to this endpoint:
Update a running source connector with a local file.
void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException
参数名
配置项 说明 sourceConfig
源配置对象 异常
配置项 说明 PulsarAdminException.NotAuthorizedException
没有管理员权限 PulsarAdminException.NotFoundException
集群不存在 未知错误 欲了解更多信息,请参阅
updateSource
Update a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException
支持的 URL是
http
和文件
。HTTP:
文件: file:///dir/fileName.jar
参数名
配置项 说明 sourceConfig
源配置对象 pkgUrl
下载 pkg 的 URL 异常
配置项 说明 PulsarAdminException.NotAuthorizedException
没有管理员权限 PulsarAdminException.NotFoundException
集群不存在 PulsarAdminException
未知错误
欲了解更多信息,请参阅 createSourceWell
。
Sink
更新正在运行的 Pulsar sink 连接器。
Admin CLI
REST API
Java Admin API
使用 更新
子命令。
$ pulsar-admin sinks update options
For more information, see here.
Send a PUT
request to this endpoint:
Update a running sink connector with a local file.
void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException
参数名
异常
配置项 说明 PulsarAdminException.NotAuthorizedException
没有管理员权限 PulsarAdminException.NotFoundException
集群不存在 PulsarAdminException
未知错误 欲了解更多信息,请参阅
updateSink
。Update a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
支持的 URL是
http
和文件
。示例
HTTP:
文件: file:///dir/fileName.jar
参数名
配置项 说明 sinkConfig
sink 配置对象 pkgUrl
下载 pkg 的 URL 异常
配置项 说明 PulsarAdminException.NotAuthorizedException
没有管理员权限 PulsarAdminException.NotFoundException
集群不存在 PulsarAdminException
未知错误
欲了解更多信息,请参阅 updateSinkWirl
。
stop
You can stop a connector using Admin CLI, REST API or JAVA admin API.
Source
停止一个源连接器。
Admin CLI
REST API
Java Admin API
使用 停止
子命令。
$ pulsar-admin sources stop options
For more information, see .
Stop all source connectors.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceNameStop a specified source connector.
Send a
POST
request to this endpoint:Stop all source connectors.
void stopSource(String tenant, String namespace, String source) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`停止源`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-)。
Stop a specified source connector.
void stopSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`停止源`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-int-)。
Sink
停止 sink 连接器。
Admin CLI
REST API
Java Admin API
使用 停止
子命令。
$ pulsar-admin sinks stop options
For more information, see .
Stop all sink connectors.
Send a
POST
request to this endpoint: POST /admin/v3/sinks/:tenant/:namespace/:sinkName/stopStop a specified sink connector.
Send a
POST
request to this endpoint:Stop all sink connectors.
void stopSink(String tenant, String namespace, String sink) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`stopSink`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-)。
Stop a specified sink connector.
void stopSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`stopSink`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-int-)。
重新启动连接器
You can restart a connector using Admin CLI, REST API or JAVA admin API.
Source
重新启动一个源连接器。
Admin CLI
REST API
Java Admin API
使用 重启
子命令。
$ pulsar-admin sources restart options
For more information, see here.
Restart all source connectors.
Send a
POST
request to this endpoint:Restart a specified source connector.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/restartRestart all source connectors.
void restartSource(String tenant, String namespace, String source) throws PulsarAdminException
参数名
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`重启源`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-)。
Restart a specified source connector.
void restartSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`重启源`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-int-)。
Sink
重新启动 sink 连接器。
Admin CLI
REST API
Java Admin API
使用 重启
子命令。
$ pulsar-admin sinks restart options
For more information, see here.
Restart all sink connectors.
Send a
POST
request to this endpoint:Restart a specified sink connector.
Send a
POST
request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sinkName/:instanceId/restart重启所有 Pulsar sink 连接器。
void restartSink(String tenant, String namespace, String sink) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `sink` | Sink name**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`重新启动Sink`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-)。
Restart a specified sink connector.
void restartSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException
参数名
配置项 说明
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Sink instanceID**异常**| 配置项 | 说明 || ---------------------- | ---- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`重新启动Sink`](https://pulsar.apache.org/api/admin/2.8.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-int-)。
删除连接器
delete
You can delete a connector using Admin CLI, REST API or JAVA admin API.
Source
删除源连接器。
Admin CLI
REST API
Java Admin API
使用 删除
子命令。
$ pulsar-admin sources delete options
For more information, see here.
删除 al Pulsar 源连接器。
Send a DELETE
request to this endpoint:
删除源连接器。
void deleteSource(String tenant, String namespace, String source) throws PulsarAdminException
参数名
配置项 | 说明 |
---|---|
租户
| 租户名称 命名空间
| 命名空间名称 源
| 源名称
异常
配置项 | 说明 |
---|---|
PulsarAdminException.NotAuthorizedException | 没有管理员权限 |
PulsarAdminException.NotFoundException | 集群不存在 |
PulsarAdminException.PreconditionFailedException | 集群不是空的 |
PulsarAdminException | 未知错误 |
欲了解更多信息,请参阅 deleteSource
。
Sink
删除 sink 连接器。
Admin CLI
REST API
Java Admin API
使用 删除
子命令。
For more information, see here.
删除 sink 连接器。
Send a DELETE
request to this endpoint:
删除 Pulsar sink 连接器。
void deleteSink(String tenant, String namespace, String source) throws PulsarAdminException
参数名
配置项 | 说明 |
---|---|
租户
| 租户名称 命名空间
| 命名空间名称 sink
| sink 名称
配置项 | 说明 |
---|---|
PulsarAdminException.NotAuthorizedException | 没有管理员权限 |
PulsarAdminException.NotFoundException | 集群不存在 |
PulsarAdminException.PreconditionFailedException | 集群不是空的 |
PulsarAdminException | 未知错误 |
欲了解更多信息,请参阅 deleteSource
。