# Ingestion

## Overview

When working with external data, deserialisation is often needed to **convert raw data into formats compatible** with Joule’s processing pipelines.

Joule provides a versatile deserialisation framework that extends the native Kafka deserialisation capabilities, supporting multiple formats to make ingestion and processing easier.

Available deserialisation options include:

1. <mark style="color:green;">**Domain object deserialisation**</mark>\
   Custom parsers can transform domain-specific objects into Joule-compatible formats.
2. <mark style="color:green;">**AVRO deserialisation**</mark>\
   AVRO schemas can convert structured data into Joule's internal `StreamEvents`.
3. <mark style="color:green;">**Native Joule**</mark><mark style="color:green;">**&#x20;**</mark><mark style="color:green;">**`StreamEvent`**</mark><mark style="color:green;">**&#x20;**</mark><mark style="color:green;">**deserialisation**</mark>\
   Joule's format for internal processing allows direct ingestion of data as `StreamEvents`.

## Examples & DSL attributes

This example shows deserialisation in action: a custom parser (`QuoteToStreamEventParser`) transforms `Quote` domain objects into `StreamEvents` that Joule can process.

```yaml
kafkaConsumer:
  ...
  deserializer:
    parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
    key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
    value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
  ...
```

### Processing steps

1. <mark style="color:green;">**Consumption**</mark>\
   The Kafka client receives the event data as a generic object via the specified value deserialiser.
2. <mark style="color:green;">**Parsing**</mark>\
   The object is passed to `QuoteToStreamEventParser`, which converts the `Quote` object into a `StreamEvent`.
3. <mark style="color:green;">**Streaming pipeline**</mark>\
   The `StreamEvent` is added to Joule’s streaming pipeline queue.
4. <mark style="color:green;">**Acknowledgment**</mark>\
   The Kafka infrastructure acknowledges the event asynchronously to manage processing flow.
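The parsing step can be sketched in plain Java. The `Quote` and `StreamEvent` classes below are simplified stand-ins for illustration only; the real `QuoteToStreamEventParser` implements Joule's `CustomTransformer` interface, and the `transform` method name and signature here are assumptions:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the Quote domain object.
class Quote {
    final String symbol;
    final double mid, bid, ask;

    Quote(String symbol, double mid, double bid, double ask) {
        this.symbol = symbol;
        this.mid = mid;
        this.bid = bid;
        this.ask = ask;
    }
}

// Simplified stand-in for Joule's internal StreamEvent type:
// a bag of named values that the streaming pipeline can process.
class StreamEvent {
    private final Map<String, Object> values = new HashMap<>();

    void addValue(String key, Object value) { values.put(key, value); }
    Object getValue(String key) { return values.get(key); }
}

// Sketch of the custom parser: copies each Quote field
// into the corresponding StreamEvent value.
class QuoteToStreamEventParser {
    StreamEvent transform(Quote quote) {
        StreamEvent event = new StreamEvent();
        event.addValue("symbol", quote.symbol);
        event.addValue("mid", quote.mid);
        event.addValue("bid", quote.bid);
        event.addValue("ask", quote.ask);
        return event;
    }
}

public class ParserSketch {
    public static void main(String[] args) {
        StreamEvent event = new QuoteToStreamEventParser()
                .transform(new Quote("AAPL", 189.50, 189.45, 189.55));
        System.out.println(event.getValue("symbol")); // prints AAPL
    }
}
```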

### **Deserialisation** attributes schema

<table><thead><tr><th width="176">Attribute</th><th width="298">Description</th><th width="181">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>parser</td><td>User-provided parser implementation</td><td>Implementation of CustomTransformer</td><td>false</td></tr><tr><td>key deserializer</td><td>Domain class that maps to the partition key type. Maps to the Kafka <code>key.deserializer</code> property</td><td><p>String</p><p>Default: IntegerDeserializer</p></td><td>false</td></tr><tr><td>value deserializer</td><td>Domain class that deserialises the incoming object into a <code>StreamEvent</code>. Maps to the Kafka <code>value.deserializer</code> property</td><td><p>String</p><p>Default: StringDeserializer</p></td><td>false</td></tr><tr><td>avro setting</td><td>AVRO ingestion specification</td><td>Avro specification</td><td>false</td></tr></tbody></table>

### Provided **value deserialisers**

The following deserialisation options enable Joule to ingest external data structures seamlessly:

1. <mark style="color:green;">**Custom data types**</mark>\
   To handle custom data types without using an [AVRO IDL schema](https://avro.apache.org/docs/1.11.1/idl-language/), you must configure the system with `com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer`.\
   This configuration requires implementing a custom parser that can interpret and transform the incoming data into a format Joule can process.
2. <mark style="color:green;">**`StreamEvent`**</mark><mark style="color:green;">**&#x20;**</mark><mark style="color:green;">**(Binary)**</mark>\
   When connecting Joule processes in a Directed Acyclic Graph (DAG), events are passed as internal `StreamEvent` objects in binary format.\
   To properly deserialise these events, configure the system with the following setting:\
   `com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventDeserializer`\
   This ensures that the binary `StreamEvent` is correctly interpreted and processed.
3. <mark style="color:green;">**`StreamEvent`**</mark><mark style="color:green;">**&#x20;**</mark><mark style="color:green;">**(JSON)**</mark>\
   For testing purposes, you may have saved `StreamEvent` objects as JSON strings.\
   These events can be consumed and converted into concrete objects by configuring the system with the following setting:\
   `com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonDeserializer`\
   This setting ensures that the JSON data is properly deserialised into the appropriate `StreamEvent` objects.
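As a sketch, the two `StreamEvent` variants above map onto the `value deserializer` attribute as follows (surrounding `kafkaConsumer` settings omitted):

```yaml
# DAG-connected Joule processes: StreamEvents arrive in binary form
deserializer:
  value deserializer: com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventDeserializer
---
# Testing: StreamEvents saved as JSON strings
deserializer:
  value deserializer: com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonDeserializer
```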

## Avro Support

Joule includes AVRO support through locally stored schema files (e.g. `.avsc`) to map domain objects to `StreamEvents`. Although AVRO schema registries are not supported yet, this [feature is planned for 2025](https://docs.fractalworks.io/joule/product-updates/public-roadmap).

### Example AVRO deserialisation

Here’s a configuration that uses an AVRO schema to map `Quote` events into `StreamEvents`.

To review the schema for this example, please visit the [Kafka sources page](https://docs.fractalworks.io/joule/components/connectors/sources/kafka/..#attributes-schema-schema).

```yaml
kafkaConsumer:
  cluster address: localhost:19092
  consumerGroupId: quotesConsumerGroup
  topics:
    - quotes

  deserializer:
    key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
    avro setting:
      local schema: avro/quote.avsc
```

### Sample AVRO schema for quotes

This AVRO schema defines a `Quote` object with attributes like `symbol`, `mid`, `bid`, and `ask`, allowing Joule to map incoming `Quote` data into `StreamEvents` automatically for streamlined ingestion.

```json
{
  "type" : "record",
  "name" : "Quote",
  "namespace" : "com.fractalworks.examples.banking.data",
  "fields" : [
    {"name" : "symbol", "type" : "string"},
    {"name" : "mid", "type" : "double"},
    {"name" : "bid", "type" : "double"},
    {"name" : "ask", "type" : "double"},
    {"name" : "volume", "type" : "long"},
    {"name" : "volatility", "type" : "double"},
    {"name" : "time", "type" : "long"},
    {"name" : "date",
      "type" : {
        "type" : "int",
        "logicalType" : "date"
      }
    }
  ]
}
```
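For reference, a `Quote` record conforming to this schema might look like the following JSON (all values are illustrative). Note that the `date` field uses AVRO's `date` logical type, which is encoded as an `int` counting days since the Unix epoch:

```json
{
  "symbol": "AAPL",
  "mid": 189.50,
  "bid": 189.45,
  "ask": 189.55,
  "volume": 1200,
  "volatility": 0.18,
  "time": 1718000000000,
  "date": 19900
}
```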
