MQTT
Lightweight messaging protocol ideal for IoT use cases
Overview
This page provides an overview of how Joule utilises the MQTT protocol to subscribe to events, specifically in IoT use cases.
MQTT is a lightweight, publish-subscribe network protocol used for message queuing services, making it ideal for machine-to-machine communication in IoT.
Joule enables the subscription to events through an MQTT source consumer, which allows for the consumption of events from a specified MQTT topic.
Client library: org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5
Examples & DSL attributes
This example illustrates how to configure an MQTT consumer to subscribe to events from the mydevice/leaf
topic.
The consumer connects to the MQTT broker at tcp://127.0.0.1:1883
and consumes events with a Quality of Service (QoS) level of 1.
Client settings The consumer is identified by the
clientId
(myClientId
), with authentication using the usernamelyndon
and the tenantuk
.Event deserialisation The events are consumed as
customer objects
and then transformed intoJoule StreamEvent
objects using theCustomerToStreamEventTranslator
class.Additional settings The configuration also includes settings for
compressed
(set tofalse
) andbatch
(set tofalse
), which affect how events are processed and transformed.
Attributes schema
Attribute | Description | Data Type | Required |
---|---|---|---|
name | Name of source stream | String | |
broker | Broker server address | http://<ip-address>:port | |
topic | Message topic to subscribe too | Strings | |
clientId | A unique client identifier on the server being connected too | String | |
username | Username | String | |
password | password | String | |
tenant | Namespace for created topics | String | |
qos | Quality of service | Integer Default: 0 | |
auto reconnect | Automatically reconnect to broker on disconnection | Boolean Default: true | |
clean restart | This means that if a client disconnects and reconnects within 5 minutes with clean | Boolean Default: true | |
sessionExpiry interval | Maximum time that the broker will maintain the session for once the client disconnects | Long Default: 300 (seconds) 5 minutes | |
registration message | Message to send to broker when a Joule process registers | String | |
user properties | Sets the user properties | Map<String, String> | |
connection timeout | This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established | Integer Default: 30 (Seconds) | |
keepalive interval | This value, measured in seconds, defines the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available, without having to wait for the TCP/IP timeout | Integer Default: 30 (Seconds) | |
deserializer | Deserialisation configuration | See Deserialisation attributes section | |
security | Security configuration | See Security documentation |
Deserialisation attributes schema
This topic provides configuration parameters available object deserialisation process.
Attribute | Description | Data Type | Required |
---|---|---|---|
parser | User provided implementation | Implementation of StreamEventParser | |
compressed | Uncompress payload using Snappy | Boolean Default: false | |
batch | Flag to inform process multiple messages have been sent in payload | Boolean Default: false |
Deserialiser example
In the given MQTT consumer configuration, the deserializer
section defines how the incoming events should be processed and transformed before being consumed by the Joule platform.
parser The
parser
attribute specifies the class (com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
) responsible for converting the incoming message into aJoule StreamEvent
object. This transformation is necessary to make the data compatible with the platform’s processing system.compressed The
compressed: false
setting indicates that the incoming data is not compressed. If the data were compressed, this setting would need to be set totrue
and the system would decompress the data before processing.batch The
batch: false
setting means that the events will not be grouped into batches for processing. Each event is processed individually as it is received. If set totrue
, it would allow the consumer to handle multiple events in a single batch, which can improve efficiency in some scenarios.
Additional resources
Official Mosquitto documentation
Good user documentation
MQTT X UI for testing
Last updated