Joule

Emit computed events

Select and filter output events for publishing

Overview

Publish event query projections to downstream systems. The projection can select, rename and calculate event attributes, and a filter can be applied to the final projected event before it is published.

The following functions can be applied in the projection expression:

  1. Event attribute selection: select the event attributes to include in the final result.

  2. Add metric data: apply calculated metrics within the projection.

  3. Apply simple math functions: use simple math functions on the selected attributes.

  4. Filter resulting projected data: use the having filter to narrow the result set, similar to the SQL HAVING clause.

  5. Attribute renaming using an alias: rename attributes to ease system integrations.

Using the select expression

To illustrate this, we use the following select query and the source event it is applied to.

Query

stream:
  ...
  emit:
    eventType: buy_alert
    select:
        "
        venue.name 'venue',
        symbol, 
        ask,
        bid,
        ask - bid 'spread',
        details.*,
        BidMovingAverage.avg_ask|avg_bid;where symbol = {symbol}
        "
    having: "symbol =='IBM' && spread > 0.05"

Source event

The following source event is used to demonstrate the select query and having filter.

# Source processed event
streamEvent:
  ...
  attributes:
      symbol: IBM
      ask: 101
      bid: 100
      details:
         vol: 50
         liq: 60
      venue:
         name: nasdaq

Output event

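The event passes the having filter (symbol is IBM and spread = 101 - 100 = 1, which is greater than 0.05), so the query produces the following result, which is published to the connected consumers.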

# Result object
event type: buy_alert
  venue: nasdaq
  symbol: IBM
  ask: 101
  bid: 100
  spread: 1
  vol: 50
  liq: 60
  avg_ask: 101.34
  avg_bid: 100.12
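The projection can be reproduced step by step with the minimal Python sketch below. It is not Joule's implementation: each select entry is hand-translated into a Python statement, the JavaScript having expression is rewritten as a Python function, and the BidMovingAverage row is a hard-coded stand-in (using the values from the result object) for a query against the metrics engine.

```python
# Hypothetical sketch: reproduces the buy_alert projection by hand.
# This is not Joule's implementation; the metric row is a hard-coded stand-in.

source = {
    "symbol": "IBM",
    "ask": 101,
    "bid": 100,
    "details": {"vol": 50, "liq": 60},
    "venue": {"name": "nasdaq"},
}

# Stand-in for the latest BidMovingAverage row per symbol (values from the result object).
bid_moving_average = {"IBM": {"avg_ask": 101.34, "avg_bid": 100.12}}


def project(event: dict) -> dict:
    out = {}
    out["venue"] = event["venue"]["name"]        # venue.name 'venue'  (alias)
    out["symbol"] = event["symbol"]              # symbol
    out["ask"] = event["ask"]                    # ask
    out["bid"] = event["bid"]                    # bid
    out["spread"] = event["ask"] - event["bid"]  # ask - bid 'spread'  (math + alias)
    out.update(event["details"])                 # details.*  (copies the nested keys to the top level)
    row = bid_moving_average[event["symbol"]]    # ;where symbol = {symbol}
    for attr in ("avg_ask", "avg_bid"):          # BidMovingAverage.avg_ask|avg_bid
        out[attr] = row[attr]
    return out


def having(p: dict) -> bool:
    # Python equivalent of the JavaScript filter: symbol =='IBM' && spread > 0.05
    return p["symbol"] == "IBM" and p["spread"] > 0.05


projection = project(source)
# Publish with the configured event type only if the having filter passes.
published = {"event type": "buy_alert", **projection} if having(projection) else None
print(published)
```

Writing each select entry as its own statement keeps the mapping from query syntax to result attribute easy to follow; note that the aliases (venue, spread) are the names that end up in the published event.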

Using metrics

A key feature is the ability to embed calculated metrics within the select expression.

This enables the following advanced capabilities:

  • Add pre-calculated metrics to the select projection.

  • Perform calculations using event attributes and metrics.

  • Use metric values within the having clause logic.

Example

stream:
  ...
  emit:
    ...
    # Adding metrics attributes
    select:
        "
        ...
        BidMovingAverage.avg_ask|avg_bid;where symbol = {symbol}
        "
        "
    having:
       "symbol =='IBM' && avg_ask > 101.15"

Attributes schema

Format: BidMovingAverage.avg_ask;where symbol = {symbol}
Result: 101.34
Description: Queries the BidMovingAverage table for the latest row whose symbol matches the event symbol and returns the attribute avg_ask.

Format: BidMovingAverage.avg_ask|avg_bid;where symbol = {symbol}
Result: 101.34, 100.12
Description: Queries the BidMovingAverage table for the latest row whose symbol matches the event symbol and returns the attributes avg_ask and avg_bid.

Format: BidMovingAverage.avg_ask;where symbol = {symbol} AND venue=${venue.name}
Result: 101.24
Description: Queries the BidMovingAverage table for the latest row matching both the event symbol and venue, and returns the attribute avg_ask.
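A metric reference combines a table name, one or more pipe-separated attributes and a where condition whose placeholders are filled from the current event. The Python sketch below shows one way such a reference could be split and its placeholders resolved. The regular expressions, the function names and the quoting rule are assumptions for illustration, not Joule's parser; the sketch accepts both the {field} and ${field} placeholder spellings used above, and it stops at the resolved condition rather than querying a metrics store.

```python
import re

# Hypothetical parser for the metric reference format:
#   <Table>.<attr>[|<attr>...];where <condition with {field} placeholders>
METRIC_REF = re.compile(r"^(?P<table>\w+)\.(?P<attrs>[\w|]+);where\s+(?P<where>.+)$")
# Accepts both {field} and ${field}, since both spellings appear on this page.
PLACEHOLDER = re.compile(r"\$?\{([\w.]+)\}")


def resolve_path(event: dict, path: str):
    """Walk a dotted path such as 'venue.name' through a nested event."""
    value = event
    for part in path.split("."):
        value = value[part]
    return value


def parse_metric_ref(expr: str, event: dict):
    """Split a metric reference into (table, attributes, resolved where condition)."""
    match = METRIC_REF.match(expr.strip())
    if match is None:
        raise ValueError(f"not a metric reference: {expr!r}")
    attrs = match.group("attrs").split("|")

    def substitute(m):
        value = resolve_path(event, m.group(1))
        # Naive quoting for illustration only; a real query should bind parameters.
        return f"'{value}'" if isinstance(value, str) else str(value)

    where = PLACEHOLDER.sub(substitute, match.group("where"))
    return match.group("table"), attrs, where


event = {"symbol": "IBM", "venue": {"name": "nasdaq"}}
print(parse_metric_ref("BidMovingAverage.avg_ask|avg_bid;where symbol = {symbol}", event))
# ('BidMovingAverage', ['avg_ask', 'avg_bid'], "symbol = 'IBM'")
```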

having clause

The having clause is a JavaScript expression that is evaluated against the select projection result.

If the condition is met, the resulting event is published; otherwise it is dropped.

This property is optional.

# Only publish IBM events with an avg_ask greater than 101.15
stream:
  ...
  emit:
    ...
    having: "symbol == 'IBM' && avg_ask > 101.15"
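Because the having clause runs after the select projection, it can reference aliases and metric attributes such as avg_ask that do not exist on the source event. The Python sketch below models the filter as an optional predicate applied to each projected event. The function name apply_having and the sample projections are hypothetical, and a Python lambda stands in for the JavaScript expression that Joule evaluates.

```python
from typing import Callable, Iterable, Iterator, Optional

Projection = dict


def apply_having(projections: Iterable[Projection],
                 having: Optional[Callable[[Projection], bool]] = None) -> Iterator[Projection]:
    """Yield only the projected events that pass the optional having predicate."""
    for projection in projections:
        if having is None or having(projection):
            yield projection  # passed on to the publishing sinks
        # otherwise the projected event is dropped


# Hypothetical projected events (after select, so avg_ask is already present).
projections = [
    {"symbol": "IBM", "avg_ask": 101.34},
    {"symbol": "IBM", "avg_ask": 101.10},
    {"symbol": "MSFT", "avg_ask": 330.50},
]

# Python stand-in for: symbol == 'IBM' && avg_ask > 101.15
published = list(apply_having(projections, lambda p: p["symbol"] == "IBM" and p["avg_ask"] > 101.15))
print(published)  # [{'symbol': 'IBM', 'avg_ask': 101.34}]
```

Leaving having unset (None) publishes every projection, matching the clause being optional.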

retain

This boolean attribute tells the system to cache and conflate the computed select projection event so that it can be used in later processing contexts.

Each field in the select statement is prefixed with emitted_ and stored in the context object of the defined group by key (e.g. symbol). This context is cached for the next processing cycle, whether or not the having clause succeeds.

Example

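This example uses the having clause and the previously emitted event to determine whether the current event should be filtered. When no previous value exists, emitted_bid_spread is undefined and the expression falls back to the current bid_spread.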

stream:
  processing unit:
    pipeline:
      ...
      - analytic:
          expression: "(bid_MAX - ask_MIN) / bid_MAX"
          assign to: bid_spread

  emit:
    select: "symbol, ask_MIN, bid_MAX, volume_SUM, volatility_MEAN, bid_spread"
    having: "bid_spread > 0.015 && (bid_spread > (emitted_bid_spread || bid_spread))"
    retain: true

  group by:
    - symbol
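The retain behaviour in this example can be modelled with a per-group cache. In the Python sketch below, which is a hypothetical model rather than Joule's implementation, each projection's fields are stored with an emitted_ prefix under its symbol, overwriting the previous entry (conflation). The example_having function mirrors the having expression above, with Python's `or` reproducing the JavaScript `||` fallback. The sketch assumes the projection is retained whether or not the having clause passes; under that assumption the first event for a symbol is never published, because its spread is compared with itself.

```python
from typing import Callable, Dict, Optional

# Hypothetical cache: group by key (symbol) -> last projection with emitted_ prefixes.
retained: Dict[str, dict] = {}


def emit(projection: dict, group_key: str,
         having: Callable[[dict], bool], retain: bool = True) -> Optional[dict]:
    key = projection[group_key]
    # The having clause sees the current projection plus the retained emitted_ fields.
    context = {**retained.get(key, {}), **projection}
    passed = having(context)
    if retain:
        # Assumption: retained whether or not having passed; overwriting conflates to the latest.
        retained[key] = {f"emitted_{name}": value for name, value in projection.items()}
    return projection if passed else None


def example_having(ctx: dict) -> bool:
    # Mirrors: bid_spread > 0.015 && (bid_spread > (emitted_bid_spread || bid_spread))
    spread = ctx["bid_spread"]
    previous = ctx.get("emitted_bid_spread") or spread  # `or` behaves like JS `||` here
    return spread > 0.015 and spread > previous


results = [emit({"symbol": "IBM", "bid_spread": s}, "symbol", example_having)
           for s in (0.02, 0.03, 0.025, 0.04)]
print([r is not None for r in results])  # [False, True, False, True]
```

With these assumptions, a sequence of spreads 0.02, 0.03, 0.025, 0.04 for IBM publishes only the second and fourth events, since each of those exceeds the spread retained from the previous cycle.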

Attributes schema

event type
  Description: Renames the event type in the published output. By default, "_view" is appended to the event type.
  Data type: String (default: <eventType>_view)

select
  Description: SQL-style expression that specifies which fields to include in the output. Calculations (e.g. averages) can be performed on fields, and metrics can be added to the output by including metric expressions, which are resolved by the metrics engine.
  Data type: String

having
  Description: Optional JavaScript expression evaluated against the select projection. If it evaluates to true, the projected event is passed to the publishing sinks; otherwise it is dropped.
  Data type: String

retain
  Description: Stores the select projection so that it can be used in the next processing cycle.
  Data type: Boolean (default: false)


Last updated 4 months ago


For further information on how this feature is used, see the apply metrics documentation.