MQTT

Allows publishing events through MQTT, ideal for IoT and lightweight streaming

Last updated 2 months ago

Overview

MQTT is a lightweight, publish-subscribe protocol ideal for machine-to-machine communication in IoT environments.

Joule leverages MQTT to publish events through an MQTT publisher, allowing communication between devices and services over specified topics.
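The publish-subscribe model can be pictured with a toy in-memory broker. The sketch is purely illustrative: `ToyBroker` is a hypothetical class, it runs in a single process with exact topic matching only, and it is unrelated to Joule's publisher or to any real MQTT client. It shows the decoupling the protocol provides: the publisher names only a topic, and the broker forwards the payload to whoever subscribed to it.

```python
from collections import defaultdict
from typing import Callable


class ToyBroker:
    """Hypothetical in-memory broker: exact topic match, no network, no QoS."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[str, bytes], None]]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callable[[str, bytes], None]) -> None:
        self._subscribers[topic].append(callback)

    def publish(self, topic: str, payload: bytes) -> int:
        """Deliver the payload to every subscriber of the topic; return how many received it."""
        callbacks = self._subscribers.get(topic, [])
        for callback in callbacks:
            callback(topic, payload)
        return len(callbacks)


received = []
broker = ToyBroker()
broker.subscribe("lab/humidity_valve", lambda topic, payload: received.append((topic, payload)))

broker.publish("lab/humidity_valve", b'{"valve":"open"}')  # delivered to the subscriber
broker.publish("lab/temperature", b'{"celsius":21}')       # no subscriber, nothing delivered
print(received)
```

The publisher never holds a reference to the subscriber, which is why devices and services can be added or removed without changing the publishing side.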

Client library: org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5

Examples & DSL attributes

This example configures an MQTT publisher to send events to the lab/humidity_valve topic.

The events are published as compressed JSON, using QoS level 1 (at-least-once delivery).

The publisher connects with the client ID humidityControllerPublisher, username joule and tenant uk to the broker at tcp://127.0.0.1:1883. Security is handled with a JWT claim using RSA encryption, with a private key stored in humidityControllerKeyFile.

mqttPublisher:
  clientId: humidityControllerPublisher
  username: joule
  tenant: uk
  topic: lab/humidity_valve
  qos: 1
  broker: tcp://127.0.0.1:1883

  serializer:
    compress: true

  security:
    jwtclaim:
      audienceId: humidityMonitorProject
      keyFile: humidityControllerKeyFile
      algorithm: RSA
      isPrivateKey: true
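Before deploying a publisher definition like the one above, it can help to sanity-check a few values. The sketch below is illustrative only and is not part of Joule: the helper `check_publisher_config` and the dict layout are hypothetical. It relies on two MQTT rules: QoS must be 0, 1 or 2, and a topic used for publishing must not contain the wildcard characters `+` or `#`.

```python
from urllib.parse import urlparse


def check_publisher_config(cfg: dict) -> list[str]:
    """Return a list of problems found in a publisher definition (hypothetical helper)."""
    problems = []

    qos = cfg.get("qos", 0)  # the attributes table lists 0 as the default
    if qos not in (0, 1, 2):
        problems.append(f"qos must be 0, 1 or 2, got {qos!r}")

    topic = cfg.get("topic", "")
    if not topic:
        problems.append("topic is empty")
    elif "+" in topic or "#" in topic:
        problems.append(f"publish topic must not contain wildcards: {topic!r}")

    # Only checks that a host can be parsed out of the broker address.
    if not urlparse(cfg.get("broker", "")).hostname:
        problems.append("broker has no host")

    return problems


config = {
    "clientId": "humidityControllerPublisher",
    "topic": "lab/humidity_valve",
    "qos": 1,
    "broker": "tcp://127.0.0.1:1883",
}
print(check_publisher_config(config))  # prints []
```

A definition with `topic: lab/#` or `qos: 3` would produce one message per problem instead of an empty list.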

Attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| name | Name of source stream | String | |
| broker | Broker server address, e.g. tcp://<ip-address>:<port> | String | |
| topic | Message topic to publish to | List of topic strings | |
| clientId | A unique client identifier on the server being connected to | String | |
| username | Username | String | |
| password | Password | String | |
| tenant | Namespace for created topics | String | |
| qos | Quality of service | Integer | Default: 0 |
| auto reconnect | Automatically reconnect to broker on disconnection | Boolean | Default: true |
| clean restart | If a client disconnects and reconnects within 5 minutes with clean start = false and qos > 1, session state data (e.g. subscribed topics, queued messages) is retained | Boolean | Default: true |
| session expiry interval | Maximum time that the broker will maintain the session in persistent storage when a client disconnects due to a process or network fault | Long | Default: 300 seconds (5 minutes) |
| registration message | Message to send to broker when a Joule process registers | String | |
| user properties | Sets the user properties | Map<String, String> | |
| connection timeout | Maximum time, in seconds, the client will wait for the network connection to the MQTT server to be established | Integer | Default: 30 (seconds) |
| keepalive interval | Maximum time, in seconds, between messages sent or received. It enables the client to detect that the server is no longer available without waiting for the TCP/IP timeout | Integer | Default: 30 (seconds) |
| retained | Inform broker to retain the last message. A client that reconnects to the broker will receive the last published message | Boolean | Default: false |
| append identifier | Append the Joule id and client id to the start of the publishing topic | Boolean | Default: false |
| message expiry interval | Time in seconds a published message is stored on the broker | Long | Default: 0 |
| lwt notifier | Last will and testament notification specification | | |
| serializer | Serialisation configuration | | |
| security | Security configuration. See Security documentation | | |
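The interplay of the clean restart and session expiry interval attributes can be sketched as a small decision function. This is a simplified model of MQTT 5 session handling, not Joule or broker code; `session_resumed` and its parameter names are hypothetical. It assumes a session is resumed only when the client reconnects with clean start disabled and the broker still holds the session, that is, the disconnection lasted less than the expiry interval. QoS, which governs which queued messages are part of the session state, is ignored here.

```python
def session_resumed(clean_start: bool,
                    seconds_disconnected: float,
                    session_expiry_interval: int = 300) -> bool:
    """Simplified model: does the broker hand the previous session back to the client?"""
    if clean_start:
        # The client asked for a fresh session, so any stored state is discarded.
        return False
    # The stored session survives only while the expiry interval has not elapsed.
    return seconds_disconnected < session_expiry_interval


print(session_resumed(clean_start=False, seconds_disconnected=120))  # prints True
print(session_resumed(clean_start=False, seconds_disconnected=301))  # prints False
print(session_resumed(clean_start=True, seconds_disconnected=10))    # prints False
```

Under this model, the 300 second default only has an effect when clean start is switched off.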

Serialisation attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User provided implementation | CustomTransformer | |
| formatter | Used when a custom transformer is not provided. Useful when chaining Joule processes | Formatter | Default: jsonFormatter |
| compressed | Compress payload using Snappy | Boolean | Default: false |
| batch | Flag to batch multiple messages into a single payload | Boolean | Default: false |

Serialiser example

In this example, the MQTT publisher is configured to serialise StreamEvents as compressed JSON payloads.

The compress setting is enabled to reduce the size of the data and batch is set to false, meaning events are sent individually rather than in batches.

mqttPublisher:
  ...

  serializer:
    compress: true
    batch: false
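To make the effect of the two flags concrete, here is a rough sketch of what a serialiser with compress and batch options might do. It is not Joule's implementation: the `serialise` function is hypothetical, the events are plain dicts rather than StreamEvents, and zlib from the Python standard library stands in for Snappy, which the standard library does not provide. The batch payload layout (a JSON array) is also an assumption.

```python
import json
import zlib


def serialise(events: list[dict], compress: bool = False, batch: bool = False) -> list[bytes]:
    """Turn events into message payloads (hypothetical stand-in for a serialiser)."""
    # batch=True: one payload holding every event; batch=False: one payload per event.
    groups = [events] if batch else [[event] for event in events]
    payloads = []
    for group in groups:
        body = group if batch else group[0]
        raw = json.dumps(body, separators=(",", ":")).encode("utf-8")
        # zlib is used here only as a stand-in for Snappy compression.
        payloads.append(zlib.compress(raw) if compress else raw)
    return payloads


events = [
    {"valve": "open", "humidity": 41.5},
    {"valve": "closed", "humidity": 38.2},
]
individual = serialise(events, compress=True, batch=False)
batched = serialise(events, compress=True, batch=True)
print(len(individual), len(batched))  # prints: 2 1
print(json.loads(zlib.decompress(individual[0])))
```

With batch disabled each event becomes its own message, which keeps latency low; batching trades latency for fewer, larger messages.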

Additional resources

  • Official Mosquitto documentation
  • Good user documentation
  • MQTT X UI for testing
  • See page: Last will
  • See section: Serialisation attributes