Build your first use case

Filtered stream of Nasdaq major bank quotes

This is an introductory example of how to use a combination of out-of-the-box features to provide streaming enriched quotes for major banks with a market capitalisation above $3.5 billion.

Key takeaways

This tutorial will teach you how to use Joule's out-of-the-box features to filter, enrich and publish user-defined alerts to a Kafka topic and a CSV file.

As a first use case we will cover a number of key features:

  • Subscribe and consume events Subscribe to, consume, parse and present events ready for pipeline processing using Kafka.

  • Initialise Joule with contextual data Load local CSV contextual data into JouleDB.

  • Filter and enrich events Apply a filter for a subset of events using JavaScript expressions, and apply event enrichment with company data loaded into JouleDB.

  • Filter results by a constraint Use the having clause with a JavaScript expression to send events only when a spread ratio breach occurs.

  • Publish events Send processed events concurrently to a CSV file and a Kafka topic as a defined Avro domain data structure.

Resources

  • The getting started project can be found here.

Use case development

[Figure: Overview of use case processing sequence]

1. Define the use case objective

Provide trading consumer applications with bid and ask quotes, plus company information, for all major banks trading on the Nasdaq stock market with a market cap over $3.5 billion, whenever the spread widens beyond 1.5% during the current business day.

use case:
  name: nasdaq_banking_quotes
  constraints:
    valid from: '2024-10-01T09:25:00.000Z'
    valid to: '2024-10-01T16:35:00.000Z'
  sources:
    - live_nasdaq_quotes
  stream name: nasdaq_major_banks_stream
  sinks:
    - nasdaq_major_bank_topic
    - nasdaq_major_bank_quotes_file

Change the valid from and valid to dates so that the use case is active when you run the tutorial.

2. Define the processing pipeline

The stream definition comprises an initialisation step to load contextual data into memory, the processing pipeline, an event emit clause and the grouping of data.

The key processing steps include:

  1. Filter events for the 'Major Banks' industry to reduce the number of enrichment queries.

  2. Enrich the filtered events with company and market cap information.

  3. Filter the enriched events for a market cap greater than $3.5 billion.

  4. Send a stock record with the following attributes for every event: symbol, company_name, market_cap, bid, ask.

Stream definition

stream:
  name: nasdaq_major_banks_stream
  eventTimeType: EVENT_TIME
  
  initialisation:
    # Import contextual company data in to Joule
    data import:
      schema: reference_data
      csv:
      - table: nasdaq_companies
        file: data/csv/nasdaq.csv
        drop table: true
        index:
          fields:
          - symbol
          unique: true
          
  processing unit:
    pipeline:
    # Filter events by major banks to reduce number of enrichment queries
    - filter:
        expression: "(typeof industry !== 'undefined' && 
                      industry == 'Major Banks')"
    
    # Enrich filtered event with company information
    - enricher:
        fields:
          company_info:
            by query: 
               select * from reference_data.nasdaq_companies where symbol = ?
            query fields:
            - symbol
            with values:
            - company_name
            - market_cap
            using: JouleDB
            
    # Filter events by market cap size
    - filter:
        expression: "(typeof market_cap !== 'undefined' &&
                      market_cap > 3500000000)"
  emit:
    select: symbol, company_name, market_cap, bid, ask
    
    # Spread trigger
    having: "((bid - ask) / bid) > 0.015"
    
  group by:
  - symbol
3. Subscribe to data sources

We shall use the getting started data simulator, defining a source feed that subscribes to live Nasdaq quote data (note: we are using simulated data).

Source definition

kafkaConsumer:
    name: live_nasdaq_quotes
    cluster address: joule-gs-redpanda-0:9092
    consumerGroupId: nasdaq
    topics:
      - quotes

    deserializer:
      parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
      key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer

    properties:
      partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
      max.poll.records: 7000
      fetch.max.bytes: 1048576
4. Define output destinations

This use case outputs events to a CSV file and a Kafka topic concurrently; each destination requires its own configuration.

File

A quick and easy way to validate your use case processing is to send the resulting events to a CSV file. With the configuration below, events are written as pipe-delimited rows to ./data/output/test_output/nasdaq_major_banks.csv, which we inspect in step 6.

file:
  name: nasdaq_major_bank_quotes_file
  filename: nasdaq_major_banks
  path: "./data/output/test_output"
  batchSize: 1024
  timeout: 1000
  formatter:
    csv formatter:
      contentType: text/csv
      encoding: UTF-8
      delimiter: "|"

Kafka sink

The same emitted events will also be sent on to a Kafka topic ready for downstream trading applications to consume and continue processing.

A quick recap of how events are transformed into Avro data structures:

  1. The emit projection is transformed to the provided domain data type using an Avro schema, see below.

  2. The resulting events are then published on to the nasdaq_major_bank_quotes Kafka topic.

Kafka definition

kafkaPublisher:
  name: nasdaq_major_bank_topic
  cluster address: joule-gs-redpanda-0:9092
  topic: nasdaq_major_bank_quotes
  partitionKeys:
    - symbol

  serializer:
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    avro setting:
      local schema: ./conf/avro/stockrecord.avsc

Avro schema

{
  "type" : "record",
  "name" : "StockRecord",
  "namespace" : "com.fractalworks.examples.banking.data",
  "fields" : [
    {"name" : "symbol", "type" : "string"},
    {"name" : "company_name", "type" : "string"},
    {"name" : "market_cap", "type" : "long"},
    {"name" : "bid", "type" : "double"},
    {"name" : "ask", "type" : "double"}
    ]
}
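Once the use case is deployed (next step), you can also spot-check that serialised events are reaching the topic. Below is a minimal sketch using kcat, assuming the Redpanda broker is reachable from your host at localhost:9092 (an assumption; inside the demo network the address is joule-gs-redpanda-0:9092). The payloads are Avro binary, so expect raw bytes rather than readable text:

# Consume the first 5 records from the output topic; payloads are Avro-encoded binary
kcat -b localhost:9092 -t nasdaq_major_bank_quotes -C -c 5 -f 'key: %k, value: %S bytes\n'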

5. Deploy the use case

Now that we have all the use case definitions, we can deploy to Joule via the REST API using Postman, following the same deployment steps as the getting started project.

Go to the "Build your first use case" folder under the Joule - Banking demo / Tutorials Postman examples within the getting started project.
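If you prefer the command line to Postman, the deployment can be sketched with curl. This is an illustration only: the host, port and /management/usecase path are hypothetical, as is the YAML file name, so check the Management API reference for the actual endpoint:

# Hypothetical endpoint and file name, shown for illustration only
curl -X POST http://localhost:9080/management/usecase \
     -H "Content-Type: application/yaml" \
     --data-binary @nasdaq_banking_quotes.yaml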

6. Reviewing results

First take a look at the generated CSV file by printing its first six lines:

head -6 data/output/test_output/nasdaq_major_banks.csv

The command should return output similar to the following:

event_type|sub_type|event_time|ingest_time|symbol|company_name|market_cap|bid|ask
nasdaq_view|null|1733761988063|1733761988063|STL|Sterling Bancorp|4.535713421E9|23.907325574219808|23.192674425780194
nasdaq_view|null|1733761988066|1733761988066|CFG|Citizens Financial Group Inc. Common Stock|1.9298895504E10|45.70824817841169|44.91175182158832
nasdaq_view|null|1733761988068|1733761988068|UBS|UBS Group AG Registered Ordinary Shares|5.043039879E10|15.137171051621275|14.822828948378726
nasdaq_view|null|1733761988070|1733761988070|SFBS|ServisFirst Bancshares Inc. Common Stock|3.642322843E9|67.87892304294589|66.64107695705412
nasdaq_view|null|1733761988070|1733761988070|EBC|Eastern Bankshares Inc. Common Stock|3.695943868E9|20.38801925014015|19.191980749859848
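Every emitted row should satisfy the having clause. For the STL row above, (23.907 - 23.193) / 23.907 ≈ 0.030, which breaches the 0.015 spread trigger. As a quick sanity check you can recompute the spread ratio for each row; a small sketch assuming the pipe-delimited layout shown above, where fields 5, 8 and 9 are symbol, bid and ask:

# Print the symbol and spread ratio for each data row; every ratio should exceed 0.015
awk -F'|' 'NR > 1 { printf "%s %.4f\n", $5, ($8 - $9) / $8 }' \
    data/output/test_output/nasdaq_major_banks.csv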
7. Minor sink refactoring

Once you are satisfied with the results you can remove the nasdaq_major_bank_quotes_file reference from the use case definition file, as shown below.
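The refactored use case definition then lists only the Kafka topic sink:

use case:
  name: nasdaq_banking_quotes
  constraints:
    valid from: '2024-10-01T09:25:00.000Z'
    valid to: '2024-10-01T16:35:00.000Z'
  sources:
    - live_nasdaq_quotes
  stream name: nasdaq_major_banks_stream
  sinks:
    - nasdaq_major_bank_topic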

Summary

That's it, you should now have an understanding of how the components fit together to form a single use case.

To recap, this example covered a number of key features:

  • Filter

    • Apply JavaScript expression filters to select a subset of streaming events.

  • Enrichment

    • Add contextual data to streaming events.

  • Output projection

    • Define an output that matches the Avro schema attribute requirements.

  • Having clause

    • Define a JavaScript analytic expression that sends alerts only when a specified condition is met.

  • Kafka

    • Subscribe, consume and publish events using Kafka source and sink connectors, publishing in the Avro binary data format.

  • File validation

    • Publish events to a CSV file to validate results.
