Elasticsearch Connector
Note that the streaming connectors are currently not part of the binary distribution. See the Flink documentation on linking with connectors for information about how to package the program with the libraries for cluster execution.
Instructions for setting up an Elasticsearch cluster can be found in the official Elasticsearch documentation. Make sure to set and remember a cluster name. This must be set when creating an `ElasticsearchSink` for requesting document actions against your cluster.
The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an Elasticsearch cluster.

The example below shows how to configure and create a sink:
Java, Elasticsearch 5.x
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
// Example: write every String element of a stream to Elasticsearch 5.x using the
// TransportClient-based ElasticsearchSink.
DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
// must match the name of the target Elasticsearch cluster
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

// Node addresses used by the TransportClient.
// InetAddress.getByName throws the checked UnknownHostException, so the enclosing
// method must declare or handle it.
List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    // Wraps the element into a single-field document {"data": element}.
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    // Called once per stream element; hands the resulting request to the sink's indexer.
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
java, Elasticsearch 6.x and above
import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests;
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;
DataStream
List
// use a ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder
);
// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered esSinkBuilder.setBulkFlushMaxActions(1);
// provide a RestClientFactory for custom configuration on the internally created REST client esSinkBuilder.setRestClientFactory( restClientBuilder -> { restClientBuilder.setDefaultHeaders(…); restClientBuilder.setMaxRetryTimeoutMillis(…); restClientBuilder.setPathPrefix(…); restClientBuilder.setHttpClientConfigCallback(…); } );
// finally, build and add the sink to the job’s pipeline input.addSink(esSinkBuilder.build());
Scala, Elasticsearch 5.x
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink

import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

import java.net.InetAddress
import java.net.InetSocketAddress

// Example: write every String element of a stream to Elasticsearch 5.x using the
// TransportClient-based ElasticsearchSink. Java collections are referenced by their
// fully qualified names so that Scala's own List/Map are not shadowed.
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
// must match the name of the target Elasticsearch cluster
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

// node addresses used by the TransportClient
val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  // Wraps the element into a single-field document {"data": element}.
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    // `type` is a reserved word in Scala, so the Java method must be called with backticks
    Requests.indexRequest()
      .index("my-index")
      .`type`("my-type")
      .source(json)
  }

  // Called once per stream element; hands the resulting request to the sink's indexer.
  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit =
    indexer.add(createIndexRequest(element))
}))
scala, Elasticsearch 6.x and above
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory

import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.client.RestClientBuilder

// Example: write every String element of a stream to Elasticsearch 6.x and above using the
// REST-based ElasticsearchSink, which is created through ElasticsearchSink.Builder.
val input: DataStream[String] = ...

// HTTP endpoints of the cluster nodes
val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))

// use an ElasticsearchSink.Builder to create an ElasticsearchSink
val esSinkBuilder = new ElasticsearchSink.Builder[String](
  httpHosts,
  new ElasticsearchSinkFunction[String] {
    // Called once per stream element; wraps it into {"data": element} and hands it to the indexer.
    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)

      // `type` is a reserved word in Scala, so the Java method must be called with backticks
      val rqst: IndexRequest = Requests.indexRequest
        .index("my-index")
        .`type`("my-type")
        .source(json)

      indexer.add(rqst)
    }
  }
)

// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1)

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(new RestClientFactory {
  override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
})

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build)
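### Elasticsearch Sinks and Fault Tolerance

With Flink's checkpointing enabled, the Flink Elasticsearch Sink guarantees at-least-once delivery of action requests to Elasticsearch clusters. It does so by waiting, at the time of each checkpoint, for all pending action requests in the internal `BulkProcessor` to be flushed. Every request issued before the checkpoint was triggered has therefore been acknowledged by Elasticsearch before the sink proceeds to process more records.

To use a fault-tolerant Elasticsearch Sink, checkpointing of the topology needs to be enabled at the execution environment:

Scala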
// Checkpointing must be enabled for the sink to flush its pending requests on checkpoints.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
**NOTE**: Users can disable flushing if they wish to do so, by calling **disableFlushOnCheckpoint()** on the created **ElasticsearchSink**. Be aware that this essentially means the sink will not provide any strong delivery guarantees anymore, even with checkpointing for the topology enabled.
### Handling Failing Elasticsearch Requests
Elasticsearch action requests may fail due to a variety of reasons, including temporarily saturated node queue capacity or malformed documents to be indexed. The Flink Elasticsearch Sink allows the user to specify how request failures are handled, by simply implementing an `ActionRequestFailureHandler` and providing it to the constructor.
Below is an example:
Java
DataStream
input.addSink(new ElasticsearchSink<>(
config, transportAddresses,
new ElasticsearchSinkFunction
if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
// full queue; re-add document for indexing
indexer.add(action);
} else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
// malformed document; simply drop request without failing sink
} else {
// for all other failures, fail the sink
// here the failure is simply rethrown, but users can also choose to throw custom exceptions
throw failure;
}
}
}));
Scala
// Example: an ActionRequestFailureHandler that retries rejected requests, drops malformed
// documents, and fails the sink for everything else.
// `config` and `transportAddresses` are built as in the basic sink example.
val input: DataStream[String] = ...

input.addSink(new ElasticsearchSink(
  config, transportAddresses,
  new ElasticsearchSinkFunction[String] { ... },
  new ActionRequestFailureHandler {
    // Called for requests that still failed after the BulkProcessor's own backoff retries.
    @throws(classOf[Throwable])
    override def onFailure(action: ActionRequest,
                           failure: Throwable,
                           restStatusCode: Int,
                           indexer: RequestIndexer): Unit = {

      if (ExceptionUtils.findThrowable(failure, classOf[EsRejectedExecutionException]).isPresent) {
        // full queue; re-add document for indexing
        indexer.add(action)
      } else if (ExceptionUtils.findThrowable(failure, classOf[ElasticsearchParseException]).isPresent) {
        // malformed document; simply drop request without failing sink
      } else {
        // for all other failures, fail the sink
        // here the failure is simply rethrown, but users can also choose to throw custom exceptions
        throw failure
      }
    }
  }))
The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests with malformed documents, without failing the sink. For all other failures, the sink will fail. If an `ActionRequestFailureHandler` is not provided to the constructor, the sink will fail for any kind of error.
Note that `onFailure` is called only for failures that still occur after the `BulkProcessor` internally finishes all backoff retry attempts. By default, the `BulkProcessor` retries up to a maximum of 8 attempts with an exponential backoff. For more information on the behaviour of the internal `BulkProcessor` and how to configure it, please see the following section.
By default, if a failure handler is not provided, the sink uses a `NoOpFailureHandler` that simply fails for all kinds of exceptions. The connector also provides a `RetryRejectedExecutionFailureHandler` implementation that always re-adds requests that have failed due to queue capacity saturation.
IMPORTANT: Re-adding requests back to the internal BulkProcessor on failures will lead to longer checkpoints, as the sink will also need to wait for the re-added requests to be flushed when checkpointing. For example, when using RetryRejectedExecutionFailureHandler, checkpoints will need to wait until Elasticsearch node queues have enough capacity for all the pending requests. This also means that if re-added requests never succeed, the checkpoint will never finish.
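### Configuring the Internal Bulk Processor

The internal `BulkProcessor` can be further configured for its behaviour on how buffered action requests are flushed, by setting the following values in the provided `Map<String, String>` (for Elasticsearch 6.x and above, use the corresponding `ElasticsearchSink.Builder` setters, such as `setBulkFlushMaxActions`):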
- bulk.flush.max.actions: Maximum amount of actions to buffer before flushing.
- bulk.flush.max.size.mb: Maximum size of data (in megabytes) to buffer before flushing.
- bulk.flush.interval.ms: Interval at which to flush regardless of the amount or size of buffered actions.
For versions 2.x and above, configuring how temporary request errors are retried is also supported:
- bulk.flush.backoff.enable: Whether or not to perform retries with backoff delay for a flush if one or more of its actions failed due to a temporary `EsRejectedExecutionException`.
- bulk.flush.backoff.type: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL`.
- bulk.flush.backoff.retries: The amount of backoff retries to attempt.
More information about Elasticsearch can be found on the official Elasticsearch website (https://www.elastic.co).
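### Packaging the Elasticsearch Connector into an Uber-Jar

For the execution of your Flink program, it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies (see the Flink documentation on linking with connectors for further information).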
Alternatively, you can put the connector's jar file into Flink's `lib/` folder to make it available system-wide, i.e. for all jobs being run.