MQTT
Allows publishing events through MQTT, ideal for IoT and lightweight streaming
Overview
MQTT is a lightweight, publish-subscribe protocol ideal for machine-to-machine communication in IoT environments.
Joule leverages MQTT to publish events through an MQTT publisher, allowing communication between devices and services over specified topics.
Examples & DSL attributes
This example configures an MQTT publisher to send events to the lab/humidity_valve topic.
The events are published as compressed JSON, using QoS level 1 for reliable delivery.
The publisher connects with the client ID humidityControllerPublisher, username joule and tenant uk to the broker at tcp://127.0.0.1:1883. Security is handled with a JWT claim using RSA encryption, with a private key stored in humidityControllerKeyFile.
mqttPublisher:
  clientId: humidityControllerPublisher
  username: joule
  tenant: uk
  topic: lab/humidity_valve
  qos: 1
  broker: tcp://127.0.0.1:1883
  serializer:
    compress: true
  security:
    jwtclaim:
      audienceId: humidityMonitorProject
      keyFile: humidityControllerKeyFile
      algorithm: RSA
      isPrivateKey: trueAttributes schema
name
Name of source stream
String
broker
Broker server address i.e. http://<ip-address>:port
String
topic
Message topic to subscribe too
List os topic strings
clientId
A unique client identifier on the server being connected too
String
username
Username
String
password
password
String
tenant
Namespace for created topics
String
qos
Quality of service
Integer
Default: 0
auto reconnect
Automatically reconnect to broker on disconnection
Boolean
Default: true
clean restart
This means that if a client disconnects and reconnects within 5 minutes with clean start= false,qos>1 then session state data ( e.g subscribed topics, queued messages) are retained
Boolean
Default: true
session expiry interval
Maximum time that the broker will maintain the session within persistent storage when a client disconnects due to a process or network fault.
Long
Default: 300 (seconds)
5 minutes
registration message
Message to send to broker when a Joule process registers
String
user properties
Sets the user properties
Map<String, String>
connection timeout
This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established
Integer
Default: 30 (Seconds)
keepalive interval
This value, measured in seconds, defines the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available, without having to wait for the TCP/IP timeout
Integer
Default: 30 (Seconds)
retained
Inform broker to retain last message.
On a client reconnect to the broker it will receive the last published message
Boolean Default: false
append identifier
Append the Joule id and client id to the start of the publishing topic.
Boolean Default: false
message expiry interval
Time in seconds a published message is stored on the broker.
Long Default: 0
security
Security configuration
See Security documentation
Serialisation attributes schema
transform
User provided implementation
CustomTransformer
formatter
This is used when a custom transformer is not provided. Useful when chaining Joule processes
Formatter
Default: jsonFormatter
compressed
Compress payload using Snappy
Boolean
Default: false
batch
Flag to batch multiple messages in to a single payload
Boolean
Default: false
Serialiser example
In this example, the MQTT publisher is configured to serialise StreamEvents as compressed JSON payloads.
The compress setting is enabled to reduce the size of the data and batch is set to false, meaning events are sent individually rather than in batches.
mqttPublisher:
  ...
  serializer:
    compress: true
    batch: falseAdditional resources
- Official Mosquitto documentation 
- Good user documentation 
- MQTT X UI for testing 
Last updated
Was this helpful?
