Testing

    Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test those classes that contain the main business logic with unit tests as much as possible.

    For example, let’s take the following stateless MapFunction.

    import org.apache.flink.api.common.functions.MapFunction

    class IncrementMapFunction extends MapFunction[Long, Long] {
        override def map(record: Long): Long = {
            record + 1
        }
    }

    It is very easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output.

    Java:

    import org.junit.Test;
    import static org.junit.Assert.assertEquals;

    public class IncrementMapFunctionTest {

        @Test
        public void testIncrement() throws Exception {
            // instantiate your function
            IncrementMapFunction incrementer = new IncrementMapFunction();
            // call the methods that you have implemented;
            // longValue() avoids an ambiguous assertEquals(long, Long) call
            assertEquals(3L, incrementer.map(2L).longValue());
        }
    }

    Scala:

    import org.scalatest.{FlatSpec, Matchers}

    class IncrementMapFunctionTest extends FlatSpec with Matchers {

        "IncrementMapFunction" should "increment values" in {
            // instantiate your function
            val incrementer: IncrementMapFunction = new IncrementMapFunction()
            // call the methods that you have implemented
            incrementer.map(2) should be (3)
        }
    }

    Similarly, a user-defined function which uses an org.apache.flink.util.Collector (e.g. a FlatMapFunction or ProcessFunction) can be easily tested by providing a mock object instead of a real collector. A FlatMapFunction with the same functionality as the IncrementMapFunction could be unit tested as follows.

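    import org.apache.flink.util.Collector;
    import org.junit.Test;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    public class IncrementFlatMapFunctionTest {

        @Test
        @SuppressWarnings("unchecked")
        public void testIncrement() throws Exception {
            // instantiate your function
            IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
            // mock(Collector.class) returns a raw Collector, hence the unchecked warning
            Collector<Long> collector = mock(Collector.class);
            // call the methods that you have implemented
            incrementer.flatMap(2L, collector);
            // verify collector was called with the right output
            verify(collector, times(1)).collect(3L);
        }
    }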

    Testing a user-defined function that makes use of managed state or timers is more difficult because it involves testing the interaction between the user code and Flink’s runtime. For this, Flink comes with a collection of so-called test harnesses, which can be used to test such user-defined functions as well as custom operators:

    • OneInputStreamOperatorTestHarness (for operators on DataStreams)
    • KeyedOneInputStreamOperatorTestHarness (for operators on KeyedStreams)
    • TwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two DataStreams)
    • KeyedTwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two KeyedStreams)

    To use the test harnesses, a set of additional test-scoped dependencies is needed.

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_2.11</artifactId>
        <version>1.9.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.11</artifactId>
        <version>1.9.0</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.9.0</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>

    The test harnesses can then be used to push records and watermarks into the user-defined function, control processing time, and assert on the operator’s output.

    Java:

    import org.apache.flink.streaming.api.operators.StreamFlatMap;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    import org.junit.Before;
    import org.junit.Test;

    public class StatefulFlatMapTest {

        private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
        private StatefulFlatMapFunction statefulFlatMapFunction;

        @Before
        public void setupTestHarness() throws Exception {
            // instantiate user-defined function
            statefulFlatMapFunction = new StatefulFlatMapFunction();
            // wrap the user-defined function into the corresponding operator
            testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));
            // optionally configure the execution environment
            testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
            // open the test harness (will also call open() on RichFunctions)
            testHarness.open();
        }

        @Test
        public void testingStatefulFlatMapFunction() throws Exception {
            // push (timestamped) elements into the operator (and hence the user-defined function)
            testHarness.processElement(2L, 100L);
            // trigger event time timers by advancing the event time of the operator with a watermark
            testHarness.processWatermark(100L);
            // trigger processing time timers by advancing the processing time of the operator directly
            testHarness.setProcessingTime(100L);
            // retrieve list of emitted records for assertions
            // (containsInExactlyThisOrder is a placeholder for a matcher of your choice)
            assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
            // retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
            // assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
        }
    }

    Scala:

    import org.apache.flink.streaming.api.operators.StreamFlatMap
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness
    import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

    class StatefulFlatMapFunctionTest extends FlatSpec with Matchers with BeforeAndAfter {

        private var testHarness: OneInputStreamOperatorTestHarness[Long, Long] = null
        private var statefulFlatMap: StatefulFlatMapFunction = null

        before {
            // instantiate user-defined function
            statefulFlatMap = new StatefulFlatMapFunction
            // wrap the user-defined function into the corresponding operator
            testHarness = new OneInputStreamOperatorTestHarness[Long, Long](new StreamFlatMap(statefulFlatMap))
            // optionally configure the execution environment
            testHarness.getExecutionConfig().setAutoWatermarkInterval(50)
            // open the test harness (will also call open() on RichFunctions)
            testHarness.open()
        }

        "StatefulFlatMap" should "do some fancy stuff with timers and state" in {
            // push (timestamped) elements into the operator (and hence the user-defined function)
            testHarness.processElement(2L, 100L)
            // trigger event time timers by advancing the event time of the operator with a watermark
            testHarness.processWatermark(100L)
            // trigger processing time timers by advancing the processing time of the operator directly
            testHarness.setProcessingTime(100L)
            // retrieve list of emitted records for assertions
            testHarness.getOutput should contain (3)
            // retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
            // testHarness.getSideOutput(new OutputTag[Int]("invalidRecords")) should have size 0
        }
    }

    KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector including TypeInformation for the class of the key.

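    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.operators.StreamFlatMap;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    import org.junit.Before;

    public class StatefulFlatMapFunctionTest {

        // type parameters: key type, input type, output type
        private KeyedOneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
        private StatefulFlatMapFunction statefulFlatMapFunction;

        @Before
        public void setupTestHarness() throws Exception {
            // instantiate user-defined function
            statefulFlatMapFunction = new StatefulFlatMapFunction();
            // wrap the function into the operator and provide the KeySelector and key type
            testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING);
            // open the test harness (will also call open() on RichFunctions)
            testHarness.open();
        }

        // tests
    }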

    Many more examples for the usage of these test harnesses can be found in the Flink code base, e.g.:

    • org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest is a good example for testing operators and user-defined functions, which depend on processing or event time.
    • org.apache.flink.streaming.api.functions.sink.filesystem.LocalStreamingFileSinkTest shows how to test a custom sink with the AbstractStreamOperatorTestHarness. Specifically, it uses AbstractStreamOperatorTestHarness.snapshot and AbstractStreamOperatorTestHarness.initializeState to test its interaction with Flink’s checkpointing mechanism.

    Note: Be aware that AbstractStreamOperatorTestHarness and its derived classes are currently not part of the public API and can be subject to change.

    Apache Flink provides a JUnit rule called MiniClusterWithClientResource for testing complete jobs against a local, embedded mini cluster.

    To use MiniClusterWithClientResource, one additional test-scoped dependency is needed.

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_2.11</artifactId>
        <version>1.9.0</version>
        <scope>test</scope>
    </dependency>

    Let us take the same simple MapFunction as in the previous sections.

    Java:

    import org.apache.flink.api.common.functions.MapFunction;

    public class IncrementMapFunction implements MapFunction<Long, Long> {

        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }

    Scala:

    import org.apache.flink.api.common.functions.MapFunction

    class IncrementMapFunction extends MapFunction[Long, Long] {
        override def map(record: Long): Long = {
            record + 1
        }
    }

    A pipeline using this MapFunction can then be tested against the mini cluster as follows.

    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.test.util.MiniClusterWithClientResource;
    import org.junit.ClassRule;
    import org.junit.Test;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    import static org.junit.Assert.assertTrue;

    public class ExampleIntegrationTest {

        @ClassRule
        public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                    .setNumberSlotsPerTaskManager(2)
                    .setNumberTaskManagers(1)
                    .build());

        @Test
        public void testIncrementPipeline() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // configure your test environment
            env.setParallelism(2);

            // values are collected in a static variable
            CollectSink.values.clear();

            // create a stream of custom elements and apply transformations
            env.fromElements(1L, 21L, 22L)
                .map(new IncrementMapFunction())
                .addSink(new CollectSink());

            // execute
            env.execute();

            // verify your results (containsAll expects a Collection)
            assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
        }

        // create a testing sink
        private static class CollectSink implements SinkFunction<Long> {

            // must be static; synchronized because parallel sink instances share this list
            public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

            @Override
            public void invoke(Long value) throws Exception {
                values.add(value);
            }
        }
    }

    A few remarks on integration testing with MiniClusterWithClientResource:

    • In order not to copy your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.

    • The static variable in CollectSink is used here because Flink serializes all operators before distributing them across a cluster. Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue. Alternatively, you could write the data to files in a temporary directory with your test sink.

    • You can implement a custom parallel source function for emitting watermarks if your job uses event time timers.

    • It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs that only surface when the pipeline is executed in parallel.

    • Prefer @ClassRule over @Rule so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time since the startup and shutdown of Flink clusters usually dominate the execution time of the actual tests.
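
    The remark about the static variable in the test sink can be illustrated without Flink. The following is a minimal plain-Java sketch, not Flink code: RecordingSink and copyViaSerialization are hypothetical names introduced here, and the Java serialization round trip only stands in for the way an operator instance is shipped to a task. It shows that values written by the shipped copy are invisible through an instance field of the original object, but visible through a static field.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StaticSinkDemo {

    // Hypothetical stand-in for a test sink; it implements no Flink interface.
    static class RecordingSink implements Serializable {
        private static final long serialVersionUID = 1L;

        // Static: shared by every copy of the sink inside this JVM.
        static final List<Long> sharedValues =
            Collections.synchronizedList(new ArrayList<>());

        // Instance field: each deserialized copy gets its own list.
        final List<Long> instanceValues = new ArrayList<>();

        void invoke(Long value) {
            sharedValues.add(value);
            instanceValues.add(value);
        }
    }

    // Serializes and deserializes the sink, which yields an independent copy.
    static RecordingSink copyViaSerialization(RecordingSink sink) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(sink);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (RecordingSink) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        RecordingSink original = new RecordingSink();
        RecordingSink shipped = copyViaSerialization(original);

        // Only the shipped copy receives the record, as in a running job.
        shipped.invoke(42L);

        System.out.println(original.instanceValues);    // prints []
        System.out.println(RecordingSink.sharedValues); // prints [42]
    }
}
```

    The static list only works because the mini cluster runs in the same JVM as the test. On a real, distributed cluster the tasks run in other processes, which is why writing to files in a temporary directory is the more general alternative.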