Joule
  • Welcome to Joule's Docs
  • Why Joule?
    • Joule capabilities
  • What is Joule?
    • Key features
    • The tech stack
  • Use case enablement
    • Use case building framework
  • Concepts
    • Core concepts
    • Low code development
    • Unified execution engine
    • Batch and stream processing
    • Continuous metrics
    • Key Joule data types
      • StreamEvent object
      • Contextual data
      • GeoNode
  • Tutorials
    • Getting started
    • Build your first use case
    • Stream sliding window quote analytics
    • Advanced tutorials
      • Custom missing value processor
      • Stateless Bollinger band analytics
      • IoT device control
  • FAQ
  • Glossary
  • Components
    • Pipelines
      • Use case anatomy
      • Data priming
        • Types of import
      • Processing unit
      • Group by
      • Emit computed events
      • Telemetry auditing
    • Processors
      • Common attributes
      • Filters
        • By type
        • By expression
        • Send on delta
        • Remove attributes
        • Drop all events
      • Enrichment
        • Key concepts
          • Anatomy of enrichment DSL
          • Banking example
        • Metrics
        • Dynamic contextual data
          • Caching architecture
        • Static contextual data
      • Transformation
        • Field Tokeniser
        • Obfuscation
          • Encryption
          • Masking
          • Bucketing
          • Redaction
      • Triggers
        • Change Data Capture
        • Business rules
      • Stream join
        • Inner stream joins
        • Outer stream joins
        • Join attributes & policy
      • Event tap
        • Anatomy of a Tap
        • SQL Queries
    • Analytics
      • Analytic tools
        • User defined analytics
          • Streaming analytics example
          • User defined analytics
          • User defined scripts
          • User defined functions
            • Average function library
        • Window analytics
          • Tumbling window
          • Sliding window
          • Aggregate functions
        • Analytic functions
          • Stateful
            • Exponential moving average
            • Rolling Sum
          • Stateless
            • Normalisation
              • Absolute max
              • Min max
              • Standardisation
              • Mean
              • Log
              • Z-Score
            • Scaling
              • Unit scale
              • Robust Scale
            • Statistics
              • Statistic summaries
              • Weighted moving average
              • Simple moving average
              • Count
            • General
              • Euclidean
        • Advanced analytics
          • Geospatial
            • Entity geo tracker
            • Geofence occupancy trigger
            • Geo search
            • IP address resolver
            • Reverse geocoding
            • Spatial Index
          • HyperLogLog
          • Distinct counter
      • ML inferencing
        • Feature engineering
          • Scripting
          • Scaling
          • Transform
        • Online predictive analytics
        • Model audit
        • Model management
      • Metrics engine
        • Create metrics
        • Apply metrics
        • Manage metrics
        • Priming metrics
    • Contextual data
      • Architecture
      • Configuration
      • MinIO S3
      • Apache Geode
    • Connectors
      • Sources
        • Kafka
          • Ingestion
        • RabbitMQ
          • Further RabbitMQ configurations
        • MQTT
          • Topic wildcards
          • Session management
          • Last Will and Testament
        • Rest endpoints
        • MinIO S3
        • File watcher
      • Sinks
        • Kafka
        • RabbitMQ
          • Further configurations
        • MQTT
          • Persistent messaging
          • Last Will and Testament
        • SQL databases
        • InfluxDB
        • MongoDB
        • Geode
        • WebSocket endpoint
        • MinIO S3
        • File transport
        • Slack
        • Email
      • Serialisers
        • Serialisation
          • Custom transform example
          • Formatters
        • Deserialisers
          • Custom parsing example
    • Observability
      • Enabling JMX for Joule
      • Meters
      • Metrics API
  • DEVELOPER GUIDES
    • Setting up developer environment
      • Environment setup
      • Build and deploy
      • Install Joule
        • Install Docker demo environment
        • Install with Docker
        • Install from source
        • Install Joule examples
    • Joulectl CLI
    • API Endpoints
      • Mangement API
        • Use case
        • Pipelines
        • Data connectors
        • Contextual data
      • Data access API
        • Query
        • Upload
        • WebSocket
      • SQL support
    • Builder SDK
      • Connector API
        • Sources
          • StreamEventParser API
        • Sinks
          • CustomTransformer API
      • Processor API
      • Analytics API
        • Create custom metrics
        • Define analytics
        • Windows API
        • SQL queries
      • Transformation API
        • Obfuscation API
        • FieldTokenizer API
      • File processing
      • Data types
        • StreamEvent
        • ReferenceDataObject
        • GeoNode
    • System configuration
      • System properties
  • Deployment strategies
    • Deployment Overview
    • Single Node
    • Cluster
    • GuardianDB
    • Packaging
      • Containers
      • Bare metal
  • Product updates
    • Public Roadmap
    • Release Notes
      • v1.2.0 Join Streams with stateful analytics
      • v1.1.0 Streaming analytics enhancements
      • v1.0.4 Predictive stream processing
      • v1.0.3 Contextual SQL based metrics
    • Change history
