RabbitMQ
RabbitMQ is a lightweight, easy-to-deploy messaging platform for event-based data integration.
Overview
RabbitMQ is a lightweight and easy-to-deploy messaging platform that supports multiple messaging protocols. It can be configured in distributed and federated setups to meet high-scale and high-availability requirements.
Joule can publish to RabbitMQ queues, exchanges and topics, which supports flexible event-driven architectures.
This page covers how to configure RabbitMQ publishers within Joule, including queue-based eventing, work queues for task distribution, publish / subscribe models and routing with dynamic keys.
The examples demonstrate how to serialise events into JSON, configure exchanges and use routing keys for targeted message delivery.
Additionally, it provides configuration details for common patterns like worker queues and topic-based exchanges, offering scalability and efficiency in event processing.
Client library: com.rabbitmq:amqp-client:5.16.0
Examples & DSL attributes
This example configures the RabbitMQ Publisher to send StreamEvents as JSON-formatted messages to a queue named quotes_queue.
Here's a breakdown:
queue: The events are published to quotes_queue, a named queue in RabbitMQ.
serializer: The events are serialised using the jsonFormatter with UTF-8 encoding, ensuring the data is formatted as JSON.
This setup sends JSON-encoded events to the quotes_queue in RabbitMQ, with the message content encoded in UTF-8.
Attributes schema
| Attribute | Description | Type | Default |
| --- | --- | --- | --- |
| host | RabbitMQ server hostname or IP address | String | localhost |
| port | RabbitMQ server port; must be greater than 0 | Integer | 5672 |
| username | User name | String | guest |
| password | Password | String | guest |
| virtualHost | Logical grouping of connections, exchanges, queues, bindings, user permissions, etc. within an instance | String | / |
| autoAck | Whether the server should consider messages acknowledged as soon as they have been delivered. Set to false for explicit message acknowledgement | Boolean | true |
| global | QoS: true if the settings should be applied to the entire channel rather than to each consumer | Boolean | true |
| durable | Declare a durable queue (the queue survives a server restart) | Boolean | true |
| autoDelete | The server deletes the queue when it is no longer in use | Boolean | true |
| exclusive | Make the queue exclusive to this connection | Boolean | false |
| arguments | Additional queue properties | Map<String, Object> | null |
| exchange | Exchange configuration | | |
| routing | Routing configuration | | |
| serializer | Serialisation configuration | | |
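As a quick way to keep the documented defaults in one place, the connection and queue attributes above can be mirrored in a small settings object. This is only an illustrative sketch: PublisherSettings and its snake_case field names are hypothetical and are not part of Joule or the RabbitMQ client library. The default values and the port check are taken from the attributes listed above.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class PublisherSettings:
    """Hypothetical holder for the documented attributes and their defaults."""

    host: str = "localhost"
    port: int = 5672
    username: str = "guest"
    password: str = "guest"
    virtual_host: str = "/"
    auto_ack: bool = True
    global_qos: bool = True   # the 'global' attribute; renamed because 'global' is a Python keyword
    durable: bool = True
    auto_delete: bool = True
    exclusive: bool = False
    arguments: Optional[Dict[str, Any]] = None

    def __post_init__(self) -> None:
        # The schema states that the port must be greater than 0.
        if self.port <= 0:
            raise ValueError(f"port must be greater than 0, got {self.port}")


defaults = PublisherSettings()
print(defaults.host, defaults.port, defaults.virtual_host)
```

Overriding a single value, for example PublisherSettings(host="broker.internal"), leaves every other attribute at its documented default; the host name here is made up for illustration.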
Exchange attributes schema
| Attribute | Description | Type | Default |
| --- | --- | --- | --- |
| name | Name of the exchange | String | |
| type | Exchange type. In RabbitMQ, messages are published to an exchange and, depending on the type of exchange, the message gets routed to one or more queues | TOPIC, DIRECT, FANOUT, HEADERS | TOPIC |
| arguments | Additional binding properties | Map<String, Object> | null |
Exchange example
This example configures the RabbitMQ Publisher to send events to an exchange instead of a queue:
exchange: The events are published to an exchange named marketQuotes.
type: The exchange type is set to TOPIC, meaning events are routed by matching each message's routing key against binding patterns that may include wildcards. This allows consumers to subscribe to specific types of messages using patterns like market.IBM or market.*.
This setup sends events to the marketQuotes topic exchange, enabling more flexible routing of messages based on the binding patterns used by consumers.
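To make the wildcard behaviour concrete, the sketch below reimplements the topic matching rule in plain Python. It is an illustration of what the broker does, not Joule or RabbitMQ code, and the function name topic_matches is hypothetical. It follows the AMQP topic rules: routing keys are dot-separated words, * in a binding pattern matches exactly one word, and # matches zero or more words.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if routing_key matches a topic binding pattern."""
    pattern_words = pattern.split(".")
    key_words = routing_key.split(".") if routing_key else []

    def match(p: int, k: int) -> bool:
        # All pattern words consumed: succeed only if the key is consumed too.
        if p == len(pattern_words):
            return k == len(key_words)
        word = pattern_words[p]
        if word == "#":
            # '#' may absorb zero or more of the remaining key words.
            return any(match(p + 1, nxt) for nxt in range(k, len(key_words) + 1))
        if k == len(key_words):
            return False
        # '*' matches any single word; otherwise the words must be equal.
        if word == "*" or word == key_words[k]:
            return match(p + 1, k + 1)
        return False

    return match(0, 0)


print(topic_matches("market.*", "market.IBM"))      # True
print(topic_matches("market.*", "market.IBM.bid"))  # False
print(topic_matches("market.#", "market.IBM.bid"))  # True
```

A consumer bound with market.* therefore receives market.IBM but not a deeper key such as market.IBM.bid, which would need market.# instead.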
Routing attributes schema
| Attribute | Description | Type | Default |
| --- | --- | --- | --- |
| event fields | List of event fields used to create dynamic routing keys | String | None |
Routing example
In this example, the RabbitMQ Publisher is configured to dynamically create a routing key based on the market and symbol fields from the StreamEvent:
routing: The routing key is constructed using the values of the market and symbol fields in the event. For example, if the market is nasdaq and the symbol is IBM, the routing key nasdaq.IBM will be created.
This enables fine-grained control over which events are routed to specific consumers, as they can subscribe to events using specific binding patterns like nasdaq.IBM or nasdaq.*.
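The key construction described above can be sketched in a few lines. This is a minimal illustration rather than Joule's implementation: the event is modelled as a plain dictionary standing in for a StreamEvent, build_routing_key is a hypothetical helper, and joining the field values with a dot is inferred from the nasdaq.IBM example.

```python
from typing import Any, Iterable, Mapping


def build_routing_key(event: Mapping[str, Any], fields: Iterable[str]) -> str:
    """Join the values of the configured event fields with dots.

    Raises KeyError if a configured field is missing from the event;
    how Joule itself treats missing fields is not covered here.
    """
    return ".".join(str(event[name]) for name in fields)


# A plain dict stands in for a StreamEvent (assumption for illustration).
event = {"market": "nasdaq", "symbol": "IBM", "bid": 141.2}
key = build_routing_key(event, ["market", "symbol"])
print(key)  # nasdaq.IBM
```

The order of the configured fields determines the order of the words in the key, so listing symbol before market would produce IBM.nasdaq and would no longer match a nasdaq.* binding.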
Serialisation attributes schema
| Attribute | Description | Type | Default |
| --- | --- | --- | --- |
| transform | User-provided implementation that transforms the StreamEvent into a domain class. The resulting domain class must implement the Serializable interface | Implementation of StreamEventParser | |
| formatter | If a custom transformer has not been provided, the formatter setting is applied to the processed StreamEvent object | | jsonFormatter |
| compressed | Not implemented | Boolean | false |
Serialiser example
This example demonstrates how to configure the serialiser for the RabbitMQ Publisher:
serializer: The jsonFormatter is used to serialise the events.
encoding: The events are encoded in UTF-8, ensuring that the message content is in a standard JSON format that can be easily consumed by downstream applications.
This setup serialises the events into JSON format with UTF-8 encoding before publishing them to RabbitMQ.
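For readers who want to see what a JSON message body encoded as UTF-8 looks like on the wire, here is a small sketch of the serialise-then-encode step. It is not the jsonFormatter implementation: serialise_event is a hypothetical helper and the event is a plain dictionary standing in for a StreamEvent.

```python
import json


def serialise_event(event: dict) -> bytes:
    """Render the event as JSON text, then encode that text as UTF-8 bytes."""
    # ensure_ascii=False keeps non-ASCII characters as-is so that the
    # UTF-8 encoding step, not JSON escaping, decides their byte form.
    text = json.dumps(event, ensure_ascii=False, separators=(",", ":"))
    return text.encode("utf-8")


event = {"symbol": "IBM", "market": "nasdaq", "note": "café"}
payload = serialise_event(event)

# A downstream consumer reverses the two steps to recover the event.
decoded = json.loads(payload.decode("utf-8"))
print(decoded == event)  # True
```

Because the payload is ordinary UTF-8 JSON, any consumer with a JSON parser can read it without knowing anything about the publisher.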
Additional resources
Official RabbitMQ documentation
CloudAMQP documentation