MQTT
Lightweight messaging protocol ideal for IoT use cases
Overview
This page provides an overview of how Joule utilises the MQTT protocol to subscribe to events, specifically in IoT use cases.
MQTT is a lightweight, publish-subscribe network protocol used for message queuing services, making it ideal for machine-to-machine communication in IoT.
Joule enables the subscription to events through an MQTT source consumer, which allows for the consumption of events from a specified MQTT topic.
Client library: org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5
Examples & DSL attributes
This example illustrates how to configure an MQTT consumer to subscribe to events from the mydevice/leaf
topic.
The consumer connects to the MQTT broker at tcp://127.0.0.1:1883
and consumes events with a Quality of Service (QoS) level of 1.
Client settings The consumer is identified by the
clientId
(myClientId
), with authentication using the usernamelyndon
and the tenantuk
.Event deserialisation The events are consumed as
customer objects
and then transformed intoJoule StreamEvent
objects using theCustomerToStreamEventTranslator
class.Additional settings The configuration also includes settings for
compressed
(set tofalse
) andbatch
(set tofalse
), which affect how events are processed and transformed.
Attributes schema
name
Name of source stream
String
broker
Broker server address
http://<ip-address>:port
topic
Message topic to subscribe too
Strings
clientId
A unique client identifier on the server being connected too
String
username
Username
String
password
password
String
tenant
Namespace for created topics
String
qos
Quality of service
Integer
Default: 0
auto reconnect
Automatically reconnect to broker on disconnection
Boolean
Default: true
clean restart
This means that if a client disconnects and reconnects within 5 minutes with clean start=false,qos>1
then session state data ( i.e. subscribed topics, queued messages) are retained
Boolean
Default: true
sessionExpiry interval
Maximum time that the broker will maintain the session for once the client disconnects
Long
Default: 300 (seconds)
5 minutes
registration message
Message to send to broker when a Joule process registers
String
user properties
Sets the user properties
Map<String, String>
connection timeout
This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established
Integer
Default: 30 (Seconds)
keepalive interval
This value, measured in seconds, defines the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available, without having to wait for the TCP/IP timeout
Integer
Default: 30 (Seconds)
security
Security configuration
See Security documentation
Deserialisation attributes schema
This topic provides configuration parameters available object deserialisation process.
parser
User provided implementation
Implementation of StreamEventParser
compressed
Uncompress payload using Snappy
Boolean
Default: false
batch
Flag to inform process multiple messages have been sent in payload
Boolean
Default: false
Deserialiser example
In the given MQTT consumer configuration, the deserializer
section defines how the incoming events should be processed and transformed before being consumed by the Joule platform.
parser The
parser
attribute specifies the class (com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
) responsible for converting the incoming message into aJoule StreamEvent
object. This transformation is necessary to make the data compatible with the platform’s processing system.compressed The
compressed: false
setting indicates that the incoming data is not compressed. If the data were compressed, this setting would need to be set totrue
and the system would decompress the data before processing.batch The
batch: false
setting means that the events will not be grouped into batches for processing. Each event is processed individually as it is received. If set totrue
, it would allow the consumer to handle multiple events in a single batch, which can improve efficiency in some scenarios.
Additional resources
Official Mosquitto documentation
Good user documentation
MQTT X UI for testing
Last updated