Ingestion

Joule enables flexible Kafka data deserialisation for processing

Overview

When working with external data, deserialisation is often needed to convert raw data into formats compatible with Joule’s processing pipelines.

Joule provides a versatile deserialisation framework that extends the native Kafka deserialisation capabilities, supporting multiple formats to make ingestion and processing easier.

Available deserialisation options include:

  1. Domain object deserialisation: custom parsers transform domain-specific objects into Joule-compatible formats.

  2. AVRO deserialisation: AVRO schemas map structured data into Joule's internal StreamEvents.

  3. Native Joule StreamEvent deserialisation: data already in StreamEvent form, Joule's internal processing format, is ingested directly.

Examples & DSL attributes

This example shows how a custom parser (QuoteToStreamEventParser) transforms Quote domain objects into StreamEvents that Joule can process.

kafkaConsumer:
  ... 
  deserializer:
    parser: 
      com.fractalworks.examples.banking.data.QuoteToStreamEventParser
    
    key deserializer: 
      org.apache.kafka.common.serialization.IntegerDeserializer
    
    value deserializer: 
      com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
  ...

Processing steps

  1. Consumption: the Kafka client receives the event data as a generic object through the specified value deserialiser.

  2. Parsing: the object is passed to QuoteToStreamEventParser, which converts the Quote object into a StreamEvent.

  3. Streaming pipeline: the StreamEvent is added to Joule's streaming pipeline queue.

  4. Acknowledgment: the Kafka infrastructure acknowledges the event asynchronously to manage processing flow.
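The first three processing steps can be sketched in plain Java. This is a minimal illustration under stated assumptions. The `StreamEventParser` interface, its `parse` method, the simplified `StreamEvent` and `Quote` classes, the `QuoteParser` class (standing in for QuoteToStreamEventParser) and `IngestionPipeline` are all hypothetical. Joule's actual parser API is not shown on this page. Kafka consumption is simulated by handing the deserialised object straight to `onRecord`, and step 4 (acknowledgment) is not modelled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical parser contract: turns whatever the value deserialiser produced into a StreamEvent.
interface StreamEventParser {
    StreamEvent parse(Object payload);
}

// Simplified stand-in for Joule's StreamEvent: an event type plus named attribute values.
final class StreamEvent {
    private final String eventType;
    private final Map<String, Object> attributes = new HashMap<>();

    StreamEvent(String eventType) {
        this.eventType = eventType;
    }

    String getEventType() {
        return eventType;
    }

    void addValue(String name, Object value) {
        attributes.put(name, value);
    }

    Object getValue(String name) {
        return attributes.get(name);
    }
}

// Hypothetical domain object holding a subset of the Quote fields used on this page.
final class Quote {
    final String symbol;
    final double mid;
    final double bid;
    final double ask;
    final long volume;

    Quote(String symbol, double mid, double bid, double ask, long volume) {
        this.symbol = symbol;
        this.mid = mid;
        this.bid = bid;
        this.ask = ask;
        this.volume = volume;
    }
}

// Step 2: plays the role of QuoteToStreamEventParser, copying Quote fields onto a StreamEvent.
final class QuoteParser implements StreamEventParser {
    @Override
    public StreamEvent parse(Object payload) {
        if (!(payload instanceof Quote)) {
            throw new IllegalArgumentException("Expected a Quote, got: " + payload);
        }
        Quote quote = (Quote) payload;
        StreamEvent event = new StreamEvent("quote");
        event.addValue("symbol", quote.symbol);
        event.addValue("mid", quote.mid);
        event.addValue("bid", quote.bid);
        event.addValue("ask", quote.ask);
        event.addValue("volume", quote.volume);
        return event;
    }
}

// Steps 1 and 3: receives each deserialised payload, parses it and enqueues the result.
final class IngestionPipeline {
    private final BlockingQueue<StreamEvent> queue = new LinkedBlockingQueue<>();
    private final StreamEventParser parser;

    IngestionPipeline(StreamEventParser parser) {
        this.parser = parser;
    }

    // Called once per consumed record with the object the value deserialiser returned.
    void onRecord(Object payload) {
        queue.add(parser.parse(payload));
    }

    // Returns the next queued event, or null if the queue is empty.
    StreamEvent next() {
        return queue.poll();
    }
}
```

Keeping the parser free of Kafka types, as in this sketch, lets it be tested with plain domain objects.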

Deserialisation attributes schema

Provided value deserialisers

The following deserialisation options enable Joule to ingest external data structures seamlessly:

  1. Custom data types: to handle custom data types without an AVRO schema, configure com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer as the value deserialiser. This configuration also requires a custom parser that interprets the incoming data and transforms it into a format Joule can process.

  2. StreamEvent (binary): when Joule processes are connected in a Directed Acyclic Graph (DAG), events are passed between them as internal StreamEvent objects in binary format. To deserialise these events, configure com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventDeserializer, which interprets the binary StreamEvent correctly.

  3. StreamEvent (JSON): for testing, you may have saved StreamEvent objects as JSON strings. To consume these and convert them back into StreamEvent objects, configure com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonDeserializer.
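For readers familiar with the plain Kafka client, each option above amounts to choosing the consumer's value deserialiser class. The sketch below assumes that Joule's `key deserializer` and `value deserializer` DSL attributes correspond to Kafka's standard `key.deserializer` and `value.deserializer` consumer properties. That correspondence is an assumption, not something this page states. The `IngestFormat` enum and `DeserializerConfig.consumerProperties` helper are hypothetical. Only the class names come from this page. The sketch uses `java.util.Properties` so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical enum pairing each ingestion case above with the value deserialiser named for it.
enum IngestFormat {
    CUSTOM_OBJECT("com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer"),
    STREAM_EVENT_BINARY("com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventDeserializer"),
    STREAM_EVENT_JSON("com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonDeserializer");

    final String valueDeserializer;

    IngestFormat(String valueDeserializer) {
        this.valueDeserializer = valueDeserializer;
    }
}

final class DeserializerConfig {
    // Builds the two deserialiser entries of a Kafka consumer configuration (standard Kafka keys).
    static Properties consumerProperties(IngestFormat format) {
        Properties props = new Properties();
        // Integer keys, matching the key deserializer used in the examples on this page.
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.setProperty("value.deserializer", format.valueDeserializer);
        return props;
    }
}
```

Under this assumption, moving from custom domain objects to DAG-connected Joule processes changes only the `value.deserializer` entry.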

Avro Support

Joule supports AVRO through locally stored AVRO schema files (.avsc), which map domain objects to StreamEvents. AVRO schema registries are not yet supported; registry support is planned for 2025.

Example AVRO deserialisation

Here’s a configuration that uses an AVRO schema to map Quote events into StreamEvents.

The schema for this example is reproduced below and on the Kafka sources page.

kafkaConsumer:
  cluster address: localhost:19092
  consumerGroupId: quotesConsumerGroup
  topics:
    - quotes

  deserializer:
    key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
    avro setting:
      local schema: avro/quote.avsc

Sample AVRO schema for Quote

This AVRO schema defines a Quote record with fields such as symbol, mid, bid, and ask, which Joule uses to map incoming Quote data into StreamEvents automatically.

{
  "type" : "record",
  "name" : "Quote",
  "namespace" : "com.fractalworks.examples.banking.data",
  "fields" : [
    {"name" : "symbol", "type" : "string"},
    {"name" : "mid",  "type" : "double"},
    {"name" : "bid", "type" : "double"},
    {"name" : "ask", "type" : "double"},
    {"name" : "volume", "type" : "long"},
    {"name" : "volatility", "type" : "double"},
    {"name" : "time", "type" : "long"},
    {"name": "date",
      "type" : {
        "type" : "int",
        "logicalType": "date"
      }
    }]
}
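The `date` field uses AVRO's `date` logical type. The AVRO specification defines this as an `int` counting days since the Unix epoch (1970-01-01). The sketch below shows how a decoded Quote record could be flattened into StreamEvent-style attributes with that field converted to a calendar date. It makes several assumptions. The decoded record is represented as a `Map` so the sketch runs without the AVRO library. The `QuoteRecordMapper.toAttributes` helper and the conversion of `date` to `java.time.LocalDate` are hypothetical, not Joule's documented behaviour. The `time` field is passed through unchanged because the schema does not state its unit.

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class QuoteRecordMapper {
    // Field names taken from the Quote schema above, in schema order.
    static final List<String> FIELDS =
        List.of("symbol", "mid", "bid", "ask", "volume", "volatility", "time", "date");

    // Copies each schema field into an attribute map, converting the AVRO date logical type.
    static Map<String, Object> toAttributes(Map<String, Object> record) {
        Map<String, Object> attributes = new LinkedHashMap<>();
        for (String field : FIELDS) {
            if (!record.containsKey(field)) {
                throw new IllegalArgumentException("Missing field: " + field);
            }
            Object value = record.get(field);
            if (field.equals("date")) {
                // AVRO 'date': an int holding the number of days since 1970-01-01.
                value = LocalDate.ofEpochDay(((Integer) value).longValue());
            }
            attributes.put(field, value);
        }
        return attributes;
    }
}
```

For example, a decoded record whose `date` field is 19723 maps to 2024-01-01.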
