Enrich events

Processing stream events often requires additional data to support downstream processing functions. Joule provides a solution where events are enriched in place using locally cached reference data.


Semi-static data changes infrequently, but when a change occurs it must be applied promptly within a process. Typically, core business data warehouses manage this type of data, with updates made available either through distribution technology or at the end of the day using traditional batch processing methods. However, a common solution employed by forward-looking businesses is the use of a distributed caching and messaging platform.

In this example, mobile events are enriched with device information, using the TAC as the reference data lookup attribute. The IMEI value combines the TAC and the device serial code, so a user-defined plugin first splits the IMEI into its component parts; see the IMEIDecoder class for how this is implemented. The resulting TAC is then used to perform the reference data lookup against the linked data store, Apache Geode in this example, and the returned value is bound to the event.
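
The IMEI split described above can be sketched in plain Java. This is a minimal illustration, not the IMEIDecoder class shipped with the example, and it does not use the Joule plugin API. It assumes the standard 15-digit IMEI layout (8-digit TAC, 6-digit serial number, 1 check digit). The class name ImeiSplitSketch and the output field name serial are hypothetical; tac matches the attribute used later in the pipeline.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: NOT the IMEIDecoder class from the example project.
final class ImeiSplitSketch {

    // Standard IMEI layout: 8-digit TAC, 6-digit serial number, 1 check digit.
    static final int TAC_LENGTH = 8;
    static final int SERIAL_LENGTH = 6;

    /**
     * Splits a 15-digit IMEI into its TAC and serial parts.
     * Returns an empty map for null or malformed input, so that a later
     * filter step can drop the event because no tac was produced.
     */
    static Map<String, String> split(String imei) {
        Map<String, String> parts = new LinkedHashMap<>();
        if (imei == null || !imei.matches("\\d{15}")) {
            return parts;
        }
        parts.put("tac", imei.substring(0, TAC_LENGTH));
        // "serial" is a hypothetical field name chosen for this sketch.
        parts.put("serial", imei.substring(TAC_LENGTH, TAC_LENGTH + SERIAL_LENGTH));
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(split("490154203237518"));
    }
}
```

For the sample value 490154203237518, split returns a tac of 49015420 and a serial of 323751; the trailing check digit is ignored.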

Use case configuration

File: app-enrichment.env

SOURCEFILE=conf/sources/mobileSimulatedStream.yaml
REFERENCEDATA=conf/sources/mobileReferenceData.yaml
ENGINEFILE=conf/usecases/enrichment/mobileEventEnrichmentProcessWithSelect.yaml
PUBLISHFILE=conf/publishers/enrichedMobileEventFile.yaml

The 'REFERENCEDATA' variable defines the platform reference data configuration, which can contain many external data stores. Currently only Apache Geode is supported.

Reference data is co-located, in-memory, within the same process as Joule. This improves processing throughput by removing the need to retrieve data from traditional data stores, which are typically out of process. A key feature of the implementation is the ability to load reference data on startup from a connected distributed Geode cluster, which reduces the I/O overhead of cache misses. This feature is generally known as Get Initial Image.
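
The idea of a co-located cache that is filled once at startup can be sketched as follows. This is a hypothetical, plain-Java illustration of the pattern only; it uses neither the Apache Geode API nor Joule classes. The class PreloadedReferenceCache, its methods, and the snapshot loader are all assumptions made for this sketch.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch of the pattern; not the Geode or Joule API.
final class PreloadedReferenceCache {

    private final Map<String, String> local = new ConcurrentHashMap<>();

    // Copies a full snapshot of the reference data into process memory once, at startup.
    PreloadedReferenceCache(Supplier<Map<String, String>> snapshotLoader) {
        local.putAll(snapshotLoader.get());
    }

    // Lookups are served from local memory, so no network round trip is needed.
    Optional<String> lookup(String key) {
        if (key == null) {
            return Optional.empty(); // ConcurrentHashMap rejects null keys
        }
        return Optional.ofNullable(local.get(key));
    }

    // Applies a later change pushed from the remote store to the local copy.
    void onUpdate(String key, String value) {
        local.put(key, value);
    }
}
```

Because the snapshot is loaded before any event is processed, the first lookup for a key is already a local hit rather than a miss that has to go to the remote store.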

Pipeline configuration

This pipeline enriches events with mobile device information, provided each event carries the key data needed to perform the reference data lookup.

Processing steps

  • Filter out events that have no IMSI set.

  • Extract the TAC and device serial code from the IMEI using a custom tokenizer.

  • Filter out events that have no TAC set.

  • Enrich the event with mobile device information using the TAC as the lookup key.

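processing unit:
  pipeline:
    # Drop events that have no IMSI
    - filter:
        expression: "(imsi !== null ) ? true : false;"

    # Split the IMEI into its component parts using the custom IMEIDecoder
    - tokenizer enricher:
        tokenizers:
          imei: com.fractalworks.streams.examples.telco.enricher.IMEIDecoder

    # Drop events for which no TAC is set
    - filter:
        expression: "(tac !== null ) ? true : false;"

    # Look up device information in the reference data store, keyed by TAC
    - enricher:
        enrich:
          deviceType:
            key: device
            using: deviceStore

        stores:
          deviceStore:
            storeName: mobiledevices
            primaryKey: tac
            primaryKeyType: java.lang.String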
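
To make the flow of the four processing steps concrete, the sketch below walks one event through them in plain Java. It is a hypothetical illustration, not how Joule executes the pipeline: events are modelled as simple maps, the device store as an in-memory map keyed by TAC, and the class EnrichmentPipelineSketch and the serial field name are assumptions. It also assumes that an event with no matching device record is passed through unenriched rather than dropped.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the four steps in plain Java; it does not use Joule classes.
final class EnrichmentPipelineSketch {

    // Returns the enriched event, or empty when one of the filter steps drops it.
    static Optional<Map<String, Object>> process(Map<String, Object> event,
                                                 Map<String, String> deviceStore) {
        // Step 1: drop events that have no IMSI.
        if (event.get("imsi") == null) {
            return Optional.empty();
        }
        Map<String, Object> out = new HashMap<>(event);

        // Step 2: derive tac and serial from a 15-digit IMEI
        // (8-digit TAC, 6-digit serial, then a check digit).
        Object imei = out.get("imei");
        if (imei instanceof String && ((String) imei).matches("\\d{15}")) {
            String value = (String) imei;
            out.put("tac", value.substring(0, 8));
            out.put("serial", value.substring(8, 14)); // hypothetical field name
        }

        // Step 3: drop events for which no TAC could be derived.
        Object tac = out.get("tac");
        if (tac == null) {
            return Optional.empty();
        }

        // Step 4: bind the looked-up device record to the event as deviceType.
        // Assumption: a missing record leaves the event unenriched.
        String device = deviceStore.get(tac);
        if (device != null) {
            out.put("deviceType", device);
        }
        return Optional.of(out);
    }
}
```

Placing the two filters before the lookup means that events which cannot be enriched never reach the store, which mirrors the ordering of the configuration above.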

Output Event

The following fields are added to the processed StreamEvent object.

// Some code
