Performance Tuning

    在这一页,我们将介绍一些实用的优化选项以及流式聚合的内部原理,它们在某些情况下能带来很大的提升。

    默认情况下,无界聚合算子是逐条处理输入的记录,即:(1)从状态中读取累加器,(2)累加/撤回记录至累加器,(3)将累加器写回状态,(4)下一条记录将再次从(1)开始处理。这种处理模式可能会增加 StateBackend 开销(尤其是对于 RocksDB StateBackend )。此外,生产中非常常见的数据倾斜会使这个问题恶化,并且容易导致 job 发生反压。

    MiniBatch 聚合的核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个 key 只需一个操作即可访问状态。这样可以大大减少状态开销并获得更好的吞吐量。但是,这可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理它们。这是吞吐量和延迟之间的权衡。

    下图说明了 mini-batch 聚合如何减少状态操作。

    默认情况下,对于无界聚合算子来说,mini-batch 优化是被禁用的。开启这项优化,需要设置选项 、table.exec.mini-batch.allow-latencytable.exec.mini-batch.size。更多详细信息请参见配置页面。

    MiniBatch optimization is always enabled for , regardless of the above configuration. Window TVF aggregation buffer records in managed memory instead of JVM Heap, so there is no risk of overloading GC or OOM issues.

    下面的例子显示如何启用这些选项。

    Java

    Scala

    1. // instantiate table environment
    2. val tEnv: TableEnvironment = ...
    3. // access flink configuration
    4. val configuration = tEnv.getConfig()
    5. // set low-level key-value options
    6. configuration.set("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization
    7. configuration.set("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
    8. configuration.set("table.exec.mini-batch.size", "5000") // the maximum number of records can be buffered by each aggregate operator task
    1. # instantiate table environment
    2. t_env = ...
    3. # access flink configuration
    4. configuration = t_env.get_config()
    5. # set low-level key-value options
    6. configuration.set("table.exec.mini-batch.enabled", "true") # enable mini-batch optimization
    7. configuration.set("table.exec.mini-batch.allow-latency", "5 s") # use 5 seconds to buffer input records
    8. configuration.set("table.exec.mini-batch.size", "5000") # the maximum number of records can be buffered by each aggregate operator task

    Local-Global 聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于 MapReduce 中的 Combine + Reduce 模式。例如,就以下 SQL 而言:

    1. FROM T
    2. GROUP BY color

    数据流中的记录可能会倾斜,因此某些聚合算子的实例必须比其他实例处理更多的记录,这会产生热点问题。本地聚合可以将一定数量具有相同 key 的输入数据累加到单个累加器中。全局聚合将仅接收 reduce 后的累加器,而不是大量的原始输入数据。这可以大大减少网络 shuffle 和状态访问的成本。每次本地聚合累积的输入数据量基于 mini-batch 间隔。这意味着 local-global 聚合依赖于启用了 mini-batch 优化。

    下图显示了 local-global 聚合如何提高性能。

    流式聚合 - 图2

    下面的例子显示如何启用 local-global 聚合。

    Java

    Scala

    1. // instantiate table environment
    2. val tEnv: TableEnvironment = ...
    3. // access flink configuration
    4. // set low-level key-value options
    5. configuration.set("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled
    6. configuration.set("table.exec.mini-batch.allow-latency", "5 s")
    7. configuration.set("table.exec.mini-batch.size", "5000")
    8. configuration.set("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation

    Python

    1. # instantiate table environment
    2. t_env = ...
    3. # access flink configuration
    4. configuration = t_env.get_config()
    5. # set low-level key-value options
    6. configuration.set("table.exec.mini-batch.enabled", "true") # local-global aggregation depends on mini-batch is enabled
    7. configuration.set("table.exec.mini-batch.allow-latency", "5 s")
    8. configuration.set("table.exec.mini-batch.size", "5000")
    9. configuration.set("table.optimizer.agg-phase-strategy", "TWO_PHASE") # enable two-phase, i.e. local-global aggregation

    Local-Global 优化可有效消除常规聚合的数据倾斜,例如 SUM、COUNT、MAX、MIN、AVG。但是在处理 distinct 聚合时,其性能并不令人满意。

    例如,如果我们要分析今天有多少唯一用户登录。我们可能有以下查询:

    1. FROM T
    2. GROUP BY day

    如果 distinct key (即 user_id)的值分布稀疏,则 COUNT DISTINCT 不适合减少数据。即使启用了 local-global 优化也没有太大帮助。因为累加器仍然包含几乎所有原始记录,并且全局聚合将成为瓶颈(大多数繁重的累加器由一个任务处理,即同一天)。

    拆分 distinct 聚合后,以上查询将被自动改写为以下查询:

    下图显示了拆分 distinct 聚合如何提高性能(假设颜色表示 days,字母表示 user_id)。

    注意:上面是可以从这个优化中受益的最简单的示例。除此之外,Flink 还支持拆分更复杂的聚合查询,例如,多个具有不同 distinct key (例如 COUNT(DISTINCT a), SUM(DISTINCT b) )的 distinct 聚合,可以与其他非 distinct 聚合(例如 SUMMAXMIN、 )一起使用。

    注意 当前,拆分优化不支持包含用户定义的 AggregateFunction 聚合。

    下面的例子显示了如何启用拆分 distinct 聚合优化。

    Java

    1. // instantiate table environment
    2. TableEnvironment tEnv = ...;
    3. tEnv.getConfig()
    4. .set("table.optimizer.distinct-agg.split.enabled", "true"); // enable distinct agg split

    Scala

    1. // instantiate table environment
    2. val tEnv: TableEnvironment = ...
    3. tEnv.getConfig
    4. .set("table.optimizer.distinct-agg.split.enabled", "true") // enable distinct agg split

    Python

    1. # instantiate table environment
    2. t_env = ...
    3. t_env.get_config().set("table.optimizer.distinct-agg.split.enabled", "true") # enable distinct agg split

    在某些情况下,用户可能需要从不同维度计算 UV(独立访客)的数量,例如来自 Android 的 UV、iPhone 的 UV、Web 的 UV 和总 UV。很多人会选择 CASE WHEN,例如:

    但是,在这种情况下,建议使用 FILTER 语法而不是 CASE WHEN。因为 FILTER 更符合 SQL 标准,并且能获得更多的性能提升。FILTER 是用于聚合函数的修饰符,用于限制聚合中使用的值。将上面的示例替换为 FILTER 修饰符,如下所示:

    1. SELECT
    2. day,
    3. COUNT(DISTINCT user_id) AS total_uv,
    4. COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
    5. COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
    6. FROM T