
Kafka

The Kafka consumer transport ingests data from subscribed cluster topics. Consumed events are deserialized into StreamEvents ready for pipeline processing, either automatically or through a custom transformer implementation.

Driver details

org.apache.kafka:kafka-clients:2.7.0
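If you are wiring this transport into your own build, the driver above can be declared as an ordinary dependency. A sketch in Gradle (Maven coordinates taken from the driver details above):

```groovy
dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.7.0'
}
```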

Configuration Guide

Example configuration

```yaml
kafkaConsumer:
  name: nasdaq_quotes_stream
  cluster address: localhost:9092
  consumerGroupId: nasdaq
  topics:
    - quotes
```
This basic example leverages the default settings, resulting in events consumed from the quotes topic as StreamEvent JSON objects. This assumes the upstream data producer publishes the Joule StreamEvent object as a JSON object.
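Under the hood, a configuration like the one above maps onto standard Kafka client properties. The sketch below is an illustration only (the class name `ConsumerConfigSketch` is hypothetical, not part of Joule); it shows how `cluster address` and `consumerGroupId` correspond to `bootstrap.servers` and `group.id`:

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Hypothetical illustration: build the Kafka client properties that the
    // YAML attributes above correspond to.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // cluster address
        props.put("group.id", "nasdaq");                  // consumerGroupId
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println(props.getProperty("bootstrap.servers"));
        System.out.println(props.getProperty("group.id"));
    }
}
```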

Core Attributes

Available configuration parameters:

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| name | Name of source stream | String | |
| cluster address | Kafka cluster details. Maps to `bootstrap.servers` | `http://<ip-address>:port` | |
| consumerGroupId | Consumer group to ensure an event is read only once. Maps to `group.id` | String | |
| topics | Message topics to subscribe to | List of strings | |
| consumerThreads | Number of consumer threads to use for event parsing | Integer. Default: 1 | |
| pollingTimeout | The maximum time to block on the consumer poll | Long. Default: 100ms | |
| deserializer | Deserialization configuration | See Deserialization Attributes section | |
| properties | Additional consumer properties to be applied to the Kafka consumer subsystem | Properties map | |
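The `properties` map is passed through to the underlying Kafka client. A hypothetical sketch: the property names below are standard Kafka consumer settings, and this example assumes the transport forwards them verbatim.

```yaml
kafkaConsumer:
  name: nasdaq_quotes_stream
  cluster address: localhost:9092
  consumerGroupId: nasdaq
  topics:
    - quotes
  properties:
    max.poll.records: 500
    auto.offset.reset: earliest
```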

Deserialization Attributes

This section provides the configuration parameters available for the object deserialization process.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User provided implementation | Implementation of CustomTransformer | |
| key deserializer | Domain class that maps to the partition key type. Maps to the `key.deserializer` property | String. Default: IntegerDeserializer | |
| value deserializer | Domain class that deserializes the incoming object to a StreamEvent. Maps to the `value.deserializer` property | String. Default: StringDeserializer | |

Deserializer example

```yaml
- kafkaConsumer:
    name: nasdaq_quotes_stream
    cluster address: localhost:9092
    consumerGroupId: nasdaq
    topics:
      - quotes
    deserializer:
      parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
      key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
```

OOTB deserializers

- com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
- com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonDeserializer
- com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventDeserializer

Custom Transformers

By implementing a CustomTransformer, you can provide domain-specific data types to ease integration with upstream applications.

Example

```yaml
- kafkaConsumer:
    name: nasdaq_quotes_stream
    cluster address: localhost:9092
    consumerGroupId: nasdaq
    topics:
      - quotes
    deserializer:
      parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
      key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
```

Java transformer implementation

This class converts the received object into an internal StreamEvent object.

```java
public class QuoteToStreamEventParser implements StreamEventParser {

    public QuoteToStreamEventParser() {
        // Required default constructor
    }

    @Override
    public Collection<StreamEvent> translate(Object o) throws TranslationException {
        // Return an empty collection rather than null for unrecognised payloads
        Collection<StreamEvent> events = Collections.emptyList();
        if (o instanceof Quote) {
            Quote quote = (Quote) o;
            StreamEvent event = new StreamEvent("quote");
            event.setEventTime(quote.time());
            event.addValue("symbol", quote.symbol());
            event.addValue("bid", quote.bid());
            event.addValue("ask", quote.ask());
            event.addValue("volatility", quote.volatility());
            event.addValue("volume", quote.volume());
            event.addValue("date", quote.date());
            events = Collections.singletonList(event);
        }
        return events;
    }
}
```
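To see the transformer pattern end to end without a Kafka cluster, the self-contained sketch below exercises the same `translate` logic. `StreamEvent` and `Quote` here are simplified hypothetical stand-ins for the real Joule and domain classes, reduced to a few fields for illustration:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class TransformerSketch {

    // Simplified stand-in for Joule's StreamEvent
    public static class StreamEvent {
        public final String type;
        public final Map<String, Object> values = new HashMap<>();

        public StreamEvent(String type) { this.type = type; }

        public void addValue(String key, Object value) { values.put(key, value); }
    }

    // Simplified stand-in for the domain Quote class
    public record Quote(String symbol, double bid, double ask) {}

    // Same shape as QuoteToStreamEventParser.translate above
    public static Collection<StreamEvent> translate(Object o) {
        if (o instanceof Quote quote) {
            StreamEvent event = new StreamEvent("quote");
            event.addValue("symbol", quote.symbol());
            event.addValue("bid", quote.bid());
            event.addValue("ask", quote.ask());
            return Collections.singletonList(event);
        }
        return Collections.emptyList(); // unknown payloads produce no events
    }

    public static void main(String[] args) {
        Collection<StreamEvent> events = translate(new Quote("AAPL", 189.5, 189.7));
        System.out.println(events.size()); // one StreamEvent produced
    }
}
```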