Enrich events
Advanced stream processing use cases often require events to be enriched with contextual data to support analytical and logical processing.
The use cases below demonstrate how to use the enrichment processor with static and metric data.
Enrich events with static data
Consistent, trusted data sourced from a master data platform helps ensure that event-based insights are of high fidelity and quality. The ability to use this data in-stream is often regarded as a key feature.
Use case configuration
This use case demonstrates how the enrichment processor binds static reference data to an event in real time through an event-based lookup, using the “symbol” event value in an AQL query.
File: app-exampleEnrichment.env
SOURCEFILE=conf/sources/stockQuoteStream.yaml
ENGINEFILE=conf/usecases/baseTumblingWindows.yaml
PUBLISHFILE=conf/publishers/kafkaAnalytics.yaml
Pipeline overview
The Joule example below loads the contents of a CSV file into the nasdaq_companies in-memory SQL table on process startup and creates a unique index on the ‘Symbol’ field.
Prime reference data store with Nasdaq company reference data
Enrich streaming events with reference data
initialisation:
  - data import:
      schema: reference_data
      csv:
        - table: nasdaq_companies
          file: data/csv/nasdaq.csv
          drop table: true
          index:
            fields: [ 'Symbol' ]
            unique: true

processing unit:
  pipeline:
    - filter:
        expression: "symbol != 'A'"
    - enricher:
        fields:
          company_info:
            by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
            query fields: [symbol]
            with values: [Name,Country]
            using: JouleDB

emit:
  select: "symbol,Name,Country,bid, ask"

group by:
  - symbol
More information on how the enricher processor can be used to decorate events with static data can be found here.
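To make the lookup concrete, here is a minimal Python sketch of the same idea using the standard-library sqlite3 module. It is not Joule's implementation. The sample CSV rows, the function names `prime_reference_table` and `enrich`, the index name, and the choice to pass events through unchanged when no reference row matches are all assumptions made for illustration.

```python
import csv
import io
import sqlite3

# Hypothetical stand-in for data/csv/nasdaq.csv; these rows are illustrative only.
NASDAQ_CSV = """Symbol,Name,Country
AAPL,Apple Inc.,United States
MSFT,Microsoft Corporation,United States
"""


def prime_reference_table(conn, csv_text):
    """Load CSV rows into reference_data.nasdaq_companies with a unique Symbol index."""
    # An attached in-memory database plays the role of the reference_data schema.
    conn.execute("ATTACH DATABASE ':memory:' AS reference_data")
    # Mirrors 'drop table: true' in the configuration.
    conn.execute("DROP TABLE IF EXISTS reference_data.nasdaq_companies")
    conn.execute(
        "CREATE TABLE reference_data.nasdaq_companies "
        "(Symbol TEXT, Name TEXT, Country TEXT)"
    )
    rows = [
        (r["Symbol"], r["Name"], r["Country"])
        for r in csv.DictReader(io.StringIO(csv_text))
    ]
    conn.executemany(
        "INSERT INTO reference_data.nasdaq_companies VALUES (?, ?, ?)", rows
    )
    # Mirrors 'index: fields: [Symbol], unique: true'.
    conn.execute(
        "CREATE UNIQUE INDEX reference_data.idx_symbol ON nasdaq_companies (Symbol)"
    )


def enrich(conn, event, with_values=("Name", "Country")):
    """Return a copy of the event with the selected reference columns attached."""
    cur = conn.execute(
        "select * from reference_data.nasdaq_companies where Symbol = ?",
        (event["symbol"],),  # the 'query fields' value taken from the event
    )
    row = cur.fetchone()
    enriched = dict(event)
    if row is not None:
        record = dict(zip([d[0] for d in cur.description], row))
        for name in with_values:  # keep only the 'with values' columns
            enriched[name] = record[name]
    # Assumption: events without a matching reference row pass through unchanged.
    return enriched


conn = sqlite3.connect(":memory:")
prime_reference_table(conn, NASDAQ_CSV)
enriched = enrich(conn, {"symbol": "AAPL", "bid": 189.10, "ask": 189.15})
print(enriched)
```

The unique index is what keeps the per-event lookup cheap: each event triggers a single keyed read rather than a table scan.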
Enrich with live metrics
Advanced use cases often require computed metrics as inputs to further contextual event-based calculations, ML predictions, or event-based triggers, or as additional context for the emitted event. Using this data in-stream is an advanced use case feature.
Use case configuration
This use case demonstrates how the enrichment processor binds computed metric data to an event in real time through an event-based lookup, using the “symbol” event value in an AQL query.
File: app-enrichmentWithMetricsAndReferenceDataExample.env
SOURCEFILE=conf/sources/stockQuoteStream.yaml
ENGINEFILE=conf/usecases/enrichmentWithMetricsAndReferenceDataExample.yaml
PUBLISHFILE=conf/publishers/kafkaAnalytics.yaml
Pipeline overview
The Joule example below loads the contents of the initial image files into the in-memory SQL database on process startup, starts the metrics engine, and enriches events with reference and metric data.
Prime reference data store with Nasdaq company reference data
Prime metrics engine with initial metric values
Compute metrics every minute
Enrich streaming events with reference data and metrics data
stream:
  name: standardQuoteAnalyticsStream
  enabled: true
  eventTimeType: EVENT_TIME
  sources: [ nasdaq_quotes_stream ]

  initialisation:
    data import:
      csv:
        - table: nasdaq_companies
          schema: reference_data
          file: 'data/csv/nasdaq.csv'
          drop table: true
          index:
            fields: [ 'Symbol' ]
            unique: true
      parquet:
        - table: bid_moving_averages
          schema: metrics
          files: ['data/parquet/mvavgs-prime.parquet']
          drop table: true
          index:
            fields: [ 'symbol' ]
            unique: false

  processing unit:
    metrics engine:
      runtime policy:
        frequency: 1
        startup delay: 1
        time unit: MINUTES

      foreach metric compute:
        metrics:
          -
            name: BidMovingAverage
            metric key: symbol
            table definition: bid_moving_averages (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_avg FLOAT,avg_bid_max FLOAT,createdTimestamp TIMESTAMP)
            query:
              SELECT symbol,
              MIN(bid) AS 'avg_bid_min',
              AVG(bid) AS 'avg_bid_avg',
              MAX(bid) AS 'avg_bid_max'
              FROM quotes.nasdaq
              WHERE
              ingestTime >= epoch_ms(date_trunc('minutes',now() - INTERVAL 3 MINUTES)) AND ingestTime <= epoch_ms(now())
              GROUP BY symbol
              ORDER BY 1;
            truncate on start: false
            compaction policy:
              frequency: 8
              time unit: HOURS

    pipeline:
      - tap:
          target schema: quotes
          flush frequency: 5
          index:
            unique: false
            fields:
              - symbol
      - enricher:
          fields:
            company_info:
              by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
              query fields: [ symbol ]
              with values: [ Name,Country ]
              using: JouleDB
            quote_metrics:
              by metric family: BidMovingAverage
              by key: symbol
              with values: [avg_bid_min, avg_bid_avg, avg_bid_max]
              using: MetricsDB

  emit:
    select: "symbol, Name, Country, avg_bid_min, avg_bid_avg, avg_bid_max"

  group by:
    - symbol
This example applies the latest metrics for a given symbol to an event. More information can be found here.
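The metric computation and the "latest metrics" lookup can be sketched in Python with the standard-library sqlite3 module. This is an illustration under stated assumptions, not how Joule's metrics engine works internally. The function names, the sample quotes, the use of integer epoch milliseconds for createdTimestamp, and the rule that "latest" means the row with the highest createdTimestamp are all hypothetical.

```python
import sqlite3

MINUTE_MS = 60 * 1000
WINDOW_MS = 3 * MINUTE_MS  # three-minute lookback, as in the metric query


def window_bounds(now_ms):
    """Lower bound is (now - 3 minutes) truncated to the minute; upper bound is now."""
    lower = ((now_ms - WINDOW_MS) // MINUTE_MS) * MINUTE_MS
    return lower, now_ms


def compute_metrics(conn, now_ms):
    """Aggregate bids per symbol over the window and append them to the metrics table."""
    lower, upper = window_bounds(now_ms)
    rows = conn.execute(
        "SELECT symbol, MIN(bid), AVG(bid), MAX(bid) FROM quotes.nasdaq "
        "WHERE ingestTime >= ? AND ingestTime <= ? GROUP BY symbol ORDER BY 1",
        (lower, upper),
    ).fetchall()
    conn.executemany(
        "INSERT INTO bid_moving_averages VALUES (?, ?, ?, ?, ?)",
        [(sym, lo, avg, hi, now_ms) for sym, lo, avg, hi in rows],
    )


def enrich_with_latest(conn, event):
    """Attach the most recent metric row for the event's symbol, if one exists."""
    # Assumption: "latest" is the row with the highest createdTimestamp.
    row = conn.execute(
        "SELECT avg_bid_min, avg_bid_avg, avg_bid_max FROM bid_moving_averages "
        "WHERE symbol = ? ORDER BY createdTimestamp DESC LIMIT 1",
        (event["symbol"],),
    ).fetchone()
    enriched = dict(event)
    if row is not None:
        enriched.update(zip(("avg_bid_min", "avg_bid_avg", "avg_bid_max"), row))
    return enriched


conn = sqlite3.connect(":memory:")
# An attached in-memory database plays the role of the 'quotes' target schema.
conn.execute("ATTACH DATABASE ':memory:' AS quotes")
conn.execute("CREATE TABLE quotes.nasdaq (symbol TEXT, bid REAL, ingestTime INTEGER)")
conn.execute(
    "CREATE TABLE bid_moving_averages (symbol TEXT, avg_bid_min REAL, "
    "avg_bid_avg REAL, avg_bid_max REAL, createdTimestamp INTEGER)"
)

# Illustrative quotes; the 99.0 bid falls before the window and is ignored.
conn.executemany(
    "INSERT INTO quotes.nasdaq VALUES (?, ?, ?)",
    [
        ("AAPL", 99.0, 400_000),
        ("AAPL", 10.0, 430_000),
        ("AAPL", 20.0, 600_000),
        ("MSFT", 5.0, 500_000),
    ],
)

now_ms = 620_000  # 10 minutes 20 seconds after the epoch
compute_metrics(conn, now_ms)
enriched = enrich_with_latest(conn, {"symbol": "AAPL", "bid": 21.0})
print(enriched)
```

Because the lower bound is truncated to the minute after the subtraction, the effective window spans between three and four minutes depending on where in the current minute the computation runs.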