Processor API
Processors form the core of the joule platform. Given the importance of Processors, an API has been provided to enable developers to build and extend the capabilities of the platform.
Development steps
Create project using the template
Implement custom processor
Build, test and package
Deploy
Explaining each step
Step 1: Create project using the template
We have provided a project template project to quick start development. The project can be found here. Clone the template project and copy relevant code and structure to your own project
git clone [email protected]:joule-platform/fractalworks-project-templates.git
Step 2: Implement custom processor
Processors differ from connectors as they do not require, currently, a specification and builder classes. So jump right in and create and name a class that reflects the processing function. Joule provides the core logic such as batching, cloning, linking of data stores, and a unique processor UUID for event change lineage.
Key areas of implementation
Define processor DSL namespace
Class definition to plugins.properties
Initialize and apply methods
Attribute setters
Define processor DSL namespace
@JsonRootName(value = "template processor")
public class TemplateProcessor extends AbstractProcessor
For Joule to load and initialised the component the processor must be defined within the plugins.properties
file under the META-INF/services
directory
# Change and add lines for your processor classes
com.fractalworks.streams.examples.processor.TemplateProcessor
Implement the initialize
and apply
methods
initialize
and apply
methodsAll processor Initialize functions are called before any event processing. Add your custom initialisation logic e.g. priming expensive reference data attributes, calculations etc,.
@Override
public void initialize(Properties prop) throws ProcessorException {
super.initialize(prop);
// TODO: Add any processor initialisation logic
}
Add your custom event processing logic within the apply function. Joule provides a single event to processors through micro batch dispatches, this includes event cloning.
@Override
public StreamEvent apply(StreamEvent streamEvent, Context context) throws StreamsException {
metrics.incrementMetric(Metric.RECEIVED);
if(enabled){
// TODO: Add processing logic
metrics.incrementMetric(Metric.PROCESSED);
} else {
metrics.incrementMetric(Metric.IGNORED);
}
return streamEvent;
}
To gain processor JMX telemetry add relevant metrics. Every platform component with defined metrics will be accessible within a JMX monitoring platform.
Note: If you would like to perform batch processing override the below method.
public MicroBatch apply(MicroBatch batch, Context context) throws StreamsException;
Step 4: Build, test and package
The template project provides basic JUnit test to validate DSL. The project will execute these tests during the gradle build cycle and deploy to your local maven repository.
gradle build publishToMavenLocal
Step 5: Deploy
Once your package has been successfully created you are ready to deploy to a Joule project. The resulting jar artefact needs to be placed in to the userlibs
directory in your Joule projects directory. See provided examples documentation for further directions.
cp build/libs/<your-processor>.jar <location>/userlibs
Custom processor example
Below is a simple data quality checker for a specific field using a default value in the event of it being missing.
@JsonRootName(value = "volume quality transformer")
public class CustomVolumeQualityTransformer extends AbstractProcessor {
private double defaultValue = 0.0;
public CustomVolumeQualityTransformer() {
super();
}
@Override
public void initialize(Properties prop) throws ProcessorException {
super.initialize(prop);
logger = LoggerFactory.getLogger(this.getClass().getName());
}
@Override
public StreamEvent apply(StreamEvent event, Context context)
throws StreamsException {
metrics.incrementMetric(Metric.RECEIVED);
if(enabled) {
var volume = (Double)event.getValue("volume");
if(volume == null){
event.replaceValue(uuid,"volume", defaultValue);
} else if(volume < 0) {
event.replaceValue(uuid,"volume", Math.abs(volume));
}
metrics.incrementMetric(Metric.PROCESSED);
} else {
metrics.incrementMetric(Metric.DISCARDED);
}
return event;
}
@JsonProperty(value = "default value", required = false)
public void setDefaultValue(double defaultValue) {
this.defaultValue = defaultValue;
}
@Override
public String toString() {
return "CustomVolumeQualityTransformer{" +
"defaultValue=" + defaultValue +
"}";
}
}
Last updated
Was this helpful?