Deserializers

Joule ships with the ability to parse domain types using a custom deserialiser along with standard StreamEvent deserialisers

Deserialization Specification

The DeserializerSpecification class is used to defined how StreamEvents are serialised for downstream consumption.

Attributes

StreamEventParser

Parser interface to converts an Object to a collection of stream events.Developers are expected to provide domain specific implementations if there is a need to publish non-Joule events.

/**
* Parse passed byte array to collection of {@link StreamEvent}s
*
* @param payload byte array to translate
* @return collection of stream events
* @throws TranslationException is thrown when a translation failure occurs
*/
Collection<StreamEvent> translate(Object payload) throws TranslationException;

Available Implementations

Example

For the provided implementations a full package namespace is required. For the above implementations use com.fractalworks.streams.sdk.codec

deserializer:
   parser: 
      json deserializer: {} 

AVRO

This deserializer has extra setting to support ability to load the target schema. Currently only local schema files are supported with schema registry support on request.

Example

deserializer:
   parser: 
      avro deserializer: 
         schema: /home/myapp/schema/customer.avro

CSV

This parser has extra optional setting to support ability to custom parsing through the use of defining a custom header.

Example

deserializer:
    parser: 
        csv deserializer:
            type map:
               symbol: STRING
               rate:   DOUBLE

Supported Types

Following types are supported

  • DOUBLE, FLOAT, LONG, INTEGER, SHORT, BYTE, BOOLEAN, STRING, DATE

Custom Example

@JsonRootName(value = "quote parser")
public class QuoteToStreamEventParser implements StreamEventParser {

    public QuoteToStreamEventParser() {
        // Required
    }

    @Override
    public Collection<StreamEvent> translate(Object o) throws TranslationException {
        Collection<StreamEvent> events = null;
        if(o instanceof Quote){
            Quote quote = (Quote) o;
            StreamEvent event = new StreamEvent("quote");
            event.setEventTime(quote.time());
            event.addValue("symbol", quote.symbol());
            event.addValue("bid", quote.bid());
            event.addValue("ask", quote.ask());
            event.addValue("volatility", quote.volatility());
            event.addValue("volume", quote.volume());
            event.addValue("date", quote.date());

            events = Collections.singletonList( event);
        }
        return events;
    }
}

Last updated