Kafka


The Kafka consumer transport ingests data from subscribed cluster topics. Consumed events are deserialised into StreamEvents ready for pipeline processing, either automatically or through a custom transformer implementation.

Driver details

org.apache.kafka:kafka-clients:2.7.0

Configuration Guide

Example configuration

kafkaConsumer:
  name: nasdaq_quotes_stream
  cluster address: localhost:9092
  consumerGroupId: nasdaq
  topics:
    - quotes
  properties:
    partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor

This basic example relies on the default settings, so events consumed from the quotes topic are deserialised as StreamEvent JSON objects. It assumes the upstream data producer publishes Joule StreamEvent objects as JSON.

Core Attributes

Available configuration parameters

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| name | Name of source stream | String | |
| cluster address | Kafka cluster details. Maps to bootstrap.servers | http://<ip-address>:port | |
| consumerGroupId | Consumer group to ensure an event is read only once. Maps to group.id | String | |
| topics | Message topics to subscribe to | List of strings | |
| consumerThreads | Number of consumer threads to use for event parsing | Integer. Default: 1 | |
| pollingTimeout | The maximum time to block while polling for events | Long. Default: 100ms | |
| deserializer | Deserialization configuration | See Deserialization Attributes section | |
| properties | Additional consumer properties to be applied to the Kafka consumer subsystem | Properties map | |
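
The attribute table states that cluster address maps to bootstrap.servers and consumerGroupId maps to group.id, with anything under properties passed through as extra settings. The following is a minimal sketch of that mapping using only java.util.Properties. The class ConsumerConfigSketch and the method toKafkaProperties are hypothetical names used for illustration, not part of Joule or kafka-clients, and applying the extra properties last is an assumption about precedence.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper for illustration only; not a Joule or kafka-clients class. */
final class ConsumerConfigSketch {

    private ConsumerConfigSketch() { }

    /** Translates the documented attributes into Kafka consumer property keys. */
    static Properties toKafkaProperties(String clusterAddress,
                                        String consumerGroupId,
                                        Map<String, String> extraProperties) {
        Properties props = new Properties();
        // "cluster address" maps to bootstrap.servers
        props.setProperty("bootstrap.servers", clusterAddress);
        // "consumerGroupId" maps to group.id
        props.setProperty("group.id", consumerGroupId);
        // Assumption: entries under "properties" are applied last and may override
        extraProperties.forEach(props::setProperty);
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> extras = new LinkedHashMap<>();
        extras.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.StickyAssignor");

        Properties props = toKafkaProperties("localhost:9092", "nasdaq", extras);
        // Print the resulting key/value pairs (iteration order is not guaranteed)
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A Properties object built this way is the shape that the kafka-clients consumer constructor accepts, which is why the pass-through properties map can carry any standard consumer setting.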

Deserialization Attributes

This section lists the configuration parameters available for the object deserialization process.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User provided implementation | Implementation of CustomTransformer | |
| key deserializer | Domain class that maps to the partition key type. Maps to the key.deserializer property | String. Default: IntegerDeserializer | |
| value deserializer | Domain class that deserializes the incoming object to a StreamEvent. Maps to the value.deserializer property | String. Default: StringDeserializer | |
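
For a Kafka consumer, the client properties that carry these two class names are key.deserializer and value.deserializer. The sketch below shows one way the documented defaults could be resolved when an attribute is left unset. DeserializerDefaultsSketch is a hypothetical helper, and it assumes the defaults named above refer to the classes in Kafka's org.apache.kafka.common.serialization package.

```java
import java.util.Properties;

/** Hypothetical helper for illustration only; not a Joule or kafka-clients class. */
final class DeserializerDefaultsSketch {

    // Assumption: the documented defaults are Kafka's built-in deserializers
    static final String DEFAULT_KEY_DESERIALIZER =
            "org.apache.kafka.common.serialization.IntegerDeserializer";
    static final String DEFAULT_VALUE_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    private DeserializerDefaultsSketch() { }

    /** Returns consumer properties, falling back to the defaults for null arguments. */
    static Properties deserializerProperties(String keyDeserializer,
                                             String valueDeserializer) {
        Properties props = new Properties();
        props.setProperty("key.deserializer",
                keyDeserializer != null ? keyDeserializer : DEFAULT_KEY_DESERIALIZER);
        props.setProperty("value.deserializer",
                valueDeserializer != null ? valueDeserializer : DEFAULT_VALUE_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        // Key deserializer omitted, value deserializer set explicitly
        Properties props = deserializerProperties(null,
                "com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer");
        System.out.println(props.getProperty("key.deserializer"));
        System.out.println(props.getProperty("value.deserializer"));
    }
}
```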

Deserializer example

- kafkaConsumer:
    name: nasdaq_quotes_stream
    cluster address: localhost:9092
    consumerGroupId: nasdaq
    topics:
      - quotes

    deserializer:
      parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
      key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
    
    properties:
      partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor

Out-of-the-box (OOTB) deserializers

com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonDeserializer
com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventDeserializer

Custom Transformers

By implementing a CustomTransformer you can map domain-specific data types to StreamEvents, which eases integration with upstream applications.

Example

- kafkaConsumer:
    name: nasdaq_quotes_stream
    cluster address: localhost:9092
    consumerGroupId: nasdaq
    topics:
      - quotes

    deserializer:
      parser: com.fractalworks.examples.banking.data.QuoteToStreamEventParser
      key deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value deserializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectDeserializer
      
    properties:
      partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor  

Java transformer implementation

This class converts the received Quote object into an internal StreamEvent object.

import java.util.Collection;
import java.util.Collections;
// StreamEvent, StreamEventParser, TranslationException and the Quote domain
// class must also be imported from their respective packages.

public class QuoteToStreamEventParser implements StreamEventParser {

    public QuoteToStreamEventParser() {
        // Required no-argument constructor
    }

    @Override
    public Collection<StreamEvent> translate(Object o) throws TranslationException {
        // Stays null when the payload is not a Quote
        Collection<StreamEvent> events = null;
        if (o instanceof Quote) {
            Quote quote = (Quote) o;

            // Copy each Quote field onto a single "quote" StreamEvent
            StreamEvent event = new StreamEvent("quote");
            event.setEventTime(quote.time());
            event.addValue("symbol", quote.symbol());
            event.addValue("bid", quote.bid());
            event.addValue("ask", quote.ask());
            event.addValue("volatility", quote.volatility());
            event.addValue("volume", quote.volume());
            event.addValue("date", quote.date());

            events = Collections.singletonList(event);
        }
        return events;
    }
}
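
The parser above depends on a Quote domain class that this page does not show. The following is a minimal sketch of what such a class could look like, inferred only from the accessor names the parser calls. The field types (long, double, LocalDate), the constructor, and the use of Serializable are all assumptions, and the values in main are illustrative.

```java
import java.io.Serializable;
import java.time.LocalDate;

/** Hypothetical Quote domain class; field types are assumptions. */
final class Quote implements Serializable {

    private static final long serialVersionUID = 1L;

    private final long time;          // event time, assumed epoch milliseconds
    private final String symbol;
    private final double bid;
    private final double ask;
    private final double volatility;
    private final long volume;
    private final LocalDate date;

    Quote(long time, String symbol, double bid, double ask,
          double volatility, long volume, LocalDate date) {
        this.time = time;
        this.symbol = symbol;
        this.bid = bid;
        this.ask = ask;
        this.volatility = volatility;
        this.volume = volume;
        this.date = date;
    }

    // Accessor names match the calls made in QuoteToStreamEventParser
    long time() { return time; }
    String symbol() { return symbol; }
    double bid() { return bid; }
    double ask() { return ask; }
    double volatility() { return volatility; }
    long volume() { return volume; }
    LocalDate date() { return date; }

    public static void main(String[] args) {
        Quote quote = new Quote(1700000000000L, "AAPL", 189.10, 189.15,
                0.21, 1200L, LocalDate.of(2023, 11, 14));
        System.out.println(quote.symbol() + " @ " + quote.date());
    }
}
```

Keeping the class immutable means a parser can read it from any consumer thread without extra synchronisation.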
