MQTT

Lightweight messaging protocol ideal for IoT use cases


Last updated 2 months ago


Overview

This page provides an overview of how Joule utilises the MQTT protocol to subscribe to events, specifically in IoT use cases.

MQTT is a lightweight, publish-subscribe network protocol used for message queuing services, making it ideal for machine-to-machine communication in IoT.

Joule subscribes to events through an MQTT source consumer, which consumes events from a specified MQTT topic.

Client library: org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5

Examples & DSL attributes

This example illustrates how to configure an MQTT consumer to subscribe to events from the customers topic.

The consumer connects to the MQTT broker at tcp://127.0.0.1:1883 and consumes events with a Quality of Service (QoS) level of 1.

  1. Client settings The consumer is identified by the clientId (jouleCustomerConsumer), with authentication using the username joule and the tenant uk.

  2. Event deserialisation The events are consumed as customer objects and then transformed into Joule StreamEvent objects using the CustomerToStreamEventTranslator class.

  3. Additional settings The configuration also includes settings for compressed (set to false) and batch (set to false), which affect how events are processed and transformed.

mqttConsumer:
  broker: tcp://127.0.0.1:1883
  topic: customers

  clientId: jouleCustomerConsumer
  username: joule
  password: joule
  tenant: uk
  qos: 1

  deserializer:
    parser: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
    compressed: false
    batch: false

Use the Joule SDK to build a custom parser and deploy the resulting jar file to the userlibs directory. See the Joule SDK documentation for further information.

import com.fasterxml.jackson.annotation.JsonRootName;
import com.fractalworks.streams.core.data.streams.StreamEvent;
import com.fractalworks.streams.core.exceptions.TranslationException;
import com.fractalworks.streams.sdk.codec.StreamEventParser;
import com.fractalworks.streams.transport.Customer;

import java.util.Collection;
import java.util.Collections;

// Translates Customer payloads received from the MQTT topic into Joule StreamEvent objects.
@JsonRootName(value = "customer parser")
public class CustomerToStreamEventTranslator implements StreamEventParser {

    public CustomerToStreamEventTranslator() {
        // REQUIRED: public no-argument constructor
    }

    @Override
    public Collection<StreamEvent> translate(Object o) throws TranslationException {
        // Stays null when the payload is not a Customer
        Collection<StreamEvent> events = null;
        if (o instanceof Customer customer) {
            // Copy each customer field onto a new "customer" event
            StreamEvent event = new StreamEvent("customer");
            event.addValue("id", customer.getId());
            event.addValue("firstname", customer.getFirstname());
            event.addValue("surname", customer.getSurname());
            event.addValue("age", customer.getAge());

            events = Collections.singletonList(event);
        }
        return events;
    }
}

To run against a local Mosquitto broker, create a configuration file named mosquitto.conf in the /etc/mosquitto/conf.d/ directory.

per_listener_settings true
pid_file /run/mosquitto/mosquitto.pid

persistence true
persistence_location /var/lib/mosquitto/

log_dest file /var/log/mosquitto/mosquitto.log

listener 1883
allow_anonymous true
password_file /etc/mosquitto/passwd

include_dir /etc/mosquitto/conf.d

Create the password file using the command below. It adds the user joule to the passwd file, with the password/tenant pair (joule/uk) as the stored password.

mosquitto_passwd -b passwd joule joule/uk
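
The command above registers the literal string joule/uk as the password for the user joule. A minimal sketch of how such a credential could be assembled from the password and tenant values of the consumer configuration is shown here. The class and method names are hypothetical, and joining the two values with a forward slash is an assumption inferred from the command, not taken from Joule's source.

```java
public class TenantCredential {

    // Hypothetical helper: builds "<password>/<tenant>".
    // Assumption: this mirrors the password/tenant pair registered with mosquitto_passwd.
    public static String brokerPassword(String password, String tenant) {
        if (tenant == null || tenant.isBlank()) {
            // Assumed behaviour: with no tenant configured the password is used unchanged
            return password;
        }
        return password + "/" + tenant;
    }

    public static void main(String[] args) {
        // Values taken from the mqttConsumer example on this page
        System.out.println(brokerPassword("joule", "uk"));
    }
}
```

Running the sketch with the example values prints joule/uk, which matches the password passed to mosquitto_passwd.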

Attributes schema

| Attribute | Description | Data Type | Required |
|---|---|---|---|
| name | Name of source stream | String | |
| broker | Broker server address | http://<ip-address>:port | |
| topic | Message topic to subscribe to | String | |
| clientId | A unique client identifier on the server being connected to | String | |
| username | Username | String | |
| password | Password | String | |
| tenant | Namespace for created topics | String | |
| qos | Quality of service | Integer | Default: 0 |
| auto reconnect | Automatically reconnect to broker on disconnection | Boolean | Default: true |
| clean restart | If a client disconnects and reconnects within 5 minutes with clean restart=false and qos > 1, session state data (i.e. subscribed topics, queued messages) is retained | Boolean | Default: true |
| sessionExpiry interval | Maximum time that the broker maintains the session once the client disconnects | Long | Default: 300 seconds (5 minutes) |
| registration message | Message to send to broker when a Joule process registers | String | |
| user properties | Sets the user properties | Map<String, String> | |
| connection timeout | Maximum time, in seconds, that the client waits for the network connection to the MQTT server to be established | Integer | Default: 30 seconds |
| keepalive interval | Maximum time, in seconds, between messages sent or received. It enables the client to detect that the server is no longer available without having to wait for the TCP/IP timeout | Integer | Default: 30 seconds |
| lwt handler | Last Will and Testament message handler | | |
| deserializer | Deserialisation configuration | | |
| security | Security configuration | See Security documentation | |
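
The interplay between clean restart and sessionExpiry interval can be sketched as a small decision function. This illustrates the session rule described in the table and is not Joule's implementation: the class and method names are hypothetical, and the qos condition is left out for brevity.

```java
import java.time.Duration;

public class SessionRetention {

    /**
     * Returns true when the broker would be expected to resume the previous session:
     * the client reconnects with clean restart disabled and before the session
     * expiry interval has elapsed.
     */
    public static boolean sessionResumed(boolean cleanRestart,
                                         Duration sessionExpiryInterval,
                                         Duration timeDisconnected) {
        if (cleanRestart) {
            return false; // a clean restart discards any stored session state
        }
        return timeDisconnected.compareTo(sessionExpiryInterval) < 0;
    }

    public static void main(String[] args) {
        Duration expiry = Duration.ofSeconds(300); // default sessionExpiry interval
        System.out.println(sessionResumed(false, expiry, Duration.ofMinutes(2)));
        System.out.println(sessionResumed(false, expiry, Duration.ofMinutes(6)));
        System.out.println(sessionResumed(true, expiry, Duration.ofMinutes(2)));
    }
}
```

With the default 300 second interval, the three calls print true, false and false: only the reconnect after 2 minutes with clean restart disabled keeps its session.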

Deserialisation attributes schema

This section lists the configuration parameters available for the object deserialisation process.

| Attribute | Description | Data Type | Required |
|---|---|---|---|
| parser | User provided implementation | Implementation of StreamEventParser | |
| compressed | Uncompress payload using Snappy | Boolean | Default: false |
| batch | Flag to inform process multiple messages have been sent in payload | Boolean | Default: false |

Deserialiser example

In the given MQTT consumer configuration, the deserializer section defines how the incoming events should be processed and transformed before being consumed by the Joule platform.

  1. parser The parser attribute specifies the class (com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator) responsible for converting the incoming message into a Joule StreamEvent object. This transformation is necessary to make the data compatible with the platform’s processing system.

  2. compressed The compressed: false setting indicates that the incoming data is not compressed. If the data were compressed, this setting would need to be set to true and the system would decompress the data before processing.

  3. batch The batch: false setting means that each payload carries a single event, which is processed individually as it is received. If set to true, the payload is treated as containing multiple events, which can improve efficiency in some scenarios.

mqttConsumer:
  ...
  deserializer:
    parser: com.fractalworks.streams.transport.mqtt.CustomerToStreamEventTranslator
    compressed: false
    batch: false
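
To make the batch flag more concrete, the sketch below shows one way a parser could accept either a single object or several objects in one payload. It is self-contained and does not use the Joule SDK: the Customer record and the Map-based event are simplified stand-ins for the real Customer and StreamEvent types, and BatchAwareTranslator is a hypothetical name. Returning an empty list for unsupported payloads is a choice made for this sketch only.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchAwareTranslator {

    // Stand-in for the Customer payload type used on this page
    public record Customer(int id, String firstname, String surname, int age) {}

    // Converts one customer into an attribute map (stand-in for a StreamEvent)
    static Map<String, Object> toEvent(Customer c) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("id", c.id());
        event.put("firstname", c.firstname());
        event.put("surname", c.surname());
        event.put("age", c.age());
        return event;
    }

    // Accepts a single Customer or a collection of them; anything else yields an empty list
    public static List<Map<String, Object>> translate(Object payload) {
        List<Map<String, Object>> events = new ArrayList<>();
        if (payload instanceof Customer single) {
            events.add(toEvent(single));
        } else if (payload instanceof Collection<?> batch) {
            for (Object item : batch) {
                if (item instanceof Customer c) {
                    events.add(toEvent(c));
                }
            }
        }
        return events;
    }

    public static void main(String[] args) {
        Customer ada = new Customer(1, "Ada", "Lovelace", 36);
        Customer alan = new Customer(2, "Alan", "Turing", 41);
        System.out.println(translate(ada).size());                // single payload
        System.out.println(translate(List.of(ada, alan)).size()); // batched payload
    }
}
```

The single payload produces one event and the batched payload produces two, which is the distinction the batch flag signals to the deserialiser.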

Additional resources

  • Client library: org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5
  • Joule SDK
  • See Last Will and Testament page
  • See Deserialisation attributes section
  • Official Mosquitto documentation
  • Good user documentation
  • MQTT X UI for testing