User defined analytics
This example computes an exponential moving average (EMA) using an out-of-the-box (OOTB) Joule function, built with the Joule SDK and packaged as a JAR.
Use case configuration
File: app-ema-analytics-and-influx.env
SOURCEFILE=conf/sources/stockQuoteStream.yaml
ENGINEFILE=conf/usecases/slidingEventWindowEMAAnalytics.yaml
PUBLISHFILE=conf/publishers/influxdbStandardAnalytics.yaml
Pipeline configuration
This pipeline computes a sliding window function over a single event type, filtered to the symbol CVCO.
processing unit:
  pipeline:
    - filter:
        expression: "symbol == 'CVCO'"
    - sliding window analytics:
        function: com.fractalworks.streams.processors.analytics.functions.ExponentialMovingAverage
        window size: 5
        fields: [ ask, bid ]
        parameters:
          emaFactor: 0.33333
Function Implementation
This function is interesting because it uses the previous EMA value to compute the next one. Joule manages this process on behalf of the developer.
import java.util.Properties;
// AnalyticsFunction comes from the Joule SDK; its import is not shown in the source.

public class ExponentialMovingAverage implements AnalyticsFunction<Double> {

    public static final String EMA_FACTOR = "emaFactor";

    // Smoothing factor; overridden by the "emaFactor" parameter when supplied.
    double emaFactor = 0.333;

    public ExponentialMovingAverage() {
    }

    @Override
    public void setParameters(Properties parameters) {
        if (parameters != null && parameters.containsKey(EMA_FACTOR)) {
            emaFactor = Double.parseDouble(parameters.get(EMA_FACTOR).toString());
        }
    }

    @Override
    public String getVariablePostFixID() {
        return "EMA";
    }

    @Override
    public Double compute(Number[] values, Number previousValue) {
        // Seed with the last window value when there is no usable previous EMA.
        // A null previous value must also be seeded, otherwise the loop below fails.
        if (previousValue == null || Double.isNaN(previousValue.doubleValue())) {
            previousValue = values[values.length - 1];
        }
        // Fold each window value into the running EMA.
        for (Number d : values) {
            previousValue = (d.doubleValue() * emaFactor) + (previousValue.doubleValue() * (1 - emaFactor));
        }
        return previousValue.doubleValue();
    }
}
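To see the recurrence in isolation, here is a minimal standalone sketch that does not depend on the Joule SDK. The class name EmaSketch and its method are hypothetical and exist only for illustration. It assumes the same seeding as the function above: when no previous EMA is available (passed here as NaN), the last value of the window is used as the starting point.

```java
// Standalone sketch of the EMA recurrence; no Joule SDK types are used.
// EmaSketch and ema(...) are hypothetical names, for illustration only.
class EmaSketch {

    // Folds each window value into the running EMA:
    // next = value * factor + previous * (1 - factor)
    static double ema(double[] values, double previous, double factor) {
        if (values.length == 0) {
            return previous; // nothing to fold in
        }
        // Assumed seeding: with no usable previous value, start from the
        // last value in the window.
        double result = Double.isNaN(previous) ? values[values.length - 1] : previous;
        for (double v : values) {
            result = v * factor + result * (1 - factor);
        }
        return result;
    }

    public static void main(String[] args) {
        // First window: no previous EMA yet, so pass NaN to trigger seeding.
        double first = ema(new double[] {10.0, 20.0}, Double.NaN, 0.5);
        // Next window: feed the previous result back in by hand.
        double second = ema(new double[] {20.0, 30.0}, first, 0.5);
        System.out.println(first + " " + second);
    }
}
```

In the sketch the caller passes the previous result back in by hand, which is the step Joule is described as handling for the developer. The configured emaFactor of 0.33333 matches the common smoothing choice 2 / (N + 1) for N = 5, the window size used here, although the source does not say that this is how the value was chosen.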