Modular load manager

    你可以通过以下两种方式启用模块化负载管理器:

    1. conf/broker.confloadManagerClassName 参数的值由 org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl 改为 org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl

    2. Using the pulsar-admin tool. 下面是一个示例:

      You can use the same method to change back to the original value. In either case, any mistake in specifying the load manager will cause Pulsar to default to SimpleLoadManagerImpl.

    确定使用何种负载管理器有几种不同的方法:

    1. 使用 pulsar-admin 查看 loadManagerClassName 元素:

      1. $ bin/pulsar-admin brokers get-all-dynamic-config
      2. {
      3. "loadManagerClassName" : "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl"
      4. }

      如果没有 loadManagerClassName 元素,则使用默认负载管理器。

    2. 请参阅 ZooKeeper 负载报告。 在模块负载管理器中,/loadbalance/brokers/... 的负载报告会有许多差异。 例如, systemResourceUsage 的子元素 (bandwidthInbandwidthOut 等),现如今都属于顶级。 下面是模块负载管理器的示例负载报告:

      1. {
      2. "systemResourceUsage": {
      3. "bandwidthIn": {
      4. "limit": 10240000.0,
      5. },
      6. "bandwidthOut": {
      7. "limit": 10240000.0,
      8. "usage": 0.0
      9. },
      10. "cpu": {
      11. "limit": 2400.0,
      12. "usage": 0.0
      13. },
      14. "directMemory": {
      15. "limit": 16384.0,
      16. "usage": 1.0
      17. },
      18. "memory": {
      19. "limit": 8192.0,
      20. "usage": 3903.0
      21. }
      22. }
      23. }
    3. 根据所用的负载管理器不同,命令行 broker monitor 会有不同的输出格式。

      这是模块化负载管理器的示例:

      这是简单负载管理器的示例:

      1. ===================================================================================================================
      2. ||COUNT |TOPIC |BUNDLE |PRODUCER |CONSUMER |BUNDLE + |BUNDLE - ||
      3. ||RAW SYSTEM |CPU % |MEMORY % |DIRECT % |BW IN % |BW OUT % |MAX % ||
      4. ||ALLOC SYSTEM |CPU % |MEMORY % |DIRECT % |BW IN % |BW OUT % |MAX % ||
      5. || |0.20 |1.89 | |1.27 |3.21 |3.21 ||
      6. ||RAW MSG |MSG/S IN |MSG/S OUT |TOTAL |KB/S IN |KB/S OUT |TOTAL ||
      7. || |0.00 |0.00 |0.00 |0.01 |0.01 |0.01 ||
      8. ||ALLOC MSG |MSG/S IN |MSG/S OUT |TOTAL |KB/S IN |KB/S OUT |TOTAL ||
      9. || |54.84 |134.48 |189.31 |126.54 |320.96 |447.50 ||
      10. ===================================================================================================================

    It is important to note that the module load manager is centralized, meaning that all requests to assign a bundle—-whether it’s been seen before or whether this is the first time—-only get handled by the lead broker (which can change over time). 要确定当前的 lead broker,请检查 ZooKeeper 中的 /loadbalance/leader 节点。

    模块化负载管理器监控的数据包含在 类中。 在这里,可用数据被细分为 bundle 数据和 broker 数据。

    Broker

    Broker 数据包含在 类中。 这些数据分为两部分,一部分是由每个 broker 单独写入 ZooKeeper 的本地数据,另一部分是由 leader broker 写入 ZooKeeper 的历史 broker 数据。

    本地 Broker 数据

    本地 broker 数据包含在 类中,同时提供了以下有关的资源信息:

    • CPU 利用率
    • JVM 堆内存使用情况
    • 直接内存使用情况
    • 带宽输入/输出使用情况
    • 所有包(bundle)最新消息的输入/输出总速率
    • 主题(topic)、包(bundle)、生产者(producer)和消费者(consumer)总数
    • 分配给此 broker 所有包(bundle) 的名称
    • 分配给此 broker 的包(bundle) 的最新更改情况

    本地 broker 数据根据服务配置 “loadBalancerReportUpdateMaxIntervalMinutes” 定期更新。 任何 broker 更新其本地 broker 数据后,leader broker 立即 通过 ZooKeeper watch 接收更新,并从 ZooKeeper 节点 /loadbalance/brokers/<broker host/port> 读取本地数据。

    历史 Broker 数据

    为了协调在稳态场景中做出正确决策以及在关键场景下做出反应性决策的需求,历史数据分为两部分:反应性决策的短期数据和稳态决策的长期数据。 这两个时间段都保留了以下信息:

    • 整个 broker 输入/输出的消息速率
    • 整个 broker 输入/输出的消息吞吐量

    与 bundle 数据不同,broker 数据不维护全局 broker 消息速率和吞吐量的样本,因为在删除或添加新 bundle 时,这些数据不可能保持稳定。 相反,这些数据是在 bundle 的短期和长期数据上汇总的。 请参阅 bundle 数据部分以了解如何收集和维护这些数据。

    每当任何 broker 将其本地数据写入 ZooKeeper 时,leader broker 就会更新内存中的每个 broker 的历史 broker 数据。 然后,leader broker 根据配置loadBalancerResourceQuotaUpdateIntervalMinutes,定期将历史数据写入ZooKeeper。

    Bundle 数据

    Bundle 数据包含在 BundleData 中。 与历史 broker 数据一样,bundle 数据被分为短期和长期时间段。 在每个时间段内维护的信息:

    • 此 bundle 输入/输出的消息速率
    • 此 bundle 输入/输出的消息吞吐量
    • 此 bundle 当前的样本数量

    时间段是通过在一组有限数量的样本上保持这些值的平均值来实现的,其中 样本是通过本地数据中的消息速率和吞吐量值获得的。 因此,如果本地数据的更新间隔为 2 分钟,短样本数为 10,长样本数为 1000,则短期数据保持在 10 个样本 * 2 分钟 / 样本 = 20 分钟的时间段内,类似地,长期数据在 2000 分钟的时间段内。 当没有足够的样本满足给定时间段时,只对现有样本取平均值。 如果没有可用的样本,将采用默认值,直到被第一个样本覆盖。 当前默认值是

    • 输入/输出的消息吞吐量:皆为每秒 50KB

    当任何 broker 将其本地数据写入 ZooKeeper 时,bundle 数据就会在 leader broker 的内存中更新。 然后,根据配置loadBalancerResourceQuotaUpdateIntervalMinutes,leader broker 会定期将 bundle 数据与历史 broker 数据同时写入 ZooKeeper。

    流量分布

    模块化负载管理器使用 ModularLoadManagerStrategy 提供的抽象概念来决定 bundle 的分配。 该策略通过考虑服务配置、整个负载数据以及要分配的 bundle 的 bundle 数据来做出决定。 目前,唯一支持的策略是 ,不过用户很快就能够根据需要注入自己的策略。

    最小长期消息速率策略

    顾名思义,最小长期消息速率策略尝试在 broker 之间分配 bundle,以便每个 broker 在长期时间窗口中的消息速率大致相同。 然而,简单地基于消息速率平衡负载并不能解决每个 broker 上每条消息的资源负担不对称的问题。 因此,在分配过程中还会考虑系统资源的使用情况,即CPU、内存、直接内存、带宽输入和带宽输出。 这是通过根据 1 / (overload_threshold - max_usage) 对最终消息速率进行加权来完成的,其中overload_threshold 对应配置 loadBalancerBrokerOverloadedThresholdPercentage,而 max_usage 是候选 broker 正在使用的系统资源的最大比例。 这个倍数可确保机器在相同消息速率下承受更高的负荷,从而减少负载。 特别的,它确保了如果有一台机器过载,那么说明所有机器都近乎超载。 在 broker 的最大使用量超过过载阈值的情况下,不考虑为该 broker 分配 bundle。 如果所有 broker 都过载,bundle 会随机分配。