# Processor API

## Development steps

1. Create project using the template
2. Implement custom processor
3. Build, test and package
4. Deploy

## Explaining each step <a href="#explaining-each-step" id="explaining-each-step"></a>

### Step 1: Create project using the template <a href="#step-1-create-the-destination-using-the-template" id="step-1-create-the-destination-using-the-template"></a>

We provide a template project to help you start development quickly. The project can be found [here](https://gitlab.com/joule-platform/fractalworks-project-templates). Clone the template project and copy the relevant code and structure into your own project.

<pre class="language-bash"><code class="lang-bash"><strong>git clone git@gitlab.com:joule-platform/fractalworks-project-templates.git
</strong></code></pre>

{% hint style="info" %}
Joule uses Gradle to manage Java dependencies. To add dependencies for your processor, manage them in the `build.gradle` file inside your processor's directory.
{% endhint %}

### Step 2: Implement custom processor

Unlike connectors, processors do not currently require specification and builder classes, so you can start straight away by creating a class whose name reflects its processing function. Joule provides the core logic, such as batching, cloning, linking of data stores, and a unique processor UUID for event change lineage.

Key areas of implementation:

* Define the processor DSL namespace
* Add the class definition to `plugins.properties`
* Implement the `initialize` and `apply` methods
* Attribute setters

#### Define processor DSL namespace

```java
@JsonRootName(value = "template processor")
public class TemplateProcessor extends AbstractProcessor
```

For Joule to load and initialise the component, the processor class must be listed in the `plugins.properties` file under the `META-INF/services` directory.

```properties
# Change and add lines for your processor classes
com.fractalworks.streams.examples.processor.TemplateProcessor
```
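How Joule reads this file internally is not described here. As a rough illustration only, the sketch below shows one way a list of fully qualified class names could be parsed and instantiated reflectively, which is why each entry must be an exact class name. `PluginListSketch` and its methods are hypothetical and are not part of the Joule API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration; Joule's actual plugin loader may work differently. */
final class PluginListSketch {

    /** Returns the class names in the file content, skipping blank lines and '#' comments. */
    static List<String> classNames(String fileContent) {
        List<String> names = new ArrayList<>();
        for (String line : fileContent.split("\\R")) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty() && !trimmed.startsWith("#")) {
                names.add(trimmed);
            }
        }
        return names;
    }

    /** Creates an instance through the public no-argument constructor (assumed to exist). */
    static Object instantiate(String className) throws ReflectiveOperationException {
        return Class.forName(className).getDeclaredConstructor().newInstance();
    }
}
```

Under this assumption, a misspelt package or class name would only surface at load time as a `ClassNotFoundException`, so it is worth checking the entry against the class's package declaration.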

#### Implement the `initialize` and `apply` methods

Each processor's `initialize` method is called before any event processing begins. Add your custom initialisation logic here, e.g. priming expensive reference data attributes, calculations, etc.

```java
@Override
public void initialize(Properties prop) throws ProcessorException {
    super.initialize(prop);
    // TODO: Add any processor initialisation logic
}
```
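To make the idea of priming reference data concrete, here is a minimal standalone sketch that parses a lookup table once at initialisation so that per-event code only performs a map lookup. The class `ReferenceDataPrimingSketch`, the `fx.rates` property name and its `KEY=value` format are all assumptions made for illustration; they are not part of the Joule API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical stand-in for a processor; it does not extend any Joule class. */
final class ReferenceDataPrimingSketch {

    // Lookup table built once, before any event is processed.
    private final Map<String, Double> fxRates = new HashMap<>();

    /** Mirrors the shape of initialize(Properties): do the expensive parsing up front. */
    void initialize(Properties prop) {
        // Assumed property format: "GBP=1.27, EUR=1.08"
        String raw = prop.getProperty("fx.rates", "");
        for (String pair : raw.split(",")) {
            String[] kv = pair.trim().split("=");
            if (kv.length == 2) {
                fxRates.put(kv[0].trim(), Double.parseDouble(kv[1].trim()));
            }
        }
    }

    /** Cheap per-event lookup that relies on the primed table. */
    double rateFor(String currency, double fallback) {
        return fxRates.getOrDefault(currency, fallback);
    }
}
```

In a real processor the same parsing would sit after the `super.initialize(prop)` call, and the lookup would be used from `apply`.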

Add your custom event processing logic within the `apply` method. Joule dispatches events in micro-batches and passes them to the processor one event at a time; event cloning is handled for you.

```java
@Override
public StreamEvent apply(StreamEvent streamEvent, Context context) throws StreamsException {
    metrics.incrementMetric(Metric.RECEIVED);
    if(enabled){
        // TODO: Add processing logic
        metrics.incrementMetric(Metric.PROCESSED);

    } else {
        metrics.incrementMetric(Metric.IGNORED);
    }
    return streamEvent;
}
```

To expose processor telemetry over JMX, increment the relevant metrics. Every platform component with defined metrics is accessible from a JMX monitoring platform.

**Note:** To perform batch processing, override the method below.

```java
public MicroBatch apply(MicroBatch batch, Context context) throws StreamsException;
```
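The difference between the two entry points can be sketched without the Joule classes. In the hypothetical example below a `Map` stands in for `StreamEvent` and a `List` for `MicroBatch`; neither reflects the real Joule types. The batch variant computes a value across the whole batch, which the per-event variant cannot see.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in types: Map plays StreamEvent, List plays MicroBatch. */
final class BatchApplySketch {

    /** Per-event path: sees one event at a time. */
    static Map<String, Object> apply(Map<String, Object> event) {
        Map<String, Object> out = new HashMap<>(event);
        out.put("seen", true);
        return out;
    }

    /** Batch path: aggregates across the batch, then enriches each event. */
    static List<Map<String, Object>> apply(List<Map<String, Object>> batch) {
        double total = 0.0;
        for (Map<String, Object> e : batch) {
            Object v = e.get("volume");
            if (v instanceof Number) {
                total += ((Number) v).doubleValue();
            }
        }
        // Mean over all events in the batch; events without a numeric volume count as 0.
        double mean = batch.isEmpty() ? 0.0 : total / batch.size();

        List<Map<String, Object>> out = new ArrayList<>(batch.size());
        for (Map<String, Object> e : batch) {
            Map<String, Object> enriched = apply(e);
            enriched.put("batchMeanVolume", mean);
            out.add(enriched);
        }
        return out;
    }
}
```

Overriding the batch method is mainly worthwhile when the logic needs this kind of cross-event view; otherwise the single-event `apply` is simpler.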

### Step 3: Build, test and package <a href="#step-1-create-the-destination-using-the-template" id="step-1-create-the-destination-using-the-template"></a>

The template project provides a basic JUnit test to validate the DSL. These tests run during the Gradle build cycle, and the command below also publishes the artefact to your local Maven repository.

```bash
gradle build publishToMavenLocal
```

### Step 4: Deploy <a href="#step-1-create-the-destination-using-the-template" id="step-1-create-the-destination-using-the-template"></a>

Once your package has been built successfully, you are ready to deploy it to a Joule project. Place the resulting jar artefact in the `userlibs` directory of your Joule project. See the provided examples [documentation](https://docs.fractalworks.io/joule/use-case-examples/overview) for further directions.

```bash
cp build/libs/<your-processor>.jar <location>/userlibs
```

## Custom processor example

Below is a simple data quality checker for a specific field. It substitutes a default value when the field is missing and replaces a negative value with its absolute value.

```java
@JsonRootName(value = "volume quality transformer")
public class CustomVolumeQualityTransformer extends AbstractProcessor {

    private double defaultValue = 0.0;

    public CustomVolumeQualityTransformer() {
        super();
    }

    @Override
    public void initialize(Properties prop) throws ProcessorException {
        super.initialize(prop);
        logger = LoggerFactory.getLogger(this.getClass().getName());
    }

    @Override
    public StreamEvent apply(StreamEvent event, Context context) 
        throws StreamsException {
        
        metrics.incrementMetric(Metric.RECEIVED);
        
        if(enabled) {
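            // Read the current value; a null result is treated as a missing field
            var volume = (Double)event.getValue("volume");
            if(volume == null){
                // Missing: substitute the configured default
                event.replaceValue(uuid,"volume", defaultValue);
            } else if(volume < 0) {
                // Negative: replace with the absolute value
                event.replaceValue(uuid,"volume", Math.abs(volume));
            }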
        
            metrics.incrementMetric(Metric.PROCESSED);
        
        } else {
            metrics.incrementMetric(Metric.DISCARDED);
        }
        return event;
    }
    
    @JsonProperty(value = "default value", required = false)
    public void setDefaultValue(double defaultValue) {
        this.defaultValue = defaultValue;
    }
    
    @Override
    public String toString() {
        return "CustomVolumeQualityTransformer{" +
               "defaultValue=" + defaultValue +
               "}";
    }   
}
```
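The rule inside `apply` can also be expressed as a pure function, which makes it easy to unit test without a running Joule instance. The sketch below is a standalone illustration; `VolumeRuleSketch` and `clean` are hypothetical names and not part of the Joule API.

```java
/** Hypothetical helper isolating the volume rule from the processor plumbing. */
final class VolumeRuleSketch {

    /**
     * Returns the default when the volume is missing, the absolute value when it
     * is negative, and the value unchanged otherwise.
     */
    static double clean(Double volume, double defaultValue) {
        if (volume == null) {
            return defaultValue;
        }
        return volume < 0 ? Math.abs(volume) : volume;
    }
}
```

A processor could delegate to such a helper from `apply`, keeping the event handling and metrics code separate from the data quality rule itself.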

