Ingestion

Overview

External data often requires a transformation step before it can be processed further. Joule performs this step during ingestion using deserialization.

Joule provides a flexible deserialization framework that builds upon standard Kafka capabilities. Ingestion options:

  1. Domain object deserialization

  2. AVRO deserialization

  3. Native Joule StreamEvent deserialization

Example

kafkaConsumer:
  ...
  deserializer:
    parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
    key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
    value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
  ...

Processing steps

The following steps are executed before the event enters the stream processing pipeline:

  1. The Kafka client subsystem consumes the event value as an untyped generic object using the configured value deserializer.

  2. The value object is passed to the QuoteToStreamEventParser, which converts the strongly typed object, a Quote, into a StreamEvent (a minimal parser sketch follows these steps).

  3. The resulting StreamEvent is added to the streaming pipeline processing queue.

  4. The original event is acknowledged asynchronously so that the Kafka infrastructure can perform event management.
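To make step 2 concrete, the sketch below shows the general shape of such a parser. The CustomTransformer interface is named in the attributes table below, but its exact method signature and the StreamEvent API used here (constructor and addValue method) are assumptions for illustration, not Joule's confirmed API.

// Sketch of a custom parser in the spirit of QuoteToStreamEventParser.
// CustomTransformer's real signature and the StreamEvent API may differ;
// both are assumed here for illustration only.
package com.fractalworks.examples.banking.data;

public class QuoteToStreamEventParser implements CustomTransformer {

    @Override
    public StreamEvent transform(Object value) {
        // The Kafka value deserializer hands over an untyped object (step 1)
        Quote quote = (Quote) value;

        // Map the strongly typed Quote onto a StreamEvent (step 2)
        StreamEvent event = new StreamEvent("quote");   // assumed constructor
        event.addValue("symbol", quote.getSymbol());    // assumed API
        event.addValue("bid", quote.getBid());
        event.addValue("ask", quote.getAsk());
        return event;                                   // queued for the pipeline (step 3)
    }
}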

Deserialization Attributes

Attribute | Description | Data Type | Required
parser | User-provided parser implementation | Implementation of CustomTransformer |
key deserializer | Domain class that maps to the partition key type. Maps to Kafka's key.deserializer property | String | Default: IntegerDeserializer
value deserializer | Domain class that deserializes the incoming object to a StreamEvent. Maps to Kafka's value.deserializer property | String | Default: StringDeserializer
avro setting | AVRO ingestion specification | Avro specification |

Provided value deserializers

The following implementations are provided to enable external data structures to be ingested into the Joule platform. A configuration sketch follows the list.

  • Custom data types: When custom data types are consumed without an AVRO schema, the configuration below must be set along with a custom parser implementation: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer

  • StreamEvent (Binary): When connecting Joule processes together as a DAG, events can be passed as internal StreamEvents in binary format. For this case, set the value deserializer to: com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventDeserializer

  • StreamEvent (JSON): For testing you may have saved StreamEvents as JSON strings. These can be consumed and transformed into concrete objects using: com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonDeserializer
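For example, a consumer that ingests binary StreamEvents from an upstream Joule process might be configured as below. This is a sketch: the cluster address, consumer group and topic names are illustrative placeholders, not values required by the platform.

kafkaConsumer:
  cluster address: localhost:19092
  consumerGroupId: streamEventConsumerGroup   # illustrative name
  topics:
    - processed-events                        # illustrative topic
  deserializer:
    value deserializer: com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventDeserializer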

Avro Support

Avro support is provided using locally stored schemas. Domain objects are transformed into StreamEvents using the provided AVRO schema file.

Currently, schema registries are not supported; this is on the roadmap for 2025.

Example

This example loads the quote AVRO schema and performs the translation mapping of Quote events to StreamEvents automatically, without any further configuration.

kafkaConsumer:
  cluster address: localhost:19092
  consumerGroupId: quotesConsumerGroup
  topics:
    - quotes

  deserializer:
    key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
    avro setting:
      local schema: avro/quote.avsc

Quote AVRO schema

{
  "type" : "record",
  "name" : "Quote",
  "namespace" : "com.fractalworks.examples.banking.data",
  "fields" : [
    {"name" : "symbol", "type" : "string"},
    {"name" : "mid",  "type" : "double"},
    {"name" : "bid", "type" : "double"},
    {"name" : "ask", "type" : "double"},
    {"name" : "volume", "type" : "long"},
    {"name" : "volatility", "type" : "double"},
    {"name" : "time", "type" : "long"},
    {"name": "date",
      "type" : {
        "type" : "int",
        "logicalType": "date"
      }
    }]
}
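For readers unfamiliar with Avro, the sketch below illustrates roughly what a schema-based value deserializer does with each record payload: parse the local schema once, then decode each binary payload into a generic record whose fields can be mapped onto a StreamEvent. It uses the standard Apache Avro Java API and is illustrative only, not Joule's internal implementation.

// Minimal sketch of schema-based decoding with the standard Apache Avro
// Java API; illustrative only, not Joule's internal implementation.
import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class QuoteDecoder {

    private final Schema schema;
    private final GenericDatumReader<GenericRecord> reader;

    public QuoteDecoder() throws IOException {
        // Parse the locally stored schema once, up front
        schema = new Schema.Parser().parse(new File("avro/quote.avsc"));
        reader = new GenericDatumReader<>(schema);
    }

    public GenericRecord decode(byte[] payload) throws IOException {
        // Decode one Avro-binary payload into a generic record; fields such
        // as "symbol" and "bid" can then be mapped onto a StreamEvent
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return reader.read(null, decoder);
    }
}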
