Types of import
Parquet and CSV data imports for stream processing
Overview
This page describes how to configure data imports from Parquet and CSV files, how to define tables and indexes, and how these imports can be utilised within a processing pipeline.
The provided example demonstrates the integration of imported data into a stream-based analytics system, with calculations and data enrichment performed on the incoming data.
Parquet import
Parquet-formatted files can be imported into the system.
Example
This example imports Parquet files into different schemas and tables.
Some tables are created as views (us_holidays), others as regular tables (fxrates, bid_moving_averages). Indexes are created on specific fields but are not unique, and existing tables are dropped before importing new data.
stream:
  ...
  data import:
    parquet:
      - schema: exchange_rates
        table: fxrates
        asView: false
        files: [ 'fxrates.parquet' ]
        drop table: true
        index:
          fields: [ 'ccy' ]
          unique: false
      - schema: reference_data
        table: us_holidays
        asView: true
        files: [ 'holidays.parquet' ]
        drop table: true
      - schema: metrics
        table: bid_moving_averages
        files: [ 'data/parquet/mvavgs-prime.parquet' ]
        drop table: true
        index:
          fields: [ 'symbol' ]
          unique: false
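Joule reads standard Parquet files, so any Parquet writer can produce compatible input. A minimal sketch using Python with pyarrow (the column set is hypothetical; only ccy is implied by the index definition above):

import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical FX-rates columns; 'ccy' matches the indexed field
# in the fxrates import above.
table = pa.table({
    "ccy":  ["EUR", "GBP", "JPY"],
    "rate": [1.0845, 1.2701, 0.0067],
})
pq.write_table(table, "fxrates.parquet")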
Attributes schema

asView
Create a view over the files rather than importing the data. This means disk IO for every table query; consider false if the table is small and accessed frequently.
Boolean. Default: false

files
List of files of the same type to be imported.
String list
CSV import
Data can be imported from CSV files using a supported set of delimiters. The key difference from Parquet imports is that the table definition can be controlled.
By default, Joule will try to create the target table from a sample set of data, assuming a header exists on the first row (see the sample below).
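For illustration, a file beginning with a header row, such as this hypothetical sample, gives Joule the column names from the first row and a sample of values from which to infer column types:

symbol,name,last_sale
AAPL,Apple Inc.,182.52
MSFT,Microsoft Corp.,411.22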
Example
This example imports a CSV file (fxrates.csv) into the nasdaq table in the reference_data schema. It specifies the table structure, including column types. Key settings include a custom delimiter (;), date and timestamp formats, a sample size, and no automatic detection of data types. The table is dropped before importing and an index is created on the symbol field.
stream:
  ...
  data import:
    csv:
      - schema: reference_data
        table: nasdaq
        table definition: |
          nasdaq (
            ccy VARCHAR,
            name VARCHAR,
            last_sale REAL,
            net_change REAL,
            change REAL,
            market_cap DOUBLE,
            country VARCHAR,
            ipo_year SMALLINT,
            volume INTEGER,
            sector VARCHAR,
            industry VARCHAR);
        file: 'data/csv/fxrates.csv'
        delimiter: ";"
        date format: "%Y/%m%d"
        timestamp format: "%Y/%m%d"
        sample size: 1024
        skip: 0
        header: true
        auto detect: false
        drop table: true
        index:
          fields: [ 'symbol' ]
          unique: false
Attributes schema

table definition
Custom SQL table definition used when provided. This overrides the auto detect flag.
String

file
Name and path of the file to import.
String

delimiter
Field delimiter to use. Supported delimiters include , | ; space and tab.
String. Default: |

date format
User-specified date format, e.g. %d/%Y/%m or %d-%m-%Y (see the sketch after this schema).
String. Default: System

timestamp format
User-specified timestamp format, e.g. %d/%Y/%m or %d-%m-%Y.
String. Default: System

sample size
Number of rows used to determine types and table structure.
Integer. Default: 1024

skip
Number of rows to skip when auto-generating a table using a sample size.
Integer. Default: 1

header
Flag indicating the first line in the file contains a header.
Boolean. Default: true

auto detect
Auto detect the table format by taking a sample size of data. If set to false, a table definition must be provided.
Boolean. Default: true
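The date and timestamp patterns use strftime-style specifiers such as %Y, %m and %d. As an illustration of how such a pattern maps onto values, here is a minimal sketch using Python's datetime purely to demonstrate the specifiers (Joule's own parser is assumed to treat them equivalently):

from datetime import datetime

# '%d-%m-%Y' reads a two-digit day, two-digit month and four-digit year.
parsed = datetime.strptime("31-01-2024", "%d-%m-%Y")
print(parsed.date())  # 2024-01-31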
Application example
The following example primes the process with contextual and metrics data that are used for event enrichment processing.
The code defines a stream named standardQuoteAnalyticsStream that imports data from CSV (nasdaq.csv) and Parquet (mvavgs-prime.parquet) files. It computes the BidMovingAverage metric, enriches the data with company information and quote metrics, and outputs the data grouped by symbol.
The stream runs every minute, compacts every 8 hours and flushes data every 5 minutes. The data is processed based on event timestamps and includes table imports, metric calculations and enrichment steps.
stream:
  name: standardQuoteAnalyticsStream
  enabled: true
  eventTimeType: EVENT_TIME
  sources: [ nasdaq_quotes_stream ]
  initialisation:
    data import:
      csv:
        - schema: reference_data
          table: nasdaq_companies
          file: 'data/csv/nasdaq.csv'
          drop table: true
          index:
            fields: [ 'Symbol' ]
            unique: true
      parquet:
        - schema: metrics
          table: bid_moving_averages
          files: [ 'data/parquet/mvavgs-prime.parquet' ]
          drop table: true
          index:
            fields: [ 'symbol' ]
            unique: false
  processing unit:
    metrics engine:
      runtime policy:
        frequency: 1
        startup delay: 1
        time unit: MINUTES
      foreach metric compute:
        metrics:
          - name: BidMovingAverage
            metric key: symbol
            table definition: bid_moving_averages (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_avg FLOAT, avg_bid_max FLOAT, createdTimestamp TIMESTAMP)
            query: >
              SELECT symbol,
              MIN(bid) AS 'avg_bid_min',
              AVG(bid) AS 'avg_bid_avg',
              MAX(bid) AS 'avg_bid_max'
              FROM quotes.nasdaq
              WHERE ingestTime >= epoch_ms(date_trunc('minutes', now() - INTERVAL 3 MINUTES))
              AND ingestTime <= epoch_ms(now())
              GROUP BY symbol
              ORDER BY 1;
            truncate on start: false
      compaction policy:
        frequency: 8
        time unit: HOURS
    pipeline:
      - tap:
          target schema: quotes
          flush frequency: 5
          index:
            unique: false
            fields:
              - symbol
      - enricher:
          fields:
            company_info:
              by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
              query fields: [ symbol ]
              with values: [ Name, Country ]
              using: JouleDB
            quote_metrics:
              by metric family: BidMovingAverage
              by key: symbol
              with values: [ avg_bid_min, avg_bid_avg, avg_bid_max ]
              using: MetricsDB
  emit:
    select: "symbol, Name, Country, avg_bid_min, avg_bid_avg, avg_bid_max"
  group by:
    - symbol
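For orientation, each emitted event carries the fields listed in the emit select, grouped by symbol. A hypothetical output record, with invented values and shown as JSON purely for readability, might look like:

{
  "symbol": "AAPL",
  "Name": "Apple Inc.",
  "Country": "United States",
  "avg_bid_min": 181.90,
  "avg_bid_avg": 182.45,
  "avg_bid_max": 183.10
}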