Overview

    • The source layer listens on ports through a Netty server and sends the decoded data to the channel layer.
    • The channel layer has a selector that chooses which type of channel the data goes to. If the memory channel eventually fills up, the data is written to disk.
    • The sink layer forwards the data from the channel layer. Its main job is to convert the data to the TDMsg1 format and push it to the cache layer (Tube is the most commonly used here).
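    The source, selector, channel and sink flow described above can be sketched in Java as follows. This is a minimal sketch under stated assumptions, not DataProxy code: the class name PipelineSketch, the routing key (a plain "type" string per event), and the use of a bounded ArrayBlockingQueue as the channel are all hypothetical, and the TDMsg1 conversion is omitted because its format is not described here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the source -> selector -> channel -> sink flow; not DataProxy code.
final class PipelineSketch {
    // Hypothetical selector rule: route by a "type" string attached to each decoded event.
    // Unknown types fall back to a default channel.
    static BlockingQueue<String> select(Map<String, BlockingQueue<String>> channels,
                                        BlockingQueue<String> fallback, String type) {
        return channels.getOrDefault(type, fallback);
    }

    public static void main(String[] args) {
        BlockingQueue<String> memoryChannel = new ArrayBlockingQueue<>(2);
        BlockingQueue<String> fallbackChannel = new ArrayBlockingQueue<>(2);
        Map<String, BlockingQueue<String>> channels = Map.of("memory", memoryChannel);

        // Source layer: each decoded payload is offered to the selected channel.
        // offer() returns false instead of blocking when the bounded channel is full.
        select(channels, fallbackChannel, "memory").offer("msg-1");
        select(channels, fallbackChannel, "other").offer("msg-2");

        // Sink layer: drain a batch from the channel; a real sink would convert each
        // event and push it to the cache layer (e.g. Tube) instead of collecting it.
        List<String> forwarded = new ArrayList<>();
        memoryChannel.drainTo(forwarded, 10);
        System.out.println(forwarded);          // [msg-1]
        System.out.println(fallbackChannel);    // [msg-2]
    }
}
```

    Using offer() rather than put() keeps the source thread from blocking on a full channel, which leaves room for an overflow strategy such as writing to disk.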

    DataProxy supports a configurable source-channel-sink structure, similar to the structure of Flume's configuration files. Configuration files are named dataproxy-*.conf. Currently, dataproxy-pulsar.conf and dataproxy-tube.conf are supported, one per message middleware type. The type can be specified at startup; when it is not specified, dataproxy-pulsar.conf is used by default.
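    Choosing the configuration file by middleware type amounts to a simple name mapping. The sketch below assumes the type arrives as a plain string (how DataProxy actually receives it at startup is not described here), and the helper ConfigFileResolver.configFileName is hypothetical.

```java
import java.util.Locale;

// Hypothetical helper mapping a middleware type to its configuration file name.
final class ConfigFileResolver {
    static final String DEFAULT_TYPE = "pulsar";

    static String configFileName(String type) {
        // Fall back to pulsar when no type is given, matching the documented default.
        String t = (type == null || type.trim().isEmpty())
                ? DEFAULT_TYPE
                : type.trim().toLowerCase(Locale.ROOT);
        // Only the two documented types are accepted in this sketch.
        if (!t.equals("pulsar") && !t.equals("tube")) {
            throw new IllegalArgumentException("unsupported middleware type: " + type);
        }
        return "dataproxy-" + t + ".conf";
    }

    public static void main(String[] args) {
        // e.g. no argument -> dataproxy-pulsar.conf, "tube" -> dataproxy-tube.conf
        System.out.println(configFileName(args.length > 0 ? args[0] : null));
    }
}
```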

    • Source configuration example:
    • Channel configuration example, memory channel:

            Memory channel type.

        agent1.channels.ch-more1.capacity = 10000000
            Memory channel queue size: the maximum number of messages that can be cached.

        agent1.channels.ch-more1.keep-alive = 0

        agent1.channels.ch-more1.transactionCapacity = 20
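    The two memory-channel limits can be illustrated with a small Java sketch, assuming they follow Flume's MemoryChannel semantics: capacity bounds how many events the channel holds, and transactionCapacity bounds how many events one transaction (here, one batch take) may contain. MemoryChannelSketch is a hypothetical class, not the Flume implementation; it models a take transaction simply as a batch plus an explicit rollback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the capacity / transactionCapacity limits; not Flume's MemoryChannel.
final class MemoryChannelSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int capacity;
    private final int transactionCapacity;

    MemoryChannelSketch(int capacity, int transactionCapacity) {
        this.capacity = capacity;
        this.transactionCapacity = transactionCapacity;
    }

    // Returns false when the channel is full, leaving the caller to retry or spill.
    synchronized boolean put(String event) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.addLast(event);
        return true;
    }

    // Takes up to transactionCapacity events under a single lock acquisition,
    // which amortizes the locking cost over the whole batch.
    synchronized List<String> takeBatch() {
        List<String> batch = new ArrayList<>();
        while (batch.size() < transactionCapacity && !queue.isEmpty()) {
            batch.add(queue.pollFirst());
        }
        return batch;
    }

    // On a failed send, puts the batch back at the head in its original order.
    synchronized void rollback(List<String> batch) {
        for (int i = batch.size() - 1; i >= 0; i--) {
            queue.addFirst(batch.get(i));
        }
    }

    synchronized int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        MemoryChannelSketch channel = new MemoryChannelSketch(10_000_000, 20);
        for (int i = 0; i < 50; i++) {
            channel.put("event-" + i);
        }
        List<String> batch = channel.takeBatch();
        System.out.println(batch.size() + " taken, " + channel.size() + " left");  // 20 taken, 30 left
    }
}
```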
    • Channel configuration example, file channel:
    • Sink configuration example:

        agent1.sinks.meta-sink-more1.channel = ch-msg1
            Name of the upstream channel that the sink reads from.

        agent1.sinks.meta-sink-more1.type = org.apache.flume.sink.MetaSink
            Sink implementation class; it pushes messages to the Tube cluster.

        agent1.sinks.meta-sink-more1.master-host-port-list =
            List of Tube cluster master nodes.

        agent1.sinks.meta-sink-more1.send_timeout = 30000
            Timeout when sending to Tube.

        agent1.sinks.meta-sink-more1.stat-interval-sec = 60
            Interval, in seconds, at which sink metrics are collected.

            Number of worker threads the sink class uses to send messages; 8 starts 8 concurrent threads.

        agent1.sinks.meta-sink-more1.client-id-cache = true
            Agent ID cache, used to check the data reported by agents and remove duplicates.

        agent1.sinks.meta-sink-more1.max-survived-time = 300000
            Maximum cache time.

        agent1.sinks.meta-sink-more1.max-survived-size = 3000000
            Maximum number of cached entries.
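    The duplicate-removal settings (client-id-cache, max-survived-time, max-survived-size) suggest a cache of already-seen IDs bounded by both age and size. The sketch below is an illustration under assumptions, not the MetaSink implementation: DuplicateFilterSketch is hypothetical, it keys entries by a hypothetical per-message ID, it assumes max-survived-time is in milliseconds, and it assumes the timestamps passed in never decrease.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical duplicate filter modelled on client-id-cache, max-survived-time and
// max-survived-size; not the MetaSink implementation.
final class DuplicateFilterSketch {
    private final long maxSurvivedTimeMs;   // assumed to be milliseconds
    private final int maxSurvivedSize;
    // Insertion-ordered map from message ID to first-seen time; oldest entries come first.
    private final LinkedHashMap<String, Long> seen = new LinkedHashMap<>();

    DuplicateFilterSketch(long maxSurvivedTimeMs, int maxSurvivedSize) {
        this.maxSurvivedTimeMs = maxSurvivedTimeMs;
        this.maxSurvivedSize = maxSurvivedSize;
    }

    // Returns true and records the ID if it has not been seen; returns false for a duplicate.
    synchronized boolean firstSeen(String messageId, long nowMs) {
        evictExpired(nowMs);
        if (seen.containsKey(messageId)) {
            return false;
        }
        seen.put(messageId, nowMs);
        // Enforce the size bound by dropping the oldest entry.
        if (seen.size() > maxSurvivedSize) {
            Iterator<String> oldest = seen.keySet().iterator();
            oldest.next();
            oldest.remove();
        }
        return true;
    }

    // Drops entries older than maxSurvivedTimeMs. Because nowMs is assumed never to
    // decrease, insertion order equals time order and the scan can stop early.
    private void evictExpired(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = seen.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > maxSurvivedTimeMs) {
                it.remove();
            } else {
                break;
            }
        }
    }

    public static void main(String[] args) {
        DuplicateFilterSketch filter = new DuplicateFilterSketch(300_000, 3_000_000);
        System.out.println(filter.firstSeen("agent-1:msg-42", 0));      // true
        System.out.println(filter.firstSeen("agent-1:msg-42", 1_000));  // false (duplicate)
    }
}
```

    A LinkedHashMap keeps eviction cheap: both the age bound and the size bound only ever remove entries from the head.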

    Users can describe the metric configuration in the file “common.properties”. For example:

    • The JMX domain name of DataProxy is “DataProxy”.
    • It is defined by the parameter “metricDomains”.
    • The listeners of a JMX domain are defined by the parameter “metricDomains.$domainName.domainListeners”.
    • The listener class names are separated by spaces.
    • Each listener class needs to implement the interface “org.apache.inlong.dataproxy.metrics.MetricListener”.
    • The snapshot interval of the listeners is defined by the parameter “metricDomains.$domainName.snapshotInterval”, in milliseconds.
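    Reading these parameters can be sketched with java.util.Properties. The key names follow the parameters listed above; the class MetricConfigSketch, the sample listener class names (com.example.ListenerA, com.example.ListenerB) and the interval value are hypothetical examples, not values taken from a DataProxy distribution.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical holder for the metric settings described above; not a DataProxy class.
final class MetricConfigSketch {
    final String domain;
    final List<String> listenerClasses;
    final long snapshotIntervalMs;

    private MetricConfigSketch(String domain, List<String> listenerClasses, long snapshotIntervalMs) {
        this.domain = domain;
        this.listenerClasses = listenerClasses;
        this.snapshotIntervalMs = snapshotIntervalMs;
    }

    static MetricConfigSketch parse(Properties props) {
        // JMX domain name, e.g. "DataProxy".
        String domain = props.getProperty("metricDomains", "").trim();
        String prefix = "metricDomains." + domain + ".";

        // Listener class names are separated by spaces; split on runs of whitespace.
        String rawListeners = props.getProperty(prefix + "domainListeners", "").trim();
        List<String> listeners = rawListeners.isEmpty()
                ? List.of()
                : Arrays.asList(rawListeners.split("\\s+"));

        // Snapshot interval in milliseconds; this sketch requires it to be present.
        String rawInterval = props.getProperty(prefix + "snapshotInterval");
        if (rawInterval == null) {
            throw new IllegalArgumentException("missing " + prefix + "snapshotInterval");
        }
        return new MetricConfigSketch(domain, listeners, Long.parseLong(rawInterval.trim()));
    }

    public static void main(String[] args) throws IOException {
        String sample = String.join("\n",
                "metricDomains=DataProxy",
                "metricDomains.DataProxy.domainListeners=com.example.ListenerA com.example.ListenerB",
                "metricDomains.DataProxy.snapshotInterval=60000");
        Properties props = new Properties();
        props.load(new StringReader(sample));
        MetricConfigSketch cfg = parse(props);
        // Prints: DataProxy [com.example.ListenerA, com.example.ListenerB] 60000
        System.out.println(cfg.domain + " " + cfg.listenerClasses + " " + cfg.snapshotIntervalMs);
    }
}
```

    A loader built on this would then instantiate each listener by reflection, for example with Class.forName(name).getDeclaredConstructor().newInstance(), and check that it implements the listener interface.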

    The method prototype of org.apache.inlong.dataproxy.metrics.MetricListener is:

    The MetricItemValue.metrics field contains the following metrics (the fields of DataProxyMetricItem annotated with “@CountMetric”):

    Property          Description
    readSuccessCount  Number of events successfully read from the source component.
    readSuccessSize   Body size of events successfully read from the source component.
    readFailCount     Number of events that failed to be read from the source component.
    readFailSize      Body size of events that failed to be read from the source component.
    sendCount         Number of events sent to the sink destination.
    sendSize          Body size of events sent to the sink destination.
    sendSuccessCount  Number of events successfully sent to the sink destination.
    sendSuccessSize   Body size of events successfully sent to the sink destination.
    sendFailCount     Number of events that failed to be sent to the sink destination.
    sendFailSize      Body size of events that failed to be sent to the sink destination.
    sinkDuration      Duration in milliseconds between the current time and the time the event was sent to the sink destination.
    nodeDuration      Duration in milliseconds between the current time and the time the event was received from the source.
    wholeDuration     Duration in milliseconds between the current time and the time the event was generated.
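    A listener typically consumes these counters once per snapshot interval. Because the exact MetricListener and MetricItemValue declarations are not reproduced in this section, the sketch below uses hypothetical stand-ins (SnapshotListenerSketch, MetricItemSketch) and assumes each metric value is a Long keyed by the property names in the table. It computes the send failure ratio as sendFailCount / (sendSuccessCount + sendFailCount) across all items in one snapshot.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-ins: the real MetricListener / MetricItemValue types live in
// org.apache.inlong.dataproxy.metrics and their exact shape is not reproduced here.
interface SnapshotListenerSketch {
    void snapshot(String domain, List<MetricItemSketch> itemValues);
}

final class MetricItemSketch {
    final Map<String, Long> metrics;   // assumed: metric name -> counter value

    MetricItemSketch(Map<String, Long> metrics) {
        this.metrics = metrics;
    }
}

// Aggregates send results across all metric items in one snapshot.
final class SendFailureRatioListener implements SnapshotListenerSketch {
    private double ratio = 0.0;

    @Override
    public void snapshot(String domain, List<MetricItemSketch> itemValues) {
        long success = 0;
        long fail = 0;
        for (MetricItemSketch item : itemValues) {
            success += item.metrics.getOrDefault("sendSuccessCount", 0L);
            fail += item.metrics.getOrDefault("sendFailCount", 0L);
        }
        long total = success + fail;
        ratio = total == 0 ? 0.0 : (double) fail / total;
        System.out.println(String.format(Locale.ROOT, "%s send failure ratio: %.4f", domain, ratio));
    }

    double lastRatio() {
        return ratio;
    }

    public static void main(String[] args) {
        SendFailureRatioListener listener = new SendFailureRatioListener();
        // Prints: DataProxy send failure ratio: 0.1250
        listener.snapshot("DataProxy", List.of(
                new MetricItemSketch(Map.of("sendSuccessCount", 3L, "sendFailCount", 1L)),
                new MetricItemSketch(Map.of("sendSuccessCount", 4L))));
    }
}
```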

    Monitoring metrics are registered with the MBeanServer. Users can append JMX parameters when starting DataProxy, and a remote server can then retrieve the metrics over RMI.
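    For remote access, the JDK's standard out-of-the-box JMX agent options can be appended to the DataProxy JVM, for example -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false (port 9999 is an arbitrary example, and disabling authentication and SSL is only suitable for testing). How these options are passed through DataProxy's startup scripts is not covered here. The client below is a hypothetical sketch that lists every MBean in the DataProxy domain; it queries the whole domain because the specific ObjectNames DataProxy registers are not described in this section.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical JMX client sketch; only standard javax.management APIs are used.
final class DataProxyJmxClientSketch {
    // Lists every MBean registered under the given JMX domain, e.g. "DataProxy".
    static Set<ObjectName> listDomainMBeans(MBeanServerConnection conn, String domain)
            throws IOException, MalformedObjectNameException {
        return conn.queryNames(new ObjectName(domain + ":*"), null);
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            // No address given: inspect the JVM this program itself runs in.
            System.out.println(listDomainMBeans(ManagementFactory.getPlatformMBeanServer(), "DataProxy"));
            return;
        }
        // args[0] is "host:port" of a JVM started with the com.sun.management.jmxremote.* options.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + args[0] + "/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            for (ObjectName name : listDomainMBeans(conn, "DataProxy")) {
                System.out.println(name);
            }
        }
    }
}
```

    Usage: run it with an argument such as dataproxy-host:9999 to list the DataProxy MBeans of a remote instance; attributes can then be read with MBeanServerConnection.getAttribute.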