Stream sliding window quote analytics

Filtered stream of Nasdaq major bank quote analytics

Provide a trading consumer application with quote analytics for all major banks trading on the Nasdaq stock market for the current business week.

Resources

  • Getting started project
  • JavaScript expressions
  • Sliding window
  • Aggregate functions
  • Exponential moving average
  • Min max
  • Z-Score

Key takeaways

This tutorial will teach you how to use Joule's OOTB features to filter events, perform sliding window analytics and publish AVRO formatted events to a Kafka topic and a Parquet file.

Along the way we cover a number of key features:

  • Subscribe and consume events: Subscribe to, consume, parse and present events ready for pipeline processing using Kafka.

  • Event filtering: Apply a filter that selects a subset of events using JavaScript expressions.

  • Sliding window analytics: Define a set of analytics, grouped by symbol, to be executed over a sliding window of events.

  • Publishing events: Send processed events to a persistent Parquet file and to a Kafka topic using a defined AVRO domain data structure.

Use case development

1

Define the use case objective

Provide a trading consumer application with quote analytics for all major banks trading on the Nasdaq stock market for the current business week.

Additionally:

  • The use case should only process events within a single defined market business week.

  • Events are to be sent to a Kafka topic and a persistent Parquet file using the same data format.

use case:
  name: nasdaq_banking_quote_analytics
  constraints:
    valid from: '2024-10-01T09:25:00.000Z'
    valid to: '2024-10-05T16:35:00.000Z'
  sources:
    - live_nasdaq_quotes
  stream name: nasdaq_major_banks_analytics_stream
  sinks:
    - nasdaq_major_bank_analytics_topic
    - nasdaq_major_bank_analytics_parquetfile

Change the valid from and valid to dates so the use case is valid when you run it.

2

Define processing pipeline

This use case jumps into Joule's analytic window features:

  • Filter events by the 'Major Banks' industry

  • Apply an analytic sliding time window to calculate aggregate functions and window functions. Window definition: analytics are calculated using a 500ms sliding window over a total window size of 2.5 seconds.

  • Send a quote analytics record with the following attributes for every event: symbol, ask_EMA, bid_EMA, volume_SUM, volatility_MEAN, ask_MINMAX_NORM, bid_MINMAX_NORM, ask_ZSCORE and bid_ZSCORE.

Stream definition

stream:
  name: nasdaq_major_banks_analytics_stream
  eventTimeType: EVENT_TIME

  processing unit:
    pipeline:
      # Filter events by major banks to reduce number of enrichment queries
      - filter:
          expression: "(typeof industry !== 'undefined' && 
                        industry == 'Major Banks')"
  
      - time window:
          emitting type: slidingQuoteAnalytics
          aggregate functions:
            SUM: [ volume ]
            MEAN: [ volatility ]
          window functions:
            # exponential moving average
            ema rates:
              function:
                exponential moving average:
                  parameters:
                    smoothing factor: 0.001996007984032
              attributes: [ ask,bid ]

            # minmax
            ranges:
              function:
                minmax norm: {}
              attributes: [ ask,bid ]

            # zscore
            norms:
              function:
                zscore: {}
              attributes: [ ask,bid ]

          policy:
            type: slidingTime
            slide: 500
            window size: 2500

  emit:
    select: "event_time, symbol, ask_EMA, bid_EMA, 
              volume_SUM, volatility_MEAN, 
              ask_MINMAX_NORM, bid_MINMAX_NORM, 
              ask_ZSCORE, bid_ZSCORE"

  group by:
    - symbol
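
With a slide of 500ms over a total window size of 2500ms, each symbol group effectively holds five overlapping panes and emits a fresh set of results every 500ms. For intuition, the sketch below shows in plain Java the arithmetic the configured window functions perform over the values held in a window. This is illustrative only, not Joule's implementation, and the class and method names are hypothetical. Note the smoothing factor 0.001996007984032 appears to correspond to alpha = 2 / (N + 1) with N = 1001 samples.

import java.util.List;

// Illustrative arithmetic only; Joule's OOTB window functions perform these
// calculations internally. Class and method names here are hypothetical.
public final class WindowFunctionSketch {

    // Exponential moving average: ema = alpha * x + (1 - alpha) * ema
    static double ema(List<Double> window, double alpha) {
        double ema = window.get(0);
        for (int i = 1; i < window.size(); i++) {
            ema = alpha * window.get(i) + (1.0 - alpha) * ema;
        }
        return ema;
    }

    // Min max normalisation of the latest value into the range [0, 1]
    static double minMaxNorm(List<Double> window) {
        double min = window.stream().mapToDouble(Double::doubleValue).min().orElseThrow();
        double max = window.stream().mapToDouble(Double::doubleValue).max().orElseThrow();
        double latest = window.get(window.size() - 1);
        return max == min ? 0.0 : (latest - min) / (max - min);
    }

    // Z-score of the latest value against the window mean and standard deviation
    static double zscore(List<Double> window) {
        double mean = window.stream().mapToDouble(Double::doubleValue).average().orElseThrow();
        double variance = window.stream()
                .mapToDouble(x -> (x - mean) * (x - mean))
                .average().orElseThrow();
        double stddev = Math.sqrt(variance);
        double latest = window.get(window.size() - 1);
        return stddev == 0.0 ? 0.0 : (latest - mean) / stddev;
    }
}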

3

Subscribe to data sources

We shall use the getting started data simulator, defining a source feed that subscribes to live Nasdaq quote data (note we are using simulated data).

Source definition

kafkaConsumer:
    name: live_nasdaq_quotes
    cluster address: joule-gs-redpanda-0:9092
    consumerGroupId: nasdaq
    topics:
      - quotes

    deserializer:
      parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
      key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer

    properties:
      partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
      max.poll.records" : 7000
      fetch.max.bytes : 1048576

4

Define output destinations

Parquet file output

A quick and easy way to validate your use case processing is to send the resulting events to a Parquet file.

file:
  name: nasdaq_major_bank_analytics_parquetfile
  filename: nasdaq_major_banks
  path: "./data/output/test_output"
  batchSize: 5120
  timeout: 1000
  formatter:
    parquet formatter:
      schema: conf/avro/quote_analytics.avsc
      compression codec: SNAPPY
      temp directory: data/tmp
      disable CRC: false
      parsing threads: 2

Avro Schema

{
  "type" : "record",
  "name" : "QuoteAnalytic",
  "namespace" : "com.fractalworks.examples.banking.data",
  "fields" : [
    {"name" : "event_time", "type" : "long"},
    {"name" : "symbol", "type" : "string"},
    {"name" : "ask_EMA", "type" : "double"},
    {"name" : "bid_EMA", "type" : "double"},
    {"name" : "volume_SUM", "type" : "double"},
    {"name" : "volatility_MEAN", "type" : "double"},
    {"name" : "ask_MINMAX_NORM", "type" : "double"},
    {"name" : "bid_MINMAX_NORM", "type" : "double"},
    {"name" : "ask_ZSCORE", "type" : "double"},
    {"name" : "bid_ZSCORE", "type" : "double"}
  ]
}
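
If you want to sanity check this schema outside Joule, the standard Apache Avro Java API can parse it and validate a conforming record. A minimal sketch with made-up values:

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class SchemaCheck {
    public static void main(String[] args) throws Exception {
        // Parse the same schema file referenced by both sink configurations
        Schema schema = new Schema.Parser().parse(new File("conf/avro/quote_analytics.avsc"));

        // Build a record; an unknown field name throws at put() time
        GenericRecord record = new GenericData.Record(schema);
        record.put("event_time", System.currentTimeMillis());
        record.put("symbol", "JPM");
        record.put("ask_EMA", 148.23);
        record.put("bid_EMA", 148.19);
        record.put("volume_SUM", 125000.0);
        record.put("volatility_MEAN", 0.42);
        record.put("ask_MINMAX_NORM", 0.87);
        record.put("bid_MINMAX_NORM", 0.85);
        record.put("ask_ZSCORE", 1.12);
        record.put("bid_ZSCORE", 1.08);

        // Prints true when the record conforms to the schema
        System.out.println(GenericData.get().validate(schema, record));
    }
}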

Publish events to consumers

  1. The user emit projection is transformed to the provided domain data type using the same AVRO schema definition as the Parquet file output, see above.

  2. The resulting events are then published on to the nasdaq_major_bank_quote_analytics Kafka topic.

A quick recap of how events are transformed to AVRO data structures: the same events written to the Parquet file are published on to a Kafka consumer topic using the same AVRO domain schema.

Sink Definition

kafkaPublisher:
  name: nasdaq_major_bank_analytics_topic
  cluster address: joule-gs-redpanda-0:9092
  topic: nasdaq_major_bank_quote_analytics
  partitionKeys:
    - symbol

  serializer:
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    avro setting:
      local schema: conf/avro/quote_analytics.avsc
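
To verify what actually lands on the topic, you can decode the messages with a plain Kafka consumer. A minimal sketch, assuming the sink writes schema-less Avro binary payloads (which the local schema setting suggests) rather than using a schema registry:

import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AnalyticsTopicReader {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(new File("conf/avro/quote_analytics.avsc"));
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);

        Properties props = new Properties();
        props.put("bootstrap.servers", "joule-gs-redpanda-0:9092");
        props.put("group.id", "analytics-verifier");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("nasdaq_major_bank_quote_analytics"));
            while (true) {
                ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<Integer, byte[]> rec : records) {
                    GenericRecord event = reader.read(null,
                            DecoderFactory.get().binaryDecoder(rec.value(), null));
                    System.out.println(event);
                }
            }
        }
    }
}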

5

Deploying the use case

Now that we have all the use case definitions, we can deploy to Joule via the REST API using Postman, following the same getting started deployment steps for this project.

Go to the "Build your first use case" folder under the Joule - Banking demo / Tutorials Postman examples within the getting started project.
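
If you would rather script the deployment than use Postman, the same call can be made programmatically. A sketch using Java's built-in HTTP client; the host, port and endpoint path below are placeholders, so check the Management API reference for the actual values:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class DeployUseCase {
    public static void main(String[] args) throws Exception {
        // Hypothetical endpoint; substitute the host, port and path
        // documented under the Management API use case section
        URI endpoint = URI.create("http://localhost:9080/api/usecase");

        HttpRequest request = HttpRequest.newBuilder(endpoint)
                .header("Content-Type", "application/x-yaml")
                .POST(HttpRequest.BodyPublishers.ofFile(Path.of("nasdaq_banking_quote_analytics.yaml")))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}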

6

Review Parquet file contents

Open up your favourite Parquet viewer to review the output.

Example tools

  • IntelliJ Parquet viewer

  • PyPI parquet-tools

  • Visual Studio parquet-viewer
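
Alternatively, the contents can be read back programmatically. A minimal sketch using the Apache Parquet Avro reader; the exact output file name depends on what the file sink generates, so adjust the path accordingly:

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ReadOutput {
    public static void main(String[] args) throws Exception {
        // Assumed file name; the sink writes under ./data/output/test_output
        Path file = new Path("./data/output/test_output/nasdaq_major_banks.parquet");
        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(HadoopInputFile.fromPath(file, new Configuration()))
                .build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                System.out.println(record);
            }
        }
    }
}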

Summary

This example covers a number of key features:

  • Analytics

  • Filter

  • Kafka

  • Parquet

    • Parquet files are generated using a provided AVRO schema

The getting started project can be found here.
