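As of version 1.6, the Thrift source and sink support kerberos authentication. The client must call the getThriftInstance method of SecureRpcClientFactory to obtain a SecureThriftRpcClient object. SecureThriftRpcClient extends ThriftRpcClient, which implements the RpcClient interface. Using SecureRpcClientFactory requires the flume-ng-auth module. The client principal and the client keytab must both be passed in as parameters; they serve as the client's credentials for authenticating against the kerberos KDC. The principal of the destination server must be provided as well.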
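A minimal sketch of the client-side settings described above. It uses only `java.util.Properties`, so it runs without Flume on the classpath. The property keys `kerberos`, `client-principal`, `client-keytab` and `server-principal`, along with the host names, principals and keytab path, are assumptions for illustration (chosen to mirror the `agent-principal` and `agent-keytab` keys of the source configuration); verify them against your Flume version.

```java
import java.util.Properties;

public class SecureThriftClientProps {

    // Builds the properties a kerberos-enabled Thrift client would be created from.
    public static Properties build() {
        Properties props = new Properties();
        // A Thrift client talking to a single agent, aliased h1 (hypothetical host).
        props.setProperty("client.type", "thrift");
        props.setProperty("hosts", "h1");
        props.setProperty("hosts.h1", "server.example.org:41414");
        // Kerberos settings; the key names are assumptions, see the text above.
        props.setProperty("kerberos", "true");
        props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
        props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
        // Principal of the destination Thrift source.
        props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        // With flume-ng-sdk and flume-ng-auth on the classpath, the client
        // would then be obtained roughly like this:
        //   RpcClient client = SecureRpcClientFactory.getThriftInstance(props);
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + " = " + props.getProperty(k)));
    }
}
```

Keeping the settings in one `Properties` object means the same values can also be loaded from a file rather than hard-coded.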

The ThriftSource, in turn, must be configured in kerberos mode:

    a1.channels = c1
    a1.sources = r1
    a1.sinks = k1
    a1.channels.c1.type = memory
    a1.sources.r1.channels = c1
    a1.sources.r1.type = thrift
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 41414
    a1.sources.r1.kerberos = true
    a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
    a1.sources.r1.agent-keytab = /tmp/flume.keytab
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger

For more flexibility, the failover Flume client implementation (FailoverRpcClient) can be configured with the following properties:

    client.type = default_failover
    # At least one host is required, but two or more make better sense.
    hosts = h1 h2 h3
    hosts.h1 = host1.example.org:41414
    hosts.h2 = host2.example.org:41414
    hosts.h3 = host3.example.org:41414
    # Must be >= 0 (default: the number of hosts specified, 3 in this case).
    # A value of 0 makes little sense because it just causes an append call
    # to fail immediately. A value of 1 means the failover client tries to
    # send the Event only once; if that fails there is no failover to a
    # second client, so the failover client degenerates into a default
    # client. Set this to at least the number of hosts specified.
    max-attempts = 3
    # Must be >= 1 (default: 100)
    batch-size = 100
    # Must be >= 1000 (default: 20000)
    connect-timeout = 20000
    # Must be >= 1000 (default: 20000)
    request-timeout = 20000
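The effect of the attempt limit described in the configuration comments can be sketched as follows. This is not Flume's FailoverRpcClient, only a small illustration of the semantics: hosts are tried in order and at most a fixed number of attempts is made. The class name, the `append` helper and the simulated outage are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class FailoverSketch {

    // Tries the hosts in order, making at most maxAttempts attempts in total.
    // Returns the host that accepted the event; throws if none did.
    public static String append(List<String> hosts, int maxAttempts, Predicate<String> trySend) {
        int attempts = Math.min(maxAttempts, hosts.size());
        for (int i = 0; i < attempts; i++) {
            String host = hosts.get(i);
            if (trySend.test(host)) {
                return host;
            }
        }
        throw new IllegalStateException("append failed after " + attempts + " attempt(s)");
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList(
            "host1.example.org:41414",
            "host2.example.org:41414",
            "host3.example.org:41414");
        // Simulated outage: only host3 is reachable.
        Predicate<String> onlyHost3Up = h -> h.startsWith("host3");

        // Three attempts are enough to reach host3.
        System.out.println(append(hosts, 3, onlyHost3Up));

        // A single attempt never fails over, so the send is lost.
        try {
            append(hosts, 1, onlyHost3Up);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a limit of 0 the loop body never runs and the call fails at once, which is why that value is described as making little sense.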

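The Flume client SDK also supports load balancing across multiple hosts. The client takes a list of agents in <host>:<port> form, which together make up a load-balancing group. The client is configured with a load-balancing strategy that either picks one of the configured hosts at random or selects hosts in round-robin order. You can also supply your own class by implementing the LoadBalancingRpcClient$HostSelector interface; in that case, the FQCN of the custom class must be given as the host selector. The load-balancing RPC client currently does not support Thrift.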

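The maximum backoff time can be configured with maxBackoff (in milliseconds). The default is 30 seconds (specified in OrderSelector). The backoff timeout grows exponentially, up to the configured maximum. The largest possible backoff is limited to 65536 seconds, or about 18.2 hours.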
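The backoff arithmetic can be sketched as below. The text does not state the base delay or how the growth is capped, so the 1-second base and the limit of 16 doublings are assumptions, chosen because 1000 ms × 2^16 = 65,536,000 ms matches the 65536-second ceiling. The class and method names are hypothetical and this is not the OrderSelector code itself.

```java
public class BackoffSketch {

    // Assumed base delay and assumed cap on the number of doublings.
    static final long BASE_MS = 1000L;
    static final int MAX_EXPONENT = 16;
    // Default maximum backoff of 30 seconds, as stated in the text.
    static final long DEFAULT_MAX_BACKOFF_MS = 30_000L;

    // Returns the backoff in milliseconds after the given number of
    // consecutive failures, never exceeding maxBackoffMs.
    public static long backoffMillis(int sequentialFailures, long maxBackoffMs) {
        // A non-positive maxBackoff falls back to the 30-second default.
        long cap = maxBackoffMs > 0 ? maxBackoffMs : DEFAULT_MAX_BACKOFF_MS;
        int exponent = Math.min(Math.max(sequentialFailures, 0), MAX_EXPONENT);
        return Math.min(cap, BASE_MS << exponent);
    }

    public static void main(String[] args) {
        // With the default cap, the delay doubles until it hits 30 seconds.
        for (int failures = 1; failures <= 6; failures++) {
            System.out.println(failures + " failure(s): "
                + backoffMillis(failures, 0) + " ms");
        }
        // With an effectively unlimited cap, the ceiling is 65,536,000 ms.
        System.out.println("ceiling: " + backoffMillis(100, Long.MAX_VALUE) + " ms");
    }
}
```

Dividing 65,536 seconds by 3,600 gives roughly 18.2 hours, the figure quoted above.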

The load-balancing client can also be configured directly as follows:

    client.type = default_loadbalance
    hosts = h1 h2 h3
    hosts.h1 = host1.example.org:41414
    hosts.h2 = host2.example.org:41414
    hosts.h3 = host3.example.org:41414
    # Specifies whether the client should back off from (i.e. temporarily
    # blacklist) a failed host (default: false).
    backoff = false
    # Max timeout in millis that a host will remain inactive due to a
    # previous failure with that host (default: 0, which effectively
    # becomes 30000).
    maxBackoff = 0
    # The host selection strategy used when load-balancing among hosts
    # (default: round_robin). Other values include "random" or the FQCN of
    # a custom class that implements LoadBalancingRpcClient$HostSelector.
    host-selector = round_robin
    # Must be >= 1 (default: 100)
    batch-size = 100
    # Must be >= 1000 (default: 20000)
    connect-timeout = 20000
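The same settings can be assembled in code. The sketch below builds them with `java.util.Properties` and resolves each alias in the `hosts` list to its `<host>:<port>` address. The `resolveHosts` helper is hypothetical and only illustrates how the alias scheme fits together. The choice of `random` selection and a 10-second `maxBackoff` is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class LoadBalanceProps {

    // Builds load-balancing client properties using the keys shown above.
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("client.type", "default_loadbalance");
        // Whitespace-separated list of user-chosen host aliases.
        props.setProperty("hosts", "h1 h2 h3");
        props.setProperty("hosts.h1", "host1.example.org:41414");
        props.setProperty("hosts.h2", "host2.example.org:41414");
        props.setProperty("hosts.h3", "host3.example.org:41414");
        props.setProperty("host-selector", "random");
        props.setProperty("backoff", "true");
        props.setProperty("maxBackoff", "10000");
        return props;
    }

    // Hypothetical helper: maps each alias in "hosts" to its host:port value.
    public static List<String> resolveHosts(Properties props) {
        List<String> addresses = new ArrayList<>();
        for (String alias : props.getProperty("hosts", "").trim().split("\\s+")) {
            if (alias.isEmpty()) {
                continue;
            }
            String address = props.getProperty("hosts." + alias);
            if (address == null) {
                throw new IllegalArgumentException("no address for host alias " + alias);
            }
            addresses.add(address);
        }
        return addresses;
    }

    public static void main(String[] args) {
        Properties props = build();
        // With flume-ng-sdk on the classpath, the client would be created with:
        //   RpcClient client = RpcClientFactory.getInstance(props);
        resolveHosts(props).forEach(System.out::println);
    }
}
```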

Configuring an embedded agent is similar to configuring a regular agent. The additional configuration options are as follows: