Overview
- The source layer listens on ports through a Netty server and sends the decoded data to the channel layer.
- The channel layer has a selector that chooses which type of channel the data goes to. If the memory eventually becomes full, the data will be processed.
- The sink layer forwards data from the channel layer. Its main job is to convert the data to the TDMsg1 format and push it to the cache layer (tube is the most commonly used cache here).
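The three layers above form a producer/consumer pipeline. The following is a minimal toy model in Java, not DataProxy code: a bounded `ArrayBlockingQueue` stands in for the memory channel, `trim()` stands in for Netty decoding, and a `msg:` prefix stands in for TDMsg1 conversion. The selector is not modeled, and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical toy model of the source -> channel -> sink flow; not DataProxy or Flume API.
public class PipelineSketch {

    // Source: "decodes" raw input and puts events into the channel.
    // put() blocks while the bounded channel is full, which gives back-pressure.
    static void source(List<String> rawInput, BlockingQueue<String> channel) throws InterruptedException {
        for (String raw : rawInput) {
            channel.put(raw.trim()); // trim() stands in for the Netty decoder
        }
    }

    // Sink: drains the channel, converts each event and "pushes" it to the cache layer.
    static List<String> sink(BlockingQueue<String> channel, int expected) throws InterruptedException {
        List<String> pushed = new ArrayList<>();
        while (pushed.size() < expected) {
            String event = channel.poll(1, TimeUnit.SECONDS);
            if (event == null) {
                break; // nothing arrived within the timeout
            }
            pushed.add("msg:" + event); // stands in for conversion to TDMsg1
        }
        return pushed;
    }

    // Runs the source in its own thread and the sink in the calling thread.
    public static List<String> run(List<String> input, int capacity) throws InterruptedException {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                source(input, channel);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<String> out = sink(channel, input.size());
        producer.join();
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(List.of(" a ", "b", " c"), 2)); // [msg:a, msg:b, msg:c]
    }
}
```

The channel capacity (2) is deliberately smaller than the input so the source thread has to wait for the sink, which is the role a bounded channel plays between the two layers.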
DataProxy supports a configurable source-channel-sink pipeline, with a configuration file structure similar to Flume's. Configuration files are named dataproxy-*.conf. Currently, dataproxy-pulsar.conf and dataproxy-tube.conf are supported, one for each message middleware type. The type can be specified at startup; if it is not specified, dataproxy-pulsar.conf is used by default.
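The naming rule for the configuration file can be summarized by a small lookup. The Java helper below is hypothetical and only illustrates the rule stated above (type to `dataproxy-<type>.conf`, Pulsar when no type is given); how DataProxy actually receives the type at startup is not shown here.

```java
import java.util.Set;

// Hypothetical helper, not part of DataProxy: it only illustrates the file naming rule.
public class ConfFileSketch {
    // Message middleware types that the documentation lists as supported.
    private static final Set<String> SUPPORTED = Set.of("pulsar", "tube");

    // Maps the startup type to a dataproxy-*.conf file name; no type means Pulsar.
    static String confFileFor(String type) {
        if (type == null || type.trim().isEmpty()) {
            return "dataproxy-pulsar.conf";
        }
        String t = type.trim();
        if (!SUPPORTED.contains(t)) {
            throw new IllegalArgumentException("unsupported middleware type: " + type);
        }
        return "dataproxy-" + t + ".conf";
    }

    public static void main(String[] args) {
        // With no argument this prints dataproxy-pulsar.conf.
        System.out.println(confFileFor(args.length > 0 ? args[0] : null));
    }
}
```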
- Source configuration example:
- Channel configuration example, memory channel:
Memory channel type
agent1.channels.ch-more1.capacity = 10000000
Memory channel queue size: the maximum number of messages that can be cached
agent1.channels.ch-more1.keep-alive = 0
agent1.channels.ch-more1.transactionCapacity = 20
- Channel configuration example, file channel:
- Sink configuration example:
agent1.sinks.meta-sink-more1.channel = ch-msg1
Name of the upstream channel that the sink reads from
agent1.sinks.meta-sink-more1.type = org.apache.flume.sink.MetaSink
Sink implementation class; this one pushes messages to the tube cluster
agent1.sinks.meta-sink-more1.master-host-port-list =
List of tube cluster master nodes
agent1.sinks.meta-sink-more1.send_timeout = 30000
Timeout when sending to tube
agent1.sinks.meta-sink-more1.stat-interval-sec = 60
Interval for sink metric statistics, in seconds
Number of worker threads the sink uses to send messages; 8 means 8 concurrent threads are started
agent1.sinks.meta-sink-more1.client-id-cache = true
Agent ID cache, used to check data reported by agents and remove duplicates
agent1.sinks.meta-sink-more1.max-survived-time = 300000
Maximum time an entry is kept in the cache
agent1.sinks.meta-sink-more1.max-survived-size = 3000000
Maximum number of entries in the cache
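One way to picture `client-id-cache`, `max-survived-time` and `max-survived-size` together is a duplicate filter bounded by both entry age and entry count. The Java sketch below is an illustration under assumptions, not DataProxy's implementation: the key (a per-agent message ID such as `agent-1#42`) is hypothetical, times are treated as milliseconds (the unit of `max-survived-time` is not stated above), and callers must pass non-decreasing timestamps.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical duplicate filter bounded by age and size; not DataProxy code.
public class DedupCacheSketch {
    private final long maxSurvivedMillis; // assumed meaning of max-survived-time
    private final int maxSurvivedSize;    // assumed meaning of max-survived-size
    // Insertion order == arrival order, so the eldest entry is always first.
    private final LinkedHashMap<String, Long> seen = new LinkedHashMap<>();

    DedupCacheSketch(long maxSurvivedMillis, int maxSurvivedSize) {
        this.maxSurvivedMillis = maxSurvivedMillis;
        this.maxSurvivedSize = maxSurvivedSize;
    }

    // Returns true if msgId is new (and records it), false if it is a duplicate.
    // Assumes nowMillis never decreases between calls.
    synchronized boolean firstSeen(String msgId, long nowMillis) {
        evictExpired(nowMillis);
        if (seen.containsKey(msgId)) {
            return false;
        }
        seen.put(msgId, nowMillis);
        if (seen.size() > maxSurvivedSize) {
            // Over the size limit: drop the oldest entry.
            Iterator<String> eldest = seen.keySet().iterator();
            eldest.next();
            eldest.remove();
        }
        return true;
    }

    // Removes entries older than maxSurvivedMillis; stops at the first young one.
    private void evictExpired(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = seen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMillis - e.getValue() < maxSurvivedMillis) {
                break; // all later entries are newer
            }
            it.remove();
        }
    }

    public static void main(String[] args) {
        DedupCacheSketch cache = new DedupCacheSketch(300_000, 3_000_000);
        System.out.println(cache.firstSeen("agent-1#42", 0));       // true
        System.out.println(cache.firstSeen("agent-1#42", 1_000));   // false (duplicate)
        System.out.println(cache.firstSeen("agent-1#42", 301_000)); // true (entry expired)
    }
}
```

Bounding by both age and count keeps memory predictable: the time limit handles normal traffic, while the size limit caps the cache during bursts.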
Users can configure metrics in the file “common.properties” as follows:
- The JMX domain name of DataProxy is “DataProxy”.
- It is defined by the parameter “metricDomains”.
- The listeners of a JMX domain are defined by the parameter “metricDomains.$domainName.domainListeners”.
- Listener class names are separated by spaces.
- Each listener class needs to implement the interface “org.apache.inlong.dataproxy.metrics.MetricListener”.
- The snapshot interval of the listeners is defined by the parameter “metricDomains.$domainName.snapshotInterval”, in milliseconds.
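A “common.properties” fragment using the three parameters above might look like the one embedded in the Java sketch below, which also reads the space-separated listener list and the millisecond interval back with `java.util.Properties`. The listener class names (`com.example.LogListener`, `com.example.PushListener`) and the 60000 ms interval are hypothetical placeholders, and the parsing code is illustrative rather than DataProxy's own loader.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Illustrative reader for the metricDomains parameters; not DataProxy's loader.
public class MetricDomainConfSketch {
    // Hypothetical common.properties fragment; listener classes are placeholders.
    static final String COMMON_PROPERTIES = String.join("\n",
        "metricDomains=DataProxy",
        "metricDomains.DataProxy.domainListeners=com.example.LogListener com.example.PushListener",
        "metricDomains.DataProxy.snapshotInterval=60000");

    // Listener class names are separated by spaces.
    static List<String> listeners(Properties p, String domain) {
        String v = p.getProperty("metricDomains." + domain + ".domainListeners", "").trim();
        return v.isEmpty() ? List.of() : Arrays.asList(v.split("\\s+"));
    }

    // Snapshot interval in milliseconds, or dflt when the parameter is absent.
    static long snapshotIntervalMs(Properties p, String domain, long dflt) {
        String v = p.getProperty("metricDomains." + domain + ".snapshotInterval");
        return v == null ? dflt : Long.parseLong(v.trim());
    }

    public static void main(String[] args) throws IOException {
        Properties p = new Properties();
        p.load(new StringReader(COMMON_PROPERTIES));
        String domain = p.getProperty("metricDomains");
        // Prints: DataProxy [com.example.LogListener, com.example.PushListener] 60000
        System.out.println(domain + " " + listeners(p, domain) + " " + snapshotIntervalMs(p, domain, 60_000));
    }
}
```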
The method prototype of org.apache.inlong.dataproxy.metrics.MetricListener is:
The field MetricItemValue.metrics contains the following metrics (the fields of DataProxyMetricItem annotated with “@CountMetric”):
Property | Description |
---|---|
readSuccessCount | Number of events successfully read from the source component. |
readSuccessSize | Total body size of events successfully read from the source component. |
readFailCount | Number of events that failed to be read from the source component. |
readFailSize | Total body size of events that failed to be read from the source component. |
sendCount | Number of events sent to the sink destination. |
sendSize | Total body size of events sent to the sink destination. |
sendSuccessCount | Number of events successfully sent to the sink destination. |
sendSuccessSize | Total body size of events successfully sent to the sink destination. |
sendFailCount | Number of events that failed to be sent to the sink destination. |
sendFailSize | Total body size of events that failed to be sent to the sink destination. |
sinkDuration | Time in milliseconds between the moment the event was sent to the sink destination and the current time. |
nodeDuration | Time in milliseconds between the moment the event was received from the source and the current time. |
wholeDuration | Time in milliseconds between the moment the event was generated and the current time. |
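To illustrate how these counters might be consumed, the sketch below assumes that one snapshot can be viewed as a `Map<String, Long>` keyed by the property names in the table (a hypothetical shape; the real `MetricItemValue` type is not reproduced here) and that `sendCount` counts all send attempts, so `sendSuccessCount / sendCount` is a success rate. The numbers are made-up sample values.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical consumer of one metric snapshot; the Map shape is an assumption.
public class MetricSnapshotSketch {
    // Returns part / whole, or 0.0 when the denominator is missing or zero.
    static double ratio(Map<String, Long> snapshot, String part, String whole) {
        long denominator = snapshot.getOrDefault(whole, 0L);
        long numerator = snapshot.getOrDefault(part, 0L);
        return denominator == 0 ? 0.0 : (double) numerator / denominator;
    }

    public static void main(String[] args) {
        // Sample values for illustration only.
        Map<String, Long> snapshot = Map.of(
            "readSuccessCount", 990L,
            "readFailCount", 10L,
            "sendCount", 1000L,
            "sendSuccessCount", 950L,
            "sendFailCount", 50L);
        double sendSuccessRate = ratio(snapshot, "sendSuccessCount", "sendCount");
        long readTotal = snapshot.get("readSuccessCount") + snapshot.get("readFailCount");
        double readFailRate = readTotal == 0 ? 0.0 : (double) snapshot.get("readFailCount") / readTotal;
        // Prints: sendSuccessRate=0.950 readFailRate=0.010
        System.out.println(String.format(Locale.ROOT,
            "sendSuccessRate=%.3f readFailRate=%.3f", sendSuccessRate, readFailRate));
    }
}
```

`Locale.ROOT` keeps the decimal separator stable regardless of the JVM's default locale.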
Monitoring metrics are registered to the MBeanServer. Users can append JMX parameters when starting DataProxy, so that a remote server can retrieve the monitoring metrics over RMI.
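As a sketch of reading these metrics, the Java program below uses the standard `javax.management` API to list the MBeans registered under the `DataProxy` JMX domain, either in the local JVM or, given a host and port, over RMI. It assumes DataProxy was started with JMX remote parameters such as `-Dcom.sun.management.jmxremote.port=<port>` (plus whatever authentication and SSL settings your environment requires); the exact object names DataProxy registers are not listed here.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Lists MBeans in the "DataProxy" JMX domain, locally or over RMI.
public class DataProxyJmxSketch {
    // Matches every MBean whose domain is "DataProxy".
    static Set<ObjectName> dataProxyMBeans(MBeanServerConnection conn) throws Exception {
        return conn.queryNames(new ObjectName("DataProxy:*"), null);
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 2) {
            // Remote: args are <host> <port> of a JVM started with JMX remote enabled.
            JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + args[0] + ":" + args[1] + "/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                print(dataProxyMBeans(connector.getMBeanServerConnection()));
            }
        } else {
            // Local: this JVM's platform MBeanServer (empty unless DataProxy runs in-process).
            print(dataProxyMBeans(ManagementFactory.getPlatformMBeanServer()));
        }
    }

    static void print(Set<ObjectName> names) {
        System.out.println("Found " + names.size() + " DataProxy MBean(s)");
        names.forEach(n -> System.out.println("  " + n));
    }
}
```

Querying by the `DataProxy:*` pattern avoids hard-coding individual object names, so the same client keeps working if metric items are added or renamed.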