The Kafka consumer transport ingests data from subscribed cluster topics. Consumed events are deserialised into StreamEvents ready for pipeline processing, either automatically or through a custom transformer implementation.
Driver details
org.apache.kafka:kafka-clients:2.7.0
Configuration Guide
Example configuration
kafkaConsumer:
  name: nasdaq_quotes_stream
  cluster address: localhost:9092
  consumerGroupId: nasdaq
  topics:
    - quotes
  properties:
    partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
This basic example relies on the default settings: events are consumed from the quotes topic and deserialised as StreamEvent JSON objects. It assumes the upstream data producer publishes Joule StreamEvent objects as JSON.
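For orientation, the configuration above can be read as a set of standard Kafka consumer properties. The sketch below is an assumption about that mapping, not a description of how the transport is implemented: `bootstrap.servers`, `group.id` and `partition.assignment.strategy` are standard Kafka consumer keys, while the class name `ConsumerPropertiesSketch` and the pairing of `cluster address` and `consumerGroupId` with those keys are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: shows which standard Kafka consumer properties the
// YAML attributes plausibly correspond to. The mapping is an assumption.
final class ConsumerPropertiesSketch {

    static Properties build(String clusterAddress, String consumerGroupId) {
        Properties props = new Properties();
        // "cluster address" -> bootstrap.servers (assumed mapping)
        props.setProperty("bootstrap.servers", clusterAddress);
        // "consumerGroupId" -> group.id (assumed mapping)
        props.setProperty("group.id", consumerGroupId);
        // Entries under "properties" use native Kafka keys already
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.StickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "nasdaq");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Because the `properties` block uses native Kafka keys, other consumer settings could presumably be supplied there in the same way.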
Core Attributes
Available configuration parameters
Deserialization Attributes
This section lists the configuration parameters available for the object deserialization process.
Deserializer example
- kafkaConsumer:
    name: nasdaq_quotes_stream
    cluster address: localhost:9092
    consumerGroupId: nasdaq
    topics:
      - quotes
    deserializer:
      parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
      key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
    properties:
      partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
OOTB deserializers
com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonDeserializer
com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventDeserializer
Custom Transformers
By implementing a CustomTransformer you can provide domain-specific data types to ease integration with upstream applications.
Example
- kafkaConsumer:
    name: nasdaq_quotes_stream
    cluster address: localhost:9092
    consumerGroupId: nasdaq
    topics:
      - quotes
    deserializer:
      parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
      key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
    properties:
      partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
Java transformer implementation
This class will convert the passed object to an internal StreamEvent object.
import java.util.Collection;
import java.util.Collections;
// Joule imports (StreamEvent, StreamEventParser, TranslationException) are omitted here.

public class QuoteToStreamEventParser implements StreamEventParser {

    public QuoteToStreamEventParser() {
        // Required
    }

    @Override
    public Collection<StreamEvent> translate(Object o) throws TranslationException {
        // Stays null when the passed object is not a Quote
        Collection<StreamEvent> events = null;
        if (o instanceof Quote) {
            Quote quote = (Quote) o;
            // Copy each Quote field onto a single "quote" StreamEvent
            StreamEvent event = new StreamEvent("quote");
            event.setEventTime(quote.time());
            event.addValue("symbol", quote.symbol());
            event.addValue("bid", quote.bid());
            event.addValue("ask", quote.ask());
            event.addValue("volatility", quote.volatility());
            event.addValue("volume", quote.volume());
            event.addValue("date", quote.date());
            events = Collections.singletonList(event);
        }
        return events;
    }
}
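The parser above reads the `Quote` fields through accessor methods such as `quote.symbol()` and `quote.bid()`, but the `Quote` type itself is not shown. A minimal sketch of what such a domain type might look like follows, assuming a Java record (Java 16+). The field types are guesses made for illustration; only the accessor names come from the parser.

```java
import java.time.LocalDate;
import java.util.Objects;

// Hypothetical domain type: accessor names match those used by
// QuoteToStreamEventParser; every field type here is an assumption.
record Quote(long time,          // event time, assumed epoch milliseconds
             String symbol,
             double bid,
             double ask,
             double volatility,
             long volume,
             LocalDate date) {   // trading date, assumed LocalDate

    // Compact constructor: reject quotes without a symbol.
    Quote {
        Objects.requireNonNull(symbol, "symbol is required");
    }

    public static void main(String[] args) {
        Quote quote = new Quote(System.currentTimeMillis(), "AAPL",
                189.10, 189.14, 0.21, 1200L, LocalDate.now());
        // Records generate accessors named after their components.
        System.out.println(quote.symbol() + " bid=" + quote.bid() + " ask=" + quote.ask());
    }
}
```

With the `ObjectDeserializer` configured as the value deserializer, the upstream producer would presumably need to publish objects of this type in a serialised form that the deserializer can reconstruct before they reach the parser.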