Query Configuration

    Flink’s Table API and SQL interface provide parameters to tune the accuracy and resource consumption of continuous queries. The parameters are specified via a QueryConfig object. The QueryConfig can be obtained from the TableEnvironment and is passed back when a Table is translated, i.e., when it is transformed into a DataStream or emitted via a TableSink.

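    // package paths below are assumed from the Flink 1.9 Scala API; adjust for your version
    import org.apache.flink.api.common.time.Time
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.{StreamQueryConfig, Table}
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.sinks.TableSink
    import org.apache.flink.types.Row

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    // obtain query configuration from TableEnvironment
    val qConfig: StreamQueryConfig = tableEnv.queryConfig
    // set query parameters: minimum and maximum idle state retention time
    qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
    // define query
    val result: Table = ???
    // create TableSink
    val sink: TableSink[Row] = ???
    // register TableSink
    tableEnv.registerTableSink(
      "outputTable", // table name
      Array[String](...), // field names
      Array[TypeInformation[_]](...), // field types
      sink) // table sink
    // emit result Table via a TableSink
    result.insertInto("outputTable", qConfig)
    // convert result Table into a DataStream[Row]
    val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)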

    In the following we describe the parameters of the QueryConfig and how they affect the accuracy and resource consumption of a query.

    For example, the following query computes the number of clicks per session.

    SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

    The sessionId attribute is used as a grouping key and the continuous query maintains a count for each sessionId it observes. The set of sessionId values evolves over time: a sessionId is only active until its session ends, i.e., for a limited period of time. However, the continuous query cannot know about this property of sessionId and expects that every sessionId value can occur at any point in time. Consequently, the total state size of the query grows continuously as more and more sessionId values are observed.
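
    The growth described above can be illustrated with a minimal plain-Scala sketch. It does not use Flink; the object UnboundedCountState and its onClick method are hypothetical stand-ins for the per-key state that the example query would maintain.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the per-key state of
//   SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId
// This is an illustration only, not Flink's implementation.
object UnboundedCountState {
  // one counter per sessionId ever observed; entries are never removed
  val counts: mutable.Map[String, Long] = mutable.Map.empty[String, Long]

  // process one click and return the updated count for its session
  def onClick(sessionId: String): Long = {
    val updated = counts.getOrElse(sessionId, 0L) + 1L
    counts.update(sessionId, updated)
    updated
  }
}
```

    Because nothing is ever removed from the map, its size equals the number of distinct sessionId values seen so far, regardless of whether those sessions are still active.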

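    The idle state retention time bounds this growth: it defines how long the state of a key is retained without being updated before it is removed. For the example query above, the count of a sessionId would be removed once it has not been updated for the configured period of time.

    By removing the state of a key, the continuous query completely forgets that it has seen this key before. If a record arrives with a key whose state has already been removed, the record is treated as if it were the first record with that key. For the example above, this means that the count of a sessionId would start again at 0.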
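
    The restart described above can be sketched in plain Scala. This is a simplified model, not Flink's implementation: the class ExpiringCountState, its single retentionMs parameter, and the explicit now timestamps are assumptions made for illustration.

```scala
import scala.collection.mutable

// Hypothetical model of a per-key count whose state is dropped after being idle.
// retentionMs and the caller-supplied timestamps are assumptions of this sketch.
final class ExpiringCountState(retentionMs: Long) {
  // sessionId -> (count, timestamp of the last update)
  private val state = mutable.Map.empty[String, (Long, Long)]

  // remove the state of every key that has been idle for at least retentionMs
  def cleanup(now: Long): Unit = {
    val idleKeys = state.collect {
      case (key, (_, lastUpdate)) if now - lastUpdate >= retentionMs => key
    }.toList
    idleKeys.foreach(key => state.remove(key))
  }

  // process one click at time `now` and return the updated count
  def onClick(sessionId: String, now: Long): Long = {
    cleanup(now)
    // a key whose state was removed is indistinguishable from a new key
    val previous = state.get(sessionId).map(_._1).getOrElse(0L)
    val updated = previous + 1L
    state.update(sessionId, (updated, now))
    updated
  }

  def size: Int = state.size
}
```

    With a retention of 1000 ms, two clicks for the same key at times 0 and 500 yield counts 1 and 2. A third click at time 2000 yields 1 again, because the key's state was removed in between and counting restarted from zero.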

    There are two parameters to configure the idle state retention time:

    • The minimum idle state retention time defines how long the state of an inactive key is at least kept before it is removed.
    • The maximum idle state retention time defines how long the state of an inactive key is at most kept before it is removed.

    val qConfig: StreamQueryConfig = ???
    // first argument: minimum idle state retention time, second argument: maximum
    qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))

    Cleaning up state requires additional bookkeeping, which becomes less expensive the larger the difference between minTime and maxTime (the minimum and maximum idle state retention time). The difference between minTime and maxTime must be at least 5 minutes.
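
    One way to see why a larger gap between minTime and maxTime reduces bookkeeping is the following plain-Scala sketch. It assumes a lazy scheme in which a per-key cleanup timer is set to now + maxTime and is only re-registered when it would fire earlier than now + minTime. This scheme, the class CleanupTimer, and all of its members are assumptions made for illustration; the text above does not specify how Flink implements the bookkeeping.

```scala
// Hypothetical per-key cleanup bookkeeping; all names are illustrative.
final class CleanupTimer(minMs: Long, maxMs: Long) {
  // mirrors the constraint stated in the text
  require(maxMs - minMs >= 5 * 60 * 1000L,
    "maxTime - minTime must be at least 5 minutes")

  private var cleanupAt: Option[Long] = None
  // counts how often a timer had to be registered or re-registered
  var registrations: Int = 0

  // called on every access to the key's state at time `now`
  def onAccess(now: Long): Unit = {
    // re-register only if no timer exists or the pending one fires before now + minMs
    if (cleanupAt.forall(_ < now + minMs)) {
      cleanupAt = Some(now + maxMs)
      registrations += 1
    }
  }

  // the key's state may be removed once the pending timer has been reached
  def isExpired(now: Long): Boolean = cleanupAt.exists(_ <= now)
}
```

    Under this scheme the state survives at least minTime and at most maxTime after the last access. For a key accessed once per hour over 24 hours (25 accesses), a 12 h / 24 h configuration registers a timer only twice, whereas a 12 h / 12 h 5 min configuration re-registers on every access.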