Kafka
The Kafka publisher transport emits processed events onto a configured Kafka cluster topic. Events are serialised using either the default JSON StreamEvent serializer or a custom transformer implementation.
org.apache.kafka:kafka-clients:2.7.0
kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId
This simple example relies on the default settings, so events are published as StreamEvent JSON objects onto the customers topic, using customerId as the partitioning key.
These settings are useful when multiple Joule processors need to be chained to form a complex, scalable and resilient use case.
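The effect of a partitioning key can be sketched as follows: events with the same key always land on the same partition, which preserves per-customer ordering. This is a simplified illustration only. Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas the sketch uses `Arrays.hashCode` as a stand-in, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class PartitionKeySketch {

    // Maps an integer key to a partition index in [0, numPartitions).
    // Assumption: Arrays.hashCode stands in for Kafka's murmur2 hash,
    // so the indexes will differ from those a real cluster assigns.
    static int partitionFor(int customerId, int numPartitions) {
        // A 4-byte big-endian encoding, as an integer key serializer would produce
        byte[] keyBytes = ByteBuffer.allocate(Integer.BYTES).putInt(customerId).array();
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 6; // hypothetical partition count for the customers topic
        for (int customerId : new int[] {1, 2, 42, 42}) {
            System.out.println("customerId " + customerId
                    + " -> partition " + partitionFor(customerId, numPartitions));
        }
    }
}
```

Both events for customerId 42 resolve to the same partition, which is the property a downstream consumer depends on.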
Configuration parameters available for the Kafka publisher transport. The parameters are listed in order of importance, from high to low.
Attribute | Description | Data Type | Required |
---|---|---|---|
cluster address | Kafka cluster address | String, in the form host:port | |
topic | Kafka topic that events are published to | String | |
partitionKeys | Event attribute names used to form the partitioning key | List of String | |
serializer | Serialization configuration, see the serializer attributes | Serializer attributes | |
batchSize | Number of events to send per batch. Maps to batch.size. A batch size of zero disables batching | Integer Default: 1024 | |
memBufferSize | Size in bytes of the event buffer. Maps to buffer.memory | Long Default: 33554432 | |
retries | Maps to retries | Integer Default: 0 | |
properties | Additional publisher properties to be applied to the Kafka publisher subsystem | Properties map | |
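As a rough illustration of the mappings in the table, the sketch below assembles Kafka producer properties from the publisher attributes. The batch.size, buffer.memory and retries keys come from the table. Mapping cluster address to bootstrap.servers is an assumption, and the class and method names are hypothetical rather than part of Joule.

```java
import java.util.Properties;

class PublisherPropertiesSketch {

    // Builds Kafka producer properties from the publisher attributes.
    static Properties toProducerProperties(String clusterAddress,
                                           int batchSize,
                                           long memBufferSize,
                                           int retries) {
        Properties props = new Properties();
        // Assumption: "cluster address" feeds Kafka's bootstrap.servers setting
        props.setProperty("bootstrap.servers", clusterAddress);
        // Mappings stated in the attribute table
        props.setProperty("batch.size", Integer.toString(batchSize));
        props.setProperty("buffer.memory", Long.toString(memBufferSize));
        props.setProperty("retries", Integer.toString(retries));
        return props;
    }

    public static void main(String[] args) {
        // Default values taken from the attribute table
        Properties props = toProducerProperties("localhost:9092", 1024, 33554432L, 0);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Any entries supplied through the properties attribute would presumably be applied on top of these values.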
The following configuration parameters are available for the serializer attribute.
Attribute | Description | Data Type | Required |
---|---|---|---|
transform | User provided implementation | Implementation of CustomTransformer | |
key serializer | Domain class that maps to the partition key type. Property maps to key.serializer property | String Default: IntegerSerializer | |
value serializer | Domain class that serializes to Kafka. Property maps to value.serializer property | String Default: StreamEventJsonSerializer | |
kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId
  serializer:
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    value serializer: com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonSerializer
A flexible event serialization model allows Joule processes to be chained for complex use cases and supports downstream integrations. Joule provides serializers out of the box, alongside the Apache Kafka implementations, while custom serialization is achieved through the Joule SDK.
com.fractalworks.streams.transport.kafka.serializers.object.ObjectSerializer
com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonSerializer
com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventSerializer
By implementing a CustomTransformer you can provide domain-specific data types to ease integration with downstream applications.
kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId
  serializer:
    transform: com.fractalworks.streams.transport.kafka.CustomerTransformer
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    value serializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectSerializer
This class converts an internal StreamEvent object into a user-defined Customer data type.
import java.util.ArrayList;
import java.util.Collection;
// Joule SDK imports for CustomTransformer, StreamEvent and TranslationException
// are omitted here; take them from the SDK version in use.

public class CustomerTransformer implements CustomTransformer<Customer> {

    public CustomerTransformer() {
        // Required
    }

    // Converts a batch of events by delegating to the single-event transform
    @Override
    public Collection<Customer> transform(Collection<StreamEvent> events)
            throws TranslationException {
        Collection<Customer> customers = new ArrayList<>();
        for (StreamEvent event : events) {
            customers.add(transform(event));
        }
        return customers;
    }

    // Copies the customer attributes from the event into the domain object
    @Override
    public Customer transform(StreamEvent event)
            throws TranslationException {
        Customer customer = new Customer();
        customer.setCustomerId((int) event.getValue("customerId"));
        customer.setFirstname((String) event.getValue("firstname"));
        customer.setSurname((String) event.getValue("surname"));
        return customer;
    }
}
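To experiment with the mapping logic outside a Joule deployment, the following self-contained sketch reproduces the same field copying against minimal stand-ins. The nested StreamEvent and Customer classes are hypothetical placeholders that model only what the transformer touches. They are not the Joule SDK types, and the sample values are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TransformerUsageSketch {

    // Hypothetical stand-in for Joule's StreamEvent: only getValue(String) is modelled
    static class StreamEvent {
        private final Map<String, Object> values = new HashMap<>();

        StreamEvent with(String name, Object value) {
            values.put(name, value);
            return this;
        }

        Object getValue(String name) {
            return values.get(name);
        }
    }

    // Hypothetical domain type with the three fields used by the transformer
    static class Customer {
        int customerId;
        String firstname;
        String surname;
    }

    // Same field mapping as the single-event transform
    static Customer transform(StreamEvent event) {
        Customer customer = new Customer();
        customer.customerId = (Integer) event.getValue("customerId");
        customer.firstname = (String) event.getValue("firstname");
        customer.surname = (String) event.getValue("surname");
        return customer;
    }

    // Same delegation as the batch transform
    static Collection<Customer> transform(Collection<StreamEvent> events) {
        Collection<Customer> customers = new ArrayList<>();
        for (StreamEvent event : events) {
            customers.add(transform(event));
        }
        return customers;
    }

    public static void main(String[] args) {
        StreamEvent event = new StreamEvent()
                .with("customerId", 7)
                .with("firstname", "Ada")
                .with("surname", "Lovelace");
        for (Customer c : transform(List.of(event))) {
            System.out.println(c.customerId + " " + c.firstname + " " + c.surname);
        }
    }
}
```

A missing or non-integer customerId would fail at the cast, so a production transformer may want to validate the attribute and raise TranslationException instead.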