MQTT

Overview

MQTT is a lightweight, publish-subscribe, machine-to-machine network protocol for message queuing. Joule can publish events to an MQTT broker using the MQTT publisher.

Example

mqttPublisher:
  clientId: myClientId
  username: lyndon
  tenant: uk
  topic: mydevice/leaf
  qos: 1
  broker: tcp://127.0.0.1:1883

  serializer:
    compress: true

  security:
    jwtclaim:
      audienceId: projectID
      keyFile: file2
      algorithm: RSA
      isPrivateKey: true

This basic example relies on the default settings for any attribute it does not specify, so events are published to the mydevice/leaf topic as StreamEvent JSON objects.

Attributes schema

Available configuration parameters

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| name | Name of source stream | String | |
| broker | Broker server address | http://<ip-address>:port | |
| topic | Message topic to publish to | String | |
| clientId | A unique client identifier on the server being connected to | String | |
| username | Username | String | |
| password | Password | String | |
| tenant | Namespace for created topics | String | |
| qos | Quality of service | Integer. Default: 0 | |
| auto reconnect | Automatically reconnect to the broker on disconnection | Boolean. Default: true | |
| clean restart | When set to false, session state data (e.g. subscribed topics, queued messages) is retained if the client disconnects and reconnects within the session expiry interval (5 minutes by default) with a QoS of 1 or higher | Boolean. Default: true | |
| sessionExpiry interval | Maximum time that the broker will maintain the session once the client disconnects | Long. Default: 300 seconds (5 minutes) | |
| registration message | Message to send to the broker when a Joule process registers | String | |
| user properties | Sets the user properties | Map<String, String> | |
| connection timeout | Maximum time, in seconds, that the client will wait for the network connection to the MQTT server to be established | Integer. Default: 30 seconds | |
| keepalive interval | Maximum time, in seconds, between messages sent or received. It enables the client to detect that the server is no longer available without having to wait for the TCP/IP timeout | Integer. Default: 30 seconds | |
| serializer | Serialization configuration | See Serialization Attributes section | |
| last will | Last will specification | See Last Will Attributes section | |
| security | Security configuration | | |
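
The session-related attributes above work together: with clean restart set to false, the broker keeps the session state after a disconnect for up to the sessionExpiry interval. The fragment below is a minimal sketch of a publisher configured that way. It assumes the attribute names are written as YAML keys exactly as they appear in the table, spaces included (the last will example follows the same convention), and the clientId, topic, and broker values are placeholders.

```yaml
mqttPublisher:
  clientId: myClientId            # placeholder; must be unique on the broker
  topic: mydevice/leaf
  broker: tcp://127.0.0.1:1883
  qos: 1                          # at-least-once delivery

  # Key spellings below are assumed to match the attribute table verbatim
  auto reconnect: true
  clean restart: false            # ask the broker to keep session state across reconnects
  sessionExpiry interval: 300     # seconds the broker keeps the session (documented default)
  connection timeout: 30          # seconds to wait for the connection (documented default)
  keepalive interval: 30          # seconds between messages before the link is checked (documented default)
```

Only clean restart and qos differ from the documented defaults here; the other values are spelled out so the relationship between them is visible in one place.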

Serialization Attributes

This section lists the configuration parameters available for the object serialization process.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User provided implementation | CustomTransformer | |
| formatter | Used when a custom transformer is not provided. Useful when chaining Joule processes | Formatter. Default: jsonFormatter | |
| compressed | Compress payload using Snappy | Boolean. Default: false | |
| batch | Flag to batch multiple messages into a single payload | Boolean. Default: false | |

Serializer example

The configuration below serializes StreamEvents as compressed JSON payloads.

serializer:
  compress: true
  batch: false
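
For comparison, the fragment below sketches the opposite trade-off: payloads are left uncompressed, but several events are batched into one message. It assumes the compress key spelling used in the example above (the attribute table lists the attribute as compressed) and relies on the default jsonFormatter.

```yaml
serializer:
  compress: false   # key spelling taken from the example above
  batch: true       # combine multiple events into a single payload
```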

Last Will Attributes

These attributes provide the configuration parameters to send publisher disconnection notifications to connected subscribers.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| topic | Topic to send the status message to | String | |
| message | Message to send to consumers when the publisher no longer exists | String. Default: "Publisher disconnected from broker" | |
| interval delay | The server delays publishing the client's Will Message until the Will Delay Interval has passed or the session ends, whichever happens first | Long. Default: 5 seconds | |
| retain | Whether the message is retained | Boolean. Default: true | |
| qos | Quality of service | Integer. Default: 1 | |

Last will example

last will:
  topic: usecasename/status/failure
  message: "Joule MQTT process disconnected from broker"
  interval delay: 15
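
The example above leaves retain and qos at their defaults. A sketch that sets every attribute from the table explicitly might look as follows; the retain and qos values are simply the documented defaults, and the key spellings are assumed to follow the table.

```yaml
last will:
  topic: usecasename/status/failure
  message: "Joule MQTT process disconnected from broker"
  interval delay: 15   # seconds the broker waits before publishing the will
  retain: true         # documented default
  qos: 1               # documented default
```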

Client library

org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5
