Apache Cassandra Connector
To use this connector, add the following dependency to your project:
Note that the streaming connectors are currently NOT part of the binary distribution. See how to link with them for cluster execution here.
There are multiple ways to bring up a Cassandra instance on a local machine:
- Follow the installation instructions in the Apache Cassandra documentation.
- Launch a container running Cassandra from the official Docker repository.
Flink’s Cassandra sinks are created by using the static CassandraSink.addSink(DataStream input) method. This method returns a CassandraSinkBuilder, which offers methods to further configure the sink and, finally, build() to create the sink instance.
The following configuration methods can be used:
- setQuery(String query)
  - The query is internally treated as a CQL statement.
  - DO set the upsert query for processing Tuple data types.
  - DO NOT set the query for processing POJO data types.
- setClusterBuilder(ClusterBuilder clusterBuilder)
  - Sets the cluster builder that is used to configure the connection to Cassandra with more sophisticated settings such as consistency level and retry policy.
- setHost(String host[, int port])
  - Simple version of setClusterBuilder() that takes the host/port information needed to connect to Cassandra instances.
- setMapperOptions(MapperOptions options)
  - Sets the mapper options that are used to configure the DataStax ObjectMapper.
  - Only applies when processing POJO data types.
- setMaxConcurrentRequests(int maxConcurrentRequests, Duration timeout)
  - Sets the maximum number of concurrent requests, with a timeout for acquiring a permit to execute a request.
  - Only applies when enableWriteAheadLog() is not configured.
- enableWriteAheadLog([CheckpointCommitter committer])
  - An optional setting.
  - Allows exactly-once processing for non-deterministic algorithms.
- setFailureHandler([CassandraFailureHandler failureHandler])
  - An optional setting.
  - Sets the custom failure handler.
- setDefaultKeyspace(String keyspace)
  - Sets the default keyspace to be used.
- enableIgnoreNullFields()
  - Treats null values as unset, which avoids writing null fields and creating tombstones.
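The configuration methods above can be combined roughly as in the following sketch. It is an illustration, not a reference configuration: the class name CassandraSinkConfigSketch, the contact point 127.0.0.1, the example.wordcount table, and the choice of QUORUM are all assumptions, and the DataStax imports assume the 3.x Java driver that the connector builds on. Running it requires the connector dependency on the classpath and a reachable Cassandra instance.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.QueryOptions;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

// Hypothetical class name, used only for this sketch.
public class CassandraSinkConfigSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A tiny in-memory stream stands in for a real source.
        DataStream<Tuple2<String, Long>> counts =
                env.fromElements(Tuple2.of("flink", 1L), Tuple2.of("cassandra", 2L));

        CassandraSink.addSink(counts)
                // Tuple input, so an upsert query is set (assumed table: example.wordcount).
                .setQuery("INSERT INTO example.wordcount (word, count) VALUES (?, ?);")
                // Used instead of setHost() to control driver-level settings.
                .setClusterBuilder(new ClusterBuilder() {
                    @Override
                    protected Cluster buildCluster(Cluster.Builder builder) {
                        return builder
                                .addContactPoint("127.0.0.1") // assumed local instance
                                .withQueryOptions(new QueryOptions()
                                        .setConsistencyLevel(ConsistencyLevel.QUORUM))
                                .build();
                    }
                })
                .build();

        env.execute("Cassandra sink configuration sketch");
    }
}
```

setClusterBuilder() is used here in place of setHost(), since both configure the same connection.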
Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multiple times without changing the result) and checkpointing is enabled. In case of a failure, the failed checkpoint is replayed completely.
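To see why idempotence matters when a checkpoint is replayed, the following plain-Java sketch applies the same batch of writes twice against an in-memory map. The map is only a stand-in for a Cassandra table, and the class and method names are hypothetical; nothing here uses the connector itself.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a Map stands in for a Cassandra table keyed by word.
final class IdempotentReplaySketch {

    // Idempotent write: overwrites the value, like a CQL INSERT acting as an upsert.
    static void upsert(Map<String, Long> table, String word, long count) {
        table.put(word, count);
    }

    // Non-idempotent write: adds to the stored value, like a counter update.
    static void increment(Map<String, Long> table, String word, long delta) {
        table.merge(word, delta, Long::sum);
    }

    // Applies the batch once, then a second time to mimic a replay after a failure.
    static Map<String, Long> applyTwice(List<Map.Entry<String, Long>> batch, boolean idempotent) {
        Map<String, Long> table = new HashMap<>();
        for (int attempt = 0; attempt < 2; attempt++) {
            for (Map.Entry<String, Long> entry : batch) {
                if (idempotent) {
                    upsert(table, entry.getKey(), entry.getValue());
                } else {
                    increment(table, entry.getKey(), entry.getValue());
                }
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> batch =
                List.of(Map.entry("flink", 3L), Map.entry("cassandra", 5L));
        System.out.println("idempotent upsert:        " + applyTwice(batch, true));
        System.out.println("non-idempotent increment: " + applyTwice(batch, false));
    }
}
```

With the upsert, the replay leaves each stored value unchanged. With the increment, every value is counted twice.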
Furthermore, for non-deterministic programs the write-ahead log has to be enabled. For such a program, the replayed checkpoint may be completely different from the previous attempt, which may leave the database in an inconsistent state since part of the first attempt may already have been written. The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt. Note that enabling this feature will have an adverse impact on latency.
Note: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list.
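The reasoning behind the write-ahead log can be modelled in plain Java. The sketch below is a conceptual model under simplifying assumptions, not the connector's implementation. A map stands in for the database, a "crash" is simulated by stopping a write after one record, and all names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical conceptual model of replay with and without a write-ahead log.
final class WriteAheadLogSketch {

    // Upserts entries into the table; stops after `limit` entries to mimic a crash mid-write.
    static void write(Map<String, String> table, Map<String, String> batch, int limit) {
        int written = 0;
        for (Map.Entry<String, String> entry : batch.entrySet()) {
            if (written == limit) {
                return; // simulated crash
            }
            table.put(entry.getKey(), entry.getValue());
            written++;
        }
    }

    // Without a log: the replay recomputes the batch, which may differ from the first attempt.
    static Map<String, String> withoutLog(Map<String, String> firstAttempt,
                                          Map<String, String> recomputed) {
        Map<String, String> table = new LinkedHashMap<>();
        write(table, firstAttempt, 1);               // crashes after one record
        write(table, recomputed, Integer.MAX_VALUE); // replay writes a different batch
        return table;
    }

    // With a log: the first attempt is stored before writing and re-sent unchanged on replay.
    static Map<String, String> withLog(Map<String, String> firstAttempt,
                                       Map<String, String> recomputed) {
        Map<String, String> log = new LinkedHashMap<>(firstAttempt); // stored with the checkpoint
        Map<String, String> table = new LinkedHashMap<>();
        write(table, log, 1);                 // crashes after one record
        write(table, log, Integer.MAX_VALUE); // replay uses the log; `recomputed` is ignored
        return table;
    }

    public static void main(String[] args) {
        Map<String, String> first = new LinkedHashMap<>();
        first.put("k1", "a");
        first.put("k2", "b");
        Map<String, String> recomputed = new LinkedHashMap<>();
        recomputed.put("k3", "c");
        System.out.println("without log: " + withoutLog(first, recomputed));
        System.out.println("with log:    " + withLog(first, recomputed));
    }
}
```

Without the log, the table ends up with a mix of both attempts that matches neither. With the log, it matches the first attempt exactly.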
With checkpointing enabled, the Cassandra sink guarantees at-least-once delivery of action requests to the Cassandra instance.
For more details, see the checkpointing docs and the fault tolerance guarantee docs.
The Cassandra sink currently supports both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use of those streaming data types, please refer to the Flink documentation on supported data types. We show two implementations based on SocketWindowWordCount, for POJO and Tuple data types respectively.
When storing results of the Java/Scala Tuple data type to a Cassandra sink, you must set a CQL upsert statement (via setQuery("stmt")) to persist each record back to the database. With the upsert query cached as a PreparedStatement, each Tuple element is converted to a parameter of the statement.
For details about PreparedStatement and BoundStatement, please visit the DataStax Java Driver manual.
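A Tuple-based pipeline might look like the following sketch. It is loosely modelled on a socket word count and is not the original example: the class name, the socket host and port, and the example.wordcount table (with word as primary key and count as a bigint column) are assumptions. It requires the connector dependency and a running Cassandra instance.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.util.Collector;

// Hypothetical class name, used only for this sketch.
public class TupleWordCountToCassandra {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assumed source: lines of text from a local socket.
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

        DataStream<Tuple2<String, Long>> counts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) {
                        for (String word : line.toLowerCase().split("\\s")) {
                            // Skip empty words, since word is assumed to be the primary key.
                            if (!word.isEmpty()) {
                                out.collect(Tuple2.of(word, 1L));
                            }
                        }
                    }
                })
                .keyBy(value -> value.f0)
                .sum(1); // running count per word

        // Tuple fields f0 and f1 are bound, in order, to the two placeholders.
        CassandraSink.addSink(counts)
                .setQuery("INSERT INTO example.wordcount (word, count) VALUES (?, ?);")
                .setHost("127.0.0.1")
                .build();

        env.execute("Tuple word count to Cassandra (sketch)");
    }
}
```

Because the statement is an upsert keyed by word, each new running count overwrites the previous row for that word.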
A POJO data type can also be streamed, with the same POJO entity stored back to Cassandra. The POJO implementation needs to follow the DataStax Java Driver manual to annotate the class, because each field of the entity is mapped to an associated column of the designated table using the DataStax Java Driver com.datastax.driver.mapping.Mapper class.
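A POJO-based sink might be set up as in the following sketch. The WordCount class, the example keyspace, and the wordcount table are assumptions made for illustration, and the annotations come from the DataStax 3.x object mapper. No query is set, since the mapper derives the statement from the annotations. Running it requires the connector dependency and a running Cassandra instance.

```java
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

// Hypothetical entity mapped to an assumed table example.wordcount.
@Table(keyspace = "example", name = "wordcount")
public class WordCount {

    @Column(name = "word")
    private String word = "";

    @Column(name = "count")
    private long count = 0;

    // Public no-argument constructor, so Flink can treat the class as a POJO.
    public WordCount() {}

    public WordCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A tiny in-memory stream stands in for a real word-count pipeline.
        DataStream<WordCount> counts =
                env.fromElements(new WordCount("flink", 1L), new WordCount("cassandra", 2L));

        // POJO input: no setQuery() call; fields are mapped to columns via the annotations.
        CassandraSink.addSink(counts)
                .setHost("127.0.0.1")
                .build();

        env.execute("POJO word count to Cassandra (sketch)");
    }
}
```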