Amazon Kinesis Data Firehose Sink

    Follow the instructions in the Amazon Kinesis Data Firehose Developer Guide to set up a Kinesis Data Firehose delivery stream.

    To use the connector, add the following Maven dependency to your project:

    Flink’s Firehose sink is created by using the static builder KinesisFirehoseSink.<InputType>builder().

    1. setFirehoseClientProperties(Properties sinkProperties)
      • Required.
      • Supplies credentials, region and other parameters to the Firehose client.
    2. setSerializationSchema(SerializationSchema serializationSchema)
      • Required.
      • Supplies a serialization schema to the Sink. This schema is used to serialize elements before sending to Firehose.
    3. setDeliveryStreamName(String deliveryStreamName)
      • Required.
    4. setFailOnError(boolean failOnError)
      • Optional. Default: false.
      • Whether failed requests to write records to Firehose are treated as fatal exceptions in the sink.
    5. setMaxBatchSize(int maxBatchSize)
      • Optional. Default: .
      • Maximum size of a batch to write to Firehose.
    6. setMaxInFlightRequests(int maxInFlightRequests)
      • Optional. Default: 50.
      • The maximum number of in-flight requests allowed before the sink applies backpressure.
    7. setMaxBufferedRequests(int maxBufferedRequests)
      • The maximum number of records that may be buffered in the sink before backpressure is applied.
    8. setMaxBatchSizeInBytes(int maxBatchSizeInBytes)
      • Optional. Default: .
      • The maximum size (in bytes) a batch may become. All batches sent will be smaller than or equal to this size.
    9. setMaxTimeInBufferMS(int maxTimeInBufferMS)
      • Optional. Default: 5000.
      • The maximum time, in milliseconds, that a record may stay in the sink before being flushed.
    10. setMaxRecordSizeInBytes(int maxRecordSizeInBytes)
      • Optional. Default: 1000 * 1024.
      • The maximum record size, in bytes, that the sink will accept. Records larger than this are automatically rejected.
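
    The size-related options above interact: a record is first checked against the per-record limit, and batches are then closed either by record count or by total bytes. The following is a minimal sketch of that interaction in plain Java, not the connector's implementation. The class BatchLimitSketch and all of its methods are hypothetical names introduced for illustration; the time-based flush (setMaxTimeInBufferMS) and the in-flight and buffered request limits are deliberately left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of count- and size-based batching.
 * It only illustrates how the three limits relate to each other.
 */
final class BatchLimitSketch {
    private final int maxBatchSize;          // max records per batch
    private final long maxBatchSizeInBytes;  // max total bytes per batch
    private final long maxRecordSizeInBytes; // max bytes for one record

    private final List<byte[]> buffer = new ArrayList<>();
    private final List<List<byte[]>> flushedBatches = new ArrayList<>();
    private long bufferedBytes = 0;

    BatchLimitSketch(int maxBatchSize, long maxBatchSizeInBytes, long maxRecordSizeInBytes) {
        // A single accepted record must always fit into an empty batch.
        if (maxBatchSize < 1 || maxRecordSizeInBytes > maxBatchSizeInBytes) {
            throw new IllegalArgumentException("inconsistent limits");
        }
        this.maxBatchSize = maxBatchSize;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
    }

    /** Serializes and buffers an element; returns false if the record is rejected. */
    boolean add(String element) {
        byte[] record = element.getBytes(StandardCharsets.UTF_8);
        if (record.length > maxRecordSizeInBytes) {
            return false; // larger than the per-record limit
        }
        // Close the current batch first if this record would exceed the byte limit.
        if (bufferedBytes + record.length > maxBatchSizeInBytes) {
            flush();
        }
        buffer.add(record);
        bufferedBytes += record.length;
        // Close the batch as soon as the record-count limit is reached.
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
        return true;
    }

    /** Moves the buffered records into a finished batch (stands in for a write request). */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushedBatches.add(new ArrayList<>(buffer));
        buffer.clear();
        bufferedBytes = 0;
    }

    int flushedBatchCount() {
        return flushedBatches.size();
    }

    int bufferedRecordCount() {
        return buffer.size();
    }
}
```

    With limits of 3 records per batch, 10 bytes per batch, and 5 bytes per record, a 7-byte record is rejected, and a record that would push the buffered total past 10 bytes causes the already buffered records to be flushed before it is added.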

    Using Custom Firehose Endpoints

    To override the AWS endpoint, set the AWSConfigConstants.AWS_ENDPOINT and AWSConfigConstants.AWS_REGION properties. The region is used to sign requests sent to the endpoint URL.
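
    As a rough illustration, the client properties for a custom endpoint could be assembled as below. This is a sketch using only java.util.Properties: the class FirehoseEndpointProperties is a hypothetical helper, and the string keys "aws.endpoint" and "aws.region" are assumed to be the values behind AWSConfigConstants.AWS_ENDPOINT and AWSConfigConstants.AWS_REGION. In a real job, prefer the constants themselves over the literals.

```java
import java.util.Properties;

/** Hypothetical helper that builds client properties for a custom endpoint. */
final class FirehoseEndpointProperties {
    // Assumed values of AWSConfigConstants.AWS_ENDPOINT and AWSConfigConstants.AWS_REGION.
    static final String AWS_ENDPOINT = "aws.endpoint";
    static final String AWS_REGION = "aws.region";

    /** Returns properties that point the client at endpointUrl, signed for region. */
    static Properties customEndpoint(String endpointUrl, String region) {
        Properties sinkProperties = new Properties();
        sinkProperties.setProperty(AWS_ENDPOINT, endpointUrl);
        sinkProperties.setProperty(AWS_REGION, region);
        return sinkProperties;
    }

    public static void main(String[] args) {
        // Example values only; replace them with your own endpoint and region.
        Properties sinkProperties = customEndpoint("http://localhost:4566", "us-east-1");
        // The result would then be passed to setFirehoseClientProperties(sinkProperties)
        // on the KinesisFirehoseSink builder.
        System.out.println(sinkProperties.getProperty(AWS_ENDPOINT));
        System.out.println(sinkProperties.getProperty(AWS_REGION));
    }
}
```

    The endpoint and region are set together because the region is still needed for request signing even when the endpoint URL is overridden.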
