# MQTT

## Overview

This page provides an overview of how **Joule** utilises the **MQTT** protocol to subscribe to events, specifically in IoT use cases.

MQTT is a lightweight, publish-subscribe network protocol used for message queuing services, making it ideal for **machine-to-machine communication** in IoT.

Joule subscribes to events through an MQTT source consumer, which consumes events from a specified MQTT topic.

{% hint style="info" %}
**Client library:** [org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5](https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.mqttv5.client/1.2.5)
{% endhint %}

## Examples & DSL attributes

This example illustrates how to configure an MQTT consumer to subscribe to events from the `customers` topic.

The consumer connects to the MQTT broker at `tcp://127.0.0.1:1883` and consumes events with a **Quality of Service (QoS)** level of 1.

1. <mark style="color:green;">**Client settings**</mark>\
   The consumer is identified by the `clientId` (`jouleCustomerConsumer`), with authentication using the username `joule` and the tenant `uk`.
2. <mark style="color:green;">**Event deserialisation**</mark>\
   The events are consumed as `Customer` objects and then transformed into Joule `StreamEvent` objects using the `CustomerToStreamEventTranslator` class.
3. <mark style="color:green;">**Additional settings**</mark>\
   The configuration also includes settings for `compressed` (set to `false`) and `batch` (set to `false`), which affect how events are processed and transformed.
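
To make the `qos: 1` setting used in this example concrete, the following is a minimal sketch of the three MQTT Quality of Service levels. The `MqttQos` enum and its method names are hypothetical and exist only for illustration; they are not part of Joule or the Paho client.

```java
/** MQTT Quality of Service levels, as defined by the MQTT specification. */
enum MqttQos {
    AT_MOST_ONCE(0),   // fire and forget: no acknowledgement, messages may be lost
    AT_LEAST_ONCE(1),  // acknowledged delivery: messages may arrive more than once
    EXACTLY_ONCE(2);   // multi-step handshake: no loss and no duplicates

    private final int code;

    MqttQos(int code) {
        this.code = code;
    }

    int code() {
        return code;
    }

    /** Resolves the integer used in a `qos` setting to a level. */
    static MqttQos fromCode(int code) {
        for (MqttQos qos : values()) {
            if (qos.code == code) {
                return qos;
            }
        }
        throw new IllegalArgumentException("QoS must be 0, 1 or 2 but was " + code);
    }

    /** True when the same message can be delivered twice, so consumers should tolerate duplicates. */
    boolean mayDuplicate() {
        return this == AT_LEAST_ONCE;
    }
}
```

With QoS 1, as configured here, a downstream pipeline should be prepared to see the same event more than once.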

{% tabs %}
{% tab title="DSL" %}

```yaml
mqttConsumer:
  broker: tcp://127.0.0.1:1883
  topic: customers

  clientId: jouleCustomerConsumer
  username: joule
  password: joule
  tenant: uk
  qos: 1

  deserializer:
    parser: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
    compressed: false
    batch: false
```

{% endtab %}

{% tab title="Parser" %}
Use the Joule SDK to build a custom parser and deploy the resulting jar file to the `userlibs` directory. See [Joule SDK](/joule/developer-guides/builder-sdk.md) for further information.

```java
import com.fasterxml.jackson.annotation.JsonRootName;
import com.fractalworks.streams.core.data.streams.StreamEvent;
import com.fractalworks.streams.core.exceptions.TranslationException;
import com.fractalworks.streams.sdk.codec.StreamEventParser;
import com.fractalworks.streams.transport.Customer;

import java.util.Collection;
import java.util.Collections;

@JsonRootName(value = "customer parser")
public class CustomerToStreamEventTranslator implements StreamEventParser {

    public CustomerToStreamEventTranslator() {
        // REQUIRED
    }

    /**
     * Translates a Customer into a single StreamEvent.
     * Returns null when the object is not a Customer.
     */
    @Override
    public Collection<StreamEvent> translate(Object o) throws TranslationException {
        Collection<StreamEvent> events = null;
        if (o instanceof Customer customer) {
            // Event type is "customer"; each Customer field becomes an event attribute
            StreamEvent event = new StreamEvent("customer");
            event.addValue("id", customer.getId());
            event.addValue("firstname", customer.getFirstname());
            event.addValue("surname", customer.getSurname());
            event.addValue("age", customer.getAge());

            events = Collections.singletonList(event);
        }
        return events;
    }
}
```

{% endtab %}

{% tab title="MQTT Configuration" %}
Create a local configuration as `mosquitto.conf` within the `/etc/mosquitto/conf.d/` directory.

```bash
per_listener_settings true
pid_file /run/mosquitto/mosquitto.pid

persistence true
persistence_location /var/lib/mosquitto/

log_dest file /var/log/mosquitto/mosquitto.log

listener 1883
allow_anonymous true
password_file /etc/mosquitto/passwd

include_dir /etc/mosquitto/conf.d
```

{% endtab %}

{% tab title="Password setting" %}
Add the user `joule` to the `passwd` file using the following command, which sets the password to the `password/tenant` pair. Add the `-c` option if the `passwd` file needs to be created first.

```bash
mosquitto_passwd -b passwd joule joule/uk
```

{% endtab %}
{% endtabs %}

### Attributes schema

<table><thead><tr><th width="193">Attribute</th><th width="284">Description</th><th width="165">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>name</td><td>Name of source stream</td><td>String</td><td>true</td></tr><tr><td>broker</td><td>Broker server address</td><td>tcp://&#x3C;ip-address>:port</td><td>true</td></tr><tr><td>topic</td><td>Message topic to subscribe to</td><td>String</td><td>true</td></tr><tr><td>clientId</td><td>A unique client identifier on the server being connected to</td><td>String</td><td>true</td></tr><tr><td>username</td><td>Username</td><td>String</td><td>false</td></tr><tr><td>password</td><td>Password</td><td>String</td><td>false</td></tr><tr><td>tenant</td><td>Namespace for created topics</td><td>String</td><td>false</td></tr><tr><td>qos</td><td>Quality of service level (0, 1 or 2)</td><td><p>Integer</p><p>Default: 0</p></td><td>false</td></tr><tr><td>auto reconnect</td><td>Automatically reconnect to broker on disconnection</td><td><p>Boolean</p><p>Default: true</p></td><td>false</td></tr><tr><td>clean restart</td><td>If a client disconnects and reconnects within 5 minutes (the default session expiry interval) with <code>clean restart=false</code> and a <code>qos</code> of at least 1, the session state (i.e. subscribed topics and queued messages) is retained</td><td><p>Boolean</p><p>Default: true</p></td><td>false</td></tr><tr><td>sessionExpiry interval</td><td>Maximum time that the broker will maintain the session once the client disconnects</td><td><p>Long</p><p>Default: 300 seconds (5 minutes)</p></td><td>false</td></tr><tr><td>registration message</td><td>Message to send to broker when a Joule process registers</td><td>String</td><td>false</td></tr><tr><td>user properties</td><td>Sets the user properties</td><td>Map&#x3C;String, String></td><td>false</td></tr><tr><td>connection timeout</td><td>This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established</td><td><p>Integer</p><p>Default: 30 (seconds)</p></td><td>false</td></tr><tr><td>keepalive interval</td><td>This value, measured in seconds, defines the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available, without having to wait for the TCP/IP timeout</td><td><p>Integer</p><p>Default: 30 (seconds)</p></td><td>false</td></tr><tr><td>lwt handler</td><td>Last Will and Testament message handler</td><td>See <a href="/pages/OTWEzloJ7UU4e9WXSRWb">Last Will and Testament</a> page</td><td>false</td></tr><tr><td>deserializer</td><td>Deserialisation configuration</td><td>See <a href="#deserialisation-attributes-schema">Deserialisation attributes</a> section</td><td>false</td></tr><tr><td>security</td><td>Security configuration</td><td>See Security documentation</td><td>false</td></tr></tbody></table>
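
The interaction between `clean restart` and `sessionExpiry interval` can be easier to follow as code. The sketch below models the broker-side rule described in the table; the `SessionRetention` class and its method are hypothetical illustrations, not Joule or Paho code, and assume the standard MQTT v5 behaviour where a session ends once the expiry interval has elapsed.

```java
/** Illustrative model of MQTT v5 session retention; not part of Joule or Paho. */
final class SessionRetention {

    private SessionRetention() {
    }

    /**
     * @param cleanRestart           value of the `clean restart` attribute on reconnect
     * @param sessionExpirySeconds   value of the `sessionExpiry interval` attribute
     * @param secondsSinceDisconnect time the client was offline
     * @return true if the broker would still hold the previous session state
     */
    static boolean isSessionRetained(boolean cleanRestart,
                                     long sessionExpirySeconds,
                                     long secondsSinceDisconnect) {
        if (cleanRestart) {
            // The client asked for a new session, so previous state is discarded
            return false;
        }
        // Otherwise the state survives only while the expiry interval has not elapsed
        return secondsSinceDisconnect < sessionExpirySeconds;
    }
}
```

For example, `isSessionRetained(false, 300, 120)` is `true`, whereas reconnecting after 301 seconds, or with `clean restart` left at its default of `true`, gives `false`.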

### **Deserialisation** attributes schema

This section lists the configuration parameters available for the object deserialisation process.

<table><thead><tr><th width="200">Attribute</th><th width="220">Description</th><th width="222">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>parser</td><td>User-provided parser implementation</td><td>Implementation of StreamEventParser</td><td>true</td></tr><tr><td>compressed</td><td>Decompress the payload using Snappy</td><td><p>Boolean</p><p>Default: false</p></td><td>false</td></tr><tr><td>batch</td><td>Flag indicating that the payload contains multiple messages</td><td><p>Boolean</p><p>Default: false</p></td><td>false</td></tr></tbody></table>

### **Deserialiser example**

In the given MQTT consumer configuration, the `deserializer` section defines how the incoming events should be processed and transformed before being consumed by the Joule platform.

1. <mark style="color:green;">**parser**</mark>\
   The `parser` attribute specifies the class (`com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator`) responsible for converting the incoming message into a `Joule StreamEvent` object.\
   This transformation is necessary to **make the data compatible** with the platform’s processing system.
2. <mark style="color:green;">**compressed**</mark>\
   The `compressed: false` setting indicates that the incoming data is **not compressed**. If the data were compressed, this setting would need to be set to `true` and the system would decompress the data before processing.
3. <mark style="color:green;">**batch**</mark>\
   The `batch: false` setting means that each payload is treated as a single event.\
   Each event is processed individually as it is received. If set to `true`, the consumer treats the payload as containing multiple events and can **handle multiple events in a single batch**, which can improve efficiency in some scenarios.
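
As a rough illustration of the `batch` flag, the sketch below shows one way a payload could be routed to a parser depending on whether it holds one object or many. Everything here is hypothetical: `BatchDispatch` is not a Joule class, and the `Function` parameter is only a stand-in for a `StreamEventParser`-style `translate` method. How Joule actually applies the flag internally is not described on this page.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of how a `batch` flag could route payloads to a parser. */
final class BatchDispatch {

    private BatchDispatch() {
    }

    /**
     * @param payload decoded message payload: a single object, or a collection when batch is true
     * @param batch   mirrors the `batch` deserializer setting
     * @param parser  stand-in for a StreamEventParser-style translate function
     */
    static <E> List<E> deserialise(Object payload, boolean batch,
                                   Function<Object, Collection<E>> parser) {
        List<E> events = new ArrayList<>();
        if (batch && payload instanceof Collection<?> items) {
            // Batch mode: translate every item carried in the payload
            for (Object item : items) {
                addAll(events, parser.apply(item));
            }
        } else {
            // Single mode: the payload is one object
            addAll(events, parser.apply(payload));
        }
        return events;
    }

    // The example parser on this page returns null for unrecognised objects, so guard against it
    private static <E> void addAll(List<E> target, Collection<E> translated) {
        if (translated != null) {
            target.addAll(translated);
        }
    }
}
```

Keeping the parser responsible for a single object, and looping outside it, means the same parser implementation can serve both settings.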

{% tabs %}
{% tab title="DSL" %}

```yaml
mqttConsumer:
  ...
  deserializer:
    parser: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
    compressed: false
    batch: false
```

{% endtab %}

{% tab title="Parser" %}

```java
import com.fasterxml.jackson.annotation.JsonRootName;
import com.fractalworks.streams.core.data.streams.StreamEvent;
import com.fractalworks.streams.core.exceptions.TranslationException;
import com.fractalworks.streams.sdk.codec.StreamEventParser;
import com.fractalworks.streams.transport.Customer;

import java.util.Collection;
import java.util.Collections;

@JsonRootName(value = "customer parser")
public class CustomerToStreamEventTranslator implements StreamEventParser {

    public CustomerToStreamEventTranslator() {
        // REQUIRED
    }

    /**
     * Translates a Customer into a single StreamEvent.
     * Returns null when the object is not a Customer.
     */
    @Override
    public Collection<StreamEvent> translate(Object o) throws TranslationException {
        Collection<StreamEvent> events = null;
        if (o instanceof Customer customer) {
            // Event type is "customer"; each Customer field becomes an event attribute
            StreamEvent event = new StreamEvent("customer");
            event.addValue("id", customer.getId());
            event.addValue("firstname", customer.getFirstname());
            event.addValue("surname", customer.getSurname());
            event.addValue("age", customer.getAge());

            events = Collections.singletonList(event);
        }
        return events;
    }
}
```

{% endtab %}
{% endtabs %}

## Additional resources

* Official [Mosquitto documentation](https://mosquitto.org/documentation/)
* [Steve's Internet Guide](http://www.steves-internet-guide.com), a practical user guide to MQTT
* MQTT X [UI for testing](https://mqttx.app/docs/downloading-and-installation)

