Set up TaskManager Memory
The further described memory configuration is applicable starting with the release version 1.10. If you upgrade Flink from earlier versions, check the migration guide because many changes were introduced with the 1.10 release.
The total process memory of Flink JVM processes consists of memory consumed by Flink application (total Flink memory) and by the JVM to run the process. The total Flink memory consumption includes usage of JVM Heap, managed memory (managed by Flink) and other direct (or native) memory.
If you run Flink locally (e.g. from your IDE) without creating a cluster, then only a subset of the memory configuration options are relevant, see also for more details.
Otherwise, the simplest way to setup memory for TaskManagers is to configure the total memory. A more fine-grained approach is described in more detail .
The rest of the memory components will be adjusted automatically, based on default values or additionally configured options. See next chapters for more details about the other memory components.
Configure Heap and Managed Memory
As mentioned before in , another way to setup memory in Flink is to specify explicitly both task heap and . It gives more control over the available JVM Heap to Flink’s tasks and its managed memory.
The rest of the memory components will be adjusted automatically, based on default values or additionally configured options. are more details about the other memory components.
Managed Memory
Managed memory is managed by Flink and is allocated as native memory (off-heap). The following workloads use managed memory:
- Streaming jobs can use it for .
- Both streaming and batch jobs can use it for sorting, hash tables, caching of intermediate results.
The size of managed memory can be
- either configured explicitly via taskmanager.memory.managed.size
- or computed as a fraction of total Flink memory via .
Size will override fraction, if both are set. If neither size nor fraction is explicitly configured, the default fraction will be used.
See also and batch jobs.
Consumer Weights
If your job contains multiple types of managed memory consumers, you can also control how managed memory should be shared across these types. The configuration option taskmanager.memory.managed.consumer-weights allows you to set a weight for each type, to which Flink will reserve managed memory proportionally. Valid consumer types are:
- : for built-in algorithms.
STATE_BACKEND
: for RocksDB state backend in streaming- : for Python processes.
E.g. if a streaming job uses both RocksDB state backend and Python UDFs, and the consumer weights are configured as STATE_BACKEND:70,PYTHON:30
, Flink will reserve of the total managed memory for RocksDB state backend and 30%
for Python processes.
For each type, Flink reserves managed memory only if the job contains managed memory consumers of that type. E.g, if a streaming job uses the heap state backend and Python UDFs, and the consumer weights are configured as , Flink will use all of its managed memory for Python processes, because the heap state backend does not use managed memory.
The off-heap memory which is allocated by user code should be accounted for in task off-heap memory ().
You can also adjust the framework off-heap memory. You should only change this value if you are sure that the Flink framework needs more memory.
Note Although, native non-direct memory usage can be accounted for as a part of the framework off-heap memory or task off-heap memory, it will result in a higher JVM’s direct memory limit in this case.
Note The network memory is also part of JVM direct memory, but it is managed by Flink and guaranteed to never exceed its configured size. Therefore, resizing the network memory will not help in this situation.
See also .
Detailed Memory Model
The following table lists all memory components, depicted above, and references Flink configuration options which affect the size of the respective components:
As you can see, the size of some memory components can be simply set by the respective option. Other components can be tuned using multiple options.
You should not change the framework heap memory and framework off-heap memory without a good reason. Adjust them only if you are sure that Flink needs more memory for some internal data structures or operations. It can be related to a particular deployment environment or job structure, like high parallelism. In addition, Flink dependencies, such as Hadoop may consume more direct or native memory in certain setups.
Note Flink neither isolates heap nor off-heap versions of framework and task memory at the moment. The separation of framework and task memory can be used in future releases for further optimizations.
Local Execution
If you start Flink locally on your machine as a single java program without creating a cluster (e.g. from your IDE) then all components are ignored except for the following:
Memory component | Relevant options | Default value for the local execution |
---|---|---|
Task heap | taskmanager.memory.task.heap.size | infinite |
Task off-heap | infinite | |
Managed memory | taskmanager.memory.managed.size | 128Mb |
Network memory | taskmanager.memory.network.max | 64Mb |
All of the components listed above can be but do not have to be explicitly configured for local execution. If they are not configured they are set to their default values. and task off-heap memory are considered to be infinite (Long.MAX_VALUE bytes) and managed memory has a default value of 128Mb only for the local execution mode.