Enrich events

In advanced stream processing, use cases often require events to be enriched with contextual data to support analytical and logical processing.

The use cases below demonstrate how to use the enrichment processor with static reference data and live metrics data.

Enrich events with static data

Consistent, trusted data sourced from a master data platform helps ensure that event-based insights are of high fidelity and quality. Being able to use this data within the stream is often regarded as a key feature.

Use case configuration

This use case demonstrates how the enrichment processor binds static reference data to an event in real time, through an event-based lookup that uses the “symbol” event value in an SQL query.

File: app-exampleEnrichment.env

SOURCEFILE=conf/sources/stockQuoteStream.yaml
ENGINEFILE=conf/usecases/baseTumblingWindows.yaml
PUBLISHFILE=conf/publishers/kafkaAnalytics.yaml

Pipeline overview

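On process startup, the Joule example below loads the contents of a CSV file into the in-memory SQL table nasdaq_companies and creates a unique index on the ‘Symbol’ field. A filter then drops events whose symbol is ‘A’, and the remaining events are enriched before the selected fields are emitted.

  • Prime the reference data store with Nasdaq company reference data

  • Enrich streaming events with the reference data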

initialisation:
  - data import:
      schema: reference_data
      csv:
        - table: nasdaq_companies
          file: data/csv/nasdaq.csv
          drop table: true
          index:
            fields: [ 'Symbol' ]
            unique: true

processing unit:
  pipeline:
    - filter:
        expression: "symbol != 'A'"
    - enricher:
        fields:
          company_info:
            by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
            query fields: [symbol]
            with values: [Name,Country]
            using: JouleDB

emit:
  select: "symbol,Name,Country,bid, ask"

group by:
  - symbol

More information on how the enricher processor can be used to decorate events with static data can be found here.
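To make the lookup semantics concrete, the following Python sketch approximates what the configuration above describes, using the standard-library sqlite3 module as a stand-in for JouleDB. It is not Joule's implementation: the CSV rows, the event dictionaries and the helper names (load_reference_data, enrich, run_pipeline) are hypothetical, an attached in-memory database stands in for the reference_data schema, and the group by step is not modelled. It also assumes the enricher copies Name and Country onto the event as top-level fields, as the emit select list suggests.

```python
import csv
import io
import sqlite3

# Hypothetical stand-in for data/csv/nasdaq.csv; rows are illustrative only.
NASDAQ_CSV = """Symbol,Name,Country
AAPL,Apple Inc.,United States
ASML,ASML Holding N.V.,Netherlands
"""

# The lookup query, copied from the enricher's "by query" setting.
QUERY = "select * from reference_data.nasdaq_companies where Symbol = ?"
EMIT_FIELDS = ["symbol", "Name", "Country", "bid", "ask"]


def load_reference_data(conn: sqlite3.Connection, csv_text: str) -> None:
    """Mimic the data import step: drop/recreate the table, load the CSV, add a unique index."""
    schemas = {row[1] for row in conn.execute("PRAGMA database_list")}
    if "reference_data" not in schemas:
        # An attached in-memory database plays the role of the reference_data schema.
        conn.execute("ATTACH DATABASE ':memory:' AS reference_data")
    conn.execute("DROP TABLE IF EXISTS reference_data.nasdaq_companies")  # drop table: true
    conn.execute(
        "CREATE TABLE reference_data.nasdaq_companies (Symbol TEXT, Name TEXT, Country TEXT)"
    )
    rows = [(r["Symbol"], r["Name"], r["Country"]) for r in csv.DictReader(io.StringIO(csv_text))]
    conn.executemany("INSERT INTO reference_data.nasdaq_companies VALUES (?, ?, ?)", rows)
    # index: fields ['Symbol'], unique: true
    conn.execute("CREATE UNIQUE INDEX reference_data.idx_symbol ON nasdaq_companies (Symbol)")
    conn.commit()


def enrich(conn, event, query_fields=("symbol",), with_values=("Name", "Country")):
    """Look up reference data by the query fields and copy the selected columns onto the event."""
    cur = conn.execute(QUERY, [event[f] for f in query_fields])
    row = cur.fetchone()
    enriched = dict(event)
    if row is not None:
        columns = [d[0] for d in cur.description]
        record = dict(zip(columns, row))
        # Assumption: values are added as top-level fields, matching the emit select list.
        enriched.update({name: record[name] for name in with_values})
    return enriched


def run_pipeline(conn, events):
    """Filter, enrich, then project each event onto the emitted fields."""
    emitted = []
    for event in events:
        if event["symbol"] == "A":  # filter expression: symbol != 'A'
            continue
        enriched = enrich(conn, event)
        # In this sketch, fields with no reference match are emitted as None.
        emitted.append({name: enriched.get(name) for name in EMIT_FIELDS})
    return emitted


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    load_reference_data(conn, NASDAQ_CSV)
    events = [
        {"symbol": "AAPL", "bid": 189.1, "ask": 189.2},
        {"symbol": "A", "bid": 120.0, "ask": 120.1},
        {"symbol": "ZZZZ", "bid": 1.0, "ask": 1.1},
    ]
    for row in run_pipeline(conn, events):
        print(row)
```

Running it prints the AAPL event decorated with its company name and country; the ‘A’ event is dropped by the filter, and in this sketch the unknown ZZZZ symbol is emitted with None for the reference fields. The unique index on Symbol turns each lookup into a single-row indexed point query, which is presumably why the import step creates it.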

Enrich with live metrics

Advanced use cases often require computed metrics: as inputs to further contextual event-based calculations or ML predictions, as event-based triggers, or as additional context for the emitted event. Using this data within the stream is an advanced use-case feature.

Use case configuration

This use case demonstrates how the enrichment processor binds computed metric data to an event in real time, through an event-based lookup that uses the “symbol” event value in an SQL query.

File: app-enrichmentWithMetricsAndReferenceDataExample.env

Pipeline overview

On process startup, the Joule example below loads initial image files into the in-memory SQL database, starts the metrics engine, and enriches events with reference and metrics data.

  • Prime the reference data store with Nasdaq company reference data

  • Prime the metrics engine with initial metric values

  • Compute metrics every minute

  • Enrich streaming events with reference and metrics data

This example applies the latest metrics for a given symbol to an event. More information can be found here.
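The idea of applying "the latest metrics for a given symbol" can be illustrated with a plain Python sketch. A hypothetical MetricsStore is primed with initial values, recomputed each time a one-minute tumbling window closes, and consulted so that every event carries the current metrics for its symbol alongside its reference data. The metric names (avg_bid, avg_ask), the dictionary-based reference store, the millisecond ts field and all function names are assumptions for illustration; Joule's metrics engine is configured declaratively and its internals are not shown here.

```python
from collections import defaultdict

WINDOW_MS = 60_000  # metrics are recomputed every minute (tumbling window)


class MetricsStore:
    """Hypothetical holder for the latest computed metrics, keyed by symbol."""

    def __init__(self):
        self.latest = {}

    def prime(self, initial):
        # Initial image: values served until the first window closes.
        self.latest.update(initial)

    def compute(self, window_events):
        # Replace each symbol's metrics with averages over the closed window.
        # Symbols absent from the window keep their previous metrics.
        totals = defaultdict(lambda: [0.0, 0.0, 0])
        for e in window_events:
            t = totals[e["symbol"]]
            t[0] += e["bid"]
            t[1] += e["ask"]
            t[2] += 1
        for symbol, (bid_sum, ask_sum, count) in totals.items():
            self.latest[symbol] = {"avg_bid": bid_sum / count, "avg_ask": ask_sum / count}


def enrich_with_metrics(event, reference, metrics):
    """Attach reference data and the latest metrics for the event's symbol, if any."""
    enriched = dict(event)
    enriched.update(reference.get(event["symbol"], {}))
    enriched.update(metrics.latest.get(event["symbol"], {}))
    return enriched


def process(events, reference, metrics):
    """Enrich time-ordered events; close the current window when an event falls in a new one."""
    enriched_events = []
    window, window_start = [], None
    for e in events:
        start = e["ts"] - e["ts"] % WINDOW_MS
        if window_start is not None and start != window_start:
            metrics.compute(window)  # window closed: publish fresh metrics
            window = []
        window_start = start
        # The event sees metrics from previously closed windows (or the primed values).
        enriched_events.append(enrich_with_metrics(e, reference, metrics))
        window.append(e)
    return enriched_events


if __name__ == "__main__":
    reference = {"AAPL": {"Name": "Apple Inc.", "Country": "United States"}}
    metrics = MetricsStore()
    metrics.prime({"AAPL": {"avg_bid": 100.0, "avg_ask": 101.0}})
    events = [
        {"symbol": "AAPL", "ts": 0, "bid": 10.0, "ask": 11.0},
        {"symbol": "AAPL", "ts": 30_000, "bid": 20.0, "ask": 21.0},
        {"symbol": "AAPL", "ts": 61_000, "bid": 30.0, "ask": 31.0},
    ]
    for e in process(events, reference, metrics):
        print(e)
```

In this run the first two events carry the primed values (avg_bid 100.0), while the third, which falls in the second minute, is the first to see metrics computed from the first window (avg_bid 15.0, avg_ask 16.0). The choice to keep stale values for symbols absent from a window is a property of this sketch, not something stated for Joule.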
