MQTT
Lightweight messaging protocol ideal for IoT use cases
Overview
MQTT is a lightweight, publish-subscribe, machine-to-machine network protocol for message queuing. Joule can subscribe to events over MQTT using the MQTT source consumer.
Example
This basic example uses the default settings, so events are consumed from the mydevice/leaf
topic as Customer objects. Each object is then transformed into a Joule StreamEvent object, ready for platform processing.
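
The flow described above (payload to Customer object, then Customer to event) can be sketched roughly as follows. This is only an illustration in Python, not Joule's implementation: the `Customer` fields, the JSON wire format, and the dictionary standing in for a StreamEvent are all assumptions made for the example.

```python
import json
from dataclasses import dataclass, asdict


@dataclass
class Customer:
    # Hypothetical fields; the real Customer class is not shown on this page.
    customer_id: int
    name: str


def parse_customer(payload: bytes) -> Customer:
    """Decode a UTF-8 JSON payload into a Customer (assumed wire format)."""
    data = json.loads(payload.decode("utf-8"))
    return Customer(customer_id=int(data["customerId"]), name=str(data["name"]))


def to_stream_event(topic: str, customer: Customer) -> dict:
    """Wrap the Customer in a plain dict standing in for a Joule StreamEvent."""
    return {"source_topic": topic, "fields": asdict(customer)}


# One message as it might arrive on the mydevice/leaf topic.
payload = b'{"customerId": 42, "name": "Ada"}'
event = to_stream_event("mydevice/leaf", parse_customer(payload))
```

In Joule itself the second step is performed by the configured deserializer; the sketch only shows the shape of the two-stage conversion.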
Attributes schema
Available configuration parameters
Attribute | Description | Data Type | Required |
---|---|---|---|
name | Name of source stream | String | |
broker | Broker server address | http://<ip-address>:port | |
topic | Message topic to subscribe to | String | |
clientId | A unique client identifier on the server being connected to | String | |
username | Username | String | |
password | Password | String | |
tenant | Namespace for created topics | String | |
qos | Quality of service | Integer Default: 0 | |
auto reconnect | Automatically reconnect to broker on disconnection | Boolean Default: true | |
clean restart | When false, session state (e.g. subscribed topics, queued messages) is retained if the client disconnects and reconnects within the session expiry interval (5 minutes by default). Messages are queued only for subscriptions with a QoS of at least 1 | Boolean Default: true | |
sessionExpiry interval | Maximum time, in seconds, that the broker maintains the session after the client disconnects | Long Default: 300 (5 minutes) | |
registration message | Message to send to broker when a Joule process registers | String | |
user properties | Sets the user properties | Map<String, String> | |
connection timeout | This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. | Integer Default: 30 (Seconds) | |
keepalive interval | This value, measured in seconds, defines the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available, without having to wait for the TCP/IP timeout. | Integer Default: 30 (Seconds) | |
deserializer | Deserialization configuration | See Deserialization Attributes section | |
security | Security configuration | | |
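
To make the interaction between clean restart and sessionExpiry interval concrete, here is a minimal Python sketch. It assumes these attributes map onto the MQTT 5 Clean Start flag and Session Expiry Interval; the class and function names are hypothetical and exist only for this illustration. The default values are the ones listed in the table above.

```python
from dataclasses import dataclass


@dataclass
class MqttSourceSettings:
    # Defaults copied from the attribute table; names are illustrative only.
    qos: int = 0
    auto_reconnect: bool = True
    clean_restart: bool = True
    session_expiry_interval: int = 300  # seconds
    connection_timeout: int = 30        # seconds
    keepalive_interval: int = 30        # seconds


def session_retained(settings: MqttSourceSettings, seconds_offline: float) -> bool:
    """Return True if the broker is expected to still hold the session state."""
    if settings.clean_restart:
        # A clean start discards any previous session on reconnect.
        return False
    # Otherwise the session survives until the expiry interval has elapsed.
    return seconds_offline < settings.session_expiry_interval


defaults = MqttSourceSettings()
persistent = MqttSourceSettings(qos=1, clean_restart=False)
```

With the defaults, `session_retained(defaults, 10)` is false because every connection starts clean. With `clean_restart=False`, a reconnect after 120 seconds keeps the session, while a reconnect after 600 seconds does not.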
Deserialization Attributes
This section describes the configuration parameters available for the object deserialization process.
Attribute | Description | Data Type | Required |
---|---|---|---|
transform | User-provided implementation | Implementation of StreamEventParser | |
compressed | Decompress payload using Snappy | Boolean Default: false | |
batch | Flag indicating that the payload contains multiple messages | Boolean Default: false | |
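
The effect of the compressed and batch flags can be sketched as below. This is a hedged Python illustration, not Joule's deserializer: the JSON payload format is an assumption, the function name is hypothetical, and `zlib` is used purely as a stand-in codec because Snappy is not part of the Python standard library. A real Snappy decompress function could be passed in through the `decompress` parameter.

```python
import json
import zlib
from typing import Callable, List


def decode_payload(
    payload: bytes,
    compressed: bool = False,
    batch: bool = False,
    decompress: Callable[[bytes], bytes] = zlib.decompress,  # stand-in for Snappy
) -> List[dict]:
    """Return the messages carried by one MQTT payload as a list of dicts."""
    # Step 1: undo compression only when the flag says the payload is compressed.
    raw = decompress(payload) if compressed else payload
    decoded = json.loads(raw.decode("utf-8"))
    # Step 2: a batch payload carries several messages, otherwise exactly one.
    if batch:
        if not isinstance(decoded, list):
            raise ValueError("batch payload must be a JSON array")
        return decoded
    return [decoded]


single = decode_payload(b'{"id": 1}')
packed = zlib.compress(json.dumps([{"id": 1}, {"id": 2}]).encode("utf-8"))
many = decode_payload(packed, compressed=True, batch=True)
```

Returning a list in both cases lets downstream code treat single and batched payloads uniformly.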
Deserializer example
Client library
Additional Resources
Official Mosquitto documentation
Good user documentation
MQTT X UI for testing