Package Pulsar Functions
在运行 Pulsar 函数之前,你必须先启动 Pulsar。 你可以在 Docker 内独立运行 Pulsar或者。
验证 Docker 镜像是否启动,你可以使用命令。
在Java 内打包一个函数,需要完成如下步骤。
使用 pom 文件创建一个新的 maven 项目。 如下代码示例,假设你的包名称是
mainClass
。编写 Java 函数
package org.example.test;
import java.util.function.Function;
public class ExclamationFunction implements Function<String, String> {
@Override
public String apply(String s) {
return "This is my function!";
}
}
对于包的应用,你可以使用以下接口之一:
- Java 8 提供的 Function 接口:
java.util.function.Function
- Pulsar Function 提供的接口:
org.apache.pulsar.functions.api.Function
两个接口主要的不同是,接口
org.apache.pulsar.functions.api.Function
接口提供了 Context 接口。 当你编写一个函数,并想和它交互时,你能够通过 Context 获的关于 Pulsar Function 的各种各样的信息和功能。下面的例子使用了接口
org.apache.pulsar.functions.api.Function
提供的 Context 对象。package org.example.functions;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import java.util.Arrays;
public class WordCountFunction implements Function<String, Void> {
// This function is invoked every time a message is published to the input topic
@Override
public Void process(String input, Context context) throws Exception {
Arrays.asList(input.split(" ")).forEach(word -> {
String counterKey = word.toLowerCase();
context.incrCounter(counterKey, 1);
});
return null;
}
}
Java 函数打包
mvn package
Java 函数打包完成后,会自动创建
target
目录。 打开target
目录可以看到一个叫做java-function-1.0-SNAPSHOT.jar
的 JAR 文件。运行 Java 函数。
(1) 把 jar 包拷贝到 Pulsar 镜像里面
docker exec -it [CONTAINER ID] /bin/bash
docker cp <path of java-function-1.0-SNAPSHOT.jar> CONTAINER ID:/pulsar
(2) 使用如下命令运行 Java 函数。
./bin/pulsar-admin functions localrun \
--classname org.example.test.ExclamationFunction \
--jar java-function-1.0-SNAPSHOT.jar \
--inputs persistent://public/default/my-topic-1 \
--output persistent://public/default/test-1 \
--tenant public \
--namespace default \
--name JavaFunction
显示如下日志,表明 Java 函数运行成功。
...
07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
...
Python 函数支持以下三种格式:
- ZIP 压缩文件
- PIP
Python 打包单个Python文件的函数,需要完成如下步骤。
写一个Python 函数。
from pulsar import Function // import the Function module from Pulsar
# The classic ExclamationFunction that appends an exclamation at the end
# of the input
class ExclamationFunction(Function):
pass
def process(self, input, context):
return input + '!'
process()
方法主要有两个参数:input
表示你的输入。context
是由 Pulsar 函数暴露的一个接口。 基于提供的上下文对象,您可以获取 Python 函数中的属性。
安装Python 客户端。
Python 函数的实现取决于 Python 客户端,所以在部署一个函数之前,你必须去安装对应的 Python 客户端版本。
pip install pulsar-client==2.6.0
运行 Python 函数
(1) 拷贝 Python 函数到 Pulsar 镜像内。
(2) 使用如下命令运行 Python 函数。
./bin/pulsar-admin functions localrun \
--classname <Python Function file name>.<Python Function class name> \
--py <path of Python Function file> \
--inputs persistent://public/default/my-topic-1 \
--output persistent://public/default/test-1 \
--tenant public \
--namespace default \
--name PythonFunction
显示如下日志,表明 Java 函数运行成功。
...
07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
...
Python 使用ZIP压缩文件打包函数,需要完成如下步骤。
准备ZIP 压缩文件
Python 函数打包成ZIP 压缩文件,需要包含一下内容。
假设 ZIP 文件的名称是 `func.zip`,那么解压后的目录结构应如下:
"func/src"
"func/requirements.txt"
"func/deps"
以exclamation.zip为例。 这个例子内部的结构如下。
.
├── deps
│ └── sh-1.12.14-py2.py3-none-any.whl
└── src
└── exclamation.py
运行 Python 函数
(1) 拷贝 ZIP 文件到 Pulsar 镜像里面。
docker exec -it [CONTAINER ID] /bin/bash
docker cp <path of ZIP file> CONTAINER ID:/pulsar
(2) 使用如下命令运行 Python 函数。
./bin/pulsar-admin functions localrun \
--classname exclamation \
--py <path of ZIP file> \
--inputs persistent://public/default/in-topic \
--output persistent://public/default/out-topic \
--tenant public \
--namespace default \
--name PythonFunction
显示如下日志,表明 Java 函数运行成功。
...
07:55:03.724 [main] INFO org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
...
PIP 方式仅在 Kubernetes 运行时支持。 Python 使用PIP方式打包函数,需要完成如下步骤。
-
installUserCodeDependencies: true
编写 Python 函数。
您可以引入额外的依赖关系。 当 Python 函数发现这个文件使用
whl
并且参数installUserCodeDependencies
是打开的,那么系统会使用pip install
命令安装Python 函数所需要的依赖。生成
whl
文件。$ cd $PULSAR_HOME/pulsar-functions/scripts/python
$ chmod +x generate.sh
$ ./generate.sh <path of your Python Function> <path of the whl output dir> <the version of whl>
# 例子: ./generate.sh /path/to/python /path/to/python/output 1.0.0
执行后会在目录生成如下文件:
-rw-r--r-- 1 root staff 1.8K 8 27 14:29 pulsarfunction-1.0.0-py2-none-any.whl
-rw-r--r-- 1 root staff 1.4K 8 27 14:29 pulsarfunction-1.0.0.tar.gz
-rw-r--r-- 1 root staff 0B 8 27 14:29 pulsarfunction.whl
Go 打包函数,需要完成如下步骤。
编写 Go 函数。
当前,Go 函数仅仅只能使用 SDK 实现,并且函数的接口是 SDK 暴露出来的。 在使用 Go 函数之前,你必须先引入 “github.com/apache/pulsar/pulsar-function-go/pf”包。
import (
"context"
"fmt"
"github.com/apache/pulsar/pulsar-function-go/pf"
)
func HandleRequest(ctx context.Context, input []byte) error {
fmt.Println(string(input) + "!")
return nil
}
func main() {
pf.Start(HandleRequest)
}
你能够使用 Context 对象和 Go 函数交互。
if fc, ok := pf.FromContext(ctx); ok {
fmt.Printf("function ID is:%s, ", fc.GetFuncID())
fmt.Printf("function version is:%s\n", fc.GetFuncVersion())
}
写 Go 函数时,需要记得下面几点:
在
main()
函数中,你仅仅需要将函数名称注册到Start()
. 仅仅在函数Start()
会收到函数名称。Go 函数用到的 Go 反射机制,基于收到的函数名称,去校验参数列表和返回值列表是否正确。 参数列表和返回值列表必须是如下函数例子中的一个:
func ()
func () error
func (input) error
func () (output, error)
func (input) (output, error)
func (context.Context) error
func (context.Context, input) error
func (context.Context) (output, error)
func (context.Context, input) (output, error)
构建 Go 函数。
go build <your Go Function filename>.go
运行 Go 函数
(1) 拷贝 Go 函数到 Pulsar 镜像里面。
docker exec -it [CONTAINER ID] /bin/bash
docker cp <your go function path> CONTAINER ID:/pulsar
(2) 使用如下命令运行 Go 函数。
./bin/pulsar-admin functions localrun \
--go [your go function path]
--inputs [input topics] \
--output [output topic] \
--tenant [default:public] \
--namespace [default:default] \
--name [custom unique go function name]
显示如下日志,表明 Java 函数运行成功。
如果你想在集群模式下启动函数,你可以将上面命令的localrun
替换为create
。 显示如下日志,表明 Java 函数运行成功。
"Created successfully"
更多关于参数的信息,如--classname
,--jar
,--py
,--go
,--inputs
,可以运行命令./bin/pulsar-admin functions
或者点击查看。