Kafka

Sends events to specified Kafka topics, enabling real-time data streams


Overview

The Kafka Publisher Transport publishes processed events to a specified Kafka topic on a defined Kafka cluster. Events are serialised using either the default JSON serialiser (StreamEventJsonSerializer) or a custom-defined transformer.

Configuration includes resolving the Kafka cluster address (for example via an /etc/hosts entry) and defining topics and partition keys for message distribution. Key configuration attributes are prioritised by importance and cover serialisation options, allowing flexible data handling.

Out-of-the-box serialisers support common formats (JSON, Avro), while custom serialisers and transformers facilitate integration with downstream applications by converting events into domain-specific data types.

The Kafka Publisher setup supports scalable, resilient event processing workflows, suitable for chaining multiple data-processing stages.

The Kafka connector relies on the org.apache.kafka:kafka-clients:2.7.0 client library.

Examples & DSL attributes

In this example, the Kafka Publisher sends events to the customers topic on the Kafka broker (KAFKA_BROKER:9092).

Events are partitioned by customerId, ensuring that all events for the same customer go to the same partition for ordered processing.

Using default settings, events are serialised as JSON StreamEvent objects, making them easily readable for downstream processes. This setup enables chaining multiple processing stages, useful for building scalable and resilient data workflows.

kafkaPublisher:
  cluster address: KAFKA_BROKER:9092
  topic: customers
  partitionKeys:
    - customerId

Add the address of the Kafka host to the /etc/hosts file, e.g. 127.0.0.1 KAFKA_BROKER
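To make the ordering guarantee concrete: key-based partitioning maps each key to a fixed partition, so all events sharing a customerId land on the same partition. The sketch below is illustrative only; it is not Joule's KeyPartitioner, whose implementation is not shown in this page.

public final class PartitioningSketch {

    // The same key always yields the same partition, giving per-key ordering
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int p1 = partitionFor(42, 6); // customerId 42
        int p2 = partitionFor(42, 6); // same customer, same partition
        System.out.println(p1 == p2); // true
    }
}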

Attributes schema

Configuration parameters available for the Kafka publisher transport.

The parameters are organised by order of importance, ranked from high to low.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| cluster address | Kafka cluster bootstrap address, e.g. KAFKA_BROKER:9092 | String | Yes |
| topic | Kafka topic events are published to | String | Yes |
| partitionKeys | Event attributes used to build the message partition key | List of Strings | No |
| serializer | Serialisation settings; see the serialisation attributes schema below | Object | No |
| batchSize | Number of events to batch send; maps to batch.size. A batch size of zero disables batching. Default: 1024 | Integer | No |
| memBufferSize | Size in bytes of the event buffer; maps to buffer.memory. Default: 33554432 | Long | No |
| retries | Maps to retries. Default: 0 | Integer | No |
| properties | Additional publisher properties applied to the Kafka publisher subsystem; default properties are listed below | Properties map | No |
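For illustration, the optional batching attributes can be tuned alongside the required connection settings. A sketch with arbitrary values:

kafkaPublisher:
  cluster address: KAFKA_BROKER:9092
  topic: customers
  partitionKeys:
    - customerId
  batchSize: 2048          # maps to batch.size; 0 disables batching
  memBufferSize: 67108864  # maps to buffer.memory (64 MiB)
  retries: 3               # maps to retries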

Serialisation attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User provided implementation | Implementation of CustomTransformer | No |
| key serializer | Domain class that maps to the partition key type; maps to the key.serializer property. Default: IntegerSerializer | String | No |
| value serializer | Domain class that serialises to Kafka; maps to the value.serializer property. Default: StreamEventJsonSerializer | String | No |

Serialiser example

This example configures the Kafka Publisher to send events to the customers topic on a local broker (localhost:9092). Events are partitioned by customerId, ensuring ordered processing per customer.

  1. key serializer: IntegerSerializer for the customerId key.

  2. value serializer: StreamEventJsonSerializer for JSON-formatted event data.

This setup organises events by customerId and makes them easily readable for downstream processing.

kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId

  serializer:
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    value serializer: com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonSerializer

OOTB serialisers

Joule includes built-in serialisers, compatible with Apache Kafka, and also supports custom serialisation through the Joule SDK. This flexible event serialisation model allows Joule processes to be chained together for complex use cases and easy downstream integration.

com.fractalworks.streams.transport.kafka.serializers.object.ObjectSerializer
com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonSerializer
com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventSerializer
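For instance, to publish Avro-encoded events rather than JSON, the value serialiser can be swapped for the Avro variant. A minimal sketch, assuming the Avro serialiser handles StreamEvent objects directly:

kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId

  serializer:
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    value serializer: com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventSerializer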

Custom transformers

By implementing a custom transformer, you can create domain-specific data types to simplify integration with downstream applications.

Example

In this example, the Kafka Publisher sends events to the customers topic on a local Kafka broker (localhost:9092).

Events are partitioned by customerId and serialised using custom serialisers:

  1. key serializer: IntegerSerializer for the customerId (an integer).

  2. value serializer: ObjectSerializer for the event data.

  3. transform: the CustomerTransformer converts event data into a domain-specific Customer object before serialisation.

This setup enables custom data transformations and ensures events are processed and stored efficiently.

kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId
 
  serializer:
    transform: com.fractalworks.streams.transport.kafka.CustomerTransformer
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    value serializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectSerializer

Java transformer implementation

This class converts an internal StreamEvent object to a user-defined Customer data type.

import java.util.ArrayList;
import java.util.Collection;

// CustomTransformer, StreamEvent and TranslationException are provided by the Joule Builder SDK
public class CustomerTransformer implements CustomTransformer<Customer> {

    public CustomerTransformer() {
        // A public no-arg constructor is required so Joule can instantiate the transformer
    }

    @Override
    public Collection<Customer> transform(Collection<StreamEvent> events)
            throws TranslationException {
        // Transform each StreamEvent in the batch to a Customer
        Collection<Customer> customers = new ArrayList<>(events.size());
        for (StreamEvent event : events) {
            customers.add(transform(event));
        }
        return customers;
    }

    @Override
    public Customer transform(StreamEvent event)
            throws TranslationException {
        // Map individual event attributes onto the Customer domain type
        Customer customer = new Customer();
        customer.setCustomerId((int) event.getValue("customerId"));
        customer.setFirstname((String) event.getValue("firstname"));
        customer.setSurname((String) event.getValue("surname"));
        return customer;
    }
}
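For reference, a minimal sketch of the Customer domain type assumed by the transformer above; the real class may carry additional fields and serialisation logic.

public class Customer {

    private int customerId;
    private String firstname;
    private String surname;

    public void setCustomerId(int customerId) { this.customerId = customerId; }
    public void setFirstname(String firstname) { this.firstname = firstname; }
    public void setSurname(String surname) { this.surname = surname; }

    public int getCustomerId() { return customerId; }
    public String getFirstname() { return firstname; }
    public String getSurname() { return surname; }
}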

Default producer properties

acks=0
retries=1
linger.ms=5
buffer.memory=33554432
request.timeout.ms=60000
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
partitioner.class=com.fractalworks.streams.transport.kafka.KeyPartitioner
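Any of these defaults can be overridden, and further producer settings supplied, through the properties attribute. A sketch, assuming entries are passed straight through to the underlying Kafka producer:

kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  properties:
    acks: "1"
    linger.ms: "10"
    compression.type: snappy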
