Create custom metrics

Add pre-computed metrics to custom processors that drive advanced analytic use cases

Pre-computed metrics generated by the Metrics Engine can be used within a custom processing component. For example, a processor could filter events by user-defined metrics over time intervals, feed metrics into a scoring model as part of its input feature space, or build KPIs that combine metrics with event data.

Development steps

  1. Register query

  2. Get metrics

Explaining each step

Step 1: Register query

Every query must be registered before it is executed. Registration avoids the overhead of rebuilding the target SQL query on each function call. Registering a duplicate query returns the same query token, a UUID.

UUID registerMetricQuery(String metricFamily, String[] metrics, String predicate)
    throws SQLException

| Attribute | Description | Example |
| --- | --- | --- |
| metricFamily | The metric family this query belongs to. | "nasdaq_metrics" |
| metrics | An array of metric names that match the metric table columns. | new String[]{"avg_bid_min", "avg_bid_avg", "avg_bid_max"} |
| predicate | A SQL WHERE predicate that filters the required metrics. | "WHERE symbol=?" |

Example

// Get an instance of the interface
MetricQueryInterface metricQueryInterface = MetricQueryInterface.getInstance();

// Register the query
var uuidQueryToken = metricQueryInterface.registerMetricQuery(
                                "nasdaq_metrics",
                                new String[]{"avg_bid_min","avg_bid_avg","avg_bid_max"},
                                "WHERE symbol=?"
                                );

The returned query token is passed as a parameter to the query method, so it is best cached in a class field.
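The duplicate-handling behaviour described above can be pictured with a small in-memory stand-in. This is only a sketch, not the Metrics Engine implementation: `QueryTokenRegistry` and its internals are hypothetical names, used to show why registering the same family, metrics, and predicate twice can return the same UUID, which in turn makes it safe to register once and cache the token.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the registration side of the interface.
final class QueryTokenRegistry {
    // Key: the full query definition. Value: the token handed back to callers.
    private final Map<List<Object>, UUID> tokens = new ConcurrentHashMap<>();

    UUID registerMetricQuery(String metricFamily, String[] metrics, String predicate) {
        // Immutable lists compare by value, so identical definitions map to the same key.
        List<String> metricList = List.of(metrics);
        List<Object> key = List.of(metricFamily, metricList, predicate);
        // Create a token only the first time this definition is seen.
        return tokens.computeIfAbsent(key, k -> UUID.randomUUID());
    }
}
```

In this sketch a second registration of an identical query is a cheap map lookup, while any change to the family, metric list, or predicate produces a new token.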

Step 2: Get metrics

This API queries the target metric family for a set of pre-computed metrics. It returns an Optional that is either empty or contains a map of metric names to their values.

Optional<Map<String,Object>> query(UUID queryToken, Object[] params)
        throws SQLException, StreamsException

| Attribute | Description | Example |
| --- | --- | --- |
| queryToken | The token returned by the query registration step. | uuidQueryToken |
| params | An array of parameters that match the placeholders in the query predicate. | new Object[]{"MSFT"} |
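Since the parameters must line up with the `?` placeholders in the registered predicate, a quick sanity check before calling `query` can catch mismatches early. The sketch below is a hypothetical helper (`PredicateParams` is not part of the Metrics Engine API) and uses a deliberately naive count that does not skip question marks inside quoted SQL literals.

```java
// Hypothetical helper; not part of the Metrics Engine API.
final class PredicateParams {
    private PredicateParams() {}

    // Naive count of '?' placeholders; ignores SQL quoting rules.
    static int placeholderCount(String predicate) {
        int count = 0;
        for (int i = 0; i < predicate.length(); i++) {
            if (predicate.charAt(i) == '?') {
                count++;
            }
        }
        return count;
    }

    // True when the number of parameters equals the number of placeholders.
    static boolean matches(String predicate, Object[] params) {
        return placeholderCount(predicate) == params.length;
    }
}
```

For the predicate used in this guide, `"WHERE symbol=?"`, exactly one parameter such as `"MSFT"` is expected.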

Example

@Override
public StreamEvent apply(StreamEvent streamEvent, Context context) throws StreamsException {
    metrics.incrementMetric(Metric.RECEIVED);
    if(enabled){
        // TODO: Add processing logic
        var symbol = streamEvent.getValue("symbol");
        var spread = calculateSpread(symbol);
        
        // Super simple return value. You could send an indicator to trigger 
        // when spread widens over a threshold for a specific symbol  
        streamEvent.addValue(uuid, "spread", spread);
        metrics.incrementMetric(Metric.PROCESSED);

    } else {
        metrics.incrementMetric(Metric.IGNORED);
    }
    return streamEvent;
}

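public double calculateSpread(final String parameter) throws StreamsException {
    MetricQueryInterface metricQueryInterface = MetricQueryInterface.getInstance();
    var spread = Double.NaN;
    try {
        // uuidQueryToken is the token returned by registerMetricQuery in Step 1
        Optional<Map<String, Object>> results = metricQueryInterface.query(
                uuidQueryToken,
                new Object[]{parameter});

        // An empty Optional means no metrics matched the predicate
        if (results.isPresent()) {
            // Values are typed as Object, so check they are numeric before subtracting
            Object max = results.get().get("avg_bid_max");
            Object min = results.get().get("avg_bid_min");
            if (max instanceof Number && min instanceof Number) {
                spread = ((Number) max).doubleValue() - ((Number) min).doubleValue();
            }
        }
    } catch (SQLException e) {
        // apply() declares only StreamsException, so a failed lookup is reported as NaN here;
        // log or rethrow according to your own error-handling policy
    }
    return spread;
}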
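The comment in the example above suggests raising an indicator when the spread widens past a threshold. One way that check might look is sketched below. `SpreadCheck`, `spreadOf`, and `isWiderThan` are hypothetical names, and the sketch assumes metric values arrive as `Number` instances, which the API description does not confirm. It returns an `OptionalDouble` rather than `NaN` so that "no data" is kept distinct from a real value.

```java
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;

// Hypothetical helper; not part of the Metrics Engine API.
final class SpreadCheck {
    private SpreadCheck() {}

    // Returns avg_bid_max - avg_bid_min, or empty when the row or a numeric value is missing.
    static OptionalDouble spreadOf(Optional<Map<String, Object>> result) {
        if (!result.isPresent()) {
            return OptionalDouble.empty();
        }
        Object max = result.get().get("avg_bid_max");
        Object min = result.get().get("avg_bid_min");
        if (max instanceof Number && min instanceof Number) {
            return OptionalDouble.of(((Number) max).doubleValue() - ((Number) min).doubleValue());
        }
        return OptionalDouble.empty();
    }

    // True only when a spread could be computed and it exceeds the threshold.
    static boolean isWiderThan(Optional<Map<String, Object>> result, double threshold) {
        OptionalDouble spread = spreadOf(result);
        return spread.isPresent() && spread.getAsDouble() > threshold;
    }
}
```

A processor could pass the result of `query` straight into `isWiderThan` and add a flag to the event only when it returns true.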
