Deserialisers

Consumed data is mapped to internal StreamEvents using a flexible deserialisation framework


Last updated 6 months ago


Overview

Joule's deserialisation framework converts raw domain-specific data into StreamEvent objects for processing.

Built-in deserialisers (e.g. JSON, CSV, AVRO) and custom parsers provide compatibility with a range of data formats and workflows, so the transformation can be tailored to application needs.

For advanced scenarios, the StreamEventParser API supports custom parsing when the standard deserialisers cannot handle unique formats or structures. This allows developers to map domain-specific objects into StreamEvent collections that integrate with Joule's processing pipeline.

Each connector type has its own preferred deserialisation method.

This page serves as a guide for configuring data deserialisation, with examples and options for effective data integration in Joule.

This page includes the following:

  1. Deserialiser DSL element: Describes the deserialisation element, which defines how received data is parsed ready for Joule processing.

  2. Kafka example: Provides an example Kafka consumer configuration that deserialises incoming domain events to StreamEvents.

  3. Custom parser API: Covers the StreamEventParser interface for custom deserialisation when unique data parsing is required.

  4. AVRO deserialiser: Introduces how AVRO can be used to transform domain events into Joule StreamEvents using a provided AVRO IDL schema file.

  5. Available Implementations: Explains how to use the out-of-the-box (OOTB) deserialisers for JSON and CSV event data.

Deserialiser DSL element

The deserialiser DSL element defines how received data is deserialised into StreamEvents, ready for Joule processing.

It allows developers to specify how data should be transformed into StreamEvents using custom parsing logic, enabling precise handling of various data formats and structures tailored to specific application needs.

Example

This code snippet shows a basic setup for a Kafka consumer within Joule, where JSON-formatted StreamEvent strings are converted to StreamEvent objects ready for pipeline processing.

kafkaConsumer:
   ...
   deserializer:
      parser: 
         json deserializer: {} 

Attributes schema

| Attribute | Description | Type |
| --- | --- | --- |
| parser | Parser interface to convert an Object to a collection of stream events | StreamEventParser. Default: StreamEventJsonDeserializer |
| compressed | Whether the passed payload is compressed | Boolean. Default: false |
| batch | Whether the payload is a batch of events | Boolean. Default: false |
| properties | Specific properties required for a custom deserialisation process | Properties |
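
As a hedged sketch of how the optional attributes above might be combined, the fragment below enables `compressed` and `batch` alongside the default JSON parser. The attribute names come from the table; their placement as siblings of `parser` under `deserializer` is an assumption based on the schema, not a confirmed layout.

```yaml
kafkaConsumer:
   # ... other consumer settings elided, as in the example above
   deserializer:
      parser:
         json deserializer: {}
      # Assumed placement: siblings of parser, per the attributes table
      compressed: true   # payload arrives compressed
      batch: true        # payload carries a batch of events
```

Both attributes default to false, so they only need to be set when the source actually sends compressed or batched payloads.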

Parser

The parser interface converts a received Object to a collection of stream events.

StreamEventParser API

For complex use cases where AVRO cannot be used, a custom parser is needed. In this case, developers can provide a parsing solution using the StreamEventParser API.

Example

This example shows deserialisation in action: a custom parser (QuoteToStreamEventParser) transforms Quote domain objects into StreamEvents that can be processed by Joule.

kafkaConsumer:
  ... 
  deserializer:
    parser: 
      com.fractalworks.examples.banking.data.QuoteToStreamEventParser
    
    key deserializer: 
      org.apache.kafka.common.serialization.IntegerDeserializer
    
    value deserializer: 
      com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer

AVRO deserialisation

This deserialiser has an extra setting for loading the target schema. The transformer automatically maps domain attributes to StreamEvent attributes using a provided AVRO schema IDL. Currently only local schema files are supported, with schema registry support available on request.

Integrate Joule with any existing system using already established data structures.

Example

kafkaConsumer:
   ...
   deserializer:
      parser: 
         avro deserializer: 
            schema: /home/myapp/schema/customer.avro

Attributes schema

| Attribute | Description | Type | Required |
| --- | --- | --- | --- |
| schema | Path and name of the schema file | String | |
| field mapping | Map of source fields to target StreamEvent fields | Map<String,String> | |
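
To illustrate the `field mapping` attribute, the following is a minimal sketch that extends the AVRO example above. The field names (`custId`, `customerId`, `dob`, `dateOfBirth`) are hypothetical, and the source-to-target direction of each entry is assumed from the attribute description rather than confirmed.

```yaml
kafkaConsumer:
   # ... other consumer settings elided, as in the example above
   deserializer:
      parser:
         avro deserializer:
            schema: /home/myapp/schema/customer.avro
            # Assumed direction: <source AVRO field>: <target StreamEvent field>
            field mapping:
               custId: customerId
               dob: dateOfBirth
```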

Available Implementations

Joule provides several deserialiser implementations for different data formats, each requiring a full package namespace.

Key implementations include:

  1. JSON deserialiser: Parses StreamEvent JSON events into concrete objects.

  2. CSV deserialiser: Parses CSV formatted data, with options for custom headers and a type map to define data types such as STRING and DOUBLE.

  3. Object deserialiser: Parses StreamEvent binary events into concrete objects.

| Format | DSL | Class |
| --- | --- | --- |
| json | json deserializer | StreamEventJsonDeserializer |
| csv | csv deserializer | StreamEventCSVDeserializer |
| object | object deserializer | StreamEventObjectDeserializer |

JSON

This parser reads and converts a JSON-formatted StreamEvent string to a StreamEvent object.

| Attribute | Description | Type | Required |
| --- | --- | --- | --- |
| date format | Provide a custom date format. | String. Default: yyyy/MM/dd | |

Example

deserializer:
    parser: 
        json deserializer:
            date format: dd/MM/yyyy

CSV

This parser has extra optional settings that support custom parsing, such as defining a custom header.

| Attribute | Description | Type | Required |
| --- | --- | --- | --- |
| type map | Map of fields to types. See Supported Types. | Map<String,Type> | |
| delimiter | Define a custom file delimiter. | String. Default: "," | |
| date format | Provide a custom date format. | String. Default: yyyy/MM/dd | |

Example

deserializer:
    parser: 
        csv deserializer:
            type map:
               symbol: STRING
               rate:   DOUBLE

Supported Types

The following types are supported:

  • DOUBLE, FLOAT, LONG, INTEGER, SHORT, BYTE, BOOLEAN, STRING, DATE
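
The CSV attributes can plausibly be combined in a single configuration. The sketch below assumes a pipe-delimited feed with a date column; the `time` field name is hypothetical, and it is an assumption that `date format` governs how DATE-typed fields are parsed.

```yaml
deserializer:
    parser:
        csv deserializer:
            # Hypothetical pipe-delimited feed; field names are illustrative
            delimiter: "|"
            date format: dd/MM/yyyy
            type map:
               symbol: STRING
               rate:   DOUBLE
               time:   DATE
```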

Object

This parser reads and converts a binary-formatted StreamEvent to a StreamEvent object.

Example

deserializer:
    parser: 
        object deserializer: {}

For a custom example on deserialisers, see Custom parsing example.

For further date format options, see the JavaDocs.