Custom Metrics in Functions

Functions allow users to define their own processing logic and make it available in SQL. This document walks through how to add custom metrics to Functions.

DeltaStream queries are powered by Apache Flink, so users can implement Functions as Apache Flink user defined functions, as described in the Apache Flink documentation. There are 3 types of custom metrics that can be added (more info in Flink docs on Metrics):

  • Counter: an integer value that can be incremented or decremented.

  • Meter: a value that measures the average throughput of events over a time period.

  • Gauge: a value of any type that can be retrieved on demand. This value is set in the Function's logic.

The code block below demonstrates how a custom metric (counter) can be added to a Java Function:

public class UdfWithMetrics extends ScalarFunction {

    // IMPORANT NOTE: For UDF metrics to be available, metric group name must be "deltastream_udf"
    private final String METRIC_GROUP = "deltastream_udf";

    private transient Counter counter;
    private transient Meter meter;
    private transient int gaugeValue = 0;

    @Override
    public void open(FunctionContext context) {
        this.counter = context.getMetricGroup()
            .addGroup(METRIC_GROUP)
            .counter("myCounter");
        this.meter = context.getMetricGroup()
            .addGroup(METRIC_GROUP)
            .meter("myMeter", new MeterView(60));
        context.getMetricGroup().addGroup(METRIC_GROUP)
            .gauge("myGauge", (Gauge<Integer>) () -> gaugeValue);
    }

    public String eval(String s) {
        if (s == null) {
            meter.markEvent();
            return null;
        }

        if (s.startsWith("Value_")) {
            try {
                int val = Integer.parseInt(s.substring(6));
                gaugeValue = val;
            } catch(NumberFormatException e){
                counter.inc();
            }
        }

        return s.replaceAll("_", " ");
    }
}

Notice in the context.getMetricGroup().addGroup(METRIC_GROUP) method that the METRIC_GROUP value is deltastream_udf. The metric group name is REQUIRED to be set to this value in order for our platform to properly scrape and make this metric available.

For the full code example, see this example.

After setting up a metrics integration, you can find your metrics with prefix deltastream_udf. For instance, the "myCounter" metric in the code block above will have the metric name deltastream_udf_myCounter.

Last updated