Lightweight messaging protocol ideal for IoT use cases
Overview
This page provides an overview of how Joule utilises the MQTT protocol to subscribe to events, specifically in IoT use cases.
MQTT is a lightweight, publish-subscribe network protocol used for message queuing services, making it ideal for machine-to-machine communication in IoT.
Joule subscribes to events through an MQTT source consumer, which consumes events from a specified MQTT topic.
This example illustrates how to configure an MQTT consumer to subscribe to events from the customers topic.
The consumer connects to the MQTT broker at tcp://127.0.0.1:1883 and consumes events with a Quality of Service (QoS) level of 1.
Client settings: The consumer is identified by the clientId (jouleCustomerConsumer), with authentication using the username joule and the tenant uk.
Event deserialisation: The events are consumed as customer objects and then transformed into Joule StreamEvent objects using the CustomerToStreamEventTranslator class.
Additional settings: The configuration also includes settings for compressed (set to false) and batch (set to false), which affect how events are processed and transformed.
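The settings described above could be laid out roughly as follows. This is a minimal sketch rather than the authoritative definition: the attribute names and values come from this page, while the root key (mqttConsumer), the nesting of the deserializer section and the stream name are assumptions made for illustration.

```yaml
# Sketch only: the root key and the nesting are assumptions, not confirmed Joule DSL.
mqttConsumer:
  name: customerEvents            # hypothetical source stream name
  broker: tcp://127.0.0.1:1883    # MQTT broker address
  topic: customers                # topic to subscribe to
  clientId: jouleCustomerConsumer # unique client identifier on the broker
  username: joule
  tenant: uk
  qos: 1                          # Quality of Service level 1

  deserializer:
    # Converts each incoming customer object into a Joule StreamEvent
    parser: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
    compressed: false             # payload is not compressed
    batch: false                  # one event per message
```

The password is left out here because the example description does not state one.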
Create a local configuration file named mosquitto.conf in the /etc/mosquitto/conf.d/ directory.
Then create the password file with the mosquitto_passwd utility. This adds the user joule to the passwd file, using the password/tenant pair.
Attributes schema
| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| name | Name of source stream | String | |
| broker | Broker server address | String, e.g. tcp://<ip-address>:<port> | |
| topic | Message topic to subscribe to | String | |
| clientId | A unique client identifier on the server being connected to | String | |
| username | Username | String | |
| password | Password | String | |
| tenant | Namespace for created topics | String | |
| qos | Quality of Service level (0, 1 or 2) | Integer | Default: 0 |
| auto reconnect | Automatically reconnect to the broker on disconnection | Boolean | Default: true |
| clean restart | When set to false with a qos of 1 or 2, session state (i.e. subscribed topics and queued messages) is retained if the client disconnects and reconnects within the session expiry interval (5 minutes by default) | Boolean | Default: true |
| sessionExpiry interval | Maximum time that the broker maintains the session once the client disconnects | Long | Default: 300 seconds (5 minutes) |
| registration message | Message to send to the broker when a Joule process registers | String | |
| user properties | Sets the user properties | Map<String, String> | |
| connection timeout | Maximum time, in seconds, that the client waits for the network connection to the MQTT server to be established | Integer | Default: 30 seconds |
| keepalive interval | Maximum time, in seconds, between messages sent or received. It enables the client to detect that the server is no longer available without having to wait for the TCP/IP timeout | | |
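As an illustration of how the session-related attributes work together, the fragment below sketches a consumer that keeps its session across short disconnections. It assumes that the attribute names in the table are used verbatim as configuration keys (including the spaces) and that they sit alongside the other consumer settings under a root key of mqttConsumer; both points are assumptions, not confirmed by this page.

```yaml
# Sketch only: key spellings are copied from the attributes table; their exact
# form in the Joule DSL, and the mqttConsumer root key, are assumptions.
mqttConsumer:
  broker: tcp://127.0.0.1:1883
  topic: customers
  clientId: jouleCustomerConsumer
  qos: 1                        # queued delivery needs a qos above 0
  auto reconnect: true          # reconnect after a dropped connection (the default)
  clean restart: false          # ask the broker to retain the session state
  sessionExpiry interval: 300   # seconds the broker keeps the session (the default)
  connection timeout: 30        # seconds to wait for the connection (the default)
```

With these values, a client that reconnects within five minutes would resume its subscriptions and receive any messages queued while it was away.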
This topic provides the configuration parameters available for the object deserialisation process.
| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| parser | User provided implementation | Implementation of StreamEventParser | |
| compressed | Uncompress payload using Snappy | Boolean | Default: false |
| batch | Flag to inform the process that multiple messages have been sent in the payload | Boolean | Default: false |
Deserialiser example
In the given MQTT consumer configuration, the deserializer section defines how the incoming events should be processed and transformed before being consumed by the Joule platform.
parser: Specifies the class (com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator) responsible for converting the incoming message into a Joule StreamEvent object. This transformation is necessary to make the data compatible with the platform’s processing system.
compressed: A value of false indicates that the incoming data is not compressed. If the data were compressed, this setting would need to be set to true and the system would decompress the data before processing.
batch: A value of false means that events are not grouped into batches for processing; each event is processed individually as it is received. If set to true, the consumer can handle multiple events in a single batch, which can improve efficiency in some scenarios.
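For contrast, a deserializer section for a publisher that sends Snappy-compressed, batched payloads might look like the fragment below. It is a sketch under stated assumptions: the section name follows the text above, and whether the same parser class can handle batched payloads is not confirmed by this page.

```yaml
# Sketch only: the alternative settings described above, not a tested configuration.
deserializer:
  parser: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
  compressed: true   # payload is uncompressed with Snappy before parsing
  batch: true        # each payload carries multiple messages
```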