# Pipelines

{% hint style="info" %}
Visit the [use case concept article](https://docs.fractalworks.io/joule/use-case-enablement) to understand what use cases are in Joule.
{% endhint %}

## Overview

At the core of the platform is the use case, **defined by users** who understand the business's specific computations and needs. This page gives an overview of how a use case is defined within the Joule platform.

It describes a sample use case for setting up a **real-time data stream** that **monitors and analyses** stock quote data, using features such as window aggregation, metrics calculations, filtering and event grouping.

## Key features of data pipelines

The features described below equip users to build, manage and refine robust data-driven pipelines tailored to their business objectives.

1. <mark style="color:green;">**Data priming**</mark>\
   Joule allows the system to be **primed at initialisation** with static contextual data and pre-calculated metrics, ensuring that initial data needs are met before processing begins. This setup is essential for **accurate data handling** and pre-population of relevant tables.
2. <mark style="color:green;">**Processing unit**</mark>\
   The platform enables **rapid and straightforward** creation of use cases, providing tools for **customisable computation and processing logic**. This flexibility allows users to define and execute business-specific workflows quickly.
3. <mark style="color:green;">**Emit**</mark>\
   Joule supports **tailored output selection**, allowing users to specify the exact fields to be included in the final output. Additionally, it provides options for **final filtering**, **streamlining the data** sent to downstream systems.
4. <mark style="color:green;">**Group by**</mark>\
   Grouping similar events enables **more efficient data aggregation** and **reduces output size**, helping optimise processing and storage requirements for downstream systems.
5. <mark style="color:green;">**Telemetry auditing**</mark>\
   With **inbound and outbound event auditing**, Joule **supports rigorous** testing, model validation and retraining processes. This enhances reliability and ongoing refinement of data models.

## Example

In this use case, we are setting up a real-time data stream pipeline called `tumblingWindowQuoteStream` to monitor and analyse stock quote data from a live source. This setup provides real-time analytics on stock quotes, capturing trends and key statistics for each stock symbol over defined time intervals.

The use case makes use of the [metrics engine](https://docs.fractalworks.io/joule/components/analytics/metrics-engine) and a [tumbling window](https://docs.fractalworks.io/joule/components/analytics/analytic-tools/window-analytics/tumbling-window) before publishing a filtered stream of events to a connected publisher.

{% hint style="info" %}
Each part of this example is covered in its own section under [Types of elements for the data pipelines](#types-of-elements-for-the-data-pipelines).
{% endhint %}

```yaml
stream:
  name: tumblingWindowQuoteStream
  eventTimeType: EVENT_TIME
  
  initialisation:
    sql import:
      schema: fxrates
      parquet:
        - table: quote
          asView: false
          files: [ 'fxrates.parquet' ]
          drop table: true
          index:
            fields: [ 'ccy' ]
            unique: false
            
  processing unit:
    metrics engine:
        policy:
          timeUnit: MINUTES
          frequency: 1
          startup delay: 2
  
        foreach metric compute:
          metrics:
            - name: BidMovingAverage
              metric key: symbol
              table definition: standardQuoteAnalyticsStream.BidMovingAverage (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_avg FLOAT, avg_bid_max FLOAT)
              query:
                SELECT symbol,
                MIN(bid_MAX) OVER w AS avg_bid_min,
                AVG(bid_MAX) OVER w AS avg_bid_avg,
                MAX(bid_MAX) OVER w AS avg_bid_max
                FROM quotesStream.tumblingQuoteAnalytics_View
                WINDOW w AS (
                PARTITION BY symbol ORDER BY eventTime ASC
                RANGE BETWEEN INTERVAL 1 SECONDS PRECEDING
                AND INTERVAL 1 SECONDS FOLLOWING)
                ORDER BY 1;
              truncate on start: true
              compaction:
                frequency: 8
                timeUnit: HOURS
              
    pipeline:
      - filter:
          expression: "symbol != 'A'"
      - timeWindow:
          emittingEventType: tumblingQuoteAnalytics
          aggregateFunctions:
            MIN: [ask, bid]
            MAX: [ask, bid]
          policy:
            type: tumblingTime
            windowSize: 5000

  emit:
    eventType: windowQuoteEvent
    select: "symbol, ask_MIN, ask_MAX, bid_MIN, bid_MAX, BidMovingAverage.avg_bid_max;WHERE symbol=${symbol}"
    having: "symbol !='A'"

  group by:
    - symbol
```

## Types of elements for the data pipelines

<table data-view="cards"><thead><tr><th></th><th></th><th></th><th data-hidden data-card-target data-type="content-ref"></th></tr></thead><tbody><tr><td><mark style="color:orange;"><strong>Data priming</strong></mark></td><td>Loads Joule with necessary startup data</td><td></td><td><a href="pipelines/data-priming">data-priming</a></td></tr><tr><td><mark style="color:orange;"><strong>Processing unit</strong></mark></td><td>Quickly builds custom business use cases</td><td></td><td><a href="pipelines/processing-unit">processing-unit</a></td></tr><tr><td><mark style="color:orange;"><strong>Emit</strong></mark></td><td>Selects and filters output for publishing</td><td></td><td><a href="pipelines/emit-computed-events">emit-computed-events</a></td></tr><tr><td><mark style="color:orange;"><strong>Group by</strong></mark></td><td>Aggregates events to streamline downstream data</td><td></td><td><a href="pipelines/group-by">group-by</a></td></tr><tr><td><mark style="color:orange;"><strong>Telemetry auditing</strong></mark></td><td>Tracks events for validation and testing</td><td></td><td><a href="pipelines/telemetry-auditing">telemetry-auditing</a></td></tr></tbody></table>
