Create custom metrics

Add pre-computed metrics to custom processors that drive advanced analytic use cases

Pre-computed metrics generated by the Metrics Engine can be used within a custom processing component. For example, events can be filtered by user-defined metrics over time intervals, scoring models can use metrics as part of their input feature space, and KPIs can combine metrics with event data.
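As a rough illustration of a KPI that combines a pre-computed metric with event data, the sketch below computes where an event's bid falls inside a pre-computed minimum/maximum band. The class and method names (`BidBandKpiDemo`, `positionInBand`) and the choice of KPI are assumptions made for this example; they are not part of the Metrics Engine API.

```java
class BidBandKpiDemo {

    // Hypothetical KPI: relative position of the event's bid inside the
    // pre-computed [avgBidMin, avgBidMax] band (0.0 = at min, 1.0 = at max).
    static double positionInBand(double bid, double avgBidMin, double avgBidMax) {
        double width = avgBidMax - avgBidMin;
        if (width <= 0) {
            // An empty or inverted band has no meaningful position
            return Double.NaN;
        }
        return (bid - avgBidMin) / width;
    }

    public static void main(String[] args) {
        // The bid comes from the event; min and max come from the metric family
        System.out.println(positionInBand(10.5, 10.0, 11.0)); // prints "0.5"
    }
}
```

A value below 0 or above 1 indicates a bid outside the pre-computed band, which a processor could use as a filter condition.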

Requirements

See the Setting up the environment documentation.

Note: all Gradle commands must be executed from the root of the project directory.

Development steps

  1. Register query

  2. Get metrics

Explaining each step

Step 1: Register query

Every query must be registered before it is executed. Registration avoids the overhead of rebuilding the target SQL query on each function call. Registering a duplicate query returns the same query token, in the form of a UUID.

UUID registerMetricQuery(String metricFamily, String[] metrics, String predicate)
    throws SQLException
Attribute       Description

metricFamily    The metric family this query belongs to.

metrics         An array of metric names that match the metric table columns.

predicate       A SQL WHERE predicate that filters the required metrics.

Example

// Get an instance of the interface
MetricQueryInterface metricQueryInterface = MetricQueryInterface.getInstance();

// Register the query
var uuidQueryToken = metricQueryInterface.registerMetricQuery(
                                "nasdaq_metrics",
                                new String[]{"avg_bid_min","avg_bid_avg","avg_bid_max"},
                                "WHERE symbol=?"
                                );

The returned query token is passed as a parameter to the query method, so it is best cached in a class field.
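The register-once, cache-the-token pattern can be sketched as follows. Because the real MetricQueryInterface is not available outside the platform, this example uses a hypothetical in-memory stand-in (`QueryRegistrySketch`) that only imitates the documented behaviour of returning the same UUID for a duplicate query. It is not the Metrics Engine implementation, and `SpreadProcessorSketch` is likewise an illustrative name.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class TokenCachingDemo {
    public static void main(String[] args) {
        QueryRegistrySketch registry = new QueryRegistrySketch();
        SpreadProcessorSketch first = new SpreadProcessorSketch(registry);
        SpreadProcessorSketch second = new SpreadProcessorSketch(registry);
        // Both processors registered the same query, so they share one token
        System.out.println(first.token().equals(second.token())); // prints "true"
    }
}

// Hypothetical stand-in for the query registry (assumption, not the real API)
final class QueryRegistrySketch {
    private final Map<String, UUID> tokensByQuery = new ConcurrentHashMap<>();

    UUID registerMetricQuery(String metricFamily, String[] metrics, String predicate) {
        // Identical inputs produce an identical key
        String key = metricFamily + "|" + String.join(",", metrics) + "|" + predicate;
        // A new token is created only the first time a key is seen
        return tokensByQuery.computeIfAbsent(key, k -> UUID.randomUUID());
    }
}

// Hypothetical processor that registers once and keeps the token in a field
final class SpreadProcessorSketch {
    private final UUID uuidQueryToken;

    SpreadProcessorSketch(QueryRegistrySketch registry) {
        this.uuidQueryToken = registry.registerMetricQuery(
                "nasdaq_metrics",
                new String[]{"avg_bid_min", "avg_bid_avg", "avg_bid_max"},
                "WHERE symbol=?");
    }

    UUID token() {
        return uuidQueryToken;
    }
}
```

Registering in the constructor keeps the registration cost out of the per-event path, which is the reason for caching the token in the first place.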

Step 2: Get metrics

This API queries the target metric family for a set of pre-computed metrics. It returns an Optional that is either empty or contains a map of metric names to their values.

Optional<Map<String,Object>> query(UUID queryToken, Object[] params) 
        throws SQLException, StreamsException
Attribute       Description

queryToken      The token returned by the query registration step.

params          An array of parameters that match the placeholders in the query predicate.

Example

@Override
public StreamEvent apply(StreamEvent streamEvent, Context context) throws StreamsException {
    metrics.incrementMetric(Metric.RECEIVED);
    if(enabled){
        // TODO: Add processing logic
        var symbol = streamEvent.getValue("symbol");
        var spread = calculateSpread(symbol);
        
        // Super simple return value. You could send an indicator to trigger 
        // when spread widens over a threshold for a specific symbol  
        streamEvent.addValue(uuid, "spread", spread);
        metrics.incrementMetric(Metric.PROCESSED);

    } else {
        metrics.incrementMetric(Metric.IGNORED);
    }
    return streamEvent;
}

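public double calculateSpread(final String symbol) throws StreamsException {
    MetricQueryInterface metricQueryInterface = MetricQueryInterface.getInstance();

    // NaN signals that no metrics were available for this symbol
    var spread = Double.NaN;
    try {
        // Bind the symbol to the "?" placeholder of the registered predicate
        Optional<Map<String, Object>> results = metricQueryInterface.query(
                uuidQueryToken,
                new Object[]{symbol});

        if (results.isPresent()) {
            var values = results.get();
            // Values are typed as Object; this assumes both columns are numeric
            spread = ((Number) values.get("avg_bid_max")).doubleValue()
                    - ((Number) values.get("avg_bid_min")).doubleValue();
        }
    } catch (SQLException e) {
        // Query failed: spread stays NaN. Log or rethrow as your processor requires.
    }
    return spread;
}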
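Because the result map holds values typed as Object, reading a metric needs a type check before any arithmetic. The following is a minimal, self-contained sketch of one way to do that. The helper names (`MetricValues`, `asDouble`, `spread`) are assumptions for illustration and are not part of the Metrics Engine API, and the sample row is made-up data standing in for a query result.

```java
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;

class MetricValueDemo {
    public static void main(String[] args) {
        // Made-up row standing in for the map returned by query(...)
        Map<String, Object> row = Map.of("avg_bid_min", 10.25, "avg_bid_max", 10.75);
        System.out.println(MetricValues.spread(Optional.of(row)));  // prints "0.5"
        System.out.println(MetricValues.spread(Optional.empty()));  // prints "NaN"
    }
}

// Hypothetical helpers for reading numeric metrics from a result map
final class MetricValues {

    // Returns the metric as a double, or empty if it is missing or not numeric
    static OptionalDouble asDouble(Map<String, Object> metrics, String name) {
        Object value = metrics.get(name);
        return value instanceof Number
                ? OptionalDouble.of(((Number) value).doubleValue())
                : OptionalDouble.empty();
    }

    // Returns max minus min, or NaN when the result or either metric is unavailable
    static double spread(Optional<Map<String, Object>> results) {
        if (results.isEmpty()) {
            return Double.NaN;
        }
        Map<String, Object> metrics = results.get();
        OptionalDouble max = asDouble(metrics, "avg_bid_max");
        OptionalDouble min = asDouble(metrics, "avg_bid_min");
        return max.isPresent() && min.isPresent()
                ? max.getAsDouble() - min.getAsDouble()
                : Double.NaN;
    }
}
```

Returning NaN for every "no usable value" case lets the caller handle a missing result, a missing column, and a non-numeric column with a single `Double.isNaN` check.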
