MQTT
Lightweight messaging protocol ideal for IoT use cases
This page provides an overview of how Joule utilises the MQTT protocol to subscribe to events, specifically in IoT use cases.
MQTT is a lightweight, publish-subscribe network protocol used for message queuing services, making it ideal for machine-to-machine communication in IoT.
Joule subscribes to events through an MQTT source consumer, which consumes events from a specified MQTT topic.
This example illustrates how to configure an MQTT consumer to subscribe to events from the customers topic.
The consumer connects to the MQTT broker at tcp://127.0.0.1:1883 and consumes events with a Quality of Service (QoS) level of 1.
Client settings
The consumer is identified by the clientId (jouleCustomerConsumer), with authentication using the username joule and the tenant uk.
Event deserialisation
The events are consumed as customer objects and then transformed into Joule StreamEvent objects using the CustomerToStreamEventTranslator class.
Additional settings
The configuration also includes settings for compressed (set to false) and batch (set to false), which affect how events are processed and transformed.
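The configuration described above might look like the following sketch. The attribute names and values are taken from this page; the enclosing mqttConsumer element and the name value are assumptions for illustration and may differ from the actual Joule DSL.

```yaml
# Sketch only: the root key `mqttConsumer` and the stream name are assumed,
# not confirmed by this page.
mqttConsumer:
  name: customerConsumer            # hypothetical source stream name
  broker: tcp://127.0.0.1:1883      # broker address from the example
  topic: customers                  # topic to subscribe to
  clientId: jouleCustomerConsumer   # unique client identifier
  username: joule
  tenant: uk
  qos: 1                            # Quality of Service level 1

  deserializer:
    # Converts incoming customer objects into Joule StreamEvent objects
    parser: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
    compressed: false               # payload is not Snappy-compressed
    batch: false                    # one event per message
```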
| Attribute | Description | Type / format | Default |
| --- | --- | --- | --- |
| name | Name of source stream | String | |
| broker | Broker server address | http://<ip-address>:port | |
| topic | Message topic to subscribe to | String | |
| clientId | A unique client identifier on the server being connected to | String | |
| username | Username | String | |
| password | Password | String | |
| tenant | Namespace for created topics | String | |
| qos | Quality of service | Integer | 0 |
| auto reconnect | Automatically reconnect to broker on disconnection | Boolean | true |
| clean restart | When clean restart=false and qos > 1, session state data (i.e. subscribed topics, queued messages) is retained if the client disconnects and reconnects within 5 minutes | Boolean | true |
| sessionExpiry interval | Maximum time that the broker will maintain the session once the client disconnects | Long | 300 seconds (5 minutes) |
| registration message | Message to send to broker when a Joule process registers | String | |
| user properties | Sets the user properties | Map<String, String> | |
| connection timeout | Maximum time, in seconds, that the client will wait for the network connection to the MQTT server to be established | Integer | 30 seconds |
| keepalive interval | Maximum time, in seconds, between messages sent or received. It enables the client to detect that the server is no longer available without having to wait for the TCP/IP timeout | Integer | 30 seconds |
| lwt handler | Last Will and Testament message handler | | |
| deserializer | Deserialisation configuration | | |
| security | Security configuration. See Security documentation | | |
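As an illustration of how the session and connection attributes fit together, the following sketch configures a consumer whose session survives a short disconnection. It assumes that the attribute names are used as YAML keys exactly as listed, including the spaces, and that the enclosing mqttConsumer element exists; neither is confirmed by this page.

```yaml
# Sketch only: root key and exact key spelling are assumptions.
mqttConsumer:
  broker: tcp://127.0.0.1:1883
  topic: customers
  clientId: jouleCustomerConsumer
  qos: 2                        # the attribute description ties session retention to qos > 1
  auto reconnect: true          # default: reconnect to the broker on disconnection
  clean restart: false          # keep session state across reconnects
  sessionExpiry interval: 300   # seconds the broker keeps the session (default)
  connection timeout: 30        # seconds to wait for the connection (default)
  keepalive interval: 30        # seconds between messages before the server is presumed unavailable (default)
```

With these values, subscribed topics and queued messages would be retained if the client reconnects within the 300-second session expiry window.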
This section describes the configuration parameters available for the object deserialisation process.
| Attribute | Description | Type | Default |
| --- | --- | --- | --- |
| parser | User provided implementation | Implementation of StreamEventParser | |
| compressed | Uncompress payload using Snappy | Boolean | false |
| batch | Flag to inform the process that multiple messages have been sent in the payload | Boolean | false |
In the given MQTT consumer configuration, the deserializer section defines how the incoming events should be processed and transformed before being consumed by the Joule platform.
parser
The parser attribute specifies the class (com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator) responsible for converting the incoming message into a Joule StreamEvent object.
This transformation is necessary to make the data compatible with the platform’s processing system.
compressed
The compressed: false setting indicates that the incoming data is not compressed. If the data were compressed, this setting would need to be set to true and the system would decompress the data before processing.
batch
The batch: false setting means that the events will not be grouped into batches for processing.
Each event is processed individually as it is received. If set to true, it would allow the consumer to handle multiple events in a single batch, which can improve efficiency in some scenarios.
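For a publisher that sends Snappy-compressed payloads containing several messages each, the deserializer section might be adjusted as in the sketch below. The parser class is the one named on this page; whether that class handles batched payloads is an assumption.

```yaml
# Sketch only: a deserializer fragment, placed under the consumer definition.
deserializer:
  # Assumed to accept batched payloads; not confirmed by this page
  parser: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
  compressed: true   # decompress the payload with Snappy before parsing
  batch: true        # payload carries multiple messages
```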