In MQTT, wildcards provide a powerful mechanism for subscribing to multiple topics simultaneously. When a client subscribes to a topic, it can either subscribe to the exact topic of a published message or utilise wildcards to broaden its subscription.
How are wildcards used?
Clients can subscribe to a wildcard topic to receive messages from multiple matching topics.
Wildcards can reduce overhead by eliminating the need to subscribe to each topic individually.
Wildcards are used when there is uncertainty about the topics that publishing clients will use.
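To make the matching rules concrete, the following is a minimal sketch of how a topic filter containing wildcards is compared against a published topic name. It is illustrative only: the broker performs this matching for you, the class name TopicFilterSketch and the topics in the usage note are hypothetical, and the sketch ignores the special handling of topics that begin with $.

```java
/** Illustrative MQTT topic-filter matcher; not part of any library. */
final class TopicFilterSketch {

    private TopicFilterSketch() {
    }

    /**
     * Returns true when the topic name matches the subscription filter.
     * '+' matches exactly one level; '#' matches all remaining levels
     * (including the parent level) and must be the last filter level.
     */
    static boolean matches(String filter, String topic) {
        // A limit of -1 keeps trailing empty levels such as "a/b/".
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // Multi-level wildcard: only valid as the final filter level.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // a literal level differs
            }
        }
        // No '#' was seen, so the level counts must agree.
        return f.length == t.length;
    }
}
```

With these rules, the hypothetical filter house/+/temperature matches house/kitchen/temperature but not house/kitchen/oven/temperature, while house/# matches both.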
Single-level wildcard
The single-level wildcard (+) matches exactly one topic level. One way to take advantage of it is to use a custom event parser that reads the matched level from the topic. This section walks you through the process.
Example
Events are received from various rooms within a house using the following example topic structure:
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// StreamEventParser, StreamEvent, StreamsException and TranslationException come from the
// streams library, and TemperatureHumidityEvent is the application's own event class.
// Import them from the packages used in your project.
public class TemperatureHumiditySensorParser implements StreamEventParser<List<StreamEvent>> {

    // Key under which the room name is added to each event
    private static final String ROOM = "room";

    private ObjectMapper objectMapper;

    public TemperatureHumiditySensorParser() {
        // REQUIRED: public no-argument constructor
    }

    @Override
    public void initialise() throws StreamsException {
        objectMapper = new ObjectMapper();
        objectMapper.configure(SerializationFeature.INDENT_OUTPUT, false);
        objectMapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
    }

    @Override
    public List<StreamEvent> translate(Object payload) throws TranslationException {
        TemperatureHumidityEvent event = parseSingleEvent(payload);
        if (event == null) {
            throw new TranslationException("Expected TemperatureHumidityEvent type to parse.");
        }
        return Collections.singletonList(event.toStreamEvent());
    }

    @Override
    public List<StreamEvent> translate(Object payload, String source) throws TranslationException {
        // First parse the event to the standard data type
        List<StreamEvent> events = translate(payload);

        // Then decorate the parsed events with the room identifier, which is
        // the fourth level of the topic the message was published on
        String[] tokens = source.split("/");
        if (tokens.length == 4) {
            events.forEach(event -> event.addValue(ROOM, tokens[3]));
        }
        return events;
    }

    /**
     * Parse object to a single TemperatureHumidityEvent
     *
     * @param obj the payload, either a JSON String or a byte[] containing JSON
     * @return the parsed event, or null if the payload type is unsupported or cannot be parsed
     */
    private TemperatureHumidityEvent parseSingleEvent(final Object obj) {
        TemperatureHumidityEvent event = null;
        try {
            if (obj instanceof String str) {
                event = objectMapper.readValue(str, TemperatureHumidityEvent.class);
            } else if (obj instanceof byte[] byteArray) {
                event = objectMapper.readValue(byteArray, TemperatureHumidityEvent.class);
            }
        } catch (IOException e) {
            // Consume exception: the caller treats a null result as a parse failure
        }
        return event;
    }

    /**
     * Parse object to multiple stream events
     *
     * @param obj the payload, either a JSON String or a byte[] containing a JSON array
     * @return the parsed events, or null if the payload type is unsupported or cannot be parsed
     */
    private List<TemperatureHumidityEvent> parseMultipleEvents(final Object obj) {
        List<TemperatureHumidityEvent> events = null;
        try {
            if (obj instanceof String str) {
                events = Arrays.asList(objectMapper.readValue(str, TemperatureHumidityEvent[].class));
            } else if (obj instanceof byte[] byteArray) {
                events = Arrays.asList(objectMapper.readValue(byteArray, TemperatureHumidityEvent[].class));
            }
        } catch (IOException e) {
            // Consume exception: a null result signals that nothing could be parsed
        }
        return events;
    }
}
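The parser reads the room name from the fourth level of the topic. As a minimal sketch, that step can be isolated into a small helper so it can be tested without the streams framework. The class name RoomTopicSketch and the topic used in the usage note are hypothetical and chosen only for illustration; they are not the topic structure of any particular deployment.

```java
import java.util.Optional;

/** Illustrative helper that extracts the room from a four-level topic. */
final class RoomTopicSketch {

    // Assumption: topics have exactly four levels and the room is the last one.
    static final int EXPECTED_LEVELS = 4;

    private RoomTopicSketch() {
    }

    /** Returns the room name, or empty if the topic does not have the expected shape. */
    static Optional<String> roomFrom(String topic) {
        if (topic == null) {
            return Optional.empty();
        }
        // A limit of -1 keeps trailing empty levels so "a/b/c/" still has four levels.
        String[] tokens = topic.split("/", -1);
        if (tokens.length != EXPECTED_LEVELS || tokens[3].isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(tokens[3]);
    }
}
```

For a hypothetical topic such as home/ground/sensors/kitchen, roomFrom yields kitchen; a topic with fewer or more levels yields an empty result instead of failing with an array index error.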
Multi-level wildcard
Little changes when using the multi-level wildcard (#), except that you may need to change how you parse the topic string, because the number of matched topic levels can vary.
Example
This example would now receive all sensor information from a house. The parser would need to be coded so that events can be handled holistically within the target stream processor.
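Because a multi-level subscription can deliver topics of varying depth, the parser can no longer rely on a fixed token count. One possible approach, sketched here under the assumption that the subscription has the form prefix/#, is to strip the known prefix and work with whatever levels remain. The class name MultiLevelTopicSketch and the topics in the usage note are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative helper for topics received through a "prefix/#" subscription. */
final class MultiLevelTopicSketch {

    private MultiLevelTopicSketch() {
    }

    /**
     * Returns the topic levels below the given prefix. The result is empty
     * when the topic equals the prefix or does not sit under it.
     */
    static List<String> levelsBelow(String prefix, String topic) {
        String head = prefix + "/";
        if (!topic.startsWith(head)) {
            // Covers both the parent topic itself and unrelated topics.
            return List.of();
        }
        // A limit of -1 keeps trailing empty levels.
        return Arrays.asList(topic.substring(head.length()).split("/", -1));
    }
}
```

For a hypothetical subscription to house/#, the topic house/kitchen/temperature yields the levels kitchen and temperature, and the parser can then decide how to decorate the event based on how many levels are present.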