RabbitMQ

RabbitMQ is a lightweight, easy-to-deploy messaging platform that supports multiple messaging protocols. It can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements. Joule can consume events through a RabbitMQ messaging subscription.

Driver details

com.rabbitmq:amqp-client:5.16.0

Configuration Guide

Example configuration

rabbitConsumer:
  host: localhost
  exchange:
    name: marketQuotes
    type: TOPIC
  
  routing:
    keys:
      - NASDAQ.IBM
      - NASDAQ.MSFT
      - NASDAQ.GOOG
  
  deserializer:
    transform: com.fractalworks.streams.examples.banking.data.QuoteToStreamEventTransformer

This example configures a RabbitMQ consumer that receives three event types from a topic exchange, each selected by one of the routing keys. Quote events are deserialised from a StreamEvent JSON object to a Joule StreamEvent object.

Core Attributes

Available configuration parameters

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| host | RabbitMQ server hostname or IP address | String | Default: localhost |
| port | RabbitMQ server port; must be greater than 0 | Integer | Default: 5672 |
| username | User name | String | Default: guest |
| password | Password | String | Default: guest |
| virtualHost | Logical grouping of connections, exchanges, queues, bindings, user permissions, etc. within an instance. See vhost documentation for further information | String | Default: / |
| autoAck | If true, the server considers messages acknowledged as soon as they have been delivered. If false, explicit message acknowledgment is used | Boolean | Default: true |
| prefetchCount | QoS: maximum number of messages that the server will deliver; 0 if unlimited | Integer | Default: 1 |
| global | QoS: true if the settings should be applied to the entire channel rather than to each consumer | Boolean | Default: true |
| durable | Declares a durable queue (the queue survives a server restart) | Boolean | Default: true |
| autoDelete | Server deletes the queue when it is no longer in use | Boolean | Default: true |
| exclusive | Queue is exclusive to this connection | Boolean | Default: false |
| arguments | Additional queue properties | Map<String, Object> | Default: null |
| exchange | Exchange configuration | See Exchange Attributes section | |
| routing | Routing configuration | See Routing Attributes section | |
| deserializer | Deserialization configuration | See Deserialization Attributes section | |
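
The core attributes can be combined in a single consumer definition. The fragment below is a minimal sketch rather than a recommended setup: the host name, credentials, and prefetch value are hypothetical placeholders, and the attributes are placed at the top level of rabbitConsumer, following the Work Queues example later on this page.

```yaml
rabbitConsumer:
  # Connection settings (hypothetical values; replace with your own)
  host: rabbitmq.example.com
  port: 5672
  username: joule_user
  password: change_me
  virtualHost: /

  # QoS: explicit acknowledgment, prefetch limit applied per consumer
  autoAck: false
  prefetchCount: 10
  global: false

  # Queue settings: survive a server restart, keep the queue when unused
  durable: true
  autoDelete: false
  exclusive: false

  exchange:
    name: marketQuotes
    type: TOPIC

  routing:
    keys:
      - NASDAQ.IBM

  deserializer:
    transform: com.fractalworks.streams.examples.banking.data.QuoteToStreamEventTransformer
```

Setting autoAck to false together with a small prefetchCount limits how many messages a consumer holds at once, which is the pattern the Work Queues example relies on.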

Exchange Attributes

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| name | Name of exchange | String | |
| type | In RabbitMQ, messages are published to an exchange and, depending on the type of exchange, the message gets routed to one or more queues | TOPIC, DIRECT, FANOUT, HEADERS | Default: TOPIC |
| arguments | Additional binding properties | Map<String, Object> | Default: null |

Exchange example

exchange:
    name: marketQuotes
    type: DIRECT
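
The arguments attribute is described only as a map of additional binding properties. The sketch below assumes that the map is written as nested YAML key/value pairs and passed to RabbitMQ unchanged, which this page does not confirm. The exchange name and the venue and symbol headers are hypothetical; x-match is the standard RabbitMQ option for bindings on a headers exchange.

```yaml
exchange:
  name: quotes_headers_exchange   # hypothetical exchange name
  type: HEADERS
  arguments:
    x-match: all      # RabbitMQ headers binding option: every listed header must match
    venue: NASDAQ     # hypothetical message header and value
    symbol: IBM       # hypothetical message header and value
```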

Routing Attributes

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| keys | Consumers can selectively consume events sent to a topic exchange by specifying one or more words delimited by dots. Valid key format examples: 'nasdaq.ibm', 'dept.inventory.bolts'. Two wildcards are available when defining keys: * (star) can be substituted for exactly one word; # (hash) can be substituted for zero or more words | String | Default: All. Constraints: any number of dot-separated words, up to a limit of 255 bytes |

Routing example

This example subscribes to all events whose routing key has 'NASDAQ' as its first word.

  routing:
    keys:
      - NASDAQ.#
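
To make the difference between the two wildcards concrete, the sketch below lists a few binding keys with the routing keys they would and would not match on a topic exchange. The three-word key NASDAQ.IBM.bid and the LSE venue are hypothetical and used only for illustration.

```yaml
  routing:
    keys:
      - "NASDAQ.*"   # exactly one word after NASDAQ: matches NASDAQ.IBM, not NASDAQ.IBM.bid
      - NASDAQ.#     # zero or more words after NASDAQ: matches NASDAQ, NASDAQ.IBM and NASDAQ.IBM.bid
      - "*.IBM"      # exactly one word before IBM: matches NASDAQ.IBM and LSE.IBM
```

Keys that begin with * are quoted because YAML would otherwise read the leading star as an alias indicator. A # needs quoting only when it starts the value or follows whitespace, where YAML would treat it as the start of a comment.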

Deserialization Attributes

This section lists the configuration parameters available for the object deserialization process.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User-provided implementation | Implementation of StreamEventParser | |
| compressed | Not implemented | Boolean | Default: false |

Deserializer example

deserializer:
  transform: com.fractalworks.streams.examples.banking.data.QuoteToStreamEventTransformer
  compressed: false

Examples

All examples assume default values for any attribute that is not explicitly set.

Queue based eventing

Sets up a simple consumer that subscribes to events directly from a queue.

rabbitConsumer:
  queue: point_to_point_queue
  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer

Work Queues

Subscribes to a work queue that distributes time-consuming tasks among multiple Joule workers.

rabbitConsumer:
  queue: worker_queue
  autoAck: false
  durable: true
  prefetchCount: 1

  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer

Publish/Subscribe

In this example every consumer process receives the same message. Because the exchange is configured as a fanout, the routing element is not needed; routing is covered in the next example.

rabbitConsumer:
  exchange:
    name: quotes_exchange
    type: FANOUT

  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer

Routing

This example subscribes a single consumer to a subset of the messages. Each Joule consumer process declares its own routing keys. A process using the configuration below receives only quotes published with the routing keys NASDAQ.IBM and NASDAQ.MSFT.

rabbitConsumer:
  host: localhost
  exchange:
    name: quotes_exchange
    type: DIRECT

  routing:
    keys:
      - NASDAQ.IBM
      - NASDAQ.MSFT

  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer

Topic

This example is very similar to the previous one in that we subscribe only to the events we are interested in. Here, however, a topic exchange is used, which enables dot notation and wildcards to subscribe to a wider set of events. With the configuration below the consumer receives quotes for both IBM and MSFT from every market venue.

rabbitConsumer:
  host: localhost
  exchange:
    name: quotes_exchange
    type: TOPIC

  routing:
    keys:
      - "*.IBM"
      - "*.MSFT"

  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer
