Amazon Kinesis Data Firehose SQL Connector

    The Kinesis Data Firehose connector allows for writing data into Amazon Kinesis Data Firehose (KDF).

    To use the Kinesis Data Firehose connector, the following dependencies are required, both for projects that use a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

    How to create a Kinesis Data Firehose table
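
    The following is a minimal sketch of a table definition that uses only the required options from the table in this section. The table name, column names, delivery stream name, and region are hypothetical placeholders, and 'csv' is just one of the formats mentioned under Data Type Mapping.

    ```sql
    -- Hypothetical table and column names, for illustration only.
    CREATE TABLE FirehoseTable (
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING
    )
    WITH (
      'connector' = 'firehose',                  -- selects the Kinesis Data Firehose connector
      'delivery-stream' = 'example-delivery-stream', -- placeholder delivery stream name
      'aws.region' = 'us-east-2',                -- placeholder region of the delivery stream
      'format' = 'csv'                           -- format used to serialize records
    );
    ```

    All four options shown are marked as required, so a definition that omits any of them is expected to be rejected.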

    Option | Required | Default | Type | Description

    Common Options
    connector | required | (none) | String | Specify what connector to use. For Kinesis Data Firehose use 'firehose'.
    delivery-stream | required | (none) | String | Name of the Kinesis Data Firehose delivery stream backing this table.
    format | required | (none) | String | The format used to deserialize and serialize Kinesis Data Firehose records. See Data Type Mapping for details.
    aws.region | required | (none) | String | The AWS region where the delivery stream is defined. This option is required for KinesisFirehoseSink creation.
    aws.endpoint | optional | (none) | String | The AWS endpoint for Amazon Kinesis Data Firehose.
    aws.trust.all.certificates | optional | false | Boolean | If true, accepts all SSL certificates.

    Authentication Options
    aws.credentials.provider | optional | AUTO | String | The credentials provider to use when authenticating against the Kinesis endpoint. See the list of credentials providers in the Authorization section for details.
    aws.credentials.basic.accesskeyid | optional | (none) | String | The AWS access key ID to use when the credentials provider type is set to BASIC.
    aws.credentials.basic.secretkey | optional | (none) | String | The AWS secret key to use when the credentials provider type is set to BASIC.
    aws.credentials.profile.path | optional | (none) | String | Optional profile path, used when the credentials provider type is set to PROFILE.
    aws.credentials.profile.name | optional | (none) | String | Optional profile name, used when the credentials provider type is set to PROFILE.
    aws.credentials.role.arn | optional | (none) | String | The role ARN to use when the credentials provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.
    aws.credentials.role.sessionName | optional | (none) | String | The role session name to use when the credentials provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.
    aws.credentials.role.externalId | optional | (none) | String | The external ID to use when the credentials provider type is set to ASSUME_ROLE.
    aws.credentials.role.provider | optional | (none) | String | The credentials provider that supplies the credentials for assuming the role when the credentials provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE.
    aws.credentials.webIdentityToken.file | optional | (none) | String | The absolute path to the web identity token file to use when the credentials provider type is set to WEB_IDENTITY_TOKEN.

    Sink Options
    sink.http-client.max-concurrency | optional | 10000 | Integer | Maximum number of concurrent requests that FirehoseAsyncClient may send to the delivery stream.
    sink.http-client.read-timeout | optional | 360000 | Integer | Maximum time in ms for a request sent by FirehoseAsyncClient to the delivery stream before it fails.
    sink.http-client.protocol.version | optional | HTTP2 | String | HTTP version used by FirehoseAsyncClient.
    sink.batch.max-size | optional | 500 | Integer | Maximum number of elements in a batch passed to FirehoseAsyncClient to be written to the delivery stream.
    sink.requests.max-inflight | optional | 16 | Integer | Threshold for uncompleted requests by FirehoseAsyncClient before new write requests are blocked.
    sink.requests.max-buffered | optional | 10000 | Integer | Threshold for buffered requests by FirehoseAsyncClient before new write requests are blocked.
    sink.flush-buffer.size | optional | 5242880 | Long | Threshold in bytes for the writer buffer in FirehoseAsyncClient before flushing.
    sink.flush-buffer.timeout | optional | 5000 | Long | Threshold time in ms for an element to stay in the buffer of FirehoseAsyncClient before flushing.
    sink.fail-on-error | optional | false | Boolean | Flag controlling whether failed requests are retried. If set, any request failure is not retried and fails the job.
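
    As a rough illustration of how the sink options combine, the sketch below lowers the batch size and flush timeout and turns on fail-on-error. The specific values are arbitrary examples rather than recommendations, and the table, column, stream, and region names are hypothetical.

    ```sql
    -- Hypothetical names; option values are illustrative, not tuned.
    CREATE TABLE TunedFirehoseTable (
      `event_id` BIGINT,
      `payload` STRING
    )
    WITH (
      'connector' = 'firehose',
      'delivery-stream' = 'example-delivery-stream',
      'aws.region' = 'us-east-2',
      'format' = 'json',
      'sink.batch.max-size' = '100',          -- smaller batches than the default of 500
      'sink.flush-buffer.timeout' = '1000',   -- flush buffered elements after 1000 ms
      'sink.requests.max-inflight' = '8',     -- block new writes at 8 uncompleted requests
      'sink.fail-on-error' = 'true'           -- fail the job instead of retrying
    );
    ```

    Smaller batches and a shorter flush timeout generally trade throughput for lower latency, while 'sink.fail-on-error' surfaces delivery problems immediately instead of retrying them.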

    Authorization

    Make sure to create an appropriate IAM policy to allow reading from and writing to the Kinesis Data Firehose delivery stream.

    Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis Data Firehose. By default, the AUTO Credentials Provider is used. If the access key ID and secret key are set in the deployment configuration, this results in using the BASIC provider.

    • AUTO - Use the default AWS Credentials Provider chain that searches for credentials in the following order: ENV_VARS, SYS_PROPS, WEB_IDENTITY_TOKEN, PROFILE, and EC2/ECS credentials provider.
    • BASIC - Use access key ID and secret key supplied as configuration.
    • SYS_PROP - Use Java system properties aws.accessKeyId and aws.secretKey.
    • PROFILE - Use an AWS credentials profile to create the AWS credentials.
    • ASSUME_ROLE - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
    • WEB_IDENTITY_TOKEN - Create AWS credentials by assuming a role using Web Identity Token.
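
    A credentials provider is selected per table through the authentication options. The sketch below assumes the ASSUME_ROLE provider, with the AUTO chain supplying the credentials used to assume the role. The role ARN, session name, and all table, stream, and region names are placeholders, not real resources.

    ```sql
    -- Placeholder ARN, session name, and table names, for illustration only.
    CREATE TABLE RoleBasedFirehoseTable (
      `event_id` BIGINT,
      `payload` STRING
    )
    WITH (
      'connector' = 'firehose',
      'delivery-stream' = 'example-delivery-stream',
      'aws.region' = 'us-east-2',
      'format' = 'json',
      'aws.credentials.provider' = 'ASSUME_ROLE',
      'aws.credentials.role.arn' = 'arn:aws:iam::123456789012:role/example-firehose-writer',
      'aws.credentials.role.sessionName' = 'example-flink-session',
      'aws.credentials.role.provider' = 'AUTO'  -- credentials used to assume the role
    );
    ```

    Compared with BASIC, this keeps long-lived access keys out of the table definition; which provider fits best depends on the deployment.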

    Data Type Mapping

    Kinesis Data Firehose stores records as Base64-encoded binary data objects, so it doesn’t have a notion of internal record structure. Instead, Kinesis Data Firehose records are deserialized and serialized by formats, e.g. ‘avro’, ‘csv’, or ‘json’. To determine the data type of the messages in your Kinesis Data Firehose backed tables, pick a suitable Flink format with the format keyword. Please refer to the Formats pages for more details.

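    The current implementation of the Kinesis Data Firehose SQL connector only supports Kinesis Data Firehose backed sinks and doesn’t provide an implementation for source queries. Queries that read from a Kinesis Data Firehose backed table, such as a SELECT against it, are therefore expected to fail.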
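
    The sink-only behavior can be sketched as follows. The example assumes Flink's built-in datagen connector as a stand-in source; all table, column, stream, and region names are hypothetical.

    ```sql
    -- Stand-in source that generates random rows (hypothetical table name).
    CREATE TABLE GeneratedEvents (
      `event_id` BIGINT,
      `payload` STRING
    )
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '5'
    );

    -- Firehose backed sink table (placeholder stream and region).
    CREATE TABLE FirehoseSinkTable (
      `event_id` BIGINT,
      `payload` STRING
    )
    WITH (
      'connector' = 'firehose',
      'delivery-stream' = 'example-delivery-stream',
      'aws.region' = 'us-east-2',
      'format' = 'json'
    );

    -- Supported: writing into the Firehose backed table.
    INSERT INTO FirehoseSinkTable
    SELECT `event_id`, `payload` FROM GeneratedEvents;

    -- Not supported: reading from it, since the connector has no source implementation.
    -- SELECT * FROM FirehoseSinkTable;
    ```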
