Apache Cassandra Connector

    Note that the streaming connectors are currently NOT part of the binary distribution. See how to link with them for cluster execution here.

    There are multiple ways to bring up a Cassandra instance on local machine:

    Flink’s Cassandra sink are created by using the static CassandraSink.addSink(DataStream input) method.This method returns a CassandraSinkBuilder, which offers methods to further configure the sink, and finally the sink instance.

    The following configuration methods can be used:

    • setClusterBuilder()
      • Sets the cluster builder that is used to configure the connection to cassandra with more sophisticated settings such as consistency level, retry policy and etc.
    • setHost(String host[, int port])
      • Simple version of setClusterBuilder() with host/port information to connect to Cassandra instances
    • setMapperOptions(MapperOptions options)
      • Sets the mapper options that are used to configure the DataStax ObjectMapper.
    • setMaxConcurrentRequests(int maxConcurrentRequests, Duration timeout)
      • Sets the maximum allowed number of concurrent requests with a timeout for acquiring permits to execute.
      • Only applies when enableWriteAheadLog() is not configured.
    • enableWriteAheadLog([CheckpointCommitter committer])
      • An optional setting
      • Allows exactly-once processing for non-deterministic algorithms.
    • setFailureHandler([CassandraFailureHandler failureHandler])
      • An optional setting
      • Sets the custom failure handler.
    • build()

    Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multipletimes without changing the result) and checkpointing is enabled. In case of a failure the failedcheckpoint will be replayed completely.

    Furthermore, for non-deterministic programs the write-ahead log has to be enabled. For such a programthe replayed checkpoint may be completely different than the previous attempt, which may leave thedatabase in an inconsistent state since part of the first attempt may already be written.The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt.Note that that enabling this feature will have an adverse impact on latency.

    Note: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list.

    With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.

    The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to . We show two implementations based on SocketWindowWordCount, for Pojo and Tuple data types respectively.

    In all these examples, we assumed the associated Keyspace and Table wordcount have been created.

    While storing the result with Java/Scala Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery(‘stmt’)) to persist each record back to the database. With the upsert query cached as , each Tuple element is converted to parameters of the statement.

    For details about PreparedStatement and , please visit

    The mapping of each table column can be defined through annotations placed on a field declaration in the Pojo class. For details of the mapping, please refer to CQL documentation on Definition of Mapped Classes and