# Processing unit

## Overview

In its simplest form, a processing unit provides two key functions: a **stream** processing pipeline and a **metrics engine** that generates metrics according to a time-based schedule policy. Together they form the **core of data handling** in Joule.

The processing unit is composed of two main components:

1. [<mark style="color:green;">**Stream processors**</mark>](/joule/components/processors.md)\
   A configurable series of processors that perform transformations, filters, and aggregations on event data. Pipelines are built from Joule’s **set of core processors** and allow users to construct sophisticated processing workflows.
2. [<mark style="color:green;">**Metrics engine**</mark>](/joule/components/processors/enrichment/metrics.md)\
   This engine calculates **complex metrics on a defined schedule** using SQL-based queries. Metrics are generated and updated at regular intervals, allowing for real-time insights into data.

### Example

```yaml
stream:
  ...
  processing unit:              
      pipeline:
        # Here comes the list of processors

      metrics engine:
        # Here comes the metrics declarations            
```
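To make the division of labour concrete, here is a minimal Python sketch of the two parts, assuming events are plain dictionaries: the pipeline runs once per event, while the metrics engine runs on a schedule over what the pipeline has produced. The class `ProcessingUnit` and its methods are hypothetical illustrations, not Joule APIs, and an in-memory list stands in for Joule's embedded SQL engine.

```python
from typing import Callable, Dict, List, Optional

Event = Dict[str, object]
# A processor returns the (possibly transformed) event, or None to drop it.
Processor = Callable[[Event], Optional[Event]]
# A metric function computes a value from all events processed so far.
MetricFn = Callable[[List[Event]], object]


class ProcessingUnit:
    """Hypothetical illustration: per-event pipeline plus scheduled metrics."""

    def __init__(self, pipeline: List[Processor], metrics: Dict[str, MetricFn]):
        self.pipeline = pipeline
        self.metrics = metrics
        self.processed: List[Event] = []  # stand-in for the embedded SQL store
        self.latest: Dict[str, object] = {}

    def on_event(self, event: Event) -> Optional[Event]:
        # Pipeline: apply each processor in order; stop if one drops the event.
        for processor in self.pipeline:
            result = processor(event)
            if result is None:
                return None
            event = result
        self.processed.append(event)
        return event

    def on_schedule(self) -> Dict[str, object]:
        # Metrics engine: called by a scheduler, not once per event.
        for name, compute in self.metrics.items():
            self.latest[name] = compute(self.processed)
        return dict(self.latest)


unit = ProcessingUnit(
    pipeline=[lambda e: e if e["symbol"] != "A" else None],
    metrics={"event_count": len},
)
for e in [{"symbol": "A"}, {"symbol": "B"}, {"symbol": "C"}]:
    unit.on_event(e)
print(unit.on_schedule())
```

Keeping the scheduled step separate from `on_event` mirrors the split described above: per-event work stays cheap, while heavier metric queries run only when the schedule fires.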

## Pipeline

A pipeline is a sequence of processors that compute functions on events. Joule provides a set of core processors out of the box that you can combine to build useful use cases. Learn more about the available processors and analytics tools:

<table data-card-size="large" data-view="cards"><thead><tr><th></th><th></th><th></th><th data-hidden data-card-target data-type="content-ref"></th></tr></thead><tbody><tr><td><mark style="color:orange;"><strong>Processors</strong></mark></td><td>Processors are the core of the Joule platform, each performing a specific task. Linked together, they form a use case.</td><td></td><td><a href="/pages/21H6be81tCxZDwGJDYxq">/pages/21H6be81tCxZDwGJDYxq</a></td></tr><tr><td><mark style="color:orange;"><strong>Analytics tools</strong></mark></td><td>Define math expressions, or provide them as a file, using Joule-supported languages and APIs.</td><td></td><td><a href="/pages/y5uZ4PFaYPXNzCR7iNXs">/pages/y5uZ4PFaYPXNzCR7iNXs</a></td></tr></tbody></table>

### Example

This example defines a pipeline that filters, groups, and aggregates events:

1. <mark style="color:green;">**Filter**</mark>\
   Passes on only the events that satisfy `symbol != 'A'`, dropping events for symbol `A`.
2. <mark style="color:green;">**Tumbling time window**</mark>\
   Groups events into 5-second windows.
3. <mark style="color:green;">**Aggregations**</mark>\
   Calculates `MIN` and `MAX` values for `ask` and `bid` fields, grouped by `symbol`.
4. <mark style="color:green;">**Event emission**</mark>\
   Outputs one event per `symbol` with aggregated values at the end of each window.

```yaml
stream:
  ...
  processing unit:
    pipeline:
      - filter:
          expression: "symbol != 'A'"
      - time window:
          emitting type: tumblingQuoteAnalytics
          aggregate functions:
            MIN: [ask, bid]
            MAX: [ask, bid]
          policy:
            type: tumblingTime
            window size: 5000  # milliseconds (5-second windows)
```
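The four steps can be mirrored in a minimal Python sketch to show what the pipeline computes. It assumes each event is a dictionary with `symbol`, `ask`, `bid` and an `event_time` in milliseconds, that windows are aligned to multiples of the window size, and that output fields are named `<field>_<FUNCTION>` (as the `bid_MAX` column in the metrics query further down suggests); `run_pipeline` and the `window_start` field are hypothetical. It processes a finite batch, whereas Joule emits each window's results as the stream flows.

```python
from collections import defaultdict
from typing import Dict, List

WINDOW_SIZE_MS = 5_000  # mirrors `window size: 5000` in the YAML above


def run_pipeline(events: List[dict], window_size_ms: int = WINDOW_SIZE_MS) -> List[dict]:
    """Filter, window and aggregate a batch of events; one result per (window, symbol)."""
    # window start (ms) -> symbol -> events in that window
    windows: Dict[int, Dict[str, List[dict]]] = defaultdict(lambda: defaultdict(list))
    for event in events:
        # 1. Filter: keep only events that satisfy symbol != 'A'.
        if event["symbol"] == "A":
            continue
        # 2. Tumbling window: each event falls into exactly one fixed, non-overlapping window.
        window_start = event["event_time"] - event["event_time"] % window_size_ms
        windows[window_start][event["symbol"]].append(event)

    results = []
    for window_start in sorted(windows):
        for symbol in sorted(windows[window_start]):
            group = windows[window_start][symbol]
            row = {"symbol": symbol, "window_start": window_start}
            # 3. Aggregations: MIN and MAX of ask and bid, per symbol.
            for field in ("ask", "bid"):
                values = [e[field] for e in group]
                row[f"{field}_MIN"] = min(values)
                row[f"{field}_MAX"] = max(values)
            # 4. Emission: one output event per symbol per window.
            results.append(row)
    return results


events = [
    {"symbol": "A", "ask": 10.0, "bid": 9.5, "event_time": 1_000},   # dropped by the filter
    {"symbol": "B", "ask": 20.0, "bid": 19.0, "event_time": 1_500},
    {"symbol": "B", "ask": 21.0, "bid": 19.5, "event_time": 4_000},
    {"symbol": "B", "ask": 22.0, "bid": 21.0, "event_time": 6_000},  # falls in the next window
]
for row in run_pipeline(events):
    print(row)
```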

## Metrics engine

The metrics engine computes complex metrics by running SQL queries against Joule's embedded SQL engine.

{% hint style="info" %}
This feature is enabled by default.
{% endhint %}

### Example

```yaml
stream:
  ...
  processing unit:
    metrics engine:
      policy:
        timeUnit: MINUTES
        frequency: 1
        startup delay: 2
  
      foreach metric compute:
        metrics:
          - name: BidMovingAverage
            metric key: symbol
            table definition: standardQuoteAnalyticsStream.BidMovingAverage (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_max FLOAT)
            query:
              SELECT symbol,
              AVG(bid_MIN) OVER (
                PARTITION BY symbol ORDER BY eventTime ASC
                RANGE BETWEEN INTERVAL 1 SECONDS PRECEDING
                AND INTERVAL 1 SECONDS FOLLOWING) AS avg_bid_min,
              AVG(bid_MAX) OVER (
                PARTITION BY symbol ORDER BY eventTime ASC
                RANGE BETWEEN INTERVAL 1 SECONDS PRECEDING
                AND INTERVAL 1 SECONDS FOLLOWING) AS avg_bid_max
              FROM quotesStream.tumblingQuoteAnalytics_View
              ORDER BY 1;
            truncate on start: true
            compaction:
              frequency: 8
              timeUnit: HOURS
```
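The `RANGE BETWEEN INTERVAL 1 SECONDS PRECEDING AND INTERVAL 1 SECONDS FOLLOWING` frame averages, for each row, the values of all rows with the same `symbol` whose `eventTime` lies within one second either side of that row's own `eventTime`, bounds included. Below is a minimal Python sketch of that frame, assuming `eventTime` is in milliseconds; `range_moving_average` is a hypothetical helper, and its quadratic scan favours clarity over speed.

```python
from typing import List, Tuple


def range_moving_average(
    rows: List[dict], field: str, half_width_ms: int = 1_000
) -> List[Tuple[str, int, float]]:
    """Emulate AVG(field) OVER (PARTITION BY symbol ORDER BY eventTime
    RANGE BETWEEN half_width PRECEDING AND half_width FOLLOWING)."""
    result = []
    for row in sorted(rows, key=lambda r: (r["symbol"], r["eventTime"])):
        # Frame: same partition (symbol), eventTime within +/- half_width_ms, inclusive.
        frame = [
            peer[field]
            for peer in rows
            if peer["symbol"] == row["symbol"]
            and abs(peer["eventTime"] - row["eventTime"]) <= half_width_ms
        ]
        result.append((row["symbol"], row["eventTime"], sum(frame) / len(frame)))
    return result


rows = [
    {"symbol": "B", "eventTime": 0, "bid_MAX": 10.0},
    {"symbol": "B", "eventTime": 1_000, "bid_MAX": 20.0},
    {"symbol": "B", "eventTime": 3_000, "bid_MAX": 30.0},
    {"symbol": "C", "eventTime": 500, "bid_MAX": 5.0},
]
for symbol, event_time, avg in range_moving_average(rows, "bid_MAX"):
    print(symbol, event_time, avg)
```

Unlike a `ROWS` frame, a `RANGE` frame is defined by the ordering value itself, so the number of rows averaged varies with how densely events arrive.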

### policy

The `policy` block defines the schedule for executing metrics calculations and managing data within the `metrics engine`.

The following table describes each attribute.

<table><thead><tr><th>Attribute</th><th>Description</th><th width="187" data-type="checkbox">Required</th><th>Default</th><th>Supported values</th></tr></thead><tbody><tr><td><strong>policy</strong></td><td>Defines the scheduling policy for metric calculations, setting the timing and intervals for updates</td><td>true</td><td>N/A</td><td>N/A</td></tr><tr><td><strong>timeUnit</strong></td><td>Sets the time unit used for the <code>frequency</code> and <code>startup delay</code> intervals</td><td>true</td><td>MINUTES</td><td>SECONDS, MINUTES, HOURS</td></tr><tr><td><strong>frequency</strong></td><td>Specifies how often metric calculations are performed</td><td>false</td><td>1 Minute</td><td>Any positive integer in <code>timeUnit</code></td></tr><tr><td><strong>startup delay</strong></td><td>Delay before the initial metric calculation</td><td>false</td><td>5 Minutes</td><td>Any positive integer in <code>timeUnit</code></td></tr></tbody></table>
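Read together, the three attributes describe a schedule. Below is a minimal Python sketch, assuming the first calculation runs once `startup delay` has elapsed and later ones follow at a fixed rate of `frequency`; the defaults are taken from the table, and `run_offsets` is a hypothetical helper rather than part of Joule.

```python
from datetime import timedelta
from typing import Dict, List, Union

# Time units listed as supported values in the policy table.
TIME_UNITS = {
    "SECONDS": timedelta(seconds=1),
    "MINUTES": timedelta(minutes=1),
    "HOURS": timedelta(hours=1),
}


def run_offsets(policy: Dict[str, Union[str, int]], runs: int) -> List[timedelta]:
    """Offsets from startup at which the first `runs` metric calculations fire."""
    unit = TIME_UNITS[str(policy.get("timeUnit", "MINUTES"))]
    frequency = int(policy.get("frequency", 1))          # default: 1 (minute)
    startup_delay = int(policy.get("startup delay", 5))  # default: 5 (minutes)
    if frequency < 1 or startup_delay < 1:
        raise ValueError("frequency and startup delay must be positive integers")
    # First run after the startup delay, then one run every `frequency` units.
    return [(startup_delay + i * frequency) * unit for i in range(runs)]


# Policy from the metrics engine example above.
example = {"timeUnit": "MINUTES", "frequency": 1, "startup delay": 2}
print(run_offsets(example, 3))
```

With the example policy, calculations would fire 2, 3 and 4 minutes after startup under this fixed-rate assumption.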

### Metrics computations

The `foreach metric compute` block defines each metric's table, its computation query and its management (truncation and compaction), and assigns the metric to a named metric family.

The following table describes each attribute.

| Attribute             | Description                                                                                             | Required                                                                     | Default | Supported values        |
| --------------------- | ------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------- | ------- | ----------------------- |
| **name**              | A unique identifier for the metric; also referred to as the metric family name                          | <ul class="contains-task-list"><li><input type="checkbox" checked></li></ul> | N/A     | N/A                     |
| **metric key**        | Column used as the metric key; Joule uses it to generate optimised metric queries for user lookups and management functions | <ul class="contains-task-list"><li><input type="checkbox" checked></li></ul> | N/A     | N/A                     |
| **table definition**  | SQL table used to store and access the metrics; the table name is qualified with its schema (for example `standardQuoteAnalyticsStream.BidMovingAverage`) | <ul class="contains-task-list"><li><input type="checkbox" checked></li></ul> | N/A     | N/A                     |
| **query**             | An ANSI SQL query executed on the `policy` schedule; its results are inserted into the defined metric table | <ul class="contains-task-list"><li><input type="checkbox" checked></li></ul> | N/A     | N/A                     |
| **Management**        | Default management processes run on startup for efficient memory use and housekeeping; enabled hourly   | <ul class="contains-task-list"><li><input type="checkbox" checked></li></ul> | N/A     | N/A                     |
| **truncate on start** | Truncates the table on startup if set to `true`                                                         | <ul class="contains-task-list"><li><input type="checkbox"></li></ul>         | true    | true, false             |
| **compaction**        | Removes outdated metrics on a set period, ensuring efficient use of storage                             | <ul class="contains-task-list"><li><input type="checkbox"></li></ul>         | hourly  | N/A                     |
| **frequency**         | Compaction setting: interval between compaction runs, in `timeUnit`                                     | <ul class="contains-task-list"><li><input type="checkbox"></li></ul>         | hourly  | N/A                     |
| **timeUnit**          | Compaction setting: time unit for `frequency`, supporting **HOURS** and **MINUTES**                     | <ul class="contains-task-list"><li><input type="checkbox"></li></ul>         | HOURS   | SECONDS, MINUTES, HOURS |
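To illustrate how these attributes fit together over a metric table's lifetime, here is a minimal sketch that uses Python's built-in `sqlite3` module in place of Joule's embedded SQL engine. Several details are assumptions made for illustration only: the hypothetical tables `quotes_view` and `bid_moving_average` stand in for `quotesStream.tumblingQuoteAnalytics_View` and `standardQuoteAnalyticsStream.BidMovingAverage`; a hypothetical `run_id` column records which calculation produced each row; and compaction is modelled as keeping only the latest calculation per `metric key`, which is one plausible reading of "removes outdated metrics", not Joule's documented behaviour.

```python
import sqlite3

# Hypothetical stand-ins for the names in the example configuration.
METRIC_TABLE = "bid_moving_average"
TABLE_DEFINITION = f"{METRIC_TABLE} (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_max FLOAT, run_id INTEGER)"
METRIC_KEY = "symbol"
# Simplified query: a plain GROUP BY instead of the windowed query in the example.
QUERY = "SELECT symbol, AVG(bid_MIN), AVG(bid_MAX) FROM quotes_view GROUP BY symbol ORDER BY 1"


def start(conn: sqlite3.Connection, truncate_on_start: bool = True) -> None:
    # table definition: create the metric table if it does not exist yet.
    conn.execute(f"CREATE TABLE IF NOT EXISTS {TABLE_DEFINITION}")
    if truncate_on_start:
        # SQLite has no TRUNCATE statement; an unqualified DELETE empties the table.
        conn.execute(f"DELETE FROM {METRIC_TABLE}")


def compute(conn: sqlite3.Connection, run_id: int) -> None:
    # query: run the metric query and insert its results, tagged with this run's id.
    rows = conn.execute(QUERY).fetchall()
    conn.executemany(
        f"INSERT INTO {METRIC_TABLE} VALUES (?, ?, ?, ?)",
        [(*row, run_id) for row in rows],
    )


def compact(conn: sqlite3.Connection) -> None:
    # compaction (assumed semantics): keep only the most recent run per metric key.
    conn.execute(
        f"DELETE FROM {METRIC_TABLE} WHERE run_id < ("
        f"SELECT MAX(m.run_id) FROM {METRIC_TABLE} AS m "
        f"WHERE m.{METRIC_KEY} = {METRIC_TABLE}.{METRIC_KEY})"
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE quotes_view (symbol VARCHAR, bid_MIN FLOAT, bid_MAX FLOAT)")
conn.executemany(
    "INSERT INTO quotes_view VALUES (?, ?, ?)",
    [("B", 19.0, 19.5), ("B", 21.0, 21.0), ("C", 5.0, 6.0)],
)
start(conn)
compute(conn, run_id=1)  # first scheduled calculation
conn.execute("INSERT INTO quotes_view VALUES ('C', 7.0, 8.0)")
compute(conn, run_id=2)  # second scheduled calculation
compact(conn)            # removes the rows from run 1
print(conn.execute(f"SELECT * FROM {METRIC_TABLE} ORDER BY symbol").fetchall())
```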


