Debug Pulsar Functions
Function startup information and captured stderr output are written to logs/functions/<tenant>/<namespace>/<function>/<function>-<instance>.log.
This is useful for debugging why a Function fails to start.
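As a small illustration of this path layout, the following sketch builds the expected log file location for one function instance. The FunctionLogPath class and its logFileFor method are hypothetical helpers, not part of Pulsar, and the path is relative to wherever the function worker writes its logs.

```java
// Hypothetical helper, not part of the Pulsar API: builds the relative log path
// logs/functions/<tenant>/<namespace>/<function>/<function>-<instance>.log.
final class FunctionLogPath {
    static String logFileFor(String tenant, String namespace, String function, int instance) {
        return String.format("logs/functions/%s/%s/%s/%s-%d.log",
                tenant, namespace, function, function, instance);
    }

    public static void main(String[] args) {
        // Instance 0 of a function named ExclamationFunction in public/default.
        System.out.println(logFileFor("public", "default", "ExclamationFunction", 0));
    }
}
```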
A Pulsar Function is a function with inputs and outputs, so you can test a Pulsar Function in much the same way as you test any other function.
For example, if you have the following Pulsar Function:
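The listing for this function is not shown here. A minimal sketch that is consistent with the unit test that follows, assuming the function is a plain java.util.function.Function with no Pulsar dependency, might look like this:

```java
import java.util.function.Function;

// Assumed shape of the function under test: a Java native function that
// appends "!" to its input and needs no Pulsar classes.
public class JavaNativeExclamationFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        return String.format("%s!", input);
    }
}
```

Because the class depends only on the JDK, it can be instantiated and called directly from a test.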
You can write a simple unit test for this Pulsar Function.
@Test
public void testJavaNativeExclamationFunction() {
    JavaNativeExclamationFunction exclamation = new JavaNativeExclamationFunction();
    String output = exclamation.apply("foo");
    Assert.assertEquals(output, "foo!");
}
The following Pulsar Function implements the org.apache.pulsar.functions.api.Function interface.
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class ExclamationFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) {
        return String.format("%s!", input);
    }
}
In this situation, you can write a unit test for this function as well. Remember to mock the Context parameter. The following is an example.
Tip
Pulsar uses TestNG for testing.
@Test
public void testExclamationFunction() {
    ExclamationFunction exclamation = new ExclamationFunction();
    String output = exclamation.process("foo", mock(Context.class));
    Assert.assertEquals(output, "foo!");
}
When you run a Pulsar Function in localrun mode, it launches an instance of the Function on your local machine as a thread.
In this mode, a Pulsar Function consumes data from and produces data to an actual Pulsar cluster, which mirrors how the function runs when deployed in a Pulsar cluster.
You can launch your function in the following manner.
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setName(functionName);
functionConfig.setInputs(Collections.singleton(sourceTopic));
functionConfig.setClassName(ExclamationFunction.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setOutput(sinkTopic);
LocalRunner localRunner = LocalRunner.builder().functionConfig(functionConfig).build();
localRunner.start(true);
This lets you debug functions easily in an IDE: set breakpoints and manually step through a function while it processes real data.
The following example illustrates how to programmatically launch a function in localrun mode.
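import java.util.Collections;

import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class ExclamationFunction implements Function<String, String> {
    @Override
    public String process(String s, Context context) throws Exception {
        return s + "!";
    }

    public static void main(String[] args) throws Exception {
        // Describe the function: name, input topic, implementing class, and output topic.
        FunctionConfig functionConfig = new FunctionConfig();
        functionConfig.setName("exclamation");
        functionConfig.setInputs(Collections.singleton("input"));
        functionConfig.setClassName(ExclamationFunction.class.getName());
        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        functionConfig.setOutput("output");

        // Launch the function in localrun mode inside this JVM.
        LocalRunner localRunner = LocalRunner.builder().functionConfig(functionConfig).build();
        localRunner.start(false);
    }
}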
To use localrun mode programmatically, add the following dependency.
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-local-runner</artifactId>
    <version>${pulsar.version}</version>
</dependency>
For complete code samples, see .
Note
Debugging with localrun mode for Pulsar Functions written in other languages will be supported soon.
Example
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

public class LoggingFunction implements Function<String, Void> {
    @Override
    public Void process(String input, Context context) {
        // The logger is provided by the function context.
        Logger LOG = context.getLogger();
        String messageId = new String(context.getMessageId());
        if (input.contains("danger")) {
            LOG.warn("A warning was received in message {}", messageId);
        } else {
            LOG.info("Message {} received\nContent: {}", messageId, input);
        }
        // A Function<String, Void> produces no output message.
        return null;
    }
}
As shown in the example above, you can get the logger via context.getLogger() and assign it to an slf4j Logger variable named LOG, which you can then use to write the log information you need from within the function. You also need to specify the topic to which the log information is produced.
Example
With the Pulsar Functions CLI, you can debug Pulsar Functions with the following subcommands:
get
status
stats
list
trigger
Get information about a Pulsar Function.
Usage
$ pulsar-admin functions get options
Options
Tip
--fqfn contains --name, --namespace, and --tenant. You can specify either --fqfn or the combination of --name, --namespace, and --tenant.
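To make the relationship between the two forms concrete, here is a minimal sketch that splits a fully qualified function name into its parts, assuming the tenant/namespace/name layout used in this document's examples. The Fqfn class is a hypothetical helper for illustration and is not part of Pulsar.

```java
// Hypothetical helper, not part of the Pulsar API: splits a fully qualified
// function name of the assumed form tenant/namespace/name into its three parts.
final class Fqfn {
    final String tenant;
    final String namespace;
    final String name;

    private Fqfn(String tenant, String namespace, String name) {
        this.tenant = tenant;
        this.namespace = namespace;
        this.name = name;
    }

    static Fqfn parse(String fqfn) {
        // A limit of -1 keeps trailing empty parts so "a/b/" is rejected below.
        String[] parts = fqfn.split("/", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected tenant/namespace/name but got: " + fqfn);
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("Empty component in: " + fqfn);
            }
        }
        return new Fqfn(parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        Fqfn f = parse("public/default/ExclamationFunctio6");
        // Print the equivalent long-form options.
        System.out.println("--tenant " + f.tenant + " --namespace " + f.namespace + " --name " + f.name);
    }
}
```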
Example
You can specify --fqfn to get information about a Pulsar Function.
$ ./bin/pulsar-admin functions get --fqfn public/default/ExclamationFunctio6
Alternatively, you can specify --name, --namespace, and --tenant to get information about a Pulsar Function.
$ ./bin/pulsar-admin functions get \
--tenant public \
--namespace default \
--name ExclamationFunctio6
As shown below, the get command shows the input, output, runtime, and other information about the ExclamationFunctio6 function.
{
  "tenant": "public",
  "namespace": "default",
  "name": "ExclamationFunctio6",
  "className": "org.example.test.ExclamationFunction",
  "inputSpecs": {
    "persistent://public/default/my-topic-1": {
      "isRegexPattern": false
    }
  },
  "output": "persistent://public/default/test-1",
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": false,
  "userConfig": {},
  "runtime": "JAVA",
  "autoAck": true,
  "parallelism": 1
}
Check the current status of a Pulsar Function.
Usage
$ pulsar-admin functions status options
Example
$ ./bin/pulsar-admin functions status \
--tenant public \
--namespace default \
--name ExclamationFunctio6
As shown below, the status command shows the number of instances, the number of running instances, and per-instance details for the ExclamationFunctio6 function, such as successfully processed messages, user and system exceptions, and average latency.
{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numSuccessfullyProcessed" : 1,
      "numUserExceptions" : 0,
      "latestUserExceptions" : [ ],
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "averageLatency" : 0.8385,
      "workerId" : "c-standalone-fw-23ccc88ef29b-8080"
    }
  } ]
}
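When debugging, a quick first check on this output is whether every instance is running, that is, whether numRunning equals numInstances. The sketch below shows one way to do that check on the raw JSON text. StatusCheck is a hypothetical helper, and it uses regular expressions only to stay dependency-free; a real JSON library would be more robust.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Pulsar: inspects the text printed by the
// status command. It matches the first occurrence of each field name.
final class StatusCheck {
    static int intField(String json, String field) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*(\\d+)").matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("Field not found: " + field);
        }
        return Integer.parseInt(m.group(1));
    }

    // True when the number of running instances equals the number of instances.
    static boolean allInstancesRunning(String statusJson) {
        return intField(statusJson, "numInstances") == intField(statusJson, "numRunning");
    }

    public static void main(String[] args) {
        String sample = "{ \"numInstances\" : 1, \"numRunning\" : 1, \"instances\" : [ ] }";
        System.out.println(allInstancesRunning(sample));
    }
}
```

If the check fails, the per-instance error, latestUserExceptions, and latestSystemExceptions fields are the natural next place to look.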
Get the current stats of a Pulsar Function.
Usage
Options
Example
$ ./bin/pulsar-admin functions stats \
--tenant public \
--namespace default \
--name ExclamationFunctio6
The output is shown as follows:
{
  "receivedTotal" : 1,
  "processedSuccessfullyTotal" : 1,
  "systemExceptionsTotal" : 0,
  "userExceptionsTotal" : 0,
  "avgProcessLatency" : 0.8385,
  "1min" : {
    "receivedTotal" : 0,
    "processedSuccessfullyTotal" : 0,
    "systemExceptionsTotal" : 0,
    "userExceptionsTotal" : 0,
    "avgProcessLatency" : null
  },
  "lastInvocation" : 1557734137987,
  "instances" : [ {
    "instanceId" : 0,
    "metrics" : {
      "receivedTotal" : 1,
      "processedSuccessfullyTotal" : 1,
      "systemExceptionsTotal" : 0,
      "userExceptionsTotal" : 0,
      "avgProcessLatency" : 0.8385,
      "1min" : {
        "receivedTotal" : 0,
        "processedSuccessfullyTotal" : 0,
        "systemExceptionsTotal" : 0,
        "userExceptionsTotal" : 0,
        "avgProcessLatency" : null
      },
      "lastInvocation" : 1557734137987,
      "userMetrics" : { }
    }
  } ]
}
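The lastInvocation value appears to be a Unix timestamp in milliseconds, which is hard to read at a glance. Under that assumption, the following sketch converts it to a UTC instant so that you can tell when the function last processed a message. LastInvocation is a hypothetical helper name.

```java
import java.time.Instant;

// Hypothetical helper: interprets lastInvocation as milliseconds since the
// Unix epoch (an assumption based on the magnitude of the sample value).
final class LastInvocation {
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        // Value taken from the sample output above.
        System.out.println(toInstant(1557734137987L)); // prints 2019-05-13T07:55:37.987Z
    }
}
```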
List all Pulsar Functions running under a specific tenant and namespace.
Usage
$ pulsar-admin functions list options
Options
Example
$ ./bin/pulsar-admin functions list \
--tenant public \
--namespace default
As shown below, the list command returns three functions running under the public tenant and the default namespace.
ExclamationFunctio1
ExclamationFunctio2
ExclamationFunctio3
Trigger a specified Pulsar Function with a supplied value. This command simulates the execution process of a Pulsar Function and verifies it.
Usage
$ pulsar-admin functions trigger options
Options
Example
$ ./bin/pulsar-admin functions trigger \
--tenant public \
--namespace default \
--name ExclamationFunctio6 \
--topic persistent://public/default/my-topic-1 \
--trigger-value "hello pulsar functions"
As shown below, the trigger command returns the following result: