Create custom metrics
Add pre-computed metrics to custom processors that drive advanced analytic use cases
Pre-computed metrics generated by the Metrics Engine can be used within a custom processing component. For example, events could be filtered by user-defined metrics using time intervals, scoring models use metrics as part of the input feature space or build KPIs that combine metrics with event data.
Development steps
Register query
Get metrics
Explaining each step
Step 1: Register query
All queries to be executed must be registered, this reduces the overhead of recreating the target SQL query on each function call. The interface elegantly handles duplicate queries by providing the same query token in the form of a UUID.
UUID registerMetricQuery(String metricFamily, String[] metrics, String predicate)
throws SQLException
metricFamily
The metric family this query relates to and belongs to.
"nasdaq_metrics"
metrics
An array of metric names that match the metric table columns.
new String[]{
"avg_bid_min",
"avg_bid_avg",
"avg_bid_max"}
predicate
A SQL where predicate statement filters required metrics.
"WHERE symbol=?"
Example
// Get an instance of the interface
MetricQueryInterface metricQueryInterface = MetricQueryInterface.getInstance();
// Register the query
var uuidQueryToken = metricQueryInterface.registerMetricQuery(
"nasdaq_metrics",
new String[]{"avg_bid_min","avg_bid_avg","avg_bid_max"},
"WHERE symbol=?"
);
The queryToken
returned is used as a parameter on the query
method. Therefore it is best to be cached as a object class variable.
Step 2 Get metrics
This API provides the ability to query the target metric family for a set of pre-computed metrics. The function returns an Optional object type that is either empty or with a map of metrics with corresponding values.
Optional<Map<String,Object>> query(UUID queryToken, Object[] params)
throws SQLException, StreamsException
queryToken
The token provided from the query registration process
uuidQueryToken
params
An array of parameters that match the query predicate definition
Object[]{"MSFT")}
Example
@Override
public StreamEvent apply(StreamEvent streamEvent, Context context) throws StreamsException {
metrics.incrementMetric(Metric.RECEIVED);
if(enabled){
// TODO: Add processing logic
var symbol = event.getValue("symbol");
var spread = calculateSpread(symbol);
// Super simple return value. You could send an indicator to trigger
// when spread widens over a threshold for a specific symbol
streamEvent.addValue(uuid, "spread", spread);
metrics.incrementMetric(Metric.PROCESSED);
} else {
metrics.incrementMetric(Metric.IGNORED);
}
return streamEvent;
}
public double calculateSpread(final String parameter){
MetricQueryInterface metricQueryInterface = MetricQueryInterface.getInstance();
Optional<Map<String, Object>> results = metricQueryInterface.query(
uuidQueryToken,
new Object[]{parameter});
var spread = Double.NaN;
if (!results.isEmpty()) {
var metrics = results.get();
if( results.isPresent()){
spread = results.get().get("avg_bid_max") - results.get().get("avg_bid_min");
}
}
return spread;
}
Last updated
Was this helpful?