Deploy Pulsar Functions

    If you run a non-standalone cluster, you need to obtain the service URL for the cluster. How you obtain the service URL depends on how you deploy your Pulsar cluster.

    If you want to deploy and trigger Python user-defined functions, you need to install the Pulsar Python client (pulsar-client) on all the machines running functions workers.

    Pulsar Functions are deployed and managed using the pulsar-admin functions interface, which contains commands such as create for deploying functions in cluster mode, trigger for triggering functions, and list for listing deployed functions.

    To learn more commands, refer to the pulsar-admin functions reference.

    When managing Pulsar Functions, you need to specify a variety of information about functions, including tenant, namespace, input and output topics, and so on. However, some parameters have default values if you do not specify values for them. The following table lists the default values.

    Example of default arguments

    Take the create command as an example.

    The function has default values for the function name (MyFunction), tenant (public), namespace (default), subscription type (SHARED), processing guarantees (ATLEAST_ONCE), and Pulsar service URL (pulsar://localhost:6650).
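
The fallback behavior can be pictured as a dictionary merge: explicit values win, and the defaults fill in the rest. The following Python sketch is illustrative only. `DEFAULTS` and `resolve_config` are hypothetical names, not part of Pulsar, and the values are the ones listed above.

```python
# Hypothetical illustration of default resolution; not Pulsar's actual code.
DEFAULTS = {
    "name": "MyFunction",
    "tenant": "public",
    "namespace": "default",
    "subscription_type": "SHARED",
    "processing_guarantees": "ATLEAST_ONCE",
    "broker_service_url": "pulsar://localhost:6650",
}


def resolve_config(overrides):
    """Return the effective settings: explicit values win, defaults fill the rest."""
    config = dict(DEFAULTS)   # copy so the defaults are never mutated
    config.update(overrides)  # user-supplied values take precedence
    return config


if __name__ == "__main__":
    effective = resolve_config({"tenant": "my-tenant"})
    print(effective["tenant"])     # explicitly supplied value
    print(effective["namespace"])  # falls back to the default
```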

    If you run a Pulsar Function in local run mode, it runs on the machine from which you enter the commands (on your laptop, an instance, and so on). The following is a localrun command example.

    $ bin/pulsar-admin functions localrun \
      --py myfunc.py \
      --classname myfunc.SomeFunction \
      --inputs persistent://public/default/input-1 \
      --output persistent://public/default/output-1
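
For context, `--classname myfunc.SomeFunction` points at a class named `SomeFunction` in `myfunc.py`. That file is not shown here, so the following is only a guess at a minimal shape, assuming the `Function` base class from the Python SDK's `pulsar` package. The body (appending an exclamation mark) is invented for illustration, and the fallback base class exists only so the sketch can be exercised without the SDK installed.

```python
# myfunc.py (hypothetical contents; the original file is not shown)
try:
    from pulsar import Function  # shipped with the pulsar-client package
except ImportError:
    class Function:  # stand-in so the sketch runs without the SDK
        pass


class SomeFunction(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        # Called once per message from the input topic; the return value
        # is published to the output topic.
        return "{0}!".format(input)


if __name__ == "__main__":
    # Local check only; this body does not use the Pulsar context.
    print(SomeFunction().process("hello", None))
```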

    By default, the function connects to a Pulsar cluster running on the same machine, via a local service URL of pulsar://localhost:6650. If you use local run mode to run a function but connect it to a non-local Pulsar cluster, you can specify a different broker URL using the --broker-service-url flag. The following is an example.

    $ bin/pulsar-admin functions localrun \
      --broker-service-url pulsar://my-cluster-host:6650 \
      # Other function parameters

    When you run a Pulsar Function in cluster mode, the function code is uploaded to a Pulsar broker and runs alongside the broker rather than in your local environment. You can run a function in cluster mode using the create command.

    $ bin/pulsar-admin functions create \
      --py myfunc.py \
      --classname myfunc.SomeFunction \
      --inputs persistent://public/default/input-1 \
      --output persistent://public/default/output-1

    You can update a function running in cluster mode using the update command.

    $ bin/pulsar-admin functions update \
      --py myfunc.py \
      --classname myfunc.SomeFunction \
      --inputs persistent://public/default/new-input-topic \
      --output persistent://public/default/new-output-topic

    Parallelism

    Pulsar Functions run as processes or threads, which are called instances. When you run a Pulsar Function, it runs as a single instance by default. A single localrun command starts only one instance of a function. To run multiple instances, run the localrun command multiple times.

    When you create a function, you can specify its parallelism (the number of instances to run) using the --parallelism flag of the create command.

    You can adjust the parallelism of an already created function using the update command.

    $ bin/pulsar-admin functions update \
      --parallelism 5 \
      # Other function parameters

    If you specify a function configuration via YAML, use the parallelism parameter. The following is a config file example.

    # function-config.yaml
    parallelism: 3
    inputs:
      - persistent://public/default/input-1
    output: persistent://public/default/output-1
    # other parameters

    The following is the corresponding update command.

    $ bin/pulsar-admin functions update \
      --function-config-file function-config.yaml

    When you run Pulsar Functions in cluster mode, you can specify the resources that are assigned to each function instance.

    Resource      Specified as           Runtimes
    CPU           The number of cores    Kubernetes
    RAM           The number of bytes    Process, Docker
    Disk space    The number of bytes    Docker

    The following function creation command allocates 8 cores, 8 GB of RAM, and 10 GB of disk space to a function.

    $ bin/pulsar-admin functions create \
      --classname org.example.functions.MyFunction \
      --cpu 8 \
      --ram 8589934592 \
      --disk 10737418240
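
The byte counts passed to `--ram` and `--disk` above are the GB figures multiplied by 1024^3. A small Python helper reproduces them; the name `gib_to_bytes` is made up for this sketch.

```python
GIB = 1024 ** 3  # bytes in one gibibyte


def gib_to_bytes(gib):
    """Convert a size in GiB to the byte count expected by --ram and --disk."""
    return gib * GIB


if __name__ == "__main__":
    print(gib_to_bytes(8))   # 8589934592, the --ram value above
    print(gib_to_bytes(10))  # 10737418240, the --disk value above
```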

    Use Package management service

    Package management enables version management and simplifies the upgrade and rollback processes for Functions, Sinks, and Sources. When you use the same function, sink and source in different namespaces, you can upload them to a common package management system.

    To use Package management service, ensure that the package management service has been enabled in your cluster by setting the following properties in broker.conf.

    With Package management service enabled, you can upload your function packages to the service and get the package URL.

    Once you have a package URL, you can create the function from it by setting --jar, --py, or --go to the package URL in pulsar-admin functions create.
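
This section does not show what a package URL looks like. Assuming the `type://tenant/namespace/name@version` form used by the package management service, the Python sketch below assembles one. The helper `package_url`, the package name `myfunc`, and the version `v1` are all hypothetical.

```python
# Assumption: package URLs follow type://tenant/namespace/name@version.
VALID_TYPES = ("function", "sink", "source")


def package_url(pkg_type, tenant, namespace, name, version):
    """Build a package URL of the assumed form type://tenant/namespace/name@version."""
    if pkg_type not in VALID_TYPES:
        raise ValueError("unknown package type: {0}".format(pkg_type))
    return "{0}://{1}/{2}/{3}@{4}".format(pkg_type, tenant, namespace, name, version)


if __name__ == "__main__":
    url = package_url("function", "public", "default", "myfunc", "v1")
    # The resulting string is what would be passed to --py, --jar, or --go.
    print(url)
```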

    If a Pulsar Function is running in cluster mode, you can trigger it at any time using the command line. Triggering a function means that you send a message with a specific value to the function and get the function output (if any) via the command line.

    To learn how to trigger a function, you can start with a Python function that returns a simple string based on the input.

    # myfunc.py
    def process(input):
        return "This function has been triggered with a value of {0}".format(input)
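
Before deploying, the function can be checked locally by calling it directly, since it is plain Python with no Pulsar dependency. The sketch below repeats the function so it is self-contained; the value "hello world" is an arbitrary example.

```python
# myfunc.py
def process(input):
    return "This function has been triggered with a value of {0}".format(input)


if __name__ == "__main__":
    # Call the function directly, the way a trigger would pass a value in.
    print(process("hello world"))
    # prints: This function has been triggered with a value of hello world
```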

    You can deploy the function in cluster mode with the create command.

    $ bin/pulsar-admin functions create \
      --tenant public \
      --namespace default \
      --name myfunc \
      --py myfunc.py \
      --classname myfunc \
      --inputs persistent://public/default/in \
      --output persistent://public/default/out

    Then start a consumer that listens on the output topic for messages from the myfunc function with the following command.

    $ bin/pulsar-client consume persistent://public/default/out \
      --subscription-name my-subscription \
      --num-messages 0 # Listen indefinitely

    Then you can trigger the function.

    $ bin/pulsar-admin functions trigger \
      --tenant public \
      --namespace default \
      --name myfunc \
      --trigger-value "hello world"

    The consumer listening on the output topic prints output similar to the following in the log.