Custom Metrics in Functions
/**
 * Scalar UDF that replaces every underscore in its input with a space and
 * reports three custom metrics while doing so:
 *
 * <ul>
 *   <li>{@code myCounter} - incremented when an input starts with {@code "Value_"}
 *       but the remainder is not a parsable integer.</li>
 *   <li>{@code myMeter} - marked once for every {@code null} input.</li>
 *   <li>{@code myGauge} - exposes the most recently parsed integer suffix
 *       (0 until the first successful parse).</li>
 * </ul>
 */
public class UdfWithMetrics extends ScalarFunction {
    // IMPORTANT NOTE: For UDF metrics to be available, metric group name must be "deltastream_udf"
    private static final String METRIC_GROUP = "deltastream_udf";

    /** Inputs starting with this prefix are expected to carry an integer suffix. */
    private static final String VALUE_PREFIX = "Value_";

    // Metric handles are transient: they are created in open(), not shipped with the function.
    private transient Counter counter;
    private transient Meter meter;

    // NOTE(review): the gauge lambda may be read from a thread other than the one
    // calling eval(); if stale reads matter, consider making this volatile - confirm.
    private transient int gaugeValue = 0;

    /**
     * Registers the counter, meter and gauge under the {@code METRIC_GROUP}
     * sub-group of the function's metric group.
     *
     * @param context runtime context that provides the metric group
     */
    @Override
    public void open(FunctionContext context) {
        this.counter = context.getMetricGroup()
                .addGroup(METRIC_GROUP)
                .counter("myCounter");
        // MeterView(60): presumably a 60-second rate window - see the Flink MeterView docs.
        this.meter = context.getMetricGroup()
                .addGroup(METRIC_GROUP)
                .meter("myMeter", new MeterView(60));
        // The gauge reads gaugeValue lazily each time it is queried.
        context.getMetricGroup().addGroup(METRIC_GROUP)
                .gauge("myGauge", (Gauge<Integer>) () -> gaugeValue);
    }

    /**
     * Replaces every {@code '_'} in {@code s} with a space, updating the metrics
     * described on the class as a side effect.
     *
     * @param s input string, may be {@code null}
     * @return {@code null} when {@code s} is {@code null}; otherwise {@code s}
     *         with all underscores replaced by spaces
     */
    public String eval(String s) {
        if (s == null) {
            // Null inputs are tracked by the meter and passed through unchanged.
            meter.markEvent();
            return null;
        }
        if (s.startsWith(VALUE_PREFIX)) {
            try {
                gaugeValue = Integer.parseInt(s.substring(VALUE_PREFIX.length()));
            } catch (NumberFormatException e) {
                // Deliberately not rethrown: a malformed suffix is only counted,
                // and the input is still transformed below.
                counter.inc();
            }
        }
        // Literal character replacement; no regex needed for a single '_'.
        return s.replace('_', ' ');
    }
}
Last updated

