Types of import

Parquet and CSV data imports for stream processing

Overview

This page describes how data imports from Parquet and CSV files are configured, how to define tables and indexes, and how these imports can be used within a processing pipeline.

The application example at the end of the page demonstrates how imported data is integrated into a stream-based analytics pipeline, with calculations and data enrichment performed on the incoming data.

Parquet Import

Parquet-formatted files can be imported into the system.

Note: an index cannot be created over a view.
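As a brief sketch of this constraint, an import declared with asView: true should omit the index block, while a regular table import may define one (the schemas and files mirror the example below):

stream:
  ...
  data import:
    parquet:
      - schema: reference_data
        table: us_holidays      # imported as a view, so no index block
        asView: true
        files: [ 'holidays.parquet' ]

      - schema: exchange_rates
        table: fxrates          # regular table, index permitted
        asView: false
        files: [ 'fxrates.parquet' ]
        index:
          fields: [ 'ccy' ]
          unique: false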

Example

This example imports Parquet files into different schemas and tables.

One table is created as a view (us_holidays); the others (fxrates, bid_moving_averages) are regular tables.

Non-unique indexes are created on specific fields, and existing tables are dropped before the new data is imported.

stream:
  ...
  data import:
    parquet:
      - schema: exchange_rates
        table: fxrates
        asView: false
        files: [ 'fxrates.parquet' ]
        drop table: true
        index:
          fields: [ 'ccy' ]
          unique: false
          
      - schema: reference_data
        table: us_holidays    
        asView: true
        files: [ 'holidays.parquet' ]
        drop table: true
        
      - schema: metrics
        table: bid_moving_averages
        files: ['data/parquet/mvavgs-prime.parquet']
        drop table: true
        index:
          fields: [ 'symbol' ]
          unique: false

Attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| asView | Create a view over the files. This will mean disk IO for every table query. Consider false if the table is small and accessed frequently | Boolean | Default: false |
| files | List of files of the same type to be imported | String list | |
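Once imported, these tables can be referenced from other parts of the pipeline. The following is a hedged sketch, modelled on the enricher in the application example at the end of this page; the fx_rate field and the rate column are illustrative and not part of the example data:

pipeline:
  - enricher:
      fields:
        fx_rate:
          by query: "select * from exchange_rates.fxrates where ccy = ?"
          query fields: [ ccy ]
          with values: [ rate ]
          using: JouleDB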

CSV import

Data can be imported from CSV files using a supported set of delimiters. The key difference from Parquet imports is that for CSV it is possible to control the table definition.

By default, Joule will attempt to create the target table from a sample of the data, assuming a header exists on the first row.
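As a minimal sketch of that default behaviour, modelled on the application example later on this page, an import can rely on the header row and type auto-detection, so no table definition is required:

stream:
  ...
  data import:
    csv:
      - schema: reference_data
        table: nasdaq_companies
        file: 'data/csv/nasdaq.csv'
        header: true       # first row supplies the column names
        auto detect: true  # infer column types from a sample of rows
        drop table: true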

Example

This example imports a CSV file, fxrates.csv, into the nasdaq table in the reference_data schema. It specifies the table structure, including column types.

Key settings include a custom delimiter (;), date and timestamp formats, a sample size, and no automatic detection of data types.

The table is dropped before importing, and an index is created on the symbol field.

stream:
  ...
  data import:
    csv:
      - schema: reference_data
        table: nasdaq
        table definition:
          nasdaq (
          ccy VARCHAR,
          name VARCHAR,
          last_sale REAL,
          net_change REAL,
          change REAL,
          market_cap DOUBLE,
          country VARCHAR,
          ipo_year SMALLINT,
          volume INTEGER,
          sector VARCHAR,
          industry VARCHAR);
        file: 'data/csv/fxrates.csv'
        delimiter: ";"
        date format: "%Y/%m%d"
        timestamp format: "%Y/%m%d"
        sample size: 1024
        skip: 0
        header: true
        auto detect: false
        drop table: true
        index:
          fields: [ 'symbol' ]
          unique: false

Attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| table definition | Custom SQL table definition used when provided. This overrides the auto detect flag | String | |
| file | Name and path of the file to import | String | |
| delimiter | Field delimiter to use. Supported delimiters include comma, pipe, semicolon, space and tab | String | Default: \| |
| date format | User-specified date format. Many formats are possible, e.g. %d/%Y/%m or %d-%m-%Y | String | Default: System |
| timestamp format | User-specified timestamp format. Many formats are possible, e.g. %d/%Y/%m or %d-%m-%Y | String | Default: System |
| sample size | Number of rows used to determine types and table structure | Integer | Default: 1024 |
| skip | Number of rows to skip when auto-generating a table using a sample size | Integer | Default: 1 |
| header | Flag to indicate the first line in the file contains a header | Boolean | Default: true |
| auto detect | Auto detect table format by taking a sample of data. If set to false a table definition must be provided | Boolean | Default: true |
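As a further hedged sketch, a comma-delimited file without a header row could be imported by supplying a table definition and disabling auto-detection; the schema, file and columns here are purely illustrative:

stream:
  ...
  data import:
    csv:
      - schema: reference_data
        table: country_codes
        table definition:
          country_codes (
          code VARCHAR,
          name VARCHAR);
        file: 'data/csv/country_codes.csv'
        delimiter: ","
        header: false       # no header row in the file
        auto detect: false  # rely on the table definition above
        drop table: true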

Application example

This example primes the process with contextual and metrics data, which is then used during event processing and enrichment.

The configuration defines a stream named standardQuoteAnalyticsStream that imports data from a CSV file (nasdaq.csv) and a Parquet file (mvavgs-prime.parquet).

It computes the BidMovingAverage metric, enriches the data with company information and quote metrics, and outputs the data grouped by symbol.

The metrics engine runs every minute, metric compaction occurs every 8 hours, and the event tap flushes data every 5 minutes.

The data is processed based on event timestamps and includes table imports, metric calculations and enrichment steps.

stream:
  name: standardQuoteAnalyticsStream
  enabled: true
  eventTimeType: EVENT_TIME
  sources: [ nasdaq_quotes_stream ]

  initialisation:
    data import:
      csv:
        - schema: reference_data
          table: nasdaq_companies
          file: 'data/csv/nasdaq.csv'
          drop table: true
          index:
            fields: [ 'Symbol' ]
            unique: true

      parquet:
        - schema: metrics
          table: bid_moving_averages
          files: ['data/parquet/mvavgs-prime.parquet']
          drop table: true
          index:
            fields: [ 'symbol' ]
            unique: false

  processing unit:
    metrics engine:
      runtime policy:
        frequency: 1
        startup delay: 1
        time unit: MINUTES

      foreach metric compute:
        metrics:
          -
            name: BidMovingAverage
            metric key: symbol
            table definition: bid_moving_averages (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_avg FLOAT,avg_bid_max FLOAT,createdTimestamp TIMESTAMP)
            query:
              SELECT symbol,
              MIN(bid) AS 'avg_bid_min',
              AVG(bid) AS 'avg_bid_avg',
              MAX(bid) AS 'avg_bid_max'
              FROM quotes.nasdaq
              WHERE
              ingestTime >= epoch_ms(date_trunc('minutes',now() - INTERVAL 3 MINUTES)) AND ingestTime <= epoch_ms(now())
              GROUP BY symbol
              ORDER BY 1;
            truncate on start: false
            compaction policy:
              frequency: 8
              time unit: HOURS

    pipeline:
      - tap:
          target schema: quotes
          flush frequency: 5
          index:
            unique: false
            fields:
              - symbol
      - enricher:
          fields:
            company_info:
              by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
              query fields: [ symbol ]
              with values: [ Name,Country ]
              using: JouleDB
            quote_metrics:
              by metric family: BidMovingAverage
              by key: symbol
              with values: [avg_bid_min, avg_bid_avg, avg_bid_max]
              using: MetricsDB
  emit:
    select: "symbol, Name, Country, avg_bid_min, avg_bid_avg, avg_bid_max"

  group by:
    - symbol
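Based on the emit select clause, each computed event carries the selected quote and enrichment fields. The record below is a purely illustrative sample of what such an event might contain; the values are hypothetical:

# Hypothetical emitted event
symbol: AAPL
Name: Apple Inc.
Country: United States
avg_bid_min: 189.54
avg_bid_avg: 190.12
avg_bid_max: 190.87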
