Processor API

Processors form the core of the joule platform. Given the importance of Processors, an API has been provided to enable developers to build and extend the capabilities of the platform.

Development steps

  1. Create project using the template

  2. Implement custom processor

  3. Build, test and package

  4. Deploy

Explaining each step

Step 1: Create project using the template

We have provided a project template project to quick start development. The project can be found here. Clone the template project and copy relevant code and structure to your own project

git clone git@gitlab.com:joule-platform/fractalworks-project-templates.git

Joule uses Gradle to manage Java dependencies. To add dependencies for your connector, manage them in the build.gradle file inside your connector's directory.

Step 2: Implement custom processor

Processors differ from connectors as they do not require, currently, a specification and builder classes. So jump right in and create and name a class that reflects the processing function. Joule provides the core logic such as batching, cloning, linking of data stores, and a unique processor UUID for event change lineage.

Key areas of implementation

  • Define processor DSL namespace

  • Class definition to plugins.properties

  • Initialize and apply methods

  • Attribute setters

Define processor DSL namespace

@JsonRootName(value = "template processor")
public class TemplateProcessor extends AbstractProcessor

For Joule to load and initialised the component the processor must be defined within the plugins.properties file under the META-INF/services directory

# Change and add lines for your processor classes
com.fractalworks.streams.examples.processor.TemplateProcessor

Implement the initialize and apply methods

All processor Initialize functions are called before any event processing. Add your custom initialisation logic e.g. priming expensive reference data attributes, calculations etc,.

@Override
public void initialize(Properties prop) throws ProcessorException {
    super.initialize(prop);
    // TODO: Add any processor initialisation logic
}

Add your custom event processing logic within the apply function. Joule provides a single event to processors through micro batch dispatches, this includes event cloning.

@Override
public StreamEvent apply(StreamEvent streamEvent, Context context) throws StreamsException {
    metrics.incrementMetric(Metric.RECEIVED);
    if(enabled){
        // TODO: Add processing logic
        metrics.incrementMetric(Metric.PROCESSED);

    } else {
        metrics.incrementMetric(Metric.IGNORED);
    }
    return streamEvent;
}

To gain processor JMX telemetry add relevant metrics. Every platform component with defined metrics will be accessible within a JMX monitoring platform.

Note: If you would like to perform batch processing override the below method.

public MicroBatch apply(MicroBatch batch, Context context) throws StreamsException;

Step 4: Build, test and package

The template project provides basic JUnit test to validate DSL. The project will execute these tests during the gradle build cycle and deploy to your local maven repository.

gradle build publishToMavenLocal

Step 5: Deploy

Once your package has been successfully created you are ready to deploy to a Joule project. The resulting jar artefact needs to be placed in to the userlibs directory in your Joule projects directory. See provided examples documentation for further directions.

cp build/libs/<your-processor>.jar <location>/userlibs

Custom processor example

Below is a simple data quality checker for a specific field using a default value in the event of it being missing.

@JsonRootName(value = "volume quality transformer")
public class CustomVolumeQualityTransformer extends AbstractProcessor {

    private double defaultValue = 0.0;

    public CustomVolumeQualityTransformer() {
        super();
    }

    @Override
    public void initialize(Properties prop) throws ProcessorException {
        super.initialize(prop);
        logger = LoggerFactory.getLogger(this.getClass().getName());
    }

    @Override
    public StreamEvent apply(StreamEvent event, Context context) 
        throws StreamsException {
        
        metrics.incrementMetric(Metric.RECEIVED);
        
        if(enabled) {
            var volume = (Double)event.getValue("volume");
            if(volume == null){
                event.replaceValue(uuid,"volume", defaultValue);                
            } else if(volume < 0) {
                event.replaceValue(uuid,"volume", Math.abs(volume));
            }
        
            metrics.incrementMetric(Metric.PROCESSED);
        
        } else {
            metrics.incrementMetric(Metric.DISCARDED);
        }
        return event;
    }
    
    @JsonProperty(value = "default value", required = false)
    public void setDefaultValue(double defaultValue) {
        this.defaultValue = defaultValue;
    }
    
    @Override
    public String toString() {
        return "CustomVolumeQualityTransformer{" +
               "defaultValue=" + defaultValue +
               "}";
    }   
}

Last updated