Initialisation

Advanced use cases often require reference data to support calculations or business logic. Joule enables this by providing data priming at initialisation and enrichment processing.

The initialisation process imports data at startup from the local file store into the in-memory SQL database, where it supports key processing functions within the event stream pipeline. This is an optional feature.

Access points to the imported data are provided through Joule's SQL API.

Example

The example below loads two data files into independent in-memory SQL database tables, one from a CSV file and one from a parquet file. The CSV file contains Nasdaq company information, which can be considered static and therefore resides within the reference_data schema, whereas the parquet file primes the metrics engine with pre-calculated metrics.

initialisation:
  data import:
    csv:
      - schema: reference_data
        table: nasdaq_companies
        file: 'data/csv/nasdaq.csv'
        drop table: true
        index:
          fields: [ 'Symbol' ]
          unique: true

    parquet:
      - schema: metrics
        table: bid_moving_averages      
        files: ['data/parquet/mvavgs-prime.parquet']
        drop table: true
        index:
          fields: [ 'symbol' ]
          unique: false

Data Import

Joule has an embedded SQL engine which, when enabled, provides features such as metrics, event capture, exporting and reference data.

Data imported in this manner is typically for static reference data which supports key processing functions within the event stream pipeline.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| schema | Global database schema; when set, it is used for any import definition where schema is not defined. Default schema: reference_data | String | |
| csv | List of CSV data import configurations | See CSV attributes | |
| parquet | List of parquet data import configurations | See parquet attributes | |
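
For instance, the global schema attribute can be set once and inherited by any import definition that does not declare its own. The following is a minimal sketch reusing the files from the example above:

initialisation:
  data import:
    schema: reference_data          # global schema for the imports below
    csv:
      - table: nasdaq_companies     # inherits the reference_data schema
        file: 'data/csv/nasdaq.csv'
    parquet:
      - schema: metrics             # overrides the global schema
        table: bid_moving_averages
        files: ['data/parquet/mvavgs-prime.parquet']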

CSV and Parquet common DSL elements

The following keywords are common to both the parquet and CSV import methods.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| schema | Database schema in which to create and apply the table import function | String | |
| table | Target table to import data into | String | |
| drop table | Drop existing table before import. This causes the table to be recreated. | Boolean. Default: true | |
| index | Create an index on the created table | See below | |

Index

If this optional field is supplied, the index is recreated once the data has been imported.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| fields | A list of table fields to base the index on | String list | |
| unique | True for a unique index | Boolean. Default: true | |
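
For instance, a non-unique index spanning two fields of the Nasdaq reference table could be declared as follows; the field combination is illustrative:

index:
  fields: [ 'Symbol', 'Country' ]   # composite index over two table fields
  unique: false                     # allow duplicate key values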

Parquet Import

Parquet-formatted files can be imported into the system. Note that an index cannot be created over a view.

Example

parquet:
  - 
    table: fxrates
    schema: reference_data
    asView: false
    files: [ 'fxrates.parquet' ]
    drop table: true
    index:
      fields: [ 'ccy' ]
      unique: false
  -
    table: us_holidays    
    schema: reference_data
    asView: true
    files: [ 'holidays.parquet' ]
    drop table: true
  - 
    table: bid_moving_averages
    schema: metrics
    files: ['data/parquet/mvavgs-prime.parquet']
    drop table: true
    index:
      fields: [ 'symbol' ]
      unique: false

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| asView | Create a view over the files. This means disk IO for every table query. Consider false if the table is small and accessed frequently. | Boolean. Default: false | |
| files | List of files of the same type to be imported | String list | |

CSV

Data can be imported from CSV files using a supported set of delimiters. The key difference from parquet is that you can control the table definition. By default, Joule will try to create a target table based upon a sample set of data, assuming a header exists on the first row.

Example

csv:
  - 
    table: nasdaq
    table definition:
      nasdaq (
      symbol VARCHAR,
      name VARCHAR,
      last_sale REAL,
      net_change REAL,
      change REAL,
      market_cap DOUBLE,
      country VARCHAR,
      ipo_year SMALLINT,
      volume INTEGER,
      sector VARCHAR,
      industry VARCHAR);
    file: nasdaq.csv
    delimiter: ";"
    date format: "%Y/%m/%d"
    timestamp format: "%Y/%m/%d"
    sample size: 1024
    skip: 0
    header: true
    auto detect: false
    drop table: true
    index:
      fields: [ 'symbol' ]
      unique: false

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| table definition | Custom SQL table definition used when provided. This overrides the auto detect flag. | String | |
| file | Name and path of the file to import | String | |
| delimiter | Field delimiter to use. Supported delimiters include , \| ; space and tab | String. Default: \| | |
| date format | User-specified date format | String. Default: System | |
| timestamp format | User-specified timestamp format | String. Default: System | |
| sample size | Number of rows used to determine types and table structure | Integer. Default: 1024 | |
| skip | Number of rows to skip when auto generating a table using a sample size | Integer. Default: 1 | |
| header | Flag to indicate the first line in the file contains a header | Boolean. Default: true | |
| auto detect | Auto detect the table format by taking a sample of data. If set to false, a table definition must be provided. | Boolean. Default: true | |
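
In contrast to the explicit table definition shown above, a minimal sketch relying on auto detection could look like this:

csv:
  - table: nasdaq_companies
    schema: reference_data
    file: 'data/csv/nasdaq.csv'
    header: true          # first row supplies the column names
    sample size: 1024     # rows sampled to infer column types
    auto detect: true     # Joule derives the table definition from the sample
    drop table: true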

Application example

The example below primes the process with reference and metrics data, which is then used for event enrichment processing.

stream:
  name: standardQuoteAnalyticsStream
  enabled: true
  validFrom: 2020-01-01
  validTo: 2025-12-31
  eventTimeType: EVENT_TIME
  sources: [ nasdaq_quotes_stream ]

  initialisation:
    data import:
      csv:
        - schema: reference_data
          table: nasdaq_companies
          file: 'data/csv/nasdaq.csv'
          drop table: true
          index:
            fields: [ 'Symbol' ]
            unique: true

      parquet:
        - schema: metrics
          table: bid_moving_averages
          files: ['data/parquet/mvavgs-prime.parquet']
          drop table: true
          index:
            fields: [ 'symbol' ]
            unique: false

  processing unit:
    metrics engine:
      runtime policy:
        frequency: 1
        startup delay: 1
        time unit: MINUTES

      foreach metric compute:
        metrics:
          -
            name: BidMovingAverage
            metric key: symbol
            table definition: bid_moving_averages (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_avg FLOAT,avg_bid_max FLOAT,createdTimestamp TIMESTAMP)
            query:
              SELECT symbol,
              MIN(bid) AS 'avg_bid_min',
              AVG(bid) AS 'avg_bid_avg',
              MAX(bid) AS 'avg_bid_max'
              FROM quotes.nasdaq
              WHERE
              ingestTime >= epoch_ms(date_trunc('minutes',now() - INTERVAL 3 MINUTES)) AND ingestTime <= epoch_ms(now())
              GROUP BY symbol
              ORDER BY 1;
            truncate on start: false
            compaction policy:
              frequency: 8
              time unit: HOURS

    pipeline:
      - tap:
          target schema: quotes
          flush frequency: 5
          index:
            unique: false
            fields:
              - symbol
      - enricher:
          fields:
            company_info:
              by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
              query fields: [ symbol ]
              with values: [ Name,Country ]
              using: JouleDB
            quote_metrics:
              by metric family: BidMovingAverage
              by key: symbol
              with values: [avg_bid_min, avg_bid_avg, avg_bid_max]
              using: MetricsDB
  emit:
    select: "symbol, Name, Country, avg_bid_min, avg_bid_avg, avg_bid_max"

  group by:
    - symbol

Access

To access data loaded using this method, Joule provides a SQL API. See the further documentation on how to use this within your custom components.
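
As an illustration, once the imports above have completed, the primed tables can be queried with standard SQL. The query below assumes the nasdaq_companies table from the earlier import; the symbol value is purely illustrative:

SELECT Symbol, Name, Country
FROM reference_data.nasdaq_companies
WHERE Symbol = 'AAPL';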
