Amazon S3

You can use S3 objects like regular files by specifying paths in the format s3://<bucket>/<endpoint>.

The endpoint can either be a single file or a directory, for example:

  // Read from S3 bucket
  env.readTextFile("s3://<bucket>/<endpoint>");
  // Write to S3 bucket
  stream.writeAsText("s3://<bucket>/<endpoint>");
  // Use S3 as checkpoint storage
  env.getCheckpointConfig().setCheckpointStorage("s3://<bucket>/<endpoint>");

Note that these examples are not exhaustive. You can use S3 in other places as well, including your high availability setup; in short, anywhere that Flink expects a FileSystem URI.

For most use cases, you can use one of our S3 filesystem plugins, flink-s3-fs-hadoop or flink-s3-fs-presto, which are self-contained and easy to set up. In some cases, however, such as using S3 as YARN's resource storage directory, it may be necessary to set up a specific Hadoop S3 filesystem implementation.

Flink provides two file systems to talk to Amazon S3, flink-s3-fs-presto and flink-s3-fs-hadoop. Both implementations are self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use them.

  • flink-s3-fs-presto, registered under the schemes s3:// and s3p://, is based on code from the Presto project. You can configure it using the same configuration keys as the Presto file system, by adding the configurations to your flink-conf.yaml. The Presto S3 implementation is the recommended file system for checkpointing to S3.

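  • flink-s3-fs-hadoop, registered under the schemes s3:// and s3a://, is based on code from the Hadoop project. You can configure it using Hadoop's s3a configuration keys, by adding the configurations to your flink-conf.yaml.

    For example, Hadoop has a fs.s3a.connection.maximum configuration key. To change it, put s3.connection.maximum: xyz in flink-conf.yaml. Flink internally translates this back to fs.s3a.connection.maximum. There is no need to pass configuration parameters using Hadoop's XML configuration files.

    It is the only S3 file system with support for the FileSystem sink.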
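The key translation described above can be pictured with a small sketch. This is not Flink's actual implementation: the class S3KeyTranslationSketch, its methods, and the constant names are hypothetical and exist only for illustration. It covers just the simple prefix case from the example (s3. becomes fs.s3a.); the real plugin may treat individual keys differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch only; not Flink's actual implementation. */
final class S3KeyTranslationSketch {

    // Prefixes taken from the example in the text; the constant names are hypothetical.
    static final String FLINK_PREFIX = "s3.";
    static final String HADOOP_PREFIX = "fs.s3a.";

    /** Maps a flink-conf.yaml key such as s3.connection.maximum to its Hadoop form. */
    static String toHadoopKey(String flinkKey) {
        if (flinkKey.startsWith(FLINK_PREFIX)) {
            return HADOOP_PREFIX + flinkKey.substring(FLINK_PREFIX.length());
        }
        // Keys without the s3. prefix are left untouched in this sketch.
        return flinkKey;
    }

    /** Applies toHadoopKey to every entry, keeping values and insertion order. */
    static Map<String, String> translate(Map<String, String> flinkConf) {
        Map<String, String> hadoopConf = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
            hadoopConf.put(toHadoopKey(entry.getKey()), entry.getValue());
        }
        return hadoopConf;
    }

    public static void main(String[] args) {
        // prints fs.s3a.connection.maximum
        System.out.println(toHadoopKey("s3.connection.maximum"));
    }
}
```

In practice you never perform this mapping yourself; you only write the s3.-prefixed key in flink-conf.yaml.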

Both flink-s3-fs-hadoop and flink-s3-fs-presto register default FileSystem wrappers for URIs with the s3:// scheme. In addition, flink-s3-fs-hadoop registers for s3a:// and flink-s3-fs-presto registers for s3p://, so you can use both at the same time. For example, a job may use the FileSystem sink, which only supports Hadoop, but use Presto for checkpointing. In this case, you should explicitly use s3a:// as the scheme for the sink (Hadoop) and s3p:// for checkpointing (Presto).
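To make the scheme rules concrete, the following sketch lists which plugin(s) register for a given URI scheme, as stated in the paragraph above. The class S3SchemeSketch, the method pluginsFor, and the bucket names are hypothetical; Flink resolves file systems internally and exposes no such helper.

```java
import java.net.URI;
import java.util.List;

/** Illustrative sketch only; Flink performs this resolution internally. */
final class S3SchemeSketch {

    /** Returns the plugins that register for the scheme of the given URI. */
    static List<String> pluginsFor(String uri) {
        String scheme = URI.create(uri).getScheme();
        if (scheme == null) {
            return List.of();
        }
        switch (scheme) {
            case "s3":
                // Both plugins register for s3://, so this scheme alone does not pick one.
                return List.of("flink-s3-fs-hadoop", "flink-s3-fs-presto");
            case "s3a":
                return List.of("flink-s3-fs-hadoop");
            case "s3p":
                return List.of("flink-s3-fs-presto");
            default:
                return List.of();
        }
    }

    public static void main(String[] args) {
        // Hypothetical paths: a sink on Hadoop and checkpoints on Presto.
        System.out.println(pluginsFor("s3a://my-bucket/output"));
        System.out.println(pluginsFor("s3p://my-bucket/checkpoints"));
    }
}
```

The point of the sketch is that s3a:// and s3p:// each map to exactly one implementation, which is why they are the schemes to use when both plugins are installed.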

To use flink-s3-fs-hadoop or flink-s3-fs-presto, copy the respective JAR file from the opt directory to its own subdirectory of the plugins directory of your Flink distribution before starting Flink.

Configure Access Credentials

After setting up the S3 FileSystem wrapper, you need to make sure that Flink is allowed to access your S3 buckets.

The recommended way of setting up credentials on AWS is via Identity and Access Management (IAM). You can use IAM features to securely give Flink instances the credentials that they need to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are IAM roles.

If you set this up correctly, you can manage access to S3 within AWS and don’t need to distribute any access keys to Flink.

Access Keys (Discouraged)

You need to configure both s3.access-key and s3.secret-key in Flink's flink-conf.yaml:

  s3.access-key: your-access-key
  s3.secret-key: your-secret-key

The S3 file systems also support S3-compliant object stores such as MinIO. To use one, configure your endpoint with the s3.endpoint key in flink-conf.yaml.

Some S3-compliant object stores might not have virtual-host-style addressing enabled by default, for example when using standalone MinIO for testing purposes. In such cases, you have to set the following property in flink-conf.yaml to enable path-style access:

  s3.path.style.access: true

The bundled S3 file systems (flink-s3-fs-presto and flink-s3-fs-hadoop) support entropy injection. Entropy injection is a technique to improve the scalability of AWS S3 buckets by adding some random characters near the beginning of the key.

If entropy injection is activated, a configured substring in the path is replaced with random characters. For example, the path s3://my-bucket/_entropy_/checkpoints/dashboard-job/ would be replaced by something like s3://my-bucket/gf36ikvg/checkpoints/dashboard-job/. This only happens when the file creation passes the option to inject entropy. Otherwise, the entropy key substring is removed from the file path entirely.

The Flink runtime currently passes the option to inject entropy only for checkpoint data files. For all other files, including checkpoint metadata and external URIs, entropy is not injected, which keeps checkpoint URIs predictable.
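The path rewriting described above can be sketched as follows. This is a simplified illustration, not Flink's implementation: the class EntropySketch, the method resolve, the lowercase alphanumeric alphabet, and the way a doubled slash is collapsed after removing the key are all assumptions made for the example.

```java
import java.util.Random;

/** Illustrative sketch only; not Flink's actual entropy injection code. */
final class EntropySketch {

    // Assumed alphabet; the example in the text shows lowercase letters and digits.
    private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz0123456789";

    /**
     * Replaces the first occurrence of entropyKey with random characters when
     * injectEntropy is true; otherwise removes the key from the path.
     */
    static String resolve(String path, String entropyKey, int length,
                          boolean injectEntropy, Random random) {
        int index = path.indexOf(entropyKey);
        if (index < 0) {
            return path; // no entropy key configured in this path
        }
        String prefix = path.substring(0, index);
        String suffix = path.substring(index + entropyKey.length());
        if (injectEntropy) {
            StringBuilder entropy = new StringBuilder(length);
            for (int i = 0; i < length; i++) {
                entropy.append(ALPHABET.charAt(random.nextInt(ALPHABET.length())));
            }
            return prefix + entropy + suffix;
        }
        // Key removed: avoid leaving "//" where the key used to be (simplification).
        if (prefix.endsWith("/") && suffix.startsWith("/")) {
            suffix = suffix.substring(1);
        }
        return prefix + suffix;
    }

    public static void main(String[] args) {
        String path = "s3://my-bucket/_entropy_/checkpoints/dashboard-job/";
        // Checkpoint data file: entropy injected (random, so output varies).
        System.out.println(resolve(path, "_entropy_", 8, true, new Random()));
        // Checkpoint metadata: key removed, path stays predictable.
        System.out.println(resolve(path, "_entropy_", 8, false, new Random()));
    }
}
```

The second call shows why metadata paths stay predictable: without the inject option, the result depends only on the configured path.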

To enable entropy injection, configure the entropy key and the entropy length parameters (s3.entropy.key and s3.entropy.length) in flink-conf.yaml.