Kafka


The Kafka consumer transport ingests data from subscribed cluster topics. Consumed events are deserialised into StreamEvents, either automatically or through a custom transformer implementation, ready for pipeline processing.

Driver details

org.apache.kafka:kafka-clients:2.7.0

Configuration Guide

Example configuration

kafkaConsumer:
  name: nasdaq_quotes_stream
  cluster address: localhost:9092
  consumerGroupId: nasdaq
  topics:
    - quotes
  properties:
    partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor

This basic example uses the default settings, so events consumed from the quotes topic are deserialised as StreamEvent JSON objects. It assumes the upstream data producer publishes Joule StreamEvent objects as JSON.

Core Attributes

Available configuration parameters

Deserialization Attributes

This section describes the configuration parameters available for the object deserialization process.

Deserializer example

- kafkaConsumer:
    name: nasdaq_quotes_stream
    cluster address: localhost:9092
    consumerGroupId: nasdaq
    topics:
      - quotes

    deserializer:
      parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
      key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
    
    properties:
      partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
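
The attribute names in the example above resemble standard Kafka consumer settings. As a rough guide only, and assuming Joule passes these values through to the underlying Kafka client unchanged (this page does not confirm the mapping), the equivalent plain Kafka consumer properties could be sketched as follows. The class and method names are hypothetical; the property keys are standard Kafka consumer configuration names.

```java
import java.util.Properties;

// Hypothetical helper, not part of Joule: shows the raw Kafka consumer
// properties the YAML example is ASSUMED to correspond to.
final class KafkaConsumerPropertiesSketch {

    static Properties forQuotesExample() {
        Properties props = new Properties();
        // "cluster address" in the YAML example
        props.setProperty("bootstrap.servers", "localhost:9092");
        // "consumerGroupId" in the YAML example
        props.setProperty("group.id", "nasdaq");
        // "key deserializer" and "value deserializer" under the deserializer block
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.setProperty("value.deserializer",
                "com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer");
        // Entries under "properties" already use the Kafka property name
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.StickyAssignor");
        return props;
    }
}
```

Whatever deserializers are configured here, the upstream producer needs to use the matching serializers; for example, IntegerDeserializer keys imply the producer writes keys with Kafka's IntegerSerializer.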

OOTB deserializers

com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonDeserializer
com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventDeserializer

Custom Transformers

By implementing a CustomTransformer you can supply domain-specific data types, which eases integration with upstream applications.

Example

- kafkaConsumer:
    name: nasdaq_quotes_stream
    cluster address: localhost:9092
    consumerGroupId: nasdaq
    topics:
      - quotes

    deserializer:
      parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
      key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
      
    properties:
      partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor  

Java transformer implementation

This class converts the passed object into an internal StreamEvent object.

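import java.util.Collection;
import java.util.Collections;
// Imports for the Joule types (StreamEvent, StreamEventParser,
// TranslationException) and for Quote are omitted here.

public class QuoteToStreamEventParser implements StreamEventParser {

    public QuoteToStreamEventParser() {
        // Required: public no-argument constructor
    }

    /**
     * Translates a deserialized Quote into a single StreamEvent.
     * Returns null when the passed object is not a Quote.
     */
    @Override
    public Collection<StreamEvent> translate(Object o) throws TranslationException {
        Collection<StreamEvent> events = null;
        if (o instanceof Quote) {
            Quote quote = (Quote) o;
            // Create a "quote" event and copy the domain fields across
            StreamEvent event = new StreamEvent("quote");
            event.setEventTime(quote.time());
            event.addValue("symbol", quote.symbol());
            event.addValue("bid", quote.bid());
            event.addValue("ask", quote.ask());
            event.addValue("volatility", quote.volatility());
            event.addValue("volume", quote.volume());
            event.addValue("date", quote.date());

            events = Collections.singletonList(event);
        }
        return events;
    }
}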
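
The Quote type used by the parser is not shown on this page. The following is a minimal sketch of what such a domain class might look like, together with a serialization round trip. It rests on two assumptions that are not confirmed here: the field types are guesses based on the accessor names the parser calls, and ObjectDeserializer is assumed to rely on standard Java serialization, which is why the class implements Serializable. QuoteSerializationSketch is a hypothetical helper name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical domain type: accessor names mirror those called by
// QuoteToStreamEventParser; the field types are assumptions.
final class Quote implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String symbol;
    private final double bid;
    private final double ask;
    private final double volatility;
    private final long volume;
    private final long time;   // event time, assumed epoch milliseconds
    private final String date; // assumed ISO-8601 date string

    Quote(String symbol, double bid, double ask, double volatility,
          long volume, long time, String date) {
        this.symbol = symbol;
        this.bid = bid;
        this.ask = ask;
        this.volatility = volatility;
        this.volume = volume;
        this.time = time;
        this.date = date;
    }

    String symbol()     { return symbol; }
    double bid()        { return bid; }
    double ask()        { return ask; }
    double volatility() { return volatility; }
    long volume()       { return volume; }
    long time()         { return time; }
    String date()       { return date; }
}

// Hypothetical helper showing a Java serialization round trip for Quote.
final class QuoteSerializationSketch {

    // Producer side: serialize a Quote to bytes.
    static byte[] toBytes(Quote quote) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(quote);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Consumer side: rebuild the object that would be handed to translate(Object).
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

On Java 16 or later the same shape could be written more compactly as a record, since the parser already calls record-style accessors such as quote.symbol().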
