Kafka
The Kafka consumer transport ingests data from subscribed cluster topics. Consumed events are deserialized into StreamEvents ready for pipeline processing, either automatically or through a custom transformer implementation.
`org.apache.kafka:kafka-clients:2.7.0`
```yaml
kafkaConsumer:
  name: nasdaq_quotes_stream
  cluster address: localhost:9092
  consumerGroupId: nasdaq
  topics:
    - quotes
```
This basic example uses the default settings: events are consumed from the quotes topic as StreamEvent JSON objects. It assumes the upstream data producer publishes the Joule StreamEvent object as JSON.
Available configuration parameters
| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| name | Name of source stream | String | |
| cluster address | Kafka cluster details. Maps to `bootstrap.servers` | `http://<ip-address>:port` | |
| consumerGroupId | Consumer group to ensure an event is read only once. Maps to `group.id` | String | |
| topics | Message topics to subscribe to | List of strings | |
| consumerThreads | Number of consumer threads to use for event parsing | Integer. Default: 1 | |
| pollingTimeout | The maximum time to block while polling for records | Long. Default: 100ms | |
| deserializer | Deserialization configuration | See Deserialization Attributes section | |
| properties | Additional consumer properties to be applied to the Kafka consumer subsystem | Properties map | |
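As a sketch of how the optional tuning parameters above fit together, the following extends the basic example (the thread count, timeout, and the entries under `properties` are illustrative values, not recommendations; the `properties` keys are standard Kafka consumer settings):

```yaml
kafkaConsumer:
  name: nasdaq_quotes_stream
  cluster address: localhost:9092
  consumerGroupId: nasdaq
  topics:
    - quotes
  consumerThreads: 4
  pollingTimeout: 250
  properties:
    max.poll.records: 500
    enable.auto.commit: true
```

Any entry under `properties` is passed through to the underlying Kafka consumer, so standard client settings can be applied without dedicated transport attributes.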
This section describes the configuration parameters available for the object deserialization process.
| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User provided implementation | Implementation of CustomTransformer | |
| key deserializer | Domain class that maps to the partition key type. Maps to the `key.deserializer` property | String. Default: IntegerDeserializer | |
| value deserializer | Domain class that deserializes the incoming object to a StreamEvent. Maps to the `value.deserializer` property | String. Default: StringDeserializer | |
Provided deserializer implementations:

- `com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer`
- `com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonDeserializer`
- `com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventDeserializer`
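For example, when the upstream producer publishes StreamEvents as JSON, the bundled JSON deserializer could be selected as the value deserializer (a minimal sketch of the `deserializer` block only):

```yaml
deserializer:
  key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
  value deserializer: com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonDeserializer
```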
By implementing a CustomTransformer you can support domain-specific data types, easing integration with upstream applications.
```yaml
- kafkaConsumer:
    name: nasdaq_quotes_stream
    cluster address: localhost:9092
    consumerGroupId: nasdaq
    topics:
      - quotes
    deserializer:
      parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
      key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
```
This class will convert the passed object to an internal StreamEvent object.
```java
import java.util.Collection;
import java.util.Collections;

public class QuoteToStreamEventParser implements StreamEventParser {

    public QuoteToStreamEventParser() {
        // Required public no-arg constructor
    }

    @Override
    public Collection<StreamEvent> translate(Object o) throws TranslationException {
        Collection<StreamEvent> events = null;
        if (o instanceof Quote) {
            Quote quote = (Quote) o;
            // Map the domain object's fields onto a new StreamEvent
            StreamEvent event = new StreamEvent("quote");
            event.setEventTime(quote.time());
            event.addValue("symbol", quote.symbol());
            event.addValue("bid", quote.bid());
            event.addValue("ask", quote.ask());
            event.addValue("volatility", quote.volatility());
            event.addValue("volume", quote.volume());
            event.addValue("date", quote.date());
            events = Collections.singletonList(event);
        }
        return events;
    }
}
```
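To illustrate the translate pattern in isolation, the following self-contained sketch uses hypothetical stand-ins for the Joule types (`StreamEvent`, `Quote`) and a reduced field set; it is not the real Joule API, only a demonstration of mapping a domain object to a collection of stream events:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ParserSketch {

    // Hypothetical stand-in for Joule's StreamEvent: an event type plus named values
    static class StreamEvent {
        final String type;
        final Map<String, Object> values = new HashMap<>();
        StreamEvent(String type) { this.type = type; }
        void addValue(String key, Object value) { values.put(key, value); }
    }

    // Hypothetical stand-in for the upstream domain object
    record Quote(String symbol, double bid, double ask) {}

    // The translate pattern: map one recognised domain object to one or more events;
    // unrecognised inputs yield an empty collection here (a safer choice than null)
    static Collection<StreamEvent> translate(Object o) {
        if (o instanceof Quote quote) {
            StreamEvent event = new StreamEvent("quote");
            event.addValue("symbol", quote.symbol());
            event.addValue("bid", quote.bid());
            event.addValue("ask", quote.ask());
            return Collections.singletonList(event);
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        Collection<StreamEvent> events = translate(new Quote("AAPL", 189.5, 189.7));
        StreamEvent e = events.iterator().next();
        System.out.println(e.type + " " + e.values.get("symbol")); // prints: quote AAPL
    }
}
```

Returning an empty collection for unrecognised inputs (rather than null, as in the parser above) avoids null checks downstream; either convention works as long as the pipeline handles it consistently.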