MQTT

MQTT is a lightweight, publish-subscribe, machine-to-machine network protocol for message queuing. Joule can subscribe to events using the MQTT source consumer.

Driver details

org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5

Configuration Guide

Example configuration

mqttConsumer:
  broker: tcp://127.0.0.1:1883
  topic: mydevice/leaf
  clientId: myClientId
  username: lyndon
  tenant: uk
  qos: 1
  deserializer:
    transform: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
    compressed: false
    batch: false
This basic example relies on the default settings for all unspecified attributes. Events are consumed from the mydevice/leaf topic as Customer objects, and each object is then transformed into a Joule StreamEvent object ready for platform processing.
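The transformation step described above can be pictured with a minimal Java sketch. Everything in it is an assumption made for illustration: the `Customer` fields, the `toEvent` helper, and the use of a plain `Map` as the event are hypothetical stand-ins, not Joule's `StreamEvent` class or the `StreamEventParser` interface, whose signatures are not shown on this page.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TransformSketch {

    // Hypothetical domain object carried in the MQTT payload (fields are assumed).
    static final class Customer {
        final String id;
        final String name;
        final String region;

        Customer(String id, String name, String region) {
            this.id = id;
            this.name = name;
            this.region = region;
        }
    }

    // Hypothetical translation: copies each Customer field into a generic,
    // ordered attribute map that stands in for a platform event.
    static Map<String, Object> toEvent(Customer customer, long eventTimeMillis) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("type", "customer");
        event.put("eventTime", eventTimeMillis);
        event.put("id", customer.id);
        event.put("name", customer.name);
        event.put("region", customer.region);
        return event;
    }

    public static void main(String[] args) {
        Customer customer = new Customer("c-1", "Ada", "uk");
        System.out.println(toEvent(customer, System.currentTimeMillis()));
    }
}
```

A real translator would be registered through the `transform` attribute and would build Joule's own event type rather than a map.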

Core Attributes

Available configuration parameters

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| name | Name of source stream | String | |
| broker | Broker server address | String, e.g. tcp://<ip-address>:<port> | |
| topic | Message topic to subscribe to | String | |
| clientId | A unique client identifier on the server being connected to | String | |
| username | Username | String | |
| password | Password | String | |
| tenant | Namespace for created topics | String | |
| qos | Quality of service | Integer | Default: 0 |
| auto reconnect | Automatically reconnect to the broker on disconnection | Boolean | Default: true |
| clean restart | If a client disconnects and reconnects within 5 minutes with clean start = false and qos>1, session state data (e.g. subscribed topics, queued messages) is retained | Boolean | Default: true |
| sessionExpiry interval | Maximum time that the broker will maintain the session once the client disconnects | Long | Default: 300 (seconds), i.e. 5 minutes |
| registration message | Message to send to the broker when a Joule process registers | String | |
| user properties | Sets the user properties | Map<String, String> | |
| connection timeout | Maximum time, in seconds, the client will wait for the network connection to the MQTT server to be established | Integer | Default: 30 (seconds) |
| keepalive interval | Maximum time, in seconds, between messages sent or received. It enables the client to detect that the server is no longer available without having to wait for the TCP/IP timeout | Integer | Default: 30 (seconds) |
| deserializer | Deserialization configuration | See Deserialization Attributes section | |
| security | Security configuration | | |
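The interaction between the clean restart flag and the session expiry interval can be sketched in a few lines of Java. This is an illustrative model only, assuming the rule stated in the table (state is kept when clean start is false and the client reconnects inside the expiry window); the class and method names are hypothetical, the QoS condition is not modelled, and the actual decision is made by the broker.

```java
public class SessionRetentionSketch {

    // Default taken from the attribute table: 300 seconds (5 minutes).
    static final long DEFAULT_SESSION_EXPIRY_SECONDS = 300L;

    /**
     * Returns true when session state (subscriptions, queued messages) would be
     * expected to survive a reconnect under the rule described in the table.
     *
     * @param cleanStart           value of the clean restart / clean start flag
     * @param secondsDisconnected  time elapsed between disconnect and reconnect
     * @param sessionExpirySeconds configured session expiry interval
     */
    static boolean sessionRetained(boolean cleanStart,
                                   long secondsDisconnected,
                                   long sessionExpirySeconds) {
        if (cleanStart) {
            // A clean start discards any previous session state.
            return false;
        }
        // State is kept only while the expiry window is still open.
        return secondsDisconnected < sessionExpirySeconds;
    }

    public static void main(String[] args) {
        System.out.println(sessionRetained(false, 120, DEFAULT_SESSION_EXPIRY_SECONDS));
        System.out.println(sessionRetained(false, 600, DEFAULT_SESSION_EXPIRY_SECONDS));
        System.out.println(sessionRetained(true, 120, DEFAULT_SESSION_EXPIRY_SECONDS));
    }
}
```

Note that with the documented default of clean restart = true, no session state is carried over regardless of the expiry interval.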

Deserialization Attributes

This section lists the configuration parameters available for the object deserialization process.
| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User-provided implementation | Implementation of StreamEventParser | |
| compressed | Decompress the payload using Snappy | Boolean | Default: false |
| batch | Flag indicating that multiple messages have been sent in the payload | Boolean | Default: false |

Deserializer example

deserializer:
  transform: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
  compressed: false
  batch: false
