# Types of import

## Overview

This page describes how to configure data imports from Parquet and CSV files, how to define tables and indexes, and how these imports can be utilised within a processing pipeline.

The provided example demonstrates the integration of imported data into a stream-based analytics system, with calculations and data enrichment performed on the incoming data.

## Parquet Import

Parquet-formatted files can be imported into the system.

{% hint style="info" %}
An `index` cannot be created over a view.
{% endhint %}

### Example

This example imports Parquet files into different schemas and tables.

The `us_holidays` table is imported as a view, while `fxrates` and `bid_moving_averages` are regular tables.

Non-unique indexes are created on specific fields, and any existing tables are dropped before the new data is imported.

```yaml
stream:
  ...
  data import:
    parquet:
      - schema: exchange_rates
        table: fxrates
        asView: false
        files: [ 'fxrates.parquet' ]
        drop table: true
        index:
          fields: [ 'ccy' ]
          unique: false
          
      - schema: reference_data
        table: us_holidays    
        asView: true
        files: [ 'holidays.parquet' ]
        drop table: true
        
      - schema: metrics
        table: bid_moving_averages
        files: ['data/parquet/mvavgs-prime.parquet']
        drop table: true
        index:
          fields: [ 'symbol' ]
          unique: false
```

### Attributes schema

<table><thead><tr><th width="145">Attribute</th><th width="377">Description</th><th width="136">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>asView</td><td>Create a view over the <code>files</code>. This incurs disk IO for every query against the table; consider <code>false</code> if the table is small and accessed frequently</td><td><p>Boolean</p><p><em>Default: false</em></p></td><td>false</td></tr><tr><td>files</td><td>List of <code>files</code> of the same type to be imported</td><td>String list</td><td>true</td></tr></tbody></table>

## CSV import

Data can be imported from CSV files using a supported set of delimiters. The key difference from Parquet imports is that with CSV it is possible to **control the table definition**.

By default, Joule will attempt to create the target table from a sample of the data, **assuming a header exists** on the first row.
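For example, a minimal CSV import (hypothetical file and table names) can rely on these defaults, inferring the table structure from the header row and a data sample:

```yaml
data import:
  csv:
    - schema: reference_data
      table: instruments                  # hypothetical table name
      file: 'data/csv/instruments.csv'    # hypothetical file
      # header: true, auto detect: true and sample size: 1024 are the
      # defaults, so column names and types are inferred automatically
```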

### Example

This example imports a CSV file: `fxrates.csv` into the `nasdaq` table in the `reference_data` schema. It specifies the table structure, including column types.

Key settings include a custom delimiter (`;`), explicit date and timestamp formats, a sample size, and automatic data type detection disabled.

The table is dropped before importing and an index is created on the `symbol` field.

```yaml
stream:
  ...
  data import:
    csv:
      - schema: reference_data
        table: nasdaq
        table definition:
          nasdaq (
          ccy VARCHAR,
          name VARCHAR,
          last_sale REAL,
          net_change REAL,
          change REAL,
          market_cap DOUBLE,
          country VARCHAR,
          ipo_year SMALLINT,
          volume INTEGER,
          sector VARCHAR,
          industry VARCHAR);
        file: 'data/csv/fxrates.csv'
        delimiter: ";"
        date format: "%Y/%m%d"
        timestamp format: "%Y/%m%d"
        sample size: 1024
        skip: 0
        header: true
        auto detect: false
        drop table: true
        index:
          fields: [ 'symbol' ]
          unique: false
```

### Attributes schema

<table><thead><tr><th width="145">Attribute</th><th width="377">Description</th><th width="136">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>table definition</td><td>Custom SQL table definition used when provided. This overrides the <code>auto detect</code> flag</td><td><em>String</em></td><td>false</td></tr><tr><td>file</td><td>Name and path of the file to import</td><td>String</td><td>true</td></tr><tr><td>delimiter</td><td>Field delimiter to use. Supported delimiters: <code>,</code>, <code>|</code>, <code>;</code>, space and tab</td><td><p>String</p><p>Default: <code>|</code></p></td><td>false</td></tr><tr><td>date format</td><td>User-specified date format. Many formats are possible, e.g. <code>%d/%Y/%m</code> or <code>%d-%m-%Y</code></td><td><p>String</p><p>Default: System</p></td><td>false</td></tr><tr><td>timestamp format</td><td>User-specified timestamp format. Many formats are possible, e.g. <code>%d/%Y/%m</code> or <code>%d-%m-%Y</code></td><td><p>String</p><p>Default: System</p></td><td>false</td></tr><tr><td>sample size</td><td>Number of rows used to determine types and table structure</td><td><p>Integer</p><p>Default: 1024</p></td><td>false</td></tr><tr><td>skip</td><td>Number of rows to skip when auto-generating a table from the sample</td><td><p>Integer</p><p>Default: 1</p></td><td>false</td></tr><tr><td>header</td><td>Flag indicating the first line of the file contains a header</td><td><p>Boolean</p><p>Default: true</p></td><td>false</td></tr><tr><td>auto detect</td><td>Auto detect table format from a sample of the data. If set to <code>false</code> a <code>table definition</code> must be provided</td><td><p>Boolean</p><p>Default: true</p></td><td>false</td></tr></tbody></table>
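The `date format` and `timestamp format` strings use C `strptime`-style codes. As a quick illustration of how such patterns parse (Python is used here purely to demonstrate the codes, not Joule's own parser):

```python
from datetime import datetime

# "%d-%m-%Y" is one of the illustrative formats from the table above
parsed = datetime.strptime("15-01-2024", "%d-%m-%Y")

# "%Y/%m%d" is the format used in the CSV example above: year, a slash,
# then month and day with no separator between them
parsed2 = datetime.strptime("2024/0115", "%Y/%m%d")

print(parsed.date(), parsed2.date())  # 2024-01-15 2024-01-15
```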

## Application example

The following example primes the process with [contextual](/joule/components/contextual-data.md) and [metrics](/joule/components/processors/enrichment/metrics.md) data which is used for event [enrichment](/joule/components/processors/enrichment.md) processing.

The configuration defines a stream named `standardQuoteAnalyticsStream` that imports data from a CSV file (`nasdaq.csv`) and a Parquet file (`mvavgs-prime.parquet`).

It computes the `BidMovingAverage` metric, enriches events with company information and quote metrics, and emits the results grouped by `symbol`.

The metric is recomputed every minute, compacted every 8 hours, and data is flushed every 5 minutes.

Processing is driven by event timestamps and combines table imports, metric calculations and enrichment steps.

```yaml
stream:
  name: standardQuoteAnalyticsStream
  enabled: true
  eventTimeType: EVENT_TIME
  sources: [ nasdaq_quotes_stream ]

  initialisation:
    data import:
      csv:
        - schema: reference_data
          table: nasdaq_companies
          file: 'data/csv/nasdaq.csv'
          drop table: true
          index:
            fields: [ 'Symbol' ]
            unique: true

      parquet:
        - schema: metrics
          table: bid_moving_averages
          files: ['data/parquet/mvavgs-prime.parquet']
          drop table: true
          index:
            fields: [ 'symbol' ]
            unique: false

  processing unit:
    metrics engine:
      runtime policy:
        frequency: 1
        startup delay: 1
        time unit: MINUTES

      foreach metric compute:
        metrics:
          -
            name: BidMovingAverage
            metric key: symbol
            table definition: bid_moving_averages (symbol VARCHAR, avg_bid_min FLOAT, avg_bid_avg FLOAT,avg_bid_max FLOAT,createdTimestamp TIMESTAMP)
            query:
              SELECT symbol,
              MIN(bid) AS 'avg_bid_min',
              AVG(bid) AS 'avg_bid_avg',
              MAX(bid) AS 'avg_bid_max'
              FROM quotes.nasdaq
              WHERE
              ingestTime >= epoch_ms(date_trunc('minutes',now() - INTERVAL 3 MINUTES)) AND ingestTime <= epoch_ms(now())
              GROUP BY symbol
              ORDER BY 1;
            truncate on start: false
            compaction policy:
              frequency: 8
              time unit: HOURS

    pipeline:
      - tap:
          target schema: quotes
          flush frequency: 5
          index:
            unique: false
            fields:
              - symbol
      - enricher:
          fields:
            company_info:
              by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
              query fields: [ symbol ]
              with values: [ Name,Country ]
              using: JouleDB
            quote_metrics:
              by metric family: BidMovingAverage
              by key: symbol
              with values: [avg_bid_min, avg_bid_avg, avg_bid_max]
              using: MetricsDB
  emit:
    select: "symbol, Name, Country, avg_bid_min, avg_bid_avg, avg_bid_max"

  group by:
    - symbol
```
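The `BidMovingAverage` metric above is a plain SQL aggregation over the `quotes.nasdaq` table. Stripped of the time-window predicate, its shape can be reproduced standalone; a minimal sketch using Python's built-in `sqlite3` (illustrative only, not Joule's metrics engine):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the quotes.nasdaq table queried by the metric
conn.execute("CREATE TABLE nasdaq (symbol TEXT, bid REAL)")
conn.executemany(
    "INSERT INTO nasdaq VALUES (?, ?)",
    [("AAPL", 100.0), ("AAPL", 102.0), ("MSFT", 50.0)],
)

# Same min/avg/max aggregation shape as the BidMovingAverage metric query
rows = conn.execute("""
    SELECT symbol,
           MIN(bid) AS avg_bid_min,
           AVG(bid) AS avg_bid_avg,
           MAX(bid) AS avg_bid_max
    FROM nasdaq
    GROUP BY symbol
    ORDER BY 1
""").fetchall()
print(rows)  # [('AAPL', 100.0, 101.0, 102.0), ('MSFT', 50.0, 50.0, 50.0)]
```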

