# Banking example

## Objective

The below examples aims to showcase how an existing stream pipeline can be enriched with contextual data.

We will apply a query named `company_info` to look up data and enrich simultaneously the company quote using the function `BidMovingAverage`.

{% hint style="info" %}
Review the [Anatomy of enrichment](https://docs.fractalworks.io/joule/components/processors/enrichment/key-concepts/anatomy-of-enrichment-dsl) to understand how an enricher is build.
{% endhint %}

## **Initialisation and pre-processing stage**

1. Import company reference data at startup using a parquet file.&#x20;
2. Deploy metrics definition to Metrics engine. Company quote metrics are calculated after 1 minute using a three minute tumbling window.&#x20;
3. Group processing by company nasdaq `symbol` field.

## **Stream processing**

1. Save events in to the quotes internal in-memory database every 5 seconds and recreates non-unique index.
2. Enrichment happens:
   1. Add `company_info` to the event using a query with the event symbol field as the look up value.
   2. Add company quote metrics using the `BidMovingAverage` metrics family and the event symbol value.
3. Publish aggregated view of company quote metrics along with basic company information.

```yaml
stream:
  name: quoteAnalyticsStream
  eventTimeType: EVENT_TIME

  initialisation:
    sql import:
      schema: contextual_data
      parquet:
        - table: nasdaq_companies
          asView: false
          files: [ 'data/parquet/nasdaq.parquet' ]
          index:
            fields: [ 'symbol' ]
            unique: true

  processing unit:
    metrics engine:
      runtime policy:
        frequency: 1
        startup delay: 1
        time unit: MINUTES

      foreach metric compute:
        metrics:
          - name: BidMovingAverage
            metric key: symbol
            table definition: bid_moving_averages (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_avg FLOAT,avg_bid_max FLOAT,createdTimestamp TIMESTAMP)
            query:
              SELECT symbol,
              MIN(bid) AS 'avg_bid_min',
              AVG(bid) AS 'avg_bid_avg',
              MAX(bid) AS 'avg_bid_max'
              FROM quotes.nasdaq
              WHERE
              ingestTime >= epoch_ms(date_trunc('minutes',now() - INTERVAL 3 MINUTES)) AND ingestTime <= epoch_ms(now())
              GROUP BY symbol
              ORDER BY 1;
            truncate on start: true
            compaction policy:
              frequency: 8
              time unit: HOURS

    pipeline:
      - tap:
          target schema: quotes
          flush frequency: 5
          index:
            unique: false
            fields:
              - symbol
      - enricher:
          fields:
            company_info:
              by query: "select * from contextual_data.nasdaq_companies where Symbol = ?"
              query fields: [ symbol ]
              with values: [ Name,Country ]
              using: JouleDB
              
            quote_metrics:
              by metric family: BidMovingAverage
              by key: symbol
              with values: [avg_bid_min, avg_bid_avg, avg_bid_max]
              using: MetricsDB
              
  emit:
    select: "symbol, Name, Country, avg_bid_min, avg_bid_avg, avg_bid_max"

  group by:
    - symbol
```

## Expected output

The previous example generates the following output, in this case a "|" separated`CSV`.

```csv
event_type|sub_type|event_time|ingest_time|symbol|avg_bid_min|avg_bid_avg|avg_bid_max|company_name|industry
nasdaq_View|null|1709894745240|1709894745240|MTZ|113.97318|103.25833|110.11051|MasTec Inc. Common Stock|Basic Industries
nasdaq_View|null|1709894745244|1709894745244|SBRA|18.71446|13.90475|16.472204|Sabra Health Care REIT Inc. Common Stock|Consumer Services
nasdaq_View|null|1709894745240|1709894745240|MUA|17.244429|14.651974|15.772599|Blackrock MuniAssets Fund Inc Common Stock|Finance
```
