Pulsar 捆绑了几个用于移动数据进出常用系统的 。 (可选)你可以创建和使用所需的非内置连接器。
若要设置内置连接器,请按照这里 说明设置。
安装后,内置连接器会自动被 Pulsar 代理(或 function-workers)发现,因此不需要额外安装步骤。
配置
配置以下信息:
要配置内置连接器的默认文件夹,请在 中设置 <code>连接目录
参考/conf/functions_worker.yml 配置文件。
示例
设置 ./connectors
文件夹作为内置连接器的默认存储位置。
配置一个 YAML 文件的连接器
要配置连接器,需要在创建连接器时提供一个 YAML 配置文件。
YAML 配置文件告诉 Pulsar 在哪里找到连接器,以及如何将连接器与 Pulsar 主题连接起来。
示例 1
下面是 Cassandra sink 的 YAML 配置文件
哪个 Cassandra 集群可以连接
什么是
keyspace
andcolumnFamily
用于收集数据如何将 Pulsar 消息映射到 Cassandra 桌面键和列
tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
roots: "localhost:9042"
keyspace: "pulsar_test_keyspace"
columnFamily: "pulsar_test_table"
keyname: "key"
columnName: "col"
示例 2
下面是 Kafka 源的 YAML 配置文件。
configs:
bootstrapServers: "pulsar-kafka:9092"
groupId: "test-pulsar-io"
topic: "my-topic"
sessionTimeoutMs: "10000"
autoCommitEnabled: "false"
示例 3
Below is a YAML configuration file of a PostgreSQL JDBC sink.
configs:
userName: "postgres"
jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc"
tableName: "test_jdbc"
获取可用的连接
在开始使用连接器之前,可以执行以下操作:
重新加载
如果你在连接器文件夹中添加或删除 nar 文件,在使用之前重新载入可用的内置连接器。
Source
使用 重新加载
子命令。
$ pulsar-admin 源刷新
For more information, see here
.
Sink
使用 重新加载
子命令。
$ pulsar-admin sinks reload
For more information, see here
.
可用
重新加载连接器后(可选),你可以获得可用连接器列表。
Source
使用 可用源
子命令。
$ pulsar-admin sources available-sources
Sink
使用 可用的集合
子命令。
$ pulsar-admin sinks available-sinks
要运行连接器,你可以执行以下操作:
You can create a connector using Admin CLI, REST API or JAVA admin API.f
Source
创建一个源连接器。
Admin CLI
REST API
Java Admin API
使用 创建
子命令。
$ pulsar-admin sources create options
For more information, see here.
发送 POST
请求到此端点:
Create a source connector with a local file.
void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException
Parameter
`sourceConfig` | 源配置对象
异常
| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`createSource`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#createSource-SourceConfig-java.lang.String-).
Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException
支持的 URL是
http
和文件
。示例
文件: file:///dir/fileName.jar
**Parameter**
参数 | 描述 |---|---- `sourceConfig` | 源配置对象 `pkgUrl` | 可以从其中下载pkg 的 URL
**异常**
<table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>未知错误</td></tr></tbody></table>
For more information, see [`createSourceWithUrl`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#createSourceWithUrl-SourceConfig-java.lang.String-).
Sink
创建源连接器。
Admin CLI
REST API
Java Admin API
使用 创建
子命令。
$ pulsar-admin sinks create options
For more information, see here.
发送 POST
请求到此端点:
Create a sink connector with a local file.
void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException
Parameter
Name Description
`sinkConfig` | The sink configuration object**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`createSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#createSink-SinkConfig-java.lang.String-).
Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException
支持的 URL是
http
和文件
。示例
文件: file:///dir/fileName.jar
**Parameter**
参数 | 描述 |---|---- `sourceConfig` | 源配置对象 `pkgUrl` | 可以从其中下载pkg 的 URL
**异常**
<table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>未知错误</td></tr></tbody></table>
For more information, see [`createSinkWithUrl`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#createSinkWithUrl-SinkConfig-java.lang.String-).
start
You can start a connector using Admin CLI or REST API.
Source
启动一个源连接器。
Admin CLI
REST API
使用 起始
子命令。
For more information, see .
Start all source connectors.
发送
POST
请求到此端点: POST /admin/v3/sources/:tenant/:namespace/:sourceName/startStart a specified source connector.
发送
POST
请求到此端点:
Sink
启动 sink 连接器。
Admin CLI
REST API
使用 起始
子命令。
$ pulsar-admin sinks start options
For more information, see .
Start all sink connectors.
发送
POST
请求到此端点: POST /admin/v3/sources/:tenant/:namespace/:sinkName/startStart a specified sink connector.
发送
POST
请求到此端点:
localrun
You can run a connector locally rather than deploying it on a Pulsar cluster using Admin CLI.
Source
本地运行一个源连接器。
Admin CLI
使用 localrun
子命令。
$ pulsar-admin sources localrun options
For more information, see here.
Sink
本地运行 sink 连接器。
Admin CLI
使用 localrun
子命令。
$ pulsar-admin sinks localrun options
For more information, see here.
监视连接器
要监视连接器,你可以执行以下操作:
get
You can get the information of a connector using Admin CLI, REST API or JAVA admin API.
Source
获取源连接器的信息。
Admin CLI
REST API
Java Admin API
$ pulsar-admin sources get options
For more information, see .
发送 GET
请求到此端点: GET /admin/v3/sources/:tenant/:namespace/:sourceName
SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException
示例
这是一个源配置
这是一个源配置示例。
{ "tenant": "public", "namespace": "default", "name": "debezium-mysql-source", "className": "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource", "topicName": "debezium-mysql-topic", "configs": { "database.user": "debezium", "database.server.id": "184054", "database.server.name": "dbserver1", "database.port": "3306", "database.hostname": "localhost", "database.password": "dbz", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "database.whitelist": "inventory", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "pulsar.service.url": "pulsar://127.0.0.1:6650", "database.history.pulsar.topic": "history-topic2" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "resources": { "cpu": 1.0, "ram": 1073741824, "disk": 10737418240 }}
异常
Exception name | Description |—-|—- PulsarAdminException.NotAuthorizedException
| You don’t have the admin permission PulsarAdminException.NotFoundException
| Cluster doesn’t exist PulsarAdminException
| Unexpected error
For more information, see .
Sink
获取 sink 的信息。
Admin CLI
REST API
Java Admin API
使用 获得
子命令。
$ pulsar-admin sinks get options
For more information, see .
发送 GET
请求到此端点: GET /admin/v3/sinks/:tenant/:namespace/:sinkName
SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException
示例
这是一个 sinkConfig。
{"tenant": "tenantName","namespace": "namespaceName","name": "sinkName","className": "className","inputSpecs": {"topicName": { "isRegexPattern": false}},"configs": {},"parallelism": 1,"processingGuarantees": "ATLEAST_ONCE","retainOrdering": false,"autoAck": true}
这是一个 sinkConfig 示例。
{ "tenant": "public", "namespace": "default", "name": "pulsar-postgres-jdbc-sink", "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink", "inputSpecs": { "pulsar-postgres-jdbc-sink-topic": { "isRegexPattern": false } }, "configs": { "password": "password", "jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink", "userName": "postgres", "tableName": "pulsar_postgres_jdbc_sink" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "retainOrdering": false, "autoAck": true}
参数描述
名称|描述 |—-|—— 租户
| 租户名称 命名空间
| 命名空间名称 sink
| 唱名名称
For more information, see .
You can get the list of all running connectors using Admin CLI, REST API or JAVA admin API.
Source
获取所有正在运行的源连接器列表。
Admin CLI
REST API
Java Admin API
使用 列表
子命令。
$ pulsar-admin sources list options
For more information, see .
发送 GET
请求到这个端点:GET /admin/v3/sources/:tenant/:namespace/
List<String> listSources(String tenant, String namespace) throws PulsarAdminException
响应示例
["f1", "f2", "f3"]
异常
Exception name | Description |—-|—- PulsarAdminException.NotAuthorizedException
| You don’t have the admin permission PulsarAdminException
| Unexpected error
For more information, see .
Sink
获取所有正在运行的 sink 列表。
Admin CLI
REST API
Java Admin API
使用 列表
子命令。
$ pulsar-admin sinks list options
For more information, see .
发送 GET
请求到此端点:GET /admin/v3/sinks/:tenant/:namespace/
List<String> listSinks(String tenant, String namespace) throws PulsarAdminException
响应示例
["f1", "f2", "f3"]
异常
Exception name | Description |—-|—- PulsarAdminException.NotAuthorizedException
| You don’t have the admin permission PulsarAdminException
| Unexpected error
For more information, see .
status
You can get the current status of a connector using Admin CLI, REST API or JAVA admin API.
Source
获取源连接器的当前状态。
Admin CLI
REST API
Java Admin API
使用 状态
子命令。
$ pulsar-admin sources status options
For more information, see here.
Gets the current status of a specified source connector.
发送
GET
请求到此端点:
Get the current status of all source connectors.
SourceStatus getSourceStatus(String tenant, String namespace, String source) throws PulsarAdminException
Parameter
参数|描述 |—-|——
租户
| 租户名称命名空间
| 命名空间名称sink
| 源名称异常
名称 | 描述 |—-|——
PulsarAdminException
| 意外错误For more information, see
getSourceStatus
.Gets the current status of a specified source connector.
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant, String namespace, String source, int id) throws PulsarAdminException
Parameter
Parameter| Description |—-|—-
tenant
| Tenant namenamespace
| Namespace namesink
| Source nameid
| Source instanceID异常
异常名称|描述| —- | —-
PulsarAdminException
|意外的错误For more information, see .
Sink
获取 Pulsar sink 连接器的当前状态。
Admin CLI
REST API
Java Admin API
使用 状态
子命令。
$ pulsar-admin sinks status options
For more information, see .
Get the current status of all sink connectors.
发送
GET
请求到此端点: GET /admin/v3/sinks/:tenant/:namespace/:sinkName/statusGets the current status of a specified sink connector.
发送
GET
请求到此端点:
Get the current status of all sink connectors.
SinkStatus getSinkStatus(String tenant, String namespace, String sink) throws PulsarAdminException
Parameter
参数|描述 |—-|——
租户
| 租户名称命名空间
| 命名空间名称sink
| 源名称异常
异常名称|描述| —- | —-
PulsarAdminException
|意外的错误For more information, see
getSinkStatus
.Gets the current status of a specified source connector.
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant, String namespace, String sink, int id) throws PulsarAdminException
Parameter
Parameter| Description |—-|—-
tenant
| Tenant namenamespace
| Namespace namesink
| Source nameid
| Sink instanceID异常
异常名称|描述| —- | —-
PulsarAdminException
|意外的错误For more information, see .
更新连接器
update
You can update a running connector using Admin CLI, REST API or JAVA admin API.
Source
更新正在运行的 Pulsar 源连接器。
Admin CLI
REST API
Java Admin API
使用 更新
子命令。
$ pulsar-admin sources update options
For more information, see .
发送 PUT
请求到此端点: PUT /admin/v3/sources:tenant/:namespace/:sourceName
Update a running source connector with a local file.
Parameter
Name Description sourceConfig
源配置对象 异常
Name Description PulsarAdminException.NotAuthorizedException
没有管理员权限 PulsarAdminException.NotFoundException
集群不存在 PulsarAdminException
未知错误 For more information, see .
Update a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException
支持的 URL是
http
和文件
。文件: file:///dir/fileName.jar
**Parameter**
<table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>sourceConfig</code></td><td>源配置对象</td></tr><tr><td><code>pkgUrl</code></td><td>下载 pkg 的 URL</td></tr></tbody></table>
**异常**
<table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException.NotAuthorizedException</code></td><td>没有管理员权限</td></tr><tr><td><code>PulsarAdminException.NotFoundException</code></td><td>集群不存在</td></tr><tr><td><code>PulsarAdminException</code></td><td>未知错误</td></tr></tbody></table>
For more information, see .
Sink
更新正在运行的 Pulsar sink 连接器。
Admin CLI
REST API
Java Admin API
使用 更新
子命令。
$ pulsar-admin sinks update options
For more information, see .
发送 PUT
请求到此端点: PUT /admin/v3/sinks/:tenant/:namespace/:sinkName
Update a running sink connector with a local file.
void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException
Parameter
Name Description sinkConfig
sink 配置对象 异常
Name Description PulsarAdminException.NotAuthorizedException
没有管理员权限 PulsarAdminException.NotFoundException
集群不存在 PulsarAdminException
未知错误 For more information, see .
Update a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException
支持的 URL是
http
和文件
。示例
文件: file:///dir/fileName.jar
For more information, see .
stop
You can stop a connector using Admin CLI, REST API or JAVA admin API.
Source
停止一个源连接器。
Admin CLI
REST API
Java Admin API
使用 停止
子命令。
$ pulsar-admin sources stop options
For more information, see here.
Stop all source connectors.
发送
POST
请求到此端点:Stop a specified source connector.
发送
POST
请求到此端点: POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId
Stop all source connectors.
void stopSource(String tenant, String namespace, String source) throws PulsarAdminException
Parameter
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`stopSource`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-).
Stop a specified source connector.
void stopSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException
Parameter
Name Description
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`stopSource`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-int-).
Sink
停止 sink 连接器。
Admin CLI
REST API
Java Admin API
使用 停止
子命令。
$ pulsar-admin sinks stop options
For more information, see here.
Stop all sink connectors.
发送
POST
请求到此端点:Stop a specified sink connector.
发送
POST
请求到此端点: POST /admin/v3/sources/:tenant/:namespace/:sinkeName/:instanceId/stop
Stop all sink connectors.
void stopSink(String tenant, String namespace, String sink) throws PulsarAdminException
Parameter
Name Description
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`stopSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-).
Stop a specified sink connector.
void stopSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException
Parameter
Name Description
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`stopSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-int-).
重新启动连接器
You can restart a connector using Admin CLI, REST API or JAVA admin API.
Source
重新启动一个源连接器。
Admin CLI
REST API
Java Admin API
使用 重启
子命令。
$ pulsar-admin sources restart options
For more information, see .
Restart all source connectors.
发送
POST
请求到此端点:POST /admin/v3/sources/:tenant/:namespace/:sourceName/restartRestart a specified source connector.
发送
POST
请求到此端点:
Restart all source connectors.
void restartSource(String tenant, String namespace, String source) throws PulsarAdminException
Parameter
Name Description
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`restartSource`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-).
Restart a specified source connector.
void restartSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException
Parameter
Name Description
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`restartSource`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-int-).
Sink
重新启动 sink 连接器。
Admin CLI
REST API
Java Admin API
使用 重启
子命令。
$ pulsar-admin sinks restart options
For more information, see .
Restart all sink connectors.
发送
POST
请求到此端点:POST /admin/v3/sources/:tenant/:namespace/:sinkName/restartRestart a specified sink connector.
发送
POST
请求到此端点:
重启所有 Pulsar sink 连接器。
void restartSink(String tenant, String namespace, String sink) throws PulsarAdminException
Parameter
`tenant` | Tenant name `namespace` | Namespace name `sink` | Sink name**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`restartSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-).
Restart a specified sink connector.
void restartSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException
Parameter
Name Description
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Sink instanceID**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |For more information, see [`restartSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-int-).
删除连接器
delete
You can delete a connector using Admin CLI, REST API or JAVA admin API.
Source
删除源连接器。
Admin CLI
REST API
Java Admin API
使用 删除
子命令。
$ pulsar-admin sources delete options
For more information, see .
删除 al Pulsar 源连接器。
发送 DELETE
请求到此端点: DELETE /admin/v3/sources/:tenant/:namespace/:sourceName
删除源连接器。
void deleteSource(String tenant, String namespace, String source) throws PulsarAdminException
Parameter
Name | Description |
---|---|
租户
| 租户名称 命名空间
| 命名空间名称 源
| 源名称
异常
Name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | 没有管理员权限 |
PulsarAdminException.NotFoundException | 集群不存在 |
PulsarAdminException.PreconditionFailedException | 集群不是空的 |
PulsarAdminException | 未知错误 |
For more information, see .
Sink
删除 sink 连接器。
Admin CLI
REST API
Java Admin API
使用 删除
子命令。
For more information, see .
删除 sink 连接器。
发送 DELETE
请求到此端点: DELETE /admin/v3/sinks/:tenant/:namespace/:sinkname
删除 Pulsar sink 连接器。
void deleteSink(String tenant, String namespace, String source) throws PulsarAdminException
Parameter
Name | Description |
---|---|
租户
| 租户名称 命名空间
| 命名空间名称 sink
| sink 名称
Name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | 没有管理员权限 |
PulsarAdminException.NotFoundException | 集群不存在 |
PulsarAdminException.PreconditionFailedException | 集群不是空的 |
PulsarAdminException | 未知错误 |
For more information, see .