Topic wildcards

Overview

In MQTT, wildcards provide a powerful mechanism for subscribing to multiple topics simultaneously. When a client subscribes to a topic, it can either subscribe to the exact topic of a published message or utilise wildcards to broaden its subscription.

How are wildcards used?

  • Clients can subscribe to a wildcard topic to receive messages from multiple matching topics.

  • Wildcards can reduce overhead by eliminating the need to subscribe to each topic individually.

  • Wildcards are used when there is uncertainty about the topics that publishing clients will use.
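
To make the matching rules concrete, the sketch below implements the single level ('+') and multi level ('#') wildcard semantics as a small standalone helper. It is purely illustrative and not part of the Joule or MQTT client APIs; in practice the broker performs the matching.

// Illustrative only: how MQTT topic filters containing '+' and '#' match concrete topics.
public final class TopicFilterDemo {

    // Returns true when the concrete topic matches the subscription filter
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/");
        String[] t = topic.split("/");
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true;                              // '#' matches this level and everything below it
            }
            if (i >= t.length) {
                return false;                             // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;                             // '+' matches exactly one level; anything else must match literally
            }
        }
        return f.length == t.length;                      // no unmatched trailing topic levels
    }

    public static void main(String[] args) {
        System.out.println(matches("sensors/+/temperaturehumidity", "sensors/kitchen/temperaturehumidity"));             // true
        System.out.println(matches("sensors/+/temperaturehumidity", "sensors/groundfloor/kitchen/temperaturehumidity")); // false
        System.out.println(matches("sensors/#", "sensors/groundfloor/kitchen/temperaturehumidity"));                     // true
    }
}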

Single level wildcard

Joule takes advantage of MQTT wildcard capabilities through the use of a custom event parser. This section walks you through the process.

Example

Events will be received from various rooms within a house using the following example topic structure:

sensors/kitchen/temperaturehumidity
sensors/livingroom/temperaturehumidity
sensors/hallway/temperaturehumidity
sensors/bedroom1/temperaturehumidity
sensors/bedroom2/temperaturehumidity

The subscription will use the single level wildcard '+':

sensors/+/temperaturehumidity

We shall use the '+' MQTT wildcard to receive all sensor data from each connected room.

mqttConsumer:
  broker: tcp://127.0.0.1:1883
  topic: sensors/+/temperaturehumidity

  clientId: tempHumidityManagementProcessor
  qos: 1

  deserializer:
    parser: com.fractalworks.mqtt.example.TemperatureHumiditySensorParser
    compressed: false
    batch: false

The custom parser below converts each MQTT payload into Joule StreamEvent objects. The overloaded translate(Object payload, String source) method receives the publishing topic as its source argument and uses it to decorate each event with the room it came from.

package com.fractalworks.mqtt.example;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Joule SDK imports (StreamEvent, StreamEventParser, StreamsException, TranslationException)
// and the TemperatureHumidityEvent class are omitted for brevity.

public class TemperatureHumiditySensorParser implements StreamEventParser<List<StreamEvent>> {

    private static final String ROOM = "room";
    private ObjectMapper objectMapper;

    public TemperatureHumiditySensorParser() {
        // REQUIRED
    }

    @Override
    public void initialise() throws StreamsException {
        objectMapper = new ObjectMapper();
        objectMapper.configure(SerializationFeature.INDENT_OUTPUT, false);
        objectMapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
    }

    @Override
    public List<StreamEvent> translate(Object payload) throws TranslationException {
        TemperatureHumidityEvent event = parseSingleEvent(payload);
        if(event == null){
            throw new TranslationException("Expected TemperatureHumidityEvent type to parse.");
        }
        return Collections.singletonList(event.toStreamEvent());
    }

    @Override
    public List<StreamEvent> translate(Object payload, String source) throws TranslationException {
        // First parse the event to the standard data type
        List<StreamEvent> events = translate(payload);

        // Now decorate the parsed events with the room identifier taken from the
        // source topic, e.g. sensors/kitchen/temperaturehumidity -> kitchen
        String[] tokens = source.split("/");
        if (tokens.length == 3) {
            events.forEach(event -> event.addValue(ROOM, tokens[1]));
        }
        return events;
    }

    /**
     * Parse a raw payload to a single TemperatureHumidityEvent.
     *
     * @param obj raw payload, either a JSON string or a byte array
     * @return the parsed event, or null if the payload could not be parsed
     */
    private TemperatureHumidityEvent parseSingleEvent(final Object obj) {
        TemperatureHumidityEvent event = null;
        try {
            if (obj instanceof String str) {
                event = objectMapper.readValue(str, TemperatureHumidityEvent.class);
            } else if (obj instanceof byte[] byteArray) {
                event = objectMapper.readValue(byteArray, TemperatureHumidityEvent.class);
            }
        } catch (IOException e) {
            // Consume exception
        }
        return event;
    }

    /**
     * Parse a raw payload to multiple TemperatureHumidityEvent objects.
     *
     * @param obj raw payload containing a JSON array, either as a string or a byte array
     * @return the parsed events, or null if the payload could not be parsed
     */
    private List<TemperatureHumidityEvent> parseMultipleEvents(final Object obj) {
        List<TemperatureHumidityEvent> events = null;
        try {
            if (obj instanceof String str) {
                events = Arrays.asList(objectMapper.readValue(str, TemperatureHumidityEvent[].class));
            } else if (obj instanceof byte[] byteArray) {
                events = Arrays.asList(objectMapper.readValue(byteArray, TemperatureHumidityEvent[].class));
            }
        } catch (IOException e) {
            // Consume exception
        }
        return events;
    }
}
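
As a quick, hypothetical check of the behaviour, the snippet below feeds the parser a JSON payload together with a concrete topic. The temperature and humidity field names are assumptions about TemperatureHumidityEvent, shown only to illustrate the flow; exception handling is omitted.

// Hypothetical usage sketch; assumes TemperatureHumidityEvent maps the fields shown below.
TemperatureHumiditySensorParser parser = new TemperatureHumiditySensorParser();
parser.initialise();

String payload = "{\"temperature\": 21.5, \"humidity\": 48.0}";
List<StreamEvent> events = parser.translate(payload, "sensors/kitchen/temperaturehumidity");
// Each returned StreamEvent now carries a "room" attribute with the value "kitchen"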

Multi level wildcard

Little changes when using this wildcard, except that you may need to adjust how the topic string is parsed, since the topic depth can now vary.

Example

This example now receives all sensor information from the house. The parser would need to be coded so that events can be handled holistically within the target stream processor; one possible approach is sketched after the configuration below.

mqttConsumer:
  broker: tcp://127.0.0.1:1883
  topic: sensors/#

  clientId: tempHumidityManagementProcessor
  qos: 1

  deserializer:
    parser: com.fractalworks.mqtt.example.TemperatureHumiditySensorParser
    compressed: false
    batch: false
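
One possible adaptation of the parser for this subscription is sketched below. It replaces the translate(Object, String) method shown earlier and is an assumption about how varying topic depths might be handled, not a prescribed implementation.

    // Sketch: handle topics of varying depth under sensors/#, e.g.
    // sensors/kitchen/temperaturehumidity or sensors/groundfloor/kitchen/temperaturehumidity
    @Override
    public List<StreamEvent> translate(Object payload, String source) throws TranslationException {
        List<StreamEvent> events = translate(payload);

        String[] tokens = source.split("/");
        events.forEach(event -> {
            event.addValue("topic", source);                      // keep the full topic for downstream use
            if (tokens.length >= 3) {
                event.addValue(ROOM, tokens[tokens.length - 2]);  // the room is the level before the sensor type
            }
        });
        return events;
    }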

Notes

See the great documentation HiveMQ has produced that covers this subject.