Deploy Pulsar Functions

    运行一个非 集群,需要获取集群服务的 URL。 如何获取集群服务的 URL 取决于如何部署 Pulsar 集群。

    想要部署并触发 Python 的用户自定义 function,需要在所有运行 functions workers 的设备上安装 。

    Pulsar Functions 使用 接口进行部署和管理,通过 在 cluster mode 下部署 functions;通过 使用 triggering functions,通过 列出已部署的 functions。

    了解更多命令,请参阅 pulsar-admin functions

    在管理 Pulsar Functions 时,需要指定关于 functions 的各种信息,包括租户、命名空间、输入主题、输出主题等。 但是,在不输入信息时,有些参数会使用默认值。 如下表所示。

    create 命令为例。

    此 function 具有默认值的参数包括:function 名称(MyFunction)、租户(public)、命名空间(default)、订阅类型(SHARED)、处理保证(ATLEAST_ONCE)、Pulsar 服务 URL (pulsar://localhost:6650)。

    If you run a Pulsar Function in local run mode, it runs on the machine from which you enter the commands (on your laptop, an instance, and so on). 本地运行命令示例如下。

    1. --py myfunc.py \
    2. --classname myfunc.SomeFunction \
    3. --inputs persistent://public/default/input-1 \
    4. --output persistent://public/default/output-1
    1. $ bin/pulsar-admin functions localrun \
    2. --broker-service-url pulsar://my-cluster-host:6650 \

    When you run a Pulsar Function in cluster mode, the function code is uploaded to a Pulsar broker and runs alongside the broker rather than in your . 您可以使用 create 命令在群集模式下运行函数。

    1. $ bin/pulsar-admin functions create \
    2. --py myfunc.py \
    3. --classname myfunc.SomeFunction \
    4. --inputs persistent://public/default/input-1 \
    5. --output persistent://public/default/output-1

    You can use the command to update a Pulsar Function running in cluster mode. 以下命令用于更新在集群模式下创建的 function。

    1. $ bin/pulsar-admin functions update \
    2. --py myfunc.py \
    3. --classname myfunc.SomeFunction \
    4. --inputs persistent://public/default/new-input-topic \
    5. --output persistent://public/default/new-output-topic

    Pulsar Functions run as processes or threads, which are called instances. 在运行 Pulsar Function 时,默认为单个实例。 使用一个本地运行命令只能运行 function 的单个实例。 想要运行多个实例则需要多次使用本地运行命令。

    When you create a function, you can specify the parallelism of a function (the number of instances to run). You can set the parallelism factor using the --parallelism flag of the command.

    You can adjust the parallelism of an already created function using the update interface.

    1. $ bin/pulsar-admin functions update \
    2. --parallelism 5 \
    3. # Other function

    通过 YAML ,使用 parallelism function 指定其配置。 配置示例如下。

    1. # function-config.yaml
    2. parallelism: 3
    3. inputs:
    4. - persistent://public/default/input-1
    5. # other parameters

    相关更新命令如下。

    1. $ bin/pulsar-admin functions update \
    2. --function-config-file function-config.yaml

    在下运行 Pulsar Functions 时,可以指定资源分配给 function 的每个实例

    ResourceSpecified asRuntimes
    CPUThe number of coresKubernetes
    RAMThe number of bytesProcess, Docker
    Disk spaceThe number of bytesDocker
    1. --jar target/my-functions.jar \
    2. --classname org.example.functions.MyFunction \
    3. --cpu 8 \
    4. --ram 8589934592 \
    5. --disk 10737418240

    If a Pulsar Function is running in , you can trigger it at any time using the command line. 触发 function 意味着向其发送具有特定值的消息,并通过命令行获取其输出(如有输入)。

    要学习如何触发 function,可以从 Python function 开始,Python function 会返回基于输入的简单字符串。

    可以在本地运行模式下运行 function。

    1. $ bin/pulsar-admin functions create \
    2. --tenant public \
    3. --namespace default \
    4. --name myfunc \
    5. --py myfunc.py \
    6. --classname myfunc \
    7. --inputs persistent://public/default/in \
    8. --output persistent://public/default/out

    指定 consumer 以 命令在输出 topic 上接收来自 myfunc function 的消息。

    1. $ bin/pulsar-client consume persistent://public/default/out \
    2. --subscription-name my-subscription
    3. --num-messages 0 # Listen indefinitely

    然后可以触发 function。

    1. $ bin/pulsar-admin functions trigger \
    2. --tenant public \
    3. --namespace default \
    4. --name myfunc \
    5. --trigger-value "hello world"

    监听输出 topic 的 consumer 会在日志中生成如下内容。

    1. This function has been triggered with a value of hello world