Kafka

Overview

The Kafka publisher transport emits processed events onto a defined cluster topic. Events are serialised using either the default StreamEvent JSON serializer or a custom transformer implementation.

Add the address of the Kafka host to the /etc/hosts file, e.g.

127.0.0.1 KAFKA_BROKER

Example

kafkaPublisher:
  cluster address: KAFKA_BROKER:9092
  topic: customers
  partitionKeys:
    - customerId

This simple example leverages the default settings, resulting in events published as StreamEvent JSON objects onto the customers topic using the customerId attribute as the partitioning key.

A typical use case for these settings is chaining multiple Joule processes to form a complex, scalable and resilient pipeline, as sketched below.
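
As a sketch, each stage in such a chain publishes its results onto a dedicated topic that the next Joule process consumes; the enriched-customers topic name below is hypothetical:

kafkaPublisher:
  cluster address: KAFKA_BROKER:9092
  topic: enriched-customers
  partitionKeys:
    - customerId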

Attributes schema

Configuration parameters available for the Kafka publisher transport. The parameters are organized by order of importance, ranked from high to low.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| cluster address | Kafka cluster address in host:port format, e.g. KAFKA_BROKER:9092 | String | Yes |
| topic | Topic the events are published on | String | Yes |
| partitionKeys | Event attributes used to form the partitioning key | List of Strings | No |
| serializer | Serialization settings; see Serialization Attributes below | Map | No |
| batchSize | Number of events to batch send; maps to batch.size. A batch size of zero disables batching. Default: 1024 | Integer | No |
| memBufferSize | Size in bytes of the event buffer; maps to buffer.memory. Default: 33554432 | Long | No |
| retries | Number of retries on a failed send; maps to retries. Default: 0 | Integer | No |
| properties | Additional publisher properties to be applied to the Kafka publisher subsystem | Properties map | No |
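
For illustration, the batching and retry attributes can be combined as below. The values are arbitrary examples; linger.ms is a standard Kafka producer property passed through the properties map:

kafkaPublisher:
  cluster address: KAFKA_BROKER:9092
  topic: customers
  partitionKeys:
    - customerId
  batchSize: 2048
  memBufferSize: 33554432
  retries: 3
  properties:
    linger.ms: 5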

Serialization Attributes

This topic provides configuration parameters available for the serializer attribute.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User provided transformer implementation | Implementation of CustomTransformer | No |
| key serializer | Class that maps to the partition key type; maps to the key.serializer property. Default: IntegerSerializer | String | No |
| value serializer | Class that serializes events to Kafka; maps to the value.serializer property. Default: StreamEventJsonSerializer | String | No |

Serializer example

kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId

  serializer:
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    value serializer: com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonSerializer

OOTB serializers

A flexible event serialization model is provided to enable Joule processes to be chained for complex use cases and to integrate with downstream systems. The out-of-the-box Joule serializers are listed below; the standard Apache Kafka serializers can also be used, and custom serialization is achieved through the Joule SDK.

com.fractalworks.streams.transport.kafka.serializers.object.ObjectSerializer
com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonSerializer
com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventSerializer
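
For example, to publish Avro-encoded events instead of JSON, the value serializer can (as a sketch, keeping all other settings at their defaults) be switched to the Avro implementation listed above:

kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId

  serializer:
    value serializer: com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventSerializer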

Custom Transformers

By implementing a CustomTransformer you can provide domain-specific data types to ease integration with downstream applications.

Example

kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId

  serializer:
    transform: com.fractalworks.streams.transport.kafka.CustomerTransformer
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    value serializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectSerializer

Java transformer implementation

This class converts an internal StreamEvent object to a user-defined Customer data type.

// Joule SDK imports (CustomTransformer, StreamEvent, TranslationException) omitted
import java.util.ArrayList;
import java.util.Collection;

public class CustomerTransformer implements CustomTransformer<Customer> {

    public CustomerTransformer() {
        // Required no-arg constructor
    }

    @Override
    public Collection<Customer> transform(Collection<StreamEvent> events)
            throws TranslationException {

        // Convert each StreamEvent in the batch to a Customer
        Collection<Customer> customers = new ArrayList<>();
        for (StreamEvent event : events) {
            customers.add(transform(event));
        }
        return customers;
    }

    @Override
    public Customer transform(StreamEvent event)
            throws TranslationException {

        // Map the required event attributes onto the Customer domain type
        Customer customer = new Customer();
        customer.setCustomerId((int) event.getValue("customerId"));
        customer.setFirstname((String) event.getValue("firstname"));
        customer.setSurname((String) event.getValue("surname"));
        return customer;
    }
}
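
The Customer domain class is not shown in this guide. A minimal sketch consistent with the accessors used by the transformer above could look as follows; the field names and types are assumptions taken from the transform method:

// Hypothetical Customer domain type; a plain POJO matching the
// setters used by CustomerTransformer above
public class Customer {

    private int customerId;
    private String firstname;
    private String surname;

    public void setCustomerId(int customerId) { this.customerId = customerId; }
    public void setFirstname(String firstname) { this.firstname = firstname; }
    public void setSurname(String surname) { this.surname = surname; }

    public int getCustomerId() { return customerId; }
    public String getFirstname() { return firstname; }
    public String getSurname() { return surname; }
}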

Client library

org.apache.kafka:kafka-clients:2.7.0
