Data priming

Prime Joule with necessary startup data

This optional feature primes Joule with the data required for an active use case.

Overview

Advanced use cases often require contextual data to support calculations or complex business logic. Joule enables this by offering data priming at initialisation and enrichment processing stages.

The initialisation process imports data at startup from local files into an in-memory SQL database, making it immediately available for use in processing.

Initialisation process

Joule’s initialisation process leverages an embedded SQL engine, enabling powerful features like metrics, event capturing, data exporting and access to contextual data.

This imported data, typically static contextual information, plays a vital role in supporting key functions within the event stream pipeline.

Data made available through the initialisation process can be accessed through several main components:

  1. Enricher processor: for adding contextual information to events.

  2. Metrics engine: for real-time calculations and metrics updates.

  3. Select projection: for choosing specific fields for further processing.

  4. In-memory SQL API: for direct data access and manipulation within Joule.

| Attribute | Description | Data Type | Required |
|---|---|---|---|
| schema | Global database schema. When set, it can be used for any import definition where schema is not defined. Default schema: reference_data | String | |
| csv | List of CSV data import configurations | | |
| parquet | List of Parquet data import configurations | | |
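
The fallback behaviour of the global schema can be sketched as follows. This is a minimal illustration rather than a confirmed layout: it assumes the global schema attribute sits directly under data import, next to csv and parquet, and the countries table and file are hypothetical.

```yaml
stream:
  # other stream settings omitted
  initialisation:
    data import:
      # Assumption: the global schema is declared at the data import level
      schema: reference_data
      csv:
        # No schema defined here, so the global schema would be used
        - table: countries                  # hypothetical table
          file: 'data/csv/countries.csv'    # hypothetical file
      parquet:
        # Schema defined on the import itself, so the global schema is not needed
        - schema: metrics
          table: bid_moving_averages
          files: ['data/parquet/mvavgs-prime.parquet']
```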

Example

The following example demonstrates how to initialise two separate data files into independent in-memory SQL database tables using CSV and Parquet formats.

  1. The CSV file contains Nasdaq company information. It is treated as static reference data and is therefore stored in the reference_data schema.

  2. The Parquet file loads pre-calculated metrics into the metrics schema, priming the metrics engine.

This setup enables efficient access to contextual data and metrics calculations during event processing.

This feature can load and read files from existing databases!

stream:
  ...
  initialisation:
    data import:
      csv:
        - schema: reference_data
          table: nasdaq_companies
          file: 'data/csv/nasdaq.csv'
          drop table: true
          index:
            fields: [ 'Symbol' ]
            unique: true
  
      parquet:
        - schema: metrics
          table: bid_moving_averages      
          files: ['data/parquet/mvavgs-prime.parquet']
          drop table: true
          index:
            fields: [ 'symbol' ]
            unique: false

Attributes schema

These DSL keywords are common to both the Parquet and CSV import methods.

| Attribute | Description | Data Type | Required |
|---|---|---|---|
| schema | Database schema to create and apply table import function | String | |
| table | Target table to import data into | String | |
| drop table | Drop existing table before import. This will cause a table recreation | Boolean. Default: true | |
| index | Create an index on the created table | | |

Index

If this optional field is supplied the index is recreated once the data has been imported.

| Attribute | Description | Data Type | Required |
|---|---|---|---|
| fields | A list of table fields to base index on | String | |
| unique | True for a unique index | Boolean. Default: true | |
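
Because fields takes a list, an index spanning more than one column can presumably be declared as below. This is a sketch under that assumption, meant to be nested under data import; the fxrates_history table, its file and the rate_date column are hypothetical.

```yaml
parquet:
  - schema: exchange_rates
    table: fxrates_history                  # hypothetical table
    files: [ 'fxrates-history.parquet' ]    # hypothetical file
    drop table: true
    index:
      # Two fields listed, assumed to produce a single composite index
      fields: [ 'ccy', 'rate_date' ]        # rate_date is a hypothetical column
      unique: false
```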

Types of import

Parquet import

Parquet formatted files can be imported into the system.

Note: an index cannot be created over a view.

Example

stream:
  ...
  initialisation:
    data import:
      parquet:
        - schema: exchange_rates
          table: fxrates
          asView: false
          files: [ 'fxrates.parquet' ]
          drop table: true
          index:
            fields: [ 'ccy' ]
            unique: false

        # Created as a view, so no index is defined
        - schema: reference_data
          table: us_holidays
          asView: true
          files: [ 'holidays.parquet' ]
          drop table: true

        - schema: metrics
          table: bid_moving_averages
          files: ['data/parquet/mvavgs-prime.parquet']
          drop table: true
          index:
            fields: [ 'symbol' ]
            unique: false

Attributes schema

| Attribute | Description | Data Type | Required |
|---|---|---|---|
| asView | Create a view over the files. This will mean disk IO for every table query. Consider false if the table is small and accessed frequently | Boolean. Default: false | |
| files | List of files of the same type to be imported | String list | |
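
Since files is a list, several Parquet files can be listed for one import definition. The sketch below assumes that all listed files are loaded into the single target table, which the attribute description implies but does not state outright; the file names are hypothetical.

```yaml
parquet:
  - schema: exchange_rates
    table: fxrates
    asView: false                  # import the data rather than querying the files on disk
    files:
      - 'fxrates-2023.parquet'     # hypothetical file
      - 'fxrates-2024.parquet'     # hypothetical file
    drop table: true
    index:
      fields: [ 'ccy' ]
      unique: false
```

With asView set to true the same definition would be exposed as a view over the files, and the index block would need to be removed.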

CSV import

Data can be imported from CSV files using a supported set of delimiters. The key difference from the Parquet import is that the table definition can be controlled.

By default, Joule tries to create the target table from a sample of the data, assuming a header exists on the first row.

Example

stream:
  ...
  initialisation:
    data import:
      csv:
        - schema: reference_data
          table: nasdaq
          table definition:
            nasdaq (
            symbol VARCHAR,
            name VARCHAR,
            last_sale REAL,
            net_change REAL,
            change REAL,
            market_cap DOUBLE,
            country VARCHAR,
            ipo_year SMALLINT,
            volume INTEGER,
            sector VARCHAR,
            industry VARCHAR);
          file: 'data/csv/nasdaq.csv'
          delimiter: ";"
          date format: "%Y/%m/%d"
          timestamp format: "%Y/%m/%d"
          sample size: 1024
          skip: 0
          header: true
          auto detect: false
          drop table: true
          index:
            fields: [ 'symbol' ]
            unique: false

Attributes schema

| Attribute | Description | Data Type | Required |
|---|---|---|---|
| table definition | Custom SQL table definition used when provided. This will override the auto detect flag | String | |
| file | Name and path of the file to import | String | |
| delimiter | Field delimiter to use. Supported delimiters are comma (,), pipe (\|), semicolon (;), space and tab | String. Default: \| | |
| date format | User specified date format. Many formats are possible, e.g. %d/%Y/%m or %d-%m-%Y | String. Default: System | |
| timestamp format | User specified timestamp format. Many formats are possible, e.g. %d/%Y/%m or %d-%m-%Y | String. Default: System | |
| sample size | Number of rows to use to determine types and table structure | Integer. Default: 1024 | |
| skip | Number of rows to skip when auto generating a table using a sample size | Integer. Default: 1 | |
| header | Flag to indicate the first line in the file contains a header | Boolean. Default: true | |
| auto detect | Auto detect table format by taking a sample size of data. If set to false a table definition must be provided | Boolean. Default: true | |
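
For contrast with the explicit table definition in the CSV example, an import that relies on auto detection might look like the following. This is a sketch using only the attributes documented for CSV import, meant to be nested under data import; it assumes that nasdaq.csv is comma separated, has a header row and contains a Symbol column.

```yaml
csv:
  - schema: reference_data
    table: nasdaq_companies
    file: 'data/csv/nasdaq.csv'
    delimiter: ","        # assumed file format; the documented default is |
    header: true          # first row holds the column names
    auto detect: true     # infer the table structure, so no table definition is given
    sample size: 2048     # rows sampled to determine types
    drop table: true
    index:
      fields: [ 'Symbol' ]
      unique: true
```

A larger sample size may give more reliable type detection on files whose early rows are not representative, at the cost of a slower startup.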

Application example

The following example primes the process with contextual and metrics data which is used for event enrichment processing.

stream:
  name: standardQuoteAnalyticsStream
  enabled: true
  validFrom: 2020-01-01
  validTo: 2025-12-31
  eventTimeType: EVENT_TIME
  sources: [ nasdaq_quotes_stream ]

  initialisation:
    data import:
      csv:
        - schema: reference_data
          table: nasdaq_companies
          file: 'data/csv/nasdaq.csv'
          drop table: true
          index:
            fields: [ 'Symbol' ]
            unique: true

      parquet:
        - schema: metrics
          table: bid_moving_averages
          files: ['data/parquet/mvavgs-prime.parquet']
          drop table: true
          index:
            fields: [ 'symbol' ]
            unique: false

  processing unit:
    metrics engine:
      runtime policy:
        frequency: 1
        startup delay: 1
        time unit: MINUTES

      foreach metric compute:
        metrics:
          -
            name: BidMovingAverage
            metric key: symbol
            table definition: bid_moving_averages (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_avg FLOAT,avg_bid_max FLOAT,createdTimestamp TIMESTAMP)
            query:
              SELECT symbol,
              MIN(bid) AS 'avg_bid_min',
              AVG(bid) AS 'avg_bid_avg',
              MAX(bid) AS 'avg_bid_max'
              FROM quotes.nasdaq
              WHERE
              ingestTime >= epoch_ms(date_trunc('minutes',now() - INTERVAL 3 MINUTES)) AND ingestTime <= epoch_ms(now())
              GROUP BY symbol
              ORDER BY 1;
            truncate on start: false
            compaction policy:
              frequency: 8
              time unit: HOURS

    pipeline:
      - tap:
          target schema: quotes
          flush frequency: 5
          index:
            unique: false
            fields:
              - symbol
      - enricher:
          fields:
            company_info:
              by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
              query fields: [ symbol ]
              with values: [ Name,Country ]
              using: JouleDB
            quote_metrics:
              by metric family: BidMovingAverage
              by key: symbol
              with values: [avg_bid_min, avg_bid_avg, avg_bid_max]
              using: MetricsDB
  emit:
    select: "symbol, Name, Country, avg_bid_min, avg_bid_avg, avg_bid_max"

  group by:
    - symbol
