Enrich events

In advanced stream event processing, executing use cases often requires enriching events with contextual data to facilitate analytical and logical processing


The use cases below demonstration how to use the enrichment processor using static and meter data.

Enrich events with static data

Consistent and trusted data sourced from a master data platform will ensure event based insights are of high fidelity and quality. Leveraging this data within stream is often regarded as a key feature.

Use case configuration

This use case demonstrates how the enrichment processor binds static reference data to an event in real-time through an event-based lookup using the “symbol” event value via a AQL query.

File: app-exampleEnrichment.env

SOURCEFILE=conf/sources/stockQuoteStream.yaml
ENGINEFILE=conf/usecases/baseTumblingWindows.yaml
PUBLISHFILE=conf/publishers/kafkaAnalytics.yaml

Pipeline overview

The below Joule example loads, on process startup, csv file contents into the nasdaq_companies in-memory SQL table and creates a unique index on ‘Symbol’ field.

  • Prime reference data store with nasdaq company reference data

  • Enrich streaming events with reference data

initialisation:
  - data import:
      schema: reference_data
      csv:
        - table: nasdaq_companies
          file: data/csv/nasdaq.csv
          drop table: true
          index:
            fields: [ 'Symbol' ]
            unique: true

processing unit:
  pipeline:
    - filter:
        expression: "symbol != 'A'"
    - enricher:
        fields:
          company_info:
            by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
            query fields: [symbol]
            with values: [Name,Country]
            using: JouleDB

emit:
  select: "symbol,Name,Country,bid, ask"

group by:
  - symbol

More information on how the enricher processor can be used to decorate events with static data can be found here.

Enrich with live metrics

Advanced use cases often require computed metrics as components for either further contextual event based calculations, ML predictions, as event based triggers or for additional context for the emitted event. Leveraging this data within stream is an advance use case feature.

Use case configuration

This use case demonstrates how the enrichment processor binds computed metric data to an event in real-time through an event-based lookup using the “symbol” event value via a AQL query.

File: app-enrichmentWithMetricsAndReferenceDataExample.env

SOURCEFILE=conf/sources/stockQuoteStream.yaml
ENGINEFILE=conf/usecases/enrichmentWithMetricsAndReferenceDataExample.yaml
PUBLISHFILE=conf/publishers/kafkaAnalytics.yaml

Pipeline overview

The below Joule example loads, on process startup initial images file contents into the in-memory SQL database, starts the metrics engine and enriches events with reference and metric data.

  • Prime reference data store with nasdaq company reference data

  • Prime metrics engine with initial metric values

  • Compute metrics every minute

  • Enrich streaming events with reference data and metrics data

stream:
  name: standardQuoteAnalyticsStream
  enabled: true
  validFrom: 2020-01-01
  validTo: 2025-12-31
  eventTimeType: EVENT_TIME
  sources: [ nasdaq_quotes_stream ]

  initialisation:
    data import:
      csv:
        - table: nasdaq_companies
          schema: reference_data
          file: 'data/csv/nasdaq.csv'
          drop table: true
          index:
            fields: [ 'Symbol' ]
            unique: true

      parquet:
        - table: bid_moving_averages
          schema: metrics
          files: ['data/parquet/mvavgs-prime.parquet']
          drop table: true
          index:
            fields: [ 'symbol' ]
            unique: false

  processing unit:
    metrics engine:
      runtime policy:
        frequency: 1
        startup delay: 1
        time unit: MINUTES

      foreach metric compute:
        metrics:
          -
            name: BidMovingAverage
            metric key: symbol
            table definition: bid_moving_averages (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_avg FLOAT,avg_bid_max FLOAT,createdTimestamp TIMESTAMP)
            query:
              SELECT symbol,
              MIN(bid) AS 'avg_bid_min',
              AVG(bid) AS 'avg_bid_avg',
              MAX(bid) AS 'avg_bid_max'
              FROM quotes.nasdaq
              WHERE
              ingestTime >= epoch_ms(date_trunc('minutes',now() - INTERVAL 3 MINUTES)) AND ingestTime <= epoch_ms(now())
              GROUP BY symbol
              ORDER BY 1;
            truncate on start: false
            compaction policy:
              frequency: 8
              time unit: HOURS

    pipeline:
      - tap:
          target schema: quotes
          flush frequency: 5
          index:
            unique: false
            fields:
              - symbol
      - enricher:
          fields:
            company_info:
              by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
              query fields: [ symbol ]
              with values: [ Name,Country ]
              using: JouleDB
            quote_metrics:
              by metric family: BidMovingAverage
              by key: symbol
              with values: [avg_bid_min, avg_bid_avg, avg_bid_max]
              using: MetricsDB
  emit:
    select: "symbol, Name, Country, avg_bid_min, avg_bid_avg, avg_bid_max"

  group by:
    - symbol

This example applies the latest metrics for a given symbol to an event. More information can be found here

Last updated