
Kafka

The Kafka publisher transport emits processed events onto a defined cluster topic. Events are serialised using either the default JSON StreamEvent serializer or a custom transformer implementation.

Driver details

org.apache.kafka:kafka-clients:2.7.0

Example configuration

kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId
This simple example relies on the default settings: events are published as StreamEvent JSON objects onto the customers topic, using customerId as the partitioning key.
These settings are useful when chaining multiple Joule processors to form a complex, scalable and resilient use case.
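To illustrate why the partitioning key matters when processors are chained, the sketch below shows how a keyed record maps to a partition. It is a simplified stand-in, not Joule or Kafka code: the class name and the partition count are hypothetical, and `Arrays.hashCode` is substituted for the murmur2 hash that Kafka's default partitioner applies to the serialized key. The key layout (4 big-endian bytes) matches what Kafka's IntegerSerializer produces.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch: illustrates key-based partitioning, not Kafka's actual partitioner.
class PartitionKeySketch {

    // Serialise an int key as 4 big-endian bytes (ByteBuffer defaults to big-endian).
    static byte[] serializeKey(int customerId) {
        return ByteBuffer.allocate(4).putInt(customerId).array();
    }

    // Map key bytes to a partition index in [0, numPartitions).
    // Kafka uses murmur2 here; Arrays.hashCode keeps the sketch short.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6; // assumed partition count for the customers topic
        for (int customerId : new int[] {101, 102, 101}) {
            System.out.println("customerId " + customerId + " -> partition "
                    + partitionFor(serializeKey(customerId), partitions));
        }
    }
}
```

Because the partition is derived only from the key bytes, every event for a given customerId lands on the same partition, so a downstream processor consuming that partition sees that customer's events in order.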

Core Attributes

Configuration parameters available for the Kafka publisher transport. The parameters are listed in order of importance, from high to low.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| cluster address | Kafka cluster address | <ip-address>:port, e.g. localhost:9092 | |
| topic | Kafka topic to publish events to | String | |
| partitionKeys | Event attributes used as the partitioning key | List of String | |
| serializer | Serialization settings. See Serialization Attributes | Nested attributes | |
| batchSize | Number of events to send per batch. Maps to batch.size. A batch size of zero disables batching | Integer | Default: 1024 |
| memBufferSize | Size in bytes of the event buffer. Maps to buffer.memory | Long | Default: 33554432 |
| retries | Number of send retries. Maps to retries | Integer | Default: 0 |
| properties | Additional publisher properties to be applied to the Kafka publisher subsystem | Properties map | |
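As a rough illustration of the mappings above, the following sketch builds the Kafka producer properties that these attributes correspond to. The class and method names are hypothetical and this is not Joule's internal code. The `batch.size`, `buffer.memory` and `retries` keys come from the attribute descriptions, while mapping cluster address to `bootstrap.servers` is an assumption based on the standard Kafka producer configuration.

```java
import java.util.Properties;

// Hypothetical sketch: shows the attribute-to-property mapping, not Joule's implementation.
class PublisherPropertiesSketch {

    static Properties toProducerProperties(String clusterAddress, int batchSize,
                                           long memBufferSize, int retries) {
        Properties props = new Properties();
        // Assumption: cluster address is passed to Kafka as bootstrap.servers.
        props.setProperty("bootstrap.servers", clusterAddress);
        props.setProperty("batch.size", Integer.toString(batchSize));
        props.setProperty("buffer.memory", Long.toString(memBufferSize));
        props.setProperty("retries", Integer.toString(retries));
        return props;
    }

    public static void main(String[] args) {
        // Values taken from the example configuration and the documented defaults.
        Properties props = toProducerProperties("localhost:9092", 1024, 33554432L, 0);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Any entries supplied under the properties attribute would presumably be merged into the same map, which is how settings without a dedicated attribute can be passed through to the publisher.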

Serialization Attributes

Configuration parameters available for the serializer attribute.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User-provided transformer implementation | Implementation of CustomTransformer | |
| key serializer | Class that serializes the partition key type. Maps to the key.serializer property | String | Default: IntegerSerializer |
| value serializer | Class that serializes events to Kafka. Maps to the value.serializer property | String | Default: StreamEventJsonSerializer |

Serializer example

kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId
  serializer:
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    value serializer: com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonSerializer

OOTB serializers

A flexible event serialization model allows Joule processes to be chained for complex use cases and supports downstream integrations. Joule provides the out-of-the-box serializers listed below, alongside the Apache Kafka implementations; custom serialization is achieved through the Joule SDK.
com.fractalworks.streams.transport.kafka.serializers.object.ObjectSerializer
com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonSerializer
com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventSerializer

Custom Transformers

By implementing a CustomTransformer you can provide domain-specific data types to ease integration with downstream applications.

Example

kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId
  serializer:
    transform: com.fractalworks.streams.transport.kafka.CustomerTransformer
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    value serializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectSerializer

Java transformer implementation

This class converts an internal StreamEvent object to a user-defined Customer data type.
import java.util.ArrayList;
import java.util.Collection;
// The Joule SDK imports for CustomTransformer, StreamEvent and TranslationException are also required.

public class CustomerTransformer implements CustomTransformer<Customer> {

    public CustomerTransformer() {
        // Required: public no-argument constructor
    }

    // Convert a batch of events by delegating to the single-event transform.
    @Override
    public Collection<Customer> transform(Collection<StreamEvent> events)
            throws TranslationException {
        Collection<Customer> customers = new ArrayList<>();
        for (StreamEvent event : events) {
            customers.add(transform(event));
        }
        return customers;
    }

    // Map the event attributes onto the Customer domain type.
    @Override
    public Customer transform(StreamEvent event)
            throws TranslationException {
        Customer customer = new Customer();
        customer.setCustomerId((int) event.getValue("customerId"));
        customer.setFirstname((String) event.getValue("firstname"));
        customer.setSurname((String) event.getValue("surname"));
        return customer;
    }
}
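The mapping logic above can be exercised without the Joule SDK on the classpath. The sketch below is a stand-alone approximation under stated assumptions: `FakeEvent` and `Customer` are hypothetical stand-ins that model only the `getValue(String)` accessor and the three fields used in the transformer, and none of these types come from Joule.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: stand-in types only, not the Joule SDK classes.
class TransformerSketch {

    // Stand-in for StreamEvent: a bag of named attribute values.
    static class FakeEvent {
        private final Map<String, Object> values = new HashMap<>();
        FakeEvent with(String name, Object value) { values.put(name, value); return this; }
        Object getValue(String name) { return values.get(name); }
    }

    // Stand-in for the user-defined Customer domain type.
    static class Customer {
        int customerId;
        String firstname;
        String surname;
    }

    // Same attribute-to-field mapping as the transformer above.
    static Customer transform(FakeEvent event) {
        Customer customer = new Customer();
        customer.customerId = (int) event.getValue("customerId");
        customer.firstname = (String) event.getValue("firstname");
        customer.surname = (String) event.getValue("surname");
        return customer;
    }

    // Batch variant: delegates to the single-event mapping.
    static List<Customer> transform(Collection<FakeEvent> events) {
        List<Customer> customers = new ArrayList<>();
        for (FakeEvent event : events) {
            customers.add(transform(event));
        }
        return customers;
    }

    public static void main(String[] args) {
        FakeEvent event = new FakeEvent()
                .with("customerId", 7).with("firstname", "Ada").with("surname", "Lovelace");
        Customer customer = transform(event);
        System.out.println(customer.customerId + " " + customer.firstname + " " + customer.surname);
    }
}
```

One design point this makes visible: the `(int)` cast unboxes the attribute value, so an event without a customerId attribute would fail with a NullPointerException. A production transformer may want to check for missing attributes and raise a TranslationException instead.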