Getting started with Apache Beam

    How does it work?

    Apache Beam allows you to create programs in a variety of programming languages like Java, Python and Go using a standard . These programs build data pipelines which can then be executed using Beam on the various execution engines.

    Hop is using the Beam API to create Beam pipelines based off of your visually designed Hop pipelines. The terminology of Hop and Beam are aligned because they mean the same thing. Hop provides 4 standard ways to execute a pipeline that you designed on Spark, Flink, Dataflow or on the Direct runner.

    Here is the documentation for the relevant plugins:

    What software versions are supported

    An Apache Hop pipeline is just metadata. The various beam pipeline engine plugins look at this metadata one transform at a time. It decides what to do with it based on a Hop transform handler which is provided. The transforms are in general split into a different types described below.

    There are a number of Beam specific transforms available which only work on the provided Beam pipeline execution engines. For example: Beam Input which reads text file data from one or more files or which writes data to BigQuery.

    You can find these transforms in the category and their names all start with Beam to make is easy to recognize them.

    Here is an example of a simple pipeline which read files in a folder (on gs://), filters out data from California, removes and renames a few fields and writes the data back to another set of files:

    There are a few transforms which are translated into Beam variations:

    • Memory Group By: This transform allows you to aggregate data across large data volumes. When using the Beam engines it uses org.apache.beam.sdk.transforms.GroupByKey.

    • : You can join 2 data sources with this transform. The main difference is that in the Beam engines the input data doesn’t need to be sorted. The Beam class used to perform this is: org.apache.beam.sdk.extensions.joinlibrary.Join.

    • Generate Rows: This transform is used to generate (empty/static) rows of data. It can be either a fixed number, or it can generate rows indefinitely. When using the Beam engines it uses org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource or org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource.

    A few transforms are simply not supported because we haven’t found a good way to do this on Beam yet:

    • Group By : Use the Memory Group By instead

    The Denormaliser transform works technically correct on Apache Beam in release 1.1.0 and later. Even so you need to consider that the aggregation of the key-value pairs in that transform (in the general case) only happens on a sub-set of the rows. That is because in a Beam pipeline the order in which rows arrive is lost because they are continuously re-shuffled to maximize parallelism. This is different from the behavior of the “Local” Hop pipeline engine.

    To get around this issue you can apply a transform across the whole dataset to grab the first non-null value of every field you de-normalised. This will produce the correct result.

    All other transforms are simply supported. They are wrapped in a bit of code to make the exact same code that runs on the Hop local pipeline engine work in a Beam pipeline. There are a few things to mention though.

    Special caseSolution

    Info transforms

    Some transforms like Stream Lookup read data from other transforms. This is handled by side-inputs for the data in the Beam API and is as such fully supported.

    Sometimes you want to target specific transforms like in or Filter Rows. This is fully supported as well and handled by the Beam API which handles .

    Non-Beam input transforms

    When you’re reading data using a non-beam transform (see Beam specific transforms above) we need to make sure that this transform is executed in exactly one thread. Otherwise, you might read your XML or JSON document many times by the inherently parallel intentions of the various engines. This is handled by doing a Group By over a single value. You’ll see the following in for example your Dataflow pipeline: Create.ValuesWithKeysGroupByKeyValuesFlatten.IterablesParDo …​ and all this is just done to make sure we only ever execute our transform once.

    Non-Beam input transforms on Dataflow

    Non-Beam Output transforms

    The insistence of a Beam pipeline to run work in parallel can also trip you up on the output side. In rare cases maybe you don’t want a server to be bombarded by dozens of inbound connections. To limit the amount of output copies you can include in the number of copies value of a transform (click on the transform and select Number of copies in the Hop GUI). This will do a GroupBy exercise over all records to iterate over those and force a single thread.

    Row batching with non-Beam transforms

    A lot of target databases like to receive rows in batches of records. So if you have a transform like for example Table Output or Neo4j Output you might see that performance is not that great. This is because by default the Beam programming model is designed to stream rows of data through a pipeline in bundles and the Hop API only knows about a single record at once. For these transforms you can include BATCH in the number of copies string of a transform click on the transform and select Number of copies in the Hop GUI). For these flagged transforms you can then specify 2 parameters in your Beam pipeline run configurations. When you set these you can determine how long rows are kept behind before being forced to the transforms in question

    Streaming Hop transform flush interval: how long in time are rows kept and batched up? If you care about latency make this lower (500 or lower). If you have a long-running batching pipeline, make it higher (10000 or higher perhaps).

    Hop streaming transforms buffer size: how many rows are being batched? Consider making it the same as the batching size you use in your transform metadata (e.g. Table Output, Neo4j Cypher, …​)

    Please note that these are maximum values. If the end of a bundle is reached in a pipeline rows are always forced to the transform code and as such pushed to the target system. To get an idea of how many times a batching buffer is flushed to the underlying transform code (and as such to for example a remote database) we added a metric. You will notice this in your metrics view in the Hop GUI when executing.

    Fat jars?

    A fat jar is often used to package up all the code you need for a particular project. The Spark, Flink and Dataflow execution engines like it since it massively simplifies the Java classpath when executing pipelines. Apache Hop allows you to create a fat jar in the Hop GUI with the Tools/Generate a Hop fat jar…​ menu or using the following command:

    The path to this fat jar can then be referenced in the various Beam runtime configurations. Note that the current version of Hop and all its plugins are used to build the fat jar. If you install or remove plugins or update Hop itself make sure to remember to generate a new fat jar or to update it.

    The Beam Input and transforms expect you to define the layout of the file(s) being read or written.

    Beam File Definition example

    Current limitations

    There are some specific advantages to using engines like Spark, Flink and Dataflow. However, with it come some limitations as well…​

    • Previewing data is not available (yet). Because of the distributed nature of execution we don’t have a great way to acquire preview data.

    • Debugging or pausing a pipeline is not supported