Old version of Key concepts
Enriching event data with the latest contextual data is crucial for processing real-time business insights.
Low latency enrichment architecture
OOTB Joule has provided the heavy lifting required to deploy a low-latency solution so that developers can focus on building advanced use case that required addition contextual data. The Joule contextual data architecture supports various low-latency in-memory data stores that reduce the inherent I/O overhead of out-of-process databases and thus enable stream based enrichment.

Example
The below example enriches events using the linked reference data and metrics. Reference data is imported at process startup using a provided parquet file. Metrics are calculated after 1 minute using a three minute tumbling window approach.
stream:
name: quoteAnalyticsStream
enabled: true
validFrom: 2020-01-01
validTo: 2025-12-31
eventTimeType: EVENT_TIME
sources: [ nasdaq_quotes_stream ]
initialisation:
sql import:
schema: reference_data
parquet:
- table: nasdaq_companies
asView: false
files: [ 'data/parquet/nasdaq.parquet' ]
index:
fields: [ 'symbol' ]
unique: true
processing unit:
metrics engine:
runtime policy:
frequency: 1
startup delay: 1
time unit: MINUTES
foreach metric compute:
metrics:
-
name: BidMovingAverage
metric key: symbol
table definition: bid_moving_averages (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_avg FLOAT,avg_bid_max FLOAT,createdTimestamp TIMESTAMP)
query:
SELECT symbol,
MIN(bid) AS 'avg_bid_min',
AVG(bid) AS 'avg_bid_avg',
MAX(bid) AS 'avg_bid_max'
FROM quotes.nasdaq
WHERE
ingestTime >= epoch_ms(date_trunc('minutes',now() - INTERVAL 3 MINUTES)) AND ingestTime <= epoch_ms(now())
GROUP BY symbol
ORDER BY 1;
truncate on start: true
compaction policy:
frequency: 8
time unit: HOURS
pipeline:
- tap:
target schema: quotes
flush frequency: 5
index:
unique: false
fields:
- symbol
- enricher:
fields:
company_info:
by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
query fields: [ symbol ]
with values: [ Name,Country ]
using: JouleDB
quote_metrics:
by metric family: BidMovingAverage
by key: symbol
with values: [avg_bid_min, avg_bid_avg, avg_bid_max]
using: MetricsDB
emit:
select: "symbol, Name, Country, avg_bid_min, avg_bid_avg, avg_bid_max"
group by:
- symbol
Output
The above example generates the following output, in this case CSV.
nasdaq_View|null|1709894745240|1709894745240|MTZ|113.97318|103.25833|110.11051|MasTec Inc. Common Stock|Basic Industries
nasdaq_View|null|1709894745244|1709894745244|SBRA|18.71446|13.90475|16.472204|Sabra Health Care REIT Inc. Common Stock|Consumer Services
nasdaq_View|null|1709894745240|1709894745240|MUA|17.244429|14.651974|15.772599|Blackrock MuniAssets Fund Inc Common Stock|Finance
General Enricher DSL
The enricher processor provide users to the ability to enrich an event with multiple data elements from various data sources through the use of enhanced mapping.
Example
enricher:
fields:
deviceManufacturer:
by key: tac
with values: [deviceManufacturer, year_released]
using: deviceStore
modelDetails:
by key: tac
as object: true
using: deviceStore
contractedDataBundle:
by query: "select * from /userBundle where imsi = ?"
query fields: [imsi]
all attributes: true
using: dataBundleStore
stores:
deviceStore:
store name: mobiledevices
dataBundleStore:
store name: mobilecontracts
Top level attribute schema
Two key attributes are required for the enricher processor; one is to define which fields to enrich whereas the other provides the data store binding.
fields
List of fields to populated with reference data using a look up criteria.
List of field to reference data criteria configurations
stores
List of stores to perform the reference data lookup
List of store configurations
Fields Attribute
Enrichment is applied at the field level whereby each returned data element is added to the defined field either as map of values or as a domain object.
The field attribute is logical organised as three definition type:
Query approach
Response approach
Binding store
Query approach
Contextual data is retrieved using one of two methods, by key or by query.
By Key
Using the key based look up approach enables you to perform a look up against a store using either the primary key or the key within a caching solution.
Example returns specific attributes from ReferenceDataObject
deviceManufacturer:
by key: tac
with values: [deviceManufacturer, year_released]
using: deviceStore
Example returns a ReferenceDataObject as a linked object
modelDetails:
by key: tac
as object: true
using: deviceStore
See ReferenceDataObject for further information
By Query
To fine tune your enrichment process you can define a query rather than a strict key based look up. This would provide you with a greater flexibility to drive further pipeline processing. Below represents a OQL based query using an in-memory cache solution.
contractedDataBundle:
by query: "select * from /userBundle where imsi = ?"
query fields: [imsi]
all attributes: true
using: dataBundleStore
Attributes
by query
Dependent upon linked data store.
String
query fields
Event field values to be applied to the query
Ordered list of Strings
Response Approach
On a successful data retrieval the response object, ReferenceDataObejct, key values are added directly in to the event or added as an object.
with values
Values to add to the field as a map of key value pairs
List of Strings
all attributes
Map all returned values attributes to event
Boolean
as object
Returned value is linked as a object to the event
Boolean
Either one of the attributes must be provided.
with values
Add selected attributes to the event.
deviceManufacturer:
by key: tac
with values: [deviceManufacturer, year_released]
using: deviceStore
all attributes
Add all attributes to the event.
contractedDataBundle:
by query: "select * from /userBundle where imsi = ?"
query fields: [imsi]
all attributes: true
using: dataBundleStore
as object
Add the returned object to the event using the field name.
deviceInformation:
by key: tac
as object: true
using: deviceStore
Binding store
Bind the field configuration to a data store using a logical store name. This would either be custom or using the pre-defined stores. A custom store should be defined under the Stores Attribute, see section for more details.
using
Store name to apply query processing. Either a custom or supported store
String
Supported stores
JouleDB
MetricsDB
If either one of these are provided there is no need to specify the stores attribute.
Stores Attribute
Bind the processor to one or more linked data stores using a logical store name mapped to a set of configuration attributes. The defined store name configuration needs to have been provided, see reference data documentation for further details.
Example
The example below uses the nasdaqIndexCompanies
as a logical name to bind the reference data lookup criteria to be performed.
stores:
deviceStore:
store name: mobiledevices
dataBundleStore:
store name: mobilecontracts
Attributes
stores
List of stores required for the enricher. Usage is local logical name mapped to store name
String
store name
Actual store name specified in the reference data store configuration
String
ReferenceDataObject
For further information read Reference Data documentation
Last updated
Was this helpful?