MQTT
Allows publishing events through MQTT, ideal for IoT and lightweight streaming
Overview
MQTT is a lightweight, publish-subscribe protocol ideal for machine-to-machine communication in IoT environments.
Joule leverages MQTT to publish events through an MQTT publisher, allowing communication between devices and services over specified topics.
Client library: org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5
Examples & DSL attributes
This example configures an MQTT publisher to send events to the mydevice/leaf
topic.
The events are published as compressed JSON, using QoS level 1 for reliable delivery.
The publisher connects with the client ID myClientId
, username lyndon
and tenant uk
to the broker at tcp://127.0.0.1:1883
. Security is handled with a JWT claim using RSA encryption, with a private key stored in file2
.
Attributes schema
Attribute | Description | Data Type | Required |
---|---|---|---|
name | Name of source stream | String | |
broker | Broker server address | http://<ip-address>:port | |
topic | Message topic to subscribe too | Strings | |
clientId | A unique client identifier on the server being connected too | String | |
username | Username | String | |
password | password | String | |
tenant | Namespace for created topics | String | |
qos | Quality of service | Integer Default: 0 | |
auto reconnect | Automatically reconnect to broker on disconnection | Boolean Default: true | |
clean restart | This means that if a client disconnects and reconnects within 5 minutes with clean start= false,qos>1 then session state data ( e.g subscribed topics, queued messages) are retained | Boolean Default: true | |
sessionExpiry interval | Maximum time that the broker will maintain the session for once the client disconnects | Long Default: 300 (seconds) 5 minutes | |
registration message | Message to send to broker when a Joule process registers | String | |
user properties | Sets the user properties | Map<String, String> | |
connection timeout | This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established | Integer Default: 30 (Seconds) | |
keepalive interval | This value, measured in seconds, defines the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available, without having to wait for the TCP/IP timeout | Integer Default: 30 (Seconds) | |
serializer | Serialisation configuration | See Serialisation attributes section | |
last will | Last will specification | See Last will attributes section | |
security | Security configuration | See Security documentation |
Serialisation attributes schema
Attribute | Description | Data Type | Required |
---|---|---|---|
transform | User provided implementation | CustomTransformer | |
formatter | This is used when a custom transformer is not provided. Useful when chaining Joule processes | Formatter Default: jsonFormatter | |
compressed | Compress payload using Snappy | Boolean Default: false | |
batch | Flag to batch multiple messages in to a single payload | Boolean Default: false |
Serialiser example
In this example, the MQTT publisher is configured to serialise StreamEvents
as compressed JSON payloads.
The compress
setting is enabled to reduce the size of the data and batch
is set to false
, meaning events are sent individually rather than in batches.
Last will attributes schema
These attributes provide the configuration parameters to send publisher disconnection notifications to connected subscribers.
Attribute | Description | Data Type | Required |
---|---|---|---|
topic | Topic to sent status message on to | String | |
message | Message to send to consumers when publisher no longer exists | String Default: "Publisher disconnected from broker" | |
interval delay | The Server delays publishing the Client's Will Message until the Will Delay Interval has passed or the Session ends, whichever happens first. | Long Default: 5 seconds | |
retain | If message is retained | Boolean Default: true | |
qos | Quality of service | Integer Default: 1 |
Last will example
In this example, the last will
feature is configured for the MQTT publisher.
It specifies that if the publisher disconnects unexpectedly, a message will be sent to the topic usecasename/status/failure
to notify subscribers of the disconnection.
The message will indicate that the Joule MQTT process disconnected from broker
and the publisher will wait for a delay of 15 seconds before sending this notification.
Additional resources
Official Mosquitto documentation
Good user documentation
MQTT X UI for testing
Last updated