Comment on page
Enrich event
Often processing stream events require additional data to support further processing functions. Joule provides a solution where events are enriched in place using locally cached reference
For this example mobile events are enriched with device information using the
TAC
code as the reference data lookup attribute, the IMEI
value combines both the TAC and device serial code. To gain the required values the IMEI
value is split into the required component parts using a user defined plugin, see class IMEIDecoder
for more information on how this was implemented. The resulting TAC
is used to perform the reference data lookup against the linked data store, Apache Geode in this example, and binds the returned value to the event.File: app-enrichment.env
SOURCEFILE=${PWD}/conf/sources/mobileSimulatedStream.yaml
REFERENCEDATA=${PWD}/conf/sources/mobileReferenceData.yaml
ENGINEFILE=${PWD}/conf/usecases/enrichment/mobileEventEnrichmentProcessWithSelect.yaml
PUBLISHFILE=${PWD}/conf/publishers/enrichedMobileEventFile.yaml
The 'REFERENCEDATA' variable defines the platform reference data configuration which can contain many external data stores, currently only Apache Geode is supported.
Reference data is co-located, in-memory, within the same process as Joule. This improves processing throughput by removing the need to retrieve data from traditional data stores which are typically out of process. A key feature of the implementation is the ability to load reference data on startup from a connected distributed Geode data cluster. This reduces the I/O overhead on cache misses, this feature is generally known as
Get Initial Image
.This pipeline will enrich events with mobile device information providing key data existing to perform the key data request requirement.
- Filter events without
IMSI
being set - Gain from the
IMEI
thetac
anddevice serial
code using a custom tokenizer. - Filter events without
tac
being set - Enrich the event with mobile device information using the
tac
as the lookup key
processing unit:
pipeline:
- filter:
expression: "(imsi !== null ) ? true : false;"
- tokenizer enricher:
tokenizers:
imei : com.fractalworks.streams.examples.telco.enricher.IMEIDecoder
- filter:
expression: "(tac !== null ) ? true : false;"
- enricher:
enrich:
deviceType:
key: device
using: deviceStore
stores:
deviceStore:
storeName: mobiledevices
primaryKey: tac
primaryKeyType: java.lang.String
The following fields are added to the processed StreamEvent object.
// Some code
Last modified 10mo ago