Build your first use case
Filtered stream of Nasdaq major bank quotes

This is an introductory example of how to use a combination of out-of-the-box features to provide streaming enriched quotes for major banks with a market capitalisation of over $3.5 billion.
Key takeaways
This tutorial will teach you how to use Joule's out-of-the-box (OOTB) features to filter, enrich and publish user-defined alerts to a Kafka topic and a CSV file.
As a first use case we will cover a number of key features:
Subscribe and consume events: Subscribe, consume, parse and present events ready for pipeline processing using Kafka.
Initialise Joule with contextual data: Load local CSV contextual data into JouleDB.
Filters and enrichment: Apply a filter for a subset of events using JavaScript expressions, and enrich events with company data loaded into JouleDB.
Filter results by a constraint: Use the "having" clause with a JavaScript expression to only send events based upon a spread ratio breach.
Publishing events: Send processed events to either a CSV file or on to a Kafka topic as a defined AVRO domain data structure.
Resources
The getting started project can be found here.
Use case development
Define the use case objective
Provide trading consumer applications with bid and ask quotes plus company information for all major banks with a market cap of over $3.5 billion trading on the Nasdaq stock market, whenever the spread widens to over 1.5% during the current business day.
use case:
  name: nasdaq_banking_quotes
  constraints:
    valid from: '2024-10-01T09:25:00.000Z'
    valid to: '2024-10-01T16:35:00.000Z'
  sources:
    - live_nasdaq_quotes
  stream name: nasdaq_major_banks_stream
  sinks:
    - nasdaq_major_bank_topic
    - nasdaq_major_bank_quotes_file
Change the valid from and valid to dates so that incoming events fall inside the processing window.
Define processing pipeline
The processing stream defines an initialisation step to load contextual data into memory, a processing pipeline, an event emit clause and the grouping of data.
The key processing steps include:
Enrich events with industry and market cap context information
Filter events by the 'Major Banks' industry and a market cap greater than $3.5 billion
Send a stock record with the following attributes for every event: symbol, company_name, market_cap, bid, ask
Stream definition
stream:
  name: nasdaq_major_banks_stream
  eventTimeType: EVENT_TIME

  initialisation:
    # Import contextual company data into Joule
    data import:
      schema: reference_data
      csv:
        - table: nasdaq_companies
          file: data/csv/nasdaq.csv
          drop table: true
          index:
            fields:
              - symbol
            unique: true

  processing unit:
    pipeline:
      # Filter events by major banks to reduce number of enrichment queries
      - filter:
          expression: "(typeof industry !== 'undefined' &&
            industry == 'Major Banks')"

      # Enrich filtered events with company information
      - enricher:
          fields:
            company_info:
              by query:
                select * from reference_data.nasdaq_companies where symbol = ?
              query fields:
                - symbol
              with values:
                - company_name
                - market_cap
          using: JouleDB

      # Filter events by market cap size
      - filter:
          expression: "(typeof market_cap !== 'undefined' &&
            market_cap > 3500000000)"

  emit:
    select: symbol, company_name, market_cap, bid, ask
    # Spread trigger
    having: "((bid - ask) / bid) > 0.015"

  group by:
    - symbol
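Note the having clause computes the relative spread, (bid - ask) / bid, and only lets a grouped event through when that ratio exceeds the 1.5% (0.015) threshold; everything else is filtered out of the emitted stream.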
Subscribe to data sources
We shall use the getting started data simulator by defining a source feed that subscribes to live Nasdaq quote data (note we are using simulated data).
Source definition
kafkaConsumer:
  name: nasdaq_quotes_stream
  cluster address: joule-gs-redpanda-0:9092
  consumerGroupId: nasdaq
  topics:
    - quotes

  deserializer:
    parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
    key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
    value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer

  properties:
    partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
    max.poll.records: 7000
    fetch.max.bytes: 1048576
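The properties block passes native Kafka consumer settings straight through to the underlying client, so any standard consumer property can be tuned here.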
Define output destinations
Define a validation output file
This use case example will output events to a CSV file and a Kafka topic concurrently, each of which requires its own configuration.
File
A quick and easy way to validate your use case processing is to send the resulting events to a CSV file.
file:
  name: nasdaq_major_bank_quotes_file
  filename: nasdaq_major_banks
  path: "./data/output/test_output"
  batchSize: 1024
  timeout: 1000
  formatter:
    csv formatter:
      contentType: text/csv
      encoding: UTF-8
      delimiter: "|"
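With this configuration events are written as pipe-delimited, UTF-8 encoded CSV under ./data/output/test_output; the batchSize and timeout settings control how often buffered events are flushed to disk.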
Kafka sink
The same emitted events will also be sent on to a Kafka topic, ready for downstream trading applications to consume and continue processing.
A quick recap of how events are transformed to AVRO data structures: the user emit projection is transformed to the provided domain data type using an AVRO schema (see below), and the resulting events are then published on to the nasdaq_major_bank_quotes Kafka topic.
Kafka Definition
kafkaPublisher:
  name: nasdaq_major_bank_topic
  cluster address: joule-gs-redpanda-0:9092
  topic: nasdaq_major_bank_quotes
  partitionKeys:
    - symbol
  serializer:
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    avro setting:
      local schema: ./conf/avro/stockrecord.avsc
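Partitioning on symbol means all quotes for a given ticker land on the same topic partition, preserving per-symbol ordering for downstream consumers.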
AVRO schema
{
  "type" : "record",
  "name" : "StockRecord",
  "namespace" : "com.fractalworks.examples.banking.data",
  "fields" : [
    {"name" : "symbol", "type" : "string"},
    {"name" : "company_name", "type" : "string"},
    {"name" : "market_cap", "type" : "long"},
    {"name" : "bid", "type" : "double"},
    {"name" : "ask", "type" : "double"}
  ]
}
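To illustrate how a downstream trading application might consume these records, below is a minimal Java sketch using only standard Kafka and AVRO APIs. It reuses the broker address, topic and schema file from this example; the schemaless AVRO binary encoding and the trading-app consumer group are assumptions, so adjust to match your deployment.
Example downstream consumer (illustrative)
import java.io.File;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StockRecordConsumer {

    public static void main(String[] args) throws Exception {
        // Parse the same schema file the Joule publisher uses
        Schema schema = new Schema.Parser().parse(new File("conf/avro/stockrecord.avsc"));
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);

        Properties props = new Properties();
        props.put("bootstrap.servers", "joule-gs-redpanda-0:9092");
        props.put("group.id", "trading-app"); // illustrative consumer group
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("nasdaq_major_bank_quotes"));
            while (true) {
                ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<Integer, byte[]> record : records) {
                    // Assumes the payload is schemaless AVRO binary; adjust if your
                    // deployment uses a schema registry or framed encoding
                    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                    GenericRecord stock = reader.read(null, decoder);
                    System.out.printf("%s %s bid=%s ask=%s%n",
                        stock.get("symbol"), stock.get("company_name"),
                        stock.get("bid"), stock.get("ask"));
                }
            }
        }
    }
}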
Reviewing results
First, take a look at the generated CSV file by printing its first six lines:
head -6 data/output/test_output/nasdaq_major_banks.csv
The command should return output similar to the below:
event_type|sub_type|event_time|ingest_time|symbol|company_name|market_cap|bid|ask
nasdaq_view|null|1733761988063|1733761988063|STL|Sterling Bancorp|4.535713421E9|23.907325574219808|23.192674425780194
nasdaq_view|null|1733761988066|1733761988066|CFG|Citizens Financial Group Inc. Common Stock|1.9298895504E10|45.70824817841169|44.91175182158832
nasdaq_view|null|1733761988068|1733761988068|UBS|UBS Group AG Registered Ordinary Shares|5.043039879E10|15.137171051621275|14.822828948378726
nasdaq_view|null|1733761988070|1733761988070|SFBS|ServisFirst Bancshares Inc. Common Stock|3.642322843E9|67.87892304294589|66.64107695705412
nasdaq_view|null|1733761988070|1733761988070|EBC|Eastern Bankshares Inc. Common Stock|3.695943868E9|20.38801925014015|19.191980749859848
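As a quick sanity check, the first row confirms the spread trigger: for STL the relative spread is (23.907325574219808 - 23.192674425780194) / 23.907325574219808 ≈ 0.0299, comfortably above the 0.015 threshold.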
Summary
That's it. You should now have an understanding of how the components fit together to form a single use case.
To recap, this example covered a number of key features:
Filter: JavaScript expression filters.
Enrichment: Add contextual data to streaming events.
Output projection: Define an output projection that matches an AVRO schema's attribute requirements.
Having clause: Define a JavaScript analytic expression that sends alerts only when a specified condition is met.
File validation: Publish events to a CSV file to validate results.