User defined analytics


This example computes an exponential moving average (EMA) using an out-of-the-box (OOTB) Joule function, which is packaged as a jar with the Joule SDK.

Use case configuration

File: app-ema-analytics-and-influx.env

SOURCEFILE=conf/sources/stockQuoteStream.yaml
ENGINEFILE=conf/usecases/slidingEventWindowEMAAnalytics.yaml
PUBLISHFILE=conf/publishers/influxdbStandardAnalytics.yaml

Pipeline configuration

This pipeline filters the stream down to events with the symbol CVCO and computes a sliding window function over them.

processing unit:
  pipeline:
    - filter:
        expression: "symbol == 'CVCO'"
    - sliding window analytics:
        function: com.fractalworks.streams.processors.analytics.functions.ExponentialMovingAverage
        window size: 5
        fields: [ ask, bid ]
        parameters:
          emaFactor: 0.33333

Function Implementation

This function is interesting because it uses the previous EMA value to compute the next one. Joule manages this state on behalf of the developer by passing the previous result into each call.
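For reference, the update applied to each value can be written as the standard EMA recurrence. This is a sketch in conventional notation rather than anything taken from the Joule documentation: the symbols x_t (current value), α (the emaFactor parameter) and N (the window size) are labels introduced here.

```latex
\mathrm{EMA}_t = \alpha \, x_t + (1 - \alpha)\, \mathrm{EMA}_{t-1},
\qquad 0 < \alpha \le 1

% A common convention ties the factor to the window size:
\alpha = \frac{2}{N + 1}
\quad\Rightarrow\quad
N = 5 \;\text{ gives }\; \alpha = \frac{2}{6} \approx 0.33333
```

The configured emaFactor of 0.33333 with a window size of 5 is consistent with this convention, although the source does not say that this is how the value was chosen.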

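import java.util.Properties;

// AnalyticsFunction comes from the Joule SDK; its import is not shown in the original.
public class ExponentialMovingAverage implements AnalyticsFunction<Double> {

    // Name of the parameter read from the pipeline's "parameters" block.
    public static final String EMA_FACTOR = "emaFactor";
    // Smoothing factor used when the configuration does not supply emaFactor.
    double emaFactor = 0.333;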

    public ExponentialMovingAverage() {
    }

    @Override
    public void setParameters(Properties parameters) {
        if (parameters != null && parameters.containsKey(EMA_FACTOR)) {
            emaFactor = Double.parseDouble(parameters.get(EMA_FACTOR).toString());
        }
    }

    @Override
    public String getVariablePostFixID() {
        return "EMA";
    }

    @Override
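    public Double compute(Number[] values, Number previousValue) {

        // Seed with the most recent value when there is no usable previous EMA.
        // Checking for null first avoids a NullPointerException in the loop below.
        if (previousValue == null || Double.isNaN(previousValue.doubleValue())) {
            previousValue = values[values.length - 1];
        }

        // Fold each value in the window into the running EMA.
        for (Number d : values) {
            previousValue = (d.doubleValue() * emaFactor) + (previousValue.doubleValue() * (1 - emaFactor));
        }
        return previousValue.doubleValue();
    }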
}
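
To see how the previous value is carried from one window to the next, the following standalone sketch reproduces the same recurrence without any Joule SDK types. The class EmaSketch and its methods compute and run are hypothetical names made up for illustration. The driver loop is an assumption about the bookkeeping Joule performs; in particular, it assumes the function is only called once the window is full.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Standalone illustration only: EmaSketch is a hypothetical class, not part of the Joule SDK.
final class EmaSketch {

    // Same recurrence as ExponentialMovingAverage.compute, using primitive doubles.
    static double compute(double[] window, double previous, double emaFactor) {
        // Seed with the newest window value when no previous EMA exists (NaN).
        double ema = Double.isNaN(previous) ? window[window.length - 1] : previous;
        for (double v : window) {
            ema = v * emaFactor + ema * (1 - emaFactor);
        }
        return ema;
    }

    // Assumed driver: slide a fixed-size window over the prices and feed the
    // last result back in as the previous value on the next call.
    static double run(double[] prices, int windowSize, double emaFactor) {
        Deque<Double> window = new ArrayDeque<>();
        double previous = Double.NaN;
        for (double p : prices) {
            window.addLast(p);
            if (window.size() > windowSize) {
                window.removeFirst(); // drop the oldest value
            }
            if (window.size() == windowSize) {
                double[] values = window.stream().mapToDouble(Double::doubleValue).toArray();
                previous = compute(values, previous, emaFactor);
            }
        }
        return previous; // NaN if the window never filled
    }

    public static void main(String[] args) {
        double[] asks = {10.0, 10.0, 10.0, 10.0, 10.0, 16.0};
        System.out.println(run(asks, 5, 0.5)); // prints 13.0
    }
}
```

With a factor of 0.5, the first full window of 10.0 values yields 10.0. The second window ends in 16.0, so the carried-over 10.0 moves halfway towards it, giving 13.0. Note that, as in the original function, every value still inside the window is folded in again on each call.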
