• 从一个或多个 Pulsar topic 中消费信息
    • 将用户提供的处理逻辑应用于每条信息
    • 将计算结果发布到另一个主题

    可以通过以下方法来管理 Functions。

    可以对 functions 执行以下操作。

    可以在集群模式下使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 创建 Pulsar function(在 Pulsar 集群上部署)。

    Admin CLI(命令行界面)

    使用 create 子命令。

    示例

    REST API

    POST /admin/v3/functions/{tenant /{namespace}/{functionName}

    Java Admin API

    1. FunctionConfig functionConfig = new FunctionConfig();
    2. functionConfig.setTenant(tenant);
    3. functionConfig.setNamespace(namespace);
    4. functionConfig.setName(functionName);
    5. functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
    6. functionConfig.setParallelism(1);
    7. functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
    8. functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
    9. functionConfig.setTopicsPattern(sourceTopicPattern);
    10. functionConfig.setSubName(subscriptionName);
    11. functionConfig.setAutoAck(true);
    12. functionConfig.setOutput(sinkTopic);
    13. admin.functions().createFunction(functionConfig, fileName);

    更新 function

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来更新已经部署到 Pulsar 集群的 Pulsar function。

    Admin CLI(命令行界面)

    使用 update 子命令。

    示例

    1. $ pulsar-admin functions update \
    2. --tenant public \
    3. --namespace default \
    4. --name (Pulsar Functions 的名称) \
    5. --output persistent://public/default/update-output-topic \
    6. # other options

    REST Admin API

    PUT /admin/v3/functions/{tenant /{namespace}/{functionName}

    Java Admin API

    1. FunctionConfig functionConfig = new FunctionConfig();
    2. functionConfig.setTenant(tenant);
    3. functionConfig.setNamespace(namespace);
    4. functionConfig.setName(functionName);
    5. functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
    6. functionConfig.setParallelism(1);
    7. functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
    8. UpdateOptions updateOptions = new UpdateOptions();
    9. updateOptions.setUpdateAuthData(updateAuthData);
    10. admin.functions().updateFunction(functionConfig, userCodeFile, updateOptions);

    启动 function 的单个实例

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来启动已停止的 function 实例( 通过 instance-id)。

    Admin CLI(命令行界面)

    使用 start 子命令。

    1. $ pulsar-admin functions start \
    2. --tenant public \
    3. --namespace default \
    4. --name (Pulsar Functions 的名称) \
    5. --instance-id 1

    REST API

    POST /admin/v3/functions/{tenant /{namespace}/{functionName}/{instanceId}/start

    Java Admin API

    1. admin.functions().startFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));

    启动 function 的所有实例

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API来启动所有已停止的 function 实例。

    Admin CLI(命令行界面)

    使用 start 子命令。

    示例

    1. $ pulsar-admin functions start \
    2. --tenant public \
    3. --name (Pulsar Functions 的名称) \

    REST API

    POST /admin/v3/functions/{tenant /{namespace}/{functionName}/start

    Java

    1. admin.functions().startFunction(tenant, namespace, functionName);

    停止 function 的单个实例

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API来停止一个 function 实例(通过 instance-id)。

    Admin CLI(命令行界面)

    使用 stop 子命令。

    示例

    1. $ pulsar-admin functions stop \
    2. --tenant public \
    3. --namespace default \
    4. --name (Pulsar Functions 的名称) \
    5. --instance-id 1

    REST API

    POST /admin/v3/functions/{tenant /{namespace}/{functionName}/{instanceId}/stop

    Java Admin API

    1. admin.functions().stopFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));

    停止 function 的所有实例

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来停止 function 的所有实例。

    Admin CLI(命令行界面)

    使用 stop 子命令。

    1. $ pulsar-admin functions stop \
    2. --tenant public \
    3. --name (Pulsar Functions 的名称) \

    REST API

    POST /admin/v3/functions/{tenant /{namespace}/{functionName}/stop

    Java Admin API

    1. admin.functions().stopFunction(tenant, namespace, functionName);

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来重启单个 function 实例(通过 instance-id)。

    Admin CLI(命令行界面)

    使用 子命令。

    示例

    REST API

    /{namespace}/{functionName}/{instanceId}/restart

    Java Admin API

    1. admin.functions().restartFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));

    重启 function 的所有实例

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 重启 function 的所有实例。

    Admin CLI(命令行界面)

    使用 子命令。

    示例

    1. $ pulsar-admin functions restart \
    2. --tenant public \
    3. --namespace default \
    4. --name (Pulsar Functions 的名称) \

    REST API

    /{namespace}/{functionName}/restart

    Java Admin API

    1. admin.functions().restartFunction(tenant, namespace, functionName);

    Function 列表

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 列出指定租户和命名空间下正在运行的所有 Pulsar function。

    Admin CLI(命令行界面)

    使用 子命令。

    示例

    1. $ pulsar-admin functions list \
    2. --tenant public \
    3. --namespace default

    REST API

    /{namespace}

    Java Admin API

    1. admin.functions().getFunctions(tenant, namespace);

    删除 function

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 删除在 Pulsar 集群上运行的 Pulsar function。

    Admin CLI(命令行界面)

    使用 子命令。

    示例

    1. $ pulsar-admin functions delete \
    2. --tenant public \
    3. --namespace default \
    4. --namePulsar Functions 的名称)

    REST API

    /{namespace}/{functionName}

    Java Admin API

    1. admin.functions().deleteFunction(tenant, namespace, functionName);

    获取 function 的信息

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 来获取某个在集群模式下运行的 Pulsar function 的信息。

    Admin CLI(命令行界面)

    使用 子命令。

    示例

    1. $ pulsar-admin functions get \
    2. --tenant public \
    3. --namespace default \
    4. --namePulsar Functions 的名称)

    REST API

    /{namespace}/{functionName}

    Java Admin API

    1. admin.functions().getFunction(tenant, namespace, functionName);

    获取 function 单个实例的状态

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 通过 instance-id 来获取单个 function 实例的当前状态。

    Admin CLI(命令行界面)

    使用 子命令。

    1. $ pulsar-admin functions status \
    2. --namespace default \
    3. --namePulsar Functions 的名称)\
    4. --instance-id 1

    REST API

    /{namespace}/{functionName}/{instanceId}/status

    Java Admin API

    1. admin.functions().getFunctionStatus(tenant, namespace, functionName, Integer.parseInt(instanceId));

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 获取所有 function 实例的当前状态。

    Admin CLI(命令行界面)

    使用 status 子命令。

    示例

    REST API

    GET /admin/v3/functions/{tenant /{namespace}/{functionName}/status

    Java Admin API

    1. admin.functions().getFunctionStatus(tenant, namespace, functionName);

    获取 function 单个实例的数据

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 通过 instance-id 获取单个 function 实例的当前数据。

    Admin CLI(命令行界面)

    使用 stats 子命令。

    示例

    1. $ pulsar-admin functions stats \
    2. --tenant public \
    3. --namespace default \
    4. --namePulsar Functions 的名称)\
    5. --instance-id 1

    REST API

    GET /admin/v3/functions/{tenant /{namespace}/{functionName}/{instanceId}/stats

    Java Admin API

    1. admin.functions().getFunctionStats(tenant, namespace, functionName, Integer.parseInt(instanceId));

    获取 function 所有实例的数据

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 获取所有 function 实例的当前数据。

    Admin CLI(命令行界面)

    使用 stats 子命令。

    示例

    1. $ pulsar-admin functions stats \
    2. --tenant public \
    3. --namespace default \
    4. --namePulsar Functions 的名称)

    REST API

    GET /admin/v3/functions/{tenant /{namespace}/{functionName}/stats

    Java Admin API

    1. admin.functions().getFunctionStats(tenant, namespace, functionName);

    触发 function

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 通过触发值触发指定的 Pulsar function。

    Admin CLI(命令行界面)

    使用 trigger 子命令。

    示例

    1. $ pulsar-admin functions trigger \
    2. --tenant public \
    3. --namespace default \
    4. --namePulsar Functions 的名称)\
    5. --topic(输入 topic 的名称)\
    6. --trigger-value \"hello pulsar\"
    7. # or --trigger-file(触发文件的路径)

    REST API

    POST /admin/v3/functions/{tenant /{namespace}/{functionName}/trigger

    Java Admin API

    1. admin.functions().triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile);

    为 function 关联状态

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 关联 Pulsar function 的状态。

    Admin CLI(命令行界面)

    使用 putstate 子命令。

    示例

    1. $ pulsar-admin functions putstate \
    2. --tenant public \
    3. --namespace default \
    4. --namePulsar Functions 的名称)\
    5. --state "{\"key\":\"pulsar\", \"stringValue\":\"hello pulsar\"}"

    REST API

    POST /admin/v3/functions/{tenant /{namespace}/{functionName}/state/{key}

    Java Admin API

    1. TypeReference<FunctionState> typeRef = new TypeReference<FunctionState>() {};
    2. FunctionState stateRepr = ObjectMapperFactory.getThreadLocal().readValue(state, typeRef);
    3. admin.functions().putFunctionState(tenant, namespace, functionName, stateRepr);

    获取与 function 关联的状态

    可以使用 Admin CLI(命令行界面)、REST API 或 Java Admin API 获取与 Pulsar function 关联的状态。

    Admin CLI(命令行界面)

    使用 querystate 子命令。

    1. $ pulsar-admin functions querystate \
    2. --tenant public \
    3. --namespace default \
    4. --name (Pulsar Functions 的名称) \
    5. --key (状态的键值)

    REST API

    GET /admin/v3/functions/{tenant /{namespace}/{functionName}/state/{key}

    Java Admin CLI