# Ingestion

## Overview

When working with external data, deserialisation is often needed to **convert raw data into formats compatible** with Joule’s processing pipelines.

Joule provides a versatile deserialisation framework that extends the native Kafka deserialisation capabilities, supporting multiple formats to make ingestion and processing easier.

Available deserialisation options include:

1. <mark style="color:green;">**Domain object deserialisation**</mark>\
   Custom parsers can transform domain-specific objects into Joule-compatible formats.
2. <mark style="color:green;">**AVRO deserialisation**</mark>\
   An AVRO schema maps structured data into Joule's internal `StreamEvents`.
3. <mark style="color:green;">**Native Joule `StreamEvent` deserialisation**</mark>\
   Data already serialised in Joule's internal `StreamEvent` format can be ingested directly as `StreamEvents`.

## Examples & DSL attributes

This example shows deserialisation in action: a custom parser (`QuoteToStreamEventParser`) transforms `Quote` domain objects into `StreamEvents` that Joule can process.

```yaml
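kafkaConsumer:
  ...
  deserializer:
    # Custom parser that converts each deserialised Quote into a StreamEvent
    parser:
      com.fractalworks.examples.banking.data.QuoteToStreamEventParser

    # Deserialiser for the record key
    key deserializer:
      org.apache.kafka.common.serialization.IntegerDeserializer

    # Deserialises the record value into a generic object for the parser
    value deserializer:
      com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
  ...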
```

### Processing steps

1. <mark style="color:green;">**Consumption**</mark>\
   The Kafka consumer receives the event, and the configured value deserialiser turns its payload into a generic object.
2. <mark style="color:green;">**Parsing**</mark>\
   The object is passed to `QuoteToStreamEventParser`, converting it from a `Quote` object to a `StreamEvent`.
3. <mark style="color:green;">**Streaming pipeline**</mark>\
   The `StreamEvent` is added to Joule’s streaming pipeline queue.
4. <mark style="color:green;">**Acknowledgment**</mark>\
   The Kafka infrastructure acknowledges the event asynchronously to manage processing flow.
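The processing steps can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Joule's implementation: `Quote`, `SimpleStreamEvent`, `QuoteParser` and `IngestionFlow` are hypothetical stand-ins (the real parser implements Joule's `CustomTransformer`, whose interface is not shown here), and the Kafka consumer and acknowledgement are left out. It needs Java 17 or later for records and pattern matching.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical domain object standing in for the example's Quote class (subset of fields).
record Quote(String symbol, double mid, double bid, double ask, long volume) {}

// Hypothetical, simplified stand-in for Joule's StreamEvent: an event type plus a field map.
record SimpleStreamEvent(String type, Map<String, Object> fields) {}

// Hypothetical parser playing the role of QuoteToStreamEventParser (step 2).
final class QuoteParser {
    SimpleStreamEvent parse(Object value) {
        if (!(value instanceof Quote q)) {
            throw new IllegalArgumentException("Expected a Quote but got " + value);
        }
        Map<String, Object> fields = new HashMap<>();
        fields.put("symbol", q.symbol());
        fields.put("mid", q.mid());
        fields.put("bid", q.bid());
        fields.put("ask", q.ask());
        fields.put("volume", q.volume());
        return new SimpleStreamEvent("quote", fields);
    }
}

// Passes a deserialised value through the parser and onto a pipeline queue (steps 2 and 3).
final class IngestionFlow {
    private final QuoteParser parser = new QuoteParser();
    private final Queue<SimpleStreamEvent> pipelineQueue = new ArrayDeque<>();

    void onRecord(Object deserialisedValue) {
        // Step 1 is assumed done: the value deserialiser has already produced a generic object.
        SimpleStreamEvent event = parser.parse(deserialisedValue);
        pipelineQueue.add(event);
        // Step 4 (acknowledgement) is handled by the Kafka client and is not modelled here.
    }

    Queue<SimpleStreamEvent> queue() {
        return pipelineQueue;
    }
}
```

Keeping the parser separate from the flow mirrors the configuration above, where the value deserialiser and the `parser` are set independently.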

### Deserialisation attributes schema

<table><thead><tr><th width="176">Attribute</th><th width="298">Description</th><th width="181">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>parser</td><td>User-provided parser implementation</td><td>Implementation of CustomTransformer</td><td>false</td></tr><tr><td>key deserializer</td><td>Fully qualified class name of the deserialiser for the record (partition) key. Maps to the Kafka <code>key.deserializer</code> property</td><td><p>String</p><p>Default: IntegerDeserializer</p></td><td>false</td></tr><tr><td>value deserializer</td><td>Fully qualified class name of the deserialiser for the record value, which produces either a <code>StreamEvent</code> or an object for the <code>parser</code>. Maps to the Kafka <code>value.deserializer</code> property</td><td><p>String</p><p>Default: StringDeserializer</p></td><td>false</td></tr><tr><td>avro setting</td><td>AVRO ingestion specification</td><td>Avro specification</td><td>false</td></tr></tbody></table>
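To make the key and value mappings concrete, here is a minimal sketch that translates the DSL attributes into standard Kafka consumer properties using `java.util.Properties`. `DeserializerSettings` is hypothetical, and it assumes the default `StringDeserializer` is Kafka's `org.apache.kafka.common.serialization.StringDeserializer`; Joule's own translation code may differ.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: maps the DSL deserializer attributes onto Kafka consumer properties.
final class DeserializerSettings {
    // Defaults taken from the attribute table; fully qualified names are assumed to be Kafka's.
    static final String DEFAULT_KEY = "org.apache.kafka.common.serialization.IntegerDeserializer";
    static final String DEFAULT_VALUE = "org.apache.kafka.common.serialization.StringDeserializer";

    static Properties toConsumerProperties(Map<String, String> dsl) {
        Properties props = new Properties();
        // "key deserializer" -> key.deserializer, falling back to the documented default
        props.setProperty("key.deserializer", dsl.getOrDefault("key deserializer", DEFAULT_KEY));
        // "value deserializer" -> value.deserializer, falling back to the documented default
        props.setProperty("value.deserializer", dsl.getOrDefault("value deserializer", DEFAULT_VALUE));
        return props;
    }
}
```

Omitting both attributes yields the table's defaults, which is why both are marked as not required.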

### Provided value deserialisers

The following value deserialisers let Joule ingest external data structures:

1. <mark style="color:green;">**Custom data types**</mark>\
   To handle custom data types without using an [AVRO IDL schema](https://avro.apache.org/docs/1.11.1/idl-language/), set the `value deserializer` attribute to `com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer`.\
   You must also implement a custom parser that transforms the incoming object into a format Joule can process, and reference it in the `parser` attribute.
2. <mark style="color:green;">**`StreamEvent` (Binary)**</mark>\
   When Joule processes are connected in a Directed Acyclic Graph (DAG), events are passed between them as internal `StreamEvent` objects in binary format.\
   To deserialise these events, set `value deserializer` to:\
   `com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventDeserializer`\
   This ensures that each binary `StreamEvent` is interpreted and processed correctly.
3. <mark style="color:green;">**`StreamEvent` (JSON)**</mark>\
   For testing purposes, you may have saved `StreamEvent` objects as JSON strings.\
   To consume these strings and deserialise them back into `StreamEvent` objects, set `value deserializer` to:\
   `com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonDeserializer`
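One way to keep these three choices straight is to encode them in a small lookup. The sketch below is hypothetical: the `PayloadFormat` enum and its `toYaml` method are illustrative only, and the assumption that only custom data types need a `parser` is inferred from the descriptions above. It renders the matching `deserializer` block for each case.

```java
// Hypothetical lookup of the payload kinds described above and the value deserialiser each needs.
enum PayloadFormat {
    // Custom domain objects: generic object deserialiser plus a user-supplied parser
    CUSTOM_OBJECT(
        "com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer", true),
    // Binary StreamEvents passed between Joule processes in a DAG
    STREAM_EVENT_BINARY(
        "com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventDeserializer", false),
    // StreamEvents saved as JSON strings, e.g. for testing
    STREAM_EVENT_JSON(
        "com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonDeserializer", false);

    final String valueDeserializer;
    final boolean requiresParser; // assumption: only custom data types need a parser

    PayloadFormat(String valueDeserializer, boolean requiresParser) {
        this.valueDeserializer = valueDeserializer;
        this.requiresParser = requiresParser;
    }

    // Renders a deserializer block; parserClass is required for CUSTOM_OBJECT and ignored otherwise.
    String toYaml(String parserClass) {
        StringBuilder yaml = new StringBuilder("deserializer:\n");
        if (requiresParser) {
            if (parserClass == null || parserClass.isBlank()) {
                throw new IllegalArgumentException(name() + " requires a parser class");
            }
            yaml.append("  parser: ").append(parserClass).append('\n');
        }
        yaml.append("  value deserializer: ").append(valueDeserializer).append('\n');
        return yaml.toString();
    }
}
```

For instance, `CUSTOM_OBJECT.toYaml("com.fractalworks.examples.banking.data.QuoteToStreamEventParser")` produces the `parser` and `value deserializer` pair used in the first example on this page.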

## Avro Support

Joule supports AVRO through locally stored schema files (`.avsc`) that map domain objects to `StreamEvents`. AVRO schema registries are not supported yet; this [feature is planned for 2025](/joule/product-updates/public-roadmap.md).

### Example AVRO deserialisation

Here’s a configuration that uses an AVRO schema to map `Quote` events into `StreamEvents`.

To review the schema for this example, please visit the [Kafka sources page](/joule/components/connectors/sources/kafka.md#attributes-schema-schema).

```yaml
kafkaConsumer:
  cluster address: localhost:19092
  consumerGroupId: quotesConsumerGroup
  topics:
    - quotes

  deserializer:
    key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
    avro setting:
      local schema: avro/quote.avsc
```

### Sample AVRO schema for quotes

This AVRO schema defines a `Quote` record with fields such as `symbol`, `mid`, `bid`, and `ask`, plus a `date` field that uses the AVRO `date` logical type. Joule uses it to map incoming `Quote` data into `StreamEvents` automatically.

```json
{
  "type" : "record",
  "name" : "Quote",
  "namespace" : "com.fractalworks.examples.banking.data",
  "fields" : [
    {"name" : "symbol", "type" : "string"},
    {"name" : "mid",  "type" : "double"},
    {"name" : "bid", "type" : "double"},
    {"name" : "ask", "type" : "double"},
    {"name" : "volume", "type" : "long"},
    {"name" : "volatility", "type" : "double"},
    {"name" : "time", "type" : "long"},
    {"name": "date",
      "type" : {
        "type" : "int",
        "logicalType": "date"
      }
    }]
}
```
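The `date` field uses AVRO's `date` logical type, which stores the number of days since the Unix epoch (1970-01-01) in an `int`. The sketch below shows how such a decoded record could be flattened into event fields. It is an illustration under stated assumptions: the record is modelled as a `Map` rather than an AVRO `GenericRecord`, and `QuoteRecordMapper` is hypothetical, not part of Joule.

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mapper for a decoded Quote record; not part of Joule.
final class QuoteRecordMapper {
    // Field names declared in the Quote schema above, in schema order
    static final List<String> FIELDS =
        List.of("symbol", "mid", "bid", "ask", "volume", "volatility", "time", "date");

    static Map<String, Object> toEventFields(Map<String, Object> decoded) {
        // Reject records that do not carry every schema field
        for (String field : FIELDS) {
            if (!decoded.containsKey(field)) {
                throw new IllegalArgumentException("Missing field: " + field);
            }
        }
        // Keep only the schema's fields, in schema order
        Map<String, Object> fields = new LinkedHashMap<>();
        for (String field : FIELDS) {
            fields.put(field, decoded.get(field));
        }
        // AVRO "date" logical type: an int counting days since 1970-01-01
        if (decoded.get("date") instanceof Integer days) {
            fields.put("date", LocalDate.ofEpochDay(days));
        }
        return fields;
    }
}
```

For example, a `date` value of `19723` becomes `2024-01-01`.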


