# Pipelines

{% hint style="info" %}
Visit the [use case concept article](https://docs.fractalworks.io/joule/use-case-enablement) to understand what use cases are in Joule.
{% endhint %}

## Overview

At the core of the platform is the use case, **defined by users** who understand the business's specific computational needs. This page gives an overview of how a use case is defined within the Joule platform.

It describes a sample use case for setting up a **real-time data stream** that **monitors and analyses** stock quote data, using features such as window aggregation, metrics calculation, filtering and event grouping.

## Key features of data pipelines

The features described below equip users to build, manage and refine robust data-driven pipelines tailored to their business objectives.

1. <mark style="color:green;">**Data priming**</mark>\
   Joule allows the system to be **primed at initialisation** with static contextual data and pre-calculated metrics, ensuring that initial data needs are met before processing begins. This setup is essential for **accurate data handling** and pre-population of relevant tables.
2. <mark style="color:green;">**Processing unit**</mark>\
   The platform enables **rapid and straightforward** creation of use cases, providing tools for **customisable computation and processing logic**. This flexibility allows users to define and execute business-specific workflows quickly.
3. <mark style="color:green;">**Emit**</mark>\
   Joule supports **tailored output selection**, allowing users to specify the exact fields to be included in the final output. Additionally, it provides options for **final filtering**, **streamlining the data** sent to downstream systems.
4. <mark style="color:green;">**Group by**</mark>\
   Grouping similar events enables **more efficient data aggregation** and **reduces output size**, helping optimise processing and storage requirements for downstream systems.
5. <mark style="color:green;">**Telemetry auditing**</mark>\
   With **inbound and outbound event auditing**, Joule **supports rigorous** testing, model validation and retraining processes. This enhances reliability and ongoing refinement of data models.
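Taken together, the first four features map onto top-level sections of a stream definition; telemetry auditing is configured separately (see its dedicated article). A minimal skeleton, with section keys taken from the full example on this page and placeholder names marked as such, might look like:

```yaml
stream:
  name: myUseCaseStream        # hypothetical stream name
  eventTimeType: EVENT_TIME

  # 1. Data priming: load static contextual data before processing begins
  initialisation: { }

  # 2. Processing unit: business-specific computation and pipeline logic
  processing unit:
    pipeline:
      - filter:
          expression: "symbol != 'A'"

  # 3. Emit: choose the fields published downstream, with final filtering
  emit:
    eventType: myOutputEvent   # hypothetical event type
    select: "symbol, bid_MIN, bid_MAX"

  # 4. Group by: aggregate similar events to reduce output volume
  group by:
    - symbol
```

Each section is optional apart from the stream `name`; the full example below shows them populated for a concrete quote-monitoring use case.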

## Example

In this use case, we are setting up a real-time data stream pipeline called `tumblingWindowQuoteStream` to monitor and analyse stock quote data from a live source. This setup provides real-time analytics on stock quotes, capturing trends and key statistics for each stock symbol over defined time intervals.

The use case makes use of the [metrics engine](https://docs.fractalworks.io/joule/components/analytics/metrics-engine) and [tumbling window](https://docs.fractalworks.io/joule/components/analytics/analytic-tools/window-analytics/tumbling-window) before publishing a filtered stream of events to a connected publisher.

{% hint style="info" %}
This example is broken down section by section across the articles for each [element of the data pipeline](#types-of-elements-for-the-data-pipelines).
{% endhint %}

```yaml
stream:
  name: tumblingWindowQuoteStream
  eventTimeType: EVENT_TIME
  
  initialisation:
    sql import:
      schema: fxrates
      parquet:
        - table: quote
          asView: false
          files: [ 'fxrates.parquet' ]
          drop table: true
          index:
            fields: [ 'ccy' ]
            unique: false
            
  processing unit:
    metrics engine:
        policy:
          timeUnit: MINUTES
          frequency: 1
          startup delay: 2
  
        foreach metric compute:
          metrics:
            - name: BidMovingAverage
              metric key: symbol
              table definition: standardQuoteAnalyticsStream.BidMovingAverage (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_max FLOAT)
              query: >
                SELECT symbol,
                  AVG(bid_MIN) OVER w AS avg_bid_min,
                  AVG(bid_MAX) OVER w AS avg_bid_max
                FROM quotesStream.tumblingQuoteAnalytics_View
                WINDOW w AS (
                  PARTITION BY symbol ORDER BY eventTime ASC
                  RANGE BETWEEN INTERVAL 1 SECOND PRECEDING
                    AND INTERVAL 1 SECOND FOLLOWING)
                ORDER BY 1;
              truncate on start: true
              compaction:
                frequency: 8
                timeUnit: HOURS
              
    pipeline:
      - filter:
          expression: "symbol != 'A'"
      - timeWindow:
          emittingEventType: tumblingQuoteAnalytics
          aggregateFunctions:
            MIN: [ask, bid]
            MAX: [ask, bid]
          policy:
            type: tumblingTime
            windowSize: 5000

  emit:
    eventType: windowQuoteEvent
    select: "symbol, ask_MIN, ask_MAX, bid_MIN, bid_MAX, BidMovingAverage.avg_bid_max;WHERE symbol=${symbol}"
    having: "symbol !='A'"

  group by:
    - symbol
```

## Types of elements for the data pipelines

<table data-view="cards"><thead><tr><th></th><th></th><th></th><th data-hidden data-card-target data-type="content-ref"></th></tr></thead><tbody><tr><td><mark style="color:orange;"><strong>Data priming</strong></mark></td><td>Loads Joule with necessary startup data</td><td></td><td><a href="pipelines/data-priming">data-priming</a></td></tr><tr><td><mark style="color:orange;"><strong>Processing unit</strong></mark></td><td>Quickly builds custom business use cases</td><td></td><td><a href="pipelines/processing-unit">processing-unit</a></td></tr><tr><td><mark style="color:orange;"><strong>Emit</strong></mark></td><td>Selects and filters output for publishing</td><td></td><td><a href="pipelines/emit-computed-events">emit-computed-events</a></td></tr><tr><td><mark style="color:orange;"><strong>Group by</strong></mark></td><td>Aggregates events to streamline downstream data</td><td></td><td><a href="pipelines/group-by">group-by</a></td></tr><tr><td><mark style="color:orange;"><strong>Telemetry auditing</strong></mark></td><td>Tracks events for validation and testing</td><td></td><td><a href="pipelines/telemetry-auditing">telemetry-auditing</a></td></tr></tbody></table>


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.fractalworks.io/joule/components/pipelines.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
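As a sketch, the request can be built with Python's standard library; the question string here is only an illustration, and the URL-encoding of the `ask` parameter is the important part:

```python
from urllib.parse import urlencode

BASE = "https://docs.fractalworks.io/joule/components/pipelines.md"

def build_ask_url(question: str) -> str:
    """URL-encode the question into the `ask` query parameter."""
    return f"{BASE}?{urlencode({'ask': question})}"

url = build_ask_url("How do I configure a tumbling window policy?")
print(url)
# The GET request itself can then be issued with any HTTP client,
# e.g. urllib.request.urlopen(url).read().decode()
```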
