Custom Metrics in Functions

Functions enable you to define your own processing logic and make it available in SQL. This document walks you through how to add custom metrics to functions.

DeltaStream queries are powered by Apache Flink; you can implement functions as Apache Flink user-defined functions as described in the Apache Flink documentation.

You can add three types of custom metrics (see the Flink docs on Metrics for more information):

  1. Counter: an integer value that can be incremented or decremented.

  2. Meter: a value that measures the average throughput of events over a time period.

  3. Gauge: a value of any type that can be retrieved on demand. This value is set in the function's logic.
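
To make the difference between the three metric types listed above concrete, here is a minimal plain-Java sketch. The classes `SimpleCounter` and `SimpleMeter` are hypothetical stand-ins written for illustration; they are not Flink's `Counter` or `Meter` implementations, and the meter deliberately simplifies rate calculation to "events divided by the time span in seconds".

```java
import java.util.function.Supplier;

// Plain-Java stand-ins for the three metric kinds; these are NOT Flink's classes.
class MetricKindsSketch {

    // Counter: a running total that can move up or down.
    static final class SimpleCounter {
        private long count;
        void inc() { count++; }
        void dec() { count--; }
        long getCount() { return count; }
    }

    // Meter: events observed divided by the time span length (events per second).
    static final class SimpleMeter {
        private final int timeSpanSeconds;
        private long events;
        SimpleMeter(int timeSpanSeconds) { this.timeSpanSeconds = timeSpanSeconds; }
        void markEvent() { events++; }
        double getRate() { return (double) events / timeSpanSeconds; }
    }

    public static void main(String[] args) {
        SimpleCounter counter = new SimpleCounter();
        counter.inc();
        counter.inc();
        counter.dec();

        SimpleMeter meter = new SimpleMeter(60);
        for (int i = 0; i < 120; i++) {
            meter.markEvent();
        }

        // Gauge: nothing is accumulated; the current value is read on demand.
        int[] latest = {0};
        Supplier<Integer> gauge = () -> latest[0];
        latest[0] = 7;

        System.out.println(counter.getCount()); // 1
        System.out.println(meter.getRate());    // 2.0
        System.out.println(gauge.get());        // 7
    }
}
```

The counter and meter accumulate state as events arrive, while the gauge only reports whatever value the function logic last stored.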

The code block below demonstrates how to add all three custom metric types (a counter, a meter, and a gauge) to a Java function:

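import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class UdfWithMetrics extends ScalarFunction {

    // IMPORTANT NOTE: For UDF metrics to be available, the metric group name must be "deltastream_udf"
    private static final String METRIC_GROUP = "deltastream_udf";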

    private transient Counter counter;
    private transient Meter meter;
    // volatile: written in eval() and read by the gauge when metrics are reported
    private transient volatile int gaugeValue = 0;

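    @Override
    public void open(FunctionContext context) {
        // Register every metric under the required "deltastream_udf" group.
        this.counter = context.getMetricGroup()
            .addGroup(METRIC_GROUP)
            .counter("myCounter");
        // MeterView(60) reports the event rate averaged over a 60-second time span.
        this.meter = context.getMetricGroup()
            .addGroup(METRIC_GROUP)
            .meter("myMeter", new MeterView(60));
        // The gauge reads gaugeValue whenever the metric is retrieved.
        context.getMetricGroup().addGroup(METRIC_GROUP)
            .gauge("myGauge", (Gauge<Integer>) () -> gaugeValue);
    }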

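    public String eval(String s) {
        // Meter: mark one event for each null input.
        if (s == null) {
            meter.markEvent();
            return null;
        }

        if (s.startsWith("Value_")) {
            try {
                // Gauge: remember the most recent integer parsed after the "Value_" prefix.
                int val = Integer.parseInt(s.substring("Value_".length()));
                gaugeValue = val;
            } catch (NumberFormatException e) {
                // Counter: count inputs whose suffix is not a valid integer.
                counter.inc();
            }
        }

        return s.replace('_', ' ');
    }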
}
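
To see which input touches which metric, the branching in `eval` can be traced without a Flink runtime. The sketch below is a hypothetical plain-Java mirror of that logic: the class `EvalMetricsTrace` and its fields `nullInputs`, `parseFailures`, and `lastParsed` are illustrative names standing in for `myMeter`, `myCounter`, and `myGauge`, and are not part of the Flink or DeltaStream APIs.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical plain-Java mirror of UdfWithMetrics.eval; no Flink dependency.
class EvalMetricsTrace {
    final AtomicLong nullInputs = new AtomicLong();        // events that would be marked on myMeter
    final AtomicLong parseFailures = new AtomicLong();     // increments that would go to myCounter
    final AtomicInteger lastParsed = new AtomicInteger();  // value that myGauge would report

    String eval(String s) {
        if (s == null) {
            nullInputs.incrementAndGet();
            return null;
        }
        if (s.startsWith("Value_")) {
            try {
                lastParsed.set(Integer.parseInt(s.substring("Value_".length())));
            } catch (NumberFormatException e) {
                parseFailures.incrementAndGet();
            }
        }
        return s.replace('_', ' ');
    }

    public static void main(String[] args) {
        EvalMetricsTrace trace = new EvalMetricsTrace();
        System.out.println(trace.eval(null));          // null
        System.out.println(trace.eval("Value_42"));    // Value 42
        System.out.println(trace.eval("Value_abc"));   // Value abc
        System.out.println(trace.eval("other_text"));  // other text
        System.out.println(trace.nullInputs.get());    // 1
        System.out.println(trace.lastParsed.get());    // 42
        System.out.println(trace.parseFailures.get()); // 1
    }
}
```

Only inputs that start with `Value_` can change the gauge or the counter; any other non-null input passes through with its underscores replaced and no metric updated.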

Important: In the context.getMetricGroup().addGroup(METRIC_GROUP) call, the METRIC_GROUP value is deltastream_udf. You must set the metric group name to this value; otherwise the DeltaStream platform cannot properly scrape the metric and make it available.

For the full code example, see this example.

After setting up a metrics integration, you can find your metrics under the prefix deltastream_udf. For instance, the myCounter metric in the code block above has the metric name deltastream_udf_myCounter.
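
The naming rule above can be sketched as a small helper. The class `UdfMetricNames` and its method `exportedName` are hypothetical and exist only for illustration. The text confirms only the `myCounter` case, so applying the same pattern to `myMeter` and `myGauge` is an assumption, and the exact name shown by your metrics integration may differ.

```java
// Hypothetical helper illustrating the documented naming pattern.
class UdfMetricNames {
    static final String METRIC_GROUP = "deltastream_udf";

    // Joins the required group name and the registered metric name with an underscore.
    static String exportedName(String metricName) {
        return METRIC_GROUP + "_" + metricName;
    }

    public static void main(String[] args) {
        System.out.println(exportedName("myCounter")); // deltastream_udf_myCounter
        // Assumption: the meter and gauge follow the same pattern as the counter.
        System.out.println(exportedName("myMeter"));
        System.out.println(exportedName("myGauge"));
    }
}
```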
