MQTT

Allows publishing events through MQTT, ideal for IoT and lightweight streaming

Overview

MQTT is a lightweight, publish-subscribe protocol ideal for machine-to-machine communication in IoT environments.

Joule leverages MQTT to publish events through an MQTT publisher, allowing communication between devices and services over specified topics.

Examples & DSL attributes

This example configures an MQTT publisher to send events to the mydevice/leaf topic.

The events are published as compressed JSON, using QoS level 1 for reliable delivery.

The publisher connects to the broker at tcp://127.0.0.1:1883 with the client ID myClientId, username lyndon and tenant uk. Security is handled with a JWT claim signed using the RSA algorithm, with the private key stored in file2.

mqttPublisher:
  clientId: myClientId
  username: lyndon
  tenant: uk
  topic: mydevice/leaf
  qos: 1
  broker: tcp://127.0.0.1:1883

  serializer:
    compress: true

  security:
    jwtclaim:
      audienceId: projectID
      keyFile: file2
      algorithm: RSA
      isPrivateKey: true
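
For brokers that authenticate with a username and password rather than a JWT, a configuration along the following lines may be sufficient. This is a minimal sketch: it assumes the security block can be omitted when the password attribute from the schema is supplied, which the source does not confirm, and the password value is a placeholder.

```yaml
mqttPublisher:
  clientId: myClientId
  username: lyndon
  password: changeme        # placeholder value, replace with the real credential
  topic: mydevice/leaf
  qos: 1
  broker: tcp://127.0.0.1:1883
  # Assumption: no security block is needed for plain username/password authentication
```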

Attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| name | Name of source stream | String | |
| broker | Broker server address | tcp://<ip-address>:<port> | |
| topic | Message topic to publish to | String | |
| clientId | A unique client identifier on the server being connected to | String | |
| username | Username | String | |
| password | Password | String | |
| tenant | Namespace for created topics | String | |
| qos | Quality of service | Integer | Default: 0 |
| auto reconnect | Automatically reconnect to broker on disconnection | Boolean | Default: true |
| clean restart | If a client disconnects and reconnects within 5 minutes with clean start = false and qos > 1, session state data (e.g. subscribed topics, queued messages) is retained | Boolean | Default: true |
| sessionExpiry interval | Maximum time that the broker will maintain the session once the client disconnects | Long | Default: 300 (seconds), i.e. 5 minutes |
| registration message | Message to send to broker when a Joule process registers | String | |
| user properties | Sets the user properties | Map<String, String> | |
| connection timeout | Maximum time, in seconds, that the client will wait for the network connection to the MQTT server to be established | Integer | Default: 30 (seconds) |
| keepalive interval | Maximum time, in seconds, between messages sent or received. It enables the client to detect that the server is no longer available without having to wait for the TCP/IP timeout | Integer | Default: 30 (seconds) |
| serializer | Serialisation configuration | | |
| last will | Last will specification | | |
| security | Security configuration | See Security documentation | |
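
The connection and session attributes above can be combined as in the following sketch. It assumes the keys are written exactly as they appear in the attribute column, spaces included (the last will example on this page follows that convention). The values and the user property are illustrative only.

```yaml
mqttPublisher:
  clientId: myClientId
  topic: mydevice/leaf
  broker: tcp://127.0.0.1:1883
  qos: 2

  # Assumption: key spellings are taken verbatim from the attributes table
  auto reconnect: true
  clean restart: false          # retain session state across reconnects
  sessionExpiry interval: 600   # seconds the broker keeps the session after disconnect
  connection timeout: 10        # seconds to wait for the network connection
  keepalive interval: 60        # seconds between messages sent or received

  user properties:
    site: plant-a               # hypothetical key/value pair
```

Setting clean restart to false only has an effect while the session is still held by the broker, so the sessionExpiry interval should be at least as long as the longest expected outage.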

Serialisation attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User provided implementation | CustomTransformer | |
| formatter | Used when a custom transformer is not provided. Useful when chaining Joule processes | Formatter | Default: jsonFormatter |
| compressed | Compress payload using Snappy | Boolean | Default: false |
| batch | Flag to batch multiple messages into a single payload | Boolean | Default: false |

Serialiser example

In this example, the MQTT publisher is configured to serialise StreamEvents as compressed JSON payloads.

The compress setting is enabled to reduce the size of the data and batch is set to false, meaning events are sent individually rather than in batches.

mqttPublisher:
  ...

  serializer:
    compress: true
    batch: false

Last will attributes schema

These attributes provide the configuration parameters to send publisher disconnection notifications to connected subscribers.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| topic | Topic to send the status message to | String | |
| message | Message to send to consumers when the publisher no longer exists | String | Default: "Publisher disconnected from broker" |
| interval delay | The server delays publishing the client's will message until the will delay interval has passed or the session ends, whichever happens first | Long | Default: 5 seconds |
| retain | Whether the message is retained | Boolean | Default: true |
| qos | Quality of service | Integer | Default: 1 |

Last will example

In this example, the last will feature is configured for the MQTT publisher.

It specifies that if the publisher disconnects unexpectedly, a message will be sent to the topic usecasename/status/failure to notify subscribers of the disconnection.

The message states that the Joule MQTT process disconnected from the broker. The broker delays publishing this notification for 15 seconds, or until the session ends, whichever happens first.

mqttPublisher:
  ...

  last will:
    topic: usecasename/status/failure
    message: "Joule MQTT process disconnected from broker"
    interval delay: 15
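
The example above relies on the defaults for retain and qos. A variant that sets every last will attribute explicitly might look like the sketch below. The key names are taken from the last will attributes schema, the broker and topic values are reused from the first example on this page, and the retain and qos values shown are simply the documented defaults.

```yaml
mqttPublisher:
  broker: tcp://127.0.0.1:1883
  topic: mydevice/leaf

  last will:
    topic: usecasename/status/failure
    message: "Joule MQTT process disconnected from broker"
    interval delay: 15   # seconds the broker waits before publishing the will
    retain: true         # documented default, written out for clarity
    qos: 1               # documented default, written out for clarity
```

With retain left at true, a subscriber that connects after the failure should still receive the status message when it subscribes to the status topic.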

