Metrics

    You can access the metric system from a Python user-defined function by calling function_context.get_metric_group() in the open method. The get_metric_group() method returns a MetricGroup object on which you can create and register new metrics.

    PyFlink supports Counters, Gauges, Distributions and Meters.

    Counter

    A Counter is used to count something. The current value can be in- or decremented using inc()/inc(n: int) or dec()/dec(n: int). You can create and register a Counter by calling counter(name: str) on a MetricGroup.
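    The registration and update calls described above can be sketched as follows. This is a minimal illustration, not a reference implementation: the class name CountingUDF is hypothetical, the fallback base class exists only so the sketch can be dry-run where PyFlink is not installed, and the MagicMock context stands in for the function_context that Flink supplies at runtime.

```python
from unittest.mock import MagicMock

try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    # Assumption: fallback so the sketch can be dry-run without PyFlink.
    ScalarFunction = object


class CountingUDF(ScalarFunction):
    """Hypothetical UDF that counts the rows it has processed."""

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        # Register the counter once, when the function is opened.
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, i):
        self.counter.inc()  # one more row processed
        return i - 1


# Dry run with a mock context; in a real job Flink passes function_context.
ctx = MagicMock()
udf = CountingUDF()
udf.open(ctx)
results = [udf.eval(i) for i in (1, 2, 3)]
```

    Keeping the Counter in an attribute set by open avoids looking the metric up again on every call to eval.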


    Gauge

    A Gauge provides a value on demand. You can register a gauge by calling gauge(name: str, obj: Callable[[], int]) on a MetricGroup. The Callable object will be used to report the values. Gauge metrics are restricted to integer-only values.

    from pyflink.table.udf import ScalarFunction

    class MyUDF(ScalarFunction):

        def __init__(self):
            self.length = 0

        def open(self, function_context):
            # The lambda is called whenever the gauge value is reported.
            function_context.get_metric_group().gauge("my_gauge", lambda: self.length)

        def eval(self, i):
            self.length = i
            return i - 1
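    To make the "on demand" behaviour concrete, the sketch below dry-runs a gauge outside of Flink. It rests on assumptions: LengthUDF is a hypothetical name, the fallback base class only lets the code run where PyFlink is not installed, and the MagicMock context replaces the real function_context so that the registered callable can be retrieved and invoked by hand, roughly as a reporter would.

```python
from unittest.mock import MagicMock

try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    # Assumption: fallback so the sketch can be dry-run without PyFlink.
    ScalarFunction = object


class LengthUDF(ScalarFunction):
    """Hypothetical UDF exposing its most recent input as a gauge."""

    def __init__(self):
        self.length = 0

    def open(self, function_context):
        # Only the callable is registered; no value is captured yet.
        function_context.get_metric_group().gauge("my_gauge", lambda: self.length)

    def eval(self, i):
        self.length = i
        return i - 1


ctx = MagicMock()
udf = LengthUDF()
udf.open(ctx)

# Fetch the positional arguments that were passed to gauge(name, obj).
name, read_gauge = ctx.get_metric_group.return_value.gauge.call_args[0]

before = read_gauge()  # value before any row was processed
udf.eval(7)
after = read_gauge()   # the same callable now sees the updated state
```

    Because the callable reads self.length each time it is invoked, the gauge always reflects the latest state without any explicit update call.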

    Distribution

    A Distribution is a metric that reports information (sum, count, min, max and mean) about the distribution of reported values. The value can be updated using update(n: int). You can register a distribution by calling distribution(name: str) on a MetricGroup. Distribution metrics are restricted to integer-only distributions.
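    A minimal sketch of registering and updating a distribution is shown below. The class name DistributionUDF is hypothetical, the fallback base class exists only so the code can be dry-run where PyFlink is not installed, and the MagicMock context stands in for the function_context that Flink provides.

```python
from unittest.mock import MagicMock

try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    # Assumption: fallback so the sketch can be dry-run without PyFlink.
    ScalarFunction = object


class DistributionUDF(ScalarFunction):
    """Hypothetical UDF that records every input value in a distribution."""

    def __init__(self):
        self.distribution = None

    def open(self, function_context):
        self.distribution = (
            function_context.get_metric_group().distribution("my_distribution")
        )

    def eval(self, i):
        # update() accepts integers only, so cast defensively.
        self.distribution.update(int(i))
        return i - 1


# Dry run with a mock context; in a real job Flink passes function_context.
ctx = MagicMock()
udf = DistributionUDF()
udf.open(ctx)
results = [udf.eval(i) for i in (3, 8)]

# Values that reached update(), in call order.
recorded = [c[0][0] for c in udf.distribution.update.call_args_list]
```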


    Meter

    A Meter measures an average throughput. A single event can be registered with the mark_event() method, and several events occurring at the same time with mark_event(n: int). You can register a meter by calling meter(name: str, time_span_in_seconds: int = 60) on a MetricGroup; time_span_in_seconds defaults to 60.


    from pyflink.table.udf import ScalarFunction

    class MyUDF(ScalarFunction):

        def __init__(self):
            self.meter = None

        def open(self, function_context):
            super().open(function_context)
            # an average rate of events per second over 120s, default is 60s.
            self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120)

        def eval(self, i):
            self.meter.mark_event(i)
            return i - 1

    You can refer to the Java metric document for more details.


    You can refer to the Java metric document for more details on System Scope.

    You can refer to the Java metric document for more details.

    You can define a user variable by calling MetricGroup.add_group(name: str, extra: str = None) and specifying the extra parameter. In that case name serves as the variable's key and extra as its value.

    Important: User variables cannot be used in scope formats.

    (function_context
        .get_metric_group()
        .add_group("my_metrics_key", "my_metrics_value")  # illustrative key/value
        .counter("my_counter"))
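    The difference between passing one and two arguments to add_group can be sketched as follows. The helper register_counters and the group names are hypothetical, and the MagicMock objects stand in for the function_context and MetricGroup that Flink provides. The sketch assumes that a single-argument call creates an ordinary sub-group, while the two-argument form defines a user variable as described above.

```python
from unittest.mock import MagicMock


def register_counters(function_context):
    """Hypothetical helper, meant to be called from a UDF's open method."""
    group = function_context.get_metric_group()

    # One argument: an ordinary sub-group named "my_metrics".
    scoped = group.add_group("my_metrics").counter("my_counter")

    # Two arguments: a key-value pair, which defines a user variable.
    tagged = (
        group
        .add_group("my_metrics_key", "my_metrics_value")
        .counter("my_counter")
    )
    return scoped, tagged


# Dry run with mocks; in a real job Flink passes function_context.
group = MagicMock()
ctx = MagicMock()
ctx.get_metric_group.return_value = group
scoped, tagged = register_counters(ctx)

# Positional arguments of each add_group call, in call order.
add_group_calls = [c[0] for c in group.add_group.call_args_list]
```

    Wrapping the chained calls in parentheses lets each method sit on its own line without backslash continuations.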

    You can refer to the Java metric document for more details on the following sections: