# MQTT

## Overview

MQTT is a lightweight, publish-subscribe protocol ideal for machine-to-machine communication in IoT environments.

**Joule leverages MQTT** to publish events through an MQTT publisher, allowing communication between devices and services over specified topics.

{% hint style="info" %}
**Client library:** [org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5](https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.mqttv5.client/1.2.5)
{% endhint %}

## Examples & DSL attributes

This example configures an MQTT publisher to send events to the `lab/humidity_valve` topic.

The events are published as compressed JSON, using QoS level 1 (at-least-once delivery).

The publisher connects to the broker at `tcp://127.0.0.1:1883` with the client ID `humidityControllerPublisher`, username `joule` and tenant `uk`. Security is handled with a JWT claim using the RSA algorithm, with the private key stored in `humidityControllerKeyFile`.

```yaml
mqttPublisher:
  clientId: humidityControllerPublisher
  username: joule
  tenant: uk
  topic: lab/humidity_valve
  qos: 1
  broker: tcp://127.0.0.1:1883

  serializer:
    compress: true

  security:
    jwtclaim:
      audienceId: humidityMonitorProject
      keyFile: humidityControllerKeyFile
      algorithm: RSA
      isPrivateKey: true
```

### Attributes schema

<table><thead><tr><th width="204">Attribute</th><th width="323">Description</th><th width="135">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>name</td><td>Name of source stream</td><td>String</td><td>true</td></tr><tr><td>broker</td><td>Broker server address, e.g. tcp://&#x3C;ip-address>:&#x3C;port></td><td>String</td><td>true</td></tr><tr><td>topic</td><td>Message topic to publish to</td><td>List of topic strings</td><td>true</td></tr><tr><td>clientId</td><td>A unique client identifier on the server being connected to</td><td>String</td><td>true</td></tr><tr><td>username</td><td>Username</td><td>String</td><td>false</td></tr><tr><td>password</td><td>Password</td><td>String</td><td>false</td></tr><tr><td>tenant</td><td>Namespace for created topics</td><td>String</td><td>false</td></tr><tr><td>qos</td><td>Quality of service</td><td><p>Integer</p><p>Default: 0</p></td><td>false</td></tr><tr><td>auto reconnect</td><td>Automatically reconnect to broker on disconnection</td><td><p>Boolean</p><p>Default: true</p></td><td>false</td></tr><tr><td>clean restart</td><td>When set to false, session state data (e.g. subscribed topics and queued messages with a QoS of 1 or higher) is retained if the client disconnects and reconnects within 5 minutes</td><td><p>Boolean</p><p>Default: true</p></td><td>false</td></tr><tr><td>session expiry interval</td><td>Maximum time that the broker will maintain the session within persistent storage when a client disconnects due to a process or network fault</td><td><p>Long</p><p>Default: 300 seconds (5 minutes)</p></td><td>false</td></tr><tr><td>registration message</td><td>Message to send to broker when a Joule process registers</td><td>String</td><td>false</td></tr><tr><td>user properties</td><td>Sets the user properties</td><td>Map&#x3C;String, String></td><td>false</td></tr><tr><td>connection timeout</td><td>Maximum time, in seconds, the client will wait for the network connection to the MQTT server to be established</td><td><p>Integer</p><p>Default: 30 (seconds)</p></td><td>false</td></tr><tr><td>keepalive interval</td><td>Maximum time, in seconds, between messages sent or received. It enables the client to detect that the server is no longer available without having to wait for the TCP/IP timeout</td><td><p>Integer</p><p>Default: 30 (seconds)</p></td><td>false</td></tr><tr><td>retained</td><td><p>Instructs the broker to retain the last message.</p><p>A client that reconnects to the broker will receive the last published message</p></td><td>Boolean<br>Default: false</td><td>false</td></tr><tr><td>append identifier</td><td>Append the Joule id and client id to the start of the publishing topic</td><td>Boolean<br>Default: false</td><td>false</td></tr><tr><td>message expiry interval</td><td>Time in seconds a published message is stored on the broker</td><td>Long<br>Default: 0</td><td>false</td></tr><tr><td>lwt notifier</td><td>Last will and testament notification specification</td><td>See <a href="mqtt/last-will-and-testament">Last will</a> page</td><td>false</td></tr><tr><td>serializer</td><td>Serialisation configuration</td><td>See <a href="#serialization-attributes">Serialisation attributes</a> section</td><td>false</td></tr><tr><td>security</td><td>Security configuration</td><td>See Security documentation</td><td>false</td></tr></tbody></table>
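
As a rough sketch of how some of the optional attributes might be combined, the fragment below raises the QoS to 2 and asks the broker to retain the last published message. It assumes `password` and `retained` are written in the DSL exactly as they appear in the table, which this page does not confirm. The client ID, topic and password value are hypothetical placeholders. Multi-word attributes such as `session expiry interval` are left out because their YAML key spelling is not shown here.

```yaml
mqttPublisher:
  clientId: valveStatePublisher   # hypothetical client ID
  username: joule
  password: changeme              # placeholder value, assumed key spelling
  topic: lab/valve_state          # hypothetical topic
  broker: tcp://127.0.0.1:1883

  qos: 2                          # MQTT QoS 2: exactly-once delivery
  retained: true                  # assumed key spelling, taken from the table
```

With a retained message in place, a consumer that connects after the last publish still receives the most recent valve state rather than waiting for the next event.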

### **Serialisation** attributes schema

<table><thead><tr><th width="200">Attribute</th><th width="228">Description</th><th width="214">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>transform</td><td>User-provided implementation</td><td>CustomTransformer</td><td>false</td></tr><tr><td>formatter</td><td>Used when a custom transformer is not provided. Useful when chaining Joule processes</td><td><p>Formatter</p><p>Default: jsonFormatter</p></td><td>false</td></tr><tr><td>compressed</td><td>Compress payload using Snappy</td><td><p>Boolean</p><p>Default: false</p></td><td>false</td></tr><tr><td>batch</td><td>Flag to batch multiple messages into a single payload</td><td><p>Boolean</p><p>Default: false</p></td><td>false</td></tr></tbody></table>

### **Serialiser example**

In this example, the MQTT publisher is configured to serialise `StreamEvents` as compressed JSON payloads.

The `compress` setting is enabled to reduce the size of the data and `batch` is set to `false`, meaning events are sent individually rather than in batches.

```yaml
mqttPublisher:
  ...

  serializer:
    compress: true
    batch: false
```

## Additional resources

* Official [Mosquitto documentation](https://mosquitto.org/documentation/)
* Steve's Internet Guide [MQTT user documentation](http://www.steves-internet-guide.com)
* MQTT X [UI for testing](https://mqttx.app/docs/downloading-and-installation)
