This example computes the exponential moving average using a Joule OOTB function packaged as a jar using the Joule SDK.
SOURCEFILE=conf/sources/stockQuoteStream.yaml
ENGINEFILE=conf/usecases/slidingEventWindowEMAAnalytics.yaml
PUBLISHFILE=conf/publishers/influxdbStandardAnalytics.yaml
This pipeline will compute a sliding window function on a single event type based on the symbol CVCO
processing unit:
pipeline:
- filter:
expression: "symbol == 'CVCO'"
- sliding window analytics:
function: com.fractalworks.streams.processors.analytics.functions.ExponentialMovingAverage
window size: 5
fields: [ ask, bid ]
parameters:
emaFactor: 0.33333
This function is interesting as it uses the previous ema value to compute the next value, Joule manages this process on behalf of the developer.
public class ExponentialMovingAverage implements AnalyticsFunction<Double> {
public final static String EMA_FACTOR = "emaFactor";
double emaFactor = 0.333;
public ExponentialMovingAverage() {
}
@Override
public void setParameters(Properties parameters) {
if (parameters != null && parameters.containsKey(EMA_FACTOR)) {
emaFactor = Double.parseDouble(parameters.get(EMA_FACTOR).toString());
}
}
@Override
public String getVariablePostFixID() {
return "EMA";
}
@Override
public Double compute(Number[] values, Number previousValue) {
if (previousValue != null && Double.isNaN(previousValue.doubleValue())) {
previousValue = values[values.length - 1];
}
for (Number d : values) {
previousValue = (d.doubleValue() * emaFactor) + (previousValue.doubleValue() * (1 - emaFactor));
}
return previousValue.doubleValue();
}
}