# Kafka

## Overview

The Kafka Publisher Transport enables publishing of processed events to a specified Kafka topic on a defined Kafka cluster. **Events are serialised** using either a default JSON serialiser (`StreamEventJsonSerializer`) or a custom-defined transformer.

Configuration includes mapping the Kafka broker hostname in the `/etc/hosts` file and defining the target topic and partition keys for message distribution. Key configuration attributes are **prioritised by importance and cover serialisation options**, allowing flexible data handling.

Out-of-the-box serialisers support common formats (JSON, Avro), while **custom serialisers and transformers facilitate integration** with downstream applications by converting events into domain-specific data types.

The Kafka Publisher setup supports scalable, resilient event processing workflows, suitable for **chaining multiple data-processing stages**.

{% hint style="info" %}
The Kafka publisher relies on the client library [org.apache.kafka:kafka-clients:2.7.0](https://kafka.apache.org/documentation/).
{% endhint %}

## Examples & DSL attributes

In this example, the Kafka Publisher sends events to the `customers` topic on the Kafka broker (`KAFKA_BROKER:9092`).

Events are partitioned by `customerId`, ensuring that all events for the same customer go to the same partition for ordered processing.

Using default settings, events are serialised as JSON `StreamEvent` objects, making them easily readable for downstream processes. This setup enables chaining multiple processing stages, useful for building **scalable and resilient data workflows**.

```yaml
kafkaPublisher:
  cluster address: KAFKA_BROKER:9092
  topic: customers
  partitionKeys:
    - customerId
```

{% hint style="danger" %}
Add the address of the Kafka host to the `/etc/hosts` file, e.g. `127.0.0.1 KAFKA_BROKER`
{% endhint %}

### Attributes schema

Configuration parameters available for the Kafka publisher transport.

The parameters are organised by order of importance, ranked from high to low.

<table><thead><tr><th width="166">Attribute</th><th width="217">Description</th><th width="246">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>cluster address </td><td>InfluxDB server address</td><td>http://&#x3C;ip-address>:port</td><td>true</td></tr><tr><td>topic</td><td>InfluxDB UI / CLI Authentication access token</td><td>String</td><td>true</td></tr><tr><td>partitionKeys</td><td><p>Authentication access details for v1.x InfluxDB. </p><p>Note: Only required if authToken has not been provided</p></td><td>String</td><td>true</td></tr><tr><td>serializer</td><td>InfluxDB UI / CLI organisation token </td><td>String</td><td>true</td></tr><tr><td>batchSize</td><td>Number of events to batch send, maps to batch.size. Batch sise of zero disables batching function</td><td><p>Integer</p><p>Default: 1024</p></td><td>false</td></tr><tr><td>memBufferSize</td><td>Size in bytes of event buffer. Maps to <code>buffer.memory</code></td><td><p>Long </p><p>Default: 33554432</p></td><td>false</td></tr><tr><td>retries</td><td>Maps to retries</td><td><p>Integer </p><p>Default: 0</p></td><td>false</td></tr><tr><td>properties</td><td>Additional publisher properties to be applied to the Kafka publisher subsystem. Default properties applied are listed <a href="#default-producer-properties">below</a>.</td><td>Properties map</td><td>false</td></tr></tbody></table>

### **Serialisation** attributes schema

<table><thead><tr><th width="161">Attribute</th><th width="220">Description</th><th width="259">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>transform</td><td>User provided implementation</td><td>Implementation of CustomTransformer</td><td>false</td></tr><tr><td>key serializer</td><td>Domain class that maps to the partition key type. Property maps to <code>key.serializer</code> property</td><td><p>String </p><p>Default: IntegerSerializer</p></td><td>false</td></tr><tr><td>value serializer</td><td>Domain class that serializes to Kafka. Property maps to <code>value.serializer</code> property</td><td><p>String</p><p>Default: StreamEventJsonSerializer</p></td><td>false</td></tr></tbody></table>

### **Serialiser example**

This example configures the Kafka Publisher to send events to the `customers` topic on a local broker (`localhost:9092`). Events are partitioned by `customerId`, ensuring ordered processing per customer.

1. <mark style="color:green;">**key serializer**</mark>\
   `IntegerSerializer` for `customerId`.
2. <mark style="color:green;">**value serializer**</mark>\
   `StreamEventJsonSerializer` for JSON-formatted event data.

This setup organises events by `customerId` and makes them easily readable for downstream processing.

```yaml
kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId

  serializer:
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    value serializer: com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonSerializer
```

### **OOTB serialisers**

A flexible event serialisation model allows Joule processes to be chained together for complex use cases and easy downstream integration.

Joule includes built-in serialisers, compatible with Apache Kafka, and also supports custom serialisation through the [Joule SDK](/joule/developer-guides/builder-sdk/connector-api.md).

```yaml
com.fractalworks.streams.transport.kafka.serializers.object.ObjectSerializer
com.fractalworks.streams.transport.kafka.serializers.json.StreamEventJsonSerializer
com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventSerializer
```
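
Any of these classes can be supplied as the `value serializer`. For example, the sketch below swaps the default JSON serialiser for the Avro serialiser; all other settings mirror the earlier examples:

```yaml
kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId

  serializer:
    value serializer: com.fractalworks.streams.transport.kafka.serializers.avro.AvroStreamEventSerializer
```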

## Custom transformers

By implementing a custom transformer, you can create domain-specific data types to simplify integration with downstream applications.

### **Example**

In this example, the Kafka Publisher sends events to the `customers` topic on a local Kafka broker (`localhost:9092`).

Events are partitioned by `customerId` and serialised using custom serialisers:

1. <mark style="color:green;">**key serializer**</mark>\
   `IntegerSerializer` for the `customerId` (integer).
2. <mark style="color:green;">**value serializer**</mark>\
   `ObjectSerializer` for the event data.
3. <mark style="color:green;">**transformer**</mark>\
   The `CustomerTransformer` is used to convert event data into a domain-specific `Customer` object before serialisation.

This setup enables custom data transformations and ensures events are processed and stored efficiently.

```yaml
kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId
 
  serializer:
    transform: com.fractalworks.streams.transport.kafka.CustomerTransformer
    key serializer: org.apache.kafka.common.serialization.IntegerSerializer
    value serializer: com.fractalworks.streams.transport.kafka.serializers.object.ObjectSerializer
```

### Java transformer implementation

This class converts an internal `StreamEvent` object to a user-defined `Customer` data type.

```java
import java.util.ArrayList;
import java.util.Collection;

// StreamEvent, CustomTransformer and TranslationException are provided by the Joule SDK
public class CustomerTransformer implements CustomTransformer<Customer> {

    public CustomerTransformer() {
        // Required public no-arg constructor
    }

    @Override
    public Collection<Customer> transform(Collection<StreamEvent> events)
            throws TranslationException {
        // Transform each incoming StreamEvent into a Customer
        Collection<Customer> customers = new ArrayList<>();
        for (StreamEvent event : events) {
            customers.add(transform(event));
        }
        return customers;
    }

    @Override
    public Customer transform(StreamEvent event)
            throws TranslationException {
        // Map the event attributes onto the Customer domain object
        Customer customer = new Customer();
        customer.setCustomerId((int) event.getValue("customerId"));
        customer.setFirstname((String) event.getValue("firstname"));
        customer.setSurname((String) event.getValue("surname"));
        return customer;
    }
}
```
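
The transformer assumes a `Customer` domain class exposing the setters used above. A minimal sketch of such a class is shown below; the class is hypothetical, and a real domain type would carry whatever fields downstream consumers need:

```java
// Hypothetical minimal domain type matching the setters used by CustomerTransformer
public class Customer {

    private int customerId;
    private String firstname;
    private String surname;

    public int getCustomerId() { return customerId; }
    public void setCustomerId(int customerId) { this.customerId = customerId; }

    public String getFirstname() { return firstname; }
    public void setFirstname(String firstname) { this.firstname = firstname; }

    public String getSurname() { return surname; }
    public void setSurname(String surname) { this.surname = surname; }
}
```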

### Default producer properties

```properties
acks=0
retries=1
linger.ms=5
buffer.memory=33554432
request.timeout.ms=60000
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
partitioner.class=com.fractalworks.streams.transport.kafka.KeyPartitioner
```
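
Individual defaults can be overridden through the `properties` attribute described in the attributes schema. The sketch below assumes `properties` accepts a map of standard Kafka producer settings; the values shown are illustrative:

```yaml
kafkaPublisher:
  cluster address: localhost:9092
  topic: customers
  partitionKeys:
    - customerId

  # Illustrative overrides of the default producer properties
  properties:
    acks: all
    linger.ms: 20
    request.timeout.ms: 30000
```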

