Key concepts

Enriching event data with the latest contextual data is crucial for producing real-time business insights.


Low latency enrichment architecture

Out of the box, Joule does the heavy lifting required to deploy a low-latency solution, so developers can focus on building advanced use cases that require additional contextual data. The Joule contextual data architecture supports various low-latency in-memory data stores that reduce the inherent I/O overhead of out-of-process databases and thus enable stream-based enrichment.


Example

The example below enriches events using linked reference data and metrics. Reference data is imported at process startup from a provided parquet file. Metrics are first calculated one minute after startup, and then every minute, using a three-minute tumbling window approach.

stream:
  name: quoteAnalyticsStream
  enabled: true
  validFrom: 2020-01-01
  validTo: 2025-12-31
  eventTimeType: EVENT_TIME
  sources: [ nasdaq_quotes_stream ]

  initialisation:
    sql import:
      schema: reference_data
      parquet:
        - table: nasdaq_companies
          asView: false
          files: [ 'data/parquet/nasdaq.parquet' ]
          index:
            fields: [ 'symbol' ]
            unique: true

  processing unit:
    metrics engine:
      runtime policy:
        frequency: 1
        startup delay: 1
        time unit: MINUTES

      foreach metric compute:
        metrics:
          -
            name: BidMovingAverage
            metric key: symbol
            table definition: bid_moving_averages (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_avg FLOAT, avg_bid_max FLOAT, createdTimestamp TIMESTAMP)
            query:
              SELECT symbol,
              MIN(bid) AS 'avg_bid_min',
              AVG(bid) AS 'avg_bid_avg',
              MAX(bid) AS 'avg_bid_max'
              FROM quotes.nasdaq
              WHERE
              ingestTime >= epoch_ms(date_trunc('minutes',now() - INTERVAL 3 MINUTES)) AND ingestTime <= epoch_ms(now())
              GROUP BY symbol
              ORDER BY 1;
            truncate on start: true
            compaction policy:
              frequency: 8
              time unit: HOURS

    pipeline:
      - tap:
          target schema: quotes
          flush frequency: 5
          index:
            unique: false
            fields:
              - symbol
      - enricher:
          fields:
            company_info:
              by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
              query fields: [ symbol ]
              with values: [ Name,Country ]
              using: JouleDB
            quote_metrics:
              by metric family: BidMovingAverage
              by key: symbol
              with values: [avg_bid_min, avg_bid_avg, avg_bid_max]
              using: MetricsDB
  emit:
    select: "symbol, Name, Country, avg_bid_min, avg_bid_avg, avg_bid_max"

  group by:
    - symbol
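As a rough illustration of what the BidMovingAverage query computes, the sketch below reproduces the aggregation in plain Python: for each symbol, the minimum, average and maximum bid over the quotes ingested in the last three minutes. This is not Joule code. The function name bid_window_metrics and the tuple layout are assumptions made for the example, and unlike the SQL, which truncates the window start to the minute, the sketch uses an exact three-minute lookback.

```python
from collections import defaultdict

WINDOW_MS = 3 * 60 * 1000  # three-minute lookback, as in the metric query


def bid_window_metrics(quotes, now_ms, window_ms=WINDOW_MS):
    """Return {symbol: {avg_bid_min, avg_bid_avg, avg_bid_max}} for the window.

    quotes is an iterable of (symbol, bid, ingest_time_ms) tuples; this layout
    is a hypothetical stand-in for rows of the quotes.nasdaq table.
    """
    bids_by_symbol = defaultdict(list)
    for symbol, bid, ingest_time_ms in quotes:
        # Analogue of the WHERE clause: window start <= ingestTime <= now.
        if now_ms - window_ms <= ingest_time_ms <= now_ms:
            bids_by_symbol[symbol].append(bid)

    return {
        symbol: {
            "avg_bid_min": min(bids),
            "avg_bid_avg": sum(bids) / len(bids),
            "avg_bid_max": max(bids),
        }
        for symbol, bids in sorted(bids_by_symbol.items())  # ORDER BY 1
    }


# Illustrative sample quotes, not real market data.
quotes = [
    ("MTZ", 100.0, 1_000),
    ("MTZ", 110.0, 60_000),
    ("MTZ", 90.0, 400_000),  # later than now_ms below, so excluded
    ("SBRA", 16.0, 120_000),
]
metrics = bid_window_metrics(quotes, now_ms=180_000)
```

Each entry in metrics corresponds to one row of the bid_moving_averages table, keyed by symbol, which is the shape the enricher later reads through the metric key.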

Output

The above example generates the following output, in this case CSV with a pipe delimiter.

nasdaq_View|null|1709894745240|1709894745240|MTZ|113.97318|103.25833|110.11051|MasTec Inc. Common Stock|Basic Industries
nasdaq_View|null|1709894745244|1709894745244|SBRA|18.71446|13.90475|16.472204|Sabra Health Care REIT Inc. Common Stock|Consumer Services
nasdaq_View|null|1709894745240|1709894745240|MUA|17.244429|14.651974|15.772599|Blackrock MuniAssets Fund Inc Common Stock|Finance

General Enricher DSL

The enricher processor lets users enrich an event with multiple data elements from various data sources through the use of enhanced mapping.

Example

enricher:
  fields:      
    deviceManufacturer:
      by key: tac
      with values: [deviceManufacturer, year_released]
      using: deviceStore

    modelDetails:
      by key: tac
      as object: true   
      using: deviceStore

    contractedDataBundle:
      by query:  "select * from /userBundle where imsi = ?"
      query fields: [imsi]
      all attributes: true
      using: dataBundleStore

  stores:
    deviceStore:
      store name: mobiledevices

    dataBundleStore:
      store name: mobilecontracts

Top level attributes

The enricher processor has two key attributes: fields, which defines the fields to enrich, and stores, which provides the data store binding.

Fields Attribute

Enrichment is applied at the field level, whereby each returned data element is added to the defined field either as a map of values or as a domain object.

The fields attribute is logically organised into three definition types:

  • Query approach

  • Response approach

  • Binding store

Query approach

Contextual data is retrieved using one of two methods, by key or by query.

By Key

The key-based approach performs a lookup against a store using either the primary key or the key within a caching solution.

The following example returns specific attributes from the ReferenceDataObject.

deviceManufacturer:
    by key: tac
    with values: [deviceManufacturer, year_released]
    using: deviceStore

The following example returns a ReferenceDataObject as a linked object.

modelDetails:
  by key: tac
  as object: true
  using: deviceStore

See ReferenceDataObject for further information.
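To make the key-based lookup concrete, here is a minimal Python sketch of the idea, not Joule's implementation. The store is modelled as a plain dictionary, the helper name lookup_by_key is hypothetical, and returning None on a miss is an assumption about how a failed lookup might be handled.

```python
def lookup_by_key(event, key_field, store):
    """Fetch the reference record whose key equals event[key_field].

    Returns None when the event lacks the field or the store has no match.
    """
    key = event.get(key_field)
    if key is None:
        return None
    return store.get(key)


# Hypothetical in-memory stand-in for the store bound to deviceStore;
# the values are illustrative only.
device_store = {
    "35693803": {"deviceManufacturer": "Acme", "year_released": 2019},
}

event = {"imsi": "310120265624299", "tac": "35693803"}
record = lookup_by_key(event, "tac", device_store)          # hit
missing = lookup_by_key({"imsi": "1"}, "tac", device_store)  # no tac field
```

The field named in by key is read from the event, and its value is used as the store key, so the lookup cost is that of a single in-memory map access.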

By Query

To fine-tune your enrichment process, you can define a query rather than a strict key-based lookup. This gives you greater flexibility to drive further pipeline processing. The example below is an OQL-based query against an in-memory cache solution.

contractedDataBundle:
    by query:  "select * from /userBundle where imsi = ?"
    query fields: [imsi]
    all attributes: true
    using: dataBundleStore
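The way query fields feed the ? placeholders can be sketched in Python. The example below swaps in the standard library's sqlite3 module for the in-memory cache, so the OQL region path /userBundle becomes a plain table name; the helper lookup_by_query, the table columns and the sample row are all assumptions made for illustration.

```python
import sqlite3


def lookup_by_query(conn, query, query_fields, event):
    """Run query with one bound value per '?', taken in order from the event."""
    params = [event[field] for field in query_fields]
    cursor = conn.execute(query, params)
    columns = [col[0] for col in cursor.description]
    row = cursor.fetchone()
    # Return the first match as a column -> value mapping, or None on a miss.
    return dict(zip(columns, row)) if row is not None else None


# In-memory database standing in for the store bound to dataBundleStore.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE userBundle (imsi TEXT PRIMARY KEY, bundle TEXT, quota_mb INTEGER)"
)
conn.execute("INSERT INTO userBundle VALUES ('310120265624299', 'gold', 5000)")

event = {"imsi": "310120265624299", "tac": "35693803"}
bundle = lookup_by_query(
    conn,
    "select * from userBundle where imsi = ?",
    ["imsi"],
    event,
)
```

Binding values through placeholders, rather than formatting them into the query string, keeps the query text constant across events and avoids quoting problems with event values.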

Attributes


Response Approach

On successful data retrieval, the key values of the response object, a ReferenceDataObject, are added directly to the event, or the object itself is added.

One of the following attributes must be provided.

with values

Add selected attributes to the event.

deviceManufacturer:
    by key: tac
    with values: [deviceManufacturer, year_released]
    using: deviceStore

all attributes

Add all attributes to the event.

contractedDataBundle:
    by query:  "select * from /userBundle where imsi = ?"
    query fields: [imsi]
    all attributes: true
    using: dataBundleStore

as object

Add the returned object to the event using the field name.

deviceInformation:
    by key: tac
    as object: true    
    using: deviceStore
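The three response attributes differ only in how the retrieved record is shaped before it is attached to the event. The Python sketch below models that difference under two assumptions: that the result is stored under the configured field name, and that exactly one response attribute may be set. The function apply_response and the sample record are hypothetical.

```python
def apply_response(event, field, record, with_values=None,
                   all_attributes=False, as_object=False):
    """Return a copy of the event with record attached under field.

    Exactly one of with_values, all_attributes or as_object must be set.
    """
    chosen = [with_values is not None, all_attributes, as_object]
    if sum(chosen) != 1:
        raise ValueError("provide exactly one response attribute")

    enriched = dict(event)
    if with_values is not None:
        # with values: keep only the selected attributes.
        enriched[field] = {name: record[name] for name in with_values}
    elif all_attributes:
        # all attributes: copy every attribute as a plain map.
        enriched[field] = dict(record)
    else:
        # as object: attach the retrieved object itself.
        enriched[field] = record
    return enriched


# Illustrative record; in practice this would come from the bound store.
record = {"deviceManufacturer": "Acme", "model": "X1", "year_released": 2019}
event = {"tac": "35693803"}

selected = apply_response(event, "deviceManufacturer", record,
                          with_values=["deviceManufacturer", "year_released"])
everything = apply_response(event, "contractedDataBundle", record,
                            all_attributes=True)
linked = apply_response(event, "deviceInformation", record, as_object=True)
```

Here selected carries two attributes, everything carries a copy of all three, and linked holds a reference to the record itself.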

Binding store

Bind the field configuration to a data store using a logical store name. The name refers either to a custom store or to one of the pre-defined stores. A custom store must be defined under the Stores Attribute; see that section for more details.

Supported stores

  • JouleDB

  • MetricsDB

If either of these is used, there is no need to specify the stores attribute.

Stores Attribute

Bind the processor to one or more linked data stores using a logical store name mapped to a set of configuration attributes. The configuration for the named store must already have been provided; see the reference data documentation for further details.

Example

The example below uses deviceStore and dataBundleStore as logical names, binding the reference data lookups to the mobiledevices and mobilecontracts stores.

stores:
  deviceStore:
    store name: mobiledevices

  dataBundleStore:
    store name: mobilecontracts
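The relationship between a field's using value, the pre-defined stores and the stores attribute can be sketched as a small resolution step. This Python model is an assumption about the behaviour described above, not Joule's code: resolve_store is a hypothetical helper, and raising KeyError for an unbound logical name is an illustrative choice.

```python
PREDEFINED_STORES = {"JouleDB", "MetricsDB"}


def resolve_store(using, stores=None):
    """Map a field's 'using' value to the physical store name to query."""
    if using in PREDEFINED_STORES:
        # Pre-defined stores need no entry under the stores attribute.
        return using
    try:
        return (stores or {})[using]["store name"]
    except KeyError:
        raise KeyError(f"no store bound to logical name {using!r}") from None


# Mirrors the stores attribute from the example above.
stores = {
    "deviceStore": {"store name": "mobiledevices"},
    "dataBundleStore": {"store name": "mobilecontracts"},
}

device_target = resolve_store("deviceStore", stores)
predefined_target = resolve_store("JouleDB")
```

The indirection means field definitions refer only to logical names, so a store can be renamed or swapped by editing the stores attribute alone.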

Attributes

ReferenceDataObject

For further information, read the Reference Data documentation.