Powered by GitBook
On this page
  • Objective
  • Example
  • Class: CustomTransformer
  • Constructor
  • Transform single event method
  • Transform batch events method

Was this helpful?

  1. DEVELOPER GUIDES
  2. Builder SDK
  3. Connector API
  4. Sinks

CustomTransformer API

Custom parser converts an internal Joule StreamEvent into a domain object

Objective

A CustomTransformer implementation converts a StreamEvent to a custom domain type ready for consumer processing.

Example

This example maps StreamEvent attributes to a Quote object attributes.

public class QuoteTransformer implements CustomTransformer<Quote> {

    public QuoteTransformer() {
        // Required
    }

    @Override
    public Collection<Quote> transform(Collection<StreamEvent> payload) throws TranslationException {
        Collection<Quote> quotes = new ArrayList<>();
        if( payload!= null) {
            for(StreamEvent e : payload){
                quotes.add(transform(e));
            }
        }
        return quotes;
    }

    @Override
    public Quote transform(StreamEvent payload) throws TranslationException {
        return new Quote(
                (String)payload.getValue("symbol"),
                (double)payload.getValue("mid"),
                (double)payload.getValue("bid"),
                (double)payload.getValue("ask"),
                (long)payload.getValue("volume"),
                (double)payload.getValue("volatility"),
                (long)payload.getEventTime(),
                (Date)payload.getValue("date")
                );
    }
}

Class: CustomTransformer

All implementations must provide a concrete implements of the transform methods.

/**
 * Transform passed byte array to collection of StreamEvents
 *
 * @param payload collection of events to transform to target types
 * @return collection of T
 * @throws TranslationException is thrown when a translation failure occurs
 */
Collection<T> transform(Collection<StreamEvent> payload) throws TranslationException;

/**
 * Transform a single StreamEvent in to a specific type
 *
 * @param payload an event
 * @return
 * @throws TranslationException
 */
T transform(StreamEvent payload) throws TranslationException;

Constructor

The constructor is required but does not contain any specific logic for this example.

Transform single event method

This core method that converts the StreamEvent object to a single domain specific object. This is performed by a simple attribute mapping from the StreamEvent to Quote constructor parameter requirements.

@Override
public Quote transform(StreamEvent payload) throws TranslationException {
return new Quote(
        // Mapping
        ... 
        );
}

Transform batch events method

This method is called when in micro-batching mode. It takes a collection of StreamEvents and converts to a collection of domain specific events using the defined single event transform method.

@Override
public Collection<Quote> transform(Collection<StreamEvent> payload) throws TranslationException {
    Collection<Quote> quotes = new ArrayList<>();
    if( payload!= null) {
        for(StreamEvent e : payload){
            quotes.add(transform(e));
        }
    }
    return quotes;
}
PreviousSinksNextProcessor API

Last updated 6 months ago

Was this helpful?