- 从一个或多个 Pulsar topic 中消费信息
- 将用户提供的处理逻辑应用于每条信息
- 将计算结果发布到另一个主题
可以通过以下方法来管理 Functions。
可以对 functions 执行以下操作。
可以在集群模式下使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 创建 Pulsar function(在 Pulsar 集群上部署)。
Admin CLI(命令行界面)
使用 create
子命令。
示例
REST API
POST /admin/v3/functions/{tenant /{namespace}/{functionName}
Java Admin API
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(functionName);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setParallelism(1);
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setTopicsPattern(sourceTopicPattern);
functionConfig.setSubName(subscriptionName);
functionConfig.setAutoAck(true);
functionConfig.setOutput(sinkTopic);
admin.functions().createFunction(functionConfig, fileName);
更新 function
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来更新已经部署到 Pulsar 集群的 Pulsar function。
Admin CLI(命令行界面)
使用 update
子命令。
示例
$ pulsar-admin functions update \
--tenant public \
--namespace default \
--name (Pulsar Functions 的名称) \
--output persistent://public/default/update-output-topic \
# other options
REST Admin API
PUT /admin/v3/functions/{tenant /{namespace}/{functionName}
Java Admin API
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(functionName);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setParallelism(1);
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
UpdateOptions updateOptions = new UpdateOptions();
updateOptions.setUpdateAuthData(updateAuthData);
admin.functions().updateFunction(functionConfig, userCodeFile, updateOptions);
启动 function 的单个实例
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来启动已停止的 function 实例( 通过 instance-id
)。
Admin CLI(命令行界面)
使用 start
子命令。
$ pulsar-admin functions start \
--tenant public \
--namespace default \
--name (Pulsar Functions 的名称) \
--instance-id 1
REST API
POST /admin/v3/functions/{tenant /{namespace}/{functionName}/{instanceId}/start
Java Admin API
admin.functions().startFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));
启动 function 的所有实例
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API来启动所有已停止的 function 实例。
Admin CLI(命令行界面)
使用 start
子命令。
示例
$ pulsar-admin functions start \
--tenant public \
--name (Pulsar Functions 的名称) \
REST API
POST /admin/v3/functions/{tenant /{namespace}/{functionName}/start
Java
admin.functions().startFunction(tenant, namespace, functionName);
停止 function 的单个实例
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API来停止一个 function 实例(通过 instance-id
)。
Admin CLI(命令行界面)
使用 stop
子命令。
示例
$ pulsar-admin functions stop \
--tenant public \
--namespace default \
--name (Pulsar Functions 的名称) \
--instance-id 1
REST API
POST /admin/v3/functions/{tenant /{namespace}/{functionName}/{instanceId}/stop
Java Admin API
admin.functions().stopFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));
停止 function 的所有实例
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来停止 function 的所有实例。
Admin CLI(命令行界面)
使用 stop
子命令。
$ pulsar-admin functions stop \
--tenant public \
--name (Pulsar Functions 的名称) \
REST API
POST /admin/v3/functions/{tenant /{namespace}/{functionName}/stop
Java Admin API
admin.functions().stopFunction(tenant, namespace, functionName);
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来重启单个 function 实例(通过 instance-id
)。
Admin CLI(命令行界面)
使用 子命令。
示例
REST API
/{namespace}/{functionName}/{instanceId}/restart
Java Admin API
admin.functions().restartFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));
重启 function 的所有实例
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 重启 function 的所有实例。
Admin CLI(命令行界面)
使用 子命令。
示例
$ pulsar-admin functions restart \
--tenant public \
--namespace default \
--name (Pulsar Functions 的名称) \
REST API
/{namespace}/{functionName}/restart
Java Admin API
admin.functions().restartFunction(tenant, namespace, functionName);
Function 列表
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 列出指定租户和命名空间下正在运行的所有 Pulsar function。
Admin CLI(命令行界面)
使用 子命令。
示例
$ pulsar-admin functions list \
--tenant public \
--namespace default
REST API
/{namespace}
Java Admin API
admin.functions().getFunctions(tenant, namespace);
删除 function
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 删除在 Pulsar 集群上运行的 Pulsar function。
Admin CLI(命令行界面)
使用 子命令。
示例
$ pulsar-admin functions delete \
--tenant public \
--namespace default \
--name(Pulsar Functions 的名称)
REST API
/{namespace}/{functionName}
Java Admin API
admin.functions().deleteFunction(tenant, namespace, functionName);
获取 function 的信息
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来获取某个在集群模式下运行的 Pulsar function 的信息。
Admin CLI(命令行界面)
使用 子命令。
示例
$ pulsar-admin functions get \
--tenant public \
--namespace default \
--name(Pulsar Functions 的名称)
REST API
/{namespace}/{functionName}
Java Admin API
admin.functions().getFunction(tenant, namespace, functionName);
获取 function 单个实例的状态
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 通过 instance-id
来获取单个 function 实例的当前状态。
Admin CLI(命令行界面)
使用 子命令。
$ pulsar-admin functions status \
--namespace default \
--name(Pulsar Functions 的名称)\
--instance-id 1
REST API
/{namespace}/{functionName}/{instanceId}/status
Java Admin API
admin.functions().getFunctionStatus(tenant, namespace, functionName, Integer.parseInt(instanceId));
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 获取所有 function 实例的当前状态。
Admin CLI(命令行界面)
使用 status
子命令。
示例
REST API
GET /admin/v3/functions/{tenant /{namespace}/{functionName}/status
Java Admin API
admin.functions().getFunctionStatus(tenant, namespace, functionName);
获取 function 单个实例的数据
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 通过 instance-id
获取单个 function 实例的当前数据。
Admin CLI(命令行界面)
使用 stats
子命令。
示例
$ pulsar-admin functions stats \
--tenant public \
--namespace default \
--name(Pulsar Functions 的名称)\
--instance-id 1
REST API
GET /admin/v3/functions/{tenant /{namespace}/{functionName}/{instanceId}/stats
Java Admin API
admin.functions().getFunctionStats(tenant, namespace, functionName, Integer.parseInt(instanceId));
获取 function 所有实例的数据
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 获取所有 function 实例的当前数据。
Admin CLI(命令行界面)
使用 stats
子命令。
示例
$ pulsar-admin functions stats \
--tenant public \
--namespace default \
--name(Pulsar Functions 的名称)
REST API
GET /admin/v3/functions/{tenant /{namespace}/{functionName}/stats
Java Admin API
admin.functions().getFunctionStats(tenant, namespace, functionName);
触发 function
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 通过触发值触发指定的 Pulsar function。
Admin CLI(命令行界面)
使用 trigger
子命令。
示例
$ pulsar-admin functions trigger \
--tenant public \
--namespace default \
--name(Pulsar Functions 的名称)\
--topic(输入 topic 的名称)\
--trigger-value \"hello pulsar\"
# or --trigger-file(触发文件的路径)
REST API
POST /admin/v3/functions/{tenant /{namespace}/{functionName}/trigger
Java Admin API
admin.functions().triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile);
为 function 关联状态
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 关联 Pulsar function 的状态。
Admin CLI(命令行界面)
使用 putstate
子命令。
示例
$ pulsar-admin functions putstate \
--tenant public \
--namespace default \
--name(Pulsar Functions 的名称)\
--state "{\"key\":\"pulsar\", \"stringValue\":\"hello pulsar\"}"
REST API
POST /admin/v3/functions/{tenant /{namespace}/{functionName}/state/{key}
Java Admin API
TypeReference<FunctionState> typeRef = new TypeReference<FunctionState>() {};
FunctionState stateRepr = ObjectMapperFactory.getThreadLocal().readValue(state, typeRef);
admin.functions().putFunctionState(tenant, namespace, functionName, stateRepr);
获取与 function 关联的状态
可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 获取与 Pulsar function 关联的状态。
Admin CLI(命令行界面)
使用 querystate
子命令。
$ pulsar-admin functions querystate \
--tenant public \
--namespace default \
--name (Pulsar Functions 的名称) \
--key (状态的键值)
REST API
GET /admin/v3/functions/{tenant /{namespace}/{functionName}/state/{key}