
Serializers

Joule ships with various event formatters and serializers that convert StreamEvent objects to target streaming or storage formats.

Serialization Specification

The SerializerSpecification class defines how StreamEvents are serialized for downstream consumption.

Attributes

| Attribute | Description | Type |
| --- | --- | --- |
| transformer | Convert StreamEvent to a custom domain type | CustomTransformer |
| formatter | Convert a StreamEvent to a target format such as JSON, CSV, or Object | Formatter (default: Json) |
| compress | Whether the serialization process compresses the resulting payload | Boolean (default: false) |
| batch | Batch events to process | Boolean (default: false) |
| properties | Specific properties required for a custom serialization process | Properties |
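Joule's internal compression codec is not documented here, but the effect of `compress: true` can be sketched as a GZIP round-trip over a serialized payload. This is an illustrative sketch using only the JDK, not Joule's actual implementation:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class PayloadCompression {

    // Compress a serialized payload, as a compress: true setting might do
    static byte[] compress(byte[] payload) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Inverse operation, as a downstream consumer would apply
    static byte[] decompress(byte[] compressed) {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gzip.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String json = "{\"symbol\":\"ABC\",\"mid\":101.5}";
        byte[] packed = compress(json.getBytes(StandardCharsets.UTF_8));
        String restored = new String(decompress(packed), StandardCharsets.UTF_8);
        System.out.println(restored.equals(json)); // round-trip succeeds: true
    }
}
```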

Example

A simple example whereby RabbitMQ publishes StreamEvents as JSON-formatted objects. In this example a set of Joule processes communicate with each other to form a larger use-case topology.
rabbitPublisher:
  exchange:
    name: quotes_exchange
    type: FANOUT
  serializer:
    formatter:
      json formatter: {}

CustomTransformer Interface

This class is used when the use case requires a specific domain data type from Joule for downstream consumption. Developers are expected to provide domain-specific implementations of the CustomTransformer interface.
/**
 * Transform a collection of StreamEvents into a collection of target types
 *
 * @param payload collection of events to transform to target types
 * @return collection of T
 * @throws TranslationException thrown when a translation failure occurs
 */
Collection<T> transform(Collection<StreamEvent> payload) throws TranslationException;

/**
 * Transform a single StreamEvent into a specific type
 *
 * @param payload an event
 * @return transformed event of type T
 * @throws TranslationException thrown when a translation failure occurs
 */
T transform(StreamEvent payload) throws TranslationException;

Example

A class from the nbanking project where StreamEvents are transformed to the Quote type.
public class QuoteTransformer implements CustomTransformer<Quote> {

    public QuoteTransformer() {
        // Required
    }

    @Override
    public Collection<Quote> transform(Collection<StreamEvent> payload) throws TranslationException {
        Collection<Quote> quotes = new ArrayList<>();
        if (payload != null) {
            for (StreamEvent e : payload) {
                quotes.add(transform(e));
            }
        }
        return quotes;
    }

    @Override
    public Quote transform(StreamEvent payload) throws TranslationException {
        return new Quote(
                (String) payload.getValue("symbol"),
                (double) payload.getValue("mid"),
                (double) payload.getValue("bid"),
                (double) payload.getValue("ask"),
                (long) payload.getValue("volume"),
                (double) payload.getValue("volatility"),
                (long) payload.getEventTime(),
                (Date) payload.getValue("date")
        );
    }
}
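The key pattern above is that the collection overload delegates to the single-event overload while tolerating a null batch. That pattern can be exercised independently of Joule's types; the sketch below uses a plain Map as a stand-in for StreamEvent, and the Quote record and field names are illustrative, not Joule APIs:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class TransformPattern {

    // Illustrative domain type; stands in for the nbanking Quote class
    record Quote(String symbol, double mid) {}

    // Single-event transform: pull typed fields out of the event
    static Quote transform(Map<String, Object> event) {
        return new Quote((String) event.get("symbol"), (double) event.get("mid"));
    }

    // Batch transform delegates to the single-event overload, tolerating null input
    static Collection<Quote> transform(Collection<Map<String, Object>> payload) {
        Collection<Quote> quotes = new ArrayList<>();
        if (payload != null) {
            for (Map<String, Object> e : payload) {
                quotes.add(transform(e));
            }
        }
        return quotes;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> batch = List.of(Map.of("symbol", "ABC", "mid", 101.5));
        System.out.println(transform(batch)); // [Quote[symbol=ABC, mid=101.5]]
    }
}
```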

Available Implementations

| Format | DSL | Class |
| --- | --- | --- |
| avro | avro serializer | AvroDomainSerializer |

Avro

This serializer has an extra setting that supports transforming the resulting StreamEvent into the desired target data type using a provided Avro schema. Currently only local schema files are supported; schema registry support is available on request.

Example

avro serializer:
  schema: /home/myapp/schema/customer.avro

Attributes

| Attribute | Description | Data Type |
| --- | --- | --- |
| schema | Path and name of the schema file | String |
| field mapping | Map source StreamEvent fields to target domain fields | Map<String,String> |

Formatters

Formatters are mainly used for direct storage, so that data tools such as PySpark, Apache Presto, DuckDB, MongoDB, Postgres and MySQL can consume the resulting output.

Available Implementations

| Format | DSL | Class |
| --- | --- | --- |
| json | json formatter | JsonStreamEventFormatter |
| csv | csv formatter | CSVStreamEventFormatter |
| parquet | parquet formatter | ParquetStreamEventFormatter |

Json

The standard JSON formatter converts processed events to a JSON string using the specified attributes.

Example

json formatter:
  date format: yyyy/MM/dd
  contentType: application/json
  indent output: false
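The `date format` value follows Java date-format pattern syntax. A standalone sketch of what the pattern produces (not Joule's internal code):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class DateFormatDemo {
    public static void main(String[] args) {
        // Same pattern syntax as the formatter's "date format" attribute
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd");
        System.out.println(LocalDate.of(2024, 3, 9).format(fmt)); // 2024/03/09
    }
}
```

Note that in Java pattern syntax lowercase `yyyy` is the calendar year; uppercase `YYYY` is the week-based year and can produce surprising values around year boundaries.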

Attributes

| Attribute | Description | Data Type | Default |
| --- | --- | --- | --- |
| date format | Date format to apply to date fields | String | yyyy/MM/dd |
| indent output | Apply indentation formatting | Boolean | false |
| contentType | Type of content to inform the receiving application | String | application/json |
| encoding | Payload encoding method | String | UTF-8 |
| ext | File extension | String | json |

CSV

The standard CSV formatter converts processed events to a CSV string using the specified attributes.

Example

csv formatter:
  contentType: text/csv
  encoding: UTF_8
  delimiter: "|"
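The effect of the `delimiter` attribute can be sketched as joining field values with the configured character. This is an illustrative JDK-only sketch; quoting and escaping rules, which a real CSV writer applies, are omitted:

```java
import java.util.List;

public class CsvLineDemo {

    // Join field values with a configurable delimiter, as the csv formatter's
    // "delimiter" attribute controls
    static String toLine(List<String> fields, String delimiter) {
        return String.join(delimiter, fields);
    }

    public static void main(String[] args) {
        System.out.println(toLine(List.of("ABC", "101.5", "2024/03/09"), "|")); // ABC|101.5|2024/03/09
    }
}
```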

Attributes

| Attribute | Description | Data Type | Default |
| --- | --- | --- | --- |
| date format | Date format to apply to date fields | String | yyyy/MM/dd |
| delimiter | Field delimiter | Character | "," |
| contentType | Type of content to inform the receiving application | String | text/csv |
| encoding | Payload encoding method | String | UTF-8 |
| ext | File extension | String | csv |

Parquet

Converts a StreamEvent to an Avro object using a target schema before writing it out as a Parquet-formatted object.

Example

parquet formatter:
  schema path: /home/joule/outputschema.avro
  compression codec: SNAPPY
  temp filedir: /tmp
  contentType: binary/octet-stream
  encoding: UTF_8

Attributes

| Attribute | Description | Data Type | Default |
| --- | --- | --- | --- |
| schema path | Path location for the Avro output schema | String | |
| compression codec | Algorithm used to compress the file. Available types: UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD | String | UNCOMPRESSED |
| contentType | Type of content to inform the receiving application | String | binary/octet-stream |
| encoding | Payload encoding method | String | UTF_8 |
| ext | File extension | String | parquet |
| temp file directory | Directory path for temp files | String | ./tmp |

Supporting classes

To support extensibility of the platform, the AbstractFormatterSpecification class is provided.