- Thread: Invoke functions in threads in Functions Worker.
- Process: Invoke functions in processes forked by Functions Worker.
- Kubernetes: Submit functions as Kubernetes StatefulSets by Functions Worker.
The differences of the thread and process modes are:
- Thread mode: when a function runs in thread mode, it runs on the same Java virtual machine (JVM) with Functions worker.
- Process mode: when a function runs in process mode, it runs on the same machine that Functions worker runs.
It is easy to configure Thread runtime. In most cases, you do not need to configure anything. You can customize the thread group name with the following settings:
Thread runtime is only supported in Java function.
# the directory for storing the function logs
logDirectory:
# change the jar location only when you put the java instance jar in a different location
javaInstanceJarLocation:
# change the python instance location only when you put the python instance jar in a different location
pythonInstanceLocation:
# change the extra dependencies location:
extraFunctionDependenciesDir:
Process runtime is supported in Java, Python, and Go functions.
It is easy to configure Kubernetes runtime. You can just uncomment the settings of kubernetesContainerFactory
in the functions_worker.yaml
file. The following is an example.
If you have already run a Pulsar cluster on Kubernetes, you can keep the settings unchanged at most of time.
- services
- configmaps
- apps.statefulsets
Otherwise, you will not be able to create any functions. The following is an example of error message.
22:04:27.696 [Timer-0] ERROR org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory - Error while trying to fetch configmap example-pulsar-4qvmb5gur3c6fc9dih0x1xn8b-function-worker-config at namespace pulsar
io.kubernetes.client.ApiException: Forbidden
at io.kubernetes.client.ApiClient.handleResponse(ApiClient.java:882) ~[io.kubernetes-client-java-2.0.0.jar:?]
at io.kubernetes.client.ApiClient.execute(ApiClient.java:798) ~[io.kubernetes-client-java-2.0.0.jar:?]
at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMapWithHttpInfo(CoreV1Api.java:23673) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMap(CoreV1Api.java:23655) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory.fetchConfigMap(KubernetesRuntimeFactory.java:284) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory$1.run(KubernetesRuntimeFactory.java:275) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_212]
If this happens, you need to grant the required permissions to the service account used for running Functions Workers. An example to grant permissions is shown below: a service account functions-worker
is granted with permissions to access Kubernetes resources services
, configmaps
, pods
and apps.statefulsets
.
The functions (and sinks/sources) API provides a flag, customRuntimeOptions
which can be used to pass options to the runtime to customize how the runtime operates.
To use the basic implementation, set org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer
for the runtimeCustomerClassName
property. This implementation takes the following customRuntimeOptions
{
"jobNamespace": "namespace", // the k8s namespace to run this function in
"extractLabels": { // extra labels to attach to the statefulSet, service, and pods
"extraLabel": "value"
},
"extraAnnotations": { // extra annotations to attach to the statefulSet, service, and pods
"extraAnnotation": "value"
"customLabel": "value"
},
"tolerations": [ // tolerations to add to the pod spec
{
"key": "custom-key",
"value": "value",
"effect": "NoSchedule"
}
],
"resourceRequirements": { // values for cpu and memory should be defined as described here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
"requests": {
"cpu": 1,
"memory": "4G"
},
"limits": {
"cpu": 2,
"memory": "8G"
}