Types of import

Parquet and CSV data imports for stream processing

Overview

The page describes how data imports from Parquet and CSV files can be configured, how to define tables and indexes and how these imports can be utilised within a processing pipeline.

The provided example demonstrates the integration of imported data into a stream-based analytics system, with calculations and data enrichment performed on the incoming data.

Parquet Import

Parquet formatted files can be imported into the system.

index cannot be created over a view

Example

This example imports Parquet files into different schemas and tables.

Some tables are views: us_holidays, others are regular tables: fxrates, bid_moving_averages.

Indexes are created on specific fields but are not unique and existing tables are dropped before importing new data.

stream:
  ...
  data import:
    parquet:
      - schema: exchange_rates
        table: fxrates
        asView: false
        files: [ 'fxrates.parquet' ]
        drop table: true
        index:
          fields: [ 'ccy' ]
          unique: false
          
      - schema: reference_data
        table: us_holidays    
        asView: true
        files: [ 'holidays.parquet' ]
        drop table: true
        
      - schema: metrics
        table: bid_moving_averages
        files: ['data/parquet/mvavgs-prime.parquet']
        drop table: true
        index:
          fields: [ 'symbol' ]
          unique: false

Attributes schema

CSV import

Data can be imported from CSV files using a supported set of delimiters. The key difference between parquet and CSV, it is possible to control the table definition.

Joule by default will try to create a target table based upon a sample set of data assuming a header exists on the first row.

Example

This example imports a CSV file: fxrates.csv into the nasdaq table in the reference_data schema. It specifies the table structure, including column types.

Key settings include a custom delimiter (;), date and timestamp formats, sample size and no automatic detection of data types.

The table is dropped before importing and an index is created on the symbol field.

stream:
  ...
  data import:
    csv:
      - schema: reference_data
        table: nasdaq
        table definition:
          nasdaq (
          ccy VARCHAR,
          name VARCHAR,
          last_sale REAL,
          net_change REAL,
          change REAL,
          market_cap DOUBLE,
          country VARCHAR,
          ipo_year SMALLINT,
          volume INTEGER,
          sector VARCHAR,
          industry VARCHAR);
        file: 'data/csv/fxrates.csv'
        delimiter: ";"
        date format: "%Y/%m%d"
        timestamp format: "%Y/%m%d"
        sample size: 1024
        skip: 0
        header: true
        auto detect: false
        drop table: true
        index:
          fields: [ 'symbol' ]
          unique: false

Attributes schema

Application example

The following example primes the process with contextual and metrics data which is used for event enrichment processing.

The code defines a stream named standardQuoteAnalyticsStream that imports data from CSV: nasdaq.csv and Parquet: mvavgs-prime.parquet files.

It computes the BidMovingAverage metric, enriches the data with company info and quote metrics and outputs the data grouped by symbol.

The stream runs every minute, with compaction every 8 hours and flushes data every 5 minutes.

The data is processed based on event timestamps and includes table imports, metric calculations and enrichment steps.

stream:
  name: standardQuoteAnalyticsStream
  enabled: true
  validFrom: 2020-01-01
  validTo: 2025-12-31
  eventTimeType: EVENT_TIME
  sources: [ nasdaq_quotes_stream ]

  initialisation:
    data import:
      csv:
        - schema: reference_data
          table: nasdaq_companies
          file: 'data/csv/nasdaq.csv'
          drop table: true
          index:
            fields: [ 'Symbol' ]
            unique: true

      parquet:
        - schema: metrics
          table: bid_moving_averages
          files: ['data/parquet/mvavgs-prime.parquet']
          drop table: true
          index:
            fields: [ 'symbol' ]
            unique: false

  processing unit:
    metrics engine:
      runtime policy:
        frequency: 1
        startup delay: 1
        time unit: MINUTES

      foreach metric compute:
        metrics:
          -
            name: BidMovingAverage
            metric key: symbol
            table definition: bid_moving_averages (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_avg FLOAT,avg_bid_max FLOAT,createdTimestamp TIMESTAMP)
            query:
              SELECT symbol,
              MIN(bid) AS 'avg_bid_min',
              AVG(bid) AS 'avg_bid_avg',
              MAX(bid) AS 'avg_bid_max'
              FROM quotes.nasdaq
              WHERE
              ingestTime >= epoch_ms(date_trunc('minutes',now() - INTERVAL 3 MINUTES)) AND ingestTime <= epoch_ms(now())
              GROUP BY symbol
              ORDER BY 1;
            truncate on start: false
            compaction policy:
              frequency: 8
              time unit: HOURS

    pipeline:
      - tap:
          target schema: quotes
          flush frequency: 5
          index:
            unique: false
            fields:
              - symbol
      - enricher:
          fields:
            company_info:
              by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
              query fields: [ symbol ]
              with values: [ Name,Country ]
              using: JouleDB
            quote_metrics:
              by metric family: BidMovingAverage
              by key: symbol
              with values: [avg_bid_min, avg_bid_avg, avg_bid_max]
              using: MetricsDB
  emit:
    select: "symbol, Name, Country, avg_bid_min, avg_bid_avg, avg_bid_max"

  group by:
    - symbol

Last updated