Serializers

Joule ships with various event formatters and serializers that convert StreamEvent objects to target streaming or storage formats.

Serialization Specification

The SerializerSpecification class is used to define how StreamEvents are serialized for downstream consumption.

Attributes

| Attribute | Description | Type |
| --- | --- | --- |
| transformer | Convert a StreamEvent to a custom domain type | CustomTransformer |
| formatter | Convert a StreamEvent to a target format such as JSON, CSV, or Object | Formatter (default: Json) |
| compress | Whether the serialization process should compress the payload | Boolean (default: false) |
| batch | Batch events to process | Boolean (default: false) |
| properties | Specific properties required for a custom serialization process | Properties |

Example

A simple example in which a RabbitMQ publisher emits StreamEvents as JSON-formatted objects. In this setup, a set of Joule processes communicate with each other to form a topology that implements a larger use case.

rabbitPublisher:
  exchange:
    name: quotes_exchange
    type: FANOUT

  serializer:
    formatter:
      json formatter: {}

CustomTransformer Interface

This interface is used when the use case requires Joule to emit a specific domain data type for downstream consumption. Developers are expected to provide domain-specific implementations of the CustomTransformer interface.

/**
 * Transform a collection of StreamEvents to a collection of the target type
 *
 * @param payload collection of events to transform to target types
 * @return collection of T
 * @throws TranslationException is thrown when a translation failure occurs
 */
Collection<T> transform(Collection<StreamEvent> payload) throws TranslationException;

/**
 * Transform a single StreamEvent into a specific type
 *
 * @param payload an event
 * @return the event transformed to type T
 * @throws TranslationException is thrown when a translation failure occurs
 */
T transform(StreamEvent payload) throws TranslationException;

Example

A class from the nbanking project that transforms StreamEvents to the Quote type.

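import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
// Imports for StreamEvent, CustomTransformer, TranslationException and Quote are
// omitted here; their packages depend on the Joule SDK and the project layout.

public class QuoteTransformer implements CustomTransformer<Quote> {

    public QuoteTransformer() {
        // Required
    }

    // Transform each event in the batch by delegating to the single-event method
    @Override
    public Collection<Quote> transform(Collection<StreamEvent> payload) throws TranslationException {
        Collection<Quote> quotes = new ArrayList<>();
        if (payload != null) {
            for (StreamEvent e : payload) {
                quotes.add(transform(e));
            }
        }
        return quotes;
    }

    // Map the event fields onto the Quote constructor arguments
    @Override
    public Quote transform(StreamEvent payload) throws TranslationException {
        return new Quote(
                (String) payload.getValue("symbol"),
                (double) payload.getValue("mid"),
                (double) payload.getValue("bid"),
                (double) payload.getValue("ask"),
                (long) payload.getValue("volume"),
                (double) payload.getValue("volatility"),
                (long) payload.getEventTime(),
                (Date) payload.getValue("date")
        );
    }
}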
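The transformer pattern above can be tried outside Joule with a minimal, self-contained sketch. Everything in it is an assumption made for illustration: StreamEvent, TranslationException and CustomTransformer are simplified stand-ins for the real Joule types, and Tick, TickTransformer and transformUnchecked are hypothetical names. It also shows one possible way to raise a TranslationException when a field is missing rather than letting a cast fail.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransformerSketch {

    /** Hypothetical stand-in for Joule's StreamEvent: a bag of named values. */
    static class StreamEvent {
        private final Map<String, Object> values = new HashMap<>();

        StreamEvent with(String key, Object value) {
            values.put(key, value);
            return this;
        }

        Object getValue(String key) {
            return values.get(key);
        }
    }

    /** Hypothetical stand-in for Joule's TranslationException. */
    static class TranslationException extends Exception {
        TranslationException(String message) {
            super(message);
        }
    }

    /** Mirrors the two transform methods documented above. */
    interface CustomTransformer<T> {
        Collection<T> transform(Collection<StreamEvent> payload) throws TranslationException;

        T transform(StreamEvent payload) throws TranslationException;
    }

    /** Illustrative domain type; not part of Joule. */
    static class Tick {
        final String symbol;
        final double mid;

        Tick(String symbol, double mid) {
            this.symbol = symbol;
            this.mid = mid;
        }
    }

    static class TickTransformer implements CustomTransformer<Tick> {
        @Override
        public Collection<Tick> transform(Collection<StreamEvent> payload) throws TranslationException {
            List<Tick> ticks = new ArrayList<>();
            if (payload != null) {
                for (StreamEvent event : payload) {
                    ticks.add(transform(event));
                }
            }
            return ticks;
        }

        @Override
        public Tick transform(StreamEvent payload) throws TranslationException {
            Object symbol = payload.getValue("symbol");
            Object mid = payload.getValue("mid");
            // Report a descriptive translation failure instead of a failed cast.
            if (!(symbol instanceof String) || !(mid instanceof Number)) {
                throw new TranslationException("Event needs a String 'symbol' and a numeric 'mid'");
            }
            return new Tick((String) symbol, ((Number) mid).doubleValue());
        }
    }

    /** Convenience wrapper that rethrows translation failures as unchecked. */
    static Collection<Tick> transformUnchecked(Collection<StreamEvent> events) {
        try {
            return new TickTransformer().transform(events);
        } catch (TranslationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<StreamEvent> events = List.of(
                new StreamEvent().with("symbol", "IBM").with("mid", 101.5),
                new StreamEvent().with("symbol", "MSFT").with("mid", 320.25));
        for (Tick tick : transformUnchecked(events)) {
            System.out.println(tick.symbol + " " + tick.mid);
        }
    }
}
```

Delegating the batch method to the single-event method keeps the field mapping in one place, so a change to the domain type only touches one method.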

Available Implementations

| Format | DSL | Class |
| --- | --- | --- |
| avro | avro serializer | AvroDomainSerializer |

Avro

This serializer has an extra setting that supports transforming the resulting StreamEvent to the desired target domain type using a provided Avro schema. Currently only local schema files are supported; schema registry support is available on request.

Example

avro serializer:
    schema: /home/myapp/schema/customer.avsc

Attributes

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| schema | Path and name of schema file | String | |
| field mapping | Custom mapping of source StreamEvent fields to target domain fields | Map<String,String> | |
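To make the idea of a field mapping concrete, here is a minimal sketch of renaming source fields to target fields with a Map<String,String>. It is not Joule's implementation: the class and method names are hypothetical, and both the mapping direction (source name to target name) and the pass-through of unmapped fields are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldMappingSketch {

    /**
     * Copies the source values into a new map, renaming each key that appears
     * in fieldMapping. Assumption: unmapped fields keep their original name.
     */
    static Map<String, Object> applyMapping(Map<String, Object> source,
                                            Map<String, String> fieldMapping) {
        Map<String, Object> target = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            String targetName = fieldMapping.getOrDefault(entry.getKey(), entry.getKey());
            target.put(targetName, entry.getValue());
        }
        return target;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("symbol", "IBM");
        event.put("mid", 101.5);

        // Rename the source field "symbol" to the target field "ticker".
        Map<String, String> mapping = Map.of("symbol", "ticker");

        System.out.println(applyMapping(event, mapping)); // prints {ticker=IBM, mid=101.5}
    }
}
```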

Formatters

Formatters are mainly used for direct storage, where the output is consumed by data tools such as PySpark, Apache Presto, DuckDB, MongoDB, Postgres, and MySQL.

Available Implementations

| Format | DSL | Class |
| --- | --- | --- |
| json | json formatter | JsonStreamEventFormatter |
| csv | csv formatter | CSVStreamEventFormatter |
| parquet | parquet formatter | ParquetStreamEventFormatter |

Json

The standard JSON formatter converts processed events to a JSON string using the specified attributes.

Example

json formatter:
  date format: yyyy/MM/dd
  contentType: application/json
  indent output: false

Attributes

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| date format | Date format to apply to date fields | String (default: yyyy/MM/dd) | |
| indent output | Apply indentation formatting | Boolean (default: false) | |
| contentType | Type of content to inform receiving application | String (default: application/json) | |
| encoding | Payload encoding method | String (default: UTF-8) | |
| ext | File extension | String (default: json) | |
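The case of the pattern letters in a date format matters. The sketch below assumes that date format values follow Java date-pattern semantics, which this page does not state explicitly; the class and method names are hypothetical. In Java patterns, lowercase yyyy is the calendar year while uppercase YYYY is the week-based year, and the two differ for dates close to a year boundary.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class DatePatternSketch {

    /** Formats a date with the given pattern, using a fixed locale for repeatable output. */
    static String format(LocalDate date, String pattern) {
        return DateTimeFormatter.ofPattern(pattern, Locale.US).format(date);
    }

    public static void main(String[] args) {
        // 30 December 2024 already belongs to the first week of 2025.
        LocalDate date = LocalDate.of(2024, 12, 30);

        System.out.println(format(date, "yyyy/MM/dd")); // prints 2024/12/30 (calendar year)
        System.out.println(format(date, "YYYY/MM/dd")); // prints 2025/12/30 (week-based year)
    }
}
```

Under that assumption, the lowercase form shown as the default is the safer choice for date fields.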

CSV

The standard CSV formatter converts processed events to a CSV string using the specified attributes.

Example

csv formatter:
  contentType: text/csv
  encoding: UTF_8
  delimiter: "|"

Attributes

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| date format | Date format to apply to date fields | String (default: yyyy/MM/dd) | |
| delimiter | Field delimiter | Character (default: ",") | |
| contentType | Type of content to inform receiving application | String (default: text/csv) | |
| encoding | Payload encoding method | String (default: UTF-8) | |
| ext | File extension | String (default: csv) | |
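As a rough illustration of what the delimiter attribute controls, the sketch below joins the values of one event into a single delimited row. It is a simplification rather than the CSVStreamEventFormatter itself: the class and method names are hypothetical, and quoting or escaping of values that contain the delimiter is deliberately left out.

```java
import java.util.List;
import java.util.stream.Collectors;

public class CsvRowSketch {

    /** Joins the string form of each value with the given single-character delimiter. */
    static String toRow(List<Object> values, char delimiter) {
        return values.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(String.valueOf(delimiter)));
    }

    public static void main(String[] args) {
        System.out.println(toRow(List.of("IBM", 101.5, 2000L), '|')); // prints IBM|101.5|2000
        System.out.println(toRow(List.of("IBM", 101.5, 2000L), ',')); // prints IBM,101.5,2000
    }
}
```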

Parquet

Converts a StreamEvent to an Avro object using a target schema format before writing to a parquet formatted object.

Example

parquet formatter:
  schema path: /home/joule/outputschema.avro
  compression codec: SNAPPY
  temp filedir: /tmp
  contentType: binary/octet-stream
  encoding: UTF_8

Attributes

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| schema path | Path location for the Avro output schema | String | |
| compression codec | Algorithm to use to compress file. Available types: UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD | String (default: UNCOMPRESSED) | |
| contentType | Type of content to inform receiving application | String (default: binary/octet-stream) | |
| encoding | Payload encoding method | String (default: UTF_8) | |
| ext | File extension | String (default: parquet) | |
| temp file directory | Directory path for temp files | String (default: ./tmp) | |

Supporting classes

To support extensibility of the platform, the AbstractFormatterSpecification class is provided.
