Stream sliding window quote analytics
Filtered stream of Nasdaq major bank quote analytics
Provide a trading consumer application with quote analytics for all major banks trading on the Nasdaq stock market for the current business week.

Resources
The getting started project can be found here.
Key takeaways
This tutorial will teach you how to use Joule's out-of-the-box (OOTB) features to filter events, perform sliding window analytics and publish AVRO formatted events to a Kafka topic and a Parquet file.
This use case covers a number of key features:
Subscribe and consume events: Subscribe, consume, parse and present events ready for pipeline processing using Kafka.
Event filtering: Apply a filter to select a subset of events using a JavaScript expression.
Sliding window analytics: Define a set of analytics, grouped by symbol, to be executed over a sliding window of events.
Publishing events: Send processed events to a persistent Parquet file and to a Kafka topic using a defined AVRO domain data structure.
Use case development
Define the use case objective
Provide a trading consumer application with quote analytics for all major banks trading on the Nasdaq stock market for the current business week.
Additionally:
The use case should only process events for a single defined market business week.
Events are to be sent to a Kafka topic and a persistent Parquet file using the same data format.
use case:
  name: nasdaq_banking_quote_analytics
  constraints:
    valid from: '2024-10-01T09:25:00.000Z'
    valid to: '2024-10-05T16:35:00.000Z'
  sources:
    - live_nasdaq_quotes
  stream name: nasdaq_major_banks_analytics_stream
  sinks:
    - nasdaq_major_bank_analytics_topic
    - nasdaq_major_bank_analytics_parquetfile
Define processing pipeline
This use case jumps into Joule's analytic window features:
Filter events by 'Major Banks' industry
Apply an analytic sliding time window to calculate aggregate and window functions. Window definition: analytics are calculated using a 500ms slide over a total window size of 2.5 seconds.
Emit a quote analytics record with the following attributes for every event: event_time, symbol, ask_EMA, bid_EMA, volume_SUM, volatility_MEAN, ask_MINMAX_NORM, bid_MINMAX_NORM, ask_ZSCORE and bid_ZSCORE.
Stream definition
stream:
  name: nasdaq_major_banks_analytics_stream
  eventTimeType: EVENT_TIME
  processing unit:
    pipeline:
      # Filter events by major banks to reduce number of enrichment queries
      - filter:
          expression: "(typeof industry !== 'undefined' && industry == 'Major Banks')"
      - time window:
          emitting type: slidingQuoteAnalytics
          aggregate functions:
            SUM: [ volume ]
            MEAN: [ volatility ]
          window functions:
            # exponential moving average
            ema rates:
              function:
                exponential moving average:
                  parameters:
                    smoothing factor: 0.001996007984032
              attributes: [ ask,bid ]
            # minmax
            ranges:
              function:
                minmax norm: {}
              attributes: [ ask,bid ]
            # zscore
            norms:
              function:
                zscore: {}
              attributes: [ ask,bid ]
          policy:
            type: slidingTime
            slide: 500
            window size: 2500
  emit:
    select: "event_time, symbol, ask_EMA, bid_EMA,
             volume_SUM, volatility_MEAN,
             ask_MINMAX_NORM, bid_MINMAX_NORM,
             ask_ZSCORE, bid_ZSCORE"
  group by:
    - symbol
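To make the window analytics concrete, the following is a minimal, standalone Python sketch of what each 2.5 second window emits per symbol. It is illustrative only and not Joule's implementation; the EMA seeding, which value is normalised and the exact z-score variant are assumptions, and the quote values are made up.

# Illustrative sketch only, not Joule's implementation: the analytics a single
# symbol's 2.5 second window of quotes would emit.
from statistics import mean, pstdev

ALPHA = 0.001996007984032  # smoothing factor from the stream definition

def ema(values, alpha=ALPHA):
    # Exponential moving average: blend each new value with the running average
    avg = values[0]
    for v in values[1:]:
        avg = alpha * v + (1 - alpha) * avg
    return avg

def minmax_norm(values):
    # Scale the latest value into [0, 1] relative to the window's range
    lo, hi = min(values), max(values)
    return 0.0 if hi == lo else (values[-1] - lo) / (hi - lo)

def zscore(values):
    # Standard score of the latest value relative to the window
    sd = pstdev(values)
    return 0.0 if sd == 0 else (values[-1] - mean(values)) / sd

# One window of quotes for a single symbol (hypothetical values)
asks = [34.10, 34.12, 34.11, 34.15]
bids = [34.05, 34.07, 34.06, 34.09]
volumes = [1200, 800, 950, 1100]
volatilities = [0.012, 0.011, 0.013, 0.012]

print({
    "ask_EMA": ema(asks), "bid_EMA": ema(bids),
    "volume_SUM": sum(volumes), "volatility_MEAN": mean(volatilities),
    "ask_MINMAX_NORM": minmax_norm(asks), "bid_MINMAX_NORM": minmax_norm(bids),
    "ask_ZSCORE": zscore(asks), "bid_ZSCORE": zscore(bids),
})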
Subscribe to data sources
We shall use the getting started data simulator by defining a source feed that subscribes to live Nasdaq quote data (note: simulated data is used).
Source definition
kafkaConsumer:
  name: nasdaq_quotes_stream
  cluster address: joule-gs-redpanda-0:9092
  consumerGroupId: nasdaq
  topics:
    - quotes
  deserializer:
    parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
    key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
    value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
  properties:
    partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
    max.poll.records: 7000
    fetch.max.bytes: 1048576
Define output destinations
Parquet file output
A quick and easy way to validate your use case processing is to send the resulting events to a Parquet file.
file:
  name: nasdaq_major_bank_analytics_parquetfile
  filename: nasdaq_major_banks
  path: "./data/output/test_output"
  batchSize: 5120
  timeout: 1000
  formatter:
    parquet formatter:
      schema: conf/avro/quote_analytics.avsc
      compression codec: SNAPPY
      temp directory: data/tmp
      disable CRC: false
  parsing threads: 2
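Once the use case has run, the output can be checked with any Parquet reader. Below is a small sketch using pandas with pyarrow; the exact file names Joule writes under the configured path are not specified here, so the sketch simply globs the output directory.

# Sketch: inspect the Parquet output with pandas/pyarrow.
# File naming is assumed, so read everything under the configured output path.
import glob
import pandas as pd

files = glob.glob("./data/output/test_output/*.parquet")
df = pd.concat((pd.read_parquet(f) for f in files), ignore_index=True)

print(df.head())
print(df.groupby("symbol")[["ask_EMA", "bid_EMA", "volume_SUM"]].mean())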
Avro Schema
{
  "type": "record",
  "name": "QuoteAnalytic",
  "namespace": "com.fractalworks.examples.banking.data",
  "fields": [
    {"name": "event_time", "type": "long"},
    {"name": "symbol", "type": "string"},
    {"name": "ask_EMA", "type": "double"},
    {"name": "bid_EMA", "type": "double"},
    {"name": "volume_SUM", "type": "double"},
    {"name": "volatility_MEAN", "type": "double"},
    {"name": "ask_MINMAX_NORM", "type": "double"},
    {"name": "bid_MINMAX_NORM", "type": "double"},
    {"name": "ask_ZSCORE", "type": "double"},
    {"name": "bid_ZSCORE", "type": "double"}
  ]
}
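As a quick sanity check, the schema can be parsed and a sample analytics record validated against it before wiring it into both sinks. The sketch below uses the fastavro library; the sample values are made up.

# Sketch: validate a sample analytics record against the AVRO schema.
import json
from fastavro import parse_schema
from fastavro.validation import validate

with open("conf/avro/quote_analytics.avsc") as f:
    schema = parse_schema(json.load(f))

sample = {
    "event_time": 1727773500000, "symbol": "HBAN",
    "ask_EMA": 14.21, "bid_EMA": 14.19,
    "volume_SUM": 30500.0, "volatility_MEAN": 0.011,
    "ask_MINMAX_NORM": 0.62, "bid_MINMAX_NORM": 0.58,
    "ask_ZSCORE": 0.4, "bid_ZSCORE": -0.1,
}
assert validate(sample, schema)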
Publish events to consumers
The user emit projection is transformed to the provided domain data type using the same AVRO schema definition used for the Parquet file output (see above). The resulting events are then published on to the nasdaq_major_bank_quote_analytics Kafka topic.
A quick recap of how events are transformed to AVRO data structures: the same events written to the Parquet file are serialized with the same AVRO domain schema and published on to a Kafka consumer topic.
Sink definition
kafkaPublisher:
  name: nasdaq_major_bank_analytics_topic
  cluster address: joule-gs-redpanda-0:9092
  topic: nasdaq_major_bank_quote_analytics
  partitionKeys:
    - symbol
  serializer:
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    avro setting:
      local schema: conf/avro/quote_analytics.avsc
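A downstream consumer can then read the topic and decode each message with the same schema. The sketch below uses kafka-python and fastavro and assumes the payload is plain Avro binary with no schema registry framing, consistent with the local schema setting above; verify this against your Joule serializer configuration.

# Sketch: consume AVRO quote analytics events from the Kafka topic.
# Assumes plain Avro binary payloads (no schema registry framing).
import io
import json
from kafka import KafkaConsumer
from fastavro import parse_schema, schemaless_reader

with open("conf/avro/quote_analytics.avsc") as f:
    schema = parse_schema(json.load(f))

consumer = KafkaConsumer(
    "nasdaq_major_bank_quote_analytics",
    bootstrap_servers="joule-gs-redpanda-0:9092",
    auto_offset_reset="earliest",
)

for message in consumer:
    record = schemaless_reader(io.BytesIO(message.value), schema)
    print(record["symbol"], record["ask_EMA"], record["bid_ZSCORE"])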
Summary
This example covers a number of key features:
Filter: JavaScript expression filter applied to select 'Major Banks' events.
Kafka: Quote events are consumed from a Kafka topic and the resulting analytics are published back to a Kafka topic as AVRO events.
Parquet: Parquet files are generated using a provided AVRO schema.