RabbitMQ

RabbitMQ is a lightweight, easy-to-deploy messaging platform for event-based data integration.

Overview

RabbitMQ is a lightweight and easy-to-deploy messaging platform that supports multiple messaging protocols. It can be configured in distributed and federated setups to meet high-scale and high-availability requirements.

Joule can publish to RabbitMQ queues, exchanges and topics, which supports flexible event-driven architectures.

This page covers how to configure RabbitMQ publishers within Joule, including queue-based eventing, work queues for task distribution, publish/subscribe models, and routing with dynamic keys.

The examples demonstrate how to serialise events into JSON, configure exchanges and use routing keys for targeted message delivery.

The page also provides configuration details for common patterns such as worker queues and topic-based exchanges, which help event processing scale efficiently.

Examples & DSL attributes

This example configures the RabbitMQ Publisher to send StreamEvents as JSON-formatted messages to a queue named quotes_queue.

Here's a breakdown:

  1. queue: Events are published to quotes_queue, a named queue in RabbitMQ.

  2. serializer: Events are serialised using the jsonFormatter with UTF-8 encoding, so the data is formatted as JSON.

This setup sends JSON-encoded events to the quotes_queue in RabbitMQ, with the message content encoded in UTF-8.

rabbitPublisher:
  queue: quotes_queue
  
  serializer:
    formatter:
      jsonFormatter:
        encoding: UTF-8

Attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| host | RabbitMQ server hostname or IP address | String | Default: localhost |
| port | RabbitMQ server port; must be greater than 0 | Integer | Default: 5672 |
| username | User name | String | Default: guest |
| password | Password | String | Default: guest |
| virtualHost | Logical grouping of connections, exchanges, queues, bindings, user permissions, etc. within a RabbitMQ instance | String | Default: / |
| autoAck | Whether the server should consider messages acknowledged as soon as they have been delivered. Set to false to use explicit message acknowledgement | Boolean | Default: true |
| global | QoS setting: true if the settings should be applied to the entire channel rather than to each consumer | Boolean | Default: true |
| durable | Declares a durable queue (the queue survives a server restart) | Boolean | Default: true |
| autoDelete | Server deletes the queue when it is no longer in use | Boolean | Default: true |
| exclusive | Queue is exclusive to this connection | Boolean | Default: false |
| arguments | Additional queue properties | Map<String, Object> | Default: null |
| exchange | Exchange configuration | | |
| routing | Routing configuration | | |
| serializer | Serialisation configuration | | |
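The defaults in the attributes schema above can be summarised in a small sketch. This is plain Python, not Joule code: the `RabbitPublisherSettings` class and its snake_case field names are hypothetical and exist only to show the documented defaults and the port constraint in one place.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class RabbitPublisherSettings:
    """Hypothetical holder mirroring the defaults listed in the attributes schema."""
    host: str = "localhost"
    port: int = 5672
    username: str = "guest"
    password: str = "guest"
    virtual_host: str = "/"
    auto_ack: bool = True
    global_qos: bool = True      # corresponds to the 'global' attribute
    durable: bool = True
    auto_delete: bool = True
    exclusive: bool = False
    arguments: Optional[Dict[str, Any]] = None

    def __post_init__(self) -> None:
        # The schema states that the port must be greater than 0
        if self.port <= 0:
            raise ValueError(f"port must be greater than 0, got {self.port}")


# All defaults apply when nothing is overridden
defaults = RabbitPublisherSettings()

# Hypothetical override of host and port; every other attribute keeps its default
override = RabbitPublisherSettings(host="rabbit.internal", port=5673)
```

Only the attributes that differ from the defaults need to be set, which is why the queue example earlier on this page specifies nothing beyond the queue name and the serializer.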

Exchange attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| name | Name of the exchange | String | |
| type | Exchange type. Messages are published to an exchange and, depending on its type, routed to one or more queues | TOPIC, DIRECT, FANOUT, HEADERS | Default: TOPIC |
| arguments | Additional binding properties | Map<String, Object> | Default: null |

Exchange example

This example configures the RabbitMQ Publisher to send events to an exchange instead of a queue:

  1. exchange: Events are published to an exchange named marketQuotes.

    1. type: The exchange type is set to TOPIC, so events are routed by matching each message's routing key against the binding patterns of the bound queues. Binding patterns may contain wildcards: * matches exactly one word and # matches zero or more words. Consumers can therefore subscribe to specific messages with patterns such as market.IBM or market.*.

This setup sends events to the marketQuotes topic exchange, which routes each message according to the binding patterns that consumers declare.

rabbitPublisher:
    ...

    exchange:
        name: marketQuotes
        type: TOPIC
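To illustrate how a topic exchange decides where a message goes, here is a minimal sketch of topic pattern matching in Python. It follows RabbitMQ's documented wildcard rules (* matches exactly one dot-separated word, # matches zero or more words), but the `topic_matches` function is a hypothetical helper written for this page, not part of Joule or of any RabbitMQ client library.

```python
from typing import List


def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key."""

    def match(p: List[str], k: List[str]) -> bool:
        if not p:
            # Pattern exhausted: match only if the key is exhausted too
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero or more words of the key
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            # '*' consumes exactly one word; a literal must be equal
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


exact = topic_matches("market.IBM", "market.IBM")
one_word = topic_matches("market.*", "market.IBM")
too_deep = topic_matches("market.*", "market.nasdaq.IBM")
any_depth = topic_matches("market.#", "market.nasdaq.IBM")
```

In this sketch a consumer bound with market.* receives market.IBM but not market.nasdaq.IBM, whereas a binding of market.# receives both.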

Routing attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| event fields | List of event fields used to create dynamic routing keys | String | Default: None |

Routing example

In this example, the RabbitMQ Publisher is configured to dynamically create a routing key based on the market and symbol fields from the StreamEvent:

  1. routing: The routing key is constructed from the values of the market and symbol fields in the event. For example, if the market is nasdaq and the symbol is IBM, the routing key nasdaq.IBM is created.

This gives fine-grained control over which events reach which consumers, because consumers can bind with an exact key such as nasdaq.IBM or a wildcard pattern such as nasdaq.*.

rabbitPublisher:
  ...

  routing:
    event fields:
      - market
      - symbol
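The key construction described above can be sketched in a few lines of Python. This is an illustration only: `build_routing_key` is a hypothetical helper, the sample event is invented, and joining the field values with a dot is inferred from the nasdaq.IBM example rather than taken from Joule's implementation. How Joule handles a missing field is not covered on this page, so the sketch simply raises a KeyError.

```python
from typing import Any, Dict, List


def build_routing_key(event: Dict[str, Any], fields: List[str]) -> str:
    """Join the values of the configured event fields with dots, in order."""
    # A missing field raises KeyError in this sketch
    return ".".join(str(event[name]) for name in fields)


# Hypothetical event; only 'market' and 'symbol' take part in the key
event = {"market": "nasdaq", "symbol": "IBM", "bid": 101.5}
key = build_routing_key(event, ["market", "symbol"])
```

The order of the configured fields determines the order of the words in the key, so listing symbol before market would yield IBM.nasdaq instead.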

Serialisation attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| transform | User-provided implementation that transforms the StreamEvent into a domain class. The resulting domain class must implement the serialisable interface | Implementation of StreamEventParser | |
| formatter | If a custom transformer has not been provided, the formatter setting is applied to the processed StreamEvent object | | Default: jsonFormatter |
| compressed | Not implemented | Boolean | Default: false |

Serialiser example

This example demonstrates how to configure the serialiser for the RabbitMQ Publisher:

  1. serializer: The jsonFormatter is used to serialise the events.

    1. encoding: The JSON output is encoded as UTF-8, a widely supported encoding that downstream applications can readily consume.

This setup serialises the events into JSON format with UTF-8 encoding before publishing them to RabbitMQ.

rabbitPublisher:
  ...

  serializer:
    formatter:
      jsonFormatter:
        encoding: UTF-8
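As a rough picture of what JSON formatting with UTF-8 encoding produces, the sketch below serialises a sample event to bytes and reads it back. It is plain Python rather than Joule code: `to_json_bytes` is a hypothetical helper and the event fields are invented for illustration, so the exact payload layout that Joule emits may differ.

```python
import json
from typing import Any, Dict


def to_json_bytes(event: Dict[str, Any]) -> bytes:
    """Serialise an event to JSON text, then encode the text as UTF-8 bytes."""
    # ensure_ascii=False keeps non-ASCII characters as-is, so the encoding step matters
    return json.dumps(event, ensure_ascii=False).encode("utf-8")


# Hypothetical event; real StreamEvent fields depend on the use case
sample_event = {"symbol": "IBM", "venue": "Zürich", "bid": 101.5}
payload = to_json_bytes(sample_event)

# A consumer reverses the two steps: decode UTF-8, then parse the JSON text
decoded = json.loads(payload.decode("utf-8"))
```

Any consumer that can decode UTF-8 and parse JSON can read such a message, regardless of the language it is written in.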
