Pulsar 捆绑了几个用于移动数据进出常用系统的 内置连接器。 (可选)你可以创建和使用所需的非内置连接器。
若要设置内置连接器,请按照这里 。
安装后,内置连接器会自动被 Pulsar 代理(或 function-workers)发现,因此不需要额外安装步骤。
配置以下信息:
要配置内置连接器的默认文件夹,请在 中设置 <code>连接目录
参考/conf/functions_worker.yml 配置文件。
示例
设置 ./connectors
文件夹作为内置连接器的默认存储位置。
配置一个 YAML 文件的连接器
要配置连接器,需要在创建连接器时提供一个 YAML 配置文件。
YAML 配置文件告诉 Pulsar 在哪里找到连接器,以及如何将连接器与 Pulsar 主题连接起来。
示例 1
下面是 Cassandra sink 的 YAML 配置文件
哪个 Cassandra 集群可以连接
什么是
keyspace
andcolumnFamily
用于收集数据如何将 Pulsar 消息映射到 Cassandra 桌面键和列
tenant: public
namespace: default
name: cassandra-test-sink
...
# cassandra specific config
configs:
roots: "localhost:9042"
keyspace: "pulsar_test_keyspace"
columnFamily: "pulsar_test_table"
keyname: "key"
columnName: "col"
示例 2
下面是 Kafka 源的 YAML 配置文件。
configs:
bootstrapServers: "pulsar-kafka:9092"
groupId: "test-pulsar-io"
topic: "my-topic"
sessionTimeoutMs: "10000"
autoCommitEnabled: "false"
示例 3
下面是 MySQL JDBC sink 的 YAML 配置文件。
configs:
userName: "root"
password: "jdbc"
jdbcUrl: "jdbc:mysql://127.0.0.1:3306/test_jdbc"
tableName: "test_jdbc"
在开始使用连接器之前,可以执行以下操作:
重新加载
如果你在连接器文件夹中添加或删除 nar 文件,在使用之前重新载入可用的内置连接器。
Source
使用 重新加载
子命令。
$ pulsar-admin 源刷新
For more information, see .
Sink
使用 重新加载
子命令。
$ pulsar-admin sinks reload
For more information, see .
可用
重新加载连接器后(可选),你可以获得可用连接器列表。
Source
使用 可用源
子命令。
$ pulsar-admin sources available-sources
Sink
使用 可用的集合
子命令。
$ pulsar-admin sinks available-sinks
要运行连接器,你可以执行以下操作:
You can create a connector using Admin CLI, REST API or JAVA admin API.
Source
创建一个源连接器。
Admin CLI
REST API
Java Admin API
使用 创建
子命令。
$ pulsar-admin sources create options
For more information, see .
发送 POST
请求到此端点: POST /admin/v3/sources/:tenant/:namespace/:sourceName
Create a source connector with a local file.
void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException
Parameter
`sourceConfig` | 源配置对象
异常
| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`CreateSource`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#createSource-SourceConfig-java.lang.String-)。
Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException
支持的 URL是
http
和文件
。示例
HTTP:
文件: file:///dir/fileName.jar
**Parameter**
参数 | 描述 |---|---- `sourceConfig` | 源配置对象 `pkgUrl` | 可以从其中下载pkg 的 URL
**异常**
<table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>未知错误</td></tr></tbody></table>
欲了解更多信息,请参阅 [`createSourceWell`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#createSourceWithUrl-SourceConfig-java.lang.String-)。
Sink
创建源连接器。
Admin CLI
REST API
Java Admin API
使用 创建
子命令。
$ pulsar-admin sinks create options
For more information, see .
发送 POST
请求到此端点:POST /admin/v3/sinks/:tenant/:namespace/:sinkName
Create a sink connector with a local file.
void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException
Parameter
Name Description
`sinkConfig` | The sink configuration object**异常**| Name | Description || ---------------------- | ----------- || `PulsarAdminException` | 未知错误 |欲了解更多信息,请参阅 [`createSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#createSink-SinkConfig-java.lang.String-)。
Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException
支持的 URL是
http
和文件
。示例
HTTP:
文件: file:///dir/fileName.jar
**Parameter**
参数 | 描述 |---|---- `sourceConfig` | 源配置对象 `pkgUrl` | 可以从其中下载pkg 的 URL
**异常**
<table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException</code></td><td>未知错误</td></tr></tbody></table>
欲了解更多信息,请参阅 [`createSinkWidurl`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#createSinkWithUrl-SinkConfig-java.lang.String-)。
start
You can start a connector using Admin CLI or REST API.
Source
启动一个源连接器。
Admin CLI
REST API
使用 起始
子命令。
$ pulsar-admin sources start options
For more information, see here.
Start all source connectors.
发送
POST
请求到此端点:Start a specified source connector.
Sink
启动 sink 连接器。
Admin CLI
REST API
使用 起始
子命令。
$ pulsar-admin sinks start options
For more information, see .
Start all sink connectors.
发送
POST
请求到此端点: POST /admin/v3/sources/:tenant/:namespace/:sinkName/startStart a specified sink connector.
发送
POST
请求到此端点:
localrun
You can run a connector locally rather than deploying it on a Pulsar cluster using Admin CLI.
Source
本地运行一个源连接器。
Admin CLI
使用 localrun
子命令。
$ pulsar-admin sources localrun options
For more information, see here.
Sink
本地运行 sink 连接器。
Admin CLI
使用 localrun
子命令。
$ pulsar-admin sinks localrun options
For more information, see here.
要监视连接器,你可以执行以下操作:
get
You can get the information of a connector using Admin CLI, REST API or JAVA admin API.
Source
获取源连接器的信息。
使用 获得
子命令。
$ pulsar-admin sources get options
For more information, see here.
发送 GET
请求到此端点:
SourceConfig getSource(String tenant,
String namespace,
String source)
throws PulsarAdminException
示例
这是一个源配置
这是一个源配置示例。
{
"tenant": "public",
"namespace": "default",
"name": "debezium-mysql-source",
"className": "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource",
"topicName": "debezium-mysql-topic",
"configs": {
"database.user": "debezium",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.port": "3306",
"database.hostname": "localhost",
"database.password": "dbz",
"database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"database.whitelist": "inventory",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
"database.history.pulsar.topic": "history-topic2"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"resources": {
"cpu": 1.0,
"ram": 1073741824,
"disk": 10737418240
}
}
异常
异常名称|描述| —- | —-PulsarAdminException.NotAuthorizedException
|你没有管理员权限PulsarAdminException.NotFoundException
|集群不存在PulsarAdminException
|意外错误
欲了解更多信息,请参阅 getSource
。#### Sink
获取 sink 的信息。使用 获得
子命令。
$ pulsar-admin sinks get options
For more information, see .发送 GET
请求到此端点: GET /admin/v3/sinks/:tenant/:namespace/:sinkName
SinkConfig getSink(String tenant,
String namespace,
String sink)
throws PulsarAdminException
示例
这是一个 sinkConfig。
{
"tenant": "tenantName",
"namespace": "namespaceName",
"name": "sinkName",
"className": "className",
"inputSpecs": {
"topicName": {
"isRegexPattern": false
}
},
"configs": {},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"autoAck": true
这是一个 sinkConfig 示例。
{
"tenant": "public",
"namespace": "default",
"name": "pulsar-mysql-jdbc-sink",
"className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink",
"inputSpecs": {
"pulsar-mysql-jdbc-sink-topic": {
"isRegexPattern": false
}
},
"configs": {
"password": "jdbc",
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/pulsar_mysql_jdbc_sink",
"userName": "root",
"tableName": "pulsar_mysql_jdbc_sink"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"autoAck": true
}
参数描述
名称|描述 |—-|—— 租户
| 租户名称 命名空间
| 命名空间名称 sink
| 唱名名称
欲了解更多信息,请参阅 。### list
You can get the list of all running connectors using Admin CLI, REST API or JAVA admin API.
Source
获取所有正在运行的源连接器列表。使用 列表
子命令。
$ pulsar-admin sources list options
For more information, see .发送 GET
请求到这个端点:GET /admin/v3/sources/:tenant/:namespace/
List<String> listSources(String tenant,
String namespace)
throws PulsarAdminException
响应示例
["f1", "f2", "f3"]
异常
异常名称|描述| —- | —-PulsarAdminException.NotAuthorizedException
|你没有管理员权限PulsarAdminException.NotFoundException
|集群不存在PulsarAdminException
|意外错误
欲了解更多信息,见 。#### Sink
获取所有正在运行的 sink 列表。使用 列表
子命令。
$ pulsar-admin sinks list options
For more information, see here.发送 GET
请求到此端点:
List<String> listSinks(String tenant,
String namespace)
throws PulsarAdminException
响应示例
["f1", "f2", "f3"]
异常
异常名称|描述| —- | —-PulsarAdminException.NotAuthorizedException
|你没有管理员权限PulsarAdminException.NotFoundException
|集群不存在PulsarAdminException
|意外错误
欲了解更多信息,见 listSource
。### status
You can get the current status of a connector using Admin CLI, REST API or JAVA admin API.
Source
获取源连接器的当前状态。使用 状态
子命令。
$ pulsar-admin sources status options
For more information, see here.* Get the current status of all source connectors.
发送 GET
请求到此端点:
Gets the current status of a specified source connector.
发送
GET
请求到此端点: GET /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/status * Get the current status of all source connectors.SourceStatus getSourceStatus(String tenant,
String namespace,
String source)
throws PulsarAdminException
Parameter
参数|描述 |—-|——
租户
| 租户名称命名空间
| 命名空间名称sink
| 源名称异常
名称 | 描述 |—-|——
PulsarAdminException
| 意外错误欲了解更多信息,请参阅 。
Gets the current status of a specified source connector.
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant,
String namespace,
String source,
int id)
throws PulsarAdminException
Parameter
Parameter| Description |—-|—-
tenant
| Tenant namenamespace
| Namespace namesink
| Source nameid
| Source instanceID异常
异常名称|描述| —- | —-
PulsarAdminException
|意外的错误欲了解更多信息,请参阅
getSourceStatus
。#### Sink
获取 Pulsar sink 连接器的当前状态。使用 状态
子命令。
$ pulsar-admin sinks status options
For more information, see .* Get the current status of all sink connectors.
发送 GET
请求到此端点: GET /admin/v3/sinks/:tenant/:namespace/:sinkName/status
Gets the current status of a specified sink connector.
SinkStatus getSinkStatus(String tenant,
String namespace,
String sink)
throws PulsarAdminException
Parameter
参数|描述 |—-|——
租户
| 租户名称命名空间
| 命名空间名称sink
| 源名称异常
异常名称|描述| —- | —-
PulsarAdminException
|意外的错误欲了解更多信息,请参阅 。
Gets the current status of a specified source connector.
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant,
String namespace,
String sink,
int id)
throws PulsarAdminException
Parameter
Parameter| Description |—-|—-
tenant
| Tenant namenamespace
| Namespace namesink
| Source nameid
| Sink instanceID异常
异常名称|描述| —- | —-
PulsarAdminException
|意外的错误欲了解更多信息,请参阅
getSinkStatusWidstanceID
。## 更新连接器
You can update a running connector using Admin CLI, REST API or JAVA admin API.
Source
更新正在运行的 Pulsar 源连接器。使用 更新
子命令。
$ pulsar-admin sources update options
For more information, see here.发送 PUT
请求到此端点: * Update a running source connector with a local file.
```java
void updateSource(SourceConfig sourceConfig,
String fileName)
throws PulsarAdminException
```
**Parameter**
| Name | Description |
| -------------- | ----------- |
| `sourceConfig` | 源配置对象 |
**异常**
| Name | Description |
| --------------------------------------------- | ----------- |
| `PulsarAdminException.NotAuthorizedException` | 没有管理员权限 |
| `PulsarAdminException.NotFoundException` | 集群不存在 |
| `PulsarAdminException` | 未知错误 |
欲了解更多信息,请参阅 [`updateSource`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#updateSource-SourceConfig-java.lang.String-)
Update a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSourceWithUrl(SourceConfig sourceConfig,
String pkgUrl)
throws PulsarAdminException
支持的 URL是
http
和文件
。示例
文件: file:///dir/fileName.jar
**Parameter**
<table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>sourceConfig</code></td><td>源配置对象</td></tr><tr><td><code>pkgUrl</code></td><td>下载 pkg 的 URL</td></tr></tbody></table>
**异常**
<table><thead><tr><th>Name</th><th>Description</th></tr></thead><tbody><tr><td><code>PulsarAdminException.NotAuthorizedException</code></td><td>没有管理员权限</td></tr><tr><td><code>PulsarAdminException.NotFoundException</code></td><td>集群不存在</td></tr><tr><td><code>PulsarAdminException</code></td><td>未知错误</td></tr></tbody></table>
欲了解更多信息,请参阅 。#### Sink
更新正在运行的 Pulsar sink 连接器。使用 更新
子命令。
$ pulsar-admin sinks update options
For more information, see here.发送 PUT
请求到此端点: * Update a running sink connector with a local file.
```java
void updateSink(SinkConfig sinkConfig,
String fileName)
throws PulsarAdminException
```
**Parameter**
| Name | Description |
| ------------ | ----------- |
| `sinkConfig` | sink 配置对象 |
**异常**
| Name | Description |
| --------------------------------------------- | ----------- |
| `PulsarAdminException.NotAuthorizedException` | 没有管理员权限 |
| `PulsarAdminException` | 未知错误 |
欲了解更多信息,请参阅 [`updateSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#updateSink-SinkConfig-java.lang.String-)。
Update a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSinkWithUrl(SinkConfig sinkConfig,
throws PulsarAdminException
支持的 URL是
http
和文件
。示例
文件: file:///dir/fileName.jar
欲了解更多信息,请参阅 。## 停止连接器
stop
You can stop a connector using Admin CLI, REST API or JAVA admin API.
Source
停止一个源连接器。使用 停止
子命令。
$ pulsar-admin sources stop options
For more information, see here.* Stop all source connectors.
发送 POST
请求到此端点:
Stop a specified source connector.
发送
POST
请求到此端点: POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId * Stop all source connectors.void stopSource(String tenant,
String namespace,
String source)
throws PulsarAdminException
Parameter
Name Description
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name
**异常**
| Name | Description |
| ---------------------- | ----------- |
| `PulsarAdminException` | 未知错误 |
欲了解更多信息,请参阅 [`停止源`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-)。
Stop a specified source connector.
void stopSource(String tenant,
String namespace,
String source,
int instanceId)
throws PulsarAdminException
Parameter
Name Description
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID
**异常**
| Name | Description |
| ---------------------- | ----------- |
| `PulsarAdminException` | 未知错误 |
欲了解更多信息,请参阅 [`停止源`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#stopSource-java.lang.String-java.lang.String-java.lang.String-int-)。<!--END_DOCUSAURUS_CODE_TABS-->#### Sink
停止 sink 连接器。使用 停止
子命令。
$ pulsar-admin sinks stop options
For more information, see .* Stop all sink connectors.
发送 POST
请求到此端点:POST /admin/v3/sinks/:tenant/:namespace/:sinkName/stop
Stop a specified sink connector.
发送
POST
请求到此端点: * Stop all sink connectors.void stopSink(String tenant,
String namespace,
String sink)
throws PulsarAdminException
Parameter
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name
**异常**
| Name | Description |
| ---------------------- | ----------- |
| `PulsarAdminException` | 未知错误 |
欲了解更多信息,请参阅 [`stopSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-)。
Stop a specified sink connector.
void stopSink(String tenant,
String namespace,
String sink,
int instanceId)
throws PulsarAdminException
Parameter
Name Description
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID
**异常**
| Name | Description |
| ---------------------- | ----------- |
| `PulsarAdminException` | 未知错误 |
欲了解更多信息,请参阅 [`stopSink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#stopSink-java.lang.String-java.lang.String-java.lang.String-int-)。<!--END_DOCUSAURUS_CODE_TABS-->## 重新启动连接器
restart
You can restart a connector using Admin CLI, REST API or JAVA admin API.
Source
重新启动一个源连接器。使用 重启
子命令。
$ pulsar-admin sources restart options
For more information, see here.* Restart all source connectors.
发送 POST
请求到此端点:
Restart a specified source connector.
发送
POST
请求到此端点: POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/restart * Restart all source connectors.void restartSource(String tenant,
String namespace,
String source)
throws PulsarAdminException
Parameter
Name Description
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name
**异常**
| Name | Description |
| ---------------------- | ----------- |
| `PulsarAdminException` | 未知错误 |
欲了解更多信息,请参阅 [`重启源`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-)。
Restart a specified source connector.
void restartSource(String tenant,
String namespace,
String source,
int instanceId)
throws PulsarAdminException
Parameter
Name Description
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Source instanceID
**异常**
| Name | Description |
| ---------------------- | ----------- |
| `PulsarAdminException` | 未知错误 |
欲了解更多信息,请参阅 [`重启源`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Source.html#restartSource-java.lang.String-java.lang.String-java.lang.String-int-)。<!--END_DOCUSAURUS_CODE_TABS-->#### Sink
重新启动 sink 连接器。使用 重启
子命令。
$ pulsar-admin sinks restart options
For more information, see .* Restart all sink connectors.
发送 POST
请求到此端点:POST /admin/v3/sources/:tenant/:namespace/:sinkName/restart
Restart a specified sink connector.
发送
POST
请求到此端点: * 重启所有 Pulsar sink 连接器。void restartSink(String tenant,
String namespace,
String sink)
throws PulsarAdminException
Parameter
`tenant` | Tenant name `namespace` | Namespace name `sink` | Sink name
**异常**
| Name | Description |
| ---------------------- | ----------- |
| `PulsarAdminException` | 未知错误 |
欲了解更多信息,请参阅 [`重新启动Sink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-)。
Restart a specified sink connector.
void restartSink(String tenant,
String namespace,
String sink,
int instanceId)
throws PulsarAdminException
Parameter
Name Description
`tenant` | Tenant name `namespace` | Namespace name `source` | Source name `instanceId` | Sink instanceID
**异常**
| Name | Description |
| ---------------------- | ----------- |
| `PulsarAdminException` | 未知错误 |
欲了解更多信息,请参阅 [`重新启动Sink`](https://pulsar.apache.org/api/admin/2.6.0-SNAPSHOT/org/apache/pulsar/client/admin/Sink.html#restartSink-java.lang.String-java.lang.String-java.lang.String-int-)。<!--END_DOCUSAURUS_CODE_TABS-->## 删除连接器
delete
You can delete a connector using Admin CLI, REST API or JAVA admin API.
Source
删除源连接器。使用 删除
子命令。
$ pulsar-admin sources delete options
For more information, see here.删除 al Pulsar 源连接器。
发送 DELETE
请求到此端点: 删除源连接器。
void deleteSource(String tenant,
String namespace,
String source)
throws PulsarAdminException
Parameter
Name | Description |
---|---|
租户
| 租户名称 命名空间
| 命名空间名称 源
| 源名称
异常
Name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | 没有管理员权限 |
PulsarAdminException.NotFoundException | 集群不存在 |
PulsarAdminException.PreconditionFailedException | 集群不是空的 |
PulsarAdminException | 未知错误 |
欲了解更多信息,请参阅 deleteSource
。#### Sink
删除 sink 连接器。使用 删除
子命令。
$ pulsar-admin sinks delete options
For more information, see .删除 sink 连接器。
发送 DELETE
请求到此端点: DELETE /admin/v3/sinks/:tenant/:namespace/:sinkname 删除 Pulsar sink 连接器。
void deleteSink(String tenant,
String namespace,
String source)
throws PulsarAdminException
Parameter
租户
| 租户名称 命名空间
| 命名空间名称 sink
| sink 名称
Name | Description |
---|---|
PulsarAdminException.NotAuthorizedException | 没有管理员权限 |
PulsarAdminException.NotFoundException | 集群不存在 |
PulsarAdminException.PreconditionFailedException | 集群不是空的 |
PulsarAdminException | 未知错误 |
欲了解更多信息,请参阅 。