MQTT

Lightweight messaging protocol ideal for IoT use cases

Overview

This page provides an overview of how Joule utilises the MQTT protocol to subscribe to events, specifically in IoT use cases.

MQTT is a lightweight, publish-subscribe network protocol used for message queuing services, making it ideal for machine-to-machine communication in IoT.

Joule enables the subscription to events through an MQTT source consumer, which allows for the consumption of events from a specified MQTT topic.

Examples & DSL attributes

This example illustrates how to configure an MQTT consumer to subscribe to events from the mydevice/leaf topic.

The consumer connects to the MQTT broker at tcp://127.0.0.1:1883 and consumes events with a Quality of Service (QoS) level of 1.

  1. Client settings The consumer is identified by the clientId (myClientId), with authentication using the username lyndon and the tenant uk.

  2. Event deserialisation The events are consumed as customer objects and then transformed into Joule StreamEvent objects using the CustomerToStreamEventTranslator class.

  3. Additional settings The configuration also includes settings for compressed (set to false) and batch (set to false), which affect how events are processed and transformed.

mqttConsumer:
  broker: tcp://127.0.0.1:1883
  topic: mydevice/leaf

  clientId: myClientId
  username: lyndon
  tenant: uk
  qos: 1

  deserializer:
    parser: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
    compressed: false
    batch: false

Attributes schema

Attribute
Description
Data Type
Required

name

Name of source stream

String

broker

Broker server address

http://<ip-address>:port

topic

Message topic to subscribe too

Strings

clientId

A unique client identifier on the server being connected too

String

username

Username

String

password

password

String

tenant

Namespace for created topics

String

qos

Quality of service

Integer

Default: 0

auto reconnect

Automatically reconnect to broker on disconnection

Boolean

Default: true

clean restart

This means that if a client disconnects and reconnects within 5 minutes with clean start=false,qos>1 then session state data ( i.e. subscribed topics, queued messages) are retained

Boolean

Default: true

sessionExpiry interval

Maximum time that the broker will maintain the session for once the client disconnects

Long

Default: 300 (seconds)

5 minutes

registration message

Message to send to broker when a Joule process registers

String

user properties

Sets the user properties

Map<String, String>

connection timeout

This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established

Integer

Default: 30 (Seconds)

keepalive interval

This value, measured in seconds, defines the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available, without having to wait for the TCP/IP timeout

Integer

Default: 30 (Seconds)

deserializer

Deserialisation configuration

security

Security configuration

See Security documentation

Deserialisation attributes schema

This topic provides configuration parameters available object deserialisation process.

Attribute
Description
Data Type
Required

parser

User provided implementation

Implementation of StreamEventParser

compressed

Uncompress payload using Snappy

Boolean

Default: false

batch

Flag to inform process multiple messages have been sent in payload

Boolean

Default: false

Deserialiser example

In the given MQTT consumer configuration, the deserializer section defines how the incoming events should be processed and transformed before being consumed by the Joule platform.

  1. parser The parser attribute specifies the class (com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator) responsible for converting the incoming message into a Joule StreamEvent object. This transformation is necessary to make the data compatible with the platform’s processing system.

  2. compressed The compressed: false setting indicates that the incoming data is not compressed. If the data were compressed, this setting would need to be set to true and the system would decompress the data before processing.

  3. batch The batch: false setting means that the events will not be grouped into batches for processing. Each event is processed individually as it is received. If set to true, it would allow the consumer to handle multiple events in a single batch, which can improve efficiency in some scenarios.

mqttConsumer:
  ...
  deserializer:
    parser: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
    compressed: false
    batch: false

Additional resources

Last updated