Elastic Scaling

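    Flink jobs can be rescaled manually by stopping them and restarting them from a savepoint with a different parallelism. This page describes options where Flink adjusts the parallelism automatically instead.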

    Reactive Mode

    Reactive Mode configures a job so that it always uses all resources available in the cluster. Adding a TaskManager scales the job up, and removing resources scales it down. Flink manages the parallelism of the job and always sets it to the highest possible value.

    Reactive Mode restarts a job on a rescaling event, restoring it from the latest completed checkpoint. This means that there is no overhead of creating a savepoint (which is needed for manually rescaling a job). Also, the amount of data that is reprocessed after rescaling depends on the checkpointing interval, and the restore time depends on the state size.
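    To get a feel for how the checkpointing interval bounds the replayed data, here is a rough back-of-the-envelope sketch in Python (not Flink code). It assumes a constant input rate and a rescale that happens just before the next checkpoint completes, so everything ingested since the last completed checkpoint was triggered is read again. The function name and its parameters are hypothetical.

```python
def worst_case_reprocessed_records(
    records_per_second: float,
    checkpoint_interval_s: float,
    checkpoint_duration_s: float = 0.0,
) -> float:
    """Rough upper bound on records replayed after a Reactive Mode rescale.

    Hypothetical helper. Assumes a constant input rate and that the rescale
    happens just before the next checkpoint completes, so the data since the
    last completed checkpoint was triggered must be read again.
    """
    if records_per_second < 0 or checkpoint_interval_s <= 0 or checkpoint_duration_s < 0:
        raise ValueError("rate and duration must be non-negative, interval positive")
    return records_per_second * (checkpoint_interval_s + checkpoint_duration_s)


# 1,000 records/s, 10 s interval, checkpoints taking 2 s to complete.
print(worst_case_reprocessed_records(1_000, 10, 2))  # 12000
```

    Halving the interval roughly halves this bound, at the cost of checkpointing more often.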

    Reactive Mode allows Flink users to implement a powerful autoscaling mechanism by having an external service monitor certain metrics, such as consumer lag, aggregate CPU utilization, throughput, or latency. As soon as these metrics cross a certain threshold, TaskManagers can be added to or removed from the Flink cluster. This could be implemented by changing the replica count of a Kubernetes deployment, or by resizing an Auto Scaling group on AWS. The external service only needs to handle resource allocation and deallocation; Flink keeps the job running with whatever resources are available.
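    The following Python sketch shows the decision logic such an external service might use. The metric (consumer lag), the thresholds, and the names ScalingPolicy and desired_taskmanagers are hypothetical, and actually applying the result (for example by updating a Kubernetes deployment) is left out.

```python
from dataclasses import dataclass


@dataclass
class ScalingPolicy:
    """Hypothetical thresholds for an external autoscaler (not part of Flink)."""
    scale_up_lag: int      # consumer lag above which one TaskManager is added
    scale_down_lag: int    # consumer lag below which one TaskManager is removed
    min_taskmanagers: int
    max_taskmanagers: int


def desired_taskmanagers(current: int, consumer_lag: int, policy: ScalingPolicy) -> int:
    """Return the TaskManager count the external service should request next.

    Only resource allocation is decided here; Reactive Mode then rescales
    the job to whatever resources actually join the cluster.
    """
    if consumer_lag > policy.scale_up_lag:
        target = current + 1
    elif consumer_lag < policy.scale_down_lag:
        target = current - 1
    else:
        target = current
    # Clamp the target to the allowed range.
    return max(policy.min_taskmanagers, min(policy.max_taskmanagers, target))


policy = ScalingPolicy(scale_up_lag=10_000, scale_down_lag=1_000,
                       min_taskmanagers=1, max_taskmanagers=8)
print(desired_taskmanagers(2, consumer_lag=50_000, policy=policy))  # 3
print(desired_taskmanagers(2, consumer_lag=200, policy=policy))     # 1
```

    Stepping by one TaskManager at a time is a deliberate simplification. Since every change restarts the job from the latest checkpoint, a real service would likely also add a cool-down period between decisions.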

    Getting Started

    If you just want to try out Reactive Mode, follow these instructions. They assume that you are deploying Flink on a single machine.

    Let’s quickly examine the submission command used:

    • ./bin/standalone-job.sh start deploys Flink in Application Mode.
    • -Dscheduler-mode=reactive enables Reactive Mode.
    • The last argument passes the job’s main class name.

    You have now started a Flink job in Reactive Mode. The Flink Web UI shows that the job is running on one TaskManager. To scale up the job, add another TaskManager to the cluster, for example by running ./bin/taskmanager.sh start.

    To scale down, remove a TaskManager instance.

    Usage

    Configuration

    To enable Reactive Mode, you need to configure scheduler-mode to reactive.

    The only way to influence the parallelism is to set a max parallelism for an operator, which the scheduler respects. The max parallelism is bounded by 2^15 (32768). If you do not set a max parallelism for individual operators or for the entire job, the default max parallelism rules apply, which may result in a value well below that upper bound. As with the default scheduling mode, please take the best practices for parallelism into consideration.

    Note that such a high max parallelism can hurt the performance of the job, because Flink has to maintain more internal data structures as the max parallelism grows (for example, the number of key groups equals the max parallelism).
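    A Python sketch of how these rules interact. The default max parallelism follows Flink's documented rule of roughly parallelism + parallelism / 2, rounded up to a power of two, with a lower bound of 128 and an upper bound of 32768. Feeding the operator's configured parallelism (for example, 1) into that rule, and modeling Reactive Mode as min(available slots, max parallelism), are simplifying assumptions; the function names are hypothetical.

```python
UPPER_BOUND_MAX_PARALLELISM = 1 << 15   # 32768, the documented upper bound
DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 128


def round_up_to_power_of_two(x: int) -> int:
    """Smallest power of two that is >= x (for x >= 1)."""
    return 1 << (x - 1).bit_length()


def default_max_parallelism(parallelism: int) -> int:
    """Roughly 1.5x the parallelism, rounded up to a power of two,
    clamped to [128, 32768]."""
    candidate = round_up_to_power_of_two(parallelism + parallelism // 2)
    return min(max(candidate, DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
               UPPER_BOUND_MAX_PARALLELISM)


def reactive_parallelism(available_slots: int, max_parallelism: int) -> int:
    """Simplified model: use every available slot, but never exceed max parallelism."""
    return min(available_slots, max_parallelism)


print(default_max_parallelism(1))                             # 128
print(reactive_parallelism(200, default_max_parallelism(1)))  # 128
print(reactive_parallelism(200, 1024))                        # 200
```

    Under these assumptions, a job with parallelism 1 and no explicit max parallelism stays at 128 even on a cluster with 200 free slots, which is the "well below the upper bound" effect described above.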

    When Reactive Mode is enabled, jobmanager.adaptive-scheduler.resource-wait-timeout defaults to -1. This means that the JobManager waits forever for sufficient resources. If you want the JobManager to stop after a certain time without enough TaskManagers to run the job, set jobmanager.adaptive-scheduler.resource-wait-timeout to a positive duration.

    With Reactive Mode enabled, jobmanager.adaptive-scheduler.resource-stabilization-timeout is effectively disabled by default: Flink starts running the job as soon as sufficient resources are available. In scenarios where TaskManagers do not connect at the same time but slowly one after another, this behavior leads to a job restart whenever a TaskManager connects. Increase this configuration value if you want to wait for the resources to stabilize before scheduling the job.

    Additionally, you can configure a minimum parallelism increase: this option specifies the minimum additional aggregate parallelism required before a scale-up is triggered. For example, if a job has a source (parallelism=2) and a sink (parallelism=2), its aggregate parallelism is 4. By default, this option is set to 1, so any increase in the aggregate parallelism triggers a restart.
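    The scale-up check can be sketched in Python using the source/sink example from the paragraph above. The function names and the dictionary of per-operator parallelism are hypothetical; the sketch models only the threshold comparison, not how Flink derives the candidate parallelism from newly available slots.

```python
def aggregate_parallelism(vertex_parallelism: dict[str, int]) -> int:
    """Sum of the parallelism of all operators in the job."""
    return sum(vertex_parallelism.values())


def should_scale_up(current: dict[str, int], candidate: dict[str, int],
                    min_parallelism_increase: int = 1) -> bool:
    """Scale up only if the candidate raises the aggregate parallelism by at
    least min_parallelism_increase (1 mirrors the default described above)."""
    increase = aggregate_parallelism(candidate) - aggregate_parallelism(current)
    return increase >= min_parallelism_increase


current = {"source": 2, "sink": 2}    # aggregate parallelism 4
candidate = {"source": 3, "sink": 3}  # aggregate parallelism 6
print(should_scale_up(current, candidate))                              # True
print(should_scale_up(current, candidate, min_parallelism_increase=3))  # False
```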

    Recommendations

    • Configure periodic checkpointing for stateful jobs: Reactive Mode restores from the latest completed checkpoint on a rescale event. If periodic checkpointing is not enabled, your program will lose its state. Checkpointing also configures a restart strategy. Reactive Mode respects the configured restart strategy: if no restart strategy is configured, Reactive Mode fails your job instead of scaling it.

    • Downscaling in Reactive Mode might take longer if the TaskManager is not shut down properly (i.e., if a SIGKILL signal is used instead of a SIGTERM signal). In this case, Flink waits for the heartbeat between the JobManager and the stopped TaskManager(s) to time out. Your Flink job will appear stuck for roughly 50 seconds before it is redeployed with a lower parallelism.

      The default heartbeat timeout is 50 seconds. Lower it if your infrastructure permits this. Keep in mind that a low heartbeat timeout can lead to failures if a TaskManager does not respond to a heartbeat in time, for example due to network congestion or a long garbage collection pause. Note that heartbeat.interval must always be lower than the timeout.
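    The trade-off can be made concrete with a small Python sketch. It assumes Flink's defaults of 10 seconds for heartbeat.interval and 50 seconds for heartbeat.timeout, and treats the number of heartbeat intervals that fit into the timeout as a rough measure of tolerance; the helper check_heartbeat_settings is hypothetical and only mirrors the interval-below-timeout rule stated above.

```python
DEFAULT_HEARTBEAT_INTERVAL_MS = 10_000  # assumed default of heartbeat.interval
DEFAULT_HEARTBEAT_TIMEOUT_MS = 50_000   # assumed default of heartbeat.timeout (50 s)


def check_heartbeat_settings(interval_ms: int, timeout_ms: int) -> int:
    """Validate heartbeat settings (hypothetical helper) and return how many
    heartbeat intervals fit into the timeout."""
    if interval_ms <= 0 or timeout_ms <= 0:
        raise ValueError("interval and timeout must be positive")
    if interval_ms >= timeout_ms:
        raise ValueError("heartbeat.interval must be lower than heartbeat.timeout")
    return timeout_ms // interval_ms


print(check_heartbeat_settings(DEFAULT_HEARTBEAT_INTERVAL_MS,
                               DEFAULT_HEARTBEAT_TIMEOUT_MS))  # 5
print(check_heartbeat_settings(5_000, 20_000))                 # 4
```

    Lowering the timeout to 20 seconds shortens the stall after an unclean shutdown to roughly 20 seconds, but leaves room for only about four heartbeat intervals before a slow TaskManager is treated as lost.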

    Limitations

    Since Reactive Mode is a new, experimental feature, not all features supported by the default scheduler are also available with Reactive Mode (and its adaptive scheduler). The Flink community is working on addressing these limitations.

    • Deployment is only supported as a standalone application deployment. Active resource providers (such as native Kubernetes, YARN) are explicitly not supported. Standalone session clusters are not supported either. The application deployment is limited to single job applications.

      The only supported deployment options are Standalone in Application Mode (described on this page) and Standalone Kubernetes Application Cluster.

    Adaptive Scheduler

    Using Adaptive Scheduler directly (not through Reactive Mode) is only advised for advanced users because slot allocation on a session cluster with multiple jobs is not defined.

    The Adaptive Scheduler can adjust the parallelism of a job based on available slots. It automatically reduces the parallelism if not enough slots are available to run the job with the originally configured parallelism, whether because not enough resources are available at submission time or because TaskManagers fail during job execution. If new slots become available, the job is scaled up again, up to the configured parallelism. In Reactive Mode (see above), the configured parallelism is ignored and treated as if it were set to infinity, letting the job always use as many resources as possible. You can also use Adaptive Scheduler without Reactive Mode, but there are some practical limitations:

    • If you are using Adaptive Scheduler on a session cluster, there are no guarantees regarding the distribution of slots between multiple running jobs in the same session.
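    A simplified Python model of the scale-down/scale-up behavior described above. It assumes all operators share one slot sharing group, so each operator simply gets min(configured parallelism, available slots); Reactive Mode is modeled by a configured parallelism of infinity, and its max parallelism cap is omitted. The function name is hypothetical.

```python
import math


def adaptive_parallelism(configured: dict[str, float], available_slots: int) -> dict[str, int]:
    """Per-operator parallelism under a simplified Adaptive Scheduler model.

    Assumes a single slot sharing group, so the job needs as many slots as
    its highest operator parallelism. math.inf models Reactive Mode.
    """
    if available_slots < 1:
        raise ValueError("at least one slot is required")
    return {name: int(min(p, available_slots)) for name, p in configured.items()}


job = {"source": 4, "map": 8, "sink": 2}
print(adaptive_parallelism(job, available_slots=4))   # {'source': 4, 'map': 4, 'sink': 2}
print(adaptive_parallelism(job, available_slots=16))  # {'source': 4, 'map': 8, 'sink': 2}

reactive_job = {"source": math.inf, "sink": math.inf}
print(adaptive_parallelism(reactive_job, available_slots=6))  # {'source': 6, 'sink': 6}
```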

    One benefit of the Adaptive Scheduler over the default scheduler is that it handles TaskManager losses gracefully, since it simply scales down in these cases.

    Usage

    The following configuration parameters need to be set:

    • jobmanager.scheduler: adaptive: Change from the default scheduler to Adaptive Scheduler.
    • cluster.declarative-resource-management.enabled: Declarative resource management must be enabled (enabled by default).

    The behavior of Adaptive Scheduler is configured by all configuration options containing adaptive-scheduler in their name.

    Limitations

    • Streaming jobs only: The first version of Adaptive Scheduler runs with streaming jobs only. When a batch job is submitted, Flink automatically falls back to the default scheduler.
    • No support for local recovery: Local recovery is a feature that schedules tasks to machines so that the state on that machine gets reused if possible. Without it, Adaptive Scheduler always needs to download the entire state from the checkpoint storage.
    • No support for partial failover: Partial failover means that the scheduler is able to restart parts (“regions” in Flink’s internals) of a failed job, instead of the entire job. This limitation impacts only the recovery time of embarrassingly parallel jobs: Flink’s default scheduler can restart failed parts, while Adaptive Scheduler restarts the entire job.
    • Limited integration with Flink’s Web UI: Adaptive Scheduler allows a job’s parallelism to change over its lifetime. The Web UI only shows the current parallelism of the job.
    • Unused slots: If the max parallelism of the slot sharing groups is not equal, slots offered to Adaptive Scheduler might remain unused.
    • Scaling events trigger job and task restarts, which increase the number of task attempts.

    Adaptive Batch Scheduler

    The Adaptive Batch Scheduler can automatically decide the parallelism of operators in batch jobs. If no parallelism is set for an operator, the scheduler decides it according to the size of the datasets the operator consumes. This can bring many benefits:

    • Batch job users are relieved from parallelism tuning.
    • Automatically tuned parallelism better fits consumed datasets whose volume varies from day to day.
    • Operators in SQL batch jobs can be assigned different, automatically tuned parallelisms.

    Usage

    To let Adaptive Batch Scheduler decide the parallelism of operators automatically, you need to:

    • Configure Flink to use Adaptive Batch Scheduler.
    • Set the parallelism of operators to -1.

    Configure to use Adaptive Batch Scheduler

    To use Adaptive Batch Scheduler, you need to:

    • Set jobmanager.scheduler: AdaptiveBatch.
    • Leave execution.batch-shuffle-mode unset or explicitly set it to ALL-EXCHANGES-BLOCKING (the default value), because Adaptive Batch Scheduler only supports ALL-EXCHANGES-BLOCKING jobs (see Limitations).

    In addition, there are several related configuration options that may need adjustment when using Adaptive Batch Scheduler:

    • jobmanager.adaptive-batch-scheduler.min-parallelism: The lower bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded up to a power of 2 automatically.
    • : The upper bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded down to a power of 2 automatically.
    • jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task: The average size of data volume to expect each task instance to process. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs, or the decided parallelism reaches the max parallelism (due to too much data), the data actually processed by some tasks may far exceed this value.
    • : The default parallelism of data source.
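    The following Python sketch shows how the min parallelism, the max parallelism, and the average data volume per task could combine into one decision. Rounding the ideal parallelism to the nearer power of two, with the midpoint at 1.5 times the lower power, is an assumption chosen because it reproduces the 0.75~1.5 range stated above; Flink's actual logic also accounts for broadcast inputs and other details omitted here. All names are hypothetical.

```python
def round_down_to_power_of_two(x: int) -> int:
    """Largest power of two that is <= x (for x >= 1)."""
    return 1 << (x.bit_length() - 1)


def round_up_to_power_of_two(x: int) -> int:
    """Smallest power of two that is >= x (for x >= 1)."""
    return 1 << (x - 1).bit_length()


def decide_parallelism(consumed_bytes: int, avg_bytes_per_task: int,
                       min_parallelism: int, max_parallelism: int) -> int:
    """Sketch of a data-volume-based parallelism decision (broadcast inputs ignored)."""
    # Normalize bounds as documented: min rounds up, max rounds down.
    lower = round_up_to_power_of_two(min_parallelism)
    upper = round_down_to_power_of_two(max_parallelism)
    # Ideal parallelism so that each task processes about avg_bytes_per_task.
    ideal = max(1, -(-consumed_bytes // avg_bytes_per_task))  # ceiling division
    # Round to the nearer power of two (midpoint = 1.5x the lower power).
    down, up = round_down_to_power_of_two(ideal), round_up_to_power_of_two(ideal)
    rounded = down if ideal < (down + up) / 2 else up
    return min(max(rounded, lower), upper)


GiB = 1 << 30
print(decide_parallelism(10 * GiB, GiB, 1, 128))    # 8  (1.25 GiB per task)
print(decide_parallelism(13 * GiB, GiB, 1, 128))    # 16 (~0.81 GiB per task)
print(decide_parallelism(1000 * GiB, GiB, 1, 100))  # 64 (max 100 rounds down to 64)
```

    The last call also illustrates the data skew caveat: once the max parallelism is reached, each task processes far more than the configured average (here roughly 15.6 GiB).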

    Set the parallelism of operators to -1

    Adaptive Batch Scheduler will only decide parallelism for operators whose parallelism is not specified by users (parallelism is -1). So if you want the parallelism of operators to be decided automatically, you should configure as follows:

    • Set
    • Set table.exec.resource.default-parallelism: -1 in SQL jobs.
    • Don’t call setParallelism() for operators in DataStream/DataSet jobs.
    • Don’t call setParallelism() on StreamExecutionEnvironment/ExecutionEnvironment in DataStream/DataSet jobs.
    1. It’s recommended to use and set taskmanager.network.memory.buffers-per-channel to 0. This can decouple the required network memory from parallelism, so that for large scale jobs, the “Insufficient number of network buffers” errors are less likely to happen.
    2. It’s recommended to set to the parallelism you expect to need in the worst case. Larger values are not recommended, because an excessive value may hurt performance: this option can affect the number of subpartitions produced by upstream tasks, and a large number of subpartitions may degrade the performance of hash shuffle and of network transmission due to small packets.

    Limitations

    • Batch jobs only: Adaptive Batch Scheduler only supports batch jobs. An exception is thrown if a streaming job is submitted.
    • ALL-EXCHANGES-BLOCKING jobs only: At the moment, Adaptive Batch Scheduler only supports jobs whose batch shuffle mode is ALL-EXCHANGES-BLOCKING.
    • The decided parallelism will be a power of 2: To ensure that downstream tasks consume the same number of subpartitions, the configuration option jobmanager.adaptive-batch-scheduler.max-parallelism should be set to a power of 2 (2^N), and the decided parallelism will also be a power of 2 (2^M, with M <= N).
    • FileInputFormat sources are not supported: This includes StreamExecutionEnvironment#readFile(...) and StreamExecutionEnvironment#createInput(FileInputFormat, ...). Users should use the new sources (or the FileSystem SQL Connector) to read files when using Adaptive Batch Scheduler.
    • Inconsistent broadcast results metrics on WebUI: In Adaptive Batch Scheduler, for broadcast results, the number of bytes/records sent by the upstream task counted by metric is not equal to the number of bytes/records received by the downstream task, which may confuse users when displayed on the Web UI. See for details.
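    To see why the power-of-2 limitation keeps the load even, assume (as a simplification) that each upstream task produces one subpartition per unit of jobmanager.adaptive-batch-scheduler.max-parallelism, and that each downstream task consumes a contiguous range of them computed with integer division. The Python sketch below, with hypothetical function names, shows that a power-of-2 consumer count splits a power-of-2 subpartition count evenly, while other counts do not.

```python
def consumed_subpartition_ranges(num_subpartitions: int,
                                 num_consumers: int) -> list[tuple[int, int]]:
    """Split subpartitions into contiguous half-open ranges, one per
    downstream task, using integer division (simplified model)."""
    return [
        (i * num_subpartitions // num_consumers,
         (i + 1) * num_subpartitions // num_consumers)
        for i in range(num_consumers)
    ]


def range_sizes(ranges: list[tuple[int, int]]) -> list[int]:
    """Number of subpartitions each downstream task consumes."""
    return [end - start for start, end in ranges]


# A max parallelism of 8 (2^3) yields 8 subpartitions per upstream task.
print(range_sizes(consumed_subpartition_ranges(8, 4)))  # [2, 2, 2, 2]
print(range_sizes(consumed_subpartition_ranges(8, 3)))  # [2, 3, 3]
```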