Blocking Shuffle

    As a whole, Flink provides two different types of blocking shuffles: Hash Shuffle and Sort Shuffle.

    They will be detailed in the following sections.

    The default blocking shuffle implementation, Hash Shuffle, has each upstream task persist its results in a separate file for each downstream task on the local disk of the TaskManager. When the downstream tasks run, they request partitions from the upstream TaskManagers, which read the files and transmit the data over the network.
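    The file layout described above can be sketched in a few lines. This is a simplified model, not Flink code: the helper names hash_shuffle_write and hash_shuffle_read are hypothetical, records are routed by hashing their key, and the downstream "fetch" reads the files directly from a local directory instead of going through the upstream TaskManager and the network.

```python
import os
from collections import defaultdict


def hash_shuffle_write(task_id, records, num_downstream, out_dir):
    """Hypothetical sketch: one upstream task writes one file per downstream task."""
    buckets = defaultdict(list)
    for key, value in records:
        # Route each record to a downstream task by hashing its key.
        buckets[hash(key) % num_downstream].append(f"{key},{value}")
    paths = []
    for downstream in range(num_downstream):
        path = os.path.join(out_dir, f"task{task_id}-sub{downstream}.data")
        with open(path, "w") as f:
            f.write("\n".join(buckets[downstream]))
        paths.append(path)
    return paths


def hash_shuffle_read(downstream, num_upstream, out_dir):
    """Hypothetical sketch: a downstream task collects its file from every upstream task."""
    result = []
    for task_id in range(num_upstream):
        path = os.path.join(out_dir, f"task{task_id}-sub{downstream}.data")
        with open(path) as f:
            content = f.read()
        if content:
            result.extend(content.split("\n"))
    return result
```

    With 3 upstream tasks and 2 downstream tasks, the sketch leaves 3 × 2 = 6 files on disk, one per (upstream, downstream) pair, which is the property the disadvantages listed further down stem from.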

    Hash Shuffle provides different mechanisms for writing and reading files:

    • mmap: Writes and reads files with the mmap system call.
    • Auto: Writes files with normal file IO. For reading, it falls back to normal file IO on 32-bit machines and uses mmap on 64-bit machines. This avoids the file size limitation of Java's mmap implementation on 32-bit machines.
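    The selection rules of the two mechanisms listed above can be restated as a small decision function. This is only an illustration: choose_hash_shuffle_io is a hypothetical helper, not a Flink API, and the mechanism strings "mmap" and "auto" are labels chosen for this sketch rather than verified configuration values.

```python
import sys
from typing import Optional, Tuple


def choose_hash_shuffle_io(mechanism: str, is_64bit: Optional[bool] = None) -> Tuple[str, str]:
    """Return (write_io, read_io) for the two mechanisms described above.

    Hypothetical helper that restates the documented rules; not a Flink API.
    """
    if is_64bit is None:
        # On 64-bit Python builds sys.maxsize is 2**63 - 1, well above 2**32.
        is_64bit = sys.maxsize > 2**32
    if mechanism == "mmap":
        # Both writing and reading go through mmap.
        return ("mmap", "mmap")
    if mechanism == "auto":
        # Writes use normal file IO; reads use mmap only on 64-bit machines,
        # avoiding the mmap file size limitation on 32-bit machines.
        return ("file", "mmap" if is_64bit else "file")
    raise ValueError(f"unsupported mechanism: {mechanism!r}")
```

    For example, choose_hash_shuffle_io("auto", is_64bit=False) returns ("file", "file"), while the same call with is_64bit=True returns ("file", "mmap").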

    To further improve performance, we also recommend enabling compression for most jobs, unless the data is hard to compress.
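    Why "unless the data is hard to compress" matters can be checked quickly. The sketch below uses Python's zlib purely as a stand-in codec (it is not necessarily the codec Flink uses) and compares repetitive, record-like bytes with random bytes: the first shrinks dramatically, while the second does not shrink at all, so compressing it only costs CPU.

```python
import os
import zlib


def compression_ratio(data: bytes) -> float:
    """Compressed size divided by original size (lower is better)."""
    return len(zlib.compress(data)) / len(data)


# Repetitive, record-like data: 100 distinct lines repeated many times.
records = b"".join(b"user-%05d,click,2021-05-01\n" % (i % 100) for i in range(10_000))
# Random bytes of the same length are essentially incompressible.
noise = os.urandom(len(records))

print(f"records: {compression_ratio(records):.3f}")
print(f"noise:   {compression_ratio(noise):.3f}")
```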

    However, Hash Shuffle also has some disadvantages:

    1. If the job scale is large, it may create too many files, and writing all of these files at the same time requires a large write buffer.
    2. On HDD, when multiple downstream tasks fetch their data simultaneously, it may cause random IO.
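    The first disadvantage is easy to quantify with back-of-the-envelope arithmetic. The cost model below is an assumption made for illustration, not Flink's actual memory accounting: it counts one file per (upstream, downstream) pair and gives each upstream task one write buffer per downstream file it keeps open; the 32 KiB buffer size is likewise an assumed value.

```python
def hash_shuffle_cost(upstream: int, downstream: int, buffer_bytes: int = 32 * 1024):
    """Rough, assumed cost model for hash shuffle.

    Returns (total_files, write_buffer_bytes_per_upstream_task).
    """
    # One file per (upstream task, downstream task) pair.
    total_files = upstream * downstream
    # An upstream task writes all of its downstream files at the same time,
    # so in this model it holds one write buffer per downstream task.
    per_task_buffer = downstream * buffer_bytes
    return total_files, per_task_buffer


files, buf = hash_shuffle_cost(1000, 1000)
print(f"{files:,} files, {buf / 2**20:.2f} MiB of write buffers per upstream task")
# prints "1,000,000 files, 31.25 MiB of write buffers per upstream task"
```

    Both numbers grow with the downstream parallelism, so a job that is ten times wider needs ten times the buffer memory per upstream task and, if the upstream side grows too, roughly a hundred times the files.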

    Sort Shuffle is another blocking shuffle implementation, introduced in version 1.13. Unlike Hash Shuffle, sort shuffle writes only one file for each result partition. When the result partition is read by multiple downstream tasks concurrently, the data file is opened only once and shared by all readers. As a result, the cluster uses fewer resources such as inodes and file descriptors, which improves stability. Furthermore, by writing fewer files and making a best effort to read data sequentially, sort shuffle can achieve better performance than hash shuffle, especially on HDD. Additionally, sort shuffle uses extra managed memory as its data reading buffer and does not rely on the sendfile or mmap mechanisms, thus it also works well with . Please refer to FLINK-19582 and for more details about sort shuffle.
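    A minimal sketch of the "one file per result partition, shared by all readers" idea follows. It assumes an in-memory bytes object in place of the single data file and an (offset, length) index per subpartition; the helper names and the index format are hypothetical and do not describe Flink's actual on-disk format.

```python
import io
from collections import defaultdict


def sort_shuffle_write(records, num_subpartitions):
    """Write every subpartition of one result partition into a single buffer.

    records is an iterable of (subpartition_index, payload_bytes).
    Returns (data, index) where index[i] is the (offset, length) of subpartition i.
    """
    groups = defaultdict(list)
    for subpartition, payload in records:
        groups[subpartition].append(payload)
    data = io.BytesIO()
    index = []
    for sub in range(num_subpartitions):
        start = data.tell()
        for payload in groups[sub]:
            data.write(payload)
        # Each subpartition occupies one contiguous region, so it can be read sequentially.
        index.append((start, data.tell() - start))
    return data.getvalue(), index


def sort_shuffle_read(data, index, subpartition):
    """Every downstream reader uses the same shared data; only the offsets differ."""
    offset, length = index[subpartition]
    return data[offset:offset + length]
```

    For instance, writing [(1, b"b1"), (0, b"a1"), (1, b"b2"), (0, b"a2")] with two subpartitions yields the data b"a1a2b1b2" and the index [(0, 4), (4, 4)], so reading subpartition 1 returns b"b1b2". All readers share one data object, mirroring the single file that is opened once.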

    There are several config options that might need adjustment when using sort blocking shuffle:

    • taskmanager.network.sort-shuffle.min-parallelism: Enables sort shuffle depending on the parallelism of the downstream tasks. If the parallelism is lower than the configured value, hash shuffle is used; otherwise, sort shuffle is used.
    • : Controls the data writing buffer size. For large-scale jobs, you may need to increase this value; several hundred megabytes of memory is usually enough.
    • taskmanager.memory.framework.off-heap.batch-shuffle.size: Controls the data reading buffer size. For large-scale jobs, you may need to increase this value; several hundred megabytes of memory is usually enough.
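    The rule for taskmanager.network.sort-shuffle.min-parallelism can be written as a one-line comparison. The function below is a hypothetical restatement of the documented behavior, not code taken from Flink.

```python
def select_blocking_shuffle(downstream_parallelism: int, min_parallelism: int) -> str:
    """Restate the documented min-parallelism rule (hypothetical helper)."""
    # Parallelism below the threshold keeps hash shuffle; at or above it, sort shuffle is used.
    if downstream_parallelism < min_parallelism:
        return "hash"
    return "sort"
```

    Because every downstream parallelism is at least 1, this rule implies that a threshold of 1 selects sort shuffle for every job, while a very large threshold effectively keeps hash shuffle everywhere.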

    Currently, sort shuffle only sorts records by partition index, not by the records themselves; that is, the sort is used only as a data clustering algorithm.
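    The difference between clustering and a full sort can be shown with a stable sort keyed only on the partition index. This is an illustration of the concept, not Flink's sorter: Python's sorted is stable, so records with the same partition index keep their arrival order and are never compared with each other.

```python
def cluster_by_partition(records):
    """Group (partition_index, record) pairs by partition index only.

    sorted() is stable, so within a partition the records keep their arrival
    order; the record values themselves are never compared.
    """
    return sorted(records, key=lambda r: r[0])


print(cluster_by_partition([(1, "z"), (0, "y"), (1, "a"), (0, "b")]))
# prints [(0, 'y'), (0, 'b'), (1, 'z'), (1, 'a')]
```

    Partition 0 still holds "y" before "b": the output is grouped by partition, but the records inside each group are not sorted.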

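    In summary:

    • For small-scale jobs running on SSD, both implementations should work.
    • For large-scale jobs or jobs running on HDD, sort shuffle should be more suitable.
    • In both cases, you may consider enabling compression to improve performance, unless the data is hard to compress.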