RabbitMQ

RabbitMQ is a lightweight, easy-to-deploy messaging platform that supports multiple messaging protocols. It can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements. Joule can consume events through a RabbitMQ messaging subscription.

Driver details

com.rabbitmq:amqp-client:5.16.0

Configuration Guide

Example configuration

rabbitConsumer:
  host: localhost
  exchange:
    name: marketQuotes
    type: TOPIC
  routing:
    keys:
      - NASDAQ.IBM
      - NASDAQ.MSFT
      - NASDAQ.GOOG
  deserializer:
    transform: com.fractalworks.streams.examples.banking.data.QuoteToStreamEventTransformer

This example configures a RabbitMQ consumer that receives three event types from a topic exchange, selected by the routing keys. Quote events are deserialised from a StreamEvent JSON object into a Joule StreamEvent object.

Core Attributes

Available configuration parameters

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| host | RabbitMQ server hostname or IP address | String | Default: localhost |
| port | RabbitMQ server port; must be greater than 0 | Integer | Default: 5672 |
| username | User name | String | Default: guest |
| password | Password | String | Default: guest |
| virtualHost | Creates a logical group of connections, exchanges, queues, bindings, user permissions, etc. within an instance. See vhost documentation for further information | String | Default: / |
| autoAck | Whether the server should consider messages acknowledged as soon as they are delivered. Set to false for explicit message acknowledgment | Boolean | Default: true |
| prefetchCount | QoS: maximum number of messages that the server will deliver; 0 means unlimited | Integer | Default: 1 |
| global | QoS: true if the settings should be applied to the entire channel rather than to each consumer | Boolean | Default: true |
| durable | Declares a durable queue (the queue survives a server restart) | Boolean | Default: true |
| autoDelete | Server deletes the queue when it is no longer in use | Boolean | Default: true |
| exclusive | Queue is exclusive to this connection | Boolean | Default: false |
| arguments | Additional queue properties | Map<String, Object> | Default: null |
| exchange | Exchange configuration | See Exchange Attributes section | |
| routing | Routing configuration | See Routing Attributes section | |
| deserializer | Deserialization configuration | See Deserialization Attributes section | |
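
The connection attributes above can be combined when the broker is not a local default install. The following is a minimal sketch, not a tested deployment: the host name, credentials, and virtual host are hypothetical placeholders, while the attribute names, exchange, and transformer class are the ones used elsewhere on this page.

```yaml
rabbitConsumer:
  host: rabbit.example.internal   # hypothetical broker host
  port: 5672                      # same as the documented default
  username: joule_consumer        # hypothetical account, replaces the guest default
  password: change_me             # hypothetical placeholder
  virtualHost: /market_data       # hypothetical vhost, replaces the "/" default
  exchange:
    name: marketQuotes
    type: TOPIC
  routing:
    keys:
      - NASDAQ.IBM
  deserializer:
    transform: com.fractalworks.streams.examples.banking.data.QuoteToStreamEventTransformer
```

Attributes left out of the sketch (autoAck, prefetchCount, durable, and so on) fall back to the defaults listed in the table.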

Exchange Attributes

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| name | Name of exchange | String | |
| type | In RabbitMQ, messages are published to an exchange and, depending on the type of exchange, the message gets routed to one or more queues | TOPIC, DIRECT, FANOUT, HEADERS | Default: TOPIC |
| arguments | Additional binding properties | Map<String, Object> | Default: null |

Exchange example

exchange:
  name: marketQuotes
  type: DIRECT

Routing Attributes

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| keys | Consumers can selectively consume events sent to a topic exchange by specifying one or more words delimited by dots. Valid key format examples: 'nasdaq.ibm', 'dept.inventory.bolts'. Two wildcards are available when defining keys: * (star) can be substituted for exactly one word, and # (hash) can be substituted for zero or more words | String | Default: All. Constraints: any number of dot-separated words, up to a limit of 255 bytes |

Routing example

This example subscribes to all events whose routing key begins with the word 'NASDAQ'.

routing:
  keys:
    - NASDAQ.#
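
To make the difference between the two wildcards concrete, the sketch below places them side by side. The NYSE prefix and the three-word keys mentioned in the comments are hypothetical and used only for illustration; the matching behaviour described is the standard RabbitMQ topic exchange rule summarised in the table above.

```yaml
routing:
  keys:
    - "NASDAQ.*"   # exactly one word after NASDAQ: matches NASDAQ.IBM, but not NASDAQ.IBM.bid
    - "NYSE.#"     # zero or more words after NYSE: matches NYSE, NYSE.GE and NYSE.GE.bid
```

The keys are quoted because a YAML value that begins with * would otherwise be read as an alias; quoting every wildcard key keeps the list consistent.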

Deserialization Attributes

This section lists the configuration parameters available for the object deserialization process.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User provided implementation | Implementation of StreamEventParser | |
| compressed | Not implemented | Boolean | Default: false |

Deserializer example

deserializer:
  transform: com.fractalworks.streams.examples.banking.data.QuoteToStreamEventTransformer
  compressed: false

Examples

All examples assume default values for any attribute that is not set.

Queue based eventing

Sets up a simple consumer that subscribes to events directly from a queue.

rabbitConsumer:
  queue: point_to_point_queue
  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer

Work Queues

Subscribes to a work queue that distributes time-consuming tasks among multiple Joule workers.

rabbitConsumer:
  queue: worker_queue
  autoAck: false
  durable: true
  prefetchCount: 1
  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer
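
For a work queue shared by several Joule processes, the defaults in the Core Attributes table are worth a second look: autoDelete defaults to true, so the queue may be removed once its consumers disconnect. The variant below is a sketch under that reading of the defaults, reusing the same worker_queue and transformer; the prefetchCount value of 10 is an arbitrary illustration, not a recommendation.

```yaml
rabbitConsumer:
  queue: worker_queue
  autoAck: false       # acknowledge explicitly so unacknowledged tasks can be redelivered if a worker fails
  durable: true        # queue survives a broker restart
  autoDelete: false    # keep the queue when no consumer is attached
  prefetchCount: 10    # illustrative value: up to 10 unacknowledged messages at a time
  global: false        # apply the prefetch limit to each consumer rather than the entire channel
  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer
```

A larger prefetchCount reduces round trips to the broker but lets a slow worker hold more tasks, so the value is best tuned against the actual task duration.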

Publish/Subscribe

In this example each consumer process receives the same message. Because the exchange is configured as a fanout, the routing element is not needed; routing is covered in the next example.

rabbitConsumer:
  exchange:
    name: quotes_exchange
    type: FANOUT
  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer

Routing

This example lets a single consumer subscribe to only a subset of the messages. Each Joule consumer process defines the routing keys it subscribes to. A process using the configuration below receives only IBM and MSFT quotes.

rabbitConsumer:
  host: localhost
  exchange:
    name: quotes_exchange
    type: DIRECT
  routing:
    keys:
      - NASDAQ.IBM
      - NASDAQ.MSFT
  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer

Topic

This example is very similar to the previous case: we subscribe only to the events we are interested in. Here, however, a topic exchange enables dot notation and wildcards, so a single key can cover a wider set of events. The configuration below receives IBM and MSFT quotes from all market venues.

rabbitConsumer:
  host: localhost
  exchange:
    name: quotes_exchange
    type: TOPIC
  routing:
    keys:
      - "*.IBM"
      - "*.MSFT"
  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer
