# Kafka

## Overview

The Kafka Publisher Transport enables publishing of processed events to a specified Kafka topic on a defined Kafka cluster. **Events are serialised** using either a default JSON serialiser (`StreamEventJsonSerializer`) or a custom-defined transformer.

Configuration includes the Kafka cluster address (with a matching hostname entry in the `/etc/hosts` file when a host alias is used) and the topics and partition keys for message distribution. Key configuration attributes are **prioritised by importance and cover serialisation options**, allowing flexible data handling.

Out-of-the-box serialisers support common formats (JSON, Avro), while **custom serialisers and transformers facilitate integration** with downstream applications by converting events into domain-specific data types.

The Kafka Publisher setup supports scalable, resilient event processing workflows, suitable for **chaining multiple data-processing stages**.

{% hint style="info" %}
The Kafka publisher relies on the client library [org.apache.kafka:kafka-clients:2.7.0](https://kafka.apache.org/documentation/)
{% endhint %}

## Examples & DSL attributes

In this example, the Kafka Publisher sends events to the `customers` topic on the Kafka broker (`KAFKA_BROKER:9092`).

Events are partitioned by `customerId`, ensuring that all events for the same customer go to the same partition for ordered processing.

Using default settings, events are serialised as JSON `StreamEvent` objects, making them easily readable for downstream processes. This setup enables chaining multiple processing stages, useful for building **scalable and resilient data workflows**.

```yaml
kafkaPublisher:
  cluster address: KAFKA_BROKER:9092
  topic: customers
  partitionKeys:
    - customerId
```

{% hint style="danger" %}
Add the address of the Kafka host to the `/etc/hosts` file, e.g. `127.0.0.1 KAFKA_BROKER`
{% endhint %}

### Attributes schema

Configuration parameters available for the Kafka publisher transport.

The parameters are organised by order of importance, ranked from high to low.

<table><thead><tr><th width="166">Attribute</th><th width="217">Description</th><th width="246">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>cluster address</td><td>Kafka broker address</td><td>&#x3C;host>:&#x3C;port></td><td>true</td></tr><tr><td>topic</td><td>Kafka topic to publish events to</td><td>String</td><td>true</td></tr><tr><td>partitionKeys</td><td>Event attributes used as partition keys so that related events are routed to the same topic partition</td><td>List of strings</td><td>true</td></tr><tr><td>serializer</td><td>Serialisation settings. See the serialisation attributes schema below</td><td>Serializer configuration</td><td>false</td></tr><tr><td>batchSize</td><td>Number of events to batch send; maps to <code>batch.size</code>. A batch size of zero disables batching</td><td><p>Integer</p><p>Default: 1024</p></td><td>false</td></tr><tr><td>memBufferSize</td><td>Size in bytes of the event buffer. Maps to <code>buffer.memory</code></td><td><p>Long </p><p>Default: 33554432</p></td><td>false</td></tr><tr><td>retries</td><td>Number of publish retries; maps to <code>retries</code></td><td><p>Integer </p><p>Default: 0</p></td><td>false</td></tr><tr><td>properties</td><td>Additional publisher properties to be applied to the Kafka publisher subsystem. Default properties applied are listed <a href="#default-producer-properties">below</a>.</td><td>Properties map</td><td>false</td></tr></tbody></table>
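The optional batching and `properties` attributes can be combined with the required ones. A hedged sketch (the exact nesting of the `properties` map is an assumption based on the table above; the property names themselves are standard Kafka producer configuration keys):

```yaml
kafkaPublisher:
  cluster address: KAFKA_BROKER:9092
  topic: customers
  partitionKeys:
    - customerId

  batchSize: 2048
  retries: 3
  properties:
    linger.ms: 10
    compression.type: lz4
```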

### **Serialisation** attributes schema

<table><thead><tr><th width="161">Attribute</th><th width="220">Description</th><th width="259">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>transform</td><td>User-provided transformer that converts events into a domain type before serialisation</td><td>Implementation of <code>CustomTransformer</code></td><td>false</td></tr><tr><td>key serializer</td><td>Serialiser class for the partition key type. Maps to the <code>key.serializer</code> property</td><td><p>String </p><p>Default: IntegerSerializer</p></td><td>false</td></tr><tr><td>value serializer</td><td>Serialiser class for the event value written to Kafka. Maps to the <code>value.serializer</code> property</td><td><p>String</p><p>Default: StreamEventJsonSerializer</p></td><td>false</td></tr></tbody></table>

### **Serialiser example**

This example configures the Kafka Publisher to send events to the `customers` topic on a local broker (`localhost:9092`). Events are partitioned by `customerId`, ensuring ordered processing per customer.

1. <mark style="color:green;">**key serializer**</mark>\
   `IntegerSerializer` for `customerId`.
2. <mark style="color:green;">**value serializer**</mark>\
   `StreamEventJsonSerializer` for JSON-formatted event data.

This setup organises events by `customerId` and makes them easily readable for downstream processing.

```yaml
kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId

  serializer:
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    value serializer: com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonSerializer
```

### **OOTB serialisers**

A flexible event serialisation model allows Joule processes to be chained together for complex use cases and easy downstream integration.

Joule includes built-in serialisers, compatible with Apache Kafka, and also supports custom serialisation through the [Joule SDK](https://docs.fractalworks.io/joule/developer-guides/builder-sdk/connector-api).

```yaml
com.fractalworks.streams.transport.kafka.serializers.object.ObjectSerializer
com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonSerializer
com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventSerializer
```
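For instance, selecting the Avro serialiser follows the same pattern as the JSON example above; a sketch (not a verified configuration, and any Avro schema wiring the serialiser may need is omitted):

```yaml
kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId

  serializer:
    value serializer: com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventSerializer
```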

## Custom transformers

By implementing a custom transformer, you can create domain-specific data types to simplify integration with downstream applications.

### **Example**

In this example, the Kafka Publisher sends events to the `customers` topic on a local Kafka broker (`localhost:9092`).

Events are partitioned by `customerId` and serialised using custom serialisers:

1. <mark style="color:green;">**key serializer**</mark>\
   `IntegerSerializer` for the `customerId` (integer).
2. <mark style="color:green;">**value serializer**</mark>\
   `ObjectSerializer` for the event data.
3. <mark style="color:green;">**transformer**</mark>\
   The `CustomerTransformer` is used to convert event data into a domain-specific `Customer` object before serialisation.

This setup enables custom data transformations and ensures events are processed and stored efficiently.

```yaml
kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId
 
  serializer:
    transform: com.fractalworks.streams.transport.kafka.CustomerTransformer
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    value serializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectSerializer
```

### Java transformer implementation&#x20;

This class converts an internal `StreamEvent` object to a user-defined `Customer` data type.

```java
public class CustomerTransformer implements CustomTransformer<Customer> {

   public CustomerTransformer() {
       // Required
   }

   @Override
   public Collection<Customer> transform(Collection<StreamEvent> events) 
   throws TranslationException {

       Collection<Customer> customers = new ArrayList<>();
       for (StreamEvent event : events) {
           customers.add(transform(event));
       }
       return customers;
   }

   @Override
   public Customer transform(StreamEvent event) 
   throws TranslationException {

       Customer customer = new Customer();
       customer.setCustomerId( (int)event.getValue("customerId"));
       customer.setFirstname( (String) event.getValue("firstname"));
       customer.setSurname( (String) event.getValue("surname"));
       return customer;
   }
}
```
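The transformer above assumes a `Customer` domain class exposing the accessors it calls. A minimal sketch of such a class (the field set is illustrative; the real domain type may carry additional fields):

```java
// Hypothetical Customer domain type assumed by CustomerTransformer.
// Implements Serializable on the assumption that the OOTB
// ObjectSerializer uses standard Java serialisation.
public class Customer implements java.io.Serializable {
    private int customerId;
    private String firstname;
    private String surname;

    public int getCustomerId() { return customerId; }
    public void setCustomerId(int customerId) { this.customerId = customerId; }

    public String getFirstname() { return firstname; }
    public void setFirstname(String firstname) { this.firstname = firstname; }

    public String getSurname() { return surname; }
    public void setSurname(String surname) { this.surname = surname; }
}
```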

### Default producer properties

```properties
acks=0
retries=1
linger.ms=5
buffer.memory=33554432
request.timeout.ms=60000
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
partitioner.class=com.fractalworks.streams.transport.kafka.KeyPartitioner
```
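These defaults correspond to a plain `java.util.Properties` map as consumed by the standard Kafka producer API. A sketch of how they might be assembled, with DSL-supplied `properties` entries overriding them (the merge behaviour shown is an assumption, not the documented implementation):

```java
import java.util.Properties;

public class ProducerDefaults {

    // Mirrors the default producer properties listed above.
    public static Properties defaults() {
        Properties props = new Properties();
        props.setProperty("acks", "0");
        props.setProperty("retries", "1");
        props.setProperty("linger.ms", "5");
        props.setProperty("buffer.memory", "33554432");
        props.setProperty("request.timeout.ms", "60000");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("partitioner.class",
                "com.fractalworks.streams.transport.kafka.KeyPartitioner");
        return props;
    }

    // Entries from the DSL `properties` map replace the defaults
    // (assumed behaviour) before the Kafka producer is constructed.
    public static Properties withOverrides(Properties overrides) {
        Properties merged = defaults();
        merged.putAll(overrides);
        return merged;
    }
}
```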
