# Build your first use case

## Filtered stream of Nasdaq major bank quotes

<figure><img src="https://3062398388-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FUU6FZlV07ZD90OzbzGww%2Fuploads%2FTXmpe2NkhI6lk7j4IbSr%2FJoule%20-%20Build%20your%20first%20use%20case.jpg?alt=media&#x26;token=6bf96574-6064-49bc-8f89-3a0ed09e4d90" alt=""><figcaption><p>Overview of use case processing sequence</p></figcaption></figure>

This is an introductory example of how to use a combination of out-of-the-box features to provide streaming enriched quotes for major banks with a market capitalisation of over $3.5 billion.

## Key takeaways

This tutorial will teach you how to use Joule's out-of-the-box (OOTB) features to filter, enrich and publish user-defined alerts to a Kafka topic and a CSV file.

This first use case covers a number of key features:

* <mark style="color:green;">**Subscribe and consume events**</mark>\
  Subscribe to, consume, parse and present events ready for pipeline processing using Kafka.
* <mark style="color:green;">**Initialise Joule with contextual data**</mark>\
  Load local CSV contextual data into JouleDB.
* <mark style="color:green;">**Filters and enrichment**</mark>\
  Filter a subset of events using JavaScript expressions and enrich events with the company data loaded into JouleDB.
* <mark style="color:green;">**Filter results by a constraint**</mark>\
  Use the `having` clause with a JavaScript expression to send only those events that breach a spread-ratio threshold.
* <mark style="color:green;">**Publishing events**</mark>\
  Send processed events to a CSV file and on to a Kafka topic as a defined AVRO domain data structure.

## Resources

The getting started project can be found [<mark style="color:green;">here</mark>](https://gitlab.com/joule-platform/fractalworks-stream-gettingstarted)<mark style="color:green;">.</mark>

## Use case development

{% stepper %}
{% step %}

### Define the use case objective

Provide trading consumer applications with **bid and ask quotes and company information** for all major banks with a market cap of over $3.5 billion trading on the Nasdaq stock market, whenever the spread widens to over 1.5% during the current business day.

```yaml
use case:
  name: nasdaq_banking_quotes
  constraints:
    valid from: '2024-10-01T09:25:00.000Z'
    valid to: '2024-10-01T16:35:00.000Z'
  sources:
    - live_nasdaq_quotes
  stream name: nasdaq_major_banks_stream
  sinks:
    - nasdaq_major_bank_topic
    - nasdaq_major_bank_quotes_file
```

{% hint style="danger" %}
**Change** the valid **from** and **to** dates to a current date range.&#x20;
{% endhint %}
{% endstep %}

{% step %}

### Define processing pipeline

The processing stream defines an initialisation step to load contextual data into memory, a processing pipeline, an event emit clause and the grouping of data.

The key processing steps include:

1. Enrich events with industry and market cap context information
2. Filter events by the 'Major Banks' industry and a market cap greater than $3.5 billion
3. Send a stock record with the following attributes for every event: symbol, company\_name, market\_cap, bid, ask

#### Stream definition

```yaml
stream:
  name: nasdaq_major_banks_stream
  eventTimeType: EVENT_TIME
  
  initialisation:
    # Import contextual company data in to Joule
    data import:
      schema: reference_data
      csv:
      - table: nasdaq_companies
        file: data/csv/nasdaq.csv
        drop table: true
        index:
          fields:
          - symbol
          unique: true
          
  processing unit:
    pipeline:
    # Filter events by major banks to reduce number of enrichment queries
    - filter:
        expression: "(typeof industry !== 'undefined' && 
                      industry == 'Major Banks')"
    
    # Enrich filtered event with company information
    - enricher:
        fields:
          company_info:
            by query: 
               select * from reference_data.nasdaq_companies where symbol = ?
            query fields:
            - symbol
            with values:
            - company_name
            - market_cap
            using: JouleDB
            
    # Filter events by market cap size
    - filter:
        expression: "(typeof market_cap !== 'undefined' &&
                      market_cap > 3500000000)"
  emit:
    select: symbol, company_name, market_cap, bid, ask
    
    # Spread trigger
    having: "((bid - ask) / bid) > 0.015"
    
  group by:
  - symbol
```
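The `filter` and `having` clauses above are plain JavaScript expressions evaluated against each event's attributes. As an illustrative sketch (the event object and harness below are hypothetical, not Joule's actual API), the combined logic amounts to:

```javascript
// Illustrative only: models how the pipeline's JavaScript expressions
// decide whether an event reaches the sinks.
function passesPipeline(event) {
  // Filter 1: major banks only
  if (typeof event.industry === 'undefined' || event.industry !== 'Major Banks') {
    return false;
  }
  // Filter 2: market cap above $3.5 billion
  if (typeof event.market_cap === 'undefined' || event.market_cap <= 3500000000) {
    return false;
  }
  // Having clause: emit only on a spread-ratio breach of over 1.5%
  return ((event.bid - event.ask) / event.bid) > 0.015;
}

const sample = { industry: 'Major Banks', market_cap: 4.5e9, bid: 23.90, ask: 23.19 };
console.log(passesPipeline(sample)); // true: spread ratio is roughly 0.0297
```

Note that filtering on industry before the enrichment step reduces the number of JouleDB lookups, which is why the pipeline orders the steps this way.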

{% endstep %}

{% step %}

### Subscribe to data sources

We shall use the getting started data simulator by defining a source feed that subscribes to live Nasdaq quote data (note we are using simulated data).

#### Source definition

```yaml
kafkaConsumer:
    name: nasdaq_quotes_stream
    cluster address: joule-gs-redpanda-0:9092
    consumerGroupId: nasdaq
    topics:
      - quotes

    deserializer:
      parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
      key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer

    properties:
      partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
      max.poll.records: 7000
      fetch.max.bytes: 1048576
```

{% endstep %}

{% step %}

### Define output destinations&#x20;

This use case example outputs events to a CSV file and a Kafka topic concurrently; each sink requires its own configuration.

#### File sink

A quick and easy way to validate your use case processing is to send the resulting events to a CSV file.&#x20;

```yaml
file:
  name: nasdaq_major_bank_quotes_file
  filename: nasdaq_major_banks
  path: "./data/output/test_output"
  batchSize: 1024
  timeout: 1000
  formatter:
    csv formatter:
      contentType: text/csv
      encoding: UTF-8
      delimiter: "|"
```

### Kafka sink

The same emitted events will also be sent on to a Kafka topic ready for downstream trading applications to consume and continue processing.

A quick recap of how events will be transformed to AVRO data structures:

1. The user emit projection is transformed into the provided domain data type using the AVRO schema shown below.&#x20;
2. The resulting events are then published on to the `nasdaq_major_bank_quotes` Kafka topic.

#### Kafka Definition

```yaml
kafkaPublisher:
  name: nasdaq_major_bank_topic
  cluster address: joule-gs-redpanda-0:9092
  topic: nasdaq_major_bank_quotes
  partitionKeys:
    - symbol

  serializer:
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    avro setting:
      local schema: ./conf/avro/stockrecord.avsc
```

#### AVRO schema

```avro-idl
{
  "type" : "record",
  "name" : "StockRecord",
  "namespace" : "com.fractalworks.examples.banking.data",
  "fields" : [
    {"name" : "symbol", "type" : "string"},
    {"name" : "company_name", "type" : "string"},
    {"name" : "market_cap", "type" : "long"},
    {"name" : "bid", "type" : "double"},
    {"name" : "ask", "type" : "double"}
    ]
}
```
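The emit projection must supply exactly the attributes and types the schema declares, or serialization will fail. As a sanity check, a minimal, hypothetical JavaScript structural check (not part of Joule; the AVRO serializer performs the real validation) could look like this:

```javascript
// Hypothetical structural check of an emitted record against the
// StockRecord schema's field names and primitive types.
const stockRecordFields = [
  { name: 'symbol', type: 'string' },
  { name: 'company_name', type: 'string' },
  { name: 'market_cap', type: 'long' },
  { name: 'bid', type: 'double' },
  { name: 'ask', type: 'double' }
];

function matchesSchema(record) {
  return stockRecordFields.every(({ name, type }) => {
    const value = record[name];
    if (type === 'string') return typeof value === 'string';
    // AVRO long and double both map to JavaScript numbers in this sketch
    return typeof value === 'number';
  });
}

const record = { symbol: 'STL', company_name: 'Sterling Bancorp',
                 market_cap: 4535713421, bid: 23.91, ask: 23.19 };
console.log(matchesSchema(record)); // true
```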

{% endstep %}

{% step %}

### Deploying the use case

Now that we have all the use case definitions, we can deploy them to Joule via the REST API using Postman, following the same getting started deployment steps for this project.

{% hint style="info" %}
Go to the "Build your first use case" folder under the Joule - Banking demo / Tutorials Postman examples within the getting started project
{% endhint %}
{% endstep %}

{% step %}

### Reviewing results

First, take a look at the generated CSV file by printing its first six lines:

```bash
head -6 data/output/test_output/nasdaq_major_banks.csv
```

The command should return output similar to the below:

```csv
event_type|sub_type|event_time|ingest_time|symbol|company_name|market_cap|bid|ask
nasdaq_view|null|1733761988063|1733761988063|STL|Sterling Bancorp|4.535713421E9|23.907325574219808|23.192674425780194
nasdaq_view|null|1733761988066|1733761988066|CFG|Citizens Financial Group Inc. Common Stock|1.9298895504E10|45.70824817841169|44.91175182158832
nasdaq_view|null|1733761988068|1733761988068|UBS|UBS Group AG Registered Ordinary Shares|5.043039879E10|15.137171051621275|14.822828948378726
nasdaq_view|null|1733761988070|1733761988070|SFBS|ServisFirst Bancshares Inc. Common Stock|3.642322843E9|67.87892304294589|66.64107695705412
nasdaq_view|null|1733761988070|1733761988070|EBC|Eastern Bankshares Inc. Common Stock|3.695943868E9|20.38801925014015|19.191980749859848
```
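To confirm the `having` clause behaved as expected, you can recompute the spread ratio from any output row. A small illustrative JavaScript check (the column positions are taken from the header line above):

```javascript
// Recompute the spread ratio from a pipe-delimited output row and
// confirm it exceeds the 1.5% trigger used in the having clause.
const row = 'nasdaq_view|null|1733761988063|1733761988063|STL|Sterling Bancorp|4.535713421E9|23.907325574219808|23.192674425780194';
const cols = row.split('|');
const bid = Number(cols[7]);  // bid column
const ask = Number(cols[8]);  // ask column
const spreadRatio = (bid - ask) / bid;
console.log(spreadRatio > 0.015); // true: ratio is roughly 0.0299
```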

{% endstep %}

{% step %}

### Minor sink refactoring

Once you are satisfied with the results, you can remove the `nasdaq_major_bank_quotes_file` reference from the use case definition file.
{% endstep %}
{% endstepper %}

## Summary

That's it. You should now have an understanding of how the components fit together to form a single use case.

To recap this example covers a number of key features:

* <mark style="color:green;">**Filter**</mark>&#x20;
  * JavaScript expression [filter](https://docs.fractalworks.io/joule/components/processors/filters).
* <mark style="color:green;">**Enrichment**</mark>
  * Add contextual data to streaming events.
* <mark style="color:green;">**Output projection**</mark>
  * Define an output [projection](https://docs.fractalworks.io/joule/components/pipelines/emit-computed-events) that matches the AVRO schema's attribute requirements.
* <mark style="color:green;">**Having clause**</mark>
  * Define a JavaScript analytic expression that sends alerts only when a specified condition is met.&#x20;
* <mark style="color:green;">**Kafka**</mark>
  * Subscribe to, consume and publish events using the Kafka [source](https://docs.fractalworks.io/joule/components/connectors/sources/kafka) and [sink](https://docs.fractalworks.io/joule/components/connectors/sinks/kafka) connectors.
  * Publish events using the AVRO binary data format.
* <mark style="color:green;">**File validation**</mark>
  * Publish events to a [CSV file](https://docs.fractalworks.io/joule/components/connectors/sinks/custom-connectors/file) to validate results.
