MQTT

Lightweight messaging protocol ideal for IoT use cases

Overview

MQTT is a lightweight, publish-subscribe, machine-to-machine network protocol for message queuing. Joule can subscribe to events through its MQTT source consumer.

Example

mqttConsumer:
  broker: tcp://127.0.0.1:1883
  topic: mydevice/leaf

  clientId: myClientId
  username: lyndon
  tenant: uk
  qos: 1

  deserializer:
    transform: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
    compressed: false
    batch: false

This basic example relies on the default settings. Events are consumed from the mydevice/leaf topic as Customer objects, and each object is then transformed into a Joule StreamEvent object ready for platform processing.

Attributes schema

Available configuration parameters

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| name | Name of source stream | String | |
| broker | Broker server address | String in the form tcp://<ip-address>:<port> | |
| topic | Message topic to subscribe to | String | |
| clientId | A unique client identifier on the server being connected to | String | |
| username | Username | String | |
| password | Password | String | |
| tenant | Namespace for created topics | String | |
| qos | Quality of service | Integer | Default: 0 |
| auto reconnect | Automatically reconnect to the broker on disconnection | Boolean | Default: true |
| clean restart | If a client disconnects and reconnects within 5 minutes with clean start = false and a QoS of 1 or higher, session state data (e.g. subscribed topics, queued messages) is retained | Boolean | Default: true |
| sessionExpiry interval | Maximum time that the broker will maintain the session once the client disconnects | Long | Default: 300 seconds (5 minutes) |
| registration message | Message to send to the broker when a Joule process registers | String | |
| user properties | Sets the user properties | Map<String, String> | |
| connection timeout | Maximum time, in seconds, the client will wait for the network connection to the MQTT server to be established | Integer | Default: 30 seconds |
| keepalive interval | Maximum time, in seconds, between messages sent or received. It enables the client to detect that the server is no longer available without having to wait for the TCP/IP timeout | Integer | Default: 30 seconds |
| deserializer | Deserialization configuration | See Deserialization Attributes section | |
| security | Security configuration | | |
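As an illustration of the attributes listed above, the sketch below adds name and password to the earlier example and raises qos to 2. It assumes those two attributes are written as YAML keys exactly as listed, which holds for the keys already shown in the example. The stream name and password values are placeholders. Attributes listed with a space in their name (such as auto reconnect) are left out because their exact YAML spelling is not shown here.

```yaml
mqttConsumer:
  # Assumed key: "name" as listed in the attributes; the value is a placeholder
  name: leafDeviceStream
  broker: tcp://127.0.0.1:1883
  topic: mydevice/leaf

  clientId: myClientId
  username: lyndon
  # Assumed key: "password" as listed in the attributes; the value is a placeholder
  password: changeMe
  tenant: uk
  # MQTT QoS levels: 0 = at most once, 1 = at least once, 2 = exactly once
  qos: 2

  deserializer:
    transform: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
    compressed: false
    batch: false
```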

Deserialization Attributes

This section lists the configuration parameters available for the object deserialization process.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User-provided implementation | Implementation of StreamEventParser | |
| compressed | Uncompress payload using Snappy | Boolean | Default: false |
| batch | Flag informing the process that the payload contains multiple messages | Boolean | Default: false |

Deserializer example

deserializer:
  transform: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
  compressed: false
  batch: false
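
If the publisher sends Snappy-compressed payloads that each carry several messages, both flags would be switched on. This is a minimal sketch, assuming the same translator class can parse batched payloads, which is not confirmed here.

```yaml
deserializer:
  transform: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
  # Payload is Snappy-compressed, so it is uncompressed before parsing
  compressed: true
  # Payload carries multiple messages rather than a single one
  batch: true
```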

Client library

org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5

